Compare commits

...

1 Commits

Author SHA1 Message Date
Timothy 4c8e84b421 fix: compatibility of memory with queen orchestrator 2026-03-09 17:58:52 -07:00
7 changed files with 78 additions and 21 deletions
@@ -81,6 +81,7 @@ _QUEEN_PLANNING_TOOLS = [
"initialize_and_build_agent",
# Load existing agent (after user confirms)
"load_built_agent",
"write_to_diary", # Episodic memory — available in all phases
]
# Building phase: full coding + agent construction tools.
@@ -644,6 +645,12 @@ Google Maps, extracts contact details, and syncs them to Google Sheets.
ask_user("Do you want to run it?", ["Yes, run it", "Check credentials first",
"Modify the worker"])
## Tool calls
Always write at least one sentence of explanation before invoking any tool. \
Never emit a bare tool call with no preceding text — the explanation tells the \
user (and yourself) what you are doing and why.
## When the user asks about identity and responsibility
Only answer identity when the user explicitly asks (for example: "who are you?", \
+1 -1
View File
@@ -325,7 +325,7 @@ async def consolidate_queen_memory(
len(user_msg) // 4,
)
from framework.agents.hive_coder.config import default_config
from framework.agents.queen.config import default_config
semantic_resp, diary_resp = await asyncio.gather(
llm.acomplete(
+31 -8
View File
@@ -2824,6 +2824,17 @@ class EventLoopNode(NodeProtocol):
if ctx.node_spec.skip_judge:
return JudgeVerdict(action="RETRY") # feedback=None → not logged
# --- Level 0b: output completion gate ------------------------------
# If all required outputs are already filled, fall straight through to
# the safety checks and ACCEPT — regardless of tool calls made this turn
# and regardless of whether a custom judge would otherwise give feedback.
_missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if ctx.node_spec.output_keys and not _missing:
# Skip directly to safety checks — custom judge is not consulted.
return await self._accept_if_safe(ctx, accumulator, conversation, iteration)
# --- Level 1: custom judge -----------------------------------------
if self._judge is not None:
@@ -2847,25 +2858,37 @@ class EventLoopNode(NodeProtocol):
# --- Level 2: implicit judge ---------------------------------------
# --- Level 2: implicit judge (outputs still incomplete) ------------
# Level 0b already handled the "outputs complete" case above.
# Real tool calls were made — let the agent keep working.
if tool_results:
return JudgeVerdict(action="RETRY") # feedback=None → not logged
missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if missing:
if _missing:
return JudgeVerdict(
action="RETRY",
feedback=(
f"Task incomplete. Required outputs not yet produced: {missing}. "
f"Task incomplete. Required outputs not yet produced: {_missing}. "
f"Follow your system prompt instructions to complete the work."
),
)
# All output keys present — run safety checks before accepting.
# No output_keys defined — fall through to safety checks.
return await self._accept_if_safe(ctx, accumulator, conversation, iteration)
async def _accept_if_safe(
self,
ctx: NodeContext,
accumulator: OutputAccumulator,
conversation: NodeConversation,
iteration: int,
) -> JudgeVerdict:
"""Run safety checks and return ACCEPT or a RETRY with feedback.
Called when all required output keys are present (or there are none).
Shared by the output-completion gate (Level 0b) and the implicit judge.
"""
output_keys = ctx.node_spec.output_keys or []
nullable_keys = set(ctx.node_spec.nullable_output_keys or [])
@@ -2895,7 +2918,7 @@ class EventLoopNode(NodeProtocol):
),
)
# Level 2b: conversation-aware quality check (if success_criteria set)
# Conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
from framework.graph.conversation_judge import evaluate_phase_completion
+14 -1
View File
@@ -101,6 +101,11 @@ async def create_queen(
phase_state=phase_state,
)
# ---- Memory tools (always registered) ----------------------------
from framework.tools.queen_memory_tools import register_queen_memory_tools
register_queen_memory_tools(queen_registry)
# ---- Monitoring tools (only when worker is loaded) ----------------
if session.worker_runtime:
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
@@ -148,6 +153,13 @@ async def create_queen(
"Handle all tasks directly using your coding tools."
)
# Cross-session memory — stored on phase_state so it can be refreshed
# after each consolidation without rebuilding the whole prompt.
from framework.agents.queen.queen_memory import format_for_injection
_memory = format_for_injection()
phase_state.memory_block = f"\n\n{_memory}" if _memory else ""
_planning_body = (
_queen_style
+ _shared_building_knowledge
@@ -206,7 +218,8 @@ async def create_queen(
)
)
body = _planning_body if phase_state.phase == "planning" else _building_body
return HookResult(system_prompt=persona + "\n\n" + body)
# Use phase_state.memory_block so persona hook also sees refreshed memory.
return HookResult(system_prompt=persona + "\n\n" + body + phase_state.memory_block)
# ---- Graph preparation -------------------------------------------
initial_prompt_text = phase_state.get_current_prompt()
+9 -2
View File
@@ -328,7 +328,6 @@ class SessionManager:
)
# Notify queen about the loaded worker (skip for queen itself).
# Health judge disabled for simplicity.
if agent_path.name != "queen" and session.worker_runtime:
# await self._start_judge(session, session.runner._storage_path)
await self._notify_queen_worker_loaded(session)
@@ -537,11 +536,19 @@ class SessionManager:
_consolidation_session_dir = queen_dir
async def _on_compaction(_event) -> None:
from framework.agents.queen.queen_memory import consolidate_queen_memory
from framework.agents.queen.queen_memory import (
consolidate_queen_memory,
format_for_injection,
)
await consolidate_queen_memory(
session.id, _consolidation_session_dir, _consolidation_llm
)
# Refresh the memory block on phase_state so the queen sees updated
# memory in subsequent turns without waiting for a new session.
if session.phase_state is not None:
_mem = format_for_injection()
session.phase_state.memory_block = f"\n\n{_mem}" if _mem else ""
from framework.runtime.event_bus import EventType as _ET
+15 -8
View File
@@ -84,12 +84,17 @@ class QueenPhaseState:
inject_notification: Any = None # async (str) -> None
event_bus: Any = None # EventBus — for emitting QUEEN_PHASE_CHANGED events
# Phase-specific prompts (set by session_manager after construction)
# Phase-specific base prompts (set by orchestrator after construction).
# Do NOT bake memory_block into these — it is appended dynamically.
prompt_planning: str = ""
prompt_building: str = ""
prompt_staging: str = ""
prompt_running: str = ""
# Cross-session memory block — refreshed after each consolidation so the
# queen sees up-to-date memory even when compaction happens mid-session.
memory_block: str = ""
def get_current_tools(self) -> list:
"""Return tools for the current phase."""
if self.phase == "planning":
@@ -101,14 +106,16 @@ class QueenPhaseState:
return list(self.building_tools)
def get_current_prompt(self) -> str:
"""Return the system prompt for the current phase."""
"""Return the system prompt for the current phase with current memory."""
if self.phase == "planning":
return self.prompt_planning
if self.phase == "running":
return self.prompt_running
if self.phase == "staging":
return self.prompt_staging
return self.prompt_building
base = self.prompt_planning
elif self.phase == "running":
base = self.prompt_running
elif self.phase == "staging":
base = self.prompt_staging
else:
base = self.prompt_building
return base + self.memory_block
async def _emit_phase_event(self) -> None:
"""Publish a QUEEN_PHASE_CHANGED event so the frontend updates the tag."""
+1 -1
View File
@@ -27,7 +27,7 @@ def write_to_diary(entry: str) -> str:
You do not need to include a timestamp or date heading; those are added
automatically.
"""
from framework.agents.hive_coder.queen_memory import append_episodic_entry
from framework.agents.queen.queen_memory import append_episodic_entry
append_episodic_entry(entry)
return "Diary entry recorded."