fix: remove outdated memory cursor design

This commit is contained in:
Richard Tang
2026-04-03 12:38:05 -07:00
parent 9f70868f98
commit 4f588b3010
4 changed files with 31 additions and 283 deletions
+6 -92
View File
@@ -33,7 +33,6 @@ GLOBAL_MEMORY_CATEGORIES: tuple[str, ...] = ("profile", "preference", "environme
_HIVE_QUEEN_DIR = Path.home() / ".hive" / "queen"
# Legacy shared v2 root. Colony memory now lives under queen sessions.
MEMORY_DIR: Path = _HIVE_QUEEN_DIR / "memories"
CURSOR_FILE: Path = MEMORY_DIR / ".cursor.json"
MAX_FILES: int = 200
MAX_FILE_SIZE_BYTES: int = 4096 # 4 KB hard limit per memory file
@@ -78,21 +77,6 @@ def global_memory_dir() -> Path:
return _HIVE_QUEEN_DIR / "global_memory"
def queen_colony_cursor_file(session_dir: Path) -> Path:
"""Return the queen colony cursor file for a session."""
return session_dir / "memory" / "cursors" / "queen-colony.cursor.json"
def queen_global_cursor_file(session_dir: Path) -> Path:
"""Return the queen global cursor file for a session."""
return session_dir / "memory" / "cursors" / "queen-global.cursor.json"
def worker_colony_cursor_file(worker_session_dir: Path) -> Path:
"""Return the worker colony cursor file for one top-level execution."""
return worker_session_dir / "memory" / "worker-colony.cursor.json"
# ---------------------------------------------------------------------------
# Frontmatter parsing (lenient)
# ---------------------------------------------------------------------------
@@ -388,85 +372,15 @@ def memory_freshness_text(mtime: float) -> str:
# ---------------------------------------------------------------------------
def read_cursor(cursor_file: Path | None = None) -> int:
"""Read ``lastMemoryMessageSeq`` from the cursor file. Returns 0 if missing."""
p = cursor_file or CURSOR_FILE
if not p.exists():
return 0
try:
data = json.loads(p.read_text(encoding="utf-8"))
return int(data.get("lastMemoryMessageSeq", 0))
except (json.JSONDecodeError, TypeError, ValueError, OSError):
return 0
async def read_conversation_parts(session_dir: Path) -> list[dict[str, Any]]:
"""Read all conversation parts for a session using FileConversationStore.
def write_cursor(seq: int, cursor_file: Path | None = None) -> None:
"""Persist the cursor seq number. Creates parent dirs if needed."""
p = cursor_file or CURSOR_FILE
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(
json.dumps({"lastMemoryMessageSeq": seq}),
encoding="utf-8",
)
def read_messages_since_cursor(
session_dir: Path,
cursor_seq: int,
) -> tuple[list[dict[str, Any]], int]:
"""Read conversation parts added since *cursor_seq*.
Returns ``(messages, max_seq)``. Each message is the raw JSON dict
from the part file.
**Compaction fallback**: if no files have ``seq > cursor_seq`` (the
cursor was evicted by compaction), all existing parts are returned so
that the reflection agent can still process visible messages.
Returns a list of raw message dicts in sequence order.
"""
parts_dir = session_dir / "conversations" / "parts"
if not parts_dir.is_dir():
return [], cursor_seq
from framework.storage.conversation_store import FileConversationStore
part_files = sorted(parts_dir.glob("*.json"))
if not part_files:
return [], cursor_seq
# Determine which files are new (seq > cursor_seq).
new_files: list[tuple[int, Path]] = []
all_files: list[tuple[int, Path]] = []
for f in part_files:
try:
seq = int(f.stem)
except ValueError:
continue
all_files.append((seq, f))
if seq > cursor_seq:
new_files.append((seq, f))
# Compaction fallback: cursor evicted → return everything visible.
# Only trigger when cursor_seq is beyond the max seq of existing files,
# meaning files were compacted away. If cursor_seq <= max_all_seq there
# is simply nothing new (already up-to-date) — returning empty is correct.
if not new_files and all_files:
max_all_seq = max(seq for seq, _ in all_files)
if cursor_seq > max_all_seq:
new_files = all_files
if not new_files:
return [], cursor_seq
messages: list[dict[str, Any]] = []
max_seq = cursor_seq
for seq, f in new_files:
try:
data = json.loads(f.read_text(encoding="utf-8"))
messages.append(data)
if seq > max_seq:
max_seq = seq
except (json.JSONDecodeError, OSError):
continue
return messages, max_seq
store = FileConversationStore(session_dir / "conversations")
return await store.read_parts()
# ---------------------------------------------------------------------------
+14 -47
View File
@@ -38,11 +38,8 @@ from framework.agents.queen.queen_memory_v2 import (
build_diary_document,
diary_filename,
format_memory_manifest,
read_cursor,
read_messages_since_cursor,
read_conversation_parts,
scan_memory_files,
worker_colony_cursor_file,
write_cursor,
)
from framework.llm.provider import LLMResponse, Tool
@@ -370,24 +367,20 @@ async def run_short_reflection(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
*,
cursor_file: Path | None = None,
) -> None:
"""Run a short reflection: extract learnings from new messages."""
"""Run a short reflection: extract learnings from conversation."""
mem_dir = memory_dir or MEMORY_DIR
cursor_seq = read_cursor(cursor_file)
messages, max_seq = read_messages_since_cursor(session_dir, cursor_seq)
messages = await read_conversation_parts(session_dir)
if not messages:
logger.debug("reflect: short — no new messages since cursor %d", cursor_seq)
logger.debug("reflect: short — no conversation parts")
return
logger.debug("reflect: short — %d new messages (cursor %d→%d)", len(messages), cursor_seq, max_seq)
logger.debug("reflect: short — %d conversation parts", len(messages))
# Build a readable transcript of the new messages.
# Build a readable transcript from recent messages.
transcript_lines: list[str] = []
for msg in messages:
for msg in messages[-50:]:
role = msg.get("role", "")
content = str(msg.get("content", "")).strip()
if role == "tool":
@@ -395,31 +388,22 @@ async def run_short_reflection(
if not content:
continue
label = "user" if role == "user" else "assistant"
# Truncate very long messages.
if len(content) > 800:
content = content[:800] + "…"
transcript_lines.append(f"[{label}]: {content}")
if not transcript_lines:
# Only tool results in the new messages — still advance cursor.
write_cursor(max_seq, cursor_file)
return
transcript = "\n".join(transcript_lines)
user_msg = (
f"## Recent conversation (messages {cursor_seq + 1}–{max_seq})\n\n"
f"## Recent conversation ({len(messages)} messages total)\n\n"
f"{transcript}\n\n"
f"Timestamp: {datetime.now().isoformat(timespec='minutes')}"
)
success = await _reflection_loop(llm, _SHORT_REFLECT_SYSTEM, user_msg, mem_dir)
# Advance cursor only on success.
if success:
write_cursor(max_seq, cursor_file)
logger.debug("reflect: short reflection done, cursor → %d", max_seq)
else:
logger.warning("reflect: short reflection failed, cursor NOT advanced (stays at %d)", cursor_seq)
await _reflection_loop(llm, _SHORT_REFLECT_SYSTEM, user_msg, mem_dir)
logger.debug("reflect: short reflection done")
async def run_long_reflection(
@@ -469,7 +453,7 @@ async def run_diary_update(
pass
# Read all conversation messages for context.
messages, _ = read_messages_since_cursor(session_dir, 0)
messages = await read_conversation_parts(session_dir)
transcript_lines: list[str] = []
for msg in messages[-40:]:
role = msg.get("role", "")
@@ -530,7 +514,6 @@ async def subscribe_reflection_triggers(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
cursor_file: Path | None = None,
phase_state: Any = None,
) -> list[str]:
"""Subscribe to queen turn events and return subscription IDs.
@@ -560,20 +543,10 @@ async def subscribe_reflection_triggers(
_short_count += 1
logger.debug("reflect: turn complete — short count %d/%d", _short_count, _LONG_REFLECT_INTERVAL)
if _short_count % _LONG_REFLECT_INTERVAL == 0:
await run_short_reflection(
session_dir,
llm,
mem_dir,
cursor_file=cursor_file,
)
await run_short_reflection(session_dir, llm, mem_dir)
await run_long_reflection(llm, mem_dir)
else:
await run_short_reflection(
session_dir,
llm,
mem_dir,
cursor_file=cursor_file,
)
await run_short_reflection(session_dir, llm, mem_dir)
except Exception:
logger.warning("reflect: reflection failed", exc_info=True)
_write_error("short/long reflection")
@@ -686,17 +659,11 @@ async def subscribe_worker_memory_triggers(
if execution_id is None:
return
session_dir = worker_sessions_dir / execution_id
cursor_file = worker_colony_cursor_file(session_dir)
async with _lock:
try:
_short_counts[execution_id] = _short_counts.get(execution_id, 0) + 1
await run_short_reflection(
session_dir,
llm,
colony_memory_dir,
cursor_file=cursor_file,
)
await run_short_reflection(session_dir, llm, colony_memory_dir)
if _short_counts[execution_id] % _LONG_REFLECT_INTERVAL == 0:
await run_long_reflection(llm, colony_memory_dir)
await _update_cache(execution_id)