diff --git a/core/framework/agent_loop/agent_loop.py b/core/framework/agent_loop/agent_loop.py index e2e4733a..39f47dd4 100644 --- a/core/framework/agent_loop/agent_loop.py +++ b/core/framework/agent_loop/agent_loop.py @@ -2293,7 +2293,24 @@ class AgentLoop(NodeProtocol): False, ) - # Execute tool calls — framework tools (set_output, ask_user) + # Priority drain: if user sent a message while the LLM was + # streaming, inject it into the conversation NOW -- before tool + # execution. The LLM will see it on the next inner turn. + if not self._injection_queue.empty(): + while not self._injection_queue.empty(): + _inj_content, _inj_client, _inj_images = ( + self._injection_queue.get_nowait() + ) + if _inj_client: + await conversation.add_user_message(_inj_content) + logger.info( + "[%s] Priority-injected user message mid-turn (%d chars)", + node_id, len(_inj_content), + ) + else: + await conversation.add_user_message(_inj_content) + + # Execute tool calls -- framework tools (set_output, ask_user) # run inline; real MCP tools run in parallel. real_tool_results: list[dict] = [] limit_hit = False diff --git a/core/framework/agents/queen/__init__.py b/core/framework/agents/queen/__init__.py index caff6298..f86488f6 100644 --- a/core/framework/agents/queen/__init__.py +++ b/core/framework/agents/queen/__init__.py @@ -1,19 +1,13 @@ -""" -Queen — Native agent builder for the Hive framework. +"""Queen -- the agent builder for the Hive framework.""" -Deeply understands the agent framework and produces complete Python packages -with goals, nodes, edges, system prompts, MCP configuration, and tests -from natural language specifications. -""" - -from .agent import queen_goal, queen_graph +from .agent import queen_goal, queen_loop_config from .config import AgentMetadata, RuntimeConfig, default_config, metadata __version__ = "1.0.0" __all__ = [ "queen_goal", - "queen_graph", + "queen_loop_config", "RuntimeConfig", "AgentMetadata", "default_config", diff --git a/core/framework/agents/queen/agent.py b/core/framework/agents/queen/agent.py index c282c1d5..ba59f963 100644 --- a/core/framework/agents/queen/agent.py +++ b/core/framework/agents/queen/agent.py @@ -1,38 +1,29 @@ -"""Queen graph definition.""" +"""Queen agent definition. -from framework.orchestrator import Goal -from framework.orchestrator.edge import GraphSpec +The queen is a single AgentLoop -- no graph, no orchestrator. +Loaded by queen_orchestrator.create_queen(). +""" + +from framework.orchestrator.goal import Goal from .nodes import queen_node -# --------------------------------------------------------------------------- -# Queen graph — the primary persistent conversation. -# Loaded by queen_orchestrator.create_queen(), NOT by AgentRunner. -# --------------------------------------------------------------------------- - queen_goal = Goal( id="queen-manager", name="Queen Manager", description=( - "Manage the worker agent lifecycle and serve as the user's primary interactive interface." + "Manage the worker agent lifecycle and serve as the " + "user's primary interactive interface." ), success_criteria=[], constraints=[], ) -queen_graph = GraphSpec( - id="queen-graph", - goal_id=queen_goal.id, - version="1.0.0", - entry_node="queen", - entry_points={"start": "queen"}, - terminal_nodes=[], - pause_nodes=[], - nodes=[queen_node], - edges=[], - conversation_mode="continuous", - loop_config={ - "max_iterations": 999_999, - "max_tool_calls_per_turn": 30, - }, -) +# Loop config -- used by queen_orchestrator to build LoopConfig +queen_loop_config = { + "max_iterations": 999_999, + "max_tool_calls_per_turn": 30, + "max_context_tokens": 180_000, +} + +__all__ = ["queen_goal", "queen_loop_config", "queen_node"] diff --git a/core/framework/server/queen_orchestrator.py b/core/framework/server/queen_orchestrator.py index ca057ec8..ffb9c7e7 100644 --- a/core/framework/server/queen_orchestrator.py +++ b/core/framework/server/queen_orchestrator.py @@ -32,7 +32,7 @@ async def create_queen( """ from framework.agents.queen.agent import ( queen_goal, - queen_graph as _queen_graph, + queen_loop_config as _base_loop_config, ) from framework.agents.queen.nodes import ( _QUEEN_BUILDING_TOOLS, @@ -66,17 +66,14 @@ async def create_queen( ) from framework.agents.queen.nodes.thinking_hook import select_expert_persona from framework.agent_loop.agent_loop import HookContext, HookResult - from framework.orchestrator.orchestrator import Orchestrator from framework.loader.mcp_registry import MCPRegistry from framework.loader.tool_registry import ToolRegistry - from framework.tracker.decision_tracker import DecisionTracker # noqa: F401 from framework.host.event_bus import AgentEvent, EventType from framework.tools.queen_lifecycle_tools import ( QueenPhaseState, register_queen_lifecycle_tools, ) - hive_home = Path.home() / ".hive" # ---- Tool registry ------------------------------------------------ queen_registry = ToolRegistry() @@ -197,7 +194,7 @@ async def create_queen( phase_state.global_memory_dir = global_dir # ---- Compose phase-specific prompts ------------------------------ - _orig_node = _queen_graph.nodes[0] + from framework.agents.queen.nodes import queen_node as _orig_node if worker_identity is None: worker_identity = ( @@ -321,56 +318,76 @@ async def create_queen( adjusted_node = _orig_node.model_copy(update=node_updates) _queen_loop_config = { - **(_queen_graph.loop_config or {}), + **_base_loop_config, "hooks": {"session_start": [_persona_hook]}, } - queen_graph = _queen_graph.model_copy( - update={"nodes": [adjusted_node], "loop_config": _queen_loop_config} - ) - # ---- Queen event loop -------------------------------------------- - queen_runtime = DecisionTracker(hive_home / "queen") + # ---- Queen event loop (AgentLoop directly, no Orchestrator) ------- + from types import SimpleNamespace + + from framework.agent_loop.agent_loop import AgentLoop, LoopConfig + from framework.storage.conversation_store import FileConversationStore + from framework.orchestrator.node import DataBuffer, NodeContext async def _queen_loop(): logger.debug("[_queen_loop] Starting queen loop for session %s", session.id) try: - logger.debug("[_queen_loop] Creating Orchestrator...") - executor = Orchestrator( - runtime=queen_runtime, - llm=session.llm, - tools=queen_tools, - tool_executor=queen_tool_executor, + # Build LoopConfig from the queen graph's config + persona hook + lc = _queen_loop_config + queen_loop_config = LoopConfig( + max_iterations=lc.get("max_iterations", 999_999), + max_tool_calls_per_turn=lc.get("max_tool_calls_per_turn", 30), + max_context_tokens=lc.get("max_context_tokens", 180_000), + hooks=lc.get("hooks", {}), + ) + + # Create AgentLoop directly -- no Orchestrator, no graph traversal + agent_loop = AgentLoop( event_bus=session.event_bus, + config=queen_loop_config, + tool_executor=queen_tool_executor, + conversation_store=FileConversationStore(queen_dir / "conversations"), + ) + + # Build NodeContext manually + from framework.tracker.decision_tracker import DecisionTracker + + ctx = NodeContext( + runtime=DecisionTracker(queen_dir), + node_id="queen", + node_spec=adjusted_node, + buffer=DataBuffer(), + llm=session.llm, + available_tools=queen_tools, + goal_context=queen_goal.description, + max_tokens=lc.get("max_tokens", 8192), stream_id="queen", - storage_path=queen_dir, - loop_config=_queen_loop_config, execution_id=session.id, dynamic_tools_provider=phase_state.get_current_tools, dynamic_prompt_provider=phase_state.get_current_prompt, iteration_metadata_provider=lambda: {"phase": phase_state.phase}, - skill_dirs=_queen_skill_dirs, - protocols_prompt=phase_state.protocols_prompt, skills_catalog_prompt=phase_state.skills_catalog_prompt, + protocols_prompt=phase_state.protocols_prompt, + skill_dirs=_queen_skill_dirs, + ) + + # Expose for chat handler injection (node_registry compat) + session.queen_executor = SimpleNamespace( + node_registry={"queen": agent_loop}, ) - session.queen_executor = executor - logger.debug("[_queen_loop] Orchestrator created and stored in session.queen_executor") # Wire inject_notification so phase switches notify the queen LLM async def _inject_phase_notification(content: str) -> None: - node = executor.node_registry.get("queen") - if node is not None and hasattr(node, "inject_event"): - await node.inject_event(content) + await agent_loop.inject_event(content) phase_state.inject_notification = _inject_phase_notification # Auto-switch to editing when worker execution finishes. - # The worker stays loaded — queen can tweak config and re-run. async def _on_worker_done(event): if event.stream_id == "queen": return if phase_state.phase == "running": if event.type == EventType.EXECUTION_COMPLETED: - # Mark worker as configured after first successful run session.worker_configured = True output = event.data.get("output", {}) output_summary = "" @@ -388,7 +405,7 @@ async def create_queen( "Ask if they want to re-run with different input " "or tweak the configuration." ) - else: # EXECUTION_FAILED + else: error = event.data.get("error", "Unknown error") notification = ( "[WORKER_TERMINAL] Worker failed.\n" @@ -398,17 +415,14 @@ async def create_queen( "building/planning if code changes are needed." ) - node = executor.node_registry.get("queen") - if node is not None and hasattr(node, "inject_event"): - await node.inject_event(notification) - + await agent_loop.inject_event(notification) await phase_state.switch_to_editing(source="auto") session.event_bus.subscribe( event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED], handler=_on_worker_done, ) - session_manager._subscribe_worker_handoffs(session, executor) + session_manager._subscribe_worker_handoffs(session, session.queen_executor) # ---- Reflection + recall memory subscriptions ---------------- from framework.agents.queen.reflection_agent import subscribe_reflection_triggers @@ -420,8 +434,6 @@ async def create_queen( memory_dir=colony_dir, phase_state=phase_state, ) - - # Store sub IDs on session for teardown. session.memory_reflection_subs = _reflection_subs logger.info( @@ -430,23 +442,21 @@ async def create_queen( len(phase_state.get_current_tools()), [t.name for t in phase_state.get_current_tools()], ) - logger.debug("[_queen_loop] Calling executor.execute()...") - result = await executor.execute( - graph=queen_graph, - goal=queen_goal, - input_data={"greeting": initial_prompt or "Session started."}, - session_state={"resume_session_id": session.id}, - ) - logger.debug( - "[_queen_loop] executor.execute() returned with success=%s", result.success - ) - if result.success: - logger.warning("Queen executor returned (should be forever-alive)") - else: - logger.error( - "Queen executor failed: %s", - result.error or "(no error message)", - ) + + # Set the first user message in the node's input_data + # so execute() sees it as the initial context. + ctx.input_data = { + "user_request": initial_prompt or "Session started.", + } + + # Run the queen -- forever-alive conversation loop + result = await agent_loop.execute(ctx) + + if result.stop_reason == "complete": + logger.warning("Queen returned (should be forever-alive)") + elif result.error: + logger.error("Queen failed: %s", result.error) + except asyncio.CancelledError: logger.info("[_queen_loop] Queen loop cancelled (normal shutdown)") raise @@ -455,7 +465,8 @@ async def create_queen( raise finally: logger.warning( - "[_queen_loop] Queen loop exiting — clearing queen_executor for session '%s'", + "[_queen_loop] Queen loop exiting — clearing queen_executor " + "for session '%s'", session.id, ) session.queen_executor = None