refactor(queen): use new infra

This commit is contained in:
Timothy
2026-04-07 17:50:45 -07:00
parent a5b17a293b
commit 3a70243b82
4 changed files with 102 additions and 89 deletions
+18 -1
View File
@@ -2293,7 +2293,24 @@ class AgentLoop(NodeProtocol):
False,
)
# Execute tool calls — framework tools (set_output, ask_user)
# Priority drain: if user sent a message while the LLM was
# streaming, inject it into the conversation NOW -- before tool
# execution. The LLM will see it on the next inner turn.
if not self._injection_queue.empty():
while not self._injection_queue.empty():
_inj_content, _inj_client, _inj_images = (
self._injection_queue.get_nowait()
)
if _inj_client:
await conversation.add_user_message(_inj_content)
logger.info(
"[%s] Priority-injected user message mid-turn (%d chars)",
node_id, len(_inj_content),
)
else:
await conversation.add_user_message(_inj_content)
# Execute tool calls -- framework tools (set_output, ask_user)
# run inline; real MCP tools run in parallel.
real_tool_results: list[dict] = []
limit_hit = False
+3 -9
View File
@@ -1,19 +1,13 @@
"""
Queen Native agent builder for the Hive framework.
"""Queen -- the agent builder for the Hive framework."""
Deeply understands the agent framework and produces complete Python packages
with goals, nodes, edges, system prompts, MCP configuration, and tests
from natural language specifications.
"""
from .agent import queen_goal, queen_graph
from .agent import queen_goal, queen_loop_config
from .config import AgentMetadata, RuntimeConfig, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"queen_goal",
"queen_graph",
"queen_loop_config",
"RuntimeConfig",
"AgentMetadata",
"default_config",
+16 -25
View File
@@ -1,38 +1,29 @@
"""Queen graph definition."""
"""Queen agent definition.
from framework.orchestrator import Goal
from framework.orchestrator.edge import GraphSpec
The queen is a single AgentLoop -- no graph, no orchestrator.
Loaded by queen_orchestrator.create_queen().
"""
from framework.orchestrator.goal import Goal
from .nodes import queen_node
# ---------------------------------------------------------------------------
# Queen graph — the primary persistent conversation.
# Loaded by queen_orchestrator.create_queen(), NOT by AgentRunner.
# ---------------------------------------------------------------------------
queen_goal = Goal(
id="queen-manager",
name="Queen Manager",
description=(
"Manage the worker agent lifecycle and serve as the user's primary interactive interface."
"Manage the worker agent lifecycle and serve as the "
"user's primary interactive interface."
),
success_criteria=[],
constraints=[],
)
queen_graph = GraphSpec(
id="queen-graph",
goal_id=queen_goal.id,
version="1.0.0",
entry_node="queen",
entry_points={"start": "queen"},
terminal_nodes=[],
pause_nodes=[],
nodes=[queen_node],
edges=[],
conversation_mode="continuous",
loop_config={
"max_iterations": 999_999,
"max_tool_calls_per_turn": 30,
},
)
# Loop config -- used by queen_orchestrator to build LoopConfig
queen_loop_config = {
"max_iterations": 999_999,
"max_tool_calls_per_turn": 30,
"max_context_tokens": 180_000,
}
__all__ = ["queen_goal", "queen_loop_config", "queen_node"]
+65 -54
View File
@@ -32,7 +32,7 @@ async def create_queen(
"""
from framework.agents.queen.agent import (
queen_goal,
queen_graph as _queen_graph,
queen_loop_config as _base_loop_config,
)
from framework.agents.queen.nodes import (
_QUEEN_BUILDING_TOOLS,
@@ -66,17 +66,14 @@ async def create_queen(
)
from framework.agents.queen.nodes.thinking_hook import select_expert_persona
from framework.agent_loop.agent_loop import HookContext, HookResult
from framework.orchestrator.orchestrator import Orchestrator
from framework.loader.mcp_registry import MCPRegistry
from framework.loader.tool_registry import ToolRegistry
from framework.tracker.decision_tracker import DecisionTracker # noqa: F401
from framework.host.event_bus import AgentEvent, EventType
from framework.tools.queen_lifecycle_tools import (
QueenPhaseState,
register_queen_lifecycle_tools,
)
hive_home = Path.home() / ".hive"
# ---- Tool registry ------------------------------------------------
queen_registry = ToolRegistry()
@@ -197,7 +194,7 @@ async def create_queen(
phase_state.global_memory_dir = global_dir
# ---- Compose phase-specific prompts ------------------------------
_orig_node = _queen_graph.nodes[0]
from framework.agents.queen.nodes import queen_node as _orig_node
if worker_identity is None:
worker_identity = (
@@ -321,56 +318,76 @@ async def create_queen(
adjusted_node = _orig_node.model_copy(update=node_updates)
_queen_loop_config = {
**(_queen_graph.loop_config or {}),
**_base_loop_config,
"hooks": {"session_start": [_persona_hook]},
}
queen_graph = _queen_graph.model_copy(
update={"nodes": [adjusted_node], "loop_config": _queen_loop_config}
)
# ---- Queen event loop --------------------------------------------
queen_runtime = DecisionTracker(hive_home / "queen")
# ---- Queen event loop (AgentLoop directly, no Orchestrator) -------
from types import SimpleNamespace
from framework.agent_loop.agent_loop import AgentLoop, LoopConfig
from framework.storage.conversation_store import FileConversationStore
from framework.orchestrator.node import DataBuffer, NodeContext
async def _queen_loop():
logger.debug("[_queen_loop] Starting queen loop for session %s", session.id)
try:
logger.debug("[_queen_loop] Creating Orchestrator...")
executor = Orchestrator(
runtime=queen_runtime,
llm=session.llm,
tools=queen_tools,
tool_executor=queen_tool_executor,
# Build LoopConfig from the queen graph's config + persona hook
lc = _queen_loop_config
queen_loop_config = LoopConfig(
max_iterations=lc.get("max_iterations", 999_999),
max_tool_calls_per_turn=lc.get("max_tool_calls_per_turn", 30),
max_context_tokens=lc.get("max_context_tokens", 180_000),
hooks=lc.get("hooks", {}),
)
# Create AgentLoop directly -- no Orchestrator, no graph traversal
agent_loop = AgentLoop(
event_bus=session.event_bus,
config=queen_loop_config,
tool_executor=queen_tool_executor,
conversation_store=FileConversationStore(queen_dir / "conversations"),
)
# Build NodeContext manually
from framework.tracker.decision_tracker import DecisionTracker
ctx = NodeContext(
runtime=DecisionTracker(queen_dir),
node_id="queen",
node_spec=adjusted_node,
buffer=DataBuffer(),
llm=session.llm,
available_tools=queen_tools,
goal_context=queen_goal.description,
max_tokens=lc.get("max_tokens", 8192),
stream_id="queen",
storage_path=queen_dir,
loop_config=_queen_loop_config,
execution_id=session.id,
dynamic_tools_provider=phase_state.get_current_tools,
dynamic_prompt_provider=phase_state.get_current_prompt,
iteration_metadata_provider=lambda: {"phase": phase_state.phase},
skill_dirs=_queen_skill_dirs,
protocols_prompt=phase_state.protocols_prompt,
skills_catalog_prompt=phase_state.skills_catalog_prompt,
protocols_prompt=phase_state.protocols_prompt,
skill_dirs=_queen_skill_dirs,
)
# Expose for chat handler injection (node_registry compat)
session.queen_executor = SimpleNamespace(
node_registry={"queen": agent_loop},
)
session.queen_executor = executor
logger.debug("[_queen_loop] Orchestrator created and stored in session.queen_executor")
# Wire inject_notification so phase switches notify the queen LLM
async def _inject_phase_notification(content: str) -> None:
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(content)
await agent_loop.inject_event(content)
phase_state.inject_notification = _inject_phase_notification
# Auto-switch to editing when worker execution finishes.
# The worker stays loaded — queen can tweak config and re-run.
async def _on_worker_done(event):
if event.stream_id == "queen":
return
if phase_state.phase == "running":
if event.type == EventType.EXECUTION_COMPLETED:
# Mark worker as configured after first successful run
session.worker_configured = True
output = event.data.get("output", {})
output_summary = ""
@@ -388,7 +405,7 @@ async def create_queen(
"Ask if they want to re-run with different input "
"or tweak the configuration."
)
else: # EXECUTION_FAILED
else:
error = event.data.get("error", "Unknown error")
notification = (
"[WORKER_TERMINAL] Worker failed.\n"
@@ -398,17 +415,14 @@ async def create_queen(
"building/planning if code changes are needed."
)
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(notification)
await agent_loop.inject_event(notification)
await phase_state.switch_to_editing(source="auto")
session.event_bus.subscribe(
event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
handler=_on_worker_done,
)
session_manager._subscribe_worker_handoffs(session, executor)
session_manager._subscribe_worker_handoffs(session, session.queen_executor)
# ---- Reflection + recall memory subscriptions ----------------
from framework.agents.queen.reflection_agent import subscribe_reflection_triggers
@@ -420,8 +434,6 @@ async def create_queen(
memory_dir=colony_dir,
phase_state=phase_state,
)
# Store sub IDs on session for teardown.
session.memory_reflection_subs = _reflection_subs
logger.info(
@@ -430,23 +442,21 @@ async def create_queen(
len(phase_state.get_current_tools()),
[t.name for t in phase_state.get_current_tools()],
)
logger.debug("[_queen_loop] Calling executor.execute()...")
result = await executor.execute(
graph=queen_graph,
goal=queen_goal,
input_data={"greeting": initial_prompt or "Session started."},
session_state={"resume_session_id": session.id},
)
logger.debug(
"[_queen_loop] executor.execute() returned with success=%s", result.success
)
if result.success:
logger.warning("Queen executor returned (should be forever-alive)")
else:
logger.error(
"Queen executor failed: %s",
result.error or "(no error message)",
)
# Set the first user message in the node's input_data
# so execute() sees it as the initial context.
ctx.input_data = {
"user_request": initial_prompt or "Session started.",
}
# Run the queen -- forever-alive conversation loop
result = await agent_loop.execute(ctx)
if result.stop_reason == "complete":
logger.warning("Queen returned (should be forever-alive)")
elif result.error:
logger.error("Queen failed: %s", result.error)
except asyncio.CancelledError:
logger.info("[_queen_loop] Queen loop cancelled (normal shutdown)")
raise
@@ -455,7 +465,8 @@ async def create_queen(
raise
finally:
logger.warning(
"[_queen_loop] Queen loop exiting — clearing queen_executor for session '%s'",
"[_queen_loop] Queen loop exiting — clearing queen_executor "
"for session '%s'",
session.id,
)
session.queen_executor = None