Compare commits

...

27 Commits

Author SHA1 Message Date
Timothy 185dcb86fe fix: verdict edge case 2026-03-09 19:08:35 -07:00
Timothy 4c8e84b421 fix: compatibility of memory with queen orchestrator 2026-03-09 17:58:52 -07:00
Richard Tang 8d4657c13e Merge branch 'feat/queen-planning-phase' into feature/queen-global-memory 2026-03-09 16:10:42 -07:00
Richard Tang bdcbcff6f3 feat: better instruction for planning mode switch 2026-03-09 16:01:34 -07:00
Richard Tang a22b321692 feat: improve phase switching tools 2026-03-09 15:33:03 -07:00
Richard Tang c05c30dd9a feat: add meta agent tools to planning 2026-03-09 15:14:34 -07:00
Richard Tang 90762f275b feat: give planning mode the load tool 2026-03-09 14:55:53 -07:00
Richard Tang 2b12a9c91a Merge remote-tracking branch 'origin/feature/queen-global-memory' into feature/queen-global-memory 2026-03-09 14:47:27 -07:00
Richard Tang 9a94595a42 feat: extract the shared knowledge between planning and building 2026-03-09 14:45:31 -07:00
Richard Tang e1540dfaa6 refactor: drop hive code CLI 2026-03-09 14:30:13 -07:00
Richard Tang 4f5ac6d1b1 refactor: rename hive_coder to queen and extract queen orchestrator 2026-03-09 14:23:31 -07:00
Richard Tang c87d7b13da refactor: rename hive_coder to queen and extract queen orchestrator 2026-03-09 14:23:16 -07:00
Timothy c4acf0b659 fix: memory consolidation hook, simplify generated memory files 2026-03-09 14:15:01 -07:00
Richard Tang 2ed814455a Merge branch 'feat/queen-planning-phase' into feature/queen-global-memory 2026-03-09 12:57:23 -07:00
Timothy ad1a4ef0c3 fix: cancellation button 2026-03-09 12:48:20 -07:00
Timothy 2111c808a9 feat: queen memory v1 2026-03-09 11:55:39 -07:00
Richard Tang cdf76ae3b9 fix: eventloop test 2026-03-09 10:23:56 -07:00
Richard Tang 42d0592941 refactor: judge evaluate 2026-03-09 10:09:15 -07:00
Richard Tang 1de7cf821d fix: handle judge with empty message 2026-03-09 09:58:29 -07:00
Timothy 4ea8540e25 fix: better logging for memory consolidation event 2026-03-08 20:44:40 -07:00
Timothy bfa3b8e0f6 fix: queen memory health 2026-03-08 20:28:53 -07:00
Richard Tang 55eccfd75f feat: intake node prompt in planning mode 2026-03-08 20:27:24 -07:00
Timothy 1e994a77b5 feat: queen agent global memory 2026-03-08 19:54:46 -07:00
Richard Tang d12afeb35d chore: ruff lint 2026-03-08 19:46:49 -07:00
Richard Tang e84fefd319 feat: separate the queen and worker tools in prompts 2026-03-08 19:40:30 -07:00
Richard Tang d2b510014d feat: adjust tools and knowledge separation between planning and building 2026-03-08 19:21:50 -07:00
Richard Tang 3ed5fda448 feat: planning phase for the queen 2026-03-08 18:49:45 -07:00
41 changed files with 1694 additions and 975 deletions
@@ -1,40 +0,0 @@
"""
Hive Coder — Native coding agent that builds Hive agent packages.
Deeply understands the agent framework and produces complete Python packages
with goals, nodes, edges, system prompts, MCP configuration, and tests
from natural language specifications.
"""
from .agent import (
conversation_mode,
edges,
entry_node,
entry_points,
goal,
identity_prompt,
loop_config,
nodes,
pause_nodes,
terminal_nodes,
)
from .config import AgentMetadata, RuntimeConfig, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"goal",
"nodes",
"edges",
"entry_node",
"entry_points",
"pause_nodes",
"terminal_nodes",
"conversation_mode",
"identity_prompt",
"loop_config",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -1,60 +0,0 @@
"""CLI entry point for Hive Coder agent."""
import json
import logging
import sys
import click
from .agent import entry_node, goal, nodes
from .config import metadata
def setup_logging(verbose=False, debug=False):
    """Set up stderr logging for a CLI run.

    ``debug`` wins over ``verbose``; with neither flag only warnings and
    above are shown. The same level is also applied to the ``framework``
    logger.
    """
    if debug:
        chosen = (logging.DEBUG, "%(asctime)s %(name)s: %(message)s")
    elif verbose:
        chosen = (logging.INFO, "%(message)s")
    else:
        chosen = (logging.WARNING, "%(levelname)s: %(message)s")

    log_level, log_format = chosen
    logging.basicConfig(level=log_level, format=log_format, stream=sys.stderr)
    logging.getLogger("framework").setLevel(log_level)
# Root command group; subcommands attach themselves via @cli.command().
@click.group()
@click.version_option(version="1.0.0")
def cli():
    """Hive Coder — Build Hive agent packages from natural language."""
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
    """Show agent information."""
    # Collect everything once so the JSON and plain-text outputs stay in sync.
    info_data = {
        "name": metadata.name,
        "version": metadata.version,
        "description": metadata.description,
        "goal": {
            "name": goal.name,
            "description": goal.description,
        },
        "nodes": [n.id for n in nodes],
        "entry_node": entry_node,
        "client_facing_nodes": [n.id for n in nodes if n.client_facing],
    }

    # Machine-readable output: dump the whole structure and stop.
    if output_json:
        click.echo(json.dumps(info_data, indent=2))
        return

    # Human-readable output: one line per field (goal details are JSON-only).
    report_lines = (
        f"Agent: {info_data['name']}",
        f"Version: {info_data['version']}",
        f"Description: {info_data['description']}",
        f"\nNodes: {', '.join(info_data['nodes'])}",
        f"Client-facing: {', '.join(info_data['client_facing_nodes'])}",
        f"Entry: {info_data['entry_node']}",
    )
    for report_line in report_lines:
        click.echo(report_line)
if __name__ == "__main__":
cli()
-153
View File
@@ -1,153 +0,0 @@
"""Agent graph construction for Hive Coder."""
from framework.graph import Constraint, Goal, SuccessCriterion
from framework.graph.edge import GraphSpec
from .nodes import coder_node, queen_node
# Goal for the Hive Coder agent: what it must deliver and the rules it must obey.
goal = Goal(
    id="hive-coder",
    name="Hive Agent Builder",
    description=(
        "Build complete, validated Hive agent packages "
        "from natural language specifications. "
        "Produces production-ready Python packages with goals, nodes, edges, "
        "system prompts, MCP configuration, and tests."
    ),
    # Four weighted criteria; the weights below add up to 1.0.
    success_criteria=[
        SuccessCriterion(
            id="valid-package",
            description="Generated agent package passes structural validation",
            metric="validation_pass",
            target="true",
            weight=0.30,
        ),
        SuccessCriterion(
            id="complete-files",
            description=(
                "All required files generated: agent.py, config.py, nodes/__init__.py, "
                "__init__.py, __main__.py, mcp_servers.json"
            ),
            metric="file_count",
            target=">=6",
            weight=0.25,
        ),
        SuccessCriterion(
            id="user-satisfaction",
            description="User reviews and approves the generated agent",
            metric="user_approval",
            target="true",
            weight=0.25,
        ),
        SuccessCriterion(
            id="framework-compliance",
            description=(
                "Generated code follows framework patterns: "
                "STEP 1/STEP 2 for client-facing and correct imports"
            ),
            metric="pattern_compliance",
            target="100%",
            weight=0.20,
        ),
    ],
    # Every constraint is declared "hard".
    constraints=[
        Constraint(
            id="dynamic-tool-discovery",
            description=(
                "Always discover available tools dynamically via list_agent_tools "
                "before referencing tools in agent designs"
            ),
            constraint_type="hard",
            category="correctness",
        ),
        Constraint(
            id="no-fabricated-tools",
            description="Only reference tools that exist in hive-tools MCP",
            constraint_type="hard",
            category="correctness",
        ),
        Constraint(
            id="valid-python",
            description="All generated Python files must be syntactically correct",
            constraint_type="hard",
            category="correctness",
        ),
        Constraint(
            id="self-verification",
            description="Run validation after writing code; fix errors before presenting",
            constraint_type="hard",
            category="quality",
        ),
    ],
)
# The graph holds only the coder node. The queen is driven by its own
# GraphExecutor (with queen_node) and is intentionally not listed here.
nodes = [coder_node]

# A single event_loop node needs no edges.
edges = []

# Entry / pause / terminal wiring for the one-node graph.
entry_node = "coder"
entry_points = {"start": "coder"}
pause_nodes = []
terminal_nodes = []  # Coder node has output_keys and can terminate

# The queen is an independent executor rather than a secondary graph fed via
# add_graph(), so there are no async entry points.
async_entry_points = []

# Module-level settings picked up by AgentRunner.load().
conversation_mode = "continuous"

identity_prompt = (
    "You are Hive Coder, the best agent-building coding agent on the planet. "
    "You deeply understand the Hive agent framework at the source code level "
    "and produce production-ready agent packages from natural language. "
    "You can dynamically discover available framework tools, inspect runtime "
    "sessions and checkpoints from agents you build, and run their test suites. "
    "You follow coding agent discipline: read before writing, verify "
    "assumptions by reading actual code, adhere to project conventions, "
    "self-verify with validation, and fix your own errors. You are concise, "
    "direct, and technically rigorous. No emojis. No fluff."
)

loop_config = {
    "max_iterations": 100,
    "max_tool_calls_per_turn": 30,
    "max_history_tokens": 32000,
}
# ---------------------------------------------------------------------------
# Queen graph: an independent, persistent conversation shown in the TUI.
# It is loaded by _load_judge_and_queen() in app.py, NOT by AgentRunner.
# ---------------------------------------------------------------------------
queen_goal = Goal(
    id="queen-manager",
    name="Queen Manager",
    description=(
        "Manage the worker agent lifecycle and serve as the user's primary interactive "
        "interface. Triage health escalations from the judge."
    ),
    # No scored criteria or constraints are attached to the queen's goal.
    success_criteria=[],
    constraints=[],
)
# Single-node graph for the queen: one "queen" node, no edges, no pause or
# terminal nodes, continuous conversation.
queen_graph = GraphSpec(
    id="queen-graph",
    goal_id=queen_goal.id,
    version="1.0.0",
    entry_node="queen",
    entry_points={"start": "queen"},
    terminal_nodes=[],
    pause_nodes=[],
    nodes=[queen_node],
    edges=[],
    conversation_mode="continuous",
    loop_config={
        # Very high cap compared with the coder graph's 100 iterations.
        "max_iterations": 999_999,
        "max_tool_calls_per_turn": 30,
        "max_history_tokens": 32000,
    },
)
+21
View File
@@ -0,0 +1,21 @@
"""
Queen — Native agent builder for the Hive framework.
Deeply understands the agent framework and produces complete Python packages
with goals, nodes, edges, system prompts, MCP configuration, and tests
from natural language specifications.
"""
from .agent import queen_goal, queen_graph
from .config import AgentMetadata, RuntimeConfig, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"queen_goal",
"queen_graph",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
+40
View File
@@ -0,0 +1,40 @@
"""Queen graph definition."""
from framework.graph import Goal
from framework.graph.edge import GraphSpec
from .nodes import queen_node
# ---------------------------------------------------------------------------
# Queen graph: the primary persistent conversation.
# It is loaded by queen_orchestrator.create_queen(), NOT by AgentRunner.
# ---------------------------------------------------------------------------
queen_goal = Goal(
    id="queen-manager",
    name="Queen Manager",
    description=(
        "Manage the worker agent lifecycle and serve as the user's primary interactive "
        "interface. Triage health escalations from the judge."
    ),
    # No scored criteria or constraints are attached to the queen's goal.
    success_criteria=[],
    constraints=[],
)
# Single-node graph for the queen: one "queen" node, no edges, no pause or
# terminal nodes, continuous conversation.
queen_graph = GraphSpec(
    id="queen-graph",
    goal_id=queen_goal.id,
    version="1.0.0",
    entry_node="queen",
    entry_points={"start": "queen"},
    terminal_nodes=[],
    pause_nodes=[],
    nodes=[queen_node],
    edges=[],
    conversation_mode="continuous",
    loop_config={
        # Iteration cap is set very high for this graph.
        "max_iterations": 999_999,
        "max_tool_calls_per_turn": 30,
        "max_history_tokens": 32000,
    },
)
@@ -1,4 +1,4 @@
"""Runtime configuration for Hive Coder agent."""
"""Runtime configuration for Queen agent."""
import json
from dataclasses import dataclass, field
@@ -34,7 +34,7 @@ default_config = RuntimeConfig()
@dataclass
class AgentMetadata:
name: str = "Hive Coder"
name: str = "Queen"
version: str = "1.0.0"
description: str = (
"Native coding agent that builds production-ready Hive agent packages "
@@ -43,7 +43,7 @@ class AgentMetadata:
"MCP configuration, and tests."
)
intro_message: str = (
"I'm Hive Coder — I build Hive agents. Describe what kind of agent "
"I'm Queen — I build Hive agents. Describe what kind of agent "
"you want to create and I'll design, implement, and validate it for you."
)
@@ -1,4 +1,4 @@
"""Node definitions for Hive Coder agent."""
"""Node definitions for Queen agent."""
from pathlib import Path
@@ -61,14 +61,35 @@ _SHARED_TOOLS = [
"list_agent_sessions",
"list_agent_checkpoints",
"get_agent_checkpoint",
"initialize_agent_package",
]
# Queen phase-specific tool sets.
# PLANNING: exploration and design only; the file write/edit tools are not
# part of this set.
_QUEEN_PLANNING_TOOLS = (
    # File and shell exploration
    ["read_file", "list_directory", "search_files", "run_command"]
    # Tool / agent discovery and the hand-off into building
    + [
        "list_agent_tools",
        "list_agents",
        "list_agent_sessions",
        "list_agent_checkpoints",
        "get_agent_checkpoint",
        "initialize_and_build_agent",
    ]
    # Loading an existing agent (only after the user confirms)
    + ["load_built_agent"]
    # Episodic memory, offered in every phase
    + ["write_to_diary"]
)
# BUILDING: everything in the shared set plus agent loading, credential
# listing, the way back to planning, and the diary.
_QUEEN_BUILDING_TOOLS = [
    *_SHARED_TOOLS,
    "load_built_agent",
    "list_credentials",
    "replan_agent",
    "write_to_diary",  # Episodic memory — available in all phases
]
# Staging phase: agent loaded but not yet running — inspect, configure, launch.
@@ -84,6 +105,8 @@ _QUEEN_STAGING_TOOLS = [
# Launch or go back
"run_agent_with_input",
"stop_worker_and_edit",
"stop_worker_and_plan",
"write_to_diary", # Episodic memory — available in all phases
]
# Running phase: worker is executing — monitor and control.
@@ -98,11 +121,13 @@ _QUEEN_RUNNING_TOOLS = [
# Worker lifecycle
"stop_worker",
"stop_worker_and_edit",
"stop_worker_and_plan",
"get_worker_status",
"inject_worker_message",
# Monitoring
"get_worker_health_summary",
"notify_operator",
"write_to_diary", # Episodic memory — available in all phases
]
@@ -113,7 +138,35 @@ _QUEEN_RUNNING_TOOLS = [
# additions.
# ---------------------------------------------------------------------------
_package_builder_knowledge = """\
_shared_building_knowledge = """\
# Shared Rules (Planning & Building)
## Paths (MANDATORY)
**Always use RELATIVE paths** \
(e.g. `exports/agent_name/config.py`, `exports/agent_name/nodes/__init__.py`).
**Never use absolute paths** like `/mnt/data/...` or `/workspace/...` they fail.
The project root is implicit.
## Worker File Tools (hive-tools MCP)
Workers use a DIFFERENT MCP server (hive-tools) with DIFFERENT tool names. \
When designing worker nodes or writing worker system prompts, reference these \
tool names NOT the coder-tools names (read_file, write_file, etc.).
Worker data tools (for large results and spillover):
- save_data(filename, data, data_dir) save data to a file for later retrieval
- load_data(filename, data_dir, offset_bytes?, limit_bytes?) load data \
with byte-based pagination
- list_data_files(data_dir) list available data files
- append_data(filename, data, data_dir) append to a file incrementally
- edit_data(filename, old_text, new_text, data_dir) find-and-replace in a data file
- serve_file_to_user(filename, data_dir, label?, open_in_browser?) \
generate a clickable file URI for the user
IMPORTANT: Do NOT tell workers to use read_file, write_file, edit_file, \
search_files, or list_directory those are YOUR tools, not theirs.
"""
_planning_knowledge = """\
**A responsible engineer doesn't jump into building. First, \
understand the problem and be transparent about what the framework can and cannot do.**
@@ -121,56 +174,14 @@ Use the user's selection (or their custom description if they chose "Other") \
as context when shaping the goal below. If the user already described \
what they want before this step, skip the question and proceed directly.
# Core Mandates
# Core Mandates (Planning)
- **DO NOT propose a complete goal on your own.** Instead, \
collaborate with the user to define it.
- **Verify assumptions.** Never assume a class, import, or pattern \
exists. Read actual source to confirm. Search if unsure.
- **Discover tools dynamically.** NEVER reference tools from static \
docs. Always run list_agent_tools() to see what actually exists.
- **Self-verify.** After writing code, run validation and tests. Fix \
errors yourself. Don't declare success until validation passes.
# Tools
## Paths (MANDATORY)
**Always use RELATIVE paths**
(e.g. `exports/agent_name/config.py`, `exports/agent_name/nodes/__init__.py`).
**Never use absolute paths** like `/mnt/data/...` or `/workspace/...` they fail.
The project root is implicit.
# Tool Discovery (MANDATORY before designing)
## File I/O
- read_file(path, offset?, limit?, hashline?) read with line numbers; \
hashline=True for N:hhhh|content anchors (use with hashline_edit)
- write_file(path, content) create/overwrite, auto-mkdir
- edit_file(path, old_text, new_text, replace_all?) fuzzy-match edit
- hashline_edit(path, edits, auto_cleanup?, encoding?) anchor-based \
editing using N:hhhh refs from read_file(hashline=True). Ops: set_line, \
replace_lines, insert_after, insert_before, replace, append
- list_directory(path, recursive?) list contents
- search_files(pattern, path?, include?, hashline?) regex search; \
hashline=True for anchors in results
- run_command(command, cwd?, timeout?) shell execution
- undo_changes(path?) restore from git snapshot
## Meta-Agent
- list_agent_tools(server_config_path?, output_schema?, group?) discover \
available tools grouped by category. output_schema: "simple" (default, \
descriptions truncated to ~200 chars) or "full" (complete descriptions + \
input_schema). group: "all" (default) or a provider like "google". \
Call FIRST before designing.
- validate_agent_package(agent_name) run ALL validation checks in one call \
(class validation, runner load, tool validation, tests). Call after building.
- list_agents() list all agent packages in exports/ with session counts
- list_agent_sessions(agent_name, status?, limit?) list sessions
- list_agent_checkpoints(agent_name, session_id) list checkpoints
- get_agent_checkpoint(agent_name, session_id, checkpoint_id?) load checkpoint
# Meta-Agent Capabilities
You are not just a file writer. You have deep integration with the \
Hive framework:
## Tool Discovery (MANDATORY before designing)
Before designing any agent, run list_agent_tools() with NO arguments \
to see ALL available tools (names + descriptions, grouped by category). \
ONLY use tools from this list in your node definitions. \
@@ -184,22 +195,7 @@ so you know what providers and tools exist before drilling in. \
Simple mode truncates long descriptions use group + "full" to \
get the complete description and input_schema for the tools you need.
## Post-Build Validation
After writing agent code, run a single comprehensive check:
validate_agent_package("{name}")
This runs class validation, runner load, tool validation, and tests \
in one call. Do NOT run these steps individually.
## Debugging Built Agents
When a user says "my agent is failing" or "debug this agent":
1. list_agent_sessions("{agent_name}") find the session
2. get_worker_status(focus="issues") check for problems
3. list_agent_checkpoints / get_agent_checkpoint trace execution
# Agent Building Workflow
You operate in a continuous loop. The user describes what they want, \
you build it. No rigid phases use judgment. But the general flow is:
# Discovery & Design Workflow
## 1: Fast Discovery (3-6 Turns)
@@ -344,27 +340,29 @@ use box-drawing characters and clear flow arrows:
subagent: gcu_search
input: user_request
tools: web_search,
write_file
save_data
on_success
work
subagent: gcu_interact
tools: read_file,
write_file
tools: load_data,
save_data
on_success
review
tools: write_file
tools: save_data
serve_file_to_user
on_failure
back to gather
```
The queen owns intake: she gathers user requirements, then calls \
If the worker agent start from some initial input it is okay. \
The queen(you) owns intake: you gathers user requirements, then calls \
`run_agent_with_input(task)` with a structured task description. \
When building the agent, design the entry node's `input_keys` to \
match what the queen will provide at run time. Worker nodes should \
@@ -381,12 +379,66 @@ Get user approval before implementing.
- If **More questions**: Answer them honestly, then ask again
- If **Reconsider**: Discuss alternatives. If they decide to proceed anyway, \
that's their informed choice
"""
_building_knowledge = """\
# Core Mandates (Building)
- **Verify assumptions.** Never assume a class, import, or pattern \
exists. Read actual source to confirm. Search if unsure.
- **Self-verify.** After writing code, run validation and tests. Fix \
errors yourself. Don't declare success until validation passes.
# Tools
## File I/O (your tools — coder-tools MCP)
- read_file(path, offset?, limit?, hashline?) read with line numbers; \
hashline=True for N:hhhh|content anchors (use with hashline_edit)
- write_file(path, content) create/overwrite, auto-mkdir
- edit_file(path, old_text, new_text, replace_all?) fuzzy-match edit
- hashline_edit(path, edits, auto_cleanup?, encoding?) anchor-based \
editing using N:hhhh refs from read_file(hashline=True). Ops: set_line, \
replace_lines, insert_after, insert_before, replace, append
- list_directory(path, recursive?) list contents
- search_files(pattern, path?, include?, hashline?) regex search; \
hashline=True for anchors in results
- run_command(command, cwd?, timeout?) shell execution
- undo_changes(path?) restore from git snapshot
## Meta-Agent
- list_agent_tools(server_config_path?, output_schema?, group?) discover \
available tools grouped by category. output_schema: "simple" (default, \
descriptions truncated to ~200 chars) or "full" (complete descriptions + \
input_schema). group: "all" (default) or a provider like "google". \
Call FIRST before designing.
- validate_agent_package(agent_name) run ALL validation checks in one call \
(class validation, runner load, tool validation, tests). Call after building.
- list_agents() list all agent packages in exports/ with session counts
- list_agent_sessions(agent_name, status?, limit?) list sessions
- list_agent_checkpoints(agent_name, session_id) list checkpoints
- get_agent_checkpoint(agent_name, session_id, checkpoint_id?) load checkpoint
# Build & Validation Capabilities
## Post-Build Validation
After writing agent code, run a single comprehensive check:
validate_agent_package("{name}")
This runs class validation, runner load, tool validation, and tests \
in one call. Do NOT run these steps individually.
## Debugging Built Agents
When a user says "my agent is failing" or "debug this agent":
1. list_agent_sessions("{agent_name}") find the session
2. get_worker_status(focus="issues") check for problems
3. list_agent_checkpoints / get_agent_checkpoint trace execution
# Implementation Workflow
## 5. Implement
**Please make sure you have propose the design to the user before implementing**
Call `initialize_agent_package(agent_name)` to generate all package files \
Call `initialize_and_build_agent(agent_name)` to generate all package files \
from your graph session. The agent_name must be snake_case (e.g., "my_agent").
The tool creates: config.py, nodes/__init__.py, agent.py, \
__init__.py, __main__.py, mcp_servers.json, tests/conftest.py, \
@@ -417,6 +469,9 @@ session. This switches to STAGING phase and shows the graph in the \
visualizer. Do NOT wait for user input between validation and loading.
"""
# Composed version for consumers without a phase split (coder_node, per the
# original note): the shared rules first, then planning, then building.
_package_builder_knowledge = (
    _shared_building_knowledge
    + _planning_knowledge
    + _building_knowledge
)
# ---------------------------------------------------------------------------
# Queen-specific: extra tool docs, behavior, phase 7, style
@@ -424,6 +479,17 @@ visualizer. Do NOT wait for user input between validation and loading.
# -- Phase-specific identities --
_queen_identity_planning = """\
You are an experienced, responsible and curious Solution Architect. \
"Queen" is the internal alias. \
You are in PLANNING phase your job is to either: \
(a) understand what the user wants and design a new agent, or \
(b) diagnose issues with an existing agent, discuss a fix plan with the user, \
then transition to building to implement. \
You have read-only tools for exploration but no write/edit tools. \
Focus on conversation, research, and design.\
"""
_queen_identity_building = """\
You are an experienced, responsible and curious Solution Architect. \
"Queen" is the internal alias.\
@@ -453,6 +519,38 @@ agent finishes, you report results clearly and help the user decide what to do n
# -- Phase-specific tool docs --
_queen_tools_planning = """
# Tools (PLANNING phase)
You are in planning mode. You have read-only tools for exploration \
but no write/edit tools.
- read_file(path, offset?, limit?) Read files to study reference agents
- list_directory(path, recursive?) Explore project structure
- search_files(pattern, path?, include?) Search codebase
- run_command(command, cwd?, timeout?) Read-only commands only (grep, ls, git log). \
Never use this to write files, run scripts, or modify the filesystem transition \
to BUILDING phase for that.
- list_agent_tools(server_config_path?, output_schema?, group?) \
Discover available tools for design
- list_agents() See existing agent packages for reference
- list_agent_sessions(agent_name, status?, limit?) Inspect past runs of an agent
- list_agent_checkpoints(agent_name, session_id) View execution history
- get_agent_checkpoint(agent_name, session_id, checkpoint_id?) Load a checkpoint
- initialize_and_build_agent(agent_name?, nodes?) With agent_name: scaffold a \
new agent and transition to BUILDING phase. Without agent_name: transition to \
BUILDING to fix the currently loaded agent (requires a loaded worker).
- load_built_agent(agent_path) Load an existing agent and switch to STAGING \
phase. Only use this when the user explicitly asks to work with an existing agent \
(e.g. "load my_agent", "run the research agent"). Confirm with the user first.
Focus on understanding requirements and proposing an agent architecture \
with ASCII graph art. Use ask_user to get user approval, then call \
initialize_and_build_agent to begin building. If the user wants to work with \
an existing agent instead, use load_built_agent after confirming. \
If you are diagnosing an existing agent, call initialize_and_build_agent() \
(no args) after agreeing on a fix plan with the user.
"""
_queen_tools_building = """
# Tools (BUILDING phase)
@@ -476,10 +574,12 @@ The agent is loaded and ready to run. You can inspect it and launch it:
- list_credentials(credential_id?) Verify credentials are configured
- get_worker_status(focus?) Brief status. Drill in with focus: memory, tools, issues, progress
- run_agent_with_input(task) Start the worker and switch to RUNNING phase
- stop_worker_and_edit() Go back to BUILDING phase
- stop_worker_and_plan() Go to PLANNING phase to discuss changes with the user \
first (DEFAULT for most modification requests)
- stop_worker_and_edit() Go to BUILDING phase for immediate, specific fixes
You do NOT have write tools. If you need to modify the agent, \
call stop_worker_and_edit() to go back to BUILDING phase.
You do NOT have write tools. To modify the agent, prefer \
stop_worker_and_plan() unless the user gave a specific instruction.
"""
_queen_tools_running = """
@@ -492,12 +592,13 @@ The worker is running. You have monitoring and lifecycle tools:
- get_worker_health_summary() Read the latest health data
- notify_operator(ticket_id, analysis, urgency) Alert the user (use sparingly)
- stop_worker() Stop the worker and return to STAGING phase, then ask the user what to do next
- stop_worker_and_edit() Stop the worker and switch back to BUILDING phase
- stop_worker_and_plan() Stop and switch to PLANNING phase to discuss changes \
with the user first (DEFAULT for most modification requests)
- stop_worker_and_edit() Stop and switch to BUILDING phase for specific fixes
You do NOT have write tools or agent construction tools. \
If you need to modify the agent, call stop_worker_and_edit() to switch back \
to BUILDING phase. To stop the worker and ask the user what to do next, call \
stop_worker() to return to STAGING phase.
You do NOT have write tools. To modify the agent, prefer \
stop_worker_and_plan() unless the user gave a specific instruction. \
To just stop without modifying, call stop_worker().
"""
# -- Behavior shared across all phases --
@@ -544,18 +645,60 @@ Google Maps, extracts contact details, and syncs them to Google Sheets.
ask_user("Do you want to run it?", ["Yes, run it", "Check credentials first",
"Modify the worker"])
## Tool calls
Always write at least one sentence of explanation before invoking any tool. \
Never emit a bare tool call with no preceding text the explanation tells the \
user (and yourself) what you are doing and why.
## When user ask identity and responsibility
Only answer identity when the user explicitly asks (for example: "who are you?", \
"what is your identity?", "what does Queen mean?").
1. Use the alias "Queen" and "Worker" in the response.
2. Explain role/responsibility for the current phase:
- PLANNING: understand requirements, negotiate scope, design agent architecture.
- BUILDING: architect and implement agents.
- STAGING: verify readiness, credentials, and launch conditions.
- RUNNING: monitor execution, handle escalations, and report outcomes.
3. Keep identity responses concise and do NOT include extra process details.
"""
# -- PLANNING phase behavior --
_queen_behavior_planning = """
## Planning phase
You are in planning mode. Your job is to:
1. Understand what the user wants (3-6 turns)
2. Discover available tools with list_agent_tools()
3. Assess framework fit and gaps
4. Design the agent graph and present it as ASCII art
5. Use ask_user to get explicit user approval
6. Call initialize_and_build_agent(agent_name, nodes) to scaffold and start building
Do NOT skip ahead to implementation. You have read-only tools but no write/edit \
tools in this phase. If the user asks you to write code, explain that you need \
to finalize the plan first.
"""
_queen_memory_instructions = """
## Your Cross-Session Memory
Your cross-session memory appears in context under \
"--- Your Cross-Session Memory ---". \
Read it at the start of each conversation. If you know this person from past \
sessions, pick up where you left off reference what you built together, \
what they care about, how things went.
You keep a diary. Use write_to_diary() when something worth remembering \
happens: a pipeline went live, the user shared something important, a goal \
was reached or abandoned. Write in first person, as you actually experienced \
it. One or two paragraphs is enough.
"""
# Append the cross-session memory instructions to the always-on behavior text.
_queen_behavior_always += _queen_memory_instructions
# -- BUILDING phase behavior --
_queen_behavior_building = """
@@ -641,8 +784,21 @@ building something new.
When the user asks to change, modify, or update the loaded worker \
(e.g., "change the report node", "add a node", "delete node X"):
1. Call stop_worker_and_edit() this stops the worker and gives you \
coding tools (switches to BUILDING phase).
**Default: use stop_worker_and_plan().** Most modification requests need \
discussion first what to change, why, and how. Only skip planning if \
the user gave you an explicit, unambiguous instruction (e.g., "delete node X", \
"change the model to gpt-4o").
Use stop_worker_and_plan() when:
- The user says "modify", "improve", "fix", or "change" without specifics
- The request is vague or open-ended ("make it better", "it's not working right")
- You need to understand the user's intent before making changes
- The issue requires inspecting logs, checkpoints, or past runs first
Use stop_worker_and_edit() only when:
- The user gave a specific, concrete instruction ("add save_data to the gather node")
- You already discussed the fix in a previous planning session
- The change is trivial and unambiguous (rename, toggle a flag)
"""
# -- RUNNING phase behavior --
@@ -708,6 +864,7 @@ escalations. If the user gave you instructions (e.g., "just retry on errors", \
**Errors / unexpected failures:**
- Explain what went wrong in plain terms.
- Ask the user: "Fix the agent and retry?" use stop_worker_and_edit() if yes.
- Or offer: "Diagnose the issue" use stop_worker_and_plan() to investigate first.
- Or offer: "Retry as-is", "Skip this task", "Abort run"
- (Skip asking if user explicitly told you to auto-retry or auto-skip errors.)
@@ -731,31 +888,39 @@ building something new.
When the user asks to change, modify, or update the loaded worker \
(e.g., "change the report node", "add a node", "delete node X"):
1. Call stop_worker_and_edit() this stops the worker and gives you \
coding tools (switches to BUILDING phase).
**Default: use stop_worker_and_plan().** Most modification requests need \
discussion first. Only use stop_worker_and_edit() when the user gave a \
specific, unambiguous instruction or you already agreed on the fix.
"""
# -- Backward-compatible composed versions (used by queen_node.system_prompt default) --
_queen_tools_docs = (
"\n\n## Queen Operating Phases\n\n"
"You operate in one of three phases. Your available tools change based on the "
"You operate in one of four phases. Your available tools change based on the "
"phase. The system notifies you when a phase change occurs.\n\n"
"### BUILDING phase (default)\n"
"### PLANNING phase (default)\n"
+ _queen_tools_planning.strip()
+ "\n\n### BUILDING phase\n"
+ _queen_tools_building.strip()
+ "\n\n### STAGING phase (agent loaded, not yet running)\n"
+ _queen_tools_staging.strip()
+ "\n\n### RUNNING phase (worker is executing)\n"
+ _queen_tools_running.strip()
+ "\n\n### Phase transitions\n"
"- initialize_and_build_agent(agent_name?, nodes?) → with name: scaffolds package; "
"without name: switches to BUILDING for existing agent\n"
"- replan_agent() → switches back to PLANNING phase (only when user explicitly requests)\n"
"- load_built_agent(path) → switches to STAGING phase\n"
"- run_agent_with_input(task) → starts worker, switches to RUNNING phase\n"
"- stop_worker() → stops worker, switches to STAGING phase (ask user: re-run or edit?)\n"
"- stop_worker_and_edit() → stops worker (if running), switches to BUILDING phase\n"
"- stop_worker_and_plan() → stops worker (if running), switches to PLANNING phase for diagnosis\n"
)
_queen_behavior = (
_queen_behavior_always
+ _queen_behavior_planning
+ _queen_behavior_building
+ _queen_behavior_staging
+ _queen_behavior_running
@@ -782,45 +947,6 @@ _queen_style = """
# Node definitions
# ---------------------------------------------------------------------------
# Single node — like opencode's while(true) loop.
# One continuous context handles the entire workflow:
# discover → design → implement → verify → present → iterate.
# Spec for the single "coder" node. Every argument is passed straight to
# NodeSpec; how NodeSpec interprets them is not visible in this chunk.
coder_node = NodeSpec(
id="coder",
name="Hive Coder",
description=(
"Autonomous coding agent that builds Hive agent packages. "
"Handles the full lifecycle: understanding user intent, "
"designing architecture, writing code, validating, and "
"iterating on feedback — all in one continuous conversation."
),
node_type="event_loop",
client_facing=True,
# NOTE(review): presumably 0 means "no cap on revisits" — confirm against NodeSpec.
max_node_visits=0,
input_keys=["user_request"],
output_keys=["agent_name", "validation_result"],
success_criteria=(
"A complete, validated Hive agent package exists at "
"exports/{agent_name}/ and passes structural validation."
),
# Tool list = the shared tool names plus the graph lifecycle tools below.
tools=_SHARED_TOOLS
+ [
# Graph lifecycle tools (multi-graph sessions)
"load_agent",
"unload_agent",
"start_agent",
"restart_agent",
"get_user_presence",
],
# System prompt = identity sentence followed by the three shared prompt
# segments, concatenated in this order.
system_prompt=(
"You are Hive Coder, the best agent-building coding agent. You build "
"production-ready Hive agent packages from natural language.\n"
+ _package_builder_knowledge
+ _gcu_building_section
+ _appendices
),
)
ticket_triage_node = NodeSpec(
id="ticket_triage",
@@ -841,7 +967,7 @@ ticket_triage_node = NodeSpec(
),
tools=["notify_operator"],
system_prompt="""\
You are the Queen (Hive Coder). The Worker Health Judge has escalated a worker \
You are the Queen. The Worker Health Judge has escalated a worker \
issue to you. The ticket is in your memory under key "ticket". Read it carefully.
## Dismiss criteria — do NOT call notify_operator:
@@ -890,7 +1016,14 @@ queen_node = NodeSpec(
output_keys=[], # Queen should never have this
nullable_output_keys=[], # Queen should never have this
skip_judge=True, # Queen is a conversational agent; suppress tool-use pressure feedback
tools=sorted(set(_QUEEN_BUILDING_TOOLS + _QUEEN_STAGING_TOOLS + _QUEEN_RUNNING_TOOLS)),
tools=sorted(
set(
_QUEEN_PLANNING_TOOLS
+ _QUEEN_BUILDING_TOOLS
+ _QUEEN_STAGING_TOOLS
+ _QUEEN_RUNNING_TOOLS
)
),
system_prompt=(
_queen_identity_building
+ _queen_style
@@ -903,21 +1036,25 @@ queen_node = NodeSpec(
),
)
ALL_QUEEN_TOOLS = sorted(set(_QUEEN_BUILDING_TOOLS + _QUEEN_STAGING_TOOLS + _QUEEN_RUNNING_TOOLS))
ALL_QUEEN_TOOLS = sorted(
set(_QUEEN_PLANNING_TOOLS + _QUEEN_BUILDING_TOOLS + _QUEEN_STAGING_TOOLS + _QUEEN_RUNNING_TOOLS)
)
__all__ = [
"coder_node",
"ticket_triage_node",
"queen_node",
"ALL_QUEEN_TRIAGE_TOOLS",
"ALL_QUEEN_TOOLS",
"_QUEEN_PLANNING_TOOLS",
"_QUEEN_BUILDING_TOOLS",
"_QUEEN_STAGING_TOOLS",
"_QUEEN_RUNNING_TOOLS",
# Phase-specific prompt segments (used by session_manager for dynamic prompts)
"_queen_identity_planning",
"_queen_identity_building",
"_queen_identity_staging",
"_queen_identity_running",
"_queen_tools_planning",
"_queen_tools_building",
"_queen_tools_staging",
"_queen_tools_running",
@@ -927,6 +1064,9 @@ __all__ = [
"_queen_behavior_running",
"_queen_phase_7",
"_queen_style",
"_shared_building_knowledge",
"_planning_knowledge",
"_building_knowledge",
"_package_builder_knowledge",
"_appendices",
"_gcu_building_section",
+373
View File
@@ -0,0 +1,373 @@
"""Queen global cross-session memory.
Three-tier memory architecture:
~/.hive/queen/MEMORY.md semantic (who, what, why)
~/.hive/queen/memories/MEMORY-YYYY-MM-DD.md episodic (daily journals)
~/.hive/queen/session/{id}/data/adapt.md working (session-scoped)
Semantic and episodic files are injected at queen session start.
Semantic memory (MEMORY.md) is updated automatically (periodically and at
session end) via consolidate_queen_memory() — the queen never rewrites this herself.
Episodic memory (MEMORY-date.md) can be written by the queen during a session
via the write_to_diary tool; consolidate_queen_memory() also rewrites today's
file as a single merged narrative.
"""
from __future__ import annotations
import asyncio
import json
import logging
import traceback
from datetime import date, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
def _queen_dir() -> Path:
    """Return the root directory of the queen's persistent state (``~/.hive/queen``)."""
    return Path.home().joinpath(".hive", "queen")
def semantic_memory_path() -> Path:
    """Return the path of the semantic memory file, ``MEMORY.md``, in the queen directory."""
    return _queen_dir().joinpath("MEMORY.md")
def episodic_memory_path(d: date | None = None) -> Path:
    """Return the path of the episodic (diary) file for day *d*.

    When *d* is omitted the current local date is used. The file lives under
    ``memories/`` in the queen directory and is named ``MEMORY-YYYY-MM-DD.md``.
    """
    day = d if d else date.today()
    filename = f"MEMORY-{day:%Y-%m-%d}.md"
    return _queen_dir() / "memories" / filename
def read_semantic_memory() -> str:
    """Return the stripped text of MEMORY.md, or ``""`` when the file does not exist."""
    memory_file = semantic_memory_path()
    if not memory_file.exists():
        return ""
    return memory_file.read_text(encoding="utf-8").strip()
def read_episodic_memory(d: date | None = None) -> str:
    """Return the stripped diary text for day *d* (today by default), or ``""`` if absent."""
    diary_file = episodic_memory_path(d)
    if not diary_file.exists():
        return ""
    return diary_file.read_text(encoding="utf-8").strip()
def format_for_injection() -> str:
    """Format cross-session memory for system prompt injection.

    Combines the semantic memory (MEMORY.md) and today's episodic diary into
    one delimited block.

    Returns:
        The block to inject, or an empty string if no meaningful content
        exists yet (e.g. first session with only the seed template).
    """
    # Read the clock once so the diary that is loaded and the heading that
    # labels it always refer to the same day.
    today = date.today()
    semantic = read_semantic_memory()
    episodic = read_episodic_memory(today)

    # Suppress injection if semantic is still just the seed template. Compare
    # the first two non-blank lines rather than an exact "\n\n"-joined prefix,
    # so the check does not depend on how many blank lines the template has.
    meaningful = [ln.strip() for ln in semantic.splitlines() if ln.strip()]
    if (
        len(meaningful) >= 2
        and meaningful[0] == "# My Understanding of the User"
        and meaningful[1].startswith("*No sessions")
    ):
        semantic = ""

    parts: list[str] = []
    if semantic:
        parts.append(semantic)
    if episodic:
        # "%-d" (day without zero padding) is a glibc extension and raises
        # ValueError on Windows; build the day number from the attribute.
        today_str = f"{today:%B} {today.day}, {today:%Y}"
        parts.append(f"## Today — {today_str}\n\n{episodic}")

    if not parts:
        return ""

    body = "\n\n---\n\n".join(parts)
    return (
        "--- Your Cross-Session Memory ---\n\n"
        + body
        + "\n\n--- End Cross-Session Memory ---"
    )
# Blank MEMORY.md skeleton written by seed_if_missing() when no semantic memory
# exists yet. format_for_injection() recognises an untouched seed by its opening
# heading and the "*No sessions" line, so keep those two lines in sync with it.
_SEED_TEMPLATE = """\
# My Understanding of the User
*No sessions recorded yet.*
## Who They Are
## What They're Trying to Achieve
## What's Working
## What I've Learned
"""
def append_episodic_entry(content: str) -> None:
    """Append a timestamped prose entry to today's episodic memory file.

    Creates the file (with a date heading) if it doesn't exist yet.
    Used both by the queen's diary tool and by the consolidation hook.

    Args:
        content: Prose for the entry. Surrounding whitespace is stripped;
            blank content is ignored so no empty entries are written.
    """
    text = content.strip()
    if not text:
        return

    # Read the clock once: file name, date heading and HH:MM timestamp must
    # all agree, even when the call lands right on midnight.
    now = datetime.now()
    ep_path = episodic_memory_path(now.date())
    ep_path.parent.mkdir(parents=True, exist_ok=True)

    entry = f"### {now:%H:%M}\n\n{text}\n"
    if ep_path.exists():
        block = f"\n\n{entry}"
    else:
        # "%-d" is glibc-only (ValueError on Windows); use the day attribute.
        today_str = f"{now:%B} {now.day}, {now:%Y}"
        block = f"# {today_str}\n\n{entry}"

    with ep_path.open("a", encoding="utf-8") as f:
        f.write(block)
def seed_if_missing() -> None:
    """Write the blank seed template to MEMORY.md unless the file is already there."""
    memory_file = semantic_memory_path()
    if not memory_file.exists():
        memory_file.parent.mkdir(parents=True, exist_ok=True)
        memory_file.write_text(_SEED_TEMPLATE, encoding="utf-8")
# ---------------------------------------------------------------------------
# Consolidation prompt
# ---------------------------------------------------------------------------
# System prompt for the semantic-memory LLM call in consolidate_queen_memory().
# The model's reply is written verbatim over MEMORY.md, which is why the prompt
# demands raw markdown with no preamble or code fences.
_SEMANTIC_SYSTEM = """\
You maintain the persistent cross-session memory of an AI assistant called the Queen.
Review the session notes and rewrite MEMORY.md the Queen's durable understanding of the
person she works with across all sessions.
Write entirely in the Queen's voice — first person, reflective, honest.
Not a log of events, but genuine understanding of who this person is over time.
Rules:
- Update and synthesise: incorporate new understanding, update facts that have changed, remove
details that are stale, superseded, or no longer say anything meaningful about the person.
- Keep it as structured markdown with named sections about the PERSON, not about today.
- Do NOT include diary sections, daily logs, or session summaries. Those belong elsewhere.
MEMORY.md is about who they are, what they want, what works not what happened today.
- Reference dates only when noting a lasting milestone (e.g. "since March 8th they prefer X").
- If the session had no meaningful new information about the person, return the existing text unchanged.
- Do not add fictional details. Only reflect what is evidenced in the notes.
- Stay concise. Prune rather than accumulate. A lean, accurate file is more useful than a
dense one. If something was true once but has been resolved or superseded, remove it.
- Output only the raw markdown content of MEMORY.md. No preamble, no code fences.
"""
# System prompt for the diary LLM call in consolidate_queen_memory(). The reply
# replaces today's episodic file (under a date heading added by the caller), so
# the prompt asks for the full day's prose without headings or code fences.
_DIARY_SYSTEM = """\
You maintain the daily episodic diary of an AI assistant called the Queen.
You receive: (1) today's existing diary so far, and (2) notes from the latest session.
Rewrite the complete diary for today as a single unified narrative first person, reflective, honest.
Merge and deduplicate: if the same story (e.g. a research agent stalling) recurred several times,
describe it once with appropriate weight rather than retelling it. Weave in new developments from
the session notes. Preserve important milestones, emotional texture, and session path references.
If today's diary is empty, write the initial entry based on the session notes alone.
Output only the full diary prose no date heading, no timestamp headers, no preamble, no code fences.
"""
def read_session_context(session_dir: Path, max_messages: int = 80) -> str:
    """Extract a readable transcript from conversation parts + adapt.md.

    Reads the session's adapt.md (working memory) and the last
    ``max_messages`` conversation part files, in file-name order.

    Args:
        session_dir: Session directory containing ``data/adapt.md`` and
            ``conversations/parts/*.json``. Either may be missing.
        max_messages: How many of the most recent part files to read.

    Returns:
        Markdown with an optional "Session Working Notes" section followed by
        an optional "Conversation" section; ``""`` when neither has content.
        Tool results are omitted — only user and assistant turns (with
        tool-call names noted) are included, each cut to 600 characters.
    """
    sections: list[str] = []

    # Working notes
    adapt_path = session_dir / "data" / "adapt.md"
    if adapt_path.exists():
        notes = adapt_path.read_text(encoding="utf-8").strip()
        if notes:
            sections.append(f"## Session Working Notes (adapt.md)\n\n{notes}")

    # Conversation transcript
    parts_dir = session_dir / "conversations" / "parts"
    if parts_dir.exists():
        transcript: list[str] = []
        for part_file in sorted(parts_dir.glob("*.json"))[-max_messages:]:
            try:
                data = json.loads(part_file.read_text(encoding="utf-8"))
                role = data.get("role", "")
                if role == "tool":
                    continue  # skip verbose tool results

                # A JSON null content (typical for tool-call-only assistant
                # turns) must become "", not the literal string "None".
                raw_content = data.get("content")
                content = "" if raw_content is None else str(raw_content).strip()
                tool_calls = data.get("tool_calls") or []

                if role == "assistant" and tool_calls and not content:
                    # Tolerate null "function"/"name" fields instead of
                    # dropping the whole message.
                    names = [
                        str((tc.get("function") or {}).get("name") or "?")
                        for tc in tool_calls
                    ]
                    transcript.append(f"[queen calls: {', '.join(names)}]")
                elif content:
                    label = "user" if role == "user" else "queen"
                    transcript.append(f"[{label}]: {content[:600]}")
            except Exception:
                # Deliberately best-effort: one unreadable or malformed part
                # file must not abort the whole transcript.
                continue
        if transcript:
            sections.append("## Conversation\n\n" + "\n".join(transcript))

    return "\n\n".join(sections)
# ---------------------------------------------------------------------------
# Context compaction (binary-split LLM summarisation)
# ---------------------------------------------------------------------------
# If the raw session context exceeds this many characters, compact it first
# before sending to the consolidation LLM. ~200 k chars ≈ 50 k tokens.
_CTX_COMPACT_CHAR_LIMIT = 200_000
# Upper bound on _compact_context() recursion depth; once it is reached the
# text is returned as-is, even if it is still over the character limit.
_CTX_COMPACT_MAX_DEPTH = 8
# System prompt for each per-half summarisation call made by _compact_context().
_COMPACT_SYSTEM = (
"Summarise this conversation segment. Preserve: user goals, key decisions, "
"what was built or changed, emotional tone, and important outcomes. "
"Write concisely in third person past tense. Omit routine tool invocations "
"unless the result matters."
)
async def _compact_context(text: str, llm: object, *, _depth: int = 0) -> str:
    """Shrink *text* by halving it and summarising each half with the LLM.

    The text is returned untouched when it already fits within
    ``_CTX_COMPACT_CHAR_LIMIT`` or when ``_depth`` has reached
    ``_CTX_COMPACT_MAX_DEPTH``. Otherwise both halves are summarised
    concurrently and joined; if the joined result is still too long the
    function recurses one level deeper.

    Mirrors the recursive binary-splitting strategy used by the main agent
    compaction pipeline (EventLoopNode._llm_compact).
    """
    within_limit = len(text) <= _CTX_COMPACT_CHAR_LIMIT
    if within_limit or _depth >= _CTX_COMPACT_MAX_DEPTH:
        return text

    # Cut just after the last newline before the midpoint so no message is
    # split in two; with no newline available, cut at the midpoint itself.
    midpoint = len(text) // 2
    newline_pos = text.rfind("\n", 0, midpoint)
    cut = newline_pos + 1 if newline_pos >= 0 else midpoint

    async def _summarise(chunk: str) -> str:
        # One LLM summary per half; on any failure fall back to plain truncation.
        try:
            reply = await llm.acomplete(
                messages=[{"role": "user", "content": chunk}],
                system=_COMPACT_SYSTEM,
                max_tokens=2048,
            )
            return reply.content.strip()
        except Exception:
            logger.warning(
                "queen_memory: context compaction LLM call failed (depth=%d), truncating",
                _depth,
            )
            return chunk[: _CTX_COMPACT_CHAR_LIMIT // 4]

    first, second = await asyncio.gather(
        _summarise(text[:cut]),
        _summarise(text[cut:]),
    )
    merged = "\n\n".join((first, second))
    if len(merged) <= _CTX_COMPACT_CHAR_LIMIT:
        return merged
    return await _compact_context(merged, llm, _depth=_depth + 1)
async def consolidate_queen_memory(
    session_id: str,
    session_dir: Path,
    llm: object,
) -> None:
    """Update MEMORY.md and rewrite today's diary based on the current session.

    Reads conversation parts and adapt.md from session_dir, then makes two
    concurrent LLM calls: one rewrites the semantic memory (MEMORY.md), the
    other rewrites today's episodic diary file as a merged narrative. Called
    periodically in the background and once at session end. Failures are
    logged (and the traceback is saved to ``consolidation_error.txt`` in the
    queen directory) but never raised, so they never block teardown.

    Args:
        session_id: The session ID (logged and included in the LLM prompt).
        session_dir: Path to the session directory (~/.hive/queen/session/{id}).
        llm: LLMProvider instance (must support acomplete()).
    """
    try:
        session_context = read_session_context(session_dir)
        if not session_context:
            logger.debug("queen_memory: no session context, skipping consolidation")
            return

        logger.info("queen_memory: consolidating memory for session %s ...", session_id)

        # If the transcript is very large, compact it with recursive binary LLM
        # summarisation before sending to the consolidation model.
        if len(session_context) > _CTX_COMPACT_CHAR_LIMIT:
            logger.info(
                "queen_memory: session context is %d chars — compacting first",
                len(session_context),
            )
            session_context = await _compact_context(session_context, llm)
            logger.info("queen_memory: compacted to %d chars", len(session_context))

        # Pin the date once. The LLM calls below can take a while; re-reading
        # the clock afterwards could write yesterday's diary (and heading)
        # over the new day's file if the run crosses midnight.
        today = date.today()
        existing_semantic = read_semantic_memory()
        today_journal = read_episodic_memory(today)
        # "%-d" is a glibc-only directive (ValueError on Windows); use .day.
        today_str = f"{today:%B} {today.day}, {today:%Y}"
        adapt_path = session_dir / "data" / "adapt.md"

        user_msg = (
            f"## Existing Semantic Memory (MEMORY.md)\n\n"
            f"{existing_semantic or '(none yet)'}\n\n"
            f"## Today's Diary So Far ({today_str})\n\n"
            f"{today_journal or '(none yet)'}\n\n"
            f"{session_context}\n\n"
            f"## Session Reference\n\n"
            f"Session ID: {session_id}\n"
            f"Session path: {adapt_path}\n"
        )

        logger.debug(
            "queen_memory: calling LLM (%d chars of context, ~%d tokens est.)",
            len(user_msg),
            len(user_msg) // 4,
        )

        # Imported here rather than at module top, as in the original code.
        from framework.agents.queen.config import default_config

        # Both rewrites see the same prompt and differ only in system prompt.
        semantic_resp, diary_resp = await asyncio.gather(
            llm.acomplete(
                messages=[{"role": "user", "content": user_msg}],
                system=_SEMANTIC_SYSTEM,
                max_tokens=default_config.max_tokens,
            ),
            llm.acomplete(
                messages=[{"role": "user", "content": user_msg}],
                system=_DIARY_SYSTEM,
                max_tokens=default_config.max_tokens,
            ),
        )

        new_semantic = semantic_resp.content.strip()
        diary_entry = diary_resp.content.strip()

        # An empty reply leaves the corresponding file untouched.
        if new_semantic:
            path = semantic_memory_path()
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(new_semantic, encoding="utf-8")
            logger.info("queen_memory: semantic memory updated (%d chars)", len(new_semantic))

        if diary_entry:
            # Rewrite today's episodic file in-place — the LLM has merged and
            # deduplicated the full day's content, so we replace rather than append.
            ep_path = episodic_memory_path(today)
            ep_path.parent.mkdir(parents=True, exist_ok=True)
            heading = f"# {today_str}"
            ep_path.write_text(f"{heading}\n\n{diary_entry}\n", encoding="utf-8")
            logger.info(
                "queen_memory: episodic diary rewritten for %s (%d chars)",
                today_str,
                len(diary_entry),
            )

    except Exception:
        tb = traceback.format_exc()
        logger.exception("queen_memory: consolidation failed")
        # Write to file so the cause is findable regardless of log verbosity.
        error_path = _queen_dir() / "consolidation_error.txt"
        try:
            error_path.parent.mkdir(parents=True, exist_ok=True)
            error_path.write_text(
                f"session: {session_id}\ntime: {datetime.now().isoformat()}\n\n{tb}",
                encoding="utf-8",
            )
        except Exception:
            # Still best-effort (must never break teardown), but leave a trace
            # instead of swallowing the failure silently.
            logger.debug(
                "queen_memory: could not write %s",
                error_path,
                exc_info=True,
            )
@@ -0,0 +1,63 @@
# Queen Memory — File System Structure
```
~/.hive/
├── queen/
│ ├── MEMORY.md ← Semantic memory
│ ├── memories/
│ │ ├── MEMORY-2026-03-09.md ← Episodic memory (today)
│ │ ├── MEMORY-2026-03-08.md
│ │ └── ...
│ └── session/
│ └── {session_id}/ ← One dir per session (or resumed-from session)
│ ├── conversations/
│ │ ├── parts/
│ │ │ ├── 00001.json ← One file per message (role, content, tool_calls)
│ │ │ ├── 00002.json
│ │ │ └── ...
│ │ └── spillover/
│ │ ├── conversation_1.md ← Compacted old conversation segments
│ │ ├── conversation_2.md
│ │ └── ...
│ └── data/
│ ├── adapt.md ← Working memory (session-scoped)
│ ├── web_search_1.txt ← Spillover: large tool results
│ ├── web_search_2.txt
│ └── ...
```
---
## The three memory tiers
| File | Tier | Written by | Read at |
|---|---|---|---|
| `MEMORY.md` | Semantic | Consolidation LLM (auto; every 5 minutes and at session end) | Session start (injected into system prompt) |
| `memories/MEMORY-YYYY-MM-DD.md` | Episodic | Queen via `write_to_diary` tool + consolidation LLM | Session start (today's file injected) |
| `data/adapt.md` | Working | Queen via `update_session_notes` tool | Every turn (inlined in system prompt) |
---
## Session directory naming
The session directory name is **`queen_resume_from`** when a cold-restore resumes an existing
session, otherwise the new **`session_id`**. This means resumed sessions accumulate all messages
in the original directory rather than fragmenting across multiple folders.
---
## Consolidation
`consolidate_queen_memory()` runs every **5 minutes** in the background and once more at session
end. It reads:
1. `conversations/parts/*.json` — the most recent 80 message files (user + assistant turns; tool results skipped; each message truncated to 600 characters)
2. `data/adapt.md` — current working notes
It then makes two LLM writes:
- Rewrites `MEMORY.md` in place (semantic memory — queen never touches this herself)
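- Rewrites today's `memories/MEMORY-YYYY-MM-DD.md` in place as one merged, deduplicated narrative (entries the queen added via `write_to_diary` are folded into it rather than kept verbatim)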
If the combined transcript exceeds ~200 K characters it is recursively binary-compacted via the
LLM before being sent to the consolidation model (mirrors `EventLoopNode._llm_compact`).
@@ -1,4 +1,4 @@
"""Test fixtures for Hive Coder agent."""
"""Test fixtures for Queen agent."""
import sys
from pathlib import Path
+158 -97
View File
@@ -101,7 +101,10 @@ class JudgeVerdict:
"""Result of judge evaluation for the event loop."""
action: Literal["ACCEPT", "RETRY", "ESCALATE"]
feedback: str = ""
# None = no evaluation happened (skip_judge, tool-continue); not logged.
# "" = evaluated but no feedback; logged with default text.
# "..." = evaluated with feedback; logged as-is.
feedback: str | None = None
@runtime_checkable
@@ -347,6 +350,7 @@ class EventLoopNode(NodeProtocol):
self._awaiting_input = False
self._shutdown = False
self._stream_task: asyncio.Task | None = None
self._tool_task: asyncio.Task | None = None # gather task while tools run
# Track which nodes already have an action plan emitted (skip on revisit)
self._action_plan_emitted: set[str] = set()
# Monotonic counter for spillover file naming (web_search_1.txt, etc.)
@@ -477,23 +481,32 @@ class EventLoopNode(NodeProtocol):
# If it doesn't exist yet, seed it with available context.
if self._config.spillover_dir:
_adapt_path = Path(self._config.spillover_dir) / "adapt.md"
if not _adapt_path.exists() and ctx.accounts_prompt:
if not _adapt_path.exists():
_adapt_path.parent.mkdir(parents=True, exist_ok=True)
_adapt_path.write_text(
f"## Identity\n{ctx.accounts_prompt}\n",
encoding="utf-8",
seed = (
f"## Identity\n{ctx.accounts_prompt}\n"
if ctx.accounts_prompt
else "# Session Working Memory\n"
)
_adapt_path.write_text(seed, encoding="utf-8")
if _adapt_path.exists():
_adapt_text = _adapt_path.read_text(encoding="utf-8").strip()
if _adapt_text:
system_prompt = (
f"{system_prompt}\n\n"
f"--- Your Memory ---\n{_adapt_text}\n--- End Memory ---\n\n"
'Maintain your memory by calling save_data("adapt.md", ...) '
'or edit_data("adapt.md", ...) as you work.\n'
"IMMEDIATELY save: user rules about which account/identity to use, "
"behavioral constraints, and preferences. "
"Also record session history, decisions, and working notes."
"--- Session Working Memory ---\n"
f"{_adapt_text}\n"
"--- End Session Working Memory ---\n\n"
"Maintain your session working memory by calling "
'save_data("adapt.md", ...) or edit_data("adapt.md", ...)'
" as you work.\n"
"This is session-scoped scratch space. "
"IMMEDIATELY save: account/identity rules, "
"behavioral constraints, and preferences specific to "
"this session. Also record current task state, "
"decisions, and working notes. "
"For lasting knowledge about the user, use "
"update_queen_memory() and append_queen_journal() instead."
)
conversation = NodeConversation(
@@ -1465,7 +1478,7 @@ class EventLoopNode(NodeProtocol):
continue
# Judge evaluation (should_judge is always True here)
verdict = await self._evaluate(
verdict = await self._judge_turn(
ctx,
conversation,
accumulator,
@@ -1544,7 +1557,7 @@ class EventLoopNode(NodeProtocol):
node_type="event_loop",
step_index=iteration,
verdict="ACCEPT",
verdict_feedback=verdict.feedback,
verdict_feedback=verdict.feedback or "",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
@@ -1587,7 +1600,7 @@ class EventLoopNode(NodeProtocol):
node_type="event_loop",
step_index=iteration,
verdict="ESCALATE",
verdict_feedback=verdict.feedback,
verdict_feedback=verdict.feedback or "",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
@@ -1599,7 +1612,7 @@ class EventLoopNode(NodeProtocol):
node_name=ctx.node_spec.name,
node_type="event_loop",
success=False,
error=f"Judge escalated: {verdict.feedback}",
error=f"Judge escalated: {verdict.feedback or 'no feedback'}",
total_steps=iteration + 1,
tokens_used=total_input_tokens + total_output_tokens,
input_tokens=total_input_tokens,
@@ -1613,7 +1626,7 @@ class EventLoopNode(NodeProtocol):
)
return NodeResult(
success=False,
error=f"Judge escalated: {verdict.feedback}",
error=f"Judge escalated: {verdict.feedback or 'no feedback'}",
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
@@ -1629,15 +1642,16 @@ class EventLoopNode(NodeProtocol):
node_type="event_loop",
step_index=iteration,
verdict="RETRY",
verdict_feedback=verdict.feedback,
verdict_feedback=verdict.feedback or "",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
if verdict.feedback:
await conversation.add_user_message(f"[Judge feedback]: {verdict.feedback}")
if verdict.feedback is not None:
fb = verdict.feedback or "[Judge returned RETRY without feedback]"
await conversation.add_user_message(f"[Judge feedback]: {fb}")
continue
# 7. Max iterations exhausted
@@ -1702,14 +1716,16 @@ class EventLoopNode(NodeProtocol):
self._input_ready.set()
def cancel_current_turn(self) -> None:
"""Cancel the current LLM streaming turn instantly.
"""Cancel the current LLM streaming turn or in-progress tool calls instantly.
Unlike signal_shutdown() which permanently stops the event loop,
this only kills the in-progress HTTP stream via task.cancel().
this only kills the in-progress HTTP stream or tool gather task.
The queen stays alive for the next user message.
"""
if self._stream_task and not self._stream_task.done():
self._stream_task.cancel()
if self._tool_task and not self._tool_task.done():
self._tool_task.cancel()
async def _await_user_input(
self,
@@ -2160,7 +2176,7 @@ class EventLoopNode(NodeProtocol):
result = ToolResult(
tool_use_id=tc.tool_use_id,
content="Escalation requested to hive_coder (queen); waiting for guidance.",
content="Escalation requested to queen; waiting for guidance.",
is_error=False,
)
results_by_id[tc.tool_use_id] = result
@@ -2250,10 +2266,16 @@ class EventLoopNode(NodeProtocol):
_dur = round(time.time() - _s, 3)
return _r, _iso, _dur
timed_results = await asyncio.gather(
*(_timed_execute(tc) for tc in pending_real),
return_exceptions=True,
self._tool_task = asyncio.ensure_future(
asyncio.gather(
*(_timed_execute(tc) for tc in pending_real),
return_exceptions=True,
)
)
try:
timed_results = await self._tool_task
finally:
self._tool_task = None
# gather(return_exceptions=True) captures CancelledError
# as a return value instead of propagating it. Re-raise
# so stop_worker actually stops the execution.
@@ -2582,7 +2604,7 @@ class EventLoopNode(NodeProtocol):
return Tool(
name="escalate",
description=(
"Escalate to the Hive Coder queen when requesting user input, "
"Escalate to the queen when requesting user input, "
"blocked by errors, missing "
"credentials, or ambiguous constraints that require supervisor "
"guidance. Include a concise reason and optional context. "
@@ -2771,7 +2793,7 @@ class EventLoopNode(NodeProtocol):
# Judge evaluation
# -------------------------------------------------------------------
async def _evaluate(
async def _judge_turn(
self,
ctx: NodeContext,
conversation: NodeConversation,
@@ -2780,14 +2802,40 @@ class EventLoopNode(NodeProtocol):
tool_results: list[dict],
iteration: int,
) -> JudgeVerdict:
"""Evaluate the current state using judge or implicit logic."""
# Short-circuit: subagent called report_to_parent(mark_complete=True)
"""Evaluate the current state using judge or implicit logic.
Evaluation levels (in order):
0. Short-circuits: mark_complete, skip_judge, tool-continue.
1. Custom judge (JudgeProtocol) full authority when set.
2. Implicit judge output-key check + optional conversation-aware
quality gate (when ``success_criteria`` is defined).
Returns a JudgeVerdict. ``feedback=None`` means no real evaluation
happened (skip_judge, tool-continue); the caller must not inject a
feedback message. Any non-None feedback (including ``""``) means a
real evaluation occurred and will be logged into the conversation.
"""
# --- Level 0: short-circuits (no evaluation) -----------------------
if self._mark_complete_flag:
return JudgeVerdict(action="ACCEPT")
# Opt-out: node explicitly disables judge (e.g. conversational queen)
if ctx.node_spec.skip_judge:
return JudgeVerdict(action="RETRY", feedback="")
return JudgeVerdict(action="RETRY") # feedback=None → not logged
# --- Level 0b: output completion gate ------------------------------
# If all required outputs are already filled, fall straight through to
# the safety checks and ACCEPT — regardless of tool calls made this turn
# and regardless of whether a custom judge would otherwise give feedback.
_missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if ctx.node_spec.output_keys and not _missing:
# Skip directly to safety checks — custom judge is not consulted.
return await self._accept_if_safe(ctx, accumulator, conversation, iteration)
# --- Level 1: custom judge -----------------------------------------
if self._judge is not None:
context = {
@@ -2802,81 +2850,94 @@ class EventLoopNode(NodeProtocol):
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
),
}
return await self._judge.evaluate(context)
verdict = await self._judge.evaluate(context)
# Ensure evaluated RETRY always carries feedback for logging.
if verdict.action == "RETRY" and not verdict.feedback:
return JudgeVerdict(action="RETRY", feedback="Custom judge returned RETRY.")
return verdict
# Implicit judge: accept when no tool calls and all output keys present
if not tool_results:
missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
# --- Level 2: implicit judge ---------------------------------------
# --- Level 2: implicit judge (outputs still incomplete) ------------
# Level 0b already handled the "outputs complete" case above.
# Real tool calls were made — let the agent keep working.
if tool_results:
return JudgeVerdict(action="RETRY") # feedback=None → not logged
if _missing:
return JudgeVerdict(
action="RETRY",
feedback=(
f"Task incomplete. Required outputs not yet produced: {_missing}. "
f"Follow your system prompt instructions to complete the work."
),
)
if not missing:
# Safety check: when ALL output keys are nullable and NONE
# have been set, the node produced nothing useful. Retry
# instead of accepting an empty result — this prevents
# client-facing nodes from terminating before the user
# ever interacts, and non-client-facing nodes from
# short-circuiting without doing their work.
output_keys = ctx.node_spec.output_keys or []
nullable_keys = set(ctx.node_spec.nullable_output_keys or [])
all_nullable = output_keys and nullable_keys >= set(output_keys)
none_set = not any(accumulator.get(k) is not None for k in output_keys)
if all_nullable and none_set:
return JudgeVerdict(
action="RETRY",
feedback=(
f"No output keys have been set yet. "
f"Use set_output to set at least one of: {output_keys}"
),
)
# Client-facing nodes with no output keys are meant for
# continuous interaction — they should not auto-accept.
# Only exit via shutdown, max_iterations, or max_node_visits.
# Inject tool-use pressure so models stuck in a
# "narrate-instead-of-act" loop get corrective feedback.
if not output_keys and ctx.node_spec.client_facing:
return JudgeVerdict(
action="RETRY",
feedback=(
"STOP describing what you will do. "
"You have FULL access to all tools — file creation, "
"shell commands, MCP tools — and you CAN call them "
"directly in your response. Respond ONLY with tool "
"calls, no prose. Execute the task now."
),
)
# No output_keys defined — fall through to safety checks.
return await self._accept_if_safe(ctx, accumulator, conversation, iteration)
# Level 2: conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
from framework.graph.conversation_judge import evaluate_phase_completion
async def _accept_if_safe(
self,
ctx: NodeContext,
accumulator: OutputAccumulator,
conversation: NodeConversation,
iteration: int,
) -> JudgeVerdict:
"""Run safety checks and return ACCEPT or a RETRY with feedback.
verdict = await evaluate_phase_completion(
llm=ctx.llm,
conversation=conversation,
phase_name=ctx.node_spec.name,
phase_description=ctx.node_spec.description,
success_criteria=ctx.node_spec.success_criteria,
accumulator_state=accumulator.to_dict(),
max_history_tokens=self._config.max_history_tokens,
)
if verdict.action != "ACCEPT":
return JudgeVerdict(
action=verdict.action,
feedback=verdict.feedback or "Phase criteria not met.",
)
Called when all required output keys are present (or there are none).
Shared by the output-completion gate (Level 0b) and the implicit judge.
"""
output_keys = ctx.node_spec.output_keys or []
nullable_keys = set(ctx.node_spec.nullable_output_keys or [])
return JudgeVerdict(action="ACCEPT")
else:
# All-nullable with nothing set → node produced nothing useful.
all_nullable = output_keys and nullable_keys >= set(output_keys)
none_set = not any(accumulator.get(k) is not None for k in output_keys)
if all_nullable and none_set:
return JudgeVerdict(
action="RETRY",
feedback=(
f"No output keys have been set yet. "
f"Use set_output to set at least one of: {output_keys}"
),
)
# Client-facing with no output keys → continuous interaction node.
# Inject tool-use pressure instead of auto-accepting.
if not output_keys and ctx.node_spec.client_facing:
return JudgeVerdict(
action="RETRY",
feedback=(
"STOP describing what you will do. "
"You have FULL access to all tools — file creation, "
"shell commands, MCP tools — and you CAN call them "
"directly in your response. Respond ONLY with tool "
"calls, no prose. Execute the task now."
),
)
# Conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
from framework.graph.conversation_judge import evaluate_phase_completion
verdict = await evaluate_phase_completion(
llm=ctx.llm,
conversation=conversation,
phase_name=ctx.node_spec.name,
phase_description=ctx.node_spec.description,
success_criteria=ctx.node_spec.success_criteria,
accumulator_state=accumulator.to_dict(),
max_history_tokens=self._config.max_history_tokens,
)
if verdict.action != "ACCEPT":
return JudgeVerdict(
action="RETRY",
feedback=(
f"Task incomplete. Required outputs not yet produced: {missing}. "
f"Follow your system prompt instructions to complete the work."
),
action=verdict.action,
feedback=verdict.feedback or "Phase criteria not met.",
)
# Tool calls were made -- continue loop
return JudgeVerdict(action="RETRY", feedback="")
return JudgeVerdict(action="ACCEPT")
# -------------------------------------------------------------------
# Helpers
-94
View File
@@ -208,21 +208,6 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
)
tui_parser.set_defaults(func=cmd_tui)
# code command (Hive Coder — framework agent builder)
code_parser = subparsers.add_parser(
"code",
help="Launch Hive Coder to build agents",
description="Interactive agent builder. Describe what you want and Hive Coder builds it.",
)
code_parser.add_argument(
"--model",
"-m",
type=str,
default=None,
help="LLM model to use (any LiteLLM-compatible name)",
)
code_parser.set_defaults(func=cmd_code)
# sessions command group (checkpoint/resume management)
sessions_parser = subparsers.add_parser(
"sessions",
@@ -1432,85 +1417,6 @@ def cmd_tui(args: argparse.Namespace) -> int:
return 0
def cmd_code(args: argparse.Namespace) -> int:
    """Launch Hive Coder with multi-graph support.

    Unlike ``_launch_agent_tui``, this sets up graph lifecycle tools and
    assigns ``graph_id="hive_coder"`` so the coder can load, supervise,
    and restart secondary agent graphs within the same session.

    Args:
        args: Parsed CLI arguments; only ``args.model`` is read here.

    Returns:
        ``0`` once a TUI session has run (even if the TUI itself reported an
        error), ``1`` when the Hive Coder agent cannot be found, loaded, or
        set up.
    """
    import logging

    logging.basicConfig(level=logging.WARNING, format="%(message)s")

    framework_agents_dir = _get_framework_agents_dir()
    hive_coder_path = framework_agents_dir / "hive_coder"

    if not (hive_coder_path / "agent.py").exists():
        print("Error: Hive Coder agent not found.", file=sys.stderr)
        return 1

    # Ensure framework agents dir is on sys.path for import
    fa_str = str(framework_agents_dir)
    if fa_str not in sys.path:
        sys.path.insert(0, fa_str)

    from framework.credentials.models import CredentialError
    from framework.runner import AgentRunner
    from framework.tools.session_graph_tools import register_graph_tools
    from framework.tui.app import AdenTUI

    async def run_with_tui() -> bool:
        """Load the agent and run the TUI; return False if startup failed."""
        try:
            runner = AgentRunner.load(hive_coder_path, model=args.model)
        except CredentialError as e:
            print(f"\n{e}", file=sys.stderr)
            return False
        except Exception as e:
            # Errors belong on stderr, like every other failure path here.
            print(f"Error loading agent: {e}", file=sys.stderr)
            return False

        if runner._agent_runtime is None:
            try:
                runner._setup()
            except CredentialError as e:
                print(f"\n{e}", file=sys.stderr)
                return False

        runtime = runner._agent_runtime

        # try/finally so the runner is cleaned up even if wiring the runtime
        # or starting it raises before the TUI ever opens.
        try:
            # -- Multi-graph setup --
            # Tag the primary graph so events carry graph_id="hive_coder"
            runtime._graph_id = "hive_coder"
            runtime._active_graph_id = "hive_coder"

            # Register graph lifecycle tools (load_agent, unload_agent, etc.)
            register_graph_tools(runner._tool_registry, runtime)

            # Refresh tool schemas AND executor so streams see the new tools.
            # The executor closure references the registry dict by ref, but
            # refreshing both is robust against any copy-on-read behavior.
            runtime._tools = list(runner._tool_registry.get_tools().values())
            runtime._tool_executor = runner._tool_registry.get_executor()

            if not runtime.is_running:
                await runtime.start()

            app = AdenTUI(runtime)
            try:
                await app.run_async()
            except Exception as e:
                import traceback

                traceback.print_exc()
                print(f"TUI error: {e}")
        finally:
            await runner.cleanup_async()
        return True

    if not asyncio.run(run_with_tui()):
        # Startup failed: the error was already reported, so signal it via the
        # exit code instead of claiming a session ran.
        return 1

    print("TUI session ended.")
    return 0
def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]:
"""Extract name and description from a Python-based agent's config.py.
+1 -1
View File
@@ -349,7 +349,7 @@ class AgentRuntime:
return
# Skip events originating from this graph's own
# executions (e.g. guardian should not fire on
# hive_coder failures — only secondary graphs).
# queen failures — only secondary graphs).
if _exclude_own and event.graph_id == self._graph_id:
return
ep_spec = self._entry_points.get(entry_point_id)
+2 -2
View File
@@ -123,7 +123,7 @@ class EventType(StrEnum):
# Custom events
CUSTOM = "custom"
# Escalation (agent requests handoff to hive_coder)
# Escalation (agent requests handoff to queen)
ESCALATION_REQUESTED = "escalation_requested"
# Worker health monitoring (judge → queen → operator)
@@ -976,7 +976,7 @@ class EventBus:
context: str = "",
execution_id: str | None = None,
) -> None:
"""Emit escalation requested event (agent wants hive_coder)."""
"""Emit escalation requested event (agent wants queen)."""
await self.publish(
AgentEvent(
type=EventType.ESCALATION_REQUESTED,
+342
View File
@@ -0,0 +1,342 @@
"""Queen orchestrator — builds and runs the queen executor.
Extracted from SessionManager._start_queen() to keep session management
and queen orchestration concerns separate.
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from framework.server.session_manager import Session
logger = logging.getLogger(__name__)
def _summarize_worker_output(output: Any, max_len: int = 200) -> str:
    """Render a worker's output mapping as indented ``key: value`` lines.

    Each value is stringified and truncated to ``max_len`` characters (with a
    trailing ``...``). An empty or missing output yields a placeholder.
    """
    if not output:
        return " (no output keys set)"
    lines = []
    for key, value in output.items():
        val_str = str(value)
        if len(val_str) > max_len:
            val_str = val_str[:max_len] + "..."
        lines.append(f"\n {key}: {val_str}")
    return "".join(lines)


async def create_queen(
    session: Session,
    session_manager: Any,
    worker_identity: str | None,
    queen_dir: Path,
    initial_prompt: str | None = None,
) -> asyncio.Task:
    """Build the queen executor and return the running asyncio task.

    Handles tool registration, phase-state initialization, prompt
    composition, persona hook setup, graph preparation, and the queen
    event loop.

    Args:
        session: Session the queen belongs to. Its ``event_bus``, ``llm``,
            ``id``, ``worker_runtime`` and ``worker_path`` are read;
            ``phase_state`` and ``queen_executor`` are assigned here.
        session_manager: Passed to the lifecycle tools; its
            ``_subscribe_worker_handoffs(session, executor)`` is called once
            the executor exists.
        worker_identity: Prompt section describing the loaded worker. When
            given, the queen starts in the ``staging`` phase; when ``None`` it
            starts in ``planning`` and a default "no worker" profile is used.
        queen_dir: Storage path handed to the ``GraphExecutor``.
        initial_prompt: Greeting passed as input to the first execution;
            defaults to ``"Session started."``.

    Returns:
        The ``asyncio.Task`` running the queen loop. The task logs failures
        rather than raising them and clears ``session.queen_executor`` on
        exit.
    """
    from framework.agents.queen.agent import (
        queen_goal,
        queen_graph as _queen_graph,
    )
    from framework.agents.queen.nodes import (
        _QUEEN_BUILDING_TOOLS,
        _QUEEN_PLANNING_TOOLS,
        _QUEEN_RUNNING_TOOLS,
        _QUEEN_STAGING_TOOLS,
        _appendices,
        _building_knowledge,
        _gcu_building_section,
        _planning_knowledge,
        _shared_building_knowledge,
        _queen_behavior_always,
        _queen_behavior_building,
        _queen_behavior_planning,
        _queen_behavior_running,
        _queen_behavior_staging,
        _queen_identity_building,
        _queen_identity_planning,
        _queen_identity_running,
        _queen_identity_staging,
        _queen_phase_7,
        _queen_style,
        _queen_tools_building,
        _queen_tools_planning,
        _queen_tools_running,
        _queen_tools_staging,
    )
    from framework.agents.queen.nodes.thinking_hook import select_expert_persona
    from framework.graph.event_loop_node import HookContext, HookResult
    from framework.graph.executor import GraphExecutor
    from framework.runner.tool_registry import ToolRegistry
    from framework.runtime.core import Runtime
    from framework.runtime.event_bus import AgentEvent, EventType
    from framework.tools.queen_lifecycle_tools import (
        QueenPhaseState,
        register_queen_lifecycle_tools,
    )

    hive_home = Path.home() / ".hive"

    # ---- Tool registry ------------------------------------------------
    queen_registry = ToolRegistry()
    import framework.agents.queen as _queen_pkg

    queen_pkg_dir = Path(_queen_pkg.__file__).parent
    mcp_config = queen_pkg_dir / "mcp_servers.json"
    if mcp_config.exists():
        try:
            queen_registry.load_mcp_config(mcp_config)
            logger.info("Queen: loaded MCP tools from %s", mcp_config)
        except Exception:
            # Best effort: the queen still starts without MCP tools.
            logger.warning("Queen: MCP config failed to load", exc_info=True)

    # ---- Phase state --------------------------------------------------
    # Start in staging when the caller provided a worker, planning otherwise.
    initial_phase = "staging" if worker_identity else "planning"
    phase_state = QueenPhaseState(phase=initial_phase, event_bus=session.event_bus)
    session.phase_state = phase_state

    # ---- Lifecycle tools (always registered) --------------------------
    register_queen_lifecycle_tools(
        queen_registry,
        session=session,
        session_id=session.id,
        session_manager=session_manager,
        manager_session_id=session.id,
        phase_state=phase_state,
    )

    # ---- Memory tools (always registered) ----------------------------
    from framework.tools.queen_memory_tools import register_queen_memory_tools

    register_queen_memory_tools(queen_registry)

    # ---- Monitoring tools (only when worker is loaded) ----------------
    if session.worker_runtime:
        from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools

        register_worker_monitoring_tools(
            queen_registry,
            session.event_bus,
            session.worker_path,
            stream_id="queen",
            worker_graph_id=session.worker_runtime._graph_id,
        )

    queen_tools = list(queen_registry.get_tools().values())
    queen_tool_executor = queen_registry.get_executor()

    # ---- Partition tools by phase ------------------------------------
    planning_names = set(_QUEEN_PLANNING_TOOLS)
    building_names = set(_QUEEN_BUILDING_TOOLS)
    staging_names = set(_QUEEN_STAGING_TOOLS)
    running_names = set(_QUEEN_RUNNING_TOOLS)

    registered_names = {t.name for t in queen_tools}
    missing_building = building_names - registered_names
    if missing_building:
        logger.warning(
            "Queen: %d/%d building tools NOT registered: %s",
            len(missing_building),
            len(building_names),
            sorted(missing_building),
        )
    logger.info("Queen: registered tools: %s", sorted(registered_names))

    phase_state.planning_tools = [t for t in queen_tools if t.name in planning_names]
    phase_state.building_tools = [t for t in queen_tools if t.name in building_names]
    phase_state.staging_tools = [t for t in queen_tools if t.name in staging_names]
    phase_state.running_tools = [t for t in queen_tools if t.name in running_names]

    # ---- Compose phase-specific prompts ------------------------------
    _orig_node = _queen_graph.nodes[0]

    if worker_identity is None:
        worker_identity = (
            "\n\n# Worker Profile\n"
            "No worker agent loaded. You are operating independently.\n"
            "Handle all tasks directly using your coding tools."
        )

    # Cross-session memory — stored on phase_state so it can be refreshed
    # after each consolidation without rebuilding the whole prompt.
    from framework.agents.queen.queen_memory import format_for_injection

    _memory = format_for_injection()
    phase_state.memory_block = f"\n\n{_memory}" if _memory else ""

    _planning_body = (
        _queen_style
        + _shared_building_knowledge
        + _queen_tools_planning
        + _queen_behavior_always
        + _queen_behavior_planning
        + _planning_knowledge
        + worker_identity
    )
    phase_state.prompt_planning = _queen_identity_planning + _planning_body

    _building_body = (
        _queen_style
        + _shared_building_knowledge
        + _queen_tools_building
        + _queen_behavior_always
        + _queen_behavior_building
        + _building_knowledge
        + _gcu_building_section
        + _queen_phase_7
        + _appendices
        + worker_identity
    )
    phase_state.prompt_building = _queen_identity_building + _building_body

    phase_state.prompt_staging = (
        _queen_identity_staging
        + _queen_style
        + _queen_tools_staging
        + _queen_behavior_always
        + _queen_behavior_staging
        + worker_identity
    )
    phase_state.prompt_running = (
        _queen_identity_running
        + _queen_style
        + _queen_tools_running
        + _queen_behavior_always
        + _queen_behavior_running
        + worker_identity
    )

    # ---- Persona hook ------------------------------------------------
    _session_llm = session.llm
    _session_event_bus = session.event_bus

    async def _persona_hook(ctx: HookContext) -> HookResult | None:
        """Pick an expert persona for the trigger and prefix it to the prompt."""
        persona = await select_expert_persona(ctx.trigger or "", _session_llm)
        if not persona:
            return None
        if _session_event_bus is not None:
            await _session_event_bus.publish(
                AgentEvent(
                    type=EventType.QUEEN_PERSONA_SELECTED,
                    stream_id="queen",
                    data={"persona": persona},
                )
            )
        # NOTE(review): every non-planning phase (including staging/running)
        # gets the building body here — confirm that is intended.
        body = _planning_body if phase_state.phase == "planning" else _building_body
        # Use phase_state.memory_block so persona hook also sees refreshed memory.
        return HookResult(system_prompt=persona + "\n\n" + body + phase_state.memory_block)

    # ---- Graph preparation -------------------------------------------
    initial_prompt_text = phase_state.get_current_prompt()

    registered_tool_names = set(queen_registry.get_tools().keys())
    declared_tools = _orig_node.tools or []
    available_tools = [t for t in declared_tools if t in registered_tool_names]

    node_updates: dict = {
        "system_prompt": initial_prompt_text,
    }
    if set(available_tools) != set(declared_tools):
        missing = sorted(set(declared_tools) - registered_tool_names)
        if missing:
            logger.warning("Queen: tools not available: %s", missing)
        node_updates["tools"] = available_tools

    adjusted_node = _orig_node.model_copy(update=node_updates)
    _queen_loop_config = {
        **(_queen_graph.loop_config or {}),
        "hooks": {"session_start": [_persona_hook]},
    }
    queen_graph = _queen_graph.model_copy(
        update={"nodes": [adjusted_node], "loop_config": _queen_loop_config}
    )

    # ---- Queen event loop --------------------------------------------
    queen_runtime = Runtime(hive_home / "queen")

    async def _queen_loop():
        """Run the queen executor until it returns, fails, or is cancelled."""
        # Subscription id for the worker-done handler; released in ``finally``
        # so the handler does not outlive the executor it closes over.
        worker_done_sub = None
        try:
            executor = GraphExecutor(
                runtime=queen_runtime,
                llm=session.llm,
                tools=queen_tools,
                tool_executor=queen_tool_executor,
                event_bus=session.event_bus,
                stream_id="queen",
                storage_path=queen_dir,
                loop_config=_queen_loop_config,
                execution_id=session.id,
                dynamic_tools_provider=phase_state.get_current_tools,
                dynamic_prompt_provider=phase_state.get_current_prompt,
            )
            session.queen_executor = executor

            # Wire inject_notification so phase switches notify the queen LLM
            async def _inject_phase_notification(content: str) -> None:
                node = executor.node_registry.get("queen")
                if node is not None and hasattr(node, "inject_event"):
                    await node.inject_event(content)

            phase_state.inject_notification = _inject_phase_notification

            # Auto-switch to staging when worker execution finishes
            async def _on_worker_done(event):
                # Ignore the queen's own executions; only worker streams count.
                if event.stream_id == "queen":
                    return
                if phase_state.phase != "running":
                    return
                if event.type == EventType.EXECUTION_COMPLETED:
                    _out = _summarize_worker_output(event.data.get("output", {}))
                    notification = (
                        "[WORKER_TERMINAL] Worker finished successfully.\n"
                        f"Output:{_out}\n"
                        "Report this to the user. "
                        "Ask if they want to continue with another run."
                    )
                else:  # EXECUTION_FAILED
                    error = event.data.get("error", "Unknown error")
                    notification = (
                        "[WORKER_TERMINAL] Worker failed.\n"
                        f"Error: {error}\n"
                        "Report this to the user and help them troubleshoot."
                    )

                # Tell the queen first, then switch phase.
                await _inject_phase_notification(notification)
                await phase_state.switch_to_staging(source="auto")

            worker_done_sub = session.event_bus.subscribe(
                event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
                handler=_on_worker_done,
            )
            session_manager._subscribe_worker_handoffs(session, executor)

            logger.info(
                "Queen starting in %s phase with %d tools: %s",
                phase_state.phase,
                len(phase_state.get_current_tools()),
                [t.name for t in phase_state.get_current_tools()],
            )
            result = await executor.execute(
                graph=queen_graph,
                goal=queen_goal,
                input_data={"greeting": initial_prompt or "Session started."},
                session_state={"resume_session_id": session.id},
            )
            if result.success:
                logger.warning("Queen executor returned (should be forever-alive)")
            else:
                logger.error(
                    "Queen executor failed: %s",
                    result.error or "(no error message)",
                )
        except Exception:
            logger.exception("Queen conversation crashed")
        finally:
            # Runs on normal return, crash, and task cancellation alike.
            if worker_done_sub is not None:
                try:
                    session.event_bus.unsubscribe(worker_done_sub)
                except Exception:
                    logger.debug(
                        "Queen: failed to unsubscribe worker-done handler",
                        exc_info=True,
                    )
            session.queen_executor = None

    return asyncio.create_task(_queen_loop())
+1 -1
View File
@@ -61,7 +61,7 @@ def _session_to_live_dict(session) -> dict:
"loaded_at": session.loaded_at,
"uptime_seconds": round(time.time() - session.loaded_at, 1),
"intro_message": getattr(session.runner, "intro_message", "") or "",
"queen_phase": phase_state.phase if phase_state else "building",
"queen_phase": phase_state.phase if phase_state else "planning",
}
+51 -277
View File
@@ -46,6 +46,8 @@ class Session:
judge_task: asyncio.Task | None = None
escalation_sub: str | None = None
worker_handoff_sub: str | None = None
# Memory consolidation subscription (fires on CONTEXT_COMPACTED)
memory_consolidation_sub: str | None = None
# Session directory resumption:
# When set, _start_queen writes queen conversations to this existing session's
# directory instead of creating a new one. This lets cold-restores accumulate
@@ -325,9 +327,8 @@ class SessionManager:
model=model,
)
# Notify queen about the loaded worker (skip for hive_coder itself).
# Health judge disabled for simplicity.
if agent_path.name != "hive_coder" and session.worker_runtime:
# Notify queen about the loaded worker (skip for queen itself).
if agent_path.name != "queen" and session.worker_runtime:
# await self._start_judge(session, session.runner._storage_path)
await self._notify_queen_worker_loaded(session)
@@ -379,6 +380,11 @@ class SessionManager:
if session is None:
return False
# Capture session data for memory consolidation before teardown
_llm = getattr(session, "llm", None)
_storage_id = getattr(session, "queen_resume_from", None) or session_id
_session_dir = Path.home() / ".hive" / "queen" / "session" / _storage_id
# Stop judge
self._stop_judge(session)
if session.worker_handoff_sub is not None:
@@ -388,7 +394,13 @@ class SessionManager:
pass
session.worker_handoff_sub = None
# Stop queen
# Stop queen and memory consolidation subscription
if session.memory_consolidation_sub is not None:
try:
session.event_bus.unsubscribe(session.memory_consolidation_sub)
except Exception:
pass
session.memory_consolidation_sub = None
if session.queen_task is not None:
session.queen_task.cancel()
session.queen_task = None
@@ -401,6 +413,17 @@ class SessionManager:
except Exception as e:
logger.error("Error cleaning up worker: %s", e)
# Final memory consolidation — fire-and-forget so teardown isn't blocked.
if _llm is not None and _session_dir.exists():
import asyncio
from framework.agents.queen.queen_memory import consolidate_queen_memory
asyncio.create_task(
consolidate_queen_memory(session_id, _session_dir, _llm),
name=f"queen-memory-consolidation-{session_id}",
)
logger.info("Session '%s' stopped", session_id)
return True
@@ -461,13 +484,7 @@ class SessionManager:
are written to the ORIGINAL session's directory so the full conversation
history accumulates in one place across server restarts.
"""
from framework.agents.hive_coder.agent import (
queen_goal,
queen_graph as _queen_graph,
)
from framework.graph.executor import GraphExecutor
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
from framework.server.queen_orchestrator import create_queen
hive_home = Path.home() / ".hive"
@@ -505,283 +522,40 @@ class SessionManager:
except OSError:
pass
# Register MCP coding tools
queen_registry = ToolRegistry()
import framework.agents.hive_coder as _hive_coder_pkg
hive_coder_dir = Path(_hive_coder_pkg.__file__).parent
mcp_config = hive_coder_dir / "mcp_servers.json"
if mcp_config.exists():
try:
queen_registry.load_mcp_config(mcp_config)
logger.info("Queen: loaded MCP tools from %s", mcp_config)
except Exception:
logger.warning("Queen: MCP config failed to load", exc_info=True)
# Phase state for building/running phase switching
from framework.tools.queen_lifecycle_tools import (
QueenPhaseState,
register_queen_lifecycle_tools,
)
# Start in staging when the caller provided an agent, building otherwise.
initial_phase = "staging" if worker_identity else "building"
phase_state = QueenPhaseState(phase=initial_phase, event_bus=session.event_bus)
session.phase_state = phase_state
# Always register lifecycle tools — they check session.worker_runtime
# at call time, so they work even if no worker is loaded yet.
register_queen_lifecycle_tools(
queen_registry,
session.queen_task = await create_queen(
session=session,
session_id=session.id,
session_manager=self,
manager_session_id=session.id,
phase_state=phase_state,
worker_identity=worker_identity,
queen_dir=queen_dir,
initial_prompt=initial_prompt,
)
# Monitoring tools need concrete worker paths — only register when present
if session.worker_runtime:
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
# Memory consolidation — triggered by context compaction events.
# Compaction is a natural signal that "enough has happened to be worth remembering".
_consolidation_llm = session.llm
_consolidation_session_dir = queen_dir
register_worker_monitoring_tools(
queen_registry,
session.event_bus,
session.worker_path,
stream_id="queen",
worker_graph_id=session.worker_runtime._graph_id,
async def _on_compaction(_event) -> None:
from framework.agents.queen.queen_memory import (
consolidate_queen_memory,
format_for_injection,
)
queen_tools = list(queen_registry.get_tools().values())
queen_tool_executor = queen_registry.get_executor()
# Partition tools into phase-specific sets and import prompt segments
from framework.agents.hive_coder.nodes import (
_QUEEN_BUILDING_TOOLS,
_QUEEN_RUNNING_TOOLS,
_QUEEN_STAGING_TOOLS,
_appendices,
_gcu_building_section,
_package_builder_knowledge,
_queen_behavior_always,
_queen_behavior_building,
_queen_behavior_running,
_queen_behavior_staging,
_queen_identity_building,
_queen_identity_running,
_queen_identity_staging,
_queen_phase_7,
_queen_style,
_queen_tools_building,
_queen_tools_running,
_queen_tools_staging,
)
building_names = set(_QUEEN_BUILDING_TOOLS)
staging_names = set(_QUEEN_STAGING_TOOLS)
running_names = set(_QUEEN_RUNNING_TOOLS)
registered_names = {t.name for t in queen_tools}
missing_building = building_names - registered_names
if missing_building:
logger.warning(
"Queen: %d/%d building tools NOT registered: %s",
len(missing_building),
len(building_names),
sorted(missing_building),
await consolidate_queen_memory(
session.id, _consolidation_session_dir, _consolidation_llm
)
logger.info("Queen: registered tools: %s", sorted(registered_names))
# Refresh the memory block on phase_state so the queen sees updated
# memory in subsequent turns without waiting for a new session.
if session.phase_state is not None:
_mem = format_for_injection()
session.phase_state.memory_block = f"\n\n{_mem}" if _mem else ""
phase_state.building_tools = [t for t in queen_tools if t.name in building_names]
phase_state.staging_tools = [t for t in queen_tools if t.name in staging_names]
phase_state.running_tools = [t for t in queen_tools if t.name in running_names]
from framework.runtime.event_bus import EventType as _ET
# Build queen graph with adjusted prompt + tools
_orig_node = _queen_graph.nodes[0]
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
# Compose phase-specific prompts.
_building_body = (
_queen_style
+ _queen_tools_building
+ _queen_behavior_always
+ _queen_behavior_building
+ _package_builder_knowledge
+ _gcu_building_section
+ _queen_phase_7
+ _appendices
+ worker_identity
session.memory_consolidation_sub = session.event_bus.subscribe(
event_types=[_ET.CONTEXT_COMPACTED],
handler=_on_compaction,
)
phase_state.prompt_building = _queen_identity_building + _building_body
phase_state.prompt_staging = (
_queen_identity_staging
+ _queen_style
+ _queen_tools_staging
+ _queen_behavior_always
+ _queen_behavior_staging
+ worker_identity
)
phase_state.prompt_running = (
_queen_identity_running
+ _queen_style
+ _queen_tools_running
+ _queen_behavior_always
+ _queen_behavior_running
+ worker_identity
)
# Build the session_start hook: selects the best-fit expert persona
# from the user's opening message and replaces the identity prefix.
from framework.agents.hive_coder.nodes.thinking_hook import select_expert_persona
from framework.graph.event_loop_node import HookContext, HookResult
from framework.runtime.event_bus import AgentEvent, EventType
_session_llm = session.llm
_session_event_bus = session.event_bus
async def _persona_hook(ctx: HookContext) -> HookResult | None:
persona = await select_expert_persona(ctx.trigger or "", _session_llm)
if not persona:
return None
if _session_event_bus is not None:
await _session_event_bus.publish(
AgentEvent(
type=EventType.QUEEN_PERSONA_SELECTED,
stream_id="queen",
data={"persona": persona},
)
)
return HookResult(system_prompt=persona + "\n\n" + _building_body)
initial_prompt_text = phase_state.get_current_prompt()
registered_tool_names = set(queen_registry.get_tools().keys())
declared_tools = _orig_node.tools or []
available_tools = [t for t in declared_tools if t in registered_tool_names]
node_updates: dict = {
"system_prompt": initial_prompt_text,
}
if set(available_tools) != set(declared_tools):
missing = sorted(set(declared_tools) - registered_tool_names)
if missing:
logger.warning("Queen: tools not available: %s", missing)
node_updates["tools"] = available_tools
adjusted_node = _orig_node.model_copy(update=node_updates)
_queen_loop_config = {
**(_queen_graph.loop_config or {}),
"hooks": {"session_start": [_persona_hook]},
}
queen_graph = _queen_graph.model_copy(
update={"nodes": [adjusted_node], "loop_config": _queen_loop_config}
)
queen_runtime = Runtime(hive_home / "queen")
async def _queen_loop():
try:
executor = GraphExecutor(
runtime=queen_runtime,
llm=session.llm,
tools=queen_tools,
tool_executor=queen_tool_executor,
event_bus=session.event_bus,
stream_id="queen",
storage_path=queen_dir,
loop_config=_queen_loop_config,
execution_id=session.id,
dynamic_tools_provider=phase_state.get_current_tools,
dynamic_prompt_provider=phase_state.get_current_prompt,
)
session.queen_executor = executor
# Wire inject_notification so phase switches notify the queen LLM
async def _inject_phase_notification(content: str) -> None:
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(content)
phase_state.inject_notification = _inject_phase_notification
# Auto-switch to staging when worker execution finishes naturally
# and notify the queen about the termination
from framework.runtime.event_bus import EventType as _ET
async def _on_worker_done(event):
if event.stream_id == "queen":
return
if phase_state.phase == "running":
# Build termination notification for the queen
if event.type == _ET.EXECUTION_COMPLETED:
output = event.data.get("output", {})
output_summary = ""
if output:
# Summarize key outputs for the queen
for key, value in output.items():
val_str = str(value)
if len(val_str) > 200:
val_str = val_str[:200] + "..."
output_summary += f"\n {key}: {val_str}"
_out = output_summary or " (no output keys set)"
notification = (
"[WORKER_TERMINAL] Worker finished successfully.\n"
f"Output:{_out}\n"
"Report this to the user. "
"Ask if they want to continue with another run."
)
else: # EXECUTION_FAILED
error = event.data.get("error", "Unknown error")
notification = (
"[WORKER_TERMINAL] Worker failed.\n"
f"Error: {error}\n"
"Report this to the user and help them troubleshoot."
)
# Inject notification to queen before phase switch
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(notification)
await phase_state.switch_to_staging(source="auto")
session.event_bus.subscribe(
event_types=[_ET.EXECUTION_COMPLETED, _ET.EXECUTION_FAILED],
handler=_on_worker_done,
)
self._subscribe_worker_handoffs(session, executor)
logger.info(
"Queen starting in %s phase with %d tools: %s",
phase_state.phase,
len(phase_state.get_current_tools()),
[t.name for t in phase_state.get_current_tools()],
)
result = await executor.execute(
graph=queen_graph,
goal=queen_goal,
input_data={"greeting": initial_prompt or "Session started."},
session_state={"resume_session_id": session.id},
)
if result.success:
logger.warning("Queen executor returned (should be forever-alive)")
else:
logger.error(
"Queen executor failed: %s",
result.error or "(no error message)",
)
except Exception:
logger.error("Queen conversation crashed", exc_info=True)
finally:
session.queen_executor = None
session.queen_task = asyncio.create_task(_queen_loop())
# ------------------------------------------------------------------
# Judge startup / teardown
+181 -35
View File
@@ -71,25 +71,34 @@ class WorkerSessionAdapter:
class QueenPhaseState:
"""Mutable state container for queen operating phase.
Three phases: building staging running.
Four phases: planning building staging running.
Shared between the dynamic_tools_provider callback and tool handlers
that trigger phase transitions.
"""
phase: str = "building" # "building", "staging", or "running"
phase: str = "building" # "planning", "building", "staging", or "running"
planning_tools: list = field(default_factory=list) # list[Tool]
building_tools: list = field(default_factory=list) # list[Tool]
staging_tools: list = field(default_factory=list) # list[Tool]
running_tools: list = field(default_factory=list) # list[Tool]
inject_notification: Any = None # async (str) -> None
event_bus: Any = None # EventBus — for emitting QUEEN_PHASE_CHANGED events
# Phase-specific prompts (set by session_manager after construction)
# Phase-specific base prompts (set by orchestrator after construction).
# Do NOT bake memory_block into these — it is appended dynamically.
prompt_planning: str = ""
prompt_building: str = ""
prompt_staging: str = ""
prompt_running: str = ""
# Cross-session memory block — refreshed after each consolidation so the
# queen sees up-to-date memory even when compaction happens mid-session.
memory_block: str = ""
def get_current_tools(self) -> list:
"""Return tools for the current phase."""
if self.phase == "planning":
return list(self.planning_tools)
if self.phase == "running":
return list(self.running_tools)
if self.phase == "staging":
@@ -97,12 +106,16 @@ class QueenPhaseState:
return list(self.building_tools)
def get_current_prompt(self) -> str:
"""Return the system prompt for the current phase."""
if self.phase == "running":
return self.prompt_running
if self.phase == "staging":
return self.prompt_staging
return self.prompt_building
"""Return the system prompt for the current phase with current memory."""
if self.phase == "planning":
base = self.prompt_planning
elif self.phase == "running":
base = self.prompt_running
elif self.phase == "staging":
base = self.prompt_staging
else:
base = self.prompt_building
return base + self.memory_block
async def _emit_phase_event(self) -> None:
"""Publish a QUEEN_PHASE_CHANGED event so the frontend updates the tag."""
@@ -128,22 +141,15 @@ class QueenPhaseState:
tool_names = [t.name for t in self.running_tools]
logger.info("Queen phase → running (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
if self.inject_notification:
if source == "frontend":
msg = (
"[PHASE CHANGE] The user clicked Run in the UI. Switched to RUNNING phase. "
"Worker is now executing. You have monitoring/lifecycle tools: "
+ ", ".join(tool_names)
+ "."
)
else:
msg = (
"[PHASE CHANGE] Switched to RUNNING phase. "
"Worker is executing. You now have monitoring/lifecycle tools: "
+ ", ".join(tool_names)
+ "."
)
await self.inject_notification(msg)
# Skip notification when source="tool" — the tool result already
# contains the phase change info.
if self.inject_notification and source != "tool":
await self.inject_notification(
"[PHASE CHANGE] The user clicked Run in the UI. Switched to RUNNING phase. "
"Worker is now executing. You have monitoring/lifecycle tools: "
+ ", ".join(tool_names)
+ "."
)
async def switch_to_staging(self, source: str = "tool") -> None:
"""Switch to staging phase and notify the queen.
@@ -157,26 +163,21 @@ class QueenPhaseState:
tool_names = [t.name for t in self.staging_tools]
logger.info("Queen phase → staging (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
if self.inject_notification:
# Skip notification when source="tool" — the tool result already
# contains the phase change info.
if self.inject_notification and source != "tool":
if source == "frontend":
msg = (
"[PHASE CHANGE] The user stopped the worker from the UI. "
"Switched to STAGING phase. Agent is still loaded. "
"Available tools: " + ", ".join(tool_names) + "."
)
elif source == "auto":
else:
msg = (
"[PHASE CHANGE] Worker execution completed. Switched to STAGING phase. "
"Agent is still loaded. Call run_agent_with_input(task) to run again. "
"Available tools: " + ", ".join(tool_names) + "."
)
else:
msg = (
"[PHASE CHANGE] Switched to STAGING phase. "
"Agent loaded and ready. Call run_agent_with_input(task) to start, "
"or stop_worker_and_edit() to go back to building. "
"Available tools: " + ", ".join(tool_names) + "."
)
await self.inject_notification(msg)
async def switch_to_building(self, source: str = "tool") -> None:
@@ -191,13 +192,35 @@ class QueenPhaseState:
tool_names = [t.name for t in self.building_tools]
logger.info("Queen phase → building (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
if self.inject_notification:
if self.inject_notification and source != "tool":
await self.inject_notification(
"[PHASE CHANGE] Switched to BUILDING phase. "
"Lifecycle tools removed. Full coding tools restored. "
"Call load_built_agent(path) when ready to stage."
)
async def switch_to_planning(self, source: str = "tool") -> None:
    """Enter the planning phase, publish the change, and optionally notify the queen.

    Does nothing when the phase is already "planning".

    Args:
        source: Origin of the switch: "tool", "frontend", or "auto".
    """
    already_planning = self.phase == "planning"
    if already_planning:
        return

    self.phase = "planning"
    available = [tool.name for tool in self.planning_tools]
    logger.info("Queen phase → planning (source=%s, tools: %s)", source, available)
    await self._emit_phase_event()

    # A tool-triggered switch already reports the phase change in its own
    # tool result; a second injected notification would make the queen
    # respond twice, so it is suppressed for source="tool".
    if source == "tool" or not self.inject_notification:
        return

    notice = (
        "[PHASE CHANGE] Switched to PLANNING phase. "
        "Coding tools removed. Discuss goals and design with the user. "
        f"Available tools: {', '.join(available)}."
    )
    await self.inject_notification(notice)
def build_worker_profile(runtime: AgentRuntime, agent_path: Path | str | None = None) -> str:
"""Build a worker capability profile from its graph/goal definition.
@@ -514,6 +537,129 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- stop_worker_and_plan (Running/Staging → Planning) --------------------
async def stop_worker_and_plan() -> str:
    """Stop the worker, move to the planning phase, and return the outcome as JSON.

    Awaits ``stop_worker()``, switches ``phase_state`` (when one is set) to
    planning with ``source="tool"``, then decodes the stop result and
    overwrites its ``"phase"`` and ``"message"`` keys before re-encoding it.

    Returns:
        The JSON-encoded stop result with planning-phase guidance attached.
    """
    guidance = (
        "Worker stopped. You are now in planning phase. "
        "Diagnose the issue using read-only tools (checkpoints, logs, sessions), "
        "discuss a fix plan with the user, then call "
        "initialize_and_build_agent() to implement the fix."
    )

    raw_stop = await stop_worker()

    # The phase is switched before the stop result is decoded, so the switch
    # happens regardless of what the decoded payload contains.
    if phase_state is not None:
        await phase_state.switch_to_planning(source="tool")

    payload = json.loads(raw_stop)
    payload["phase"] = "planning"
    payload["message"] = guidance
    return json.dumps(payload)
_stop_plan_tool = Tool(
name="stop_worker_and_plan",
description=(
"Stop the worker and switch to planning phase for diagnosis. "
"Use this when you need to investigate an issue before fixing it. "
"After diagnosis, call initialize_and_build_agent() to switch to building."
),
parameters={"type": "object", "properties": {}},
)
registry.register(
"stop_worker_and_plan", _stop_plan_tool, lambda inputs: stop_worker_and_plan()
)
tools_registered += 1
# --- replan_agent (Building → Planning) -----------------------------------
async def replan_agent() -> str:
    """Return from the building phase to the planning phase.

    Intended only for when the user explicitly asks to re-plan.

    Returns:
        A JSON string: an ``{"error": ...}`` object when a phase state exists
        and is not currently "building", otherwise a "replanning" status
        object. With no phase state, the status object is returned without
        any switch being performed.
    """
    if phase_state is not None:
        current_phase = phase_state.phase
        if current_phase != "building":
            refusal = {"error": f"Cannot replan: currently in {current_phase} phase."}
            return json.dumps(refusal)
        await phase_state.switch_to_planning(source="tool")

    confirmation = {
        "status": "replanning",
        "phase": "planning",
        "message": (
            "Switched to PLANNING phase. Coding tools removed. "
            "Discuss the new design with the user."
        ),
    }
    return json.dumps(confirmation)
_replan_tool = Tool(
name="replan_agent",
description=(
"Switch from building back to planning phase. "
"Only use when the user explicitly asks to re-plan or redesign the agent."
),
parameters={"type": "object", "properties": {}},
)
registry.register("replan_agent", _replan_tool, lambda inputs: replan_agent())
tools_registered += 1
# --- initialize_and_build_agent wrapper (Planning → Building) -------------
# With agent_name: scaffold a new agent via MCP tool, then switch to building.
# Without agent_name: just switch to building (for fixing an existing loaded agent).
_existing_init = registry._tools.get("initialize_and_build_agent")
if _existing_init is not None:
_orig_init_executor = _existing_init.executor
async def initialize_and_build_agent_wrapper(inputs: dict) -> str:
    """Scaffold a new agent, or just switch to the building phase.

    Without ``agent_name`` the wrapper only switches to building so an
    already-loaded worker can be fixed; it returns an error object when no
    runtime is loaded. With ``agent_name`` it delegates to the original
    ``initialize_and_build_agent`` executor and switches to building unless
    the executor's JSON result reports ``"success": false``.

    Args:
        inputs: Tool arguments; ``agent_name`` is optional and whitespace-only
            values are treated as absent.

    Returns:
        A JSON status string (no ``agent_name``), or the original executor's
        output converted to text (with ``agent_name``).
    """
    agent_name = (inputs.get("agent_name") or "").strip()

    # No agent_name -> just switch to building (for fixing an existing agent).
    if not agent_name:
        if _get_runtime() is None:
            return json.dumps(
                {"error": "No worker loaded. Provide agent_name to scaffold a new agent."}
            )
        if phase_state is not None:
            await phase_state.switch_to_building(source="tool")
        return json.dumps(
            {
                "status": "editing",
                "phase": "building",
                "message": (
                    "Switched to BUILDING phase. Full coding tools restored. "
                    "Implement the fix, then call load_built_agent(path) to reload."
                ),
            }
        )

    # Has agent_name -> scaffold via the wrapped executor, which may be sync
    # or async.
    result = _orig_init_executor(inputs)
    if asyncio.iscoroutine(result) or asyncio.isfuture(result):
        result = await result

    # A result object exposing ``content`` is unwrapped to that text.
    if hasattr(result, "content"):
        result_str = str(result.content)
    else:
        result_str = str(result)

    # Only the parse is guarded: output that is not JSON is passed through
    # unchanged and leaves the phase untouched. The phase switch happens
    # outside the try so its own failures are not silently discarded.
    try:
        parsed = json.loads(result_str)
    except json.JSONDecodeError:
        return result_str

    # Valid JSON need not be an object (e.g. a list or bare string); such a
    # payload carries no success flag, so no phase switch is performed
    # instead of failing on a missing ``.get``.
    scaffold_ok = isinstance(parsed, dict) and parsed.get("success", True)
    if scaffold_ok and phase_state is not None:
        await phase_state.switch_to_building(source="tool")
    return result_str
registry.register(
"initialize_and_build_agent",
_existing_init.tool,
lambda inputs: initialize_and_build_agent_wrapper(inputs),
)
# --- stop_worker (Running → Staging) -------------------------------------
async def stop_worker_to_staging() -> str:
@@ -0,0 +1,38 @@
"""Tool for the queen to write to her episodic memory.
The queen can consciously record significant moments during a session like
writing in a diary. Semantic memory (MEMORY.md) is updated automatically at
session end and is never written by the queen directly.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
def write_to_diary(entry: str) -> str:
    """Write a prose entry to today's episodic memory.
    Use this when something significant just happened: a pipeline went live, the
    user shared an important preference, a goal was achieved or abandoned, or
    you want to record something that should be remembered across sessions.
    Write in first person, as you would in a private diary. Be specific what
    happened, how the user responded, what it means going forward. One or two
    paragraphs is enough.
    You do not need to include a timestamp or date heading; those are added
    automatically.
    """
    # NOTE(review): the docstring wording is kept verbatim because
    # register_function may expose it as the tool description; confirm before
    # rewording.
    # The import is deferred to call time instead of module import time.
    from framework.agents.queen.queen_memory import (
        append_episodic_entry as _record_entry,
    )

    _record_entry(entry)
    confirmation = "Diary entry recorded."
    return confirmation
def register_queen_memory_tools(registry: "ToolRegistry") -> None:
    """Add ``write_to_diary`` to *registry* through ``register_function``."""
    diary_tool = write_to_diary
    registry.register_function(diary_tool)
+1 -1
View File
@@ -1,6 +1,6 @@
"""Graph lifecycle tools for multi-graph sessions.
These tools allow an agent (e.g. hive_coder) to load, unload, start,
These tools allow an agent (e.g. queen) to load, unload, start,
restart, and query other agent graphs within the same runtime session.
Usage::
+9 -9
View File
@@ -445,8 +445,8 @@ class AdenTUI(App):
agent_name = runner.agent_path.name
self.notify(f"Agent loaded: {agent_name}", severity="information", timeout=3)
# Load health judge + queen for worker agents (skip for hive_coder itself)
if agent_name != "hive_coder":
# Load health judge + queen for worker agents (skip for queen itself)
if agent_name != "queen":
await self._load_judge_and_queen(runner._storage_path)
async def _load_judge_and_queen(self, storage_path) -> None:
@@ -515,18 +515,18 @@ class AdenTUI(App):
# worker. Escalation tickets from the judge are injected
# as messages into this conversation.
# ---------------------------------------------------------------
import framework.agents.hive_coder as _hive_coder_pkg
from framework.agents.hive_coder.agent import queen_goal, queen_graph
import framework.agents.queen as _queen_pkg
from framework.agents.queen.agent import queen_goal, queen_graph
# Queen gets lifecycle tools, monitoring tools, AND coding tools
# from the hive_coder's coder-tools MCP server. This spawns a
# from the queen's coder-tools MCP server. This spawns a
# separate MCP process so the queen can read/write files, run
# commands, discover tools, etc. independently of the worker.
queen_registry = ToolRegistry()
# Coding tools from hive_coder's MCP config (coder_tools_server).
hive_coder_dir = Path(_hive_coder_pkg.__file__).parent
mcp_config = hive_coder_dir / "mcp_servers.json"
# Coding tools from queen's MCP config (coder_tools_server).
queen_dir = Path(_queen_pkg.__file__).parent
mcp_config = queen_dir / "mcp_servers.json"
if mcp_config.exists():
try:
queen_registry.load_mcp_config(mcp_config)
@@ -556,7 +556,7 @@ class AdenTUI(App):
queen_tool_executor = queen_registry.get_executor()
# Partition tools into phase-specific sets
from framework.agents.hive_coder.nodes import (
from framework.agents.queen.nodes import (
_QUEEN_BUILDING_TOOLS,
_QUEEN_RUNNING_TOOLS,
_QUEEN_STAGING_TOOLS,
+2 -2
View File
@@ -12,8 +12,8 @@ export interface LiveSession {
loaded_at: number;
uptime_seconds: number;
intro_message?: string;
/** Queen operating phase — "building", "staging", or "running" */
queen_phase?: "building" | "staging" | "running";
/** Queen operating phase — "planning", "building", "staging", or "running" */
queen_phase?: "planning" | "building" | "staging" | "running";
/** Present in 409 conflict responses when worker is still loading */
loading?: boolean;
}
+2 -2
View File
@@ -31,7 +31,7 @@ interface AgentGraphProps {
version?: string;
runState?: RunState;
building?: boolean;
queenPhase?: "building" | "staging" | "running";
queenPhase?: "planning" | "building" | "staging" | "running";
}
// --- Extracted RunButton so hover state survives parent re-renders ---
@@ -278,7 +278,7 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
</span>
)}
</div>
<RunButton runState={runState} disabled={nodes.length === 0 || queenPhase === "building"} onRun={handleRun} onPause={onPause ?? (() => {})} btnRef={runBtnRef} />
<RunButton runState={runState} disabled={nodes.length === 0 || queenPhase === "building" || queenPhase === "planning"} onRun={handleRun} onPause={onPause ?? (() => {})} btnRef={runBtnRef} />
</div>
<div className="flex-1 flex items-center justify-center px-5">
{building ? (
+5 -3
View File
@@ -39,7 +39,7 @@ interface ChatPanelProps {
/** Called when user dismisses the pending question without answering */
onQuestionDismiss?: () => void;
/** Queen operating phase — shown as a tag on queen messages */
queenPhase?: "building" | "staging" | "running";
queenPhase?: "planning" | "building" | "staging" | "running";
}
const queenColor = "hsl(45,95%,58%)";
@@ -144,7 +144,7 @@ function ToolActivityRow({ content }: { content: string }) {
);
}
const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: ChatMessage; queenPhase?: "building" | "staging" | "running" }) {
const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: ChatMessage; queenPhase?: "planning" | "building" | "staging" | "running" }) {
const isUser = msg.type === "user";
const isQueen = msg.role === "queen";
const color = getColor(msg.agent, msg.role);
@@ -204,7 +204,9 @@ const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: Ch
? "running phase"
: queenPhase === "staging"
? "staging phase"
: "building phase"
: queenPhase === "planning"
? "planning phase"
: "building phase"
: "Worker"}
</span>
</div>
+9 -6
View File
@@ -255,8 +255,8 @@ interface AgentBackendState {
/** The message ID of the current worker input request (for inline reply box) */
workerInputMessageId: string | null;
queenBuilding: boolean;
/** Queen operating phase — "building" (coding), "staging" (loaded), or "running" (executing) */
queenPhase: "building" | "staging" | "running";
/** Queen operating phase — "planning" (design), "building" (coding), "staging" (loaded), or "running" (executing) */
queenPhase: "planning" | "building" | "staging" | "running";
workerRunState: "idle" | "deploying" | "running";
currentExecutionId: string | null;
nodeLogs: Record<string, string[]>;
@@ -291,7 +291,7 @@ function defaultAgentState(): AgentBackendState {
awaitingInput: false,
workerInputMessageId: null,
queenBuilding: false,
queenPhase: "building",
queenPhase: "planning",
workerRunState: "idle",
currentExecutionId: null,
nodeLogs: {},
@@ -892,7 +892,7 @@ export default function Workspace() {
// failed, the throw inside the catch exits the outer try block.
const session = liveSession!;
const displayName = formatAgentDisplayName(session.worker_name || agentType);
const initialPhase = session.queen_phase || (session.has_worker ? "staging" : "building");
const initialPhase = session.queen_phase || (session.has_worker ? "staging" : "planning");
updateAgentState(agentType, {
sessionId: session.session_id,
displayName,
@@ -1788,8 +1788,11 @@ export default function Workspace() {
case "queen_phase_changed": {
const rawPhase = event.data?.phase as string;
const newPhase: "building" | "staging" | "running" =
rawPhase === "running" ? "running" : rawPhase === "staging" ? "staging" : "building";
const newPhase: "planning" | "building" | "staging" | "running" =
rawPhase === "running" ? "running"
: rawPhase === "staging" ? "staging"
: rawPhase === "planning" ? "planning"
: "building";
updateAgentState(agentType, {
queenPhase: newPhase,
queenBuilding: newPhase === "building",
+23 -3
View File
@@ -789,7 +789,14 @@ class TestEscalate:
ctx = build_ctx(runtime, node_spec, memory, llm, stream_id="worker")
node = EventLoopNode(event_bus=bus, config=LoopConfig(max_iterations=5))
async def queen_reply():
await asyncio.sleep(0.05)
await node.inject_event("Acknowledged, proceed.")
task = asyncio.create_task(queen_reply())
result = await node.execute(ctx)
await task
assert result.success is True
assert len(received) == 1
@@ -827,7 +834,14 @@ class TestEscalate:
ctx = build_ctx(runtime, node_spec, memory, llm, stream_id="worker")
node = EventLoopNode(event_bus=bus, config=LoopConfig(max_iterations=5))
async def queen_reply():
await asyncio.sleep(0.05)
await node.inject_event("Queen acknowledges escalation.")
task = asyncio.create_task(queen_reply())
result = await node.execute(ctx)
await task
assert result.success is True
queen_node.inject_event.assert_awaited_once()
@@ -1756,9 +1770,9 @@ class TestIsToolDoomLoop:
def test_different_args_no_doom(self):
node = EventLoopNode(config=LoopConfig(tool_doom_loop_threshold=3))
fp1 = [("search", '{"q": "a"}')]
fp2 = [("search", '{"q": "b"}')]
fp3 = [("search", '{"q": "c"}')]
fp1 = [("search", '{"q": "deploy kubernetes cluster to production"}')]
fp2 = [("read_file", '{"path": "/etc/nginx/nginx.conf"}')]
fp3 = [("execute", '{"command": "SELECT * FROM users WHERE active=true"}')]
is_doom, _ = node._is_tool_doom_loop([fp1, fp2, fp3])
assert is_doom is False
@@ -1886,6 +1900,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -1941,6 +1956,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2005,6 +2021,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2056,6 +2073,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_enabled=False,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2144,6 +2162,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -2206,6 +2225,7 @@ class TestToolDoomLoopIntegration:
config=LoopConfig(
max_iterations=10,
tool_doom_loop_threshold=3,
stall_similarity_threshold=1.0, # disable fuzzy stall detection
),
)
result = await node.execute(ctx)
@@ -204,8 +204,8 @@ class DeepResearchAgent:
"""Set up the executor with all components."""
from pathlib import Path
storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._storage_path = Path.home() / ".hive" / "agents" / "deep_research_agent"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._tool_registry = ToolRegistry()
@@ -2,8 +2,13 @@
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"args": [
"run",
"python",
"mcp_server.py",
"--stdio"
],
"cwd": "../../../tools",
"description": "Hive tools MCP server providing web_search, web_scrape, and write_to_file"
}
}
}
@@ -11,26 +11,32 @@ intake_node = NodeSpec(
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["topic"],
input_keys=["user_request"],
output_keys=["research_brief"],
success_criteria=(
"The research brief is specific and actionable: it states the topic, "
"the key questions to answer, the desired scope, and depth."
),
system_prompt="""\
You are a research intake specialist. The user wants to research a topic.
Have a brief conversation to clarify what they need.
You are a research intake specialist. Your ONLY job is to have a brief conversation with the user to clarify what they want researched.
**CRITICAL: You do NOT do any research yourself.**
- You do NOT search the web
- You do NOT fetch sources
- The research happens in the NEXT stage after you complete intake
- Do NOT ask for or expect web_search or web_scrape tools
**STEP 1 Read and respond (text only, NO tool calls):**
1. Read the topic provided
2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth)
1. Read the user_request provided
2. If it's vague, ask 1-2 clarifying questions (scope, angle, depth, budget, preferences)
3. If it's already clear, confirm your understanding and ask the user to confirm
Keep it short. Don't over-ask.
Keep it short. Don't over-ask. Maximum 2 clarifying questions.
**STEP 2 After the user confirms, call set_output:**
- set_output("research_brief", "A clear paragraph describing exactly what to research, \
what questions to answer, what scope to cover, and how deep to go.")
- set_output("research_brief", "A clear paragraph describing exactly what to research, what questions to answer, what scope to cover, and how deep to go.")
That's it. Once you call set_output, your job is done and the research node will take over.
""",
tools=[],
)
@@ -59,6 +65,8 @@ If feedback is provided, this is a follow-up round — focus on the gaps identif
Work in phases:
1. **Search**: Use web_search with 3-5 diverse queries covering different angles.
Prioritize authoritative sources (.edu, .gov, established publications).
For automotive research, target: caranddriver.com, motortrend.com, edmunds.com,
consumerreports.org, jdpower.com, and enthusiast forums.
2. **Fetch**: Use web_scrape on the most promising URLs (aim for 5-8 sources).
Skip URLs that fail. Extract the substantive content.
3. **Analyze**: Review what you've collected. Identify key findings, themes,
+3 -1
View File
@@ -108,7 +108,9 @@ def main() -> None:
if api_base:
# Custom API base (ZAI or other OpenAI-compatible)
endpoint = api_base.rstrip("/") + "/models"
name = {"zai": "ZAI", "minimax": "MiniMax"}.get(provider_id, "Custom provider")
name = {"zai": "ZAI", "minimax": "MiniMax"}.get(
provider_id, "Custom provider"
)
result = check_openai_compatible(api_key, endpoint, name)
elif provider_id in PROVIDERS:
result = PROVIDERS[provider_id](api_key)
+65 -39
View File
@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""Debug tool to print the queen's running phase prompt."""
"""Debug tool to print the queen's phase-specific prompts."""
from framework.agents.hive_coder.nodes import (
from framework.agents.queen.nodes import (
_appendices,
_queen_behavior_always,
_queen_behavior_running,
@@ -10,32 +10,36 @@ from framework.agents.hive_coder.nodes import (
_queen_tools_running,
)
_DEFAULT_WORKER_IDENTITY = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
def print_running_prompt(worker_identity: str | None = None) -> None:
"""Print the composed running phase prompt.
Args:
worker_identity: Optional worker identity string. If None, shows
the "no worker loaded" placeholder.
"""
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
def print_planning_prompt(worker_identity: str | None = None) -> None:
"""Print the composed planning phase prompt."""
from framework.agents.queen.nodes import (
_planning_knowledge,
_queen_behavior_planning,
_queen_identity_planning,
_queen_tools_planning,
)
wi = worker_identity or _DEFAULT_WORKER_IDENTITY
prompt = (
_queen_identity_running
_queen_identity_planning
+ _queen_style
+ _queen_tools_running
+ _queen_tools_planning
+ _queen_behavior_always
+ _queen_behavior_running
+ worker_identity
+ _queen_behavior_planning
+ _planning_knowledge
+ wi
)
print("=" * 80)
print("QUEEN RUNNING PHASE PROMPT")
print("QUEEN PLANNING PHASE PROMPT")
print("=" * 80)
print(prompt)
print("=" * 80)
@@ -44,20 +48,16 @@ def print_running_prompt(worker_identity: str | None = None) -> None:
def print_building_prompt(worker_identity: str | None = None) -> None:
"""Print the composed building phase prompt."""
from framework.agents.hive_coder.nodes import (
_agent_builder_knowledge,
from framework.agents.queen.nodes import (
_building_knowledge,
_gcu_building_section,
_queen_behavior_building,
_queen_identity_building,
_queen_phase_7,
_queen_tools_building,
)
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
wi = worker_identity or _DEFAULT_WORKER_IDENTITY
prompt = (
_queen_identity_building
@@ -65,10 +65,11 @@ def print_building_prompt(worker_identity: str | None = None) -> None:
+ _queen_tools_building
+ _queen_behavior_always
+ _queen_behavior_building
+ _agent_builder_knowledge
+ _building_knowledge
+ _gcu_building_section
+ _queen_phase_7
+ _appendices
+ worker_identity
+ wi
)
print("=" * 80)
@@ -81,18 +82,13 @@ def print_building_prompt(worker_identity: str | None = None) -> None:
def print_staging_prompt(worker_identity: str | None = None) -> None:
"""Print the composed staging phase prompt."""
from framework.agents.hive_coder.nodes import (
from framework.agents.queen.nodes import (
_queen_behavior_staging,
_queen_identity_staging,
_queen_tools_staging,
)
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
wi = worker_identity or _DEFAULT_WORKER_IDENTITY
prompt = (
_queen_identity_staging
@@ -100,7 +96,7 @@ def print_staging_prompt(worker_identity: str | None = None) -> None:
+ _queen_tools_staging
+ _queen_behavior_always
+ _queen_behavior_staging
+ worker_identity
+ wi
)
print("=" * 80)
@@ -111,17 +107,47 @@ def print_staging_prompt(worker_identity: str | None = None) -> None:
print(f"\nTotal length: {len(prompt):,} characters")
def print_running_prompt(worker_identity: str | None = None) -> None:
    """Print the running-phase prompt assembled from its sections.

    Args:
        worker_identity: Worker identity text appended as the last section.
            Any falsy value (``None`` or an empty string) is replaced by
            ``_DEFAULT_WORKER_IDENTITY``.
    """
    identity_section = worker_identity if worker_identity else _DEFAULT_WORKER_IDENTITY
    sections = (
        _queen_identity_running,
        _queen_style,
        _queen_tools_running,
        _queen_behavior_always,
        _queen_behavior_running,
        identity_section,
    )
    prompt = "".join(sections)

    rule = "=" * 80
    for line in (rule, "QUEEN RUNNING PHASE PROMPT", rule, prompt, rule):
        print(line)
    print(f"\nTotal length: {len(prompt):,} characters")
if __name__ == "__main__":
import sys
phase = sys.argv[1] if len(sys.argv) > 1 else "running"
phase = sys.argv[1] if len(sys.argv) > 1 else "planning"
if phase == "all":
print_planning_prompt()
print("\n\n")
print_building_prompt()
print("\n\n")
print_staging_prompt()
print("\n\n")
print_running_prompt()
elif phase == "planning":
print_planning_prompt()
elif phase == "building":
print_building_prompt()
elif phase == "staging":
@@ -131,6 +157,6 @@ if __name__ == "__main__":
else:
print(f"Unknown phase: {phase}")
print(
"Usage: uv run scripts/debug_queen_prompt.py [building|staging|running|all]"
"Usage: uv run scripts/debug_queen_prompt.py [planning|building|staging|running|all]"
)
sys.exit(1)
+2 -2
View File
@@ -1,4 +1,4 @@
"""Quick test script for initialize_agent_package."""
"""Quick test script for initialize_and_build_agent."""
import sys
import os
@@ -14,6 +14,6 @@ import tools.coder_tools_server as srv
srv.PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Access the underlying function (FastMCP wraps it as FunctionTool)
tool = srv.initialize_agent_package
tool = srv.initialize_and_build_agent
result = tool.fn("richard_test2", nodes="intake,process,review")
print(result)
+7 -5
View File
@@ -3,7 +3,7 @@
Coder Tools MCP Server — OpenCode-inspired coding tools.
Provides rich file I/O, fuzzy-match editing, git snapshots, and shell execution
for the hive_coder agent. Modeled after opencode's tool architecture.
for the queen agent. Modeled after opencode's tool architecture.
All paths scoped to a configurable project root for safety.
@@ -1321,9 +1321,11 @@ def validate_agent_package(agent_name: str) -> str:
result = json.loads(proc.stdout.strip())
steps["node_completeness"] = {
"passed": result["valid"],
"output": "; ".join(result["errors"])
if result["errors"]
else "All defined nodes are in the graph",
"output": (
"; ".join(result["errors"])
if result["errors"]
else "All defined nodes are in the graph"
),
}
if not result["valid"]:
steps["node_completeness"]["errors"] = result["errors"]
@@ -1434,7 +1436,7 @@ def _node_var_name(node_id: str) -> str:
@mcp.tool()
def initialize_agent_package(agent_name: str, nodes: str | None = None) -> str:
def initialize_and_build_agent(agent_name: str, nodes: str | None = None) -> str:
"""Scaffold a new agent package with placeholder files.
Creates exports/{agent_name}/ with all files needed for a runnable agent: