From 6637bc8d9670fb3b66f8451443f74923edad8463 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Tue, 7 Apr 2026 12:08:35 -0700 Subject: [PATCH 1/5] feat: simplify memory implementation --- core/framework/agents/queen/nodes/__init__.py | 43 +- core/framework/agents/queen/queen_memory.py | 420 ---------------- .../framework/agents/queen/queen_memory_v2.py | 357 +------------- .../framework/agents/queen/recall_selector.py | 75 +-- .../agents/queen/reflection_agent.py | 465 +++--------------- core/framework/graph/context.py | 6 - core/framework/graph/executor.py | 13 - core/framework/graph/worker_agent.py | 59 --- core/framework/runtime/agent_runtime.py | 9 - core/framework/runtime/execution_stream.py | 12 - core/framework/server/queen_orchestrator.py | 16 +- core/framework/server/session_manager.py | 118 +---- core/framework/tools/queen_lifecycle_tools.py | 96 +--- core/framework/tools/queen_memory_tools.py | 100 ---- core/tests/test_queen_memory.py | 387 +++------------ .../test_session_manager_worker_handoff.py | 35 -- 16 files changed, 207 insertions(+), 2004 deletions(-) delete mode 100644 core/framework/agents/queen/queen_memory.py delete mode 100644 core/framework/tools/queen_memory_tools.py diff --git a/core/framework/agents/queen/nodes/__init__.py b/core/framework/agents/queen/nodes/__init__.py index b3264b78..498df752 100644 --- a/core/framework/agents/queen/nodes/__init__.py +++ b/core/framework/agents/queen/nodes/__init__.py @@ -84,7 +84,7 @@ _QUEEN_PLANNING_TOOLS = [ "initialize_and_build_agent", # Load existing agent (after user confirms) "load_built_agent", - "save_global_memory", + ] # Building phase: full coding + agent construction tools. @@ -93,7 +93,7 @@ _QUEEN_BUILDING_TOOLS = _SHARED_TOOLS + [ "list_credentials", "replan_agent", "save_agent_draft", # Re-draft during building → auto-dissolves + updates flowchart - "save_global_memory", + ] # Staging phase: agent loaded but not yet running — inspect, configure, launch. @@ -113,7 +113,7 @@ _QUEEN_STAGING_TOOLS = [ "set_trigger", "remove_trigger", "list_triggers", - "save_global_memory", + ] # Running phase: worker is executing — monitor, control, or switch to editing. @@ -137,7 +137,7 @@ _QUEEN_RUNNING_TOOLS = [ "set_trigger", "remove_trigger", "list_triggers", - "save_global_memory", + ] # Editing phase: worker done, still loaded — tweak config and re-run. @@ -160,7 +160,7 @@ _QUEEN_EDITING_TOOLS = [ "set_trigger", "remove_trigger", "list_triggers", - "save_global_memory", + ] @@ -646,8 +646,6 @@ to fix the currently loaded agent (no draft required). - load_built_agent(agent_path) — Load an existing agent and switch to STAGING \ phase. Only use this when the user explicitly asks to work with an existing agent \ (e.g. "load my_agent", "run the research agent"). Confirm with the user first. -- save_global_memory(category, description, content, name?) — Save durable \ -cross-queen memory about the user only (profile, preferences, environment, feedback) ## Workflow summary 1. Understand requirements → discover tools → design graph @@ -679,8 +677,6 @@ updated flowchart immediately. Use this when you make structural changes \ restored (with decision/browser nodes intact) so you can edit it. Use \ when the user wants to change integrations, swap tools, rethink the \ flow, or discuss any design changes before you build them. -- save_global_memory(category, description, content, name?) — Save durable \ -cross-queen memory about the user only When you finish building an agent, call load_built_agent(path) to stage it. """ @@ -694,8 +690,6 @@ The agent is loaded and ready to run. You can inspect it and launch it: - get_graph_status(focus?) — Brief status - run_agent_with_input(task) — Start the worker and switch to RUNNING phase - set_trigger / remove_trigger / list_triggers — Timer management -- save_global_memory(category, description, content, name?) — Save \ -durable cross-queen memory about the user only You do NOT have write tools or backward transition tools in staging. \ To modify the agent, run it first — after it finishes you enter EDITING \ @@ -715,8 +709,6 @@ The worker is running. You have monitoring and lifecycle tools: for config tweaks, re-runs, or escalation to building/planning - run_agent_with_input(task) — Re-run the worker with new input - set_trigger / remove_trigger / list_triggers — Timer management -- save_global_memory(category, description, content, name?) — Save \ -durable cross-queen memory about the user only When the worker finishes on its own, you automatically move to EDITING \ phase. You can also call switch_to_editing() to stop early and tweak. @@ -732,7 +724,6 @@ The worker has finished executing and is still loaded. You can tweak and re-run: - run_agent_with_input(task) — Re-run the worker with new input - get_worker_health_summary() — Review last run's health data - set_trigger / remove_trigger / list_triggers — Timer management -- save_global_memory — Save durable cross-queen memory You do NOT have write/edit file tools or backward transition tools. \ You can only re-run or tweak from this phase. @@ -934,26 +925,10 @@ diagnosis mode — you already have a built agent, you just need to fix it. _queen_memory_instructions = """ ## Your Memory -Relevant colony memories from this queen session may appear in context under \ -"--- Colony Memories ---". Relevant global user memories may appear under \ -"--- Global Memories ---". - -Colony memories are shared with the worker for this queen session. Use them \ -for continuity about what this user is trying to do, what has worked, and \ -what the colony has learned together. - -Global memories are shared across queens and are only for durable knowledge \ -about the user: who they are, their preferences, their environment, and \ -their feedback. - -Memories older than 1 day include a staleness warning. Treat these as \ -point-in-time observations — verify current details before asserting them \ -as fact. - -You do NOT need to manually save or recall colony memories. A background \ -reflection agent automatically extracts colony learnings from each \ -conversation turn. Use `save_global_memory` only when you learn something \ -durable about the user that should help future queens. +Relevant global memories about the user may appear at the end of this prompt \ +under "--- Global Memories ---". These are automatically maintained across \ +sessions. Use them to inform your responses but verify stale claims before \ +asserting them as fact. """ _queen_behavior_always = _queen_behavior_always + _queen_memory_instructions diff --git a/core/framework/agents/queen/queen_memory.py b/core/framework/agents/queen/queen_memory.py deleted file mode 100644 index c0a8763a..00000000 --- a/core/framework/agents/queen/queen_memory.py +++ /dev/null @@ -1,420 +0,0 @@ -"""Queen global cross-session memory. - -Three-tier memory architecture: - ~/.hive/queen/MEMORY.md — semantic (who, what, why) - ~/.hive/queen/memories/MEMORY-YYYY-MM-DD.md — episodic (daily journals) - ~/.hive/queen/session/{id}/data/adapt.md — working (session-scoped) - -Semantic and episodic files are injected at queen session start. - -Semantic memory (MEMORY.md) is updated automatically at session end via -consolidate_queen_memory() — the queen never rewrites this herself. - -Episodic memory (MEMORY-date.md) can be written by the queen during a session -via the write_to_diary tool, and is also appended to at session end by -consolidate_queen_memory(). -""" - -from __future__ import annotations - -import asyncio -import json -import logging -import traceback -from datetime import date, datetime -from pathlib import Path - -logger = logging.getLogger(__name__) - - -def _queen_dir() -> Path: - return Path.home() / ".hive" / "queen" - - -def format_memory_date(d: date) -> str: - """Return a cross-platform long date label without a zero-padded day.""" - return f"{d.strftime('%B')} {d.day}, {d.year}" - - -def semantic_memory_path() -> Path: - return _queen_dir() / "MEMORY.md" - - -def episodic_memory_path(d: date | None = None) -> Path: - d = d or date.today() - return _queen_dir() / "memories" / f"MEMORY-{d.strftime('%Y-%m-%d')}.md" - - -def read_semantic_memory() -> str: - path = semantic_memory_path() - return path.read_text(encoding="utf-8").strip() if path.exists() else "" - - -def read_episodic_memory(d: date | None = None) -> str: - path = episodic_memory_path(d) - return path.read_text(encoding="utf-8").strip() if path.exists() else "" - - -def _find_recent_episodic(lookback: int = 7) -> tuple[date, str] | None: - """Find the most recent non-empty episodic memory within *lookback* days.""" - from datetime import timedelta - - today = date.today() - for offset in range(lookback): - d = today - timedelta(days=offset) - content = read_episodic_memory(d) - if content: - return d, content - return None - - -# Budget (in characters) for episodic memory in the system prompt. -_EPISODIC_CHAR_BUDGET = 6_000 - - -def format_for_injection() -> str: - """Format cross-session memory for system prompt injection. - - Returns an empty string if no meaningful content exists yet (e.g. first - session with only the seed template). - """ - semantic = read_semantic_memory() - recent = _find_recent_episodic() - - # Suppress injection if semantic is still just the seed template - if semantic and semantic.startswith("# My Understanding of the User\n\n*No sessions"): - semantic = "" - - parts: list[str] = [] - if semantic: - parts.append(semantic) - - if recent: - d, content = recent - # Trim oversized episodic entries to keep the prompt manageable - if len(content) > _EPISODIC_CHAR_BUDGET: - content = content[:_EPISODIC_CHAR_BUDGET] + "\n\n…(truncated)" - today = date.today() - if d == today: - label = f"## Today — {format_memory_date(d)}" - else: - label = f"## {format_memory_date(d)}" - parts.append(f"{label}\n\n{content}") - - if not parts: - return "" - - body = "\n\n---\n\n".join(parts) - return "--- Your Cross-Session Memory ---\n\n" + body + "\n\n--- End Cross-Session Memory ---" - - -_SEED_TEMPLATE = """\ -# My Understanding of the User - -*No sessions recorded yet.* - -## Who They Are - -## How They Communicate - -## What They're Trying to Achieve - -## What's Working - -## What I've Learned -""" - - -def append_episodic_entry(content: str) -> None: - """Append a timestamped prose entry to today's episodic memory file. - - Creates the file (with a date heading) if it doesn't exist yet. - Used both by the queen's diary tool and by the consolidation hook. - """ - ep_path = episodic_memory_path() - ep_path.parent.mkdir(parents=True, exist_ok=True) - today = date.today() - today_str = format_memory_date(today) - timestamp = datetime.now().strftime("%H:%M") - if not ep_path.exists(): - header = f"# {today_str}\n\n" - block = f"{header}### {timestamp}\n\n{content.strip()}\n" - else: - block = f"\n\n### {timestamp}\n\n{content.strip()}\n" - with ep_path.open("a", encoding="utf-8") as f: - f.write(block) - - -def seed_if_missing() -> None: - """Create MEMORY.md with a blank template if it doesn't exist yet.""" - path = semantic_memory_path() - if path.exists(): - return - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(_SEED_TEMPLATE, encoding="utf-8") - - -# --------------------------------------------------------------------------- -# Consolidation prompt -# --------------------------------------------------------------------------- - -_SEMANTIC_SYSTEM = """\ -You maintain the persistent cross-session memory of an AI assistant called the Queen. -Review the session notes and rewrite MEMORY.md — the Queen's durable understanding of the -person she works with across all sessions. - -Write entirely in the Queen's voice — first person, reflective, honest. -Not a log of events, but genuine understanding of who this person is over time. - -Rules: -- Update and synthesise: incorporate new understanding, update facts that have changed, remove - details that are stale, superseded, or no longer say anything meaningful about the person. -- Keep it as structured markdown with named sections about the PERSON, not about today. -- Do NOT include diary sections, daily logs, or session summaries. Those belong elsewhere. - MEMORY.md is about who they are, what they want, what works — not what happened today. -- Maintain a "How They Communicate" section: technical depth, preferred pace - (fast/exploratory/thorough), what communication approaches have worked or not, - tone preferences. Update based on diary reflections about communication. - This section should evolve — "prefers direct answers" is useful on day 1; - "prefers direct answers for technical questions but wants more context when - discussing architecture trade-offs" is better by day 5. -- Reference dates only when noting a lasting milestone (e.g. "since March 8th they prefer X"). -- If the session had no meaningful new information about the person, - return the existing text unchanged. -- Do not add fictional details. Only reflect what is evidenced in the notes. -- Stay concise. Prune rather than accumulate. A lean, accurate file is more useful than a - dense one. If something was true once but has been resolved or superseded, remove it. -- Output only the raw markdown content of MEMORY.md. No preamble, no code fences. -""" - -_DIARY_SYSTEM = """\ -You maintain the daily episodic diary of an AI assistant called the Queen. -You receive: (1) today's existing diary so far, and (2) notes from the latest session. - -Rewrite the complete diary for today as a single unified narrative — -first person, reflective, honest. -Merge and deduplicate: if the same story (e.g. a research agent stalling) recurred several times, -describe it once with appropriate weight rather than retelling it. Weave in new developments from -the session notes. Preserve important milestones, emotional texture, and session path references. -Preserve reflections about communication effectiveness — these are important inputs for the -Queen's evolving understanding of the user. A reflection like "they responded much better when -I led with the recommendation instead of listing options" is as important as -"we built a Gmail agent." - -If today's diary is empty, write the initial entry based on the session notes alone. - -Output only the full diary prose — no date heading, no timestamp headers, -no preamble, no code fences. -""" - - -def read_session_context(session_dir: Path, max_messages: int = 80) -> str: - """Extract a readable transcript from conversation parts + adapt.md. - - Reads the last ``max_messages`` conversation parts and the session's - adapt.md (working memory). Tool results are omitted — only user and - assistant turns (with tool-call names noted) are included. - """ - parts: list[str] = [] - - # Working notes - adapt_path = session_dir / "data" / "adapt.md" - if adapt_path.exists(): - text = adapt_path.read_text(encoding="utf-8").strip() - if text: - parts.append(f"## Session Working Notes (adapt.md)\n\n{text}") - - # Conversation transcript - parts_dir = session_dir / "conversations" / "parts" - if parts_dir.exists(): - part_files = sorted(parts_dir.glob("*.json"))[-max_messages:] - lines: list[str] = [] - for pf in part_files: - try: - data = json.loads(pf.read_text(encoding="utf-8")) - role = data.get("role", "") - content = str(data.get("content", "")).strip() - tool_calls = data.get("tool_calls") or [] - if role == "tool": - continue # skip verbose tool results - if role == "assistant" and tool_calls and not content: - names = [tc.get("function", {}).get("name", "?") for tc in tool_calls] - lines.append(f"[queen calls: {', '.join(names)}]") - elif content: - label = "user" if role == "user" else "queen" - lines.append(f"[{label}]: {content[:600]}") - except (KeyError, TypeError) as exc: - logger.debug("Skipping malformed conversation message: %s", exc) - continue - except Exception: - logger.warning("Unexpected error parsing conversation message", exc_info=True) - continue - if lines: - parts.append("## Conversation\n\n" + "\n".join(lines)) - - return "\n\n".join(parts) - - -# --------------------------------------------------------------------------- -# Context compaction (binary-split LLM summarisation) -# --------------------------------------------------------------------------- - -# If the raw session context exceeds this many characters, compact it first -# before sending to the consolidation LLM. ~200 k chars ≈ 50 k tokens. -_CTX_COMPACT_CHAR_LIMIT = 200_000 -_CTX_COMPACT_MAX_DEPTH = 8 - -_COMPACT_SYSTEM = ( - "Summarise this conversation segment. Preserve: user goals, key decisions, " - "what was built or changed, emotional tone, and important outcomes. " - "Write concisely in third person past tense. Omit routine tool invocations " - "unless the result matters." -) - - -async def _compact_context(text: str, llm: object, *, _depth: int = 0) -> str: - """Binary-split and LLM-summarise *text* until it fits within the char limit. - - Mirrors the recursive binary-splitting strategy used by the main agent - compaction pipeline (EventLoopNode._llm_compact). - """ - if len(text) <= _CTX_COMPACT_CHAR_LIMIT or _depth >= _CTX_COMPACT_MAX_DEPTH: - return text - - # Split near the midpoint on a line boundary so we don't cut mid-message - mid = len(text) // 2 - split_at = text.rfind("\n", 0, mid) + 1 - if split_at <= 0: - split_at = mid - - half1, half2 = text[:split_at], text[split_at:] - - async def _summarise(chunk: str) -> str: - try: - resp = await llm.acomplete( - messages=[{"role": "user", "content": chunk}], - system=_COMPACT_SYSTEM, - max_tokens=2048, - ) - return resp.content.strip() - except Exception: - logger.warning( - "queen_memory: context compaction LLM call failed (depth=%d), truncating", - _depth, - ) - return chunk[: _CTX_COMPACT_CHAR_LIMIT // 4] - - s1, s2 = await asyncio.gather(_summarise(half1), _summarise(half2)) - combined = s1 + "\n\n" + s2 - if len(combined) > _CTX_COMPACT_CHAR_LIMIT: - return await _compact_context(combined, llm, _depth=_depth + 1) - return combined - - -async def consolidate_queen_memory( - session_id: str, - session_dir: Path, - llm: object, -) -> None: - """Update MEMORY.md and append a diary entry based on the current session. - - Reads conversation parts and adapt.md from session_dir. Called - periodically in the background and once at session end. Failures are - logged and silently swallowed so they never block teardown. - - Args: - session_id: The session ID (used for the adapt.md path reference). - session_dir: Path to the session directory (~/.hive/queen/session/{id}). - llm: LLMProvider instance (must support acomplete()). - """ - try: - session_context = read_session_context(session_dir) - if not session_context: - logger.debug("queen_memory: no session context, skipping consolidation") - return - - logger.info("queen_memory: consolidating memory for session %s ...", session_id) - - # If the transcript is very large, compact it with recursive binary LLM - # summarisation before sending to the consolidation model. - if len(session_context) > _CTX_COMPACT_CHAR_LIMIT: - logger.info( - "queen_memory: session context is %d chars — compacting first", - len(session_context), - ) - session_context = await _compact_context(session_context, llm) - logger.info("queen_memory: compacted to %d chars", len(session_context)) - - existing_semantic = read_semantic_memory() - today_journal = read_episodic_memory() - today = date.today() - today_str = format_memory_date(today) - adapt_path = session_dir / "data" / "adapt.md" - - user_msg = ( - f"## Existing Semantic Memory (MEMORY.md)\n\n" - f"{existing_semantic or '(none yet)'}\n\n" - f"## Today's Diary So Far ({today_str})\n\n" - f"{today_journal or '(none yet)'}\n\n" - f"{session_context}\n\n" - f"## Session Reference\n\n" - f"Session ID: {session_id}\n" - f"Session path: {adapt_path}\n" - ) - - logger.debug( - "queen_memory: calling LLM (%d chars of context, ~%d tokens est.)", - len(user_msg), - len(user_msg) // 4, - ) - - from framework.agents.queen.config import default_config - - semantic_resp, diary_resp = await asyncio.gather( - llm.acomplete( - messages=[{"role": "user", "content": user_msg}], - system=_SEMANTIC_SYSTEM, - max_tokens=default_config.max_tokens, - ), - llm.acomplete( - messages=[{"role": "user", "content": user_msg}], - system=_DIARY_SYSTEM, - max_tokens=default_config.max_tokens, - ), - ) - - new_semantic = semantic_resp.content.strip() - diary_entry = diary_resp.content.strip() - - if new_semantic: - path = semantic_memory_path() - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(new_semantic, encoding="utf-8") - logger.info("queen_memory: semantic memory updated (%d chars)", len(new_semantic)) - - if diary_entry: - # Rewrite today's episodic file in-place — the LLM has merged and - # deduplicated the full day's content, so we replace rather than append. - ep_path = episodic_memory_path() - ep_path.parent.mkdir(parents=True, exist_ok=True) - heading = f"# {today_str}" - ep_path.write_text(f"{heading}\n\n{diary_entry}\n", encoding="utf-8") - logger.info( - "queen_memory: episodic diary rewritten for %s (%d chars)", - today_str, - len(diary_entry), - ) - - except Exception: - tb = traceback.format_exc() - logger.exception("queen_memory: consolidation failed") - # Write to file so the cause is findable regardless of log verbosity. - error_path = _queen_dir() / "consolidation_error.txt" - try: - error_path.parent.mkdir(parents=True, exist_ok=True) - error_path.write_text( - f"session: {session_id}\ntime: {datetime.now().isoformat()}\n\n{tb}", - encoding="utf-8", - ) - except OSError: - pass # Cannot write error file; original exception already logged diff --git a/core/framework/agents/queen/queen_memory_v2.py b/core/framework/agents/queen/queen_memory_v2.py index d8a99f2f..f95c8218 100644 --- a/core/framework/agents/queen/queen_memory_v2.py +++ b/core/framework/agents/queen/queen_memory_v2.py @@ -1,24 +1,17 @@ -"""Shared memory helpers for queen/worker recall and reflection. +"""Queen global memory helpers. -Each memory is an individual ``.md`` file in ``~/.hive/queen/memories/`` -with optional YAML frontmatter (name, type, description). Frontmatter -is a convention enforced by prompt instructions — parsing is lenient and -malformed files degrade gracefully (appear in scans with ``None`` metadata). - -Cursor-based incremental processing tracks which conversation messages -have already been processed by the reflection agent. +Global memory lives in ``~/.hive/queen/global_memory/`` and stores durable +cross-session knowledge about the user (profile, preferences, environment, +feedback). Each memory is an individual ``.md`` file with optional YAML +frontmatter (name, type, description). """ from __future__ import annotations import logging import re -import shutil -import time from dataclasses import dataclass, field -from datetime import date from pathlib import Path -from typing import Any logger = logging.getLogger(__name__) @@ -26,51 +19,15 @@ logger = logging.getLogger(__name__) # Constants # --------------------------------------------------------------------------- -MEMORY_TYPES: tuple[str, ...] = ("goal", "environment", "technique", "reference", "diary") GLOBAL_MEMORY_CATEGORIES: tuple[str, ...] = ("profile", "preference", "environment", "feedback") _HIVE_QUEEN_DIR = Path.home() / ".hive" / "queen" -# Legacy shared v2 root. Colony memory now lives under queen sessions. -MEMORY_DIR: Path = _HIVE_QUEEN_DIR / "memories" MAX_FILES: int = 200 MAX_FILE_SIZE_BYTES: int = 4096 # 4 KB hard limit per memory file # How many lines of a memory file to read for header scanning. _HEADER_LINE_LIMIT: int = 30 -_MIGRATION_MARKER = ".migrated-from-shared-memory" -_GLOBAL_MEMORY_CODE_PATTERN = re.compile( - r"(/Users/|~/.hive|\.py\b|\.ts\b|\.tsx\b|\.js\b|" - r"\b(graph|node|runtime|session|execution|worker|queen|subagent|checkpoint|flowchart)\b)", - re.IGNORECASE, -) - -# Frontmatter example provided to the reflection agent via prompt. -MEMORY_FRONTMATTER_EXAMPLE: list[str] = [ - "```markdown", - "---", - "name: {{memory name}}", - ( - "description: {{one-line description — used to decide " - "relevance in future conversations, so be specific}}" - ), - f"type: {{{{{', '.join(MEMORY_TYPES)}}}}}", - "---", - "", - ( - "{{memory content — for feedback/project types, " - "structure as: rule/fact, then **Why:** " - "and **How to apply:** lines}}" - ), - "```", -] - - -def colony_memory_dir(colony_id: str) -> Path: - """Return the colony memory directory for a queen session.""" - return _HIVE_QUEEN_DIR / "session" / colony_id / "memory" / "colony" - - def global_memory_dir() -> Path: """Return the queen-global memory directory.""" return _HIVE_QUEEN_DIR / "global_memory" @@ -107,15 +64,6 @@ def parse_frontmatter(text: str) -> dict[str, str]: return result -def parse_memory_type(raw: str | None) -> str | None: - """Validate *raw* against supported memory categories.""" - if raw is None: - return None - normalized = raw.strip().lower() - allowed = set(MEMORY_TYPES) | set(GLOBAL_MEMORY_CATEGORIES) - return normalized if normalized in allowed else None - - def parse_global_memory_category(raw: str | None) -> str | None: """Validate *raw* against ``GLOBAL_MEMORY_CATEGORIES``.""" if raw is None: @@ -164,7 +112,7 @@ class MemoryFile: filename=path.name, path=path, name=fm.get("name"), - type=parse_memory_type(fm.get("type")), + type=parse_global_memory_category(fm.get("type")), description=fm.get("description"), header_lines=lines, mtime=mtime, @@ -182,7 +130,7 @@ def scan_memory_files(memory_dir: Path | None = None) -> list[MemoryFile]: Files are sorted by modification time (newest first). Dotfiles and subdirectories are ignored. """ - d = memory_dir or MEMORY_DIR + d = memory_dir or global_memory_dir() if not d.is_dir(): return [] @@ -235,307 +183,30 @@ def build_memory_document( ) -def diary_filename(d: date | None = None) -> str: - """Return the diary memory filename for date *d* (default: today).""" - d = d or date.today() - return f"MEMORY-{d.strftime('%Y-%m-%d')}.md" - - -def build_diary_document(*, date_str: str, body: str) -> str: - """Build a diary memory file with frontmatter.""" - return build_memory_document( - name=f"diary-{date_str}", - description=f"Daily session narrative for {date_str}", - mem_type="diary", - body=body, - ) - - -def validate_global_memory_payload( - *, - category: str, - description: str, - content: str, -) -> str: - """Validate a queen-global memory save request.""" - parsed = parse_global_memory_category(category) - if parsed is None: - raise ValueError( - "Invalid global memory category. Use one of: " + ", ".join(GLOBAL_MEMORY_CATEGORIES) - ) - if not description.strip(): - raise ValueError("Global memory description cannot be empty.") - if not content.strip(): - raise ValueError("Global memory content cannot be empty.") - - probe = f"{description}\n{content}" - if _GLOBAL_MEMORY_CODE_PATTERN.search(probe): - raise ValueError( - "Global memory is only for durable user profile, preferences, " - "environment, or feedback — not task/code/runtime details." - ) - return parsed - - -def save_global_memory( - *, - category: str, - description: str, - content: str, - name: str | None = None, - memory_dir: Path | None = None, -) -> tuple[str, Path]: - """Persist one queen-global memory entry.""" - parsed = validate_global_memory_payload( - category=category, - description=description, - content=content, - ) - target_dir = memory_dir or global_memory_dir() - target_dir.mkdir(parents=True, exist_ok=True) - memory_name = (name or description).strip() - filename = allocate_memory_filename(target_dir, memory_name) - doc = build_memory_document( - name=memory_name, - description=description, - mem_type=parsed, - body=content, - ) - if len(doc.encode("utf-8")) > MAX_FILE_SIZE_BYTES: - raise ValueError(f"Global memory entry exceeds the {MAX_FILE_SIZE_BYTES} byte limit.") - path = target_dir / filename - path.write_text(doc, encoding="utf-8") - return filename, path - - # --------------------------------------------------------------------------- # Manifest formatting # --------------------------------------------------------------------------- -def _age_label(mtime: float) -> str: - """Human-readable age string from an mtime.""" - age_days = memory_age_days(mtime) - if age_days <= 0: - return "today" - if age_days == 1: - return "1 day ago" - return f"{age_days} days ago" - - def format_memory_manifest(files: list[MemoryFile]) -> str: - """One-line-per-file text manifest for the recall selector / reflection agent. + """One-line-per-file text manifest. - Format: ``[type] filename (age): description`` + Format: ``[type] filename: description`` """ lines: list[str] = [] for mf in files: t = mf.type or "unknown" desc = mf.description or "(no description)" - age = _age_label(mf.mtime) - lines.append(f"[{t}] {mf.filename} ({age}): {desc}") + lines.append(f"[{t}] {mf.filename}: {desc}") return "\n".join(lines) # --------------------------------------------------------------------------- -# Freshness / staleness -# --------------------------------------------------------------------------- - -_SECONDS_PER_DAY = 86_400 - - -def memory_age_days(mtime: float) -> int: - """Return the age of a memory file in whole days.""" - if mtime <= 0: - return 0 - return int((time.time() - mtime) / _SECONDS_PER_DAY) - - -def memory_freshness_text(mtime: float) -> str: - """Return a staleness warning for injection, or empty string if fresh.""" - d = memory_age_days(mtime) - if d <= 1: - return "" - return ( - f"This memory is {d} days old. " - "Memories are point-in-time observations, not live state — " - "claims about code behavior or file:line citations may be outdated. " - "Verify against current code before asserting as fact." - ) - - -# --------------------------------------------------------------------------- -# Cursor-based incremental processing +# Initialisation # --------------------------------------------------------------------------- -async def read_conversation_parts(session_dir: Path) -> list[dict[str, Any]]: - """Read all conversation parts for a session using FileConversationStore. - - Returns a list of raw message dicts in sequence order. - """ - from framework.storage.conversation_store import FileConversationStore - - store = FileConversationStore(session_dir / "conversations") - return await store.read_parts() - - -# --------------------------------------------------------------------------- -# Initialisation and legacy migration -# --------------------------------------------------------------------------- - - -def init_memory_dir( - memory_dir: Path | None = None, - *, - migrate_legacy: bool = False, -) -> None: - """Create the memory directory if missing. - - When ``migrate_legacy`` is true, migrate both v1 memory files and the - previous shared v2 queen memory store into this directory. - """ - d = memory_dir or MEMORY_DIR - first_run = not d.exists() +def init_memory_dir(memory_dir: Path | None = None) -> None: + """Create the memory directory if missing.""" + d = memory_dir or global_memory_dir() d.mkdir(parents=True, exist_ok=True) - if migrate_legacy: - migrate_legacy_memories(d) - migrate_shared_v2_memories(d) - elif first_run and d == MEMORY_DIR: - migrate_legacy_memories(d) - - -def migrate_legacy_memories(memory_dir: Path | None = None) -> None: - """Convert old MEMORY.md + MEMORY-YYYY-MM-DD.md files to individual memory files. - - Originals are moved to ``{memory_dir}/.legacy/``. - """ - d = memory_dir or MEMORY_DIR - queen_dir = _HIVE_QUEEN_DIR - legacy_archive = d / ".legacy" - - migrated_any = False - - # --- Semantic memory (MEMORY.md) --- - semantic = queen_dir / "MEMORY.md" - if semantic.exists(): - content = semantic.read_text(encoding="utf-8").strip() - # Skip the blank seed template. - if content and not content.startswith("# My Understanding of the User\n\n*No sessions"): - _write_migration_file( - d, - filename="legacy-semantic-memory.md", - name="legacy-semantic-memory", - mem_type="reference", - description="Migrated semantic memory from previous memory system", - body=content, - ) - migrated_any = True - # Archive original. - legacy_archive.mkdir(parents=True, exist_ok=True) - semantic.rename(legacy_archive / "MEMORY.md") - - # --- Episodic memories (MEMORY-YYYY-MM-DD.md) --- - old_memories_dir = queen_dir / "memories" - if old_memories_dir.is_dir(): - for ep_file in sorted(old_memories_dir.glob("MEMORY-*.md")): - content = ep_file.read_text(encoding="utf-8").strip() - if not content: - continue - date_part = ep_file.stem.replace("MEMORY-", "") - slug = f"legacy-diary-{date_part}.md" - _write_migration_file( - d, - filename=slug, - name=f"legacy-diary-{date_part}", - mem_type="diary", - description=f"Migrated diary entry from {date_part}", - body=content, - ) - migrated_any = True - # Archive original. - legacy_archive.mkdir(parents=True, exist_ok=True) - ep_file.rename(legacy_archive / ep_file.name) - - if migrated_any: - logger.info("queen_memory_v2: migrated legacy memory files to %s", d) - - -def migrate_shared_v2_memories( - memory_dir: Path | None = None, - *, - source_dir: Path | None = None, -) -> None: - """Move shared queen v2 memory files into a colony directory once.""" - d = memory_dir or MEMORY_DIR - d.mkdir(parents=True, exist_ok=True) - src = source_dir or MEMORY_DIR - if d.resolve() == src.resolve(): - return - - marker = d / _MIGRATION_MARKER - if marker.exists(): - return - - if not src.is_dir(): - return - - md_files = sorted(f for f in src.glob("*.md") if f.is_file() and not f.name.startswith(".")) - if not md_files: - marker.write_text("no shared memories found\n", encoding="utf-8") - return - - archive = src / ".legacy_colony_migration" - archive.mkdir(parents=True, exist_ok=True) - migrated_any = False - - for src_file in md_files: - target = d / src_file.name - if not target.exists(): - try: - shutil.copy2(src_file, target) - migrated_any = True - except OSError: - logger.debug("shared memory migration copy failed for %s", src_file, exc_info=True) - continue - - archived = archive / src_file.name - counter = 2 - while archived.exists(): - archived = archive / f"{src_file.stem}-{counter}{src_file.suffix}" - counter += 1 - try: - src_file.rename(archived) - except OSError: - logger.debug("shared memory migration archive failed for %s", src_file, exc_info=True) - - if migrated_any: - logger.info("queen_memory_v2: migrated shared queen memories to %s", d) - marker.write_text( - f"migrated_at={int(time.time())}\nsource={src}\n", - encoding="utf-8", - ) - - -def _write_migration_file( - memory_dir: Path, - filename: str, - name: str, - mem_type: str, - description: str, - body: str, -) -> None: - """Write a single migrated memory file with frontmatter.""" - # Truncate body to respect file size limit (leave room for frontmatter). - header = f"---\nname: {name}\ndescription: {description}\ntype: {mem_type}\n---\n\n" - max_body = MAX_FILE_SIZE_BYTES - len(header.encode("utf-8")) - if len(body.encode("utf-8")) > max_body: - # Rough truncation — cut at character level then trim to last newline. - body = body[: max_body - 20] - nl = body.rfind("\n") - if nl > 0: - body = body[:nl] - body += "\n\n...(truncated during migration)" - - path = memory_dir / filename - path.write_text(header + body + "\n", encoding="utf-8") diff --git a/core/framework/agents/queen/recall_selector.py b/core/framework/agents/queen/recall_selector.py index 0c9a6026..897652d5 100644 --- a/core/framework/agents/queen/recall_selector.py +++ b/core/framework/agents/queen/recall_selector.py @@ -1,11 +1,11 @@ -"""Recall selector — pre-turn memory selection for queen and worker memory. +"""Recall selector — pre-turn global memory selection for the queen. Before each conversation turn the system: - 1. Scans the memory directory for ``.md`` files (cap: 200). + 1. Scans the global memory directory for ``.md`` files (cap: 200). 2. Reads headers (frontmatter + first 30 lines). 3. Uses a single LLM call with structured JSON output to pick the ~5 most relevant memories. - 4. Injects them into context with staleness warnings for older ones. + 4. Injects them into the system prompt. The selector only sees the user's query string — no full conversation context. This keeps it cheap and fast. Errors are caught and return @@ -20,9 +20,8 @@ from pathlib import Path from typing import Any from framework.agents.queen.queen_memory_v2 import ( - MEMORY_DIR, format_memory_manifest, - memory_freshness_text, + global_memory_dir, scan_memory_files, ) @@ -72,9 +71,6 @@ name and description. query, then do not include it in your list. Be selective and discerning. - If there are no memories in the list that would clearly be useful, \ return an empty list. -- If a list of recently-used tools is provided, do not select memories \ -that are usage reference or API documentation for those tools (the Queen \ -is already exercising them). Still select warnings or gotchas about them. """ # --------------------------------------------------------------------------- @@ -86,7 +82,6 @@ async def select_memories( query: str, llm: Any, memory_dir: Path | None = None, - active_tools: list[str] | None = None, *, max_results: int = 5, ) -> list[str]: @@ -94,20 +89,13 @@ async def select_memories( Returns a list of filenames. Best-effort: on any error returns ``[]``. """ - mem_dir = memory_dir or MEMORY_DIR + mem_dir = memory_dir or global_memory_dir() files = scan_memory_files(mem_dir) if not files: - logger.debug("recall: no memory files found, skipping selection") return [] - logger.debug("recall: selecting from %d memory files for query: %.80s", len(files), query) manifest = format_memory_manifest(files) - - user_msg_parts = [f"## User query\n\n{query}\n\n## Available memories\n\n{manifest}"] - if active_tools: - user_msg_parts.append(f"\n\n## Recently-used tools\n\n{', '.join(active_tools)}") - - user_msg = "".join(user_msg_parts) + user_msg = f"## User query\n\n{query}\n\n## Available memories\n\n{manifest}" try: resp = await llm.acomplete( @@ -118,7 +106,6 @@ async def select_memories( ) data = json.loads(resp.content) selected = data.get("selected_memories", []) - # Validate: only return filenames that actually exist. valid_names = {f.filename for f in files} result = [s for s in selected if s in valid_names][:max_results] logger.debug("recall: selected %d memories: %s", len(result), result) @@ -131,14 +118,9 @@ async def select_memories( def format_recall_injection( filenames: list[str], memory_dir: Path | None = None, - *, - heading: str = "Selected Memories", ) -> str: - """Read selected memory files and format for system prompt injection. - - Prepends a staleness warning for memories older than 1 day. - """ - mem_dir = memory_dir or MEMORY_DIR + """Read selected memory files and format for system prompt injection.""" + mem_dir = memory_dir or global_memory_dir() if not filenames: return "" @@ -151,67 +133,43 @@ def format_recall_injection( content = path.read_text(encoding="utf-8").strip() except OSError: continue - - try: - mtime = path.stat().st_mtime - except OSError: - mtime = 0.0 - - freshness = memory_freshness_text(mtime) - header = f"### {fname}" - if freshness: - header += f"\n\n> {freshness}" - blocks.append(f"{header}\n\n{content}") + blocks.append(f"### {fname}\n\n{content}") if not blocks: return "" body = "\n\n---\n\n".join(blocks) - logger.debug("recall: injecting %d memory blocks into context", len(blocks)) - return f"--- {heading} ---\n\n{body}\n\n--- End {heading} ---" + return f"--- Global Memories ---\n\n{body}\n\n--- End Global Memories ---" # --------------------------------------------------------------------------- -# Cache update (called after each queen turn) +# Cache update (called after each queen reflection) # --------------------------------------------------------------------------- async def update_recall_cache( session_dir: Path, llm: Any, - phase_state: Any | None = None, memory_dir: Path | None = None, *, cache_setter: Any = None, - heading: str = "Selected Memories", - active_tools: list[str] | None = None, ) -> None: - """Update the recall cache on *phase_state* for the next turn. + """Update the recall cache for the next turn. Reads the latest user message from conversation parts to use as the query for memory selection. """ - mem_dir = memory_dir or MEMORY_DIR + mem_dir = memory_dir or global_memory_dir() - # Extract latest user message as the query. query = _extract_latest_user_query(session_dir) if not query: - logger.debug("recall: no user query found, skipping cache update") return - logger.debug("recall: updating cache for query: %.80s", query) try: - selected = await select_memories( - query, - llm, - mem_dir, - active_tools=active_tools, - ) - injection = format_recall_injection(selected, mem_dir, heading=heading) + selected = await select_memories(query, llm, mem_dir) + injection = format_recall_injection(selected, mem_dir) if cache_setter is not None: cache_setter(injection) - elif phase_state is not None: - phase_state._cached_recall_block = injection except Exception: logger.debug("recall: cache update failed", exc_info=True) @@ -223,13 +181,12 @@ def _extract_latest_user_query(session_dir: Path) -> str: return "" part_files = sorted(parts_dir.glob("*.json"), reverse=True) - for f in part_files[:20]: # Look back at most 20 messages. + for f in part_files[:20]: try: data = json.loads(f.read_text(encoding="utf-8")) if data.get("role") == "user": content = str(data.get("content", "")).strip() if content: - # Truncate very long queries. return content[:1000] if len(content) > 1000 else content except (json.JSONDecodeError, OSError): continue diff --git a/core/framework/agents/queen/reflection_agent.py b/core/framework/agents/queen/reflection_agent.py index 2730d207..bab900db 100644 --- a/core/framework/agents/queen/reflection_agent.py +++ b/core/framework/agents/queen/reflection_agent.py @@ -1,21 +1,17 @@ -"""Reflect agent — background memory extraction for queen and worker memory. +"""Reflection agent — background global memory extraction for the queen. -A lightweight side agent that runs after each queen LLM turn. It -inspects recent conversation messages (cursor-based incremental -processing) and extracts learnings into individual memory files. +A lightweight side agent that runs after each queen LLM turn. It inspects +recent conversation messages and extracts durable user knowledge into +individual memory files in ``~/.hive/queen/global_memory/``. Two reflection types: - - **Short reflection**: every queen turn. Distills learnings. Nudged - toward a 2-turn pattern (batch reads → batch writes). - - **Long reflection**: every 5 short reflections, on CONTEXT_COMPACTED, - and at session end. Organises, deduplicates, trims holistically. + - **Short reflection**: after conversational queen turns. Distills + learnings about the user (profile, preferences, environment, feedback). + - **Long reflection**: every 5 short reflections and on CONTEXT_COMPACTED. + Organises, deduplicates, trims the global memory directory. -The agent has restricted tool access: it can only read/write/delete -memory files in ``~/.hive/queen/memories/`` and list them. - -Concurrency: an ``asyncio.Lock`` prevents overlapping runs. If a -trigger fires while a reflection is already active the event is skipped -(cursor hasn't advanced, so messages will be reconsidered next time). +Concurrency: an ``asyncio.Lock`` prevents overlapping runs. If a trigger +fires while a reflection is already active the event is skipped. """ from __future__ import annotations @@ -30,16 +26,12 @@ from pathlib import Path from typing import Any from framework.agents.queen.queen_memory_v2 import ( + GLOBAL_MEMORY_CATEGORIES, MAX_FILE_SIZE_BYTES, MAX_FILES, - MEMORY_DIR, - MEMORY_FRONTMATTER_EXAMPLE, - MEMORY_TYPES, - build_diary_document, - diary_filename, format_memory_manifest, + global_memory_dir, parse_frontmatter, - read_conversation_parts, scan_memory_files, ) from framework.llm.provider import LLMResponse, Tool @@ -54,7 +46,7 @@ _REFLECTION_TOOLS: list[Tool] = [ Tool( name="list_memory_files", description=( - "List all memory files with their type, name, age, and description. " + "List all memory files with their type, name, and description. " "Returns a text manifest — one line per file." ), parameters={ @@ -135,28 +127,7 @@ def _safe_memory_path(filename: str, memory_dir: Path) -> Path: return candidate -# Memory types that workers are NOT allowed to write. -_WORKER_BLOCKED_TYPES: frozenset[str] = frozenset( - {"environment", "technique", "reference", "diary", "goal"} -) - - -def _inject_last_modified_by(content: str, caller: str) -> str: - """Inject or update ``last_modified_by`` in frontmatter.""" - m = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL) - if not m: - return content - fm_body = m.group(1) - # Remove existing last_modified_by line if present. - fm_lines = [ - ln for ln in fm_body.splitlines() if not ln.strip().lower().startswith("last_modified_by") - ] - fm_lines.append(f"last_modified_by: {caller}") - new_fm = "\n".join(fm_lines) - return f"---\n{new_fm}\n---{content[m.end() :]}" - - -def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str) -> str: +def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path) -> str: """Execute a reflection tool synchronously. Returns the result string.""" if name == "list_memory_files": files = scan_memory_files(memory_dir) @@ -183,16 +154,14 @@ def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str content = args.get("content", "") if not filename.endswith(".md"): return "ERROR: Filename must end with .md" - # Enforce caller-based type restrictions. + # Enforce global memory type restrictions. fm = parse_frontmatter(content) mem_type = (fm.get("type") or "").strip().lower() - if caller == "worker" and mem_type in _WORKER_BLOCKED_TYPES: + if mem_type and mem_type not in GLOBAL_MEMORY_CATEGORIES: return ( - f"ERROR: Workers cannot write memory type '{mem_type}'. " - f"Blocked types for workers: {', '.join(sorted(_WORKER_BLOCKED_TYPES))}." + f"ERROR: Invalid memory type '{mem_type}'. " + f"Allowed types: {', '.join(GLOBAL_MEMORY_CATEGORIES)}." ) - # Inject last_modified_by into frontmatter. - content = _inject_last_modified_by(content, caller) # Enforce file size limit. if len(content.encode("utf-8")) > MAX_FILE_SIZE_BYTES: return f"ERROR: Content exceeds {MAX_FILE_SIZE_BYTES} byte limit." @@ -207,9 +176,7 @@ def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str return f"ERROR: File cap reached ({MAX_FILES}). Delete a file first." memory_dir.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") - logger.debug( - "reflect: tool write_memory_file [%s] → %s (%d chars)", caller, filename, len(content) - ) + logger.debug("reflect: tool write_memory_file → %s (%d chars)", filename, len(content)) return f"Wrote {filename} ({len(content)} chars)." if name == "delete_memory_file": @@ -221,7 +188,7 @@ def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str if not path.exists(): return f"ERROR: File not found: {filename}" path.unlink() - logger.debug("reflect: tool delete_memory_file [%s] → %s", caller, filename) + logger.debug("reflect: tool delete_memory_file → %s", filename) return f"Deleted {filename}." return f"ERROR: Unknown tool: {name}" @@ -239,36 +206,17 @@ async def _reflection_loop( system: str, user_msg: str, memory_dir: Path, - caller: str, max_turns: int = _MAX_TURNS, ) -> tuple[bool, list[str], str]: """Run a mini tool-use loop: LLM → tool calls → repeat. - Hard cap of *max_turns* iterations. Prompt nudges the LLM toward a - 2-turn pattern (batch reads in turn 1, batch writes in turn 2). - - Returns a tuple of (success, changed_files, last_text) where *success* - is ``True`` if the loop completed without LLM errors, *changed_files* - lists filenames that were written or deleted, and *last_text* is the - final assistant text (useful as a skip-reason when no files changed). + Returns (success, changed_files, last_text). """ messages: list[dict[str, Any]] = [{"role": "user", "content": user_msg}] changed_files: list[str] = [] last_text: str = "" - logger.debug("reflect: starting loop (caller=%s, max %d turns)", caller, max_turns) for _turn in range(max_turns): - # Log what we're sending to the LLM. - user_content = messages[-1].get("content", "") if messages else "" - preview = user_content[:300] if isinstance(user_content, str) else str(user_content)[:300] - logger.debug( - "reflect: turn %d — sending %d messages to LLM, last msg role=%s, preview=%s", - _turn, - len(messages), - messages[-1].get("role", "?") if messages else "?", - preview, - ) - try: resp: LLMResponse = await llm.acomplete( messages=messages, @@ -280,41 +228,15 @@ async def _reflection_loop( logger.warning("reflect: LLM call failed", exc_info=True) return False, changed_files, last_text - # Build assistant message. tool_calls_raw: list[dict[str, Any]] = [] if resp.raw_response and isinstance(resp.raw_response, dict): tool_calls_raw = resp.raw_response.get("tool_calls", []) - # Log the full LLM response for debugging. - raw_keys = ( - list(resp.raw_response.keys()) - if isinstance(resp.raw_response, dict) - else type(resp.raw_response).__name__ - ) - logger.debug( - "reflect: turn %d — LLM response: content=%r (len=%d), stop_reason=%s, " - "tool_calls=%d, model=%s, tokens=%d/%d, raw_keys=%s", - _turn, - (resp.content or "")[:200], - len(resp.content or ""), - resp.stop_reason, - len(tool_calls_raw), - resp.model, - resp.input_tokens, - resp.output_tokens, - raw_keys, - ) - # Accumulate non-empty text across turns so we don't lose a reason - # given alongside tool calls on an earlier turn. turn_text = resp.content or "" if turn_text: last_text = turn_text - assistant_msg: dict[str, Any] = { - "role": "assistant", - "content": turn_text, - } + assistant_msg: dict[str, Any] = {"role": "assistant", "content": turn_text} if tool_calls_raw: - # Convert to OpenAI format for the conversation. assistant_msg["tool_calls"] = [ { "id": tc["id"], @@ -328,32 +250,16 @@ async def _reflection_loop( ] messages.append(assistant_msg) - # No tool calls → agent is done. if not tool_calls_raw: - logger.debug("reflect: loop done after %d turn(s) (no tool calls)", _turn + 1) break - # Execute each tool call and append results. - logger.debug( - "reflect: turn %d — executing %d tool call(s): %s", - _turn + 1, - len(tool_calls_raw), - [tc["name"] for tc in tool_calls_raw], - ) for tc in tool_calls_raw: - result = _execute_tool(tc["name"], tc.get("input", {}), memory_dir, caller) - # Track files that were written or deleted. + result = _execute_tool(tc["name"], tc.get("input", {}), memory_dir) if tc["name"] in ("write_memory_file", "delete_memory_file"): fname = tc.get("input", {}).get("filename", "") if fname and not result.startswith("ERROR"): changed_files.append(fname) - messages.append( - { - "role": "tool", - "tool_call_id": tc["id"], - "content": result, - } - ) + messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result}) return True, changed_files, last_text @@ -362,51 +268,55 @@ async def _reflection_loop( # System prompts # --------------------------------------------------------------------------- -_FRONTMATTER_EXAMPLE = "\n".join(MEMORY_FRONTMATTER_EXAMPLE) +_CATEGORIES_STR = ", ".join(GLOBAL_MEMORY_CATEGORIES) _SHORT_REFLECT_SYSTEM = f"""\ -You are a reflection agent that distills learnings from a conversation into -persistent memory files. You run in the background after each assistant turn. +You are a reflection agent that distills durable knowledge about the USER +into persistent global memory files. You run in the background after each +assistant turn. -Your goal: identify anything from the recent messages worth remembering across -future sessions — user preferences, project context, techniques that worked, -goals, environment details, reference pointers. +Your goal: identify anything from the recent messages worth remembering +about the user across ALL future sessions — their profile, preferences, +environment setup, or feedback on assistant behavior. -Memory types: {", ".join(MEMORY_TYPES)} +Memory categories: {_CATEGORIES_STR} Expected format for each memory file: -{_FRONTMATTER_EXAMPLE} +```markdown +--- +name: {{{{memory name}}}} +description: {{{{one-line description — specific and search-friendly}}}} +type: {{{{{_CATEGORIES_STR}}}}} +--- + +{{{{memory content}}}} +``` Workflow (aim for 2 turns): - Turn 1 — call list_memory_files to see what already exists, then - read_memory_file for any that might need updating. + Turn 1 — call list_memory_files to see what exists, then read_memory_file + for any that might need updating. Turn 2 — call write_memory_file for new/updated memories. Rules: -- Only persist information that would be useful in a *future* conversation. - Skip ephemeral task details, routine tool output, and anything obvious - from the code or git history. +- ONLY persist durable knowledge about the USER — who they are, how they + like to work, their tech environment, their feedback on your behavior. +- Do NOT store task-specific details, code patterns, file paths, or + ephemeral session state. - Keep files concise. Each file should cover ONE topic. - If an existing memory already covers the learning, UPDATE it rather than creating a duplicate. -- If there is nothing worth remembering from these messages, do nothing - (respond with a brief reason why nothing was saved — no tool calls needed). -- IMPORTANT: Always end with a text message (no tool calls) summarising what - you did or why you skipped. Never end on an empty response. +- If there is nothing worth remembering, do nothing (respond with a brief + reason — no tool calls needed). - File names should be kebab-case slugs ending in .md. -- Include a specific, search-friendly description in the frontmatter. - Do NOT exceed {MAX_FILE_SIZE_BYTES} bytes per file or {MAX_FILES} total files. """ _LONG_REFLECT_SYSTEM = f"""\ You are a reflection agent performing a periodic housekeeping pass over the -memory directory. Your job is to organise, deduplicate, and trim noise from -the accumulated memory files. +global memory directory. Your job is to organise, deduplicate, and trim +noise from the accumulated memory files. -Memory types: {", ".join(MEMORY_TYPES)} - -Expected format for each memory file: -{_FRONTMATTER_EXAMPLE} +Memory categories: {_CATEGORIES_STR} Workflow: 1. list_memory_files to get the full manifest. @@ -420,29 +330,6 @@ Rules: - Remove memories that are no longer relevant or are superseded. - Keep the total collection lean and high-signal. - Do NOT invent new information — only reorganise what exists. -- Do NOT delete or merge MEMORY-*.md diary files. These are daily narratives - managed by a separate process. You may read them for context but should not - modify them. -""" - -_DIARY_SYSTEM = """\ -You maintain a daily diary entry for an AI colony session. You receive: -(1) Today's existing diary content (may be empty if this is the first entry). -(2) A transcript of recent conversation messages. - -Write a cohesive 3-8 sentence narrative about what happened in this session today. -Cover: what the user asked for, what was accomplished, key decisions or obstacles, -and current status. - -Rules: -- If an existing diary is provided, rewrite it as a unified narrative incorporating - the new developments. Merge and deduplicate — do not simply append. -- Keep the total narrative under 3000 characters. -- Focus on the story arc of the day, not individual tool calls or code details. -- If the recent messages contain nothing substantive (greetings, routine - confirmations), return the existing diary text unchanged. -- Output only the diary prose. No headings, no timestamps, no code fences, no - frontmatter. """ @@ -451,31 +338,31 @@ Rules: # --------------------------------------------------------------------------- +async def _read_conversation_parts(session_dir: Path) -> list[dict[str, Any]]: + """Read conversation parts from the queen session directory.""" + from framework.storage.conversation_store import FileConversationStore + + store = FileConversationStore(session_dir / "conversations") + return await store.read_parts() + + async def run_short_reflection( session_dir: Path, llm: Any, memory_dir: Path | None = None, - *, - caller: str, ) -> None: - """Run a short reflection: extract learnings from conversation.""" - mem_dir = memory_dir or MEMORY_DIR + """Run a short reflection: extract user knowledge from conversation.""" + mem_dir = memory_dir or global_memory_dir() - messages = await read_conversation_parts(session_dir) + messages = await _read_conversation_parts(session_dir) if not messages: - logger.debug("reflect: short [%s] — no conversation parts", caller) return - logger.debug("reflect: short [%s] — %d conversation parts", caller, len(messages)) - - # Build a readable transcript from recent messages. transcript_lines: list[str] = [] for msg in messages[-50:]: role = msg.get("role", "") content = str(msg.get("content", "")).strip() - if role == "tool": - continue # Skip verbose tool results. - if not content: + if role == "tool" or not content: continue label = "user" if role == "user" else "assistant" if len(content) > 800: @@ -492,38 +379,24 @@ async def run_short_reflection( f"Timestamp: {datetime.now().isoformat(timespec='minutes')}" ) - _, changed, reason = await _reflection_loop( - llm, - _SHORT_REFLECT_SYSTEM, - user_msg, - mem_dir, - caller=caller, - ) + _, changed, reason = await _reflection_loop(llm, _SHORT_REFLECT_SYSTEM, user_msg, mem_dir) if changed: - logger.debug("reflect: short reflection done [%s], changed files: %s", caller, changed) + logger.debug("reflect: short reflection done, changed files: %s", changed) else: - logger.debug( - "reflect: short reflection done [%s], no changes — %s", - caller, - reason or "no reason given", - ) + logger.debug("reflect: short reflection done, no changes — %s", reason or "no reason") async def run_long_reflection( llm: Any, memory_dir: Path | None = None, - *, - caller: str, ) -> None: - """Run a long reflection: organise and deduplicate all memories.""" - mem_dir = memory_dir or MEMORY_DIR + """Run a long reflection: organise and deduplicate all global memories.""" + mem_dir = memory_dir or global_memory_dir() files = scan_memory_files(mem_dir) if not files: - logger.debug("reflect: long [%s] — no memory files to organise", caller) return - logger.debug("reflect: long [%s] — organising %d memory files", caller, len(files)) manifest = format_memory_manifest(files) user_msg = ( f"## Current memory manifest ({len(files)} files)\n\n" @@ -531,105 +404,21 @@ async def run_long_reflection( f"Timestamp: {datetime.now().isoformat(timespec='minutes')}" ) - _, changed, reason = await _reflection_loop( - llm, - _LONG_REFLECT_SYSTEM, - user_msg, - mem_dir, - caller=caller, - ) + _, changed, reason = await _reflection_loop(llm, _LONG_REFLECT_SYSTEM, user_msg, mem_dir) if changed: - logger.debug( - "reflect: long reflection done [%s] (%d files), changed files: %s", - caller, - len(files), - changed, - ) + logger.debug("reflect: long reflection done (%d files), changed: %s", len(files), changed) else: logger.debug( - "reflect: long reflection done [%s] (%d files), no changes — %s", - caller, + "reflect: long reflection done (%d files), no changes — %s", len(files), - reason or "no reason given", + reason or "no reason", ) -async def run_diary_update( - session_dir: Path, - llm: Any, - memory_dir: Path | None = None, -) -> None: - """Update today's diary file with a narrative of recent activity.""" - mem_dir = memory_dir or MEMORY_DIR - - fname = diary_filename() - diary_path = mem_dir / fname - today_str = datetime.now().strftime("%Y-%m-%d") - - # Read existing diary body (strip frontmatter). - existing_body = "" - if diary_path.exists(): - try: - raw = diary_path.read_text(encoding="utf-8") - m = re.match(r"^---\s*\n.*?\n---\s*\n?", raw, re.DOTALL) - existing_body = raw[m.end() :].strip() if m else raw.strip() - except OSError: - pass - - # Read all conversation messages for context. - messages = await read_conversation_parts(session_dir) - transcript_lines: list[str] = [] - for msg in messages[-40:]: - role = msg.get("role", "") - content = str(msg.get("content", "")).strip() - if role == "tool" or not content: - continue - label = "user" if role == "user" else "assistant" - if len(content) > 600: - content = content[:600] + "..." - transcript_lines.append(f"[{label}]: {content}") - - if not transcript_lines: - return - - transcript = "\n".join(transcript_lines) - user_msg = ( - f"## Today's Diary So Far\n\n" - f"{existing_body or '(no entries yet)'}\n\n" - f"## Recent Conversation\n\n" - f"{transcript}\n\n" - f"Date: {today_str}" - ) - - try: - from framework.agents.queen.config import default_config - - resp = await llm.acomplete( - messages=[{"role": "user", "content": user_msg}], - system=_DIARY_SYSTEM, - max_tokens=min(default_config.max_tokens, 1024), - ) - new_body = (resp.content or "").strip() - if not new_body: - return - - doc = build_diary_document(date_str=today_str, body=new_body) - if len(doc.encode("utf-8")) > MAX_FILE_SIZE_BYTES: - new_body = new_body[:2800] - doc = build_diary_document(date_str=today_str, body=new_body) - - mem_dir.mkdir(parents=True, exist_ok=True) - diary_path.write_text(doc, encoding="utf-8") - logger.debug("diary: updated %s (%d chars)", fname, len(doc)) - except Exception: - logger.warning("diary: update failed", exc_info=True) - - # --------------------------------------------------------------------------- # Event-bus integration # --------------------------------------------------------------------------- -# Run a long reflection every N short reflections. _LONG_REFLECT_INTERVAL = 5 @@ -647,64 +436,41 @@ async def subscribe_reflection_triggers( """ from framework.runtime.event_bus import EventType - mem_dir = memory_dir or MEMORY_DIR + mem_dir = memory_dir or global_memory_dir() _lock = asyncio.Lock() _short_count = 0 async def _on_turn_complete(event: Any) -> None: nonlocal _short_count - # Only process queen turns. if getattr(event, "stream_id", None) != "queen": return _short_count += 1 - # Decide whether to reflect: only when the LLM turn ended without - # tool calls (a conversational response) OR every _LONG_REFLECT_INTERVAL turns. event_data = getattr(event, "data", {}) or {} stop_reason = event_data.get("stop_reason", "") is_tool_turn = stop_reason in ("tool_use", "tool_calls") is_interval = _short_count % _LONG_REFLECT_INTERVAL == 0 if is_tool_turn and not is_interval: - logger.debug( - "reflect: skipping turn %d (stop_reason=%s, next reflect at %d)", - _short_count, - stop_reason, - (_short_count // _LONG_REFLECT_INTERVAL + 1) * _LONG_REFLECT_INTERVAL, - ) return if _lock.locked(): - logger.debug("reflect: skipping — reflection already in progress") return async with _lock: try: - logger.debug( - "reflect: turn complete — count %d/%d (stop_reason=%s)", - _short_count, - _LONG_REFLECT_INTERVAL, - stop_reason, - ) if is_interval: - await run_short_reflection(session_dir, llm, mem_dir, caller="queen") - await run_long_reflection(llm, mem_dir, caller="queen") + await run_short_reflection(session_dir, llm, mem_dir) + await run_long_reflection(llm, mem_dir) else: - await run_short_reflection(session_dir, llm, mem_dir, caller="queen") + await run_short_reflection(session_dir, llm, mem_dir) except Exception: logger.warning("reflect: reflection failed", exc_info=True) _write_error("short/long reflection") - # Update daily diary after reflection. - try: - await run_diary_update(session_dir, llm, mem_dir) - except Exception: - logger.warning("reflect: diary update failed", exc_info=True) - - # Update recall cache after reflection completes, guaranteeing - # recall sees the current turn's extracted memories. + # Update recall cache after reflection so next turn sees new memories. if phase_state is not None: try: from framework.agents.queen.recall_selector import update_recall_cache @@ -712,21 +478,10 @@ async def subscribe_reflection_triggers( await update_recall_cache( session_dir, llm, - cache_setter=lambda block: ( - setattr(phase_state, "_cached_colony_recall_block", block), - setattr(phase_state, "_cached_recall_block", block), - ), memory_dir=mem_dir, - heading="Colony Memories", - ) - await update_recall_cache( - session_dir, - llm, cache_setter=lambda block: setattr( phase_state, "_cached_global_recall_block", block ), - memory_dir=getattr(phase_state, "global_memory_dir", None), - heading="Global Memories", ) except Exception: logger.debug("recall: cache update failed", exc_info=True) @@ -734,13 +489,11 @@ async def subscribe_reflection_triggers( async def _on_compaction(event: Any) -> None: if getattr(event, "stream_id", None) != "queen": return - if _lock.locked(): return - async with _lock: try: - await run_long_reflection(llm, mem_dir, caller="queen") + await run_long_reflection(llm, mem_dir) except Exception: logger.warning("reflect: compaction-triggered reflection failed", exc_info=True) _write_error("compaction reflection") @@ -762,68 +515,10 @@ async def subscribe_reflection_triggers( return sub_ids -async def subscribe_worker_memory_triggers( - event_bus: Any, - llm: Any, - *, - worker_sessions_dir: Path, - colony_memory_dir: Path, - recall_cache: dict[str, str], -) -> list[str]: - """Subscribe colony memory lifecycle events for worker runs. - - Short reflection is now handled synchronously at node handoff in - ``WorkerAgent._reflect_colony_memory()``. This function only manages: - - Recall cache initialisation on execution start - - Final long reflection + cleanup on execution end - """ - from framework.runtime.event_bus import EventType - - _terminal_lock = asyncio.Lock() - - def _is_worker_event(event: Any) -> bool: - return bool( - getattr(event, "execution_id", None) - and getattr(event, "stream_id", None) not in ("queen", "judge") - ) - - async def _on_execution_started(event: Any) -> None: - if not _is_worker_event(event): - return - if event.execution_id is not None: - recall_cache[event.execution_id] = "" - - async def _on_execution_terminal(event: Any) -> None: - if not _is_worker_event(event): - return - execution_id = event.execution_id - if execution_id is None: - return - async with _terminal_lock: - try: - await run_long_reflection(llm, colony_memory_dir, caller="worker") - except Exception: - logger.warning("reflect: worker final reflection failed", exc_info=True) - _write_error("worker final reflection") - finally: - recall_cache.pop(execution_id, None) - - return [ - event_bus.subscribe( - event_types=[EventType.EXECUTION_STARTED], - handler=_on_execution_started, - ), - event_bus.subscribe( - event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED], - handler=_on_execution_terminal, - ), - ] - - def _write_error(context: str) -> None: """Best-effort write of the last traceback to an error file.""" try: - error_path = MEMORY_DIR / ".reflection_error.txt" + error_path = global_memory_dir() / ".reflection_error.txt" error_path.parent.mkdir(parents=True, exist_ok=True) error_path.write_text( f"context: {context}\ntime: {datetime.now().isoformat()}\n\n{traceback.format_exc()}", diff --git a/core/framework/graph/context.py b/core/framework/graph/context.py index 85dc07ee..5b4fba4d 100644 --- a/core/framework/graph/context.py +++ b/core/framework/graph/context.py @@ -67,12 +67,6 @@ class GraphContext: # Retry tracking: worker_id → retry_count (for execution quality assessment) retry_counts: dict[str, int] = field(default_factory=dict) nodes_with_retries: set[str] = field(default_factory=set) - # Colony memory reflection at node handoff - colony_memory_dir: Any = None # Path | None - worker_sessions_dir: Any = None # Path | None - colony_recall_cache: dict[str, str] = field(default_factory=dict) - colony_reflect_llm: Any = None # LLMProvider for reflection - _colony_reflect_lock: asyncio.Lock = field(default_factory=asyncio.Lock) def build_scoped_buffer(buffer: DataBuffer, node_spec: NodeSpec) -> DataBuffer: diff --git a/core/framework/graph/executor.py b/core/framework/graph/executor.py index 38368dca..c2015744 100644 --- a/core/framework/graph/executor.py +++ b/core/framework/graph/executor.py @@ -160,10 +160,6 @@ class GraphExecutor: skill_dirs: list[str] | None = None, context_warn_ratio: float | None = None, batch_init_nudge: str | None = None, - colony_memory_dir: Any = None, - colony_worker_sessions_dir: Any = None, - colony_recall_cache: dict[str, str] | None = None, - colony_reflect_llm: Any = None, ): """ Initialize the executor. @@ -232,11 +228,6 @@ class GraphExecutor: self.skill_dirs: list[str] = skill_dirs or [] self.context_warn_ratio: float | None = context_warn_ratio self.batch_init_nudge: str | None = batch_init_nudge - self.colony_memory_dir = colony_memory_dir - self.colony_worker_sessions_dir = colony_worker_sessions_dir - self.colony_recall_cache = colony_recall_cache or {} - self.colony_reflect_llm = colony_reflect_llm - if protocols_prompt: self.logger.info( "GraphExecutor[%s] received protocols_prompt (%d chars)", @@ -1345,10 +1336,6 @@ class GraphExecutor: iteration_metadata_provider=self.iteration_metadata_provider, loop_config=self._loop_config, node_visit_counts=dict(node_visit_counts), - colony_memory_dir=self.colony_memory_dir, - worker_sessions_dir=self.colony_worker_sessions_dir, - colony_recall_cache=self.colony_recall_cache, - colony_reflect_llm=self.colony_reflect_llm, ) # Create one WorkerAgent per node diff --git a/core/framework/graph/worker_agent.py b/core/framework/graph/worker_agent.py index 54117b26..ce9d8b4e 100644 --- a/core/framework/graph/worker_agent.py +++ b/core/framework/graph/worker_agent.py @@ -320,8 +320,6 @@ class WorkerAgent: self.lifecycle = WorkerLifecycle.COMPLETED self._last_result = result self._last_activations = activations - # Colony memory reflection — runs before downstream activation - await self._reflect_colony_memory() completion = WorkerCompletion( worker_id=node_spec.id, success=True, @@ -342,8 +340,6 @@ class WorkerAgent: self.lifecycle = WorkerLifecycle.FAILED self._last_result = result self._last_activations = activations - # Colony memory reflection — capture learnings even on failure - await self._reflect_colony_memory() await self._publish_failure(result.error or "Unknown error") except Exception as exc: error = str(exc) or type(exc).__name__ @@ -658,61 +654,6 @@ class WorkerAgent: pause_event=self._pause_requested, ) - async def _reflect_colony_memory(self) -> None: - """Run colony memory reflection at node handoff. - - Awaits the shared colony lock so parallel workers queue (never skip). - """ - gc = self._gc - if gc.colony_memory_dir is None or gc.colony_reflect_llm is None: - return - if gc.worker_sessions_dir is None: - return - - from pathlib import Path - - session_dir = Path(gc.worker_sessions_dir) / gc.execution_id - if not session_dir.exists(): - return - - # Await lock — serializes reflection but never skips - async with gc._colony_reflect_lock: - try: - from framework.agents.queen.reflection_agent import run_short_reflection - - await run_short_reflection( - session_dir, - gc.colony_reflect_llm, - gc.colony_memory_dir, - caller="worker", - ) - except Exception: - logger.warning( - "Worker %s: colony reflection failed", - self.node_spec.id, - exc_info=True, - ) - - # Update recall cache outside lock (per-execution key, no write races) - try: - from framework.agents.queen.recall_selector import update_recall_cache - - await update_recall_cache( - session_dir, - gc.colony_reflect_llm, - memory_dir=gc.colony_memory_dir, - cache_setter=lambda block: gc.colony_recall_cache.__setitem__( - gc.execution_id, block - ), - heading="Colony Memories", - ) - except Exception: - logger.warning( - "Worker %s: recall cache update failed", - self.node_spec.id, - exc_info=True, - ) - # ------------------------------------------------------------------ # Event publishing # ------------------------------------------------------------------ diff --git a/core/framework/runtime/agent_runtime.py b/core/framework/runtime/agent_runtime.py index f21faf6d..320e5371 100644 --- a/core/framework/runtime/agent_runtime.py +++ b/core/framework/runtime/agent_runtime.py @@ -241,11 +241,6 @@ class AgentRuntime: self._dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = ( None ) - # Colony memory config for reflection-at-handoff (set by session_manager) - self._colony_memory_dir: Any = None - self._colony_worker_sessions_dir: Any = None - self._colony_recall_cache: dict[str, str] | None = None - self._colony_reflect_llm: Any = None self._accounts_data = accounts_data self._tool_provider_map = tool_provider_map @@ -369,10 +364,6 @@ class AgentRuntime: context_warn_ratio=self.context_warn_ratio, batch_init_nudge=self.batch_init_nudge, dynamic_memory_provider_factory=self._dynamic_memory_provider_factory, - colony_memory_dir=self._colony_memory_dir, - colony_worker_sessions_dir=self._colony_worker_sessions_dir, - colony_recall_cache=self._colony_recall_cache, - colony_reflect_llm=self._colony_reflect_llm, ) await stream.start() self._streams[ep_id] = stream diff --git a/core/framework/runtime/execution_stream.py b/core/framework/runtime/execution_stream.py index ff5f3245..6ff2f89b 100644 --- a/core/framework/runtime/execution_stream.py +++ b/core/framework/runtime/execution_stream.py @@ -192,10 +192,6 @@ class ExecutionStream: context_warn_ratio: float | None = None, batch_init_nudge: str | None = None, dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = None, - colony_memory_dir: Any = None, - colony_worker_sessions_dir: Any = None, - colony_recall_cache: dict[str, str] | None = None, - colony_reflect_llm: Any = None, ): """ Initialize execution stream. @@ -251,10 +247,6 @@ class ExecutionStream: self._context_warn_ratio: float | None = context_warn_ratio self._batch_init_nudge: str | None = batch_init_nudge self._dynamic_memory_provider_factory = dynamic_memory_provider_factory - self._colony_memory_dir = colony_memory_dir - self._colony_worker_sessions_dir = colony_worker_sessions_dir - self._colony_recall_cache = colony_recall_cache - self._colony_reflect_llm = colony_reflect_llm _es_logger = logging.getLogger(__name__) if protocols_prompt: @@ -735,10 +727,6 @@ class ExecutionStream: if self._dynamic_memory_provider_factory is not None else None ), - colony_memory_dir=self._colony_memory_dir, - colony_worker_sessions_dir=self._colony_worker_sessions_dir, - colony_recall_cache=self._colony_recall_cache, - colony_reflect_llm=self._colony_reflect_llm, ) # Track executor so inject_input() can reach EventLoopNode instances self._active_executors[execution_id] = executor diff --git a/core/framework/server/queen_orchestrator.py b/core/framework/server/queen_orchestrator.py index fd73ae80..906114af 100644 --- a/core/framework/server/queen_orchestrator.py +++ b/core/framework/server/queen_orchestrator.py @@ -183,16 +183,13 @@ async def create_queen( phase_state.running_tools = [t for t in queen_tools if t.name in running_names] phase_state.editing_tools = [t for t in queen_tools if t.name in editing_names] - # ---- Cross-session memory ---------------------------------------- + # ---- Global memory ------------------------------------------------- from framework.agents.queen.queen_memory_v2 import ( - colony_memory_dir, global_memory_dir, init_memory_dir, ) - colony_dir = colony_memory_dir(session.id) global_dir = global_memory_dir() - init_memory_dir(colony_dir, migrate_legacy=True) init_memory_dir(global_dir) phase_state.global_memory_dir = global_dir @@ -282,11 +279,8 @@ async def create_queen( _session_event_bus = session.event_bus async def _persona_hook(ctx: HookContext) -> HookResult | None: - from framework.agents.queen.queen_memory import format_for_injection - - memory_context = format_for_injection() result = await select_expert_persona( - ctx.trigger or "", _session_llm, memory_context=memory_context + ctx.trigger or "", _session_llm, memory_context="" ) if not result: return None @@ -410,18 +404,16 @@ async def create_queen( ) session_manager._subscribe_worker_handoffs(session, executor) - # ---- Reflection + recall memory subscriptions ---------------- + # ---- Global memory reflection + recall ------------------------- from framework.agents.queen.reflection_agent import subscribe_reflection_triggers _reflection_subs = await subscribe_reflection_triggers( session.event_bus, queen_dir, session.llm, - memory_dir=colony_dir, + memory_dir=global_dir, phase_state=phase_state, ) - - # Store sub IDs on session for teardown. session.memory_reflection_subs = _reflection_subs logger.info( diff --git a/core/framework/server/session_manager.py b/core/framework/server/session_manager.py index fb718059..96cb688b 100644 --- a/core/framework/server/session_manager.py +++ b/core/framework/server/session_manager.py @@ -45,12 +45,8 @@ class Session: phase_state: Any = None # QueenPhaseState # Worker handoff subscription worker_handoff_sub: str | None = None - # Memory reflection + recall subscriptions + # Memory reflection + recall subscriptions (global memory) memory_reflection_subs: list = field(default_factory=list) # list[str] - # Worker colony memory subscriptions - worker_memory_subs: list = field(default_factory=list) # list[str] - # Per-execution colony recall cache for worker prompts - worker_colony_recall_blocks: dict[str, str] = field(default_factory=dict) # Trigger definitions loaded from agent's triggers.json (available but inactive) available_triggers: dict[str, TriggerDefinition] = field(default_factory=dict) # Active trigger tracking (IDs currently firing + their asyncio tasks) @@ -323,16 +319,6 @@ class SessionManager: runtime = runner._agent_runtime - if runtime is not None: - runtime._dynamic_memory_provider_factory = lambda execution_id, session=session: ( - lambda execution_id=execution_id, session=session: ( - session.worker_colony_recall_blocks.get( - execution_id, - "", - ) - ) - ) - # Load triggers from the agent's triggers.json definition file. from framework.tools.queen_lifecycle_tools import _read_agent_triggers_json @@ -368,18 +354,6 @@ class SessionManager: session.graph_runtime = runtime session.worker_info = info - # Colony memory is additive; worker loading should still succeed if - # that optional subscription path hits an import/runtime issue while - # restoring an older session. - try: - await self._subscribe_worker_colony_memory(session) - except Exception: - logger.warning( - "Worker colony memory subscription failed for '%s'; continuing without it", - resolved_graph_id, - exc_info=True, - ) - async with self._lock: self._loading.discard(session.id) @@ -617,14 +591,6 @@ class SessionManager: await self._emit_trigger_events(session, "removed", session.available_triggers) session.available_triggers.clear() - for sub_id in session.worker_memory_subs: - try: - session.event_bus.unsubscribe(sub_id) - except Exception: - pass - session.worker_memory_subs.clear() - session.worker_colony_recall_blocks.clear() - graph_id = session.graph_id session.graph_id = None session.worker_path = None @@ -650,11 +616,6 @@ class SessionManager: if session is None: return False - # Capture session data for memory consolidation before teardown - _llm = getattr(session, "llm", None) - _storage_id = getattr(session, "queen_resume_from", None) or session_id - _session_dir = Path.home() / ".hive" / "queen" / "session" / _storage_id - if session.worker_handoff_sub is not None: try: session.event_bus.unsubscribe(session.worker_handoff_sub) @@ -662,21 +623,14 @@ class SessionManager: pass session.worker_handoff_sub = None - for sub_id in session.worker_memory_subs: - try: - session.event_bus.unsubscribe(sub_id) - except Exception: - pass - session.worker_memory_subs.clear() - session.worker_colony_recall_blocks.clear() - - # Stop queen and memory reflection/recall subscriptions + # Stop memory reflection/recall subscriptions for sub_id in session.memory_reflection_subs: try: session.event_bus.unsubscribe(sub_id) except Exception: pass session.memory_reflection_subs.clear() + if session.queen_task is not None: session.queen_task.cancel() session.queen_task = None @@ -708,20 +662,6 @@ class SessionManager: except Exception as e: logger.error("Error cleaning up worker: %s", e) - # Final long reflection — fire-and-forget so teardown isn't blocked. - if _llm is not None: - import asyncio - - from framework.agents.queen.queen_memory_v2 import colony_memory_dir - from framework.agents.queen.reflection_agent import run_long_reflection - - asyncio.create_task( - run_long_reflection( - _llm, memory_dir=colony_memory_dir(_storage_id), caller="queen" - ), - name=f"queen-memory-long-reflection-{session_id}", - ) - # Close per-session event log session.event_bus.close_session_log() @@ -757,54 +697,6 @@ class SessionManager: else: logger.warning("Worker handoff received but queen node not ready") - async def _subscribe_worker_colony_memory(self, session: Session) -> None: - """Subscribe shared colony reflection/recall for top-level worker runs.""" - for sub_id in session.worker_memory_subs: - try: - session.event_bus.unsubscribe(sub_id) - except Exception: - pass - session.worker_memory_subs.clear() - session.worker_colony_recall_blocks.clear() - - runtime = session.graph_runtime - if runtime is None: - return - - worker_sessions_dir = getattr(runtime, "_session_store", None) - worker_sessions_dir = getattr(worker_sessions_dir, "sessions_dir", None) - if worker_sessions_dir is None: - return - - from framework.agents.queen.queen_memory_v2 import colony_memory_dir, init_memory_dir - from framework.agents.queen.reflection_agent import subscribe_worker_memory_triggers - - colony_dir = colony_memory_dir(session.id) - init_memory_dir(colony_dir, migrate_legacy=True) - - runtime._dynamic_memory_provider_factory = lambda execution_id, session=session: ( - lambda execution_id=execution_id, session=session: ( - session.worker_colony_recall_blocks.get( - execution_id, - "", - ) - ) - ) - - # Colony memory config for reflection-at-handoff - runtime._colony_memory_dir = colony_dir - runtime._colony_worker_sessions_dir = worker_sessions_dir - runtime._colony_recall_cache = session.worker_colony_recall_blocks - runtime._colony_reflect_llm = session.llm - - session.worker_memory_subs = await subscribe_worker_memory_triggers( - session.event_bus, - session.llm, - worker_sessions_dir=worker_sessions_dir, - colony_memory_dir=colony_dir, - recall_cache=session.worker_colony_recall_blocks, - ) - def _subscribe_worker_handoffs(self, session: Session, executor: Any) -> None: """Subscribe queen to worker/subagent escalation handoff events.""" from framework.runtime.event_bus import EventType as _ET @@ -970,10 +862,6 @@ class SessionManager: except Exception: logger.warning("Cold restore: failed to auto-load worker", exc_info=True) - # Memory reflection/recall subscriptions are set up inside - # queen_orchestrator.create_queen() → _queen_loop() and stored - # on session.memory_reflection_subs for teardown. - # ------------------------------------------------------------------ # Queen notifications # ------------------------------------------------------------------ diff --git a/core/framework/tools/queen_lifecycle_tools.py b/core/framework/tools/queen_lifecycle_tools.py index 1c4ab796..ee7e0cb9 100644 --- a/core/framework/tools/queen_lifecycle_tools.py +++ b/core/framework/tools/queen_lifecycle_tools.py @@ -133,10 +133,9 @@ class QueenPhaseState: persona_prefix: str = "" # e.g. "You are a CFO. I am a CFO with 20 years..." style_directive: str = "" # e.g. "## Communication Style: Peer\n\n..." - # Cached recall block — populated async by recall_selector after each turn. - _cached_recall_block: str = "" - _cached_colony_recall_block: str = "" + # Cached global recall block — populated async by recall_selector after each turn. _cached_global_recall_block: str = "" + # Global memory directory. global_memory_dir: Path | None = None def get_current_tools(self) -> list: @@ -152,7 +151,7 @@ class QueenPhaseState: return list(self.building_tools) def get_current_prompt(self) -> str: - """Return the system prompt for the current phase, with fresh memory appended.""" + """Return the system prompt for the current phase.""" if self.phase == "planning": base = self.prompt_planning elif self.phase == "running": @@ -164,9 +163,6 @@ class QueenPhaseState: else: base = self.prompt_building - from framework.agents.queen.queen_memory import format_for_injection - - _memory = format_for_injection() # noqa: F841 parts = [] if self.persona_prefix: parts.append(self.persona_prefix) @@ -177,9 +173,6 @@ class QueenPhaseState: parts.append(self.skills_catalog_prompt) if self.protocols_prompt: parts.append(self.protocols_prompt) - colony_memory = self._cached_colony_recall_block or self._cached_recall_block - if colony_memory: - parts.append(colony_memory) if self._cached_global_recall_block: parts.append(self._cached_global_recall_block) return "\n\n".join(parts) @@ -4077,89 +4070,6 @@ def register_queen_lifecycle_tools( ) tools_registered += 1 - # --- save_global_memory -------------------------------------------------- - - async def save_global_memory_entry( - category: str, - description: str, - content: str, - name: str | None = None, - ) -> str: - """Persist a queen-global memory entry about the user.""" - from framework.agents.queen.queen_memory_v2 import ( - global_memory_dir as _global_memory_dir, - init_memory_dir as _init_memory_dir, - save_global_memory as _save_global_memory, - ) - - target_dir = ( - phase_state.global_memory_dir - if phase_state is not None and phase_state.global_memory_dir is not None - else _global_memory_dir() - ) - _init_memory_dir(target_dir) - - try: - filename, path = _save_global_memory( - category=category, - description=description, - content=content, - name=name, - memory_dir=target_dir, - ) - return json.dumps( - { - "status": "saved", - "filename": filename, - "path": str(path), - "category": category, - } - ) - except ValueError as exc: - return json.dumps({"error": str(exc)}) - - _save_global_memory_tool = Tool( - name="save_global_memory", - description=( - "Save durable global memory about the user. " - "Only use for user profile, preferences, environment, or feedback." - ), - parameters={ - "type": "object", - "properties": { - "category": { - "type": "string", - "enum": ["profile", "preference", "environment", "feedback"], - }, - "description": { - "type": "string", - "description": "Specific one-line description for future recall selection.", - }, - "content": { - "type": "string", - "description": "Durable user-centric memory content.", - }, - "name": { - "type": "string", - "description": "Optional short memory title.", - }, - }, - "required": ["category", "description", "content"], - "additionalProperties": False, - }, - ) - registry.register( - "save_global_memory", - _save_global_memory_tool, - lambda inputs: save_global_memory_entry( - inputs["category"], - inputs["description"], - inputs["content"], - inputs.get("name"), - ), - ) - tools_registered += 1 - # --- list_triggers --------------------------------------------------------- async def list_triggers() -> str: diff --git a/core/framework/tools/queen_memory_tools.py b/core/framework/tools/queen_memory_tools.py deleted file mode 100644 index 5ec0c84b..00000000 --- a/core/framework/tools/queen_memory_tools.py +++ /dev/null @@ -1,100 +0,0 @@ -"""Tools for the queen to read and write episodic memory. - -The queen can consciously record significant moments during a session — like -writing in a diary — and recall past diary entries when needed. Semantic -memory (MEMORY.md) is updated automatically at session end and is never -written by the queen directly. -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from framework.runner.tool_registry import ToolRegistry - - -def write_to_diary(entry: str) -> str: - """Write a prose entry to today's episodic memory. - - Use this when something significant just happened: a pipeline went live, the - user shared an important preference, a goal was achieved or abandoned, or - you want to record something that should be remembered across sessions. - - Write in first person, as you would in a private diary. Be specific — what - happened, how the user responded, what it means going forward. One or two - paragraphs is enough. - - You do not need to include a timestamp or date heading; those are added - automatically. - """ - from framework.agents.queen.queen_memory import append_episodic_entry - - append_episodic_entry(entry) - return "Diary entry recorded." - - -def recall_diary(query: str = "", days_back: int = 7) -> str: - """Search recent diary entries (episodic memory). - - Use this when the user asks about what happened in the past — "what did we - do yesterday?", "what happened last week?", "remind me about the pipeline - issue", etc. Also use it proactively when you need context from recent - sessions to answer a question or make a decision. - - Args: - query: Optional keyword or phrase to filter entries. If empty, all - recent entries are returned. - days_back: How many days to look back (1-30). Defaults to 7. - """ - from datetime import date, timedelta - - from framework.agents.queen.queen_memory import format_memory_date, read_episodic_memory - - days_back = max(1, min(int(days_back), 30)) - today = date.today() - results: list[str] = [] - total_chars = 0 - char_budget = 12_000 - - for offset in range(days_back): - d = today - timedelta(days=offset) - content = read_episodic_memory(d) - if not content: - continue - # If a query is given, only include entries that mention it - if query: - # Check each section (split by ###) for relevance - sections = content.split("### ") - matched = [s for s in sections if query.lower() in s.lower()] - if not matched: - continue - content = "### ".join(matched) - label = format_memory_date(d) - if d == today: - label = f"Today — {label}" - entry = f"## {label}\n\n{content}" - if total_chars + len(entry) > char_budget: - remaining = char_budget - total_chars - if remaining > 200: - # Fit a partial entry within budget - trimmed = content[: remaining - 100] + "\n\n…(truncated)" - results.append(f"## {label}\n\n{trimmed}") - else: - results.append(f"## {label}\n\n(truncated — hit size limit)") - break - results.append(entry) - total_chars += len(entry) - - if not results: - if query: - return f"No diary entries matching '{query}' in the last {days_back} days." - return f"No diary entries found in the last {days_back} days." - - return "\n\n---\n\n".join(results) - - -def register_queen_memory_tools(registry: ToolRegistry) -> None: - """Register the episodic memory tools into the queen's tool registry.""" - registry.register_function(write_to_diary) - registry.register_function(recall_diary) diff --git a/core/tests/test_queen_memory.py b/core/tests/test_queen_memory.py index cd7fe65a..efdb1401 100644 --- a/core/tests/test_queen_memory.py +++ b/core/tests/test_queen_memory.py @@ -1,4 +1,4 @@ -"""Tests for the queen memory v2 system (reflection + recall).""" +"""Tests for the queen global memory system (reflection + recall).""" from __future__ import annotations @@ -15,9 +15,7 @@ from framework.agents.queen.recall_selector import ( format_recall_injection, select_memories, ) -from framework.agents.queen.reflection_agent import subscribe_worker_memory_triggers from framework.graph.prompting import build_system_prompt_for_node_context -from framework.runtime.event_bus import AgentEvent, EventBus, EventType from framework.tools.queen_lifecycle_tools import QueenPhaseState # --------------------------------------------------------------------------- @@ -26,9 +24,9 @@ from framework.tools.queen_lifecycle_tools import QueenPhaseState def test_parse_frontmatter_valid(): - text = "---\nname: foo\ntype: goal\ndescription: bar baz\n---\ncontent" + text = "---\nname: foo\ntype: profile\ndescription: bar baz\n---\ncontent" fm = qm.parse_frontmatter(text) - assert fm == {"name": "foo", "type": "goal", "description": "bar baz"} + assert fm == {"name": "foo", "type": "profile", "description": "bar baz"} def test_parse_frontmatter_missing(): @@ -42,34 +40,30 @@ def test_parse_frontmatter_empty(): def test_parse_frontmatter_broken_yaml(): text = "---\n: bad\nno colon\n---\n" fm = qm.parse_frontmatter(text) - # ": bad" has colon at pos 0, so key is empty → skipped - # "no colon" has no colon → skipped assert fm == {} # --------------------------------------------------------------------------- -# parse_memory_type +# parse_global_memory_category # --------------------------------------------------------------------------- -def test_parse_memory_type_valid(): - assert qm.parse_memory_type("goal") == "goal" - assert qm.parse_memory_type("environment") == "environment" - assert qm.parse_memory_type("technique") == "technique" - assert qm.parse_memory_type("reference") == "reference" - assert qm.parse_memory_type("profile") == "profile" - assert qm.parse_memory_type("feedback") == "feedback" +def test_parse_global_memory_category_valid(): + assert qm.parse_global_memory_category("profile") == "profile" + assert qm.parse_global_memory_category("preference") == "preference" + assert qm.parse_global_memory_category("environment") == "environment" + assert qm.parse_global_memory_category("feedback") == "feedback" -def test_parse_memory_type_case_insensitive(): - assert qm.parse_memory_type("Goal") == "goal" - assert qm.parse_memory_type(" TECHNIQUE ") == "technique" +def test_parse_global_memory_category_case_insensitive(): + assert qm.parse_global_memory_category("Profile") == "profile" + assert qm.parse_global_memory_category(" FEEDBACK ") == "feedback" -def test_parse_memory_type_invalid(): - assert qm.parse_memory_type("user") is None - assert qm.parse_memory_type("unknown") is None - assert qm.parse_memory_type(None) is None +def test_parse_global_memory_category_invalid(): + assert qm.parse_global_memory_category("goal") is None + assert qm.parse_global_memory_category("unknown") is None + assert qm.parse_global_memory_category(None) is None # --------------------------------------------------------------------------- @@ -79,11 +73,11 @@ def test_parse_memory_type_invalid(): def test_memory_file_from_path(tmp_path: Path): f = tmp_path / "test.md" - f.write_text("---\nname: test\ntype: goal\ndescription: a test\n---\nbody\n") + f.write_text("---\nname: test\ntype: profile\ndescription: a test\n---\nbody\n") mf = qm.MemoryFile.from_path(f) assert mf.filename == "test.md" assert mf.name == "test" - assert mf.type == "goal" + assert mf.type == "profile" assert mf.description == "a test" assert mf.mtime > 0 @@ -145,7 +139,7 @@ def test_format_memory_manifest(): filename="a.md", path=Path("a.md"), name="a", - type="goal", + type="profile", description="desc a", mtime=time.time(), ), @@ -159,54 +153,12 @@ def test_format_memory_manifest(): ), ] manifest = qm.format_memory_manifest(files) - assert "[goal] a.md" in manifest + assert "[profile] a.md" in manifest assert "desc a" in manifest assert "[unknown] b.md" in manifest assert "(no description)" in manifest -# --------------------------------------------------------------------------- -# memory_freshness_text -# --------------------------------------------------------------------------- - - -def test_memory_freshness_text_recent(): - assert qm.memory_freshness_text(time.time()) == "" - - -def test_memory_freshness_text_old(): - three_days_ago = time.time() - 3 * 86_400 - text = qm.memory_freshness_text(three_days_ago) - assert "3 days old" in text - assert "point-in-time" in text - - -# --------------------------------------------------------------------------- -# read_conversation_parts -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_read_conversation_parts(tmp_path: Path): - parts_dir = tmp_path / "conversations" / "parts" - parts_dir.mkdir(parents=True) - for i in range(5): - (parts_dir / f"{i:010d}.json").write_text( - json.dumps({"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}) - ) - - msgs = await qm.read_conversation_parts(tmp_path) - assert len(msgs) == 5 - assert msgs[0]["content"] == "msg 0" - assert msgs[4]["content"] == "msg 4" - - -@pytest.mark.asyncio -async def test_read_conversation_parts_empty(tmp_path: Path): - msgs = await qm.read_conversation_parts(tmp_path) - assert msgs == [] - - # --------------------------------------------------------------------------- # init_memory_dir # --------------------------------------------------------------------------- @@ -233,8 +185,12 @@ async def test_select_memories_empty_dir(tmp_path: Path): @pytest.mark.asyncio async def test_select_memories_with_files(tmp_path: Path): - (tmp_path / "a.md").write_text("---\nname: a\ndescription: about A\ntype: goal\n---\nbody") - (tmp_path / "b.md").write_text("---\nname: b\ndescription: about B\ntype: reference\n---\nbody") + (tmp_path / "a.md").write_text( + "---\nname: a\ndescription: about A\ntype: profile\n---\nbody" + ) + (tmp_path / "b.md").write_text( + "---\nname: b\ndescription: about B\ntype: preference\n---\nbody" + ) llm = AsyncMock() llm.acomplete.return_value = MagicMock(content=json.dumps({"selected_memories": ["a.md"]})) @@ -258,7 +214,7 @@ async def test_select_memories_error_returns_empty(tmp_path: Path): def test_format_recall_injection(tmp_path: Path): (tmp_path / "a.md").write_text("---\nname: a\n---\nbody of a") result = format_recall_injection(["a.md"], memory_dir=tmp_path) - assert "Selected Memories" in result + assert "Global Memories" in result assert "body of a" in result @@ -266,12 +222,6 @@ def test_format_recall_injection_empty(): assert format_recall_injection([]) == "" -def test_format_recall_injection_custom_heading(tmp_path: Path): - (tmp_path / "a.md").write_text("---\nname: a\n---\nbody of a") - result = format_recall_injection(["a.md"], memory_dir=tmp_path, heading="Colony Memories") - assert "Colony Memories" in result - - # --------------------------------------------------------------------------- # reflection_agent # --------------------------------------------------------------------------- @@ -279,10 +229,9 @@ def test_format_recall_injection_custom_heading(tmp_path: Path): @pytest.mark.asyncio async def test_short_reflection(tmp_path: Path): - """Short reflection reads new messages and writes a memory file via LLM tools.""" + """Short reflection reads messages and writes a global memory file via LLM tools.""" from framework.agents.queen.reflection_agent import run_short_reflection - # Set up a fake session dir with conversation parts. parts_dir = tmp_path / "session" / "conversations" / "parts" parts_dir.mkdir(parents=True) for i in range(3): @@ -291,13 +240,12 @@ async def test_short_reflection(tmp_path: Path): json.dumps({"role": role, "content": f"message {i}"}) ) - mem_dir = tmp_path / "memories" + mem_dir = tmp_path / "global_memory" mem_dir.mkdir() - # Mock LLM: turn 1 lists files, turn 2 writes a memory, turn 3 stops. llm = AsyncMock() llm.acomplete.side_effect = [ - # Turn 1: LLM calls write_memory_file + # Turn 1: LLM writes a global memory file MagicMock( content="", raw_response={ @@ -309,7 +257,7 @@ async def test_short_reflection(tmp_path: Path): "filename": "user-likes-tests.md", "content": ( "---\nname: user-likes-tests\n" - "type: technique\n" + "type: preference\n" "description: User values thorough testing\n" "---\nObserved emphasis on test coverage." ), @@ -318,23 +266,36 @@ async def test_short_reflection(tmp_path: Path): ] }, ), - # Turn 2: LLM has no more tool calls → done + # Turn 2: done MagicMock(content="Done reflecting.", raw_response={}), ] session_dir = tmp_path / "session" - await run_short_reflection( - session_dir, - llm, - memory_dir=mem_dir, - caller="queen", - ) + await run_short_reflection(session_dir, llm, memory_dir=mem_dir) - # Verify the memory file was created. written = mem_dir / "user-likes-tests.md" assert written.exists() assert "user-likes-tests" in written.read_text() - assert llm.acomplete.call_count == 2 + + +@pytest.mark.asyncio +async def test_short_reflection_rejects_non_global_types(tmp_path: Path): + """Reflection agent rejects memory types not in GLOBAL_MEMORY_CATEGORIES.""" + from framework.agents.queen.reflection_agent import _execute_tool + + mem_dir = tmp_path / "global_memory" + mem_dir.mkdir() + + result = _execute_tool( + "write_memory_file", + { + "filename": "bad-type.md", + "content": "---\nname: bad\ntype: goal\n---\nbody", + }, + mem_dir, + ) + assert "ERROR" in result + assert not (mem_dir / "bad-type.md").exists() @pytest.mark.asyncio @@ -342,18 +303,17 @@ async def test_long_reflection(tmp_path: Path): """Long reflection reads all memories and can merge/delete them.""" from framework.agents.queen.reflection_agent import run_long_reflection - mem_dir = tmp_path / "memories" + mem_dir = tmp_path / "global_memory" mem_dir.mkdir() (mem_dir / "dup-a.md").write_text( - "---\nname: dup-a\ntype: goal\ndescription: goal A\n---\nGoal A details." + "---\nname: dup-a\ntype: profile\ndescription: profile A\n---\nProfile A details." ) (mem_dir / "dup-b.md").write_text( - "---\nname: dup-b\ntype: goal\ndescription: goal A duplicate\n---\nSame goal A." + "---\nname: dup-b\ntype: profile\ndescription: profile A dup\n---\nSame profile A." ) llm = AsyncMock() llm.acomplete.side_effect = [ - # Turn 1: LLM lists files MagicMock( content="", raw_response={ @@ -362,7 +322,6 @@ async def test_long_reflection(tmp_path: Path): ] }, ), - # Turn 2: LLM merges dup-b into dup-a and deletes dup-b MagicMock( content="", raw_response={ @@ -373,10 +332,9 @@ async def test_long_reflection(tmp_path: Path): "input": { "filename": "dup-a.md", "content": ( - "---\nname: dup-a\ntype: goal\n" - "description: goal A (merged)\n" - "---\nGoal A details." - " Also same goal A." + "---\nname: dup-a\ntype: profile\n" + "description: profile A (merged)\n" + "---\nProfile A details. Also same profile A." ), }, }, @@ -388,21 +346,18 @@ async def test_long_reflection(tmp_path: Path): ] }, ), - # Turn 3: done MagicMock(content="Housekeeping complete.", raw_response={}), ] - await run_long_reflection(llm, memory_dir=mem_dir, caller="queen") + await run_long_reflection(llm, memory_dir=mem_dir) - # dup-b should be deleted, dup-a should be updated. assert not (mem_dir / "dup-b.md").exists() assert (mem_dir / "dup-a.md").exists() assert "merged" in (mem_dir / "dup-a.md").read_text() - assert llm.acomplete.call_count == 3 # --------------------------------------------------------------------------- -# Bug 1: Path traversal prevention +# Path traversal prevention # --------------------------------------------------------------------------- @@ -410,11 +365,8 @@ def test_path_traversal_read(tmp_path: Path): from framework.agents.queen.reflection_agent import _execute_tool (tmp_path / "safe.md").write_text("safe content") - result = _execute_tool( - "read_memory_file", {"filename": "../../etc/passwd"}, tmp_path, caller="queen" - ) + result = _execute_tool("read_memory_file", {"filename": "../../etc/passwd"}, tmp_path) assert "ERROR" in result - assert "path components not allowed" in result.lower() or "escapes" in result.lower() def test_path_traversal_write(tmp_path: Path): @@ -424,111 +376,32 @@ def test_path_traversal_write(tmp_path: Path): "write_memory_file", {"filename": "../escape.md", "content": "---\nname: evil\n---\nbad"}, tmp_path, - caller="queen", ) assert "ERROR" in result assert not (tmp_path.parent / "escape.md").exists() -def test_path_traversal_delete(tmp_path: Path): - from framework.agents.queen.reflection_agent import _execute_tool - - (tmp_path / "target.md").write_text("content") - result = _execute_tool( - "delete_memory_file", {"filename": "../target.md"}, tmp_path, caller="queen" - ) - assert "ERROR" in result - assert (tmp_path / "target.md").exists() # not deleted - - def test_safe_path_accepted(tmp_path: Path): from framework.agents.queen.reflection_agent import _execute_tool result = _execute_tool( "write_memory_file", - {"filename": "good-file.md", "content": "---\nname: good\n---\ncontent"}, + {"filename": "good-file.md", "content": "---\nname: good\ntype: profile\n---\ncontent"}, tmp_path, - caller="queen", ) assert "Wrote" in result assert (tmp_path / "good-file.md").exists() - result = _execute_tool( - "read_memory_file", {"filename": "good-file.md"}, tmp_path, caller="queen" - ) + result = _execute_tool("read_memory_file", {"filename": "good-file.md"}, tmp_path) assert "content" in result - result = _execute_tool( - "delete_memory_file", {"filename": "good-file.md"}, tmp_path, caller="queen" - ) + result = _execute_tool("delete_memory_file", {"filename": "good-file.md"}, tmp_path) assert "Deleted" in result -def test_init_memory_dir_migrates_shared_memories_into_colony(tmp_path: Path): - source = tmp_path / "legacy-shared" - source.mkdir() - (source / "shared-memory.md").write_text( - "---\nname: shared\ndescription: old shared memory\ntype: goal\n---\nbody", - encoding="utf-8", - ) - target = tmp_path / "colony" - - qm.migrate_shared_v2_memories(target, source_dir=source) - - assert (target / "shared-memory.md").exists() - assert not (source / "shared-memory.md").exists() - assert (target / ".migrated-from-shared-memory").exists() - - -def test_shared_memory_migration_marker_prevents_repeat(tmp_path: Path): - source = tmp_path / "legacy-shared" - source.mkdir() - target = tmp_path / "colony" - target.mkdir() - (target / ".migrated-from-shared-memory").write_text("done\n", encoding="utf-8") - (source / "shared-memory.md").write_text("body", encoding="utf-8") - - qm.migrate_shared_v2_memories(target, source_dir=source) - - assert not (target / "shared-memory.md").exists() - assert (source / "shared-memory.md").exists() - - -def test_global_memory_is_not_populated_by_colony_migration(tmp_path: Path): - source = tmp_path / "legacy-shared" - source.mkdir() - (source / "shared-memory.md").write_text("body", encoding="utf-8") - colony = tmp_path / "colony" - global_dir = tmp_path / "global" - - qm.migrate_shared_v2_memories(colony, source_dir=source) - qm.init_memory_dir(global_dir) - - assert list(global_dir.glob("*.md")) == [] - - -def test_save_global_memory_rejects_runtime_details(tmp_path: Path): - with pytest.raises(ValueError): - qm.save_global_memory( - category="profile", - description="codebase preference", - content="The user wants the worker graph to use node retries.", - memory_dir=tmp_path, - ) - - -def test_save_global_memory_persists_frontmatter(tmp_path: Path): - filename, path = qm.save_global_memory( - category="preference", - description="Prefers concise updates", - content="The user prefers concise, direct status updates.", - memory_dir=tmp_path, - ) - - assert filename.endswith(".md") - text = path.read_text(encoding="utf-8") - assert "type: preference" in text - assert "Prefers concise updates" in text +# --------------------------------------------------------------------------- +# system prompt integration +# --------------------------------------------------------------------------- def test_build_system_prompt_injects_dynamic_memory(): @@ -542,134 +415,30 @@ def test_build_system_prompt_injects_dynamic_memory(): skills_catalog_prompt="", protocols_prompt="", memory_prompt="", - dynamic_memory_provider=lambda: "--- Colony Memories ---\nremember this", + dynamic_memory_provider=lambda: "--- Global Memories ---\nremember this", is_subagent_mode=False, ) prompt = build_system_prompt_for_node_context(ctx) - assert "Colony Memories" in prompt + assert "Global Memories" in prompt assert "remember this" in prompt -def test_queen_phase_state_appends_colony_and_global_memory_blocks(): +def test_queen_phase_state_appends_global_memory_block(): phase = QueenPhaseState( prompt_building="base prompt", - _cached_colony_recall_block="--- Colony Memories ---\ncolony", - _cached_global_recall_block="--- Global Memories ---\nglobal", + _cached_global_recall_block="--- Global Memories ---\nglobal stuff", ) prompt = phase.get_current_prompt() assert "base prompt" in prompt - assert "Colony Memories" in prompt assert "Global Memories" in prompt + assert "global stuff" in prompt -@pytest.mark.asyncio -async def test_worker_colony_reflection_at_handoff(tmp_path: Path): - """Colony reflection runs via WorkerAgent._reflect_colony_memory at node handoff.""" - import asyncio +def test_queen_phase_state_prompt_without_memory(): + phase = QueenPhaseState(prompt_building="base prompt") - from framework.graph.context import GraphContext - from framework.graph.worker_agent import WorkerAgent - - worker_sessions_dir = tmp_path / "worker-sessions" - execution_id = "exec-1" - session_dir = worker_sessions_dir / execution_id / "conversations" / "parts" - session_dir.mkdir(parents=True) - (session_dir / "0000000000.json").write_text( - json.dumps({"role": "user", "content": "Please remember I like terse summaries."}), - encoding="utf-8", - ) - (session_dir / "0000000001.json").write_text( - json.dumps({"role": "assistant", "content": "I'll keep that in mind."}), - encoding="utf-8", - ) - - colony_dir = tmp_path / "colony" - colony_dir.mkdir() - recall_cache: dict[str, str] = {execution_id: ""} - - reflect_llm = AsyncMock() - reflect_llm.acomplete.side_effect = [ - # Short reflection: write a memory file - MagicMock( - content="", - raw_response={ - "tool_calls": [ - { - "id": "tc_1", - "name": "write_memory_file", - "input": { - "filename": "user-prefers-terse-summaries.md", - "content": ( - "---\n" - "name: user-prefers-terse-summaries\n" - "description: Prefers terse summaries\n" - "type: preference\n" - "---\n\n" - "The user prefers terse summaries." - ), - }, - } - ] - }, - ), - # Short reflection done - MagicMock(content="done", raw_response={}), - # Recall selector picks the new memory - MagicMock(content=json.dumps({"selected_memories": ["user-prefers-terse-summaries.md"]})), - ] - - # Build a minimal GraphContext with colony memory fields - gc = MagicMock(spec=GraphContext) - gc.colony_memory_dir = colony_dir - gc.worker_sessions_dir = worker_sessions_dir - gc.colony_recall_cache = recall_cache - gc.colony_reflect_llm = reflect_llm - gc.execution_id = execution_id - gc._colony_reflect_lock = asyncio.Lock() - - node_spec = SimpleNamespace(id="test-node") - worker = WorkerAgent.__new__(WorkerAgent) - worker._gc = gc - worker.node_spec = node_spec - - await worker._reflect_colony_memory() - - assert (colony_dir / "user-prefers-terse-summaries.md").exists() - assert "Colony Memories" in recall_cache[execution_id] - assert "terse summaries" in recall_cache[execution_id] - - -@pytest.mark.asyncio -async def test_subscribe_worker_triggers_only_lifecycle_events(tmp_path: Path): - """After simplification, worker triggers only subscribe to start and terminal events.""" - colony_dir = tmp_path / "colony" - colony_dir.mkdir() - recall_cache: dict[str, str] = {} - bus = EventBus() - llm = AsyncMock() - - subs = await subscribe_worker_memory_triggers( - bus, - llm, - worker_sessions_dir=tmp_path / "sessions", - colony_memory_dir=colony_dir, - recall_cache=recall_cache, - ) - try: - # Should have exactly 2 subscriptions (start + terminal) - assert len(subs) == 2 - - # EXECUTION_STARTED initialises cache - await bus.publish( - AgentEvent( - type=EventType.EXECUTION_STARTED, - stream_id="default", - execution_id="exec-1", - ) - ) - assert recall_cache.get("exec-1") == "" - finally: - for sub_id in subs: - bus.unsubscribe(sub_id) + prompt = phase.get_current_prompt() + assert "base prompt" in prompt + assert "Global Memories" not in prompt diff --git a/core/tests/test_session_manager_worker_handoff.py b/core/tests/test_session_manager_worker_handoff.py index 6b9a90a5..7bcc8261 100644 --- a/core/tests/test_session_manager_worker_handoff.py +++ b/core/tests/test_session_manager_worker_handoff.py @@ -147,7 +147,6 @@ async def test_load_worker_core_defaults_to_session_llm_model(monkeypatch, tmp_p monkeypatch.setattr("framework.runner.AgentRunner.load", fake_load) monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None) - monkeypatch.setattr(manager, "_subscribe_worker_colony_memory", AsyncMock()) monkeypatch.setattr( "framework.tools.queen_lifecycle_tools._read_agent_triggers_json", lambda *_args: [], @@ -158,7 +157,6 @@ async def test_load_worker_core_defaults_to_session_llm_model(monkeypatch, tmp_p assert load_calls[0]["model"] == "queen-shared-model" assert session.runner is runner assert session.runner._llm is session_llm - assert runtime._dynamic_memory_provider_factory is not None @pytest.mark.asyncio @@ -183,7 +181,6 @@ async def test_load_worker_core_keeps_explicit_worker_model_override(monkeypatch monkeypatch.setattr("framework.runner.AgentRunner.load", fake_load) monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None) - monkeypatch.setattr(manager, "_subscribe_worker_colony_memory", AsyncMock()) monkeypatch.setattr( "framework.tools.queen_lifecycle_tools._read_agent_triggers_json", lambda *_args: [], @@ -200,36 +197,4 @@ async def test_load_worker_core_keeps_explicit_worker_model_override(monkeypatch assert session.runner._llm is None -@pytest.mark.asyncio -async def test_load_worker_core_continues_when_colony_memory_subscription_fails( - monkeypatch, tmp_path -) -> None: - bus = EventBus() - manager = SessionManager(model="manager-default") - session_llm = SimpleNamespace(model="queen-shared-model") - session = Session(id="session_memory_warning", event_bus=bus, llm=session_llm, loaded_at=0.0) - - runtime = SimpleNamespace(is_running=True) - runner = SimpleNamespace( - _llm=None, - _agent_runtime=runtime, - info=MagicMock(return_value={"id": "worker"}), - ) - - monkeypatch.setattr("framework.runner.AgentRunner.load", lambda *args, **kwargs: runner) - monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None) - monkeypatch.setattr( - manager, - "_subscribe_worker_colony_memory", - AsyncMock(side_effect=ImportError("optional memory hook unavailable")), - ) - monkeypatch.setattr( - "framework.tools.queen_lifecycle_tools._read_agent_triggers_json", - lambda *_args: [], - ) - - await manager._load_worker_core(session, tmp_path / "worker_agent") - - assert session.runner is runner - assert session.graph_runtime is runtime assert session.worker_path == tmp_path / "worker_agent" From c9d62139af1ea7c89977efc4fcc6b4269a0d6d42 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Tue, 7 Apr 2026 12:45:37 -0700 Subject: [PATCH 2/5] feat: add extra logging --- .../framework/agents/queen/recall_selector.py | 5 +++++ .../agents/queen/reflection_agent.py | 13 ++++++++++++ core/framework/server/queen_orchestrator.py | 20 ++++++++++++++++++- 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/core/framework/agents/queen/recall_selector.py b/core/framework/agents/queen/recall_selector.py index 897652d5..4740de6a 100644 --- a/core/framework/agents/queen/recall_selector.py +++ b/core/framework/agents/queen/recall_selector.py @@ -92,8 +92,10 @@ async def select_memories( mem_dir = memory_dir or global_memory_dir() files = scan_memory_files(mem_dir) if not files: + logger.debug("recall: no memory files found, skipping selection") return [] + logger.debug("recall: selecting from %d memories for query: %.100s", len(files), query) manifest = format_memory_manifest(files) user_msg = f"## User query\n\n{query}\n\n## Available memories\n\n{manifest}" @@ -163,13 +165,16 @@ async def update_recall_cache( query = _extract_latest_user_query(session_dir) if not query: + logger.debug("recall: no user query found for cache update") return + logger.debug("recall: updating cache for query: %.100s", query) try: selected = await select_memories(query, llm, mem_dir) injection = format_recall_injection(selected, mem_dir) if cache_setter is not None: cache_setter(injection) + logger.debug("recall: cache updated (%d chars injected)", len(injection)) except Exception: logger.debug("recall: cache update failed", exc_info=True) diff --git a/core/framework/agents/queen/reflection_agent.py b/core/framework/agents/queen/reflection_agent.py index bab900db..447c7d5a 100644 --- a/core/framework/agents/queen/reflection_agent.py +++ b/core/framework/agents/queen/reflection_agent.py @@ -217,6 +217,7 @@ async def _reflection_loop( last_text: str = "" for _turn in range(max_turns): + logger.debug("reflect: loop turn %d/%d", _turn + 1, max_turns) try: resp: LLMResponse = await llm.acomplete( messages=messages, @@ -352,6 +353,7 @@ async def run_short_reflection( memory_dir: Path | None = None, ) -> None: """Run a short reflection: extract user knowledge from conversation.""" + logger.debug("reflect: starting short reflection") mem_dir = memory_dir or global_memory_dir() messages = await _read_conversation_parts(session_dir) @@ -391,10 +393,12 @@ async def run_long_reflection( memory_dir: Path | None = None, ) -> None: """Run a long reflection: organise and deduplicate all global memories.""" + logger.debug("reflect: starting long reflection") mem_dir = memory_dir or global_memory_dir() files = scan_memory_files(mem_dir) if not files: + logger.debug("reflect: no memory files, skipping long reflection") return manifest = format_memory_manifest(files) @@ -454,11 +458,17 @@ async def subscribe_reflection_triggers( is_interval = _short_count % _LONG_REFLECT_INTERVAL == 0 if is_tool_turn and not is_interval: + logger.debug("reflect: skipping tool turn (count=%d)", _short_count) return if _lock.locked(): + logger.debug("reflect: skipping, already running (count=%d)", _short_count) return + logger.debug( + "reflect: triggered (count=%d, interval=%s, stop_reason=%s)", + _short_count, is_interval, stop_reason, + ) async with _lock: try: if is_interval: @@ -475,6 +485,7 @@ async def subscribe_reflection_triggers( try: from framework.agents.queen.recall_selector import update_recall_cache + logger.debug("recall: post-reflection cache update starting") await update_recall_cache( session_dir, llm, @@ -490,7 +501,9 @@ async def subscribe_reflection_triggers( if getattr(event, "stream_id", None) != "queen": return if _lock.locked(): + logger.debug("reflect: skipping compaction trigger, already running") return + logger.debug("reflect: compaction triggered long reflection") async with _lock: try: await run_long_reflection(llm, mem_dir) diff --git a/core/framework/server/queen_orchestrator.py b/core/framework/server/queen_orchestrator.py index 906114af..21afea83 100644 --- a/core/framework/server/queen_orchestrator.py +++ b/core/framework/server/queen_orchestrator.py @@ -279,8 +279,9 @@ async def create_queen( _session_event_bus = session.event_bus async def _persona_hook(ctx: HookContext) -> HookResult | None: + trigger = ctx.trigger or "" result = await select_expert_persona( - ctx.trigger or "", _session_llm, memory_context="" + trigger, _session_llm, memory_context="" ) if not result: return None @@ -295,6 +296,23 @@ async def create_queen( data={"persona": result.persona_prefix}, ) ) + + # Seed recall cache so the first turn has relevant memories. + if trigger: + try: + from framework.agents.queen.recall_selector import ( + format_recall_injection, + select_memories, + ) + + mem_dir = phase_state.global_memory_dir + selected = await select_memories(trigger, _session_llm, mem_dir) + phase_state._cached_global_recall_block = format_recall_injection( + selected, mem_dir + ) + except Exception: + logger.debug("recall: initial seeding failed", exc_info=True) + return HookResult(system_prompt=phase_state.get_current_prompt()) # ---- Graph preparation ------------------------------------------- From 3dbd20040ae922ebf9fd27140edad79d9225c6c5 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Tue, 7 Apr 2026 13:07:41 -0700 Subject: [PATCH 3/5] fix: reflection agent runner --- .../agents/queen/reflection_agent.py | 141 +++++++++++++----- core/framework/server/session_manager.py | 21 +++ 2 files changed, 125 insertions(+), 37 deletions(-) diff --git a/core/framework/agents/queen/reflection_agent.py b/core/framework/agents/queen/reflection_agent.py index 447c7d5a..6dd21d65 100644 --- a/core/framework/agents/queen/reflection_agent.py +++ b/core/framework/agents/queen/reflection_agent.py @@ -12,6 +12,9 @@ Two reflection types: Concurrency: an ``asyncio.Lock`` prevents overlapping runs. If a trigger fires while a reflection is already active the event is skipped. + +All reflections are fire-and-forget (spawned via ``asyncio.create_task``) +so they never block the queen's event loop. """ from __future__ import annotations @@ -217,7 +220,7 @@ async def _reflection_loop( last_text: str = "" for _turn in range(max_turns): - logger.debug("reflect: loop turn %d/%d", _turn + 1, max_turns) + logger.info("reflect: loop turn %d/%d (msgs=%d)", _turn + 1, max_turns, len(messages)) try: resp: LLMResponse = await llm.acomplete( messages=messages, @@ -225,13 +228,40 @@ async def _reflection_loop( tools=_REFLECTION_TOOLS, max_tokens=2048, ) + except asyncio.CancelledError: + logger.warning("reflect: LLM call cancelled (task cancelled)") + return False, changed_files, last_text except Exception: logger.warning("reflect: LLM call failed", exc_info=True) return False, changed_files, last_text + # Extract tool calls from litellm/OpenAI response object. tool_calls_raw: list[dict[str, Any]] = [] - if resp.raw_response and isinstance(resp.raw_response, dict): - tool_calls_raw = resp.raw_response.get("tool_calls", []) + raw = resp.raw_response + if raw is not None: + # litellm returns a ModelResponse object; tool calls live on + # choices[0].message.tool_calls as a list of ChatCompletionMessageToolCall. + try: + msg_obj = raw.choices[0].message + if hasattr(msg_obj, "tool_calls") and msg_obj.tool_calls: + for tc in msg_obj.tool_calls: + fn = tc.function + try: + args = json.loads(fn.arguments) if fn.arguments else {} + except (json.JSONDecodeError, TypeError): + args = {} + tool_calls_raw.append({ + "id": tc.id, + "name": fn.name, + "input": args, + }) + except (AttributeError, IndexError): + pass + + logger.info( + "reflect: LLM responded, text=%d chars, tool_calls=%d", + len(resp.content or ""), len(tool_calls_raw), + ) turn_text = resp.content or "" if turn_text: @@ -353,11 +383,12 @@ async def run_short_reflection( memory_dir: Path | None = None, ) -> None: """Run a short reflection: extract user knowledge from conversation.""" - logger.debug("reflect: starting short reflection") + logger.info("reflect: starting short reflection for %s", session_dir) mem_dir = memory_dir or global_memory_dir() messages = await _read_conversation_parts(session_dir) if not messages: + logger.info("reflect: no conversation parts found in %s, skipping", session_dir) return transcript_lines: list[str] = [] @@ -372,6 +403,7 @@ async def run_short_reflection( transcript_lines.append(f"[{label}]: {content}") if not transcript_lines: + logger.info("reflect: no transcript lines after filtering, skipping") return transcript = "\n".join(transcript_lines) @@ -383,9 +415,9 @@ async def run_short_reflection( _, changed, reason = await _reflection_loop(llm, _SHORT_REFLECT_SYSTEM, user_msg, mem_dir) if changed: - logger.debug("reflect: short reflection done, changed files: %s", changed) + logger.info("reflect: short reflection done, changed files: %s", changed) else: - logger.debug("reflect: short reflection done, no changes — %s", reason or "no reason") + logger.info("reflect: short reflection done, no changes — %s", reason or "no reason") async def run_long_reflection( @@ -419,6 +451,28 @@ async def run_long_reflection( ) +async def run_shutdown_reflection( + session_dir: Path, + llm: Any, + memory_dir: Path | None = None, +) -> None: + """Run a final short reflection on session shutdown. + + Called during session teardown so recent conversation insights are + persisted before the session is destroyed. + """ + logger.info("reflect: running shutdown reflection for %s", session_dir) + mem_dir = memory_dir or global_memory_dir() + try: + await run_short_reflection(session_dir, llm, mem_dir) + logger.info("reflect: shutdown reflection completed for %s", session_dir) + except asyncio.CancelledError: + logger.warning("reflect: shutdown reflection cancelled for %s", session_dir) + except Exception: + logger.warning("reflect: shutdown reflection failed", exc_info=True) + _write_error("shutdown reflection") + + # --------------------------------------------------------------------------- # Event-bus integration # --------------------------------------------------------------------------- @@ -443,32 +497,9 @@ async def subscribe_reflection_triggers( mem_dir = memory_dir or global_memory_dir() _lock = asyncio.Lock() _short_count = 0 + _background_tasks: set[asyncio.Task] = set() - async def _on_turn_complete(event: Any) -> None: - nonlocal _short_count - - if getattr(event, "stream_id", None) != "queen": - return - - _short_count += 1 - - event_data = getattr(event, "data", {}) or {} - stop_reason = event_data.get("stop_reason", "") - is_tool_turn = stop_reason in ("tool_use", "tool_calls") - is_interval = _short_count % _LONG_REFLECT_INTERVAL == 0 - - if is_tool_turn and not is_interval: - logger.debug("reflect: skipping tool turn (count=%d)", _short_count) - return - - if _lock.locked(): - logger.debug("reflect: skipping, already running (count=%d)", _short_count) - return - - logger.debug( - "reflect: triggered (count=%d, interval=%s, stop_reason=%s)", - _short_count, is_interval, stop_reason, - ) + async def _do_turn_reflect(is_interval: bool, count: int) -> None: async with _lock: try: if is_interval: @@ -497,6 +528,47 @@ async def subscribe_reflection_triggers( except Exception: logger.debug("recall: cache update failed", exc_info=True) + async def _do_compaction_reflect() -> None: + async with _lock: + try: + await run_long_reflection(llm, mem_dir) + except Exception: + logger.warning("reflect: compaction-triggered reflection failed", exc_info=True) + _write_error("compaction reflection") + + def _fire_and_forget(coro: Any) -> None: + """Spawn a background task and prevent GC before it finishes.""" + task = asyncio.create_task(coro) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) + + async def _on_turn_complete(event: Any) -> None: + nonlocal _short_count + + if getattr(event, "stream_id", None) != "queen": + return + + _short_count += 1 + + event_data = getattr(event, "data", {}) or {} + stop_reason = event_data.get("stop_reason", "") + is_tool_turn = stop_reason in ("tool_use", "tool_calls") + is_interval = _short_count % _LONG_REFLECT_INTERVAL == 0 + + if is_tool_turn and not is_interval: + logger.debug("reflect: skipping tool turn (count=%d)", _short_count) + return + + if _lock.locked(): + logger.debug("reflect: skipping, already running (count=%d)", _short_count) + return + + logger.debug( + "reflect: triggered (count=%d, interval=%s, stop_reason=%s)", + _short_count, is_interval, stop_reason, + ) + _fire_and_forget(_do_turn_reflect(is_interval, _short_count)) + async def _on_compaction(event: Any) -> None: if getattr(event, "stream_id", None) != "queen": return @@ -504,12 +576,7 @@ async def subscribe_reflection_triggers( logger.debug("reflect: skipping compaction trigger, already running") return logger.debug("reflect: compaction triggered long reflection") - async with _lock: - try: - await run_long_reflection(llm, mem_dir) - except Exception: - logger.warning("reflect: compaction-triggered reflection failed", exc_info=True) - _write_error("compaction reflection") + _fire_and_forget(_do_compaction_reflect()) sub_ids: list[str] = [] diff --git a/core/framework/server/session_manager.py b/core/framework/server/session_manager.py index 96cb688b..26aa5386 100644 --- a/core/framework/server/session_manager.py +++ b/core/framework/server/session_manager.py @@ -65,6 +65,8 @@ class Session: # directory instead of creating a new one. This lets cold-restores accumulate # all messages in the original session folder so history is never fragmented. queen_resume_from: str | None = None + # Queen session directory (set during _start_queen, used for shutdown reflection) + queen_dir: Path | None = None class SessionManager: @@ -80,6 +82,9 @@ class SessionManager: self._model = model self._credential_store = credential_store self._lock = asyncio.Lock() + # Strong references for fire-and-forget background tasks (e.g. shutdown + # reflections) so they aren't garbage-collected before completion. + self._background_tasks: set[asyncio.Task] = set() # ------------------------------------------------------------------ # Session lifecycle @@ -631,6 +636,21 @@ class SessionManager: pass session.memory_reflection_subs.clear() + # Run a final shutdown reflection so recent conversation insights + # are persisted before the session is destroyed (fire-and-forget). + if session.queen_dir is not None: + try: + from framework.agents.queen.reflection_agent import run_shutdown_reflection + + task = asyncio.create_task( + asyncio.shield(run_shutdown_reflection(session.queen_dir, session.llm)), + ) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + logger.info("Session '%s': shutdown reflection spawned", session_id) + except Exception: + logger.warning("Session '%s': failed to spawn shutdown reflection", session_id, exc_info=True) + if session.queen_task is not None: session.queen_task.cancel() session.queen_task = None @@ -741,6 +761,7 @@ class SessionManager: storage_session_id = session.queen_resume_from or session.id queen_dir = hive_home / "queen" / "session" / storage_session_id queen_dir.mkdir(parents=True, exist_ok=True) + session.queen_dir = queen_dir # Always write/update session metadata so history sidebar has correct # agent name, path, and last-active timestamp (important so the original From 7fcb51985d112b97af41fac287c5b609b151841b Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Tue, 7 Apr 2026 13:55:18 -0700 Subject: [PATCH 4/5] fix: edge case for memory recall --- .../framework/agents/queen/recall_selector.py | 91 ++----------------- .../agents/queen/reflection_agent.py | 17 ---- core/framework/server/queen_orchestrator.py | 27 +++++- core/framework/server/routes_execution.py | 17 ++-- 4 files changed, 44 insertions(+), 108 deletions(-) diff --git a/core/framework/agents/queen/recall_selector.py b/core/framework/agents/queen/recall_selector.py index 4740de6a..84591d46 100644 --- a/core/framework/agents/queen/recall_selector.py +++ b/core/framework/agents/queen/recall_selector.py @@ -31,29 +31,6 @@ logger = logging.getLogger(__name__) # Structured output schema # --------------------------------------------------------------------------- -RECALL_SCHEMA: dict[str, Any] = { - "type": "json_schema", - "json_schema": { - "name": "memory_selection", - "strict": True, - "schema": { - "type": "object", - "properties": { - "selected_memories": { - "type": "array", - "items": {"type": "string"}, - }, - }, - "required": ["selected_memories"], - "additionalProperties": False, - }, - }, -} - -# --------------------------------------------------------------------------- -# System prompt -# --------------------------------------------------------------------------- - SELECT_MEMORIES_SYSTEM_PROMPT = """\ You are selecting memories that will be useful to the Queen agent as it \ processes a user's query. @@ -103,17 +80,21 @@ async def select_memories( resp = await llm.acomplete( messages=[{"role": "user", "content": user_msg}], system=SELECT_MEMORIES_SYSTEM_PROMPT, - max_tokens=512, - response_format=RECALL_SCHEMA, + max_tokens=1024, + response_format={"type": "json_object"}, ) - data = json.loads(resp.content) + raw = (resp.content or "").strip() + if not raw: + logger.warning("recall: LLM returned empty response (model=%s, stop=%s)", resp.model, resp.stop_reason) + return [] + data = json.loads(raw) selected = data.get("selected_memories", []) valid_names = {f.filename for f in files} result = [s for s in selected if s in valid_names][:max_results] logger.debug("recall: selected %d memories: %s", len(result), result) return result - except Exception: - logger.debug("recall: memory selection failed, returning []", exc_info=True) + except Exception as exc: + logger.warning("recall: memory selection failed (%s), returning []", exc) return [] @@ -142,57 +123,3 @@ def format_recall_injection( body = "\n\n---\n\n".join(blocks) return f"--- Global Memories ---\n\n{body}\n\n--- End Global Memories ---" - - -# --------------------------------------------------------------------------- -# Cache update (called after each queen reflection) -# --------------------------------------------------------------------------- - - -async def update_recall_cache( - session_dir: Path, - llm: Any, - memory_dir: Path | None = None, - *, - cache_setter: Any = None, -) -> None: - """Update the recall cache for the next turn. - - Reads the latest user message from conversation parts to use as the - query for memory selection. - """ - mem_dir = memory_dir or global_memory_dir() - - query = _extract_latest_user_query(session_dir) - if not query: - logger.debug("recall: no user query found for cache update") - return - - logger.debug("recall: updating cache for query: %.100s", query) - try: - selected = await select_memories(query, llm, mem_dir) - injection = format_recall_injection(selected, mem_dir) - if cache_setter is not None: - cache_setter(injection) - logger.debug("recall: cache updated (%d chars injected)", len(injection)) - except Exception: - logger.debug("recall: cache update failed", exc_info=True) - - -def _extract_latest_user_query(session_dir: Path) -> str: - """Read the most recent user message from conversation parts.""" - parts_dir = session_dir / "conversations" / "parts" - if not parts_dir.is_dir(): - return "" - - part_files = sorted(parts_dir.glob("*.json"), reverse=True) - for f in part_files[:20]: - try: - data = json.loads(f.read_text(encoding="utf-8")) - if data.get("role") == "user": - content = str(data.get("content", "")).strip() - if content: - return content[:1000] if len(content) > 1000 else content - except (json.JSONDecodeError, OSError): - continue - return "" diff --git a/core/framework/agents/queen/reflection_agent.py b/core/framework/agents/queen/reflection_agent.py index 6dd21d65..e2127bbb 100644 --- a/core/framework/agents/queen/reflection_agent.py +++ b/core/framework/agents/queen/reflection_agent.py @@ -485,7 +485,6 @@ async def subscribe_reflection_triggers( session_dir: Path, llm: Any, memory_dir: Path | None = None, - phase_state: Any = None, ) -> list[str]: """Subscribe to queen turn events and return subscription IDs. @@ -511,22 +510,6 @@ async def subscribe_reflection_triggers( logger.warning("reflect: reflection failed", exc_info=True) _write_error("short/long reflection") - # Update recall cache after reflection so next turn sees new memories. - if phase_state is not None: - try: - from framework.agents.queen.recall_selector import update_recall_cache - - logger.debug("recall: post-reflection cache update starting") - await update_recall_cache( - session_dir, - llm, - memory_dir=mem_dir, - cache_setter=lambda block: setattr( - phase_state, "_cached_global_recall_block", block - ), - ) - except Exception: - logger.debug("recall: cache update failed", exc_info=True) async def _do_compaction_reflect() -> None: async with _lock: diff --git a/core/framework/server/queen_orchestrator.py b/core/framework/server/queen_orchestrator.py index 21afea83..925cb706 100644 --- a/core/framework/server/queen_orchestrator.py +++ b/core/framework/server/queen_orchestrator.py @@ -278,6 +278,32 @@ async def create_queen( _session_llm = session.llm _session_event_bus = session.event_bus + # ---- Recall on each real user turn -------------------------------- + async def _recall_on_user_input(event: AgentEvent) -> None: + """Re-select memories when real user input arrives.""" + content = (event.data or {}).get("content", "") + if not content or not isinstance(content, str): + return + try: + from framework.agents.queen.recall_selector import ( + format_recall_injection, + select_memories, + ) + + mem_dir = phase_state.global_memory_dir + selected = await select_memories(content, _session_llm, mem_dir) + phase_state._cached_global_recall_block = format_recall_injection( + selected, mem_dir + ) + except Exception: + logger.debug("recall: user-turn cache update failed", exc_info=True) + + session.event_bus.subscribe( + [EventType.CLIENT_INPUT_RECEIVED], + _recall_on_user_input, + filter_stream="queen", + ) + async def _persona_hook(ctx: HookContext) -> HookResult | None: trigger = ctx.trigger or "" result = await select_expert_persona( @@ -430,7 +456,6 @@ async def create_queen( queen_dir, session.llm, memory_dir=global_dir, - phase_state=phase_state, ) session.memory_reflection_subs = _reflection_subs diff --git a/core/framework/server/routes_execution.py b/core/framework/server/routes_execution.py index be390b64..f757746b 100644 --- a/core/framework/server/routes_execution.py +++ b/core/framework/server/routes_execution.py @@ -185,14 +185,8 @@ async def handle_chat(request: web.Request) -> web.Response: logger.error("[handle_chat] Node still not available after 5s wait") if node is not None and hasattr(node, "inject_event"): - try: - logger.debug("[handle_chat] Calling node.inject_event()...") - await node.inject_event(message, is_client_input=True, image_content=image_content) - logger.debug("[handle_chat] inject_event() completed successfully") - except Exception as e: - logger.exception("[handle_chat] inject_event() failed: %s", e) - raise - # Publish to EventBus so the session event log captures user messages + # Publish BEFORE inject_event so handlers (e.g. memory recall) + # complete before the event loop unblocks and starts the LLM turn. from framework.runtime.event_bus import AgentEvent, EventType await session.event_bus.publish( @@ -209,6 +203,13 @@ async def handle_chat(request: web.Request) -> web.Response: }, ) ) + try: + logger.debug("[handle_chat] Calling node.inject_event()...") + await node.inject_event(message, is_client_input=True, image_content=image_content) + logger.debug("[handle_chat] inject_event() completed successfully") + except Exception as e: + logger.exception("[handle_chat] inject_event() failed: %s", e) + raise return web.json_response( { "status": "queen", From 19469ff404b1e535c878010d9f3654a9272d32e8 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Tue, 7 Apr 2026 13:57:05 -0700 Subject: [PATCH 5/5] chore: lint format --- core/framework/agents/queen/nodes/__init__.py | 5 ----- .../framework/agents/queen/queen_memory_v2.py | 2 ++ .../framework/agents/queen/recall_selector.py | 6 +++++- .../agents/queen/reflection_agent.py | 21 +++++++++++-------- core/framework/server/queen_orchestrator.py | 12 +++-------- core/framework/server/session_manager.py | 4 +++- core/tests/test_queen_memory.py | 4 +--- .../test_session_manager_worker_handoff.py | 1 - 8 files changed, 26 insertions(+), 29 deletions(-) diff --git a/core/framework/agents/queen/nodes/__init__.py b/core/framework/agents/queen/nodes/__init__.py index 498df752..3f447541 100644 --- a/core/framework/agents/queen/nodes/__init__.py +++ b/core/framework/agents/queen/nodes/__init__.py @@ -84,7 +84,6 @@ _QUEEN_PLANNING_TOOLS = [ "initialize_and_build_agent", # Load existing agent (after user confirms) "load_built_agent", - ] # Building phase: full coding + agent construction tools. @@ -93,7 +92,6 @@ _QUEEN_BUILDING_TOOLS = _SHARED_TOOLS + [ "list_credentials", "replan_agent", "save_agent_draft", # Re-draft during building → auto-dissolves + updates flowchart - ] # Staging phase: agent loaded but not yet running — inspect, configure, launch. @@ -113,7 +111,6 @@ _QUEEN_STAGING_TOOLS = [ "set_trigger", "remove_trigger", "list_triggers", - ] # Running phase: worker is executing — monitor, control, or switch to editing. @@ -137,7 +134,6 @@ _QUEEN_RUNNING_TOOLS = [ "set_trigger", "remove_trigger", "list_triggers", - ] # Editing phase: worker done, still loaded — tweak config and re-run. @@ -160,7 +156,6 @@ _QUEEN_EDITING_TOOLS = [ "set_trigger", "remove_trigger", "list_triggers", - ] diff --git a/core/framework/agents/queen/queen_memory_v2.py b/core/framework/agents/queen/queen_memory_v2.py index f95c8218..bf41a5a6 100644 --- a/core/framework/agents/queen/queen_memory_v2.py +++ b/core/framework/agents/queen/queen_memory_v2.py @@ -28,6 +28,8 @@ MAX_FILE_SIZE_BYTES: int = 4096 # 4 KB hard limit per memory file # How many lines of a memory file to read for header scanning. _HEADER_LINE_LIMIT: int = 30 + + def global_memory_dir() -> Path: """Return the queen-global memory directory.""" return _HIVE_QUEEN_DIR / "global_memory" diff --git a/core/framework/agents/queen/recall_selector.py b/core/framework/agents/queen/recall_selector.py index 84591d46..ad1676eb 100644 --- a/core/framework/agents/queen/recall_selector.py +++ b/core/framework/agents/queen/recall_selector.py @@ -85,7 +85,11 @@ async def select_memories( ) raw = (resp.content or "").strip() if not raw: - logger.warning("recall: LLM returned empty response (model=%s, stop=%s)", resp.model, resp.stop_reason) + logger.warning( + "recall: LLM returned empty response (model=%s, stop=%s)", + resp.model, + resp.stop_reason, + ) return [] data = json.loads(raw) selected = data.get("selected_memories", []) diff --git a/core/framework/agents/queen/reflection_agent.py b/core/framework/agents/queen/reflection_agent.py index e2127bbb..4bb20d64 100644 --- a/core/framework/agents/queen/reflection_agent.py +++ b/core/framework/agents/queen/reflection_agent.py @@ -22,7 +22,6 @@ from __future__ import annotations import asyncio import json import logging -import re import traceback from datetime import datetime from pathlib import Path @@ -250,17 +249,20 @@ async def _reflection_loop( args = json.loads(fn.arguments) if fn.arguments else {} except (json.JSONDecodeError, TypeError): args = {} - tool_calls_raw.append({ - "id": tc.id, - "name": fn.name, - "input": args, - }) + tool_calls_raw.append( + { + "id": tc.id, + "name": fn.name, + "input": args, + } + ) except (AttributeError, IndexError): pass logger.info( "reflect: LLM responded, text=%d chars, tool_calls=%d", - len(resp.content or ""), len(tool_calls_raw), + len(resp.content or ""), + len(tool_calls_raw), ) turn_text = resp.content or "" @@ -510,7 +512,6 @@ async def subscribe_reflection_triggers( logger.warning("reflect: reflection failed", exc_info=True) _write_error("short/long reflection") - async def _do_compaction_reflect() -> None: async with _lock: try: @@ -548,7 +549,9 @@ async def subscribe_reflection_triggers( logger.debug( "reflect: triggered (count=%d, interval=%s, stop_reason=%s)", - _short_count, is_interval, stop_reason, + _short_count, + is_interval, + stop_reason, ) _fire_and_forget(_do_turn_reflect(is_interval, _short_count)) diff --git a/core/framework/server/queen_orchestrator.py b/core/framework/server/queen_orchestrator.py index 925cb706..43f5c5dd 100644 --- a/core/framework/server/queen_orchestrator.py +++ b/core/framework/server/queen_orchestrator.py @@ -292,9 +292,7 @@ async def create_queen( mem_dir = phase_state.global_memory_dir selected = await select_memories(content, _session_llm, mem_dir) - phase_state._cached_global_recall_block = format_recall_injection( - selected, mem_dir - ) + phase_state._cached_global_recall_block = format_recall_injection(selected, mem_dir) except Exception: logger.debug("recall: user-turn cache update failed", exc_info=True) @@ -306,9 +304,7 @@ async def create_queen( async def _persona_hook(ctx: HookContext) -> HookResult | None: trigger = ctx.trigger or "" - result = await select_expert_persona( - trigger, _session_llm, memory_context="" - ) + result = await select_expert_persona(trigger, _session_llm, memory_context="") if not result: return None # Store on phase_state so persona/style persist across dynamic prompt refreshes @@ -333,9 +329,7 @@ async def create_queen( mem_dir = phase_state.global_memory_dir selected = await select_memories(trigger, _session_llm, mem_dir) - phase_state._cached_global_recall_block = format_recall_injection( - selected, mem_dir - ) + phase_state._cached_global_recall_block = format_recall_injection(selected, mem_dir) except Exception: logger.debug("recall: initial seeding failed", exc_info=True) diff --git a/core/framework/server/session_manager.py b/core/framework/server/session_manager.py index 26aa5386..25e3961f 100644 --- a/core/framework/server/session_manager.py +++ b/core/framework/server/session_manager.py @@ -649,7 +649,9 @@ class SessionManager: task.add_done_callback(self._background_tasks.discard) logger.info("Session '%s': shutdown reflection spawned", session_id) except Exception: - logger.warning("Session '%s': failed to spawn shutdown reflection", session_id, exc_info=True) + logger.warning( + "Session '%s': failed to spawn shutdown reflection", session_id, exc_info=True + ) if session.queen_task is not None: session.queen_task.cancel() diff --git a/core/tests/test_queen_memory.py b/core/tests/test_queen_memory.py index efdb1401..8691851e 100644 --- a/core/tests/test_queen_memory.py +++ b/core/tests/test_queen_memory.py @@ -185,9 +185,7 @@ async def test_select_memories_empty_dir(tmp_path: Path): @pytest.mark.asyncio async def test_select_memories_with_files(tmp_path: Path): - (tmp_path / "a.md").write_text( - "---\nname: a\ndescription: about A\ntype: profile\n---\nbody" - ) + (tmp_path / "a.md").write_text("---\nname: a\ndescription: about A\ntype: profile\n---\nbody") (tmp_path / "b.md").write_text( "---\nname: b\ndescription: about B\ntype: preference\n---\nbody" ) diff --git a/core/tests/test_session_manager_worker_handoff.py b/core/tests/test_session_manager_worker_handoff.py index 7bcc8261..b450d162 100644 --- a/core/tests/test_session_manager_worker_handoff.py +++ b/core/tests/test_session_manager_worker_handoff.py @@ -196,5 +196,4 @@ async def test_load_worker_core_keeps_explicit_worker_model_override(monkeypatch assert session.runner is runner assert session.runner._llm is None - assert session.worker_path == tmp_path / "worker_agent"