Merge pull request #6976 from aden-hive/feat/simplify-queen-memory

Simplify queen memory: remove colony memory, keep global only
This commit is contained in:
RichardTang-Aden
2026-04-07 13:58:26 -07:00
committed by GitHub
17 changed files with 364 additions and 2104 deletions
+4 -34
View File
@@ -84,7 +84,6 @@ _QUEEN_PLANNING_TOOLS = [
"initialize_and_build_agent",
# Load existing agent (after user confirms)
"load_built_agent",
"save_global_memory",
]
# Building phase: full coding + agent construction tools.
@@ -93,7 +92,6 @@ _QUEEN_BUILDING_TOOLS = _SHARED_TOOLS + [
"list_credentials",
"replan_agent",
"save_agent_draft", # Re-draft during building → auto-dissolves + updates flowchart
"save_global_memory",
]
# Staging phase: agent loaded but not yet running — inspect, configure, launch.
@@ -113,7 +111,6 @@ _QUEEN_STAGING_TOOLS = [
"set_trigger",
"remove_trigger",
"list_triggers",
"save_global_memory",
]
# Running phase: worker is executing — monitor, control, or switch to editing.
@@ -137,7 +134,6 @@ _QUEEN_RUNNING_TOOLS = [
"set_trigger",
"remove_trigger",
"list_triggers",
"save_global_memory",
]
# Editing phase: worker done, still loaded — tweak config and re-run.
@@ -160,7 +156,6 @@ _QUEEN_EDITING_TOOLS = [
"set_trigger",
"remove_trigger",
"list_triggers",
"save_global_memory",
]
@@ -646,8 +641,6 @@ to fix the currently loaded agent (no draft required).
- load_built_agent(agent_path) Load an existing agent and switch to STAGING \
phase. Only use this when the user explicitly asks to work with an existing agent \
(e.g. "load my_agent", "run the research agent"). Confirm with the user first.
- save_global_memory(category, description, content, name?) Save durable \
cross-queen memory about the user only (profile, preferences, environment, feedback)
## Workflow summary
1. Understand requirements discover tools design graph
@@ -679,8 +672,6 @@ updated flowchart immediately. Use this when you make structural changes \
restored (with decision/browser nodes intact) so you can edit it. Use \
when the user wants to change integrations, swap tools, rethink the \
flow, or discuss any design changes before you build them.
- save_global_memory(category, description, content, name?) Save durable \
cross-queen memory about the user only
When you finish building an agent, call load_built_agent(path) to stage it.
"""
@@ -694,8 +685,6 @@ The agent is loaded and ready to run. You can inspect it and launch it:
- get_graph_status(focus?) Brief status
- run_agent_with_input(task) Start the worker and switch to RUNNING phase
- set_trigger / remove_trigger / list_triggers Timer management
- save_global_memory(category, description, content, name?) Save \
durable cross-queen memory about the user only
You do NOT have write tools or backward transition tools in staging. \
To modify the agent, run it first after it finishes you enter EDITING \
@@ -715,8 +704,6 @@ The worker is running. You have monitoring and lifecycle tools:
for config tweaks, re-runs, or escalation to building/planning
- run_agent_with_input(task) Re-run the worker with new input
- set_trigger / remove_trigger / list_triggers Timer management
- save_global_memory(category, description, content, name?) Save \
durable cross-queen memory about the user only
When the worker finishes on its own, you automatically move to EDITING \
phase. You can also call switch_to_editing() to stop early and tweak.
@@ -732,7 +719,6 @@ The worker has finished executing and is still loaded. You can tweak and re-run:
- run_agent_with_input(task) Re-run the worker with new input
- get_worker_health_summary() Review last run's health data
- set_trigger / remove_trigger / list_triggers Timer management
- save_global_memory Save durable cross-queen memory
You do NOT have write/edit file tools or backward transition tools. \
You can only re-run or tweak from this phase.
@@ -934,26 +920,10 @@ diagnosis mode — you already have a built agent, you just need to fix it.
_queen_memory_instructions = """
## Your Memory
Relevant colony memories from this queen session may appear in context under \
"--- Colony Memories ---". Relevant global user memories may appear under \
"--- Global Memories ---".
Colony memories are shared with the worker for this queen session. Use them \
for continuity about what this user is trying to do, what has worked, and \
what the colony has learned together.
Global memories are shared across queens and are only for durable knowledge \
about the user: who they are, their preferences, their environment, and \
their feedback.
Memories older than 1 day include a staleness warning. Treat these as \
point-in-time observations verify current details before asserting them \
as fact.
You do NOT need to manually save or recall colony memories. A background \
reflection agent automatically extracts colony learnings from each \
conversation turn. Use `save_global_memory` only when you learn something \
durable about the user that should help future queens.
Relevant global memories about the user may appear at the end of this prompt \
under "--- Global Memories ---". These are automatically maintained across \
sessions. Use them to inform your responses but verify stale claims before \
asserting them as fact.
"""
_queen_behavior_always = _queen_behavior_always + _queen_memory_instructions
-420
View File
@@ -1,420 +0,0 @@
"""Queen global cross-session memory.
Three-tier memory architecture:
~/.hive/queen/MEMORY.md semantic (who, what, why)
~/.hive/queen/memories/MEMORY-YYYY-MM-DD.md episodic (daily journals)
~/.hive/queen/session/{id}/data/adapt.md working (session-scoped)
Semantic and episodic files are injected at queen session start.
Semantic memory (MEMORY.md) is updated automatically at session end via
consolidate_queen_memory() — the queen never rewrites this herself.
Episodic memory (MEMORY-date.md) can be written by the queen during a session
via the write_to_diary tool, and is also appended to at session end by
consolidate_queen_memory().
"""
from __future__ import annotations
import asyncio
import json
import logging
import traceback
from datetime import date, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
def _queen_dir() -> Path:
return Path.home() / ".hive" / "queen"
def format_memory_date(d: date) -> str:
    """Render *d* as ``"<Month> <day>, <year>"`` with no zero-padding on the day.

    The day and year come from the integer attributes rather than strftime
    codes, so the output does not depend on platform-specific padding flags.
    """
    month_name = d.strftime("%B")
    return "{} {}, {}".format(month_name, d.day, d.year)
def semantic_memory_path() -> Path:
    """Return the path of the semantic memory file (``MEMORY.md``) in the queen dir."""
    return _queen_dir().joinpath("MEMORY.md")
def episodic_memory_path(d: date | None = None) -> Path:
    """Return the episodic (daily journal) file path for date *d*.

    Args:
        d: Day of the journal; defaults to today.

    Returns:
        ``<queen dir>/memories/MEMORY-YYYY-MM-DD.md``.
    """
    day = d or date.today()
    filename = "MEMORY-" + day.strftime("%Y-%m-%d") + ".md"
    return _queen_dir() / "memories" / filename
def read_semantic_memory() -> str:
    """Return the stripped contents of MEMORY.md, or ``""`` if it does not exist.

    Reads first and handles the missing-file error instead of checking
    ``exists()`` beforehand: the file can be renamed away between the check
    and the read (legacy migration archives it), which would otherwise raise.
    """
    try:
        return semantic_memory_path().read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return ""
def read_episodic_memory(d: date | None = None) -> str:
    """Return the stripped journal text for date *d*, or ``""`` if there is none.

    Args:
        d: Day to read; defaults to today (resolved by ``episodic_memory_path``).

    Reads first and handles the missing-file error rather than pre-checking
    ``exists()``, so a file that disappears between check and read yields
    ``""`` instead of an exception.
    """
    try:
        return episodic_memory_path(d).read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return ""
def _find_recent_episodic(lookback: int = 7) -> tuple[date, str] | None:
    """Return ``(day, text)`` for the newest non-empty journal in the lookback window.

    Days are probed from today backwards, ``lookback`` days in total
    (today included). Returns ``None`` when every probed day is empty.
    """
    from datetime import timedelta

    newest = date.today()
    # Lazy generator: days are only computed (and files only read) until a hit.
    candidates = (newest - timedelta(days=back) for back in range(lookback))
    for day in candidates:
        text = read_episodic_memory(day)
        if text:
            return day, text
    return None
# Maximum number of episodic-memory characters placed in the system prompt.
_EPISODIC_CHAR_BUDGET = 6_000


def format_for_injection() -> str:
    """Build the cross-session memory block for the system prompt.

    Combines semantic memory (unless it is still the untouched seed
    template) with the most recent non-empty journal, trimmed to
    ``_EPISODIC_CHAR_BUDGET`` characters.

    Returns:
        The delimited memory block, or ``""`` when there is nothing to inject.
    """
    semantic = read_semantic_memory()
    recent = _find_recent_episodic()

    # An unmodified seed template carries no information about the user.
    if semantic.startswith("# My Understanding of the User\n\n*No sessions"):
        semantic = ""

    sections: list[str] = [semantic] if semantic else []

    if recent is not None:
        day, diary = recent
        # Keep the prompt manageable when a day's journal grew large.
        if len(diary) > _EPISODIC_CHAR_BUDGET:
            diary = diary[:_EPISODIC_CHAR_BUDGET] + "\n\n…(truncated)"
        prefix = "## Today — " if day == date.today() else "## "
        sections.append(f"{prefix}{format_memory_date(day)}\n\n{diary}")

    if not sections:
        return ""

    return "".join(
        [
            "--- Your Cross-Session Memory ---\n\n",
            "\n\n---\n\n".join(sections),
            "\n\n--- End Cross-Session Memory ---",
        ]
    )
_SEED_TEMPLATE = """\
# My Understanding of the User
*No sessions recorded yet.*
## Who They Are
## How They Communicate
## What They're Trying to Achieve
## What's Working
## What I've Learned
"""
def append_episodic_entry(content: str) -> None:
    """Append a timestamped prose entry to today's episodic memory file.

    Creates the file (with a date heading) if it doesn't exist yet.
    Used both by the queen's diary tool and by the consolidation hook.

    Args:
        content: Entry text; surrounding whitespace is stripped.
    """
    # Read the clock once so the file path, the date heading and the HH:MM
    # stamp all refer to the same instant, even when called around midnight.
    now = datetime.now()
    today = now.date()

    ep_path = episodic_memory_path(today)
    ep_path.parent.mkdir(parents=True, exist_ok=True)

    entry = f"### {now.strftime('%H:%M')}\n\n{content.strip()}\n"
    if ep_path.exists():
        # Separate from the previous entry with a blank line.
        block = f"\n\n{entry}"
    else:
        block = f"# {format_memory_date(today)}\n\n{entry}"

    with ep_path.open("a", encoding="utf-8") as f:
        f.write(block)
def seed_if_missing() -> None:
    """Write the blank seed template to MEMORY.md unless the file already exists."""
    target = semantic_memory_path()
    if not target.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(_SEED_TEMPLATE, encoding="utf-8")
# ---------------------------------------------------------------------------
# Consolidation prompt
# ---------------------------------------------------------------------------
# System prompt for the LLM call that rewrites MEMORY.md; passed as ``system=``
# by consolidate_queen_memory(). The model is expected to return the complete
# new file content as raw markdown.
_SEMANTIC_SYSTEM = """\
You maintain the persistent cross-session memory of an AI assistant called the Queen.
Review the session notes and rewrite MEMORY.md the Queen's durable understanding of the
person she works with across all sessions.
Write entirely in the Queen's voice — first person, reflective, honest.
Not a log of events, but genuine understanding of who this person is over time.
Rules:
- Update and synthesise: incorporate new understanding, update facts that have changed, remove
details that are stale, superseded, or no longer say anything meaningful about the person.
- Keep it as structured markdown with named sections about the PERSON, not about today.
- Do NOT include diary sections, daily logs, or session summaries. Those belong elsewhere.
MEMORY.md is about who they are, what they want, what works not what happened today.
- Maintain a "How They Communicate" section: technical depth, preferred pace
(fast/exploratory/thorough), what communication approaches have worked or not,
tone preferences. Update based on diary reflections about communication.
This section should evolve "prefers direct answers" is useful on day 1;
"prefers direct answers for technical questions but wants more context when
discussing architecture trade-offs" is better by day 5.
- Reference dates only when noting a lasting milestone (e.g. "since March 8th they prefer X").
- If the session had no meaningful new information about the person,
return the existing text unchanged.
- Do not add fictional details. Only reflect what is evidenced in the notes.
- Stay concise. Prune rather than accumulate. A lean, accurate file is more useful than a
dense one. If something was true once but has been resolved or superseded, remove it.
- Output only the raw markdown content of MEMORY.md. No preamble, no code fences.
"""
# System prompt for the LLM call that rewrites today's diary; passed as
# ``system=`` by consolidate_queen_memory(). The returned prose replaces the
# body of today's episodic file (the date heading is added by the caller).
_DIARY_SYSTEM = """\
You maintain the daily episodic diary of an AI assistant called the Queen.
You receive: (1) today's existing diary so far, and (2) notes from the latest session.
Rewrite the complete diary for today as a single unified narrative
first person, reflective, honest.
Merge and deduplicate: if the same story (e.g. a research agent stalling) recurred several times,
describe it once with appropriate weight rather than retelling it. Weave in new developments from
the session notes. Preserve important milestones, emotional texture, and session path references.
Preserve reflections about communication effectiveness these are important inputs for the
Queen's evolving understanding of the user. A reflection like "they responded much better when
I led with the recommendation instead of listing options" is as important as
"we built a Gmail agent."
If today's diary is empty, write the initial entry based on the session notes alone.
Output only the full diary prose no date heading, no timestamp headers,
no preamble, no code fences.
"""
def read_session_context(session_dir: Path, max_messages: int = 80) -> str:
    """Extract a readable transcript from conversation parts + adapt.md.

    Reads the session's ``data/adapt.md`` (working notes) and the last
    ``max_messages`` files in ``conversations/parts``. Tool results are
    omitted; only user and assistant turns are included, and an assistant
    turn that consists solely of tool calls is rendered as the list of
    called tool names. Message text is cut to 600 characters per turn.

    Args:
        session_dir: Root directory of the session.
        max_messages: Number of most recent part files to include; a value
            of zero or less includes no conversation at all.

    Returns:
        The assembled markdown text, or ``""`` if nothing was found.
    """
    parts: list[str] = []

    # Working notes
    adapt_path = session_dir / "data" / "adapt.md"
    if adapt_path.exists():
        text = adapt_path.read_text(encoding="utf-8").strip()
        if text:
            parts.append(f"## Session Working Notes (adapt.md)\n\n{text}")

    # Conversation transcript
    parts_dir = session_dir / "conversations" / "parts"
    if parts_dir.exists():
        # Guard the slice: ``seq[-0:]`` is the WHOLE list, not an empty one.
        if max_messages > 0:
            part_files = sorted(parts_dir.glob("*.json"))[-max_messages:]
        else:
            part_files = []

        lines: list[str] = []
        for pf in part_files:
            try:
                data = json.loads(pf.read_text(encoding="utf-8"))
                role = data.get("role", "")
                # JSON ``null`` content (typical for pure tool-call turns) must
                # become "", not the literal text "None" that str(None) yields.
                raw_content = data.get("content")
                content = "" if raw_content is None else str(raw_content).strip()
                tool_calls = data.get("tool_calls") or []
                if role == "tool":
                    continue  # skip verbose tool results
                if role == "assistant" and tool_calls and not content:
                    names = [tc.get("function", {}).get("name", "?") for tc in tool_calls]
                    lines.append(f"[queen calls: {', '.join(names)}]")
                elif content:
                    label = "user" if role == "user" else "queen"
                    lines.append(f"[{label}]: {content[:600]}")
            except (KeyError, TypeError) as exc:
                logger.debug("Skipping malformed conversation message: %s", exc)
                continue
            except Exception:
                logger.warning("Unexpected error parsing conversation message", exc_info=True)
                continue
        if lines:
            parts.append("## Conversation\n\n" + "\n".join(lines))

    return "\n\n".join(parts)
# ---------------------------------------------------------------------------
# Context compaction (binary-split LLM summarisation)
# ---------------------------------------------------------------------------
# If the raw session context exceeds this many characters, compact it first
# before sending to the consolidation LLM. ~200 k chars ≈ 50 k tokens.
_CTX_COMPACT_CHAR_LIMIT = 200_000
_CTX_COMPACT_MAX_DEPTH = 8
_COMPACT_SYSTEM = (
"Summarise this conversation segment. Preserve: user goals, key decisions, "
"what was built or changed, emotional tone, and important outcomes. "
"Write concisely in third person past tense. Omit routine tool invocations "
"unless the result matters."
)
async def _compact_context(text: str, llm: object, *, _depth: int = 0) -> str:
"""Binary-split and LLM-summarise *text* until it fits within the char limit.
Mirrors the recursive binary-splitting strategy used by the main agent
compaction pipeline (EventLoopNode._llm_compact).
"""
if len(text) <= _CTX_COMPACT_CHAR_LIMIT or _depth >= _CTX_COMPACT_MAX_DEPTH:
return text
# Split near the midpoint on a line boundary so we don't cut mid-message
mid = len(text) // 2
split_at = text.rfind("\n", 0, mid) + 1
if split_at <= 0:
split_at = mid
half1, half2 = text[:split_at], text[split_at:]
async def _summarise(chunk: str) -> str:
try:
resp = await llm.acomplete(
messages=[{"role": "user", "content": chunk}],
system=_COMPACT_SYSTEM,
max_tokens=2048,
)
return resp.content.strip()
except Exception:
logger.warning(
"queen_memory: context compaction LLM call failed (depth=%d), truncating",
_depth,
)
return chunk[: _CTX_COMPACT_CHAR_LIMIT // 4]
s1, s2 = await asyncio.gather(_summarise(half1), _summarise(half2))
combined = s1 + "\n\n" + s2
if len(combined) > _CTX_COMPACT_CHAR_LIMIT:
return await _compact_context(combined, llm, _depth=_depth + 1)
return combined
async def consolidate_queen_memory(
    session_id: str,
    session_dir: Path,
    llm: object,
) -> None:
    """Update MEMORY.md and rewrite today's diary from the current session.

    Reads conversation parts and adapt.md from session_dir. Called
    periodically in the background and once at session end. Failures are
    logged (and dumped to ``consolidation_error.txt`` in the queen dir) and
    swallowed so they never block teardown.

    Args:
        session_id: The session ID (included in the prompt and in logs).
        session_dir: Path to the session directory (~/.hive/queen/session/{id}).
        llm: LLMProvider instance (must support acomplete()).
    """
    try:
        transcript = read_session_context(session_dir)
        if not transcript:
            logger.debug("queen_memory: no session context, skipping consolidation")
            return

        logger.info("queen_memory: consolidating memory for session %s ...", session_id)

        # Very large transcripts are summarised first so the consolidation
        # prompt stays within a workable size.
        if len(transcript) > _CTX_COMPACT_CHAR_LIMIT:
            logger.info(
                "queen_memory: session context is %d chars — compacting first",
                len(transcript),
            )
            transcript = await _compact_context(transcript, llm)
            logger.info("queen_memory: compacted to %d chars", len(transcript))

        existing_semantic = read_semantic_memory()
        today_journal = read_episodic_memory()
        today_str = format_memory_date(date.today())
        adapt_path = session_dir / "data" / "adapt.md"

        # Both LLM calls receive the same user message; only the system
        # prompt differs.
        user_msg = "\n\n".join(
            [
                "## Existing Semantic Memory (MEMORY.md)",
                existing_semantic or "(none yet)",
                f"## Today's Diary So Far ({today_str})",
                today_journal or "(none yet)",
                transcript,
                "## Session Reference",
                f"Session ID: {session_id}\nSession path: {adapt_path}\n",
            ]
        )

        logger.debug(
            "queen_memory: calling LLM (%d chars of context, ~%d tokens est.)",
            len(user_msg),
            len(user_msg) // 4,
        )

        from framework.agents.queen.config import default_config

        def _ask(system_prompt: str):
            # Fresh message list per call so the two requests share no state.
            return llm.acomplete(
                messages=[{"role": "user", "content": user_msg}],
                system=system_prompt,
                max_tokens=default_config.max_tokens,
            )

        semantic_resp, diary_resp = await asyncio.gather(
            _ask(_SEMANTIC_SYSTEM),
            _ask(_DIARY_SYSTEM),
        )
        new_semantic = semantic_resp.content.strip()
        diary_entry = diary_resp.content.strip()

        if new_semantic:
            memory_file = semantic_memory_path()
            memory_file.parent.mkdir(parents=True, exist_ok=True)
            memory_file.write_text(new_semantic, encoding="utf-8")
            logger.info("queen_memory: semantic memory updated (%d chars)", len(new_semantic))

        if diary_entry:
            # The LLM returned the merged, deduplicated diary for the whole
            # day, so the file is replaced rather than appended to.
            diary_file = episodic_memory_path()
            diary_file.parent.mkdir(parents=True, exist_ok=True)
            diary_file.write_text(f"# {today_str}\n\n{diary_entry}\n", encoding="utf-8")
            logger.info(
                "queen_memory: episodic diary rewritten for %s (%d chars)",
                today_str,
                len(diary_entry),
            )

    except Exception:
        tb = traceback.format_exc()
        logger.exception("queen_memory: consolidation failed")
        # Also persist the traceback so the cause is findable regardless of
        # log verbosity.
        error_path = _queen_dir() / "consolidation_error.txt"
        report = f"session: {session_id}\ntime: {datetime.now().isoformat()}\n\n{tb}"
        try:
            error_path.parent.mkdir(parents=True, exist_ok=True)
            error_path.write_text(report, encoding="utf-8")
        except OSError:
            pass  # Cannot write error file; original exception already logged
+14 -341
View File
@@ -1,24 +1,17 @@
"""Shared memory helpers for queen/worker recall and reflection.
"""Queen global memory helpers.
Each memory is an individual ``.md`` file in ``~/.hive/queen/memories/``
with optional YAML frontmatter (name, type, description). Frontmatter
is a convention enforced by prompt instructions parsing is lenient and
malformed files degrade gracefully (appear in scans with ``None`` metadata).
Cursor-based incremental processing tracks which conversation messages
have already been processed by the reflection agent.
Global memory lives in ``~/.hive/queen/global_memory/`` and stores durable
cross-session knowledge about the user (profile, preferences, environment,
feedback). Each memory is an individual ``.md`` file with optional YAML
frontmatter (name, type, description).
"""
from __future__ import annotations
import logging
import re
import shutil
import time
from dataclasses import dataclass, field
from datetime import date
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
@@ -26,49 +19,15 @@ logger = logging.getLogger(__name__)
# Constants
# ---------------------------------------------------------------------------
MEMORY_TYPES: tuple[str, ...] = ("goal", "environment", "technique", "reference", "diary")
GLOBAL_MEMORY_CATEGORIES: tuple[str, ...] = ("profile", "preference", "environment", "feedback")
_HIVE_QUEEN_DIR = Path.home() / ".hive" / "queen"
# Legacy shared v2 root. Colony memory now lives under queen sessions.
MEMORY_DIR: Path = _HIVE_QUEEN_DIR / "memories"
MAX_FILES: int = 200
MAX_FILE_SIZE_BYTES: int = 4096 # 4 KB hard limit per memory file
# How many lines of a memory file to read for header scanning.
_HEADER_LINE_LIMIT: int = 30
_MIGRATION_MARKER = ".migrated-from-shared-memory"
_GLOBAL_MEMORY_CODE_PATTERN = re.compile(
r"(/Users/|~/.hive|\.py\b|\.ts\b|\.tsx\b|\.js\b|"
r"\b(graph|node|runtime|session|execution|worker|queen|subagent|checkpoint|flowchart)\b)",
re.IGNORECASE,
)
# Frontmatter example provided to the reflection agent via prompt.
# Stored as a list of lines forming one fenced markdown block; the ``type:``
# line is generated from MEMORY_TYPES so the example always lists the
# currently supported types. The doubled braces are literal "{{...}}"
# placeholders in the rendered text, not format fields.
MEMORY_FRONTMATTER_EXAMPLE: list[str] = [
"```markdown",
"---",
"name: {{memory name}}",
(
"description: {{one-line description — used to decide "
"relevance in future conversations, so be specific}}"
),
f"type: {{{{{', '.join(MEMORY_TYPES)}}}}}",
"---",
"",
(
"{{memory content — for feedback/project types, "
"structure as: rule/fact, then **Why:** "
"and **How to apply:** lines}}"
),
"```",
]
def colony_memory_dir(colony_id: str) -> Path:
    """Return ``<queen dir>/session/<colony_id>/memory/colony`` for a queen session."""
    session_root = _HIVE_QUEEN_DIR / "session" / colony_id
    return session_root / "memory" / "colony"
def global_memory_dir() -> Path:
@@ -107,15 +66,6 @@ def parse_frontmatter(text: str) -> dict[str, str]:
return result
def parse_memory_type(raw: str | None) -> str | None:
    """Normalise *raw* and return it if it is a known memory type or category.

    Matching ignores surrounding whitespace and case. Returns ``None`` for
    ``None`` input or an unrecognised value.
    """
    if raw is None:
        return None
    candidate = raw.strip().lower()
    if candidate in MEMORY_TYPES or candidate in GLOBAL_MEMORY_CATEGORIES:
        return candidate
    return None
def parse_global_memory_category(raw: str | None) -> str | None:
"""Validate *raw* against ``GLOBAL_MEMORY_CATEGORIES``."""
if raw is None:
@@ -164,7 +114,7 @@ class MemoryFile:
filename=path.name,
path=path,
name=fm.get("name"),
type=parse_memory_type(fm.get("type")),
type=parse_global_memory_category(fm.get("type")),
description=fm.get("description"),
header_lines=lines,
mtime=mtime,
@@ -182,7 +132,7 @@ def scan_memory_files(memory_dir: Path | None = None) -> list[MemoryFile]:
Files are sorted by modification time (newest first). Dotfiles and
subdirectories are ignored.
"""
d = memory_dir or MEMORY_DIR
d = memory_dir or global_memory_dir()
if not d.is_dir():
return []
@@ -235,307 +185,30 @@ def build_memory_document(
)
def diary_filename(d: date | None = None) -> str:
    """Return ``MEMORY-YYYY-MM-DD.md`` for date *d* (today when omitted)."""
    day = d or date.today()
    return "MEMORY-{}.md".format(day.strftime("%Y-%m-%d"))
def build_diary_document(*, date_str: str, body: str) -> str:
    """Build a ``diary``-typed memory document for the day labelled *date_str*.

    The name and description are derived from *date_str*; *body* is passed
    through to ``build_memory_document`` unchanged.
    """
    frontmatter_fields = {
        "name": f"diary-{date_str}",
        "description": f"Daily session narrative for {date_str}",
        "mem_type": "diary",
    }
    return build_memory_document(body=body, **frontmatter_fields)
def validate_global_memory_payload(
    *,
    category: str,
    description: str,
    content: str,
) -> str:
    """Check a queen-global memory save request and return its normalised category.

    Raises:
        ValueError: If the category is not recognised, if the description or
            content is blank, or if the text matches the code/runtime
            heuristic (``_GLOBAL_MEMORY_CODE_PATTERN``).
    """
    parsed = parse_global_memory_category(category)
    if parsed is None:
        raise ValueError(
            "Invalid global memory category. Use one of: " + ", ".join(GLOBAL_MEMORY_CATEGORIES)
        )

    # Description is checked before content, so its error wins when both are blank.
    for label, value in (("description", description), ("content", content)):
        if not value.strip():
            raise ValueError(f"Global memory {label} cannot be empty.")

    probe = "\n".join((description, content))
    if _GLOBAL_MEMORY_CODE_PATTERN.search(probe) is not None:
        raise ValueError(
            "Global memory is only for durable user profile, preferences, "
            "environment, or feedback — not task/code/runtime details."
        )
    return parsed
def save_global_memory(
    *,
    category: str,
    description: str,
    content: str,
    name: str | None = None,
    memory_dir: Path | None = None,
) -> tuple[str, Path]:
    """Persist one queen-global memory entry.

    Args:
        category: Memory category; validated by ``validate_global_memory_payload``.
        description: One-line description; must not be blank.
        content: Memory body; must not be blank.
        name: Optional memory name. When missing or blank, the description
            is used as the name.
        memory_dir: Target directory; defaults to ``global_memory_dir()``.

    Returns:
        ``(filename, path)`` of the file that was written.

    Raises:
        ValueError: If validation fails or the rendered document exceeds
            ``MAX_FILE_SIZE_BYTES``.
    """
    parsed = validate_global_memory_payload(
        category=category,
        description=description,
        content=content,
    )

    # A whitespace-only name is truthy, so ``name or description`` alone would
    # produce an empty memory name; strip first, then fall back.
    memory_name = (name or "").strip() or description.strip()

    doc = build_memory_document(
        name=memory_name,
        description=description,
        mem_type=parsed,
        body=content,
    )
    # Reject oversized entries before touching the filesystem.
    if len(doc.encode("utf-8")) > MAX_FILE_SIZE_BYTES:
        raise ValueError(f"Global memory entry exceeds the {MAX_FILE_SIZE_BYTES} byte limit.")

    target_dir = memory_dir or global_memory_dir()
    target_dir.mkdir(parents=True, exist_ok=True)
    filename = allocate_memory_filename(target_dir, memory_name)
    path = target_dir / filename
    path.write_text(doc, encoding="utf-8")
    return filename, path
# ---------------------------------------------------------------------------
# Manifest formatting
# ---------------------------------------------------------------------------
def _age_label(mtime: float) -> str:
    """Describe the age of *mtime* as ``"today"``, ``"1 day ago"`` or ``"N days ago"``."""
    days = memory_age_days(mtime)
    if days > 1:
        return f"{days} days ago"
    # Zero and negative ages both read as "today".
    return "1 day ago" if days == 1 else "today"
def format_memory_manifest(files: list[MemoryFile]) -> str:
    """One-line-per-file text manifest.

    Format: ``[type] filename: description``

    A missing type is shown as ``unknown`` and a missing description as
    ``(no description)``. Each file yields exactly one line; an empty input
    yields an empty string.
    """
    return "\n".join(
        f"[{mf.type or 'unknown'}] {mf.filename}: {mf.description or '(no description)'}"
        for mf in files
    )
# ---------------------------------------------------------------------------
# Freshness / staleness
# ---------------------------------------------------------------------------
_SECONDS_PER_DAY = 86_400
def memory_age_days(mtime: float) -> int:
"""Return the age of a memory file in whole days."""
if mtime <= 0:
return 0
return int((time.time() - mtime) / _SECONDS_PER_DAY)
def memory_freshness_text(mtime: float) -> str:
    """Return a staleness warning for a memory, or ``""`` when it is fresh.

    A memory counts as fresh while its age in whole days is at most 1.
    """
    age = memory_age_days(mtime)
    if age > 1:
        return "".join(
            (
                f"This memory is {age} days old. ",
                "Memories are point-in-time observations, not live state — ",
                "claims about code behavior or file:line citations may be outdated. ",
                "Verify against current code before asserting as fact.",
            )
        )
    return ""
# ---------------------------------------------------------------------------
# Cursor-based incremental processing
# Initialisation
# ---------------------------------------------------------------------------
async def read_conversation_parts(session_dir: Path) -> list[dict[str, Any]]:
    """Load every conversation part stored under ``session_dir/conversations``.

    Delegates to ``FileConversationStore.read_parts()`` and returns its
    result (raw message dicts in sequence order).
    """
    # Imported lazily, as in the rest of this module's storage access.
    from framework.storage.conversation_store import FileConversationStore

    conversations_root = session_dir / "conversations"
    return await FileConversationStore(conversations_root).read_parts()
# ---------------------------------------------------------------------------
# Initialisation and legacy migration
# ---------------------------------------------------------------------------
def init_memory_dir(
memory_dir: Path | None = None,
*,
migrate_legacy: bool = False,
) -> None:
"""Create the memory directory if missing.
When ``migrate_legacy`` is true, migrate both v1 memory files and the
previous shared v2 queen memory store into this directory.
"""
d = memory_dir or MEMORY_DIR
first_run = not d.exists()
def init_memory_dir(memory_dir: Path | None = None) -> None:
"""Create the memory directory if missing."""
d = memory_dir or global_memory_dir()
d.mkdir(parents=True, exist_ok=True)
if migrate_legacy:
migrate_legacy_memories(d)
migrate_shared_v2_memories(d)
elif first_run and d == MEMORY_DIR:
migrate_legacy_memories(d)
def migrate_legacy_memories(memory_dir: Path | None = None) -> None:
"""Convert old MEMORY.md + MEMORY-YYYY-MM-DD.md files to individual memory files.
Originals are moved to ``{memory_dir}/.legacy/``.

The semantic file becomes ``legacy-semantic-memory.md`` (type ``reference``)
and each non-empty daily file becomes ``legacy-diary-<date>.md`` (type
``diary``), both written through ``_write_migration_file``. *memory_dir*
defaults to ``MEMORY_DIR``.
"""
# NOTE(review): indentation was lost in this view, so it is not visible
# whether the "Archive original" step for MEMORY.md runs only when content
# was migrated or for every existing MEMORY.md — confirm against the file.
d = memory_dir or MEMORY_DIR
queen_dir = _HIVE_QUEEN_DIR
legacy_archive = d / ".legacy"
migrated_any = False
# --- Semantic memory (MEMORY.md) ---
semantic = queen_dir / "MEMORY.md"
if semantic.exists():
content = semantic.read_text(encoding="utf-8").strip()
# Skip the blank seed template.
if content and not content.startswith("# My Understanding of the User\n\n*No sessions"):
_write_migration_file(
d,
filename="legacy-semantic-memory.md",
name="legacy-semantic-memory",
mem_type="reference",
description="Migrated semantic memory from previous memory system",
body=content,
)
migrated_any = True
# Archive original.
legacy_archive.mkdir(parents=True, exist_ok=True)
semantic.rename(legacy_archive / "MEMORY.md")
# --- Episodic memories (MEMORY-YYYY-MM-DD.md) ---
old_memories_dir = queen_dir / "memories"
if old_memories_dir.is_dir():
# Only files named MEMORY-*.md are picked up, so the legacy-diary-*.md
# files written below are never re-processed by this glob.
for ep_file in sorted(old_memories_dir.glob("MEMORY-*.md")):
content = ep_file.read_text(encoding="utf-8").strip()
if not content:
continue
# File stem is "MEMORY-<date>"; keep only the date part.
date_part = ep_file.stem.replace("MEMORY-", "")
slug = f"legacy-diary-{date_part}.md"
_write_migration_file(
d,
filename=slug,
name=f"legacy-diary-{date_part}",
mem_type="diary",
description=f"Migrated diary entry from {date_part}",
body=content,
)
migrated_any = True
# Archive original.
legacy_archive.mkdir(parents=True, exist_ok=True)
ep_file.rename(legacy_archive / ep_file.name)
if migrated_any:
logger.info("queen_memory_v2: migrated legacy memory files to %s", d)
def migrate_shared_v2_memories(
memory_dir: Path | None = None,
*,
source_dir: Path | None = None,
) -> None:
"""Move shared queen v2 memory files into a colony directory once.

Copies every non-hidden ``*.md`` file from *source_dir* into *memory_dir*
(skipping names that already exist there), moves the source files into
``<source_dir>/.legacy_colony_migration/``, and writes a marker file in
*memory_dir* so the migration is not repeated. Both directories default to
``MEMORY_DIR``; when they resolve to the same path nothing is done.
"""
# NOTE(review): indentation was lost in this view, so the exact nesting of
# the archive step (inside or after the ``if not target.exists()`` branch)
# and of the final marker write cannot be confirmed from here.
d = memory_dir or MEMORY_DIR
d.mkdir(parents=True, exist_ok=True)
src = source_dir or MEMORY_DIR
if d.resolve() == src.resolve():
return
# The marker records that this directory was already processed.
marker = d / _MIGRATION_MARKER
if marker.exists():
return
if not src.is_dir():
return
md_files = sorted(f for f in src.glob("*.md") if f.is_file() and not f.name.startswith("."))
if not md_files:
marker.write_text("no shared memories found\n", encoding="utf-8")
return
archive = src / ".legacy_colony_migration"
archive.mkdir(parents=True, exist_ok=True)
migrated_any = False
for src_file in md_files:
target = d / src_file.name
if not target.exists():
try:
shutil.copy2(src_file, target)
migrated_any = True
except OSError:
logger.debug("shared memory migration copy failed for %s", src_file, exc_info=True)
continue
# Pick an unused archive name by appending -2, -3, ... to the stem.
archived = archive / src_file.name
counter = 2
while archived.exists():
archived = archive / f"{src_file.stem}-{counter}{src_file.suffix}"
counter += 1
try:
src_file.rename(archived)
except OSError:
logger.debug("shared memory migration archive failed for %s", src_file, exc_info=True)
if migrated_any:
logger.info("queen_memory_v2: migrated shared queen memories to %s", d)
marker.write_text(
f"migrated_at={int(time.time())}\nsource={src}\n",
encoding="utf-8",
)
def _write_migration_file(
memory_dir: Path,
filename: str,
name: str,
mem_type: str,
description: str,
body: str,
) -> None:
"""Write a single migrated memory file with frontmatter."""
# Truncate body to respect file size limit (leave room for frontmatter).
header = f"---\nname: {name}\ndescription: {description}\ntype: {mem_type}\n---\n\n"
max_body = MAX_FILE_SIZE_BYTES - len(header.encode("utf-8"))
if len(body.encode("utf-8")) > max_body:
# Rough truncation — cut at character level then trim to last newline.
body = body[: max_body - 20]
nl = body.rfind("\n")
if nl > 0:
body = body[:nl]
body += "\n\n...(truncated during migration)"
path = memory_dir / filename
path.write_text(header + body + "\n", encoding="utf-8")
+24 -131
View File
@@ -1,11 +1,11 @@
"""Recall selector — pre-turn memory selection for queen and worker memory.
"""Recall selector — pre-turn global memory selection for the queen.
Before each conversation turn the system:
1. Scans the memory directory for ``.md`` files (cap: 200).
1. Scans the global memory directory for ``.md`` files (cap: 200).
2. Reads headers (frontmatter + first 30 lines).
3. Uses a single LLM call with structured JSON output to pick the ~5
most relevant memories.
4. Injects them into context with staleness warnings for older ones.
4. Injects them into the system prompt.
The selector only sees the user's query string — no full conversation
context. This keeps it cheap and fast. Errors are caught and return
@@ -20,9 +20,8 @@ from pathlib import Path
from typing import Any
from framework.agents.queen.queen_memory_v2 import (
MEMORY_DIR,
format_memory_manifest,
memory_freshness_text,
global_memory_dir,
scan_memory_files,
)
@@ -32,29 +31,6 @@ logger = logging.getLogger(__name__)
# Structured output schema
# ---------------------------------------------------------------------------
RECALL_SCHEMA: dict[str, Any] = {
"type": "json_schema",
"json_schema": {
"name": "memory_selection",
"strict": True,
"schema": {
"type": "object",
"properties": {
"selected_memories": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["selected_memories"],
"additionalProperties": False,
},
},
}
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
SELECT_MEMORIES_SYSTEM_PROMPT = """\
You are selecting memories that will be useful to the Queen agent as it \
processes a user's query.
@@ -72,9 +48,6 @@ name and description.
query, then do not include it in your list. Be selective and discerning.
- If there are no memories in the list that would clearly be useful, \
return an empty list.
- If a list of recently-used tools is provided, do not select memories \
that are usage reference or API documentation for those tools (the Queen \
is already exercising them). Still select warnings or gotchas about them.
"""
# ---------------------------------------------------------------------------
@@ -86,7 +59,6 @@ async def select_memories(
query: str,
llm: Any,
memory_dir: Path | None = None,
active_tools: list[str] | None = None,
*,
max_results: int = 5,
) -> list[str]:
@@ -94,51 +66,48 @@ async def select_memories(
Returns a list of filenames. Best-effort: on any error returns ``[]``.
"""
mem_dir = memory_dir or MEMORY_DIR
mem_dir = memory_dir or global_memory_dir()
files = scan_memory_files(mem_dir)
if not files:
logger.debug("recall: no memory files found, skipping selection")
return []
logger.debug("recall: selecting from %d memory files for query: %.80s", len(files), query)
logger.debug("recall: selecting from %d memories for query: %.100s", len(files), query)
manifest = format_memory_manifest(files)
user_msg_parts = [f"## User query\n\n{query}\n\n## Available memories\n\n{manifest}"]
if active_tools:
user_msg_parts.append(f"\n\n## Recently-used tools\n\n{', '.join(active_tools)}")
user_msg = "".join(user_msg_parts)
user_msg = f"## User query\n\n{query}\n\n## Available memories\n\n{manifest}"
try:
resp = await llm.acomplete(
messages=[{"role": "user", "content": user_msg}],
system=SELECT_MEMORIES_SYSTEM_PROMPT,
max_tokens=512,
response_format=RECALL_SCHEMA,
max_tokens=1024,
response_format={"type": "json_object"},
)
data = json.loads(resp.content)
raw = (resp.content or "").strip()
if not raw:
logger.warning(
"recall: LLM returned empty response (model=%s, stop=%s)",
resp.model,
resp.stop_reason,
)
return []
data = json.loads(raw)
selected = data.get("selected_memories", [])
# Validate: only return filenames that actually exist.
valid_names = {f.filename for f in files}
result = [s for s in selected if s in valid_names][:max_results]
logger.debug("recall: selected %d memories: %s", len(result), result)
return result
except Exception:
logger.debug("recall: memory selection failed, returning []", exc_info=True)
except Exception as exc:
logger.warning("recall: memory selection failed (%s), returning []", exc)
return []
def format_recall_injection(
filenames: list[str],
memory_dir: Path | None = None,
*,
heading: str = "Selected Memories",
) -> str:
"""Read selected memory files and format for system prompt injection.
Prepends a staleness warning for memories older than 1 day.
"""
mem_dir = memory_dir or MEMORY_DIR
"""Read selected memory files and format for system prompt injection."""
mem_dir = memory_dir or global_memory_dir()
if not filenames:
return ""
@@ -151,86 +120,10 @@ def format_recall_injection(
content = path.read_text(encoding="utf-8").strip()
except OSError:
continue
try:
mtime = path.stat().st_mtime
except OSError:
mtime = 0.0
freshness = memory_freshness_text(mtime)
header = f"### {fname}"
if freshness:
header += f"\n\n> {freshness}"
blocks.append(f"{header}\n\n{content}")
blocks.append(f"### {fname}\n\n{content}")
if not blocks:
return ""
body = "\n\n---\n\n".join(blocks)
logger.debug("recall: injecting %d memory blocks into context", len(blocks))
return f"--- {heading} ---\n\n{body}\n\n--- End {heading} ---"
# ---------------------------------------------------------------------------
# Cache update (called after each queen turn)
# ---------------------------------------------------------------------------
async def update_recall_cache(
    session_dir: Path,
    llm: Any,
    phase_state: Any | None = None,
    memory_dir: Path | None = None,
    *,
    cache_setter: Any = None,
    heading: str = "Selected Memories",
    active_tools: list[str] | None = None,
) -> None:
    """Refresh the cached recall block used on the next turn.

    The newest user message found under *session_dir* is used as the query
    for memory selection. The formatted result is handed to *cache_setter*
    when one is given; otherwise it is stored on
    ``phase_state._cached_recall_block`` when *phase_state* is given.
    Failures during selection, formatting, or storing are logged at debug
    level and swallowed.
    """
    target_dir = memory_dir or MEMORY_DIR

    # The latest user message doubles as the selection query.
    latest_query = _extract_latest_user_query(session_dir)
    if not latest_query:
        logger.debug("recall: no user query found, skipping cache update")
        return

    logger.debug("recall: updating cache for query: %.80s", latest_query)
    try:
        chosen = await select_memories(
            latest_query,
            llm,
            target_dir,
            active_tools=active_tools,
        )
        recall_block = format_recall_injection(chosen, target_dir, heading=heading)
        # An explicit setter takes precedence over writing to phase_state.
        if cache_setter is not None:
            cache_setter(recall_block)
            return
        if phase_state is not None:
            phase_state._cached_recall_block = recall_block
    except Exception:
        logger.debug("recall: cache update failed", exc_info=True)
def _extract_latest_user_query(session_dir: Path) -> str:
    """Read the most recent user message from conversation parts.

    Looks in ``<session_dir>/conversations/parts`` for ``*.json`` files,
    walks them in descending filename order, and returns the stripped
    ``content`` of the first one whose ``role`` is ``"user"`` and whose
    content is non-empty, capped at 1000 characters.

    Unreadable files, files that are not valid UTF-8 JSON, and files whose
    top-level JSON value is not an object are skipped.

    Returns:
        The query text, or ``""`` when the directory is missing or no user
        message is found within the 20 newest part files.
    """
    parts_dir = session_dir / "conversations" / "parts"
    if not parts_dir.is_dir():
        return ""
    part_files = sorted(parts_dir.glob("*.json"), reverse=True)
    for part in part_files[:20]:  # Look back at most 20 messages.
        try:
            data = json.loads(part.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for files that are not valid UTF-8.
            continue
        # A part holding a JSON list/string/number has no .get(); skip it
        # rather than letting AttributeError escape to the caller.
        if not isinstance(data, dict) or data.get("role") != "user":
            continue
        content = str(data.get("content", "")).strip()
        if content:
            # Truncate very long queries.
            return content[:1000]
    return ""
return f"--- Global Memories ---\n\n{body}\n\n--- End Global Memories ---"
+164 -403
View File
@@ -1,21 +1,20 @@
"""Reflect agent — background memory extraction for queen and worker memory.
"""Reflection agent — background global memory extraction for the queen.
A lightweight side agent that runs after each queen LLM turn. It
inspects recent conversation messages (cursor-based incremental
processing) and extracts learnings into individual memory files.
A lightweight side agent that runs after each queen LLM turn. It inspects
recent conversation messages and extracts durable user knowledge into
individual memory files in ``~/.hive/queen/global_memory/``.
Two reflection types:
- **Short reflection**: every queen turn. Distills learnings. Nudged
toward a 2-turn pattern (batch reads batch writes).
- **Long reflection**: every 5 short reflections, on CONTEXT_COMPACTED,
and at session end. Organises, deduplicates, trims holistically.
- **Short reflection**: after conversational queen turns. Distills
learnings about the user (profile, preferences, environment, feedback).
- **Long reflection**: every 5 short reflections and on CONTEXT_COMPACTED.
Organises, deduplicates, trims the global memory directory.
The agent has restricted tool access: it can only read/write/delete
memory files in ``~/.hive/queen/memories/`` and list them.
Concurrency: an ``asyncio.Lock`` prevents overlapping runs. If a trigger
fires while a reflection is already active the event is skipped.
Concurrency: an ``asyncio.Lock`` prevents overlapping runs. If a
trigger fires while a reflection is already active the event is skipped
(cursor hasn't advanced, so messages will be reconsidered next time).
All reflections are fire-and-forget (spawned via ``asyncio.create_task``)
so they never block the queen's event loop.
"""
from __future__ import annotations
@@ -23,23 +22,18 @@ from __future__ import annotations
import asyncio
import json
import logging
import re
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any
from framework.agents.queen.queen_memory_v2 import (
GLOBAL_MEMORY_CATEGORIES,
MAX_FILE_SIZE_BYTES,
MAX_FILES,
MEMORY_DIR,
MEMORY_FRONTMATTER_EXAMPLE,
MEMORY_TYPES,
build_diary_document,
diary_filename,
format_memory_manifest,
global_memory_dir,
parse_frontmatter,
read_conversation_parts,
scan_memory_files,
)
from framework.llm.provider import LLMResponse, Tool
@@ -54,7 +48,7 @@ _REFLECTION_TOOLS: list[Tool] = [
Tool(
name="list_memory_files",
description=(
"List all memory files with their type, name, age, and description. "
"List all memory files with their type, name, and description. "
"Returns a text manifest — one line per file."
),
parameters={
@@ -135,28 +129,7 @@ def _safe_memory_path(filename: str, memory_dir: Path) -> Path:
return candidate
# Memory types that workers are NOT allowed to write.
_WORKER_BLOCKED_TYPES: frozenset[str] = frozenset(
{"environment", "technique", "reference", "diary", "goal"}
)
def _inject_last_modified_by(content: str, caller: str) -> str:
    """Stamp the frontmatter of *content* with ``last_modified_by: <caller>``.

    Any frontmatter line that already starts with ``last_modified_by``
    (case-insensitive, ignoring surrounding whitespace) is dropped and a
    fresh entry is appended as the last frontmatter line. Content without a
    leading ``---`` frontmatter block is returned unchanged.
    """
    match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL)
    if match is None:
        return content

    kept = []
    for line in match.group(1).splitlines():
        # Drop a stale stamp so the field appears exactly once.
        if line.strip().lower().startswith("last_modified_by"):
            continue
        kept.append(line)
    kept.append(f"last_modified_by: {caller}")

    # Everything after the closing '---' is carried over untouched.
    remainder = content[match.end() :]
    return "---\n" + "\n".join(kept) + "\n---" + remainder
def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str) -> str:
def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path) -> str:
"""Execute a reflection tool synchronously. Returns the result string."""
if name == "list_memory_files":
files = scan_memory_files(memory_dir)
@@ -183,16 +156,14 @@ def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str
content = args.get("content", "")
if not filename.endswith(".md"):
return "ERROR: Filename must end with .md"
# Enforce caller-based type restrictions.
# Enforce global memory type restrictions.
fm = parse_frontmatter(content)
mem_type = (fm.get("type") or "").strip().lower()
if caller == "worker" and mem_type in _WORKER_BLOCKED_TYPES:
if mem_type and mem_type not in GLOBAL_MEMORY_CATEGORIES:
return (
f"ERROR: Workers cannot write memory type '{mem_type}'. "
f"Blocked types for workers: {', '.join(sorted(_WORKER_BLOCKED_TYPES))}."
f"ERROR: Invalid memory type '{mem_type}'. "
f"Allowed types: {', '.join(GLOBAL_MEMORY_CATEGORIES)}."
)
# Inject last_modified_by into frontmatter.
content = _inject_last_modified_by(content, caller)
# Enforce file size limit.
if len(content.encode("utf-8")) > MAX_FILE_SIZE_BYTES:
return f"ERROR: Content exceeds {MAX_FILE_SIZE_BYTES} byte limit."
@@ -207,9 +178,7 @@ def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str
return f"ERROR: File cap reached ({MAX_FILES}). Delete a file first."
memory_dir.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
logger.debug(
"reflect: tool write_memory_file [%s] → %s (%d chars)", caller, filename, len(content)
)
logger.debug("reflect: tool write_memory_file → %s (%d chars)", filename, len(content))
return f"Wrote {filename} ({len(content)} chars)."
if name == "delete_memory_file":
@@ -221,7 +190,7 @@ def _execute_tool(name: str, args: dict[str, Any], memory_dir: Path, caller: str
if not path.exists():
return f"ERROR: File not found: {filename}"
path.unlink()
logger.debug("reflect: tool delete_memory_file [%s]%s", caller, filename)
logger.debug("reflect: tool delete_memory_file → %s", filename)
return f"Deleted {filename}."
return f"ERROR: Unknown tool: {name}"
@@ -239,36 +208,18 @@ async def _reflection_loop(
system: str,
user_msg: str,
memory_dir: Path,
caller: str,
max_turns: int = _MAX_TURNS,
) -> tuple[bool, list[str], str]:
"""Run a mini tool-use loop: LLM → tool calls → repeat.
Hard cap of *max_turns* iterations. Prompt nudges the LLM toward a
2-turn pattern (batch reads in turn 1, batch writes in turn 2).
Returns a tuple of (success, changed_files, last_text) where *success*
is ``True`` if the loop completed without LLM errors, *changed_files*
lists filenames that were written or deleted, and *last_text* is the
final assistant text (useful as a skip-reason when no files changed).
Returns (success, changed_files, last_text).
"""
messages: list[dict[str, Any]] = [{"role": "user", "content": user_msg}]
changed_files: list[str] = []
last_text: str = ""
logger.debug("reflect: starting loop (caller=%s, max %d turns)", caller, max_turns)
for _turn in range(max_turns):
# Log what we're sending to the LLM.
user_content = messages[-1].get("content", "") if messages else ""
preview = user_content[:300] if isinstance(user_content, str) else str(user_content)[:300]
logger.debug(
"reflect: turn %d — sending %d messages to LLM, last msg role=%s, preview=%s",
_turn,
len(messages),
messages[-1].get("role", "?") if messages else "?",
preview,
)
logger.info("reflect: loop turn %d/%d (msgs=%d)", _turn + 1, max_turns, len(messages))
try:
resp: LLMResponse = await llm.acomplete(
messages=messages,
@@ -276,45 +227,49 @@ async def _reflection_loop(
tools=_REFLECTION_TOOLS,
max_tokens=2048,
)
except asyncio.CancelledError:
logger.warning("reflect: LLM call cancelled (task cancelled)")
return False, changed_files, last_text
except Exception:
logger.warning("reflect: LLM call failed", exc_info=True)
return False, changed_files, last_text
# Build assistant message.
# Extract tool calls from litellm/OpenAI response object.
tool_calls_raw: list[dict[str, Any]] = []
if resp.raw_response and isinstance(resp.raw_response, dict):
tool_calls_raw = resp.raw_response.get("tool_calls", [])
raw = resp.raw_response
if raw is not None:
# litellm returns a ModelResponse object; tool calls live on
# choices[0].message.tool_calls as a list of ChatCompletionMessageToolCall.
try:
msg_obj = raw.choices[0].message
if hasattr(msg_obj, "tool_calls") and msg_obj.tool_calls:
for tc in msg_obj.tool_calls:
fn = tc.function
try:
args = json.loads(fn.arguments) if fn.arguments else {}
except (json.JSONDecodeError, TypeError):
args = {}
tool_calls_raw.append(
{
"id": tc.id,
"name": fn.name,
"input": args,
}
)
except (AttributeError, IndexError):
pass
# Log the full LLM response for debugging.
raw_keys = (
list(resp.raw_response.keys())
if isinstance(resp.raw_response, dict)
else type(resp.raw_response).__name__
)
logger.debug(
"reflect: turn %d — LLM response: content=%r (len=%d), stop_reason=%s, "
"tool_calls=%d, model=%s, tokens=%d/%d, raw_keys=%s",
_turn,
(resp.content or "")[:200],
logger.info(
"reflect: LLM responded, text=%d chars, tool_calls=%d",
len(resp.content or ""),
resp.stop_reason,
len(tool_calls_raw),
resp.model,
resp.input_tokens,
resp.output_tokens,
raw_keys,
)
# Accumulate non-empty text across turns so we don't lose a reason
# given alongside tool calls on an earlier turn.
turn_text = resp.content or ""
if turn_text:
last_text = turn_text
assistant_msg: dict[str, Any] = {
"role": "assistant",
"content": turn_text,
}
assistant_msg: dict[str, Any] = {"role": "assistant", "content": turn_text}
if tool_calls_raw:
# Convert to OpenAI format for the conversation.
assistant_msg["tool_calls"] = [
{
"id": tc["id"],
@@ -328,32 +283,16 @@ async def _reflection_loop(
]
messages.append(assistant_msg)
# No tool calls → agent is done.
if not tool_calls_raw:
logger.debug("reflect: loop done after %d turn(s) (no tool calls)", _turn + 1)
break
# Execute each tool call and append results.
logger.debug(
"reflect: turn %d — executing %d tool call(s): %s",
_turn + 1,
len(tool_calls_raw),
[tc["name"] for tc in tool_calls_raw],
)
for tc in tool_calls_raw:
result = _execute_tool(tc["name"], tc.get("input", {}), memory_dir, caller)
# Track files that were written or deleted.
result = _execute_tool(tc["name"], tc.get("input", {}), memory_dir)
if tc["name"] in ("write_memory_file", "delete_memory_file"):
fname = tc.get("input", {}).get("filename", "")
if fname and not result.startswith("ERROR"):
changed_files.append(fname)
messages.append(
{
"role": "tool",
"tool_call_id": tc["id"],
"content": result,
}
)
messages.append({"role": "tool", "tool_call_id": tc["id"], "content": result})
return True, changed_files, last_text
@@ -362,51 +301,55 @@ async def _reflection_loop(
# System prompts
# ---------------------------------------------------------------------------
_FRONTMATTER_EXAMPLE = "\n".join(MEMORY_FRONTMATTER_EXAMPLE)
_CATEGORIES_STR = ", ".join(GLOBAL_MEMORY_CATEGORIES)
_SHORT_REFLECT_SYSTEM = f"""\
You are a reflection agent that distills learnings from a conversation into
persistent memory files. You run in the background after each assistant turn.
You are a reflection agent that distills durable knowledge about the USER
into persistent global memory files. You run in the background after each
assistant turn.
Your goal: identify anything from the recent messages worth remembering across
future sessions user preferences, project context, techniques that worked,
goals, environment details, reference pointers.
Your goal: identify anything from the recent messages worth remembering
about the user across ALL future sessions their profile, preferences,
environment setup, or feedback on assistant behavior.
Memory types: {", ".join(MEMORY_TYPES)}
Memory categories: {_CATEGORIES_STR}
Expected format for each memory file:
{_FRONTMATTER_EXAMPLE}
```markdown
---
name: {{{{memory name}}}}
description: {{{{one-line description specific and search-friendly}}}}
type: {{{{{_CATEGORIES_STR}}}}}
---
{{{{memory content}}}}
```
Workflow (aim for 2 turns):
Turn 1 call list_memory_files to see what already exists, then
read_memory_file for any that might need updating.
Turn 1 call list_memory_files to see what exists, then read_memory_file
for any that might need updating.
Turn 2 call write_memory_file for new/updated memories.
Rules:
- Only persist information that would be useful in a *future* conversation.
Skip ephemeral task details, routine tool output, and anything obvious
from the code or git history.
- ONLY persist durable knowledge about the USER who they are, how they
like to work, their tech environment, their feedback on your behavior.
- Do NOT store task-specific details, code patterns, file paths, or
ephemeral session state.
- Keep files concise. Each file should cover ONE topic.
- If an existing memory already covers the learning, UPDATE it rather than
creating a duplicate.
- If there is nothing worth remembering from these messages, do nothing
(respond with a brief reason why nothing was saved no tool calls needed).
- IMPORTANT: Always end with a text message (no tool calls) summarising what
you did or why you skipped. Never end on an empty response.
- If there is nothing worth remembering, do nothing (respond with a brief
reason no tool calls needed).
- File names should be kebab-case slugs ending in .md.
- Include a specific, search-friendly description in the frontmatter.
- Do NOT exceed {MAX_FILE_SIZE_BYTES} bytes per file or {MAX_FILES} total files.
"""
_LONG_REFLECT_SYSTEM = f"""\
You are a reflection agent performing a periodic housekeeping pass over the
memory directory. Your job is to organise, deduplicate, and trim noise from
the accumulated memory files.
global memory directory. Your job is to organise, deduplicate, and trim
noise from the accumulated memory files.
Memory types: {", ".join(MEMORY_TYPES)}
Expected format for each memory file:
{_FRONTMATTER_EXAMPLE}
Memory categories: {_CATEGORIES_STR}
Workflow:
1. list_memory_files to get the full manifest.
@@ -420,29 +363,6 @@ Rules:
- Remove memories that are no longer relevant or are superseded.
- Keep the total collection lean and high-signal.
- Do NOT invent new information only reorganise what exists.
- Do NOT delete or merge MEMORY-*.md diary files. These are daily narratives
managed by a separate process. You may read them for context but should not
modify them.
"""
_DIARY_SYSTEM = """\
You maintain a daily diary entry for an AI colony session. You receive:
(1) Today's existing diary content (may be empty if this is the first entry).
(2) A transcript of recent conversation messages.
Write a cohesive 3-8 sentence narrative about what happened in this session today.
Cover: what the user asked for, what was accomplished, key decisions or obstacles,
and current status.
Rules:
- If an existing diary is provided, rewrite it as a unified narrative incorporating
the new developments. Merge and deduplicate do not simply append.
- Keep the total narrative under 3000 characters.
- Focus on the story arc of the day, not individual tool calls or code details.
- If the recent messages contain nothing substantive (greetings, routine
confirmations), return the existing diary text unchanged.
- Output only the diary prose. No headings, no timestamps, no code fences, no
frontmatter.
"""
@@ -451,31 +371,33 @@ Rules:
# ---------------------------------------------------------------------------
async def _read_conversation_parts(session_dir: Path) -> list[dict[str, Any]]:
    """Return the parts read from the store rooted at ``<session_dir>/conversations``."""
    # Imported at call time rather than at module import.
    from framework.storage.conversation_store import FileConversationStore

    conversations_root = session_dir / "conversations"
    parts = await FileConversationStore(conversations_root).read_parts()
    return parts
async def run_short_reflection(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
*,
caller: str,
) -> None:
"""Run a short reflection: extract learnings from conversation."""
mem_dir = memory_dir or MEMORY_DIR
"""Run a short reflection: extract user knowledge from conversation."""
logger.info("reflect: starting short reflection for %s", session_dir)
mem_dir = memory_dir or global_memory_dir()
messages = await read_conversation_parts(session_dir)
messages = await _read_conversation_parts(session_dir)
if not messages:
logger.debug("reflect: short [%s] — no conversation parts", caller)
logger.info("reflect: no conversation parts found in %s, skipping", session_dir)
return
logger.debug("reflect: short [%s] — %d conversation parts", caller, len(messages))
# Build a readable transcript from recent messages.
transcript_lines: list[str] = []
for msg in messages[-50:]:
role = msg.get("role", "")
content = str(msg.get("content", "")).strip()
if role == "tool":
continue # Skip verbose tool results.
if not content:
if role == "tool" or not content:
continue
label = "user" if role == "user" else "assistant"
if len(content) > 800:
@@ -483,6 +405,7 @@ async def run_short_reflection(
transcript_lines.append(f"[{label}]: {content}")
if not transcript_lines:
logger.info("reflect: no transcript lines after filtering, skipping")
return
transcript = "\n".join(transcript_lines)
@@ -492,38 +415,26 @@ async def run_short_reflection(
f"Timestamp: {datetime.now().isoformat(timespec='minutes')}"
)
_, changed, reason = await _reflection_loop(
llm,
_SHORT_REFLECT_SYSTEM,
user_msg,
mem_dir,
caller=caller,
)
_, changed, reason = await _reflection_loop(llm, _SHORT_REFLECT_SYSTEM, user_msg, mem_dir)
if changed:
logger.debug("reflect: short reflection done [%s], changed files: %s", caller, changed)
logger.info("reflect: short reflection done, changed files: %s", changed)
else:
logger.debug(
"reflect: short reflection done [%s], no changes — %s",
caller,
reason or "no reason given",
)
logger.info("reflect: short reflection done, no changes — %s", reason or "no reason")
async def run_long_reflection(
llm: Any,
memory_dir: Path | None = None,
*,
caller: str,
) -> None:
"""Run a long reflection: organise and deduplicate all memories."""
mem_dir = memory_dir or MEMORY_DIR
"""Run a long reflection: organise and deduplicate all global memories."""
logger.debug("reflect: starting long reflection")
mem_dir = memory_dir or global_memory_dir()
files = scan_memory_files(mem_dir)
if not files:
logger.debug("reflect: long [%s] — no memory files to organise", caller)
logger.debug("reflect: no memory files, skipping long reflection")
return
logger.debug("reflect: long [%s] — organising %d memory files", caller, len(files))
manifest = format_memory_manifest(files)
user_msg = (
f"## Current memory manifest ({len(files)} files)\n\n"
@@ -531,105 +442,43 @@ async def run_long_reflection(
f"Timestamp: {datetime.now().isoformat(timespec='minutes')}"
)
_, changed, reason = await _reflection_loop(
llm,
_LONG_REFLECT_SYSTEM,
user_msg,
mem_dir,
caller=caller,
)
_, changed, reason = await _reflection_loop(llm, _LONG_REFLECT_SYSTEM, user_msg, mem_dir)
if changed:
logger.debug(
"reflect: long reflection done [%s] (%d files), changed files: %s",
caller,
len(files),
changed,
)
logger.debug("reflect: long reflection done (%d files), changed: %s", len(files), changed)
else:
logger.debug(
"reflect: long reflection done [%s] (%d files), no changes — %s",
caller,
"reflect: long reflection done (%d files), no changes — %s",
len(files),
reason or "no reason given",
reason or "no reason",
)
async def run_diary_update(
async def run_shutdown_reflection(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
) -> None:
"""Update today's diary file with a narrative of recent activity."""
mem_dir = memory_dir or MEMORY_DIR
fname = diary_filename()
diary_path = mem_dir / fname
today_str = datetime.now().strftime("%Y-%m-%d")
# Read existing diary body (strip frontmatter).
existing_body = ""
if diary_path.exists():
try:
raw = diary_path.read_text(encoding="utf-8")
m = re.match(r"^---\s*\n.*?\n---\s*\n?", raw, re.DOTALL)
existing_body = raw[m.end() :].strip() if m else raw.strip()
except OSError:
pass
# Read all conversation messages for context.
messages = await read_conversation_parts(session_dir)
transcript_lines: list[str] = []
for msg in messages[-40:]:
role = msg.get("role", "")
content = str(msg.get("content", "")).strip()
if role == "tool" or not content:
continue
label = "user" if role == "user" else "assistant"
if len(content) > 600:
content = content[:600] + "..."
transcript_lines.append(f"[{label}]: {content}")
if not transcript_lines:
return
transcript = "\n".join(transcript_lines)
user_msg = (
f"## Today's Diary So Far\n\n"
f"{existing_body or '(no entries yet)'}\n\n"
f"## Recent Conversation\n\n"
f"{transcript}\n\n"
f"Date: {today_str}"
)
"""Run a final short reflection on session shutdown.
Called during session teardown so recent conversation insights are
persisted before the session is destroyed.
"""
logger.info("reflect: running shutdown reflection for %s", session_dir)
mem_dir = memory_dir or global_memory_dir()
try:
from framework.agents.queen.config import default_config
resp = await llm.acomplete(
messages=[{"role": "user", "content": user_msg}],
system=_DIARY_SYSTEM,
max_tokens=min(default_config.max_tokens, 1024),
)
new_body = (resp.content or "").strip()
if not new_body:
return
doc = build_diary_document(date_str=today_str, body=new_body)
if len(doc.encode("utf-8")) > MAX_FILE_SIZE_BYTES:
new_body = new_body[:2800]
doc = build_diary_document(date_str=today_str, body=new_body)
mem_dir.mkdir(parents=True, exist_ok=True)
diary_path.write_text(doc, encoding="utf-8")
logger.debug("diary: updated %s (%d chars)", fname, len(doc))
await run_short_reflection(session_dir, llm, mem_dir)
logger.info("reflect: shutdown reflection completed for %s", session_dir)
except asyncio.CancelledError:
logger.warning("reflect: shutdown reflection cancelled for %s", session_dir)
except Exception:
logger.warning("diary: update failed", exc_info=True)
logger.warning("reflect: shutdown reflection failed", exc_info=True)
_write_error("shutdown reflection")
# ---------------------------------------------------------------------------
# Event-bus integration
# ---------------------------------------------------------------------------
# Run a long reflection every N short reflections.
_LONG_REFLECT_INTERVAL = 5
@@ -638,7 +487,6 @@ async def subscribe_reflection_triggers(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
phase_state: Any = None,
) -> list[str]:
"""Subscribe to queen turn events and return subscription IDs.
@@ -647,103 +495,74 @@ async def subscribe_reflection_triggers(
"""
from framework.runtime.event_bus import EventType
mem_dir = memory_dir or MEMORY_DIR
mem_dir = memory_dir or global_memory_dir()
_lock = asyncio.Lock()
_short_count = 0
_background_tasks: set[asyncio.Task] = set()
async def _do_turn_reflect(is_interval: bool, count: int) -> None:
async with _lock:
try:
if is_interval:
await run_short_reflection(session_dir, llm, mem_dir)
await run_long_reflection(llm, mem_dir)
else:
await run_short_reflection(session_dir, llm, mem_dir)
except Exception:
logger.warning("reflect: reflection failed", exc_info=True)
_write_error("short/long reflection")
async def _do_compaction_reflect() -> None:
async with _lock:
try:
await run_long_reflection(llm, mem_dir)
except Exception:
logger.warning("reflect: compaction-triggered reflection failed", exc_info=True)
_write_error("compaction reflection")
def _fire_and_forget(coro: Any) -> None:
"""Spawn a background task and prevent GC before it finishes."""
task = asyncio.create_task(coro)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
async def _on_turn_complete(event: Any) -> None:
nonlocal _short_count
# Only process queen turns.
if getattr(event, "stream_id", None) != "queen":
return
_short_count += 1
# Decide whether to reflect: only when the LLM turn ended without
# tool calls (a conversational response) OR every _LONG_REFLECT_INTERVAL turns.
event_data = getattr(event, "data", {}) or {}
stop_reason = event_data.get("stop_reason", "")
is_tool_turn = stop_reason in ("tool_use", "tool_calls")
is_interval = _short_count % _LONG_REFLECT_INTERVAL == 0
if is_tool_turn and not is_interval:
logger.debug(
"reflect: skipping turn %d (stop_reason=%s, next reflect at %d)",
_short_count,
stop_reason,
(_short_count // _LONG_REFLECT_INTERVAL + 1) * _LONG_REFLECT_INTERVAL,
)
logger.debug("reflect: skipping tool turn (count=%d)", _short_count)
return
if _lock.locked():
logger.debug("reflect: skipping — reflection already in progress")
logger.debug("reflect: skipping, already running (count=%d)", _short_count)
return
async with _lock:
try:
logger.debug(
"reflect: turn complete — count %d/%d (stop_reason=%s)",
_short_count,
_LONG_REFLECT_INTERVAL,
stop_reason,
)
if is_interval:
await run_short_reflection(session_dir, llm, mem_dir, caller="queen")
await run_long_reflection(llm, mem_dir, caller="queen")
else:
await run_short_reflection(session_dir, llm, mem_dir, caller="queen")
except Exception:
logger.warning("reflect: reflection failed", exc_info=True)
_write_error("short/long reflection")
# Update daily diary after reflection.
try:
await run_diary_update(session_dir, llm, mem_dir)
except Exception:
logger.warning("reflect: diary update failed", exc_info=True)
# Update recall cache after reflection completes, guaranteeing
# recall sees the current turn's extracted memories.
if phase_state is not None:
try:
from framework.agents.queen.recall_selector import update_recall_cache
await update_recall_cache(
session_dir,
llm,
cache_setter=lambda block: (
setattr(phase_state, "_cached_colony_recall_block", block),
setattr(phase_state, "_cached_recall_block", block),
),
memory_dir=mem_dir,
heading="Colony Memories",
)
await update_recall_cache(
session_dir,
llm,
cache_setter=lambda block: setattr(
phase_state, "_cached_global_recall_block", block
),
memory_dir=getattr(phase_state, "global_memory_dir", None),
heading="Global Memories",
)
except Exception:
logger.debug("recall: cache update failed", exc_info=True)
logger.debug(
"reflect: triggered (count=%d, interval=%s, stop_reason=%s)",
_short_count,
is_interval,
stop_reason,
)
_fire_and_forget(_do_turn_reflect(is_interval, _short_count))
async def _on_compaction(event: Any) -> None:
if getattr(event, "stream_id", None) != "queen":
return
if _lock.locked():
logger.debug("reflect: skipping compaction trigger, already running")
return
async with _lock:
try:
await run_long_reflection(llm, mem_dir, caller="queen")
except Exception:
logger.warning("reflect: compaction-triggered reflection failed", exc_info=True)
_write_error("compaction reflection")
logger.debug("reflect: compaction triggered long reflection")
_fire_and_forget(_do_compaction_reflect())
sub_ids: list[str] = []
@@ -762,68 +581,10 @@ async def subscribe_reflection_triggers(
return sub_ids
async def subscribe_worker_memory_triggers(
    event_bus: Any,
    llm: Any,
    *,
    worker_sessions_dir: Path,
    colony_memory_dir: Path,
    recall_cache: dict[str, str],
) -> list[str]:
    """Register colony-memory lifecycle handlers for worker executions.

    Two subscriptions are created on *event_bus*:

    - ``EXECUTION_STARTED``: seeds ``recall_cache[execution_id]`` with an
      empty string.
    - ``EXECUTION_COMPLETED`` / ``EXECUTION_FAILED``: runs a long
      reflection over *colony_memory_dir* (serialised by a lock) and then
      drops the execution's entry from *recall_cache*.

    Events without an ``execution_id``, or whose ``stream_id`` is
    ``"queen"`` or ``"judge"``, are ignored. ``worker_sessions_dir`` is
    accepted but not read by this function.

    Returns:
        The subscription ids, started-handler first.
    """
    from framework.runtime.event_bus import EventType

    reflection_gate = asyncio.Lock()
    non_worker_streams = ("queen", "judge")

    def _from_worker(event: Any) -> bool:
        if not getattr(event, "execution_id", None):
            return False
        return getattr(event, "stream_id", None) not in non_worker_streams

    async def _on_execution_started(event: Any) -> None:
        if not _from_worker(event):
            return
        run_id = event.execution_id
        if run_id is None:
            return
        recall_cache[run_id] = ""

    async def _on_execution_terminal(event: Any) -> None:
        if not _from_worker(event):
            return
        run_id = event.execution_id
        if run_id is None:
            return
        async with reflection_gate:
            try:
                await run_long_reflection(llm, colony_memory_dir, caller="worker")
            except Exception:
                logger.warning("reflect: worker final reflection failed", exc_info=True)
                _write_error("worker final reflection")
            finally:
                # The cache entry is dropped whether or not reflection succeeded.
                recall_cache.pop(run_id, None)

    started_sub = event_bus.subscribe(
        event_types=[EventType.EXECUTION_STARTED],
        handler=_on_execution_started,
    )
    terminal_sub = event_bus.subscribe(
        event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
        handler=_on_execution_terminal,
    )
    return [started_sub, terminal_sub]
def _write_error(context: str) -> None:
"""Best-effort write of the last traceback to an error file."""
try:
error_path = MEMORY_DIR / ".reflection_error.txt"
error_path = global_memory_dir() / ".reflection_error.txt"
error_path.parent.mkdir(parents=True, exist_ok=True)
error_path.write_text(
f"context: {context}\ntime: {datetime.now().isoformat()}\n\n{traceback.format_exc()}",
-6
View File
@@ -67,12 +67,6 @@ class GraphContext:
# Retry tracking: worker_id → retry_count (for execution quality assessment)
retry_counts: dict[str, int] = field(default_factory=dict)
nodes_with_retries: set[str] = field(default_factory=set)
# Colony memory reflection at node handoff
colony_memory_dir: Any = None # Path | None
worker_sessions_dir: Any = None # Path | None
colony_recall_cache: dict[str, str] = field(default_factory=dict)
colony_reflect_llm: Any = None # LLMProvider for reflection
_colony_reflect_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
def build_scoped_buffer(buffer: DataBuffer, node_spec: NodeSpec) -> DataBuffer:
-13
View File
@@ -160,10 +160,6 @@ class GraphExecutor:
skill_dirs: list[str] | None = None,
context_warn_ratio: float | None = None,
batch_init_nudge: str | None = None,
colony_memory_dir: Any = None,
colony_worker_sessions_dir: Any = None,
colony_recall_cache: dict[str, str] | None = None,
colony_reflect_llm: Any = None,
):
"""
Initialize the executor.
@@ -232,11 +228,6 @@ class GraphExecutor:
self.skill_dirs: list[str] = skill_dirs or []
self.context_warn_ratio: float | None = context_warn_ratio
self.batch_init_nudge: str | None = batch_init_nudge
self.colony_memory_dir = colony_memory_dir
self.colony_worker_sessions_dir = colony_worker_sessions_dir
self.colony_recall_cache = colony_recall_cache or {}
self.colony_reflect_llm = colony_reflect_llm
if protocols_prompt:
self.logger.info(
"GraphExecutor[%s] received protocols_prompt (%d chars)",
@@ -1345,10 +1336,6 @@ class GraphExecutor:
iteration_metadata_provider=self.iteration_metadata_provider,
loop_config=self._loop_config,
node_visit_counts=dict(node_visit_counts),
colony_memory_dir=self.colony_memory_dir,
worker_sessions_dir=self.colony_worker_sessions_dir,
colony_recall_cache=self.colony_recall_cache,
colony_reflect_llm=self.colony_reflect_llm,
)
# Create one WorkerAgent per node
-59
View File
@@ -320,8 +320,6 @@ class WorkerAgent:
self.lifecycle = WorkerLifecycle.COMPLETED
self._last_result = result
self._last_activations = activations
# Colony memory reflection — runs before downstream activation
await self._reflect_colony_memory()
completion = WorkerCompletion(
worker_id=node_spec.id,
success=True,
@@ -342,8 +340,6 @@ class WorkerAgent:
self.lifecycle = WorkerLifecycle.FAILED
self._last_result = result
self._last_activations = activations
# Colony memory reflection — capture learnings even on failure
await self._reflect_colony_memory()
await self._publish_failure(result.error or "Unknown error")
except Exception as exc:
error = str(exc) or type(exc).__name__
@@ -658,61 +654,6 @@ class WorkerAgent:
pause_event=self._pause_requested,
)
async def _reflect_colony_memory(self) -> None:
    """Reflect into colony memory when this node hands off.

    Returns immediately unless the graph context provides a colony memory
    directory, a reflection LLM and a worker sessions directory, and the
    per-execution session directory exists. The short reflection runs
    while holding the shared colony lock, so parallel workers wait for
    each other instead of skipping. The recall cache is refreshed
    afterwards, outside the lock. Failures in either step are logged and
    not propagated.
    """
    from pathlib import Path

    ctx = self._gc
    prerequisites = (
        ctx.colony_memory_dir,
        ctx.colony_reflect_llm,
        ctx.worker_sessions_dir,
    )
    if any(item is None for item in prerequisites):
        return

    exec_session_dir = Path(ctx.worker_sessions_dir) / ctx.execution_id
    if not exec_session_dir.exists():
        return

    # Waiting on the lock serialises reflections without dropping any.
    async with ctx._colony_reflect_lock:
        try:
            from framework.agents.queen.reflection_agent import run_short_reflection

            await run_short_reflection(
                exec_session_dir,
                ctx.colony_reflect_llm,
                ctx.colony_memory_dir,
                caller="worker",
            )
        except Exception:
            logger.warning(
                "Worker %s: colony reflection failed",
                self.node_spec.id,
                exc_info=True,
            )

    def _store_block(block):
        # Each execution writes only its own key in the shared cache.
        ctx.colony_recall_cache[ctx.execution_id] = block

    # The recall cache refresh happens after the lock has been released.
    try:
        from framework.agents.queen.recall_selector import update_recall_cache

        await update_recall_cache(
            exec_session_dir,
            ctx.colony_reflect_llm,
            memory_dir=ctx.colony_memory_dir,
            cache_setter=_store_block,
            heading="Colony Memories",
        )
    except Exception:
        logger.warning(
            "Worker %s: recall cache update failed",
            self.node_spec.id,
            exc_info=True,
        )
# ------------------------------------------------------------------
# Event publishing
# ------------------------------------------------------------------
-9
View File
@@ -241,11 +241,6 @@ class AgentRuntime:
self._dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = (
None
)
# Colony memory config for reflection-at-handoff (set by session_manager)
self._colony_memory_dir: Any = None
self._colony_worker_sessions_dir: Any = None
self._colony_recall_cache: dict[str, str] | None = None
self._colony_reflect_llm: Any = None
self._accounts_data = accounts_data
self._tool_provider_map = tool_provider_map
@@ -369,10 +364,6 @@ class AgentRuntime:
context_warn_ratio=self.context_warn_ratio,
batch_init_nudge=self.batch_init_nudge,
dynamic_memory_provider_factory=self._dynamic_memory_provider_factory,
colony_memory_dir=self._colony_memory_dir,
colony_worker_sessions_dir=self._colony_worker_sessions_dir,
colony_recall_cache=self._colony_recall_cache,
colony_reflect_llm=self._colony_reflect_llm,
)
await stream.start()
self._streams[ep_id] = stream
@@ -192,10 +192,6 @@ class ExecutionStream:
context_warn_ratio: float | None = None,
batch_init_nudge: str | None = None,
dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = None,
colony_memory_dir: Any = None,
colony_worker_sessions_dir: Any = None,
colony_recall_cache: dict[str, str] | None = None,
colony_reflect_llm: Any = None,
):
"""
Initialize execution stream.
@@ -251,10 +247,6 @@ class ExecutionStream:
self._context_warn_ratio: float | None = context_warn_ratio
self._batch_init_nudge: str | None = batch_init_nudge
self._dynamic_memory_provider_factory = dynamic_memory_provider_factory
self._colony_memory_dir = colony_memory_dir
self._colony_worker_sessions_dir = colony_worker_sessions_dir
self._colony_recall_cache = colony_recall_cache
self._colony_reflect_llm = colony_reflect_llm
_es_logger = logging.getLogger(__name__)
if protocols_prompt:
@@ -735,10 +727,6 @@ class ExecutionStream:
if self._dynamic_memory_provider_factory is not None
else None
),
colony_memory_dir=self._colony_memory_dir,
colony_worker_sessions_dir=self._colony_worker_sessions_dir,
colony_recall_cache=self._colony_recall_cache,
colony_reflect_llm=self._colony_reflect_llm,
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
+44 -15
View File
@@ -183,16 +183,13 @@ async def create_queen(
phase_state.running_tools = [t for t in queen_tools if t.name in running_names]
phase_state.editing_tools = [t for t in queen_tools if t.name in editing_names]
# ---- Cross-session memory ----------------------------------------
# ---- Global memory -------------------------------------------------
from framework.agents.queen.queen_memory_v2 import (
colony_memory_dir,
global_memory_dir,
init_memory_dir,
)
colony_dir = colony_memory_dir(session.id)
global_dir = global_memory_dir()
init_memory_dir(colony_dir, migrate_legacy=True)
init_memory_dir(global_dir)
phase_state.global_memory_dir = global_dir
@@ -281,13 +278,33 @@ async def create_queen(
_session_llm = session.llm
_session_event_bus = session.event_bus
async def _persona_hook(ctx: HookContext) -> HookResult | None:
from framework.agents.queen.queen_memory import format_for_injection
# ---- Recall on each real user turn --------------------------------
async def _recall_on_user_input(event: AgentEvent) -> None:
"""Re-select memories when real user input arrives."""
content = (event.data or {}).get("content", "")
if not content or not isinstance(content, str):
return
try:
from framework.agents.queen.recall_selector import (
format_recall_injection,
select_memories,
)
memory_context = format_for_injection()
result = await select_expert_persona(
ctx.trigger or "", _session_llm, memory_context=memory_context
)
mem_dir = phase_state.global_memory_dir
selected = await select_memories(content, _session_llm, mem_dir)
phase_state._cached_global_recall_block = format_recall_injection(selected, mem_dir)
except Exception:
logger.debug("recall: user-turn cache update failed", exc_info=True)
session.event_bus.subscribe(
[EventType.CLIENT_INPUT_RECEIVED],
_recall_on_user_input,
filter_stream="queen",
)
async def _persona_hook(ctx: HookContext) -> HookResult | None:
trigger = ctx.trigger or ""
result = await select_expert_persona(trigger, _session_llm, memory_context="")
if not result:
return None
# Store on phase_state so persona/style persist across dynamic prompt refreshes
@@ -301,6 +318,21 @@ async def create_queen(
data={"persona": result.persona_prefix},
)
)
# Seed recall cache so the first turn has relevant memories.
if trigger:
try:
from framework.agents.queen.recall_selector import (
format_recall_injection,
select_memories,
)
mem_dir = phase_state.global_memory_dir
selected = await select_memories(trigger, _session_llm, mem_dir)
phase_state._cached_global_recall_block = format_recall_injection(selected, mem_dir)
except Exception:
logger.debug("recall: initial seeding failed", exc_info=True)
return HookResult(system_prompt=phase_state.get_current_prompt())
# ---- Graph preparation -------------------------------------------
@@ -410,18 +442,15 @@ async def create_queen(
)
session_manager._subscribe_worker_handoffs(session, executor)
# ---- Reflection + recall memory subscriptions ----------------
# ---- Global memory reflection + recall -------------------------
from framework.agents.queen.reflection_agent import subscribe_reflection_triggers
_reflection_subs = await subscribe_reflection_triggers(
session.event_bus,
queen_dir,
session.llm,
memory_dir=colony_dir,
phase_state=phase_state,
memory_dir=global_dir,
)
# Store sub IDs on session for teardown.
session.memory_reflection_subs = _reflection_subs
logger.info(
+9 -8
View File
@@ -185,14 +185,8 @@ async def handle_chat(request: web.Request) -> web.Response:
logger.error("[handle_chat] Node still not available after 5s wait")
if node is not None and hasattr(node, "inject_event"):
try:
logger.debug("[handle_chat] Calling node.inject_event()...")
await node.inject_event(message, is_client_input=True, image_content=image_content)
logger.debug("[handle_chat] inject_event() completed successfully")
except Exception as e:
logger.exception("[handle_chat] inject_event() failed: %s", e)
raise
# Publish to EventBus so the session event log captures user messages
# Publish BEFORE inject_event so handlers (e.g. memory recall)
# complete before the event loop unblocks and starts the LLM turn.
from framework.runtime.event_bus import AgentEvent, EventType
await session.event_bus.publish(
@@ -209,6 +203,13 @@ async def handle_chat(request: web.Request) -> web.Response:
},
)
)
try:
logger.debug("[handle_chat] Calling node.inject_event()...")
await node.inject_event(message, is_client_input=True, image_content=image_content)
logger.debug("[handle_chat] inject_event() completed successfully")
except Exception as e:
logger.exception("[handle_chat] inject_event() failed: %s", e)
raise
return web.json_response(
{
"status": "queen",
+26 -115
View File
@@ -45,12 +45,8 @@ class Session:
phase_state: Any = None # QueenPhaseState
# Worker handoff subscription
worker_handoff_sub: str | None = None
# Memory reflection + recall subscriptions
# Memory reflection + recall subscriptions (global memory)
memory_reflection_subs: list = field(default_factory=list) # list[str]
# Worker colony memory subscriptions
worker_memory_subs: list = field(default_factory=list) # list[str]
# Per-execution colony recall cache for worker prompts
worker_colony_recall_blocks: dict[str, str] = field(default_factory=dict)
# Trigger definitions loaded from agent's triggers.json (available but inactive)
available_triggers: dict[str, TriggerDefinition] = field(default_factory=dict)
# Active trigger tracking (IDs currently firing + their asyncio tasks)
@@ -69,6 +65,8 @@ class Session:
# directory instead of creating a new one. This lets cold-restores accumulate
# all messages in the original session folder so history is never fragmented.
queen_resume_from: str | None = None
# Queen session directory (set during _start_queen, used for shutdown reflection)
queen_dir: Path | None = None
class SessionManager:
@@ -84,6 +82,9 @@ class SessionManager:
self._model = model
self._credential_store = credential_store
self._lock = asyncio.Lock()
# Strong references for fire-and-forget background tasks (e.g. shutdown
# reflections) so they aren't garbage-collected before completion.
self._background_tasks: set[asyncio.Task] = set()
# ------------------------------------------------------------------
# Session lifecycle
@@ -323,16 +324,6 @@ class SessionManager:
runtime = runner._agent_runtime
if runtime is not None:
runtime._dynamic_memory_provider_factory = lambda execution_id, session=session: (
lambda execution_id=execution_id, session=session: (
session.worker_colony_recall_blocks.get(
execution_id,
"",
)
)
)
# Load triggers from the agent's triggers.json definition file.
from framework.tools.queen_lifecycle_tools import _read_agent_triggers_json
@@ -368,18 +359,6 @@ class SessionManager:
session.graph_runtime = runtime
session.worker_info = info
# Colony memory is additive; worker loading should still succeed if
# that optional subscription path hits an import/runtime issue while
# restoring an older session.
try:
await self._subscribe_worker_colony_memory(session)
except Exception:
logger.warning(
"Worker colony memory subscription failed for '%s'; continuing without it",
resolved_graph_id,
exc_info=True,
)
async with self._lock:
self._loading.discard(session.id)
@@ -617,14 +596,6 @@ class SessionManager:
await self._emit_trigger_events(session, "removed", session.available_triggers)
session.available_triggers.clear()
for sub_id in session.worker_memory_subs:
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.worker_memory_subs.clear()
session.worker_colony_recall_blocks.clear()
graph_id = session.graph_id
session.graph_id = None
session.worker_path = None
@@ -650,11 +621,6 @@ class SessionManager:
if session is None:
return False
# Capture session data for memory consolidation before teardown
_llm = getattr(session, "llm", None)
_storage_id = getattr(session, "queen_resume_from", None) or session_id
_session_dir = Path.home() / ".hive" / "queen" / "session" / _storage_id
if session.worker_handoff_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_handoff_sub)
@@ -662,21 +628,31 @@ class SessionManager:
pass
session.worker_handoff_sub = None
for sub_id in session.worker_memory_subs:
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.worker_memory_subs.clear()
session.worker_colony_recall_blocks.clear()
# Stop queen and memory reflection/recall subscriptions
# Stop memory reflection/recall subscriptions
for sub_id in session.memory_reflection_subs:
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.memory_reflection_subs.clear()
# Run a final shutdown reflection so recent conversation insights
# are persisted before the session is destroyed (fire-and-forget).
if session.queen_dir is not None:
try:
from framework.agents.queen.reflection_agent import run_shutdown_reflection
task = asyncio.create_task(
asyncio.shield(run_shutdown_reflection(session.queen_dir, session.llm)),
)
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
logger.info("Session '%s': shutdown reflection spawned", session_id)
except Exception:
logger.warning(
"Session '%s': failed to spawn shutdown reflection", session_id, exc_info=True
)
if session.queen_task is not None:
session.queen_task.cancel()
session.queen_task = None
@@ -708,20 +684,6 @@ class SessionManager:
except Exception as e:
logger.error("Error cleaning up worker: %s", e)
# Final long reflection — fire-and-forget so teardown isn't blocked.
if _llm is not None:
import asyncio
from framework.agents.queen.queen_memory_v2 import colony_memory_dir
from framework.agents.queen.reflection_agent import run_long_reflection
asyncio.create_task(
run_long_reflection(
_llm, memory_dir=colony_memory_dir(_storage_id), caller="queen"
),
name=f"queen-memory-long-reflection-{session_id}",
)
# Close per-session event log
session.event_bus.close_session_log()
@@ -757,54 +719,6 @@ class SessionManager:
else:
logger.warning("Worker handoff received but queen node not ready")
async def _subscribe_worker_colony_memory(self, session: Session) -> None:
    """(Re)attach colony reflection and recall wiring for a session's worker.

    Existing worker-memory subscriptions are removed and the per-execution
    recall blocks cleared first. If the session has no graph runtime, or
    the runtime's session store exposes no ``sessions_dir``, nothing else
    happens. Otherwise the colony memory directory is initialised, the
    runtime is given a provider factory plus the colony reflection
    settings, and the worker memory triggers are subscribed.
    """
    # Drop whatever was registered by a previous call; unsubscribe errors are ignored.
    for stale_id in session.worker_memory_subs:
        try:
            session.event_bus.unsubscribe(stale_id)
        except Exception:
            pass
    session.worker_memory_subs.clear()
    session.worker_colony_recall_blocks.clear()

    runtime = session.graph_runtime
    if runtime is None:
        return

    store = getattr(runtime, "_session_store", None)
    sessions_root = getattr(store, "sessions_dir", None)
    if sessions_root is None:
        return

    from framework.agents.queen.queen_memory_v2 import colony_memory_dir, init_memory_dir
    from framework.agents.queen.reflection_agent import subscribe_worker_memory_triggers

    colony_dir = colony_memory_dir(session.id)
    init_memory_dir(colony_dir, migrate_legacy=True)

    def _provider_factory(execution_id, session=session):
        # Defaults bind the execution id and session at creation time, so the
        # returned provider can be called with no arguments.
        def _provider(execution_id=execution_id, session=session):
            return session.worker_colony_recall_blocks.get(
                execution_id,
                "",
            )

        return _provider

    runtime._dynamic_memory_provider_factory = _provider_factory

    # Settings read by the runtime for reflection at node handoff.
    runtime._colony_memory_dir = colony_dir
    runtime._colony_worker_sessions_dir = sessions_root
    runtime._colony_recall_cache = session.worker_colony_recall_blocks
    runtime._colony_reflect_llm = session.llm

    session.worker_memory_subs = await subscribe_worker_memory_triggers(
        session.event_bus,
        session.llm,
        worker_sessions_dir=sessions_root,
        colony_memory_dir=colony_dir,
        recall_cache=session.worker_colony_recall_blocks,
    )
def _subscribe_worker_handoffs(self, session: Session, executor: Any) -> None:
"""Subscribe queen to worker/subagent escalation handoff events."""
from framework.runtime.event_bus import EventType as _ET
@@ -849,6 +763,7 @@ class SessionManager:
storage_session_id = session.queen_resume_from or session.id
queen_dir = hive_home / "queen" / "session" / storage_session_id
queen_dir.mkdir(parents=True, exist_ok=True)
session.queen_dir = queen_dir
# Always write/update session metadata so history sidebar has correct
# agent name, path, and last-active timestamp (important so the original
@@ -970,10 +885,6 @@ class SessionManager:
except Exception:
logger.warning("Cold restore: failed to auto-load worker", exc_info=True)
# Memory reflection/recall subscriptions are set up inside
# queen_orchestrator.create_queen() → _queen_loop() and stored
# on session.memory_reflection_subs for teardown.
# ------------------------------------------------------------------
# Queen notifications
# ------------------------------------------------------------------
+3 -93
View File
@@ -133,10 +133,9 @@ class QueenPhaseState:
persona_prefix: str = "" # e.g. "You are a CFO. I am a CFO with 20 years..."
style_directive: str = "" # e.g. "## Communication Style: Peer\n\n..."
# Cached recall block — populated async by recall_selector after each turn.
_cached_recall_block: str = ""
_cached_colony_recall_block: str = ""
# Cached global recall block — populated async by recall_selector after each turn.
_cached_global_recall_block: str = ""
# Global memory directory.
global_memory_dir: Path | None = None
def get_current_tools(self) -> list:
@@ -152,7 +151,7 @@ class QueenPhaseState:
return list(self.building_tools)
def get_current_prompt(self) -> str:
"""Return the system prompt for the current phase, with fresh memory appended."""
"""Return the system prompt for the current phase."""
if self.phase == "planning":
base = self.prompt_planning
elif self.phase == "running":
@@ -164,9 +163,6 @@ class QueenPhaseState:
else:
base = self.prompt_building
from framework.agents.queen.queen_memory import format_for_injection
_memory = format_for_injection() # noqa: F841
parts = []
if self.persona_prefix:
parts.append(self.persona_prefix)
@@ -177,9 +173,6 @@ class QueenPhaseState:
parts.append(self.skills_catalog_prompt)
if self.protocols_prompt:
parts.append(self.protocols_prompt)
colony_memory = self._cached_colony_recall_block or self._cached_recall_block
if colony_memory:
parts.append(colony_memory)
if self._cached_global_recall_block:
parts.append(self._cached_global_recall_block)
return "\n\n".join(parts)
@@ -4077,89 +4070,6 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- save_global_memory --------------------------------------------------
async def save_global_memory_entry(
    category: str,
    description: str,
    content: str,
    name: str | None = None,
) -> str:
    """Write one global memory entry about the user and report the result as JSON.

    The entry goes to ``phase_state.global_memory_dir`` when that is set,
    otherwise to the default global memory directory.

    Returns:
        A JSON object with ``status``, ``filename``, ``path`` and
        ``category`` on success, or with a single ``error`` key when the
        save raises ``ValueError``.
    """
    from framework.agents.queen.queen_memory_v2 import (
        global_memory_dir as _global_memory_dir,
        init_memory_dir as _init_memory_dir,
        save_global_memory as _save_global_memory,
    )

    target_dir = None
    if phase_state is not None:
        target_dir = phase_state.global_memory_dir
    if target_dir is None:
        target_dir = _global_memory_dir()
    _init_memory_dir(target_dir)

    try:
        saved_name, saved_path = _save_global_memory(
            category=category,
            description=description,
            content=content,
            name=name,
            memory_dir=target_dir,
        )
        payload = {
            "status": "saved",
            "filename": saved_name,
            "path": str(saved_path),
            "category": category,
        }
        return json.dumps(payload)
    except ValueError as exc:
        return json.dumps({"error": str(exc)})
_save_global_memory_tool = Tool(
name="save_global_memory",
description=(
"Save durable global memory about the user. "
"Only use for user profile, preferences, environment, or feedback."
),
parameters={
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": ["profile", "preference", "environment", "feedback"],
},
"description": {
"type": "string",
"description": "Specific one-line description for future recall selection.",
},
"content": {
"type": "string",
"description": "Durable user-centric memory content.",
},
"name": {
"type": "string",
"description": "Optional short memory title.",
},
},
"required": ["category", "description", "content"],
"additionalProperties": False,
},
)
registry.register(
"save_global_memory",
_save_global_memory_tool,
lambda inputs: save_global_memory_entry(
inputs["category"],
inputs["description"],
inputs["content"],
inputs.get("name"),
),
)
tools_registered += 1
# --- list_triggers ---------------------------------------------------------
async def list_triggers() -> str:
-100
View File
@@ -1,100 +0,0 @@
"""Tools for the queen to read and write episodic memory.
The queen can consciously record significant moments during a session like
writing in a diary and recall past diary entries when needed. Semantic
memory (MEMORY.md) is updated automatically at session end and is never
written by the queen directly.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
def write_to_diary(entry: str) -> str:
    """Write a prose entry to today's episodic memory.

    Use this when something significant just happened: a pipeline went live, the
    user shared an important preference, a goal was achieved or abandoned, or
    you want to record something that should be remembered across sessions.

    Write in first person, as you would in a private diary. Be specific what
    happened, how the user responded, what it means going forward. One or two
    paragraphs is enough.

    You do not need to include a timestamp or date heading; those are added
    automatically.
    """
    # NOTE(review): the docstring wording is left untouched because this
    # function is registered via ``register_function`` and the text may be
    # surfaced as the tool description; confirm before editing it.
    from framework.agents.queen.queen_memory import append_episodic_entry as _append_entry

    _append_entry(entry)
    confirmation = "Diary entry recorded."
    return confirmation
def _filter_diary_sections(content: str, query: str) -> str:
    """Return only the ``### ``-headed sections of *content* that mention *query*.

    Matching is case-insensitive. Text before the first heading is kept
    as-is when it matches; every later section gets its ``### `` marker
    restored. Returns an empty string when nothing matches.
    """
    needle = query.lower()
    kept: list[str] = []
    for index, section in enumerate(content.split("### ")):
        if needle not in section.lower():
            continue
        # str.split consumed the heading marker of every chunk after the
        # first one, so put it back to keep the section a heading.
        kept.append(section if index == 0 else f"### {section}")
    return "".join(kept)


def recall_diary(query: str = "", days_back: int = 7) -> str:
    """Search recent diary entries (episodic memory).

    Use this when the user asks about what happened in the past "what did we
    do yesterday?", "what happened last week?", "remind me about the pipeline
    issue", etc. Also use it proactively when you need context from recent
    sessions to answer a question or make a decision.

    Args:
        query: Optional keyword or phrase to filter entries. If empty, all
            recent entries are returned.
        days_back: How many days to look back (1-30). Defaults to 7.
    """
    # NOTE(review): the docstring wording is left untouched because this
    # function is registered via ``register_function`` and the text may be
    # surfaced as the tool description; confirm before editing it.
    from datetime import date, timedelta

    from framework.agents.queen.queen_memory import format_memory_date, read_episodic_memory

    days_back = max(1, min(int(days_back), 30))
    today = date.today()
    results: list[str] = []
    total_chars = 0
    char_budget = 12_000

    # Walk backwards from today, newest day first.
    for offset in range(days_back):
        d = today - timedelta(days=offset)
        content = read_episodic_memory(d)
        if not content:
            continue

        if query:
            # Keep only the sections that mention the query, with their
            # heading markers intact.
            content = _filter_diary_sections(content, query)
            if not content:
                continue

        label = format_memory_date(d)
        if d == today:
            label = f"Today — {label}"
        entry = f"## {label}\n\n{content}"

        if total_chars + len(entry) > char_budget:
            remaining = char_budget - total_chars
            if remaining > 200:
                # Fit a partial entry within budget, leaving room for the marker.
                trimmed = content[: remaining - 100] + "\n\n…(truncated)"
                results.append(f"## {label}\n\n{trimmed}")
            else:
                results.append(f"## {label}\n\n(truncated — hit size limit)")
            break

        results.append(entry)
        total_chars += len(entry)

    if not results:
        if query:
            return f"No diary entries matching '{query}' in the last {days_back} days."
        return f"No diary entries found in the last {days_back} days."
    return "\n\n---\n\n".join(results)
def register_queen_memory_tools(registry: ToolRegistry) -> None:
    """Add the diary write and recall functions to *registry*, in that order."""
    for tool_fn in (write_to_diary, recall_diary):
        registry.register_function(tool_fn)
+76 -309
View File
@@ -1,4 +1,4 @@
"""Tests for the queen memory v2 system (reflection + recall)."""
"""Tests for the queen global memory system (reflection + recall)."""
from __future__ import annotations
@@ -15,9 +15,7 @@ from framework.agents.queen.recall_selector import (
format_recall_injection,
select_memories,
)
from framework.agents.queen.reflection_agent import subscribe_worker_memory_triggers
from framework.graph.prompting import build_system_prompt_for_node_context
from framework.runtime.event_bus import AgentEvent, EventBus, EventType
from framework.tools.queen_lifecycle_tools import QueenPhaseState
# ---------------------------------------------------------------------------
@@ -26,9 +24,9 @@ from framework.tools.queen_lifecycle_tools import QueenPhaseState
def test_parse_frontmatter_valid():
text = "---\nname: foo\ntype: goal\ndescription: bar baz\n---\ncontent"
text = "---\nname: foo\ntype: profile\ndescription: bar baz\n---\ncontent"
fm = qm.parse_frontmatter(text)
assert fm == {"name": "foo", "type": "goal", "description": "bar baz"}
assert fm == {"name": "foo", "type": "profile", "description": "bar baz"}
def test_parse_frontmatter_missing():
@@ -42,34 +40,30 @@ def test_parse_frontmatter_empty():
def test_parse_frontmatter_broken_yaml():
text = "---\n: bad\nno colon\n---\n"
fm = qm.parse_frontmatter(text)
# ": bad" has colon at pos 0, so key is empty → skipped
# "no colon" has no colon → skipped
assert fm == {}
# ---------------------------------------------------------------------------
# parse_memory_type
# parse_global_memory_category
# ---------------------------------------------------------------------------
def test_parse_memory_type_valid():
assert qm.parse_memory_type("goal") == "goal"
assert qm.parse_memory_type("environment") == "environment"
assert qm.parse_memory_type("technique") == "technique"
assert qm.parse_memory_type("reference") == "reference"
assert qm.parse_memory_type("profile") == "profile"
assert qm.parse_memory_type("feedback") == "feedback"
def test_parse_global_memory_category_valid():
    """Each supported category name parses to itself."""
    for category in ("profile", "preference", "environment", "feedback"):
        assert qm.parse_global_memory_category(category) == category
def test_parse_memory_type_case_insensitive():
assert qm.parse_memory_type("Goal") == "goal"
assert qm.parse_memory_type(" TECHNIQUE ") == "technique"
def test_parse_global_memory_category_case_insensitive():
    """Mixed case and surrounding whitespace still yield the lowercase category."""
    cases = [
        ("Profile", "profile"),
        ("  FEEDBACK  ", "feedback"),
    ]
    for raw, expected in cases:
        assert qm.parse_global_memory_category(raw) == expected
def test_parse_memory_type_invalid():
assert qm.parse_memory_type("user") is None
assert qm.parse_memory_type("unknown") is None
assert qm.parse_memory_type(None) is None
def test_parse_global_memory_category_invalid():
    """Unrecognised strings and a missing value both parse to None."""
    for rejected in ("goal", "unknown", None):
        assert qm.parse_global_memory_category(rejected) is None
# ---------------------------------------------------------------------------
@@ -79,11 +73,11 @@ def test_parse_memory_type_invalid():
def test_memory_file_from_path(tmp_path: Path):
f = tmp_path / "test.md"
f.write_text("---\nname: test\ntype: goal\ndescription: a test\n---\nbody\n")
f.write_text("---\nname: test\ntype: profile\ndescription: a test\n---\nbody\n")
mf = qm.MemoryFile.from_path(f)
assert mf.filename == "test.md"
assert mf.name == "test"
assert mf.type == "goal"
assert mf.type == "profile"
assert mf.description == "a test"
assert mf.mtime > 0
@@ -145,7 +139,7 @@ def test_format_memory_manifest():
filename="a.md",
path=Path("a.md"),
name="a",
type="goal",
type="profile",
description="desc a",
mtime=time.time(),
),
@@ -159,54 +153,12 @@ def test_format_memory_manifest():
),
]
manifest = qm.format_memory_manifest(files)
assert "[goal] a.md" in manifest
assert "[profile] a.md" in manifest
assert "desc a" in manifest
assert "[unknown] b.md" in manifest
assert "(no description)" in manifest
# ---------------------------------------------------------------------------
# memory_freshness_text
# ---------------------------------------------------------------------------
def test_memory_freshness_text_recent():
    """A memory stamped with the current time yields an empty freshness note."""
    just_now = time.time()
    assert qm.memory_freshness_text(just_now) == ""
def test_memory_freshness_text_old():
    """A three-day-old timestamp yields a note naming its age and caveat."""
    seconds_per_day = 86_400
    stale_mtime = time.time() - 3 * seconds_per_day
    note = qm.memory_freshness_text(stale_mtime)
    for fragment in ("3 days old", "point-in-time"):
        assert fragment in note
# ---------------------------------------------------------------------------
# read_conversation_parts
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_conversation_parts(tmp_path: Path):
    """Five numbered part files are read back; first and last content are checked."""
    target = tmp_path / "conversations" / "parts"
    target.mkdir(parents=True)
    for index in range(5):
        # Even indices are user turns, odd indices are assistant turns.
        speaker = "assistant" if index % 2 else "user"
        payload = {"role": speaker, "content": f"msg {index}"}
        (target / f"{index:010d}.json").write_text(json.dumps(payload))

    loaded = await qm.read_conversation_parts(tmp_path)

    assert len(loaded) == 5
    assert loaded[0]["content"] == "msg 0"
    assert loaded[4]["content"] == "msg 4"
@pytest.mark.asyncio
async def test_read_conversation_parts_empty(tmp_path: Path):
    """A directory with no conversation parts written yields an empty list."""
    assert await qm.read_conversation_parts(tmp_path) == []
# ---------------------------------------------------------------------------
# init_memory_dir
# ---------------------------------------------------------------------------
@@ -233,8 +185,10 @@ async def test_select_memories_empty_dir(tmp_path: Path):
@pytest.mark.asyncio
async def test_select_memories_with_files(tmp_path: Path):
(tmp_path / "a.md").write_text("---\nname: a\ndescription: about A\ntype: goal\n---\nbody")
(tmp_path / "b.md").write_text("---\nname: b\ndescription: about B\ntype: reference\n---\nbody")
(tmp_path / "a.md").write_text("---\nname: a\ndescription: about A\ntype: profile\n---\nbody")
(tmp_path / "b.md").write_text(
"---\nname: b\ndescription: about B\ntype: preference\n---\nbody"
)
llm = AsyncMock()
llm.acomplete.return_value = MagicMock(content=json.dumps({"selected_memories": ["a.md"]}))
@@ -258,7 +212,7 @@ async def test_select_memories_error_returns_empty(tmp_path: Path):
def test_format_recall_injection(tmp_path: Path):
(tmp_path / "a.md").write_text("---\nname: a\n---\nbody of a")
result = format_recall_injection(["a.md"], memory_dir=tmp_path)
assert "Selected Memories" in result
assert "Global Memories" in result
assert "body of a" in result
@@ -266,12 +220,6 @@ def test_format_recall_injection_empty():
assert format_recall_injection([]) == ""
def test_format_recall_injection_custom_heading(tmp_path: Path):
    """A caller-supplied heading appears in the rendered recall block."""
    memory_file = tmp_path / "a.md"
    memory_file.write_text("---\nname: a\n---\nbody of a")

    rendered = format_recall_injection(
        ["a.md"],
        memory_dir=tmp_path,
        heading="Colony Memories",
    )

    assert "Colony Memories" in rendered
# ---------------------------------------------------------------------------
# reflection_agent
# ---------------------------------------------------------------------------
@@ -279,10 +227,9 @@ def test_format_recall_injection_custom_heading(tmp_path: Path):
@pytest.mark.asyncio
async def test_short_reflection(tmp_path: Path):
"""Short reflection reads new messages and writes a memory file via LLM tools."""
"""Short reflection reads messages and writes a global memory file via LLM tools."""
from framework.agents.queen.reflection_agent import run_short_reflection
# Set up a fake session dir with conversation parts.
parts_dir = tmp_path / "session" / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(3):
@@ -291,13 +238,12 @@ async def test_short_reflection(tmp_path: Path):
json.dumps({"role": role, "content": f"message {i}"})
)
mem_dir = tmp_path / "memories"
mem_dir = tmp_path / "global_memory"
mem_dir.mkdir()
# Mock LLM: turn 1 lists files, turn 2 writes a memory, turn 3 stops.
llm = AsyncMock()
llm.acomplete.side_effect = [
# Turn 1: LLM calls write_memory_file
# Turn 1: LLM writes a global memory file
MagicMock(
content="",
raw_response={
@@ -309,7 +255,7 @@ async def test_short_reflection(tmp_path: Path):
"filename": "user-likes-tests.md",
"content": (
"---\nname: user-likes-tests\n"
"type: technique\n"
"type: preference\n"
"description: User values thorough testing\n"
"---\nObserved emphasis on test coverage."
),
@@ -318,23 +264,36 @@ async def test_short_reflection(tmp_path: Path):
]
},
),
# Turn 2: LLM has no more tool calls → done
# Turn 2: done
MagicMock(content="Done reflecting.", raw_response={}),
]
session_dir = tmp_path / "session"
await run_short_reflection(
session_dir,
llm,
memory_dir=mem_dir,
caller="queen",
)
await run_short_reflection(session_dir, llm, memory_dir=mem_dir)
# Verify the memory file was created.
written = mem_dir / "user-likes-tests.md"
assert written.exists()
assert "user-likes-tests" in written.read_text()
assert llm.acomplete.call_count == 2
@pytest.mark.asyncio
async def test_short_reflection_rejects_non_global_types(tmp_path: Path):
    """Writing a memory whose frontmatter type is ``goal`` reports an error.

    The tool result must contain "ERROR" and no file may be left behind.
    """
    from framework.agents.queen.reflection_agent import _execute_tool

    memory_root = tmp_path / "global_memory"
    memory_root.mkdir()
    tool_input = {
        "filename": "bad-type.md",
        "content": "---\nname: bad\ntype: goal\n---\nbody",
    }

    outcome = _execute_tool("write_memory_file", tool_input, memory_root)

    assert "ERROR" in outcome
    assert not (memory_root / "bad-type.md").exists()
@pytest.mark.asyncio
@@ -342,18 +301,17 @@ async def test_long_reflection(tmp_path: Path):
"""Long reflection reads all memories and can merge/delete them."""
from framework.agents.queen.reflection_agent import run_long_reflection
mem_dir = tmp_path / "memories"
mem_dir = tmp_path / "global_memory"
mem_dir.mkdir()
(mem_dir / "dup-a.md").write_text(
"---\nname: dup-a\ntype: goal\ndescription: goal A\n---\nGoal A details."
"---\nname: dup-a\ntype: profile\ndescription: profile A\n---\nProfile A details."
)
(mem_dir / "dup-b.md").write_text(
"---\nname: dup-b\ntype: goal\ndescription: goal A duplicate\n---\nSame goal A."
"---\nname: dup-b\ntype: profile\ndescription: profile A dup\n---\nSame profile A."
)
llm = AsyncMock()
llm.acomplete.side_effect = [
# Turn 1: LLM lists files
MagicMock(
content="",
raw_response={
@@ -362,7 +320,6 @@ async def test_long_reflection(tmp_path: Path):
]
},
),
# Turn 2: LLM merges dup-b into dup-a and deletes dup-b
MagicMock(
content="",
raw_response={
@@ -373,10 +330,9 @@ async def test_long_reflection(tmp_path: Path):
"input": {
"filename": "dup-a.md",
"content": (
"---\nname: dup-a\ntype: goal\n"
"description: goal A (merged)\n"
"---\nGoal A details."
" Also same goal A."
"---\nname: dup-a\ntype: profile\n"
"description: profile A (merged)\n"
"---\nProfile A details. Also same profile A."
),
},
},
@@ -388,21 +344,18 @@ async def test_long_reflection(tmp_path: Path):
]
},
),
# Turn 3: done
MagicMock(content="Housekeeping complete.", raw_response={}),
]
await run_long_reflection(llm, memory_dir=mem_dir, caller="queen")
await run_long_reflection(llm, memory_dir=mem_dir)
# dup-b should be deleted, dup-a should be updated.
assert not (mem_dir / "dup-b.md").exists()
assert (mem_dir / "dup-a.md").exists()
assert "merged" in (mem_dir / "dup-a.md").read_text()
assert llm.acomplete.call_count == 3
# ---------------------------------------------------------------------------
# Bug 1: Path traversal prevention
# Path traversal prevention
# ---------------------------------------------------------------------------
@@ -410,11 +363,8 @@ def test_path_traversal_read(tmp_path: Path):
from framework.agents.queen.reflection_agent import _execute_tool
(tmp_path / "safe.md").write_text("safe content")
result = _execute_tool(
"read_memory_file", {"filename": "../../etc/passwd"}, tmp_path, caller="queen"
)
result = _execute_tool("read_memory_file", {"filename": "../../etc/passwd"}, tmp_path)
assert "ERROR" in result
assert "path components not allowed" in result.lower() or "escapes" in result.lower()
def test_path_traversal_write(tmp_path: Path):
@@ -424,111 +374,32 @@ def test_path_traversal_write(tmp_path: Path):
"write_memory_file",
{"filename": "../escape.md", "content": "---\nname: evil\n---\nbad"},
tmp_path,
caller="queen",
)
assert "ERROR" in result
assert not (tmp_path.parent / "escape.md").exists()
def test_path_traversal_delete(tmp_path: Path):
    """A delete request with a ``..`` component errors and removes nothing."""
    from framework.agents.queen.reflection_agent import _execute_tool

    victim = tmp_path / "target.md"
    victim.write_text("content")
    tool_input = {"filename": "../target.md"}

    outcome = _execute_tool("delete_memory_file", tool_input, tmp_path, caller="queen")

    assert "ERROR" in outcome
    # The file inside the memory dir must still be there afterwards.
    assert victim.exists()
def test_safe_path_accepted(tmp_path: Path):
from framework.agents.queen.reflection_agent import _execute_tool
result = _execute_tool(
"write_memory_file",
{"filename": "good-file.md", "content": "---\nname: good\n---\ncontent"},
{"filename": "good-file.md", "content": "---\nname: good\ntype: profile\n---\ncontent"},
tmp_path,
caller="queen",
)
assert "Wrote" in result
assert (tmp_path / "good-file.md").exists()
result = _execute_tool(
"read_memory_file", {"filename": "good-file.md"}, tmp_path, caller="queen"
)
result = _execute_tool("read_memory_file", {"filename": "good-file.md"}, tmp_path)
assert "content" in result
result = _execute_tool(
"delete_memory_file", {"filename": "good-file.md"}, tmp_path, caller="queen"
)
result = _execute_tool("delete_memory_file", {"filename": "good-file.md"}, tmp_path)
assert "Deleted" in result
def test_init_memory_dir_migrates_shared_memories_into_colony(tmp_path: Path):
    """Migration moves a legacy shared memory into the colony dir and writes a marker."""
    legacy_dir = tmp_path / "legacy-shared"
    legacy_dir.mkdir()
    legacy_file = legacy_dir / "shared-memory.md"
    legacy_file.write_text(
        "---\nname: shared\ndescription: old shared memory\ntype: goal\n---\nbody",
        encoding="utf-8",
    )
    colony_dir = tmp_path / "colony"

    qm.migrate_shared_v2_memories(colony_dir, source_dir=legacy_dir)

    # The file now lives in the colony dir, is gone from the source, and the
    # marker file records that the migration ran.
    assert (colony_dir / "shared-memory.md").exists()
    assert not legacy_file.exists()
    assert (colony_dir / ".migrated-from-shared-memory").exists()
def test_shared_memory_migration_marker_prevents_repeat(tmp_path: Path):
    """With the marker file already present, migration leaves the source untouched."""
    legacy_dir = tmp_path / "legacy-shared"
    colony_dir = tmp_path / "colony"
    legacy_dir.mkdir()
    colony_dir.mkdir()
    marker = colony_dir / ".migrated-from-shared-memory"
    marker.write_text("done\n", encoding="utf-8")
    legacy_file = legacy_dir / "shared-memory.md"
    legacy_file.write_text("body", encoding="utf-8")

    qm.migrate_shared_v2_memories(colony_dir, source_dir=legacy_dir)

    assert not (colony_dir / "shared-memory.md").exists()
    assert legacy_file.exists()
def test_global_memory_is_not_populated_by_colony_migration(tmp_path: Path):
    """Migrating into the colony dir adds no ``*.md`` files to the global dir."""
    legacy_dir = tmp_path / "legacy-shared"
    legacy_dir.mkdir()
    (legacy_dir / "shared-memory.md").write_text("body", encoding="utf-8")
    colony_dir = tmp_path / "colony"
    global_dir = tmp_path / "global"

    qm.migrate_shared_v2_memories(colony_dir, source_dir=legacy_dir)
    qm.init_memory_dir(global_dir)

    markdown_files = list(global_dir.glob("*.md"))
    assert markdown_files == []
def test_save_global_memory_rejects_runtime_details(tmp_path: Path):
    """Saving content about the worker graph as global memory raises ValueError."""
    request = {
        "category": "profile",
        "description": "codebase preference",
        "content": "The user wants the worker graph to use node retries.",
        "memory_dir": tmp_path,
    }
    with pytest.raises(ValueError):
        qm.save_global_memory(**request)
def test_save_global_memory_persists_frontmatter(tmp_path: Path):
    """A saved global memory is a ``.md`` file carrying its type and description."""
    saved_name, saved_path = qm.save_global_memory(
        category="preference",
        description="Prefers concise updates",
        content="The user prefers concise, direct status updates.",
        memory_dir=tmp_path,
    )

    assert saved_name.endswith(".md")
    written = saved_path.read_text(encoding="utf-8")
    for fragment in ("type: preference", "Prefers concise updates"):
        assert fragment in written
# ---------------------------------------------------------------------------
# system prompt integration
# ---------------------------------------------------------------------------
def test_build_system_prompt_injects_dynamic_memory():
@@ -542,134 +413,30 @@ def test_build_system_prompt_injects_dynamic_memory():
skills_catalog_prompt="",
protocols_prompt="",
memory_prompt="",
dynamic_memory_provider=lambda: "--- Colony Memories ---\nremember this",
dynamic_memory_provider=lambda: "--- Global Memories ---\nremember this",
is_subagent_mode=False,
)
prompt = build_system_prompt_for_node_context(ctx)
assert "Colony Memories" in prompt
assert "Global Memories" in prompt
assert "remember this" in prompt
def test_queen_phase_state_appends_colony_and_global_memory_blocks():
def test_queen_phase_state_appends_global_memory_block():
phase = QueenPhaseState(
prompt_building="base prompt",
_cached_colony_recall_block="--- Colony Memories ---\ncolony",
_cached_global_recall_block="--- Global Memories ---\nglobal",
_cached_global_recall_block="--- Global Memories ---\nglobal stuff",
)
prompt = phase.get_current_prompt()
assert "base prompt" in prompt
assert "Colony Memories" in prompt
assert "Global Memories" in prompt
assert "global stuff" in prompt
@pytest.mark.asyncio
async def test_worker_colony_reflection_at_handoff(tmp_path: Path):
"""Colony reflection runs via WorkerAgent._reflect_colony_memory at node handoff."""
import asyncio
def test_queen_phase_state_prompt_without_memory():
phase = QueenPhaseState(prompt_building="base prompt")
from framework.graph.context import GraphContext
from framework.graph.worker_agent import WorkerAgent
worker_sessions_dir = tmp_path / "worker-sessions"
execution_id = "exec-1"
session_dir = worker_sessions_dir / execution_id / "conversations" / "parts"
session_dir.mkdir(parents=True)
(session_dir / "0000000000.json").write_text(
json.dumps({"role": "user", "content": "Please remember I like terse summaries."}),
encoding="utf-8",
)
(session_dir / "0000000001.json").write_text(
json.dumps({"role": "assistant", "content": "I'll keep that in mind."}),
encoding="utf-8",
)
colony_dir = tmp_path / "colony"
colony_dir.mkdir()
recall_cache: dict[str, str] = {execution_id: ""}
reflect_llm = AsyncMock()
reflect_llm.acomplete.side_effect = [
# Short reflection: write a memory file
MagicMock(
content="",
raw_response={
"tool_calls": [
{
"id": "tc_1",
"name": "write_memory_file",
"input": {
"filename": "user-prefers-terse-summaries.md",
"content": (
"---\n"
"name: user-prefers-terse-summaries\n"
"description: Prefers terse summaries\n"
"type: preference\n"
"---\n\n"
"The user prefers terse summaries."
),
},
}
]
},
),
# Short reflection done
MagicMock(content="done", raw_response={}),
# Recall selector picks the new memory
MagicMock(content=json.dumps({"selected_memories": ["user-prefers-terse-summaries.md"]})),
]
# Build a minimal GraphContext with colony memory fields
gc = MagicMock(spec=GraphContext)
gc.colony_memory_dir = colony_dir
gc.worker_sessions_dir = worker_sessions_dir
gc.colony_recall_cache = recall_cache
gc.colony_reflect_llm = reflect_llm
gc.execution_id = execution_id
gc._colony_reflect_lock = asyncio.Lock()
node_spec = SimpleNamespace(id="test-node")
worker = WorkerAgent.__new__(WorkerAgent)
worker._gc = gc
worker.node_spec = node_spec
await worker._reflect_colony_memory()
assert (colony_dir / "user-prefers-terse-summaries.md").exists()
assert "Colony Memories" in recall_cache[execution_id]
assert "terse summaries" in recall_cache[execution_id]
@pytest.mark.asyncio
async def test_subscribe_worker_triggers_only_lifecycle_events(tmp_path: Path):
    """Worker memory triggers register two subscriptions and seed the recall cache.

    Publishing EXECUTION_STARTED for ``exec-1`` must leave an empty-string
    entry for that execution id in the recall cache.
    """
    colony_dir = tmp_path / "colony"
    colony_dir.mkdir()
    cache: dict[str, str] = {}
    event_bus = EventBus()

    subscription_ids = await subscribe_worker_memory_triggers(
        event_bus,
        AsyncMock(),
        worker_sessions_dir=tmp_path / "sessions",
        colony_memory_dir=colony_dir,
        recall_cache=cache,
    )
    try:
        # One subscription for start, one for terminal events.
        assert len(subscription_ids) == 2

        started = AgentEvent(
            type=EventType.EXECUTION_STARTED,
            stream_id="default",
            execution_id="exec-1",
        )
        await event_bus.publish(started)
        assert cache.get("exec-1") == ""
    finally:
        # Always detach the handlers, even when an assertion above fails.
        for subscription_id in subscription_ids:
            event_bus.unsubscribe(subscription_id)
prompt = phase.get_current_prompt()
assert "base prompt" in prompt
assert "Global Memories" not in prompt
@@ -147,7 +147,6 @@ async def test_load_worker_core_defaults_to_session_llm_model(monkeypatch, tmp_p
monkeypatch.setattr("framework.runner.AgentRunner.load", fake_load)
monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None)
monkeypatch.setattr(manager, "_subscribe_worker_colony_memory", AsyncMock())
monkeypatch.setattr(
"framework.tools.queen_lifecycle_tools._read_agent_triggers_json",
lambda *_args: [],
@@ -158,7 +157,6 @@ async def test_load_worker_core_defaults_to_session_llm_model(monkeypatch, tmp_p
assert load_calls[0]["model"] == "queen-shared-model"
assert session.runner is runner
assert session.runner._llm is session_llm
assert runtime._dynamic_memory_provider_factory is not None
@pytest.mark.asyncio
@@ -183,7 +181,6 @@ async def test_load_worker_core_keeps_explicit_worker_model_override(monkeypatch
monkeypatch.setattr("framework.runner.AgentRunner.load", fake_load)
monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None)
monkeypatch.setattr(manager, "_subscribe_worker_colony_memory", AsyncMock())
monkeypatch.setattr(
"framework.tools.queen_lifecycle_tools._read_agent_triggers_json",
lambda *_args: [],
@@ -199,37 +196,4 @@ async def test_load_worker_core_keeps_explicit_worker_model_override(monkeypatch
assert session.runner is runner
assert session.runner._llm is None
@pytest.mark.asyncio
async def test_load_worker_core_continues_when_colony_memory_subscription_fails(
    monkeypatch, tmp_path
) -> None:
    """Loading a worker still succeeds when the colony-memory hook raises ImportError."""
    event_bus = EventBus()
    manager = SessionManager(model="manager-default")
    queen_llm = SimpleNamespace(model="queen-shared-model")
    session = Session(
        id="session_memory_warning",
        event_bus=event_bus,
        llm=queen_llm,
        loaded_at=0.0,
    )
    fake_runtime = SimpleNamespace(is_running=True)
    fake_runner = SimpleNamespace(
        _llm=None,
        _agent_runtime=fake_runtime,
        info=MagicMock(return_value={"id": "worker"}),
    )
    worker_path = tmp_path / "worker_agent"

    # Stub out everything _load_worker_core touches except the failing hook.
    monkeypatch.setattr("framework.runner.AgentRunner.load", lambda *args, **kwargs: fake_runner)
    monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None)
    failing_hook = AsyncMock(side_effect=ImportError("optional memory hook unavailable"))
    monkeypatch.setattr(manager, "_subscribe_worker_colony_memory", failing_hook)
    monkeypatch.setattr(
        "framework.tools.queen_lifecycle_tools._read_agent_triggers_json",
        lambda *_args: [],
    )

    await manager._load_worker_core(session, worker_path)

    assert session.runner is fake_runner
    assert session.graph_runtime is fake_runtime
    assert session.worker_path == tmp_path / "worker_agent"