Merge branch 'feature/colonized-memory' into refactor/automated-testing

This commit is contained in:
Timothy
2026-04-03 14:43:41 -07:00
18 changed files with 349 additions and 1400 deletions
-399
View File
@@ -1,399 +0,0 @@
"""Queen global cross-session memory.
Two-tier memory architecture:
~/.hive/queen/MEMORY.md semantic (who, what, why)
~/.hive/queen/memories/MEMORY-YYYY-MM-DD.md episodic (daily journals)
Semantic and episodic files are injected at queen session start.
Semantic memory (MEMORY.md) is updated automatically at session end via
consolidate_queen_memory() the queen never rewrites this herself.
Episodic memory (MEMORY-date.md) can be written by the queen during a session
via the write_to_diary tool, and is also appended to at session end by
consolidate_queen_memory().
"""
from __future__ import annotations
import asyncio
import json
import logging
import traceback
from datetime import date, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
def _queen_dir() -> Path:
return Path.home() / ".hive" / "queen"
def format_memory_date(d: date) -> str:
    """Return a cross-platform long date label without a zero-padded day."""
    # strftime's %d is zero-padded and %-d is platform-specific, so the day
    # number is interpolated directly instead.
    month_name = d.strftime("%B")
    return f"{month_name} {d.day}, {d.year}"
def semantic_memory_path() -> Path:
    """Return the path of the durable semantic memory file (MEMORY.md)."""
    root = _queen_dir()
    return root / "MEMORY.md"
def episodic_memory_path(d: date | None = None) -> Path:
    """Return the path of the episodic journal for *d* (defaults to today)."""
    when = date.today() if d is None else d
    filename = f"MEMORY-{when:%Y-%m-%d}.md"
    return _queen_dir() / "memories" / filename
def read_semantic_memory() -> str:
    """Return the stripped contents of MEMORY.md, or "" if it doesn't exist."""
    p = semantic_memory_path()
    if not p.exists():
        return ""
    return p.read_text(encoding="utf-8").strip()
def read_episodic_memory(d: date | None = None) -> str:
    """Return the stripped journal for *d* (default today), or "" if absent."""
    p = episodic_memory_path(d)
    if not p.exists():
        return ""
    return p.read_text(encoding="utf-8").strip()
def _find_recent_episodic(lookback: int = 7) -> tuple[date, str] | None:
    """Find the most recent non-empty episodic memory within *lookback* days.

    Scans backwards from today; returns ``(day, content)`` for the first hit
    or ``None`` when no journal exists in the window.
    """
    from datetime import timedelta

    today = date.today()
    for days_back in range(lookback):
        day = today - timedelta(days=days_back)
        text = read_episodic_memory(day)
        if text:
            return day, text
    return None
# Budget (in characters) for episodic memory in the system prompt.
# Entries longer than this are truncated before injection.
_EPISODIC_CHAR_BUDGET = 6_000
def format_for_injection() -> str:
    """Format cross-session memory for system prompt injection.

    Returns an empty string if no meaningful content exists yet (e.g. first
    session with only the seed template).
    """
    sections: list[str] = []

    semantic = read_semantic_memory()
    # Suppress injection if semantic is still just the seed template
    is_seed = semantic.startswith("# My Understanding of the User\n\n*No sessions")
    if semantic and not is_seed:
        sections.append(semantic)

    recent = _find_recent_episodic()
    if recent is not None:
        day, text = recent
        # Trim oversized episodic entries to keep the prompt manageable
        if len(text) > _EPISODIC_CHAR_BUDGET:
            text = text[:_EPISODIC_CHAR_BUDGET] + "\n\n…(truncated)"
        date_label = format_memory_date(day)
        if day == date.today():
            heading = f"## Today — {date_label}"
        else:
            heading = f"## {date_label}"
        sections.append(f"{heading}\n\n{text}")

    if not sections:
        return ""
    body = "\n\n---\n\n".join(sections)
    return (
        "--- Your Cross-Session Memory ---\n\n"
        + body
        + "\n\n--- End Cross-Session Memory ---"
    )
_SEED_TEMPLATE = """\
# My Understanding of the User
*No sessions recorded yet.*
## Who They Are
## What They're Trying to Achieve
## What's Working
## What I've Learned
"""
def append_episodic_entry(content: str) -> None:
    """Append a timestamped prose entry to today's episodic memory file.

    Creates the file (with a date heading) if it doesn't exist yet.
    Used both by the queen's diary tool and by the consolidation hook.
    """
    path = episodic_memory_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%H:%M")
    entry = f"### {stamp}\n\n{content.strip()}\n"
    # A fresh file starts with the date heading; subsequent entries are
    # separated from the previous one by a blank line.
    if path.exists():
        prefix = "\n\n"
    else:
        prefix = f"# {format_memory_date(date.today())}\n\n"
    with path.open("a", encoding="utf-8") as fh:
        fh.write(prefix + entry)
def seed_if_missing() -> None:
    """Create MEMORY.md with a blank template if it doesn't exist yet."""
    target = semantic_memory_path()
    if not target.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(_SEED_TEMPLATE, encoding="utf-8")
# ---------------------------------------------------------------------------
# Consolidation prompt
# ---------------------------------------------------------------------------
# System prompt for the MEMORY.md (semantic) rewrite performed by
# consolidate_queen_memory(); the model returns the complete new file text.
_SEMANTIC_SYSTEM = """\
You maintain the persistent cross-session memory of an AI assistant called the Queen.
Review the session notes and rewrite MEMORY.md the Queen's durable understanding of the
person she works with across all sessions.
Write entirely in the Queen's voice — first person, reflective, honest.
Not a log of events, but genuine understanding of who this person is over time.
Rules:
- Update and synthesise: incorporate new understanding, update facts that have changed, remove
details that are stale, superseded, or no longer say anything meaningful about the person.
- Keep it as structured markdown with named sections about the PERSON, not about today.
- Do NOT include diary sections, daily logs, or session summaries. Those belong elsewhere.
MEMORY.md is about who they are, what they want, what works not what happened today.
- Reference dates only when noting a lasting milestone (e.g. "since March 8th they prefer X").
- If the session had no meaningful new information about the person,
return the existing text unchanged.
- Do not add fictional details. Only reflect what is evidenced in the notes.
- Stay concise. Prune rather than accumulate. A lean, accurate file is more useful than a
dense one. If something was true once but has been resolved or superseded, remove it.
- Output only the raw markdown content of MEMORY.md. No preamble, no code fences.
"""
_DIARY_SYSTEM = """\
You maintain the daily episodic diary of an AI assistant called the Queen.
You receive: (1) today's existing diary so far, and (2) notes from the latest session.
Rewrite the complete diary for today as a single unified narrative
first person, reflective, honest.
Merge and deduplicate: if the same story (e.g. a research agent stalling) recurred several times,
describe it once with appropriate weight rather than retelling it. Weave in new developments from
the session notes. Preserve important milestones, emotional texture, and session path references.
If today's diary is empty, write the initial entry based on the session notes alone.
Output only the full diary prose no date heading, no timestamp headers,
no preamble, no code fences.
"""
def read_session_context(session_dir: Path, max_messages: int = 80) -> str:
    """Extract a readable transcript from conversation parts.

    Reads the last ``max_messages`` conversation parts. Tool results are
    omitted — only user and assistant turns (with tool-call names noted)
    are included.

    Args:
        session_dir: Session directory containing ``conversations/parts``.
        max_messages: Maximum number of part files (newest last) to read.

    Returns:
        A markdown transcript, or "" when no usable parts exist.
    """
    parts: list[str] = []
    # Conversation transcript
    parts_dir = session_dir / "conversations" / "parts"
    if parts_dir.exists():
        part_files = sorted(parts_dir.glob("*.json"))[-max_messages:]
        lines: list[str] = []
        for pf in part_files:
            try:
                data = json.loads(pf.read_text(encoding="utf-8"))
                role = data.get("role", "")
                content = str(data.get("content", "")).strip()
                tool_calls = data.get("tool_calls") or []
                if role == "tool":
                    continue  # skip verbose tool results
                if role == "assistant" and tool_calls and not content:
                    names = [tc.get("function", {}).get("name", "?") for tc in tool_calls]
                    lines.append(f"[queen calls: {', '.join(names)}]")
                elif content:
                    label = "user" if role == "user" else "queen"
                    # Cap each turn so one huge message can't dominate.
                    lines.append(f"[{label}]: {content[:600]}")
            # Unreadable or malformed part files are an expected failure mode
            # (same as structurally malformed messages), so they are skipped
            # quietly at debug level instead of falling through to the
            # warning path below.
            except (json.JSONDecodeError, OSError, KeyError, TypeError) as exc:
                logger.debug("Skipping malformed conversation message: %s", exc)
                continue
            except Exception:
                logger.warning("Unexpected error parsing conversation message", exc_info=True)
                continue
        if lines:
            parts.append("## Conversation\n\n" + "\n".join(lines))
    return "\n\n".join(parts)
# ---------------------------------------------------------------------------
# Context compaction (binary-split LLM summarisation)
# ---------------------------------------------------------------------------
# If the raw session context exceeds this many characters, compact it first
# before sending to the consolidation LLM. ~200 k chars ≈ 50 k tokens.
_CTX_COMPACT_CHAR_LIMIT = 200_000
_CTX_COMPACT_MAX_DEPTH = 8
_COMPACT_SYSTEM = (
"Summarise this conversation segment. Preserve: user goals, key decisions, "
"what was built or changed, emotional tone, and important outcomes. "
"Write concisely in third person past tense. Omit routine tool invocations "
"unless the result matters."
)
async def _compact_context(text: str, llm: object, *, _depth: int = 0) -> str:
"""Binary-split and LLM-summarise *text* until it fits within the char limit.
Mirrors the recursive binary-splitting strategy used by the main agent
compaction pipeline (EventLoopNode._llm_compact).
"""
if len(text) <= _CTX_COMPACT_CHAR_LIMIT or _depth >= _CTX_COMPACT_MAX_DEPTH:
return text
# Split near the midpoint on a line boundary so we don't cut mid-message
mid = len(text) // 2
split_at = text.rfind("\n", 0, mid) + 1
if split_at <= 0:
split_at = mid
half1, half2 = text[:split_at], text[split_at:]
async def _summarise(chunk: str) -> str:
try:
resp = await llm.acomplete(
messages=[{"role": "user", "content": chunk}],
system=_COMPACT_SYSTEM,
max_tokens=2048,
)
return resp.content.strip()
except Exception:
logger.warning(
"queen_memory: context compaction LLM call failed (depth=%d), truncating",
_depth,
)
return chunk[: _CTX_COMPACT_CHAR_LIMIT // 4]
s1, s2 = await asyncio.gather(_summarise(half1), _summarise(half2))
combined = s1 + "\n\n" + s2
if len(combined) > _CTX_COMPACT_CHAR_LIMIT:
return await _compact_context(combined, llm, _depth=_depth + 1)
return combined
async def consolidate_queen_memory(
    session_id: str,
    session_dir: Path,
    llm: object,
) -> None:
    """Update MEMORY.md and append a diary entry based on the current session.
    Reads conversation parts from session_dir. Called periodically in
    the background and once at session end. Failures are logged and
    silently swallowed so they never block teardown.
    Args:
        session_id: The session ID.
        session_dir: Path to the session directory (~/.hive/queen/session/{id}).
        llm: LLMProvider instance (must support acomplete()).
    """
    try:
        session_context = read_session_context(session_dir)
        if not session_context:
            logger.debug("queen_memory: no session context, skipping consolidation")
            return
        logger.info("queen_memory: consolidating memory for session %s ...", session_id)
        # If the transcript is very large, compact it with recursive binary LLM
        # summarisation before sending to the consolidation model.
        if len(session_context) > _CTX_COMPACT_CHAR_LIMIT:
            logger.info(
                "queen_memory: session context is %d chars — compacting first",
                len(session_context),
            )
            session_context = await _compact_context(session_context, llm)
            logger.info("queen_memory: compacted to %d chars", len(session_context))
        existing_semantic = read_semantic_memory()
        today_journal = read_episodic_memory()
        today = date.today()
        today_str = format_memory_date(today)
        # One shared context document feeds both consolidation prompts below;
        # only the system prompt differs between the two LLM calls.
        user_msg = (
            f"## Existing Semantic Memory (MEMORY.md)\n\n"
            f"{existing_semantic or '(none yet)'}\n\n"
            f"## Today's Diary So Far ({today_str})\n\n"
            f"{today_journal or '(none yet)'}\n\n"
            f"{session_context}\n\n"
            f"## Session Reference\n\n"
            f"Session ID: {session_id}\n"
            f"Session dir: {session_dir}\n"
        )
        logger.debug(
            "queen_memory: calling LLM (%d chars of context, ~%d tokens est.)",
            len(user_msg),
            len(user_msg) // 4,
        )
        # NOTE(review): imported inside the function — presumably to avoid a
        # circular import at module load time; confirm.
        from framework.agents.queen.config import default_config
        # The semantic rewrite and the diary rewrite are independent, so they
        # run concurrently.
        semantic_resp, diary_resp = await asyncio.gather(
            llm.acomplete(
                messages=[{"role": "user", "content": user_msg}],
                system=_SEMANTIC_SYSTEM,
                max_tokens=default_config.max_tokens,
            ),
            llm.acomplete(
                messages=[{"role": "user", "content": user_msg}],
                system=_DIARY_SYSTEM,
                max_tokens=default_config.max_tokens,
            ),
        )
        new_semantic = semantic_resp.content.strip()
        diary_entry = diary_resp.content.strip()
        if new_semantic:
            # MEMORY.md is replaced wholesale — the model returns the complete
            # new file contents, not a delta.
            path = semantic_memory_path()
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(new_semantic, encoding="utf-8")
            logger.info("queen_memory: semantic memory updated (%d chars)", len(new_semantic))
        if diary_entry:
            # Rewrite today's episodic file in-place — the LLM has merged and
            # deduplicated the full day's content, so we replace rather than append.
            ep_path = episodic_memory_path()
            ep_path.parent.mkdir(parents=True, exist_ok=True)
            heading = f"# {today_str}"
            ep_path.write_text(f"{heading}\n\n{diary_entry}\n", encoding="utf-8")
            logger.info(
                "queen_memory: episodic diary rewritten for %s (%d chars)",
                today_str,
                len(diary_entry),
            )
    except Exception:
        tb = traceback.format_exc()
        logger.exception("queen_memory: consolidation failed")
        # Write to file so the cause is findable regardless of log verbosity.
        error_path = _queen_dir() / "consolidation_error.txt"
        try:
            error_path.parent.mkdir(parents=True, exist_ok=True)
            error_path.write_text(
                f"session: {session_id}\ntime: {datetime.now().isoformat()}\n\n{tb}",
                encoding="utf-8",
            )
        except OSError:
            pass  # Cannot write error file; original exception already logged
+25 -94
View File
@@ -17,6 +17,7 @@ import re
import shutil
import time
from dataclasses import dataclass, field
from datetime import date
from pathlib import Path
from typing import Any
@@ -26,13 +27,12 @@ logger = logging.getLogger(__name__)
# Constants
# ---------------------------------------------------------------------------
MEMORY_TYPES: tuple[str, ...] = ("goal", "environment", "technique", "reference")
MEMORY_TYPES: tuple[str, ...] = ("goal", "environment", "technique", "reference", "diary")
GLOBAL_MEMORY_CATEGORIES: tuple[str, ...] = ("profile", "preference", "environment", "feedback")
_HIVE_QUEEN_DIR = Path.home() / ".hive" / "queen"
# Legacy shared v2 root. Colony memory now lives under queen sessions.
MEMORY_DIR: Path = _HIVE_QUEEN_DIR / "memories"
CURSOR_FILE: Path = MEMORY_DIR / ".cursor.json"
MAX_FILES: int = 200
MAX_FILE_SIZE_BYTES: int = 4096 # 4 KB hard limit per memory file
@@ -77,21 +77,6 @@ def global_memory_dir() -> Path:
return _HIVE_QUEEN_DIR / "global_memory"
def queen_colony_cursor_file(session_dir: Path) -> Path:
"""Return the queen colony cursor file for a session."""
return session_dir / "memory" / "cursors" / "queen-colony.cursor.json"
def queen_global_cursor_file(session_dir: Path) -> Path:
"""Return the queen global cursor file for a session."""
return session_dir / "memory" / "cursors" / "queen-global.cursor.json"
def worker_colony_cursor_file(worker_session_dir: Path) -> Path:
"""Return the worker colony cursor file for one top-level execution."""
return worker_session_dir / "memory" / "worker-colony.cursor.json"
# ---------------------------------------------------------------------------
# Frontmatter parsing (lenient)
# ---------------------------------------------------------------------------
@@ -251,6 +236,22 @@ def build_memory_document(
)
def diary_filename(d: date | None = None) -> str:
    """Return the diary memory filename for date *d* (default: today)."""
    d = d or date.today()
    return f"MEMORY-{d.strftime('%Y-%m-%d')}.md"
def build_diary_document(*, date_str: str, body: str) -> str:
    """Build a diary memory file with frontmatter.

    Delegates to build_memory_document so diary files share the standard
    frontmatter layout; the "diary" type distinguishes them from extracted
    memories.
    """
    return build_memory_document(
        name=f"diary-{date_str}",
        description=f"Daily session narrative for {date_str}",
        mem_type="diary",
        body=body,
    )
def validate_global_memory_payload(
*,
category: str,
@@ -371,85 +372,15 @@ def memory_freshness_text(mtime: float) -> str:
# ---------------------------------------------------------------------------
def read_cursor(cursor_file: Path | None = None) -> int:
"""Read ``lastMemoryMessageSeq`` from the cursor file. Returns 0 if missing."""
p = cursor_file or CURSOR_FILE
if not p.exists():
return 0
try:
data = json.loads(p.read_text(encoding="utf-8"))
return int(data.get("lastMemoryMessageSeq", 0))
except (json.JSONDecodeError, TypeError, ValueError, OSError):
return 0
async def read_conversation_parts(session_dir: Path) -> list[dict[str, Any]]:
"""Read all conversation parts for a session using FileConversationStore.
def write_cursor(seq: int, cursor_file: Path | None = None) -> None:
"""Persist the cursor seq number. Creates parent dirs if needed."""
p = cursor_file or CURSOR_FILE
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(
json.dumps({"lastMemoryMessageSeq": seq}),
encoding="utf-8",
)
def read_messages_since_cursor(
session_dir: Path,
cursor_seq: int,
) -> tuple[list[dict[str, Any]], int]:
"""Read conversation parts added since *cursor_seq*.
Returns ``(messages, max_seq)``. Each message is the raw JSON dict
from the part file.
**Compaction fallback**: if no files have ``seq > cursor_seq`` (the
cursor was evicted by compaction), all existing parts are returned so
that the reflection agent can still process visible messages.
Returns a list of raw message dicts in sequence order.
"""
parts_dir = session_dir / "conversations" / "parts"
if not parts_dir.is_dir():
return [], cursor_seq
from framework.storage.conversation_store import FileConversationStore
part_files = sorted(parts_dir.glob("*.json"))
if not part_files:
return [], cursor_seq
# Determine which files are new (seq > cursor_seq).
new_files: list[tuple[int, Path]] = []
all_files: list[tuple[int, Path]] = []
for f in part_files:
try:
seq = int(f.stem)
except ValueError:
continue
all_files.append((seq, f))
if seq > cursor_seq:
new_files.append((seq, f))
# Compaction fallback: cursor evicted → return everything visible.
# Only trigger when cursor_seq is beyond the max seq of existing files,
# meaning files were compacted away. If cursor_seq <= max_all_seq there
# is simply nothing new (already up-to-date) — returning empty is correct.
if not new_files and all_files:
max_all_seq = max(seq for seq, _ in all_files)
if cursor_seq > max_all_seq:
new_files = all_files
if not new_files:
return [], cursor_seq
messages: list[dict[str, Any]] = []
max_seq = cursor_seq
for seq, f in new_files:
try:
data = json.loads(f.read_text(encoding="utf-8"))
messages.append(data)
if seq > max_seq:
max_seq = seq
except (json.JSONDecodeError, OSError):
continue
return messages, max_seq
store = FileConversationStore(session_dir / "conversations")
return await store.read_parts()
# ---------------------------------------------------------------------------
@@ -520,7 +451,7 @@ def migrate_legacy_memories(memory_dir: Path | None = None) -> None:
d,
filename=slug,
name=f"legacy-diary-{date_part}",
mem_type="reference",
mem_type="diary",
description=f"Migrated diary entry from {date_part}",
body=content,
)
@@ -1,127 +0,0 @@
# Queen Memory v2 — File-Per-Memory Architecture
```
~/.hive/
├── queen/
│ ├── memories/
│ │ ├── .cursor.json ← Cursor: last processed message seq
│ │ ├── .reflection_error.txt ← Last reflection error traceback
│ │ ├── .legacy/ ← Archived v1 MEMORY.md files
│ │ ├── user-prefers-tests.md ← Individual memory files
│ │ ├── project-api-patterns.md
│ │ └── ...
│ └── session/
│ └── {session_id}/
│ ├── conversations/
│ │ ├── parts/
│ │ │ ├── 0000000001.json
│ │ │ └── ...
│ │ └── spillover/
│ │ └── ...
│ └── data/
│ └── ...
```
---
## How it works
Queen memory has two subsystems: **Reflect** (writing) and **Recall** (reading).
### Reflect — incremental memory extraction
After each queen turn, a lightweight background agent inspects the new messages and extracts learnings into individual `.md` files.
- **Short reflection** — every queen turn. Reads messages since the last cursor position, passes them to a mini LLM loop (max 5 turns) with restricted tools that can list/read/write/delete memory files. Advances the cursor on success.
- **Long reflection** — every 5 short reflections, on context compaction, and at session end. Reads all memory files holistically to organize, deduplicate, and trim noise.
Both run under an `asyncio.Lock` — if a trigger fires while a reflection is active, it's skipped (messages will be reconsidered next time).
### Recall — pre-turn memory selection
Before each turn, a single structured-output LLM call picks up to 5 relevant memories from the file index. The selected files are read, prepended with staleness warnings for files older than 1 day, and injected into the system prompt via `phase_state._cached_recall_block`.
Recall runs as a background task after each `LLM_TURN_COMPLETE` and caches the result. This means memories are technically one turn behind — acceptable because the user's next query isn't known yet when the prompt is composed.
---
## Memory file format
Each file uses YAML frontmatter (convention enforced by prompt, not code):
```markdown
---
name: {{memory name}}
description: {{one-line description — used to decide relevance in future conversations}}
type: {{goal, environment, technique, reference}}
---
{{memory content}}
```
Parsing is **lenient**: broken or missing frontmatter degrades gracefully. Files show up in scans with `None` description and no type. Nothing rejects or repairs malformed files.
---
## Limits
| Setting | Value |
|---------|-------|
| Max memory files | 200 |
| Max file size | 4 KB |
| Max recall selections per turn | 5 |
| Reflection loop max turns | 5 |
| Long reflection interval | Every 5 short reflections |
---
## Cursor-based incremental processing
The cursor (`.cursor.json`) stores the last processed message sequence number. On each short reflection:
1. Read all `parts/*.json` files where `seq > cursor`
2. **Compaction fallback**: if no files match (cursor evicted by compaction), read all visible parts instead
3. Pass new messages to reflection LLM
4. Advance cursor to the new max seq
---
## Debugging
Enable debug logging to see reflect and recall activity:
```bash
# With hive serve
hive serve --debug
# With hive run
hive run --debug path/to/agent
```
This sets the log level to DEBUG, which shows:
| Logger prefix | What you see |
|---|---|
| `reflect: short` | Message count, cursor range, "no new messages" |
| `reflect: long` | File count being organized |
| `reflect: loop` | Turn-by-turn progress, tool call names |
| `reflect: tool` | Individual tool results (write/delete/list) |
| `reflect: turn complete` | Short count progress (e.g. `3/5`) |
| `recall:` | File scan count, selected filenames, injection block count |
| `recall: cache` | Cache update/skip with truncated query |
For tests:
```bash
uv run pytest core/tests/test_queen_memory.py -v --log-cli-level=DEBUG
```
---
## Migration from v1
On first run, `init_memory_dir()` calls `migrate_legacy_memories()` which:
1. Reads old `MEMORY.md` and recent `MEMORY-YYYY-MM-DD.md` files
2. Converts each section/entry into an individual memory file
3. Archives originals to `~/.hive/queen/memories/.legacy/`
+124 -110
View File
@@ -23,6 +23,7 @@ from __future__ import annotations
import asyncio
import json
import logging
import re
import traceback
from datetime import datetime
from pathlib import Path
@@ -34,12 +35,11 @@ from framework.agents.queen.queen_memory_v2 import (
MEMORY_DIR,
MEMORY_FRONTMATTER_EXAMPLE,
MEMORY_TYPES,
build_diary_document,
diary_filename,
format_memory_manifest,
read_cursor,
read_messages_since_cursor,
read_conversation_parts,
scan_memory_files,
worker_colony_cursor_file,
write_cursor,
)
from framework.llm.provider import LLMResponse, Tool
@@ -332,6 +332,29 @@ Rules:
- Remove memories that are no longer relevant or are superseded.
- Keep the total collection lean and high-signal.
- Do NOT invent new information only reorganise what exists.
- Do NOT delete or merge MEMORY-*.md diary files. These are daily narratives
managed by a separate process. You may read them for context but should not
modify them.
"""
_DIARY_SYSTEM = """\
You maintain a daily diary entry for an AI colony session. You receive:
(1) Today's existing diary content (may be empty if this is the first entry).
(2) A transcript of recent conversation messages.
Write a cohesive 3-8 sentence narrative about what happened in this session today.
Cover: what the user asked for, what was accomplished, key decisions or obstacles,
and current status.
Rules:
- If an existing diary is provided, rewrite it as a unified narrative incorporating
the new developments. Merge and deduplicate do not simply append.
- Keep the total narrative under 3000 characters.
- Focus on the story arc of the day, not individual tool calls or code details.
- If the recent messages contain nothing substantive (greetings, routine
confirmations), return the existing diary text unchanged.
- Output only the diary prose. No headings, no timestamps, no code fences, no
frontmatter.
"""
@@ -344,24 +367,20 @@ async def run_short_reflection(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
*,
cursor_file: Path | None = None,
) -> None:
"""Run a short reflection: extract learnings from new messages."""
"""Run a short reflection: extract learnings from conversation."""
mem_dir = memory_dir or MEMORY_DIR
cursor_seq = read_cursor(cursor_file)
messages, max_seq = read_messages_since_cursor(session_dir, cursor_seq)
messages = await read_conversation_parts(session_dir)
if not messages:
logger.debug("reflect: short — no new messages since cursor %d", cursor_seq)
logger.debug("reflect: short — no conversation parts")
return
logger.debug("reflect: short — %d new messages (cursor %d%d)", len(messages), cursor_seq, max_seq)
logger.debug("reflect: short — %d conversation parts", len(messages))
# Build a readable transcript of the new messages.
# Build a readable transcript from recent messages.
transcript_lines: list[str] = []
for msg in messages:
for msg in messages[-50:]:
role = msg.get("role", "")
content = str(msg.get("content", "")).strip()
if role == "tool":
@@ -369,31 +388,22 @@ async def run_short_reflection(
if not content:
continue
label = "user" if role == "user" else "assistant"
# Truncate very long messages.
if len(content) > 800:
content = content[:800] + ""
transcript_lines.append(f"[{label}]: {content}")
if not transcript_lines:
# Only tool results in the new messages — still advance cursor.
write_cursor(max_seq, cursor_file)
return
transcript = "\n".join(transcript_lines)
user_msg = (
f"## Recent conversation (messages {cursor_seq + 1}{max_seq})\n\n"
f"## Recent conversation ({len(messages)} messages total)\n\n"
f"{transcript}\n\n"
f"Timestamp: {datetime.now().isoformat(timespec='minutes')}"
)
success = await _reflection_loop(llm, _SHORT_REFLECT_SYSTEM, user_msg, mem_dir)
# Advance cursor only on success.
if success:
write_cursor(max_seq, cursor_file)
logger.debug("reflect: short reflection done, cursor → %d", max_seq)
else:
logger.warning("reflect: short reflection failed, cursor NOT advanced (stays at %d)", cursor_seq)
await _reflection_loop(llm, _SHORT_REFLECT_SYSTEM, user_msg, mem_dir)
logger.debug("reflect: short reflection done")
async def run_long_reflection(
@@ -420,6 +430,77 @@ async def run_long_reflection(
logger.debug("reflect: long reflection done (%d files)", len(files))
async def run_diary_update(
    session_dir: Path,
    llm: Any,
    memory_dir: Path | None = None,
) -> None:
    """Update today's diary file with a narrative of recent activity.

    Reads today's existing diary (if any) plus the tail of the session
    conversation, asks the LLM for a single merged narrative, and writes the
    result back as a frontmatter-wrapped memory document. Failures are
    logged and swallowed.
    """
    mem_dir = memory_dir or MEMORY_DIR
    fname = diary_filename()
    diary_path = mem_dir / fname
    today_str = datetime.now().strftime("%Y-%m-%d")
    # Read existing diary body (strip frontmatter).
    existing_body = ""
    if diary_path.exists():
        try:
            raw = diary_path.read_text(encoding="utf-8")
            # Non-greedy match of a leading `--- ... ---` frontmatter block.
            m = re.match(r"^---\s*\n.*?\n---\s*\n?", raw, re.DOTALL)
            existing_body = raw[m.end() :].strip() if m else raw.strip()
        except OSError:
            pass
    # Read all conversation messages for context.
    messages = await read_conversation_parts(session_dir)
    transcript_lines: list[str] = []
    # Only the last 40 turns are narrated; tool results are excluded.
    for msg in messages[-40:]:
        role = msg.get("role", "")
        content = str(msg.get("content", "")).strip()
        if role == "tool" or not content:
            continue
        label = "user" if role == "user" else "assistant"
        if len(content) > 600:
            content = content[:600] + "..."
        transcript_lines.append(f"[{label}]: {content}")
    if not transcript_lines:
        # Nothing substantive to narrate — leave any existing diary untouched.
        return
    transcript = "\n".join(transcript_lines)
    user_msg = (
        f"## Today's Diary So Far\n\n"
        f"{existing_body or '(no entries yet)'}\n\n"
        f"## Recent Conversation\n\n"
        f"{transcript}\n\n"
        f"Date: {today_str}"
    )
    try:
        from framework.agents.queen.config import default_config
        resp = await llm.acomplete(
            messages=[{"role": "user", "content": user_msg}],
            system=_DIARY_SYSTEM,
            max_tokens=min(default_config.max_tokens, 1024),
        )
        new_body = (resp.content or "").strip()
        if not new_body:
            return
        doc = build_diary_document(date_str=today_str, body=new_body)
        # Enforce the per-file size cap by shrinking the body and rebuilding.
        # NOTE(review): the 2800 clamp is in characters while the cap is in
        # bytes — heavily multibyte content could still exceed
        # MAX_FILE_SIZE_BYTES after rebuild; confirm acceptable.
        if len(doc.encode("utf-8")) > MAX_FILE_SIZE_BYTES:
            new_body = new_body[:2800]
            doc = build_diary_document(date_str=today_str, body=new_body)
        mem_dir.mkdir(parents=True, exist_ok=True)
        diary_path.write_text(doc, encoding="utf-8")
        logger.debug("diary: updated %s (%d chars)", fname, len(doc))
    except Exception:
        logger.warning("diary: update failed", exc_info=True)
# ---------------------------------------------------------------------------
# Event-bus integration
# ---------------------------------------------------------------------------
@@ -433,7 +514,6 @@ async def subscribe_reflection_triggers(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
cursor_file: Path | None = None,
phase_state: Any = None,
) -> list[str]:
"""Subscribe to queen turn events and return subscription IDs.
@@ -463,24 +543,20 @@ async def subscribe_reflection_triggers(
_short_count += 1
logger.debug("reflect: turn complete — short count %d/%d", _short_count, _LONG_REFLECT_INTERVAL)
if _short_count % _LONG_REFLECT_INTERVAL == 0:
await run_short_reflection(
session_dir,
llm,
mem_dir,
cursor_file=cursor_file,
)
await run_short_reflection(session_dir, llm, mem_dir)
await run_long_reflection(llm, mem_dir)
else:
await run_short_reflection(
session_dir,
llm,
mem_dir,
cursor_file=cursor_file,
)
await run_short_reflection(session_dir, llm, mem_dir)
except Exception:
logger.warning("reflect: reflection failed", exc_info=True)
_write_error("short/long reflection")
# Update daily diary after reflection.
try:
await run_diary_update(session_dir, llm, mem_dir)
except Exception:
logger.warning("reflect: diary update failed", exc_info=True)
# Update recall cache after reflection completes, guaranteeing
# recall sees the current turn's extracted memories.
if phase_state is not None:
@@ -547,12 +623,16 @@ async def subscribe_worker_memory_triggers(
colony_memory_dir: Path,
recall_cache: dict[str, str],
) -> list[str]:
"""Subscribe shared colony memory reflection/recall for top-level worker runs."""
from framework.agents.queen.recall_selector import update_recall_cache
"""Subscribe colony memory lifecycle events for worker runs.
Short reflection is now handled synchronously at node handoff in
``WorkerAgent._reflect_colony_memory()``. This function only manages:
- Recall cache initialisation on execution start
- Final long reflection + cleanup on execution end
"""
from framework.runtime.event_bus import EventType
_lock = asyncio.Lock()
_short_counts: dict[str, int] = {}
_terminal_lock = asyncio.Lock()
def _is_worker_event(event: Any) -> bool:
return bool(
@@ -560,63 +640,6 @@ async def subscribe_worker_memory_triggers(
and getattr(event, "stream_id", None) not in ("queen", "judge")
)
async def _update_cache(execution_id: str) -> None:
session_dir = worker_sessions_dir / execution_id
await update_recall_cache(
session_dir,
llm,
memory_dir=colony_memory_dir,
cache_setter=lambda block, execution_id=execution_id: recall_cache.__setitem__(
execution_id, block
),
heading="Colony Memories",
)
async def _on_turn_complete(event: Any) -> None:
if not _is_worker_event(event):
return
if _lock.locked():
logger.debug("reflect: worker colony reflection skipped — lock busy")
return
execution_id = event.execution_id
if execution_id is None:
return
session_dir = worker_sessions_dir / execution_id
cursor_file = worker_colony_cursor_file(session_dir)
async with _lock:
try:
_short_counts[execution_id] = _short_counts.get(execution_id, 0) + 1
await run_short_reflection(
session_dir,
llm,
colony_memory_dir,
cursor_file=cursor_file,
)
if _short_counts[execution_id] % _LONG_REFLECT_INTERVAL == 0:
await run_long_reflection(llm, colony_memory_dir)
await _update_cache(execution_id)
except Exception:
logger.warning("reflect: worker colony reflection failed", exc_info=True)
_write_error("worker colony reflection")
async def _on_compaction(event: Any) -> None:
if not _is_worker_event(event):
return
if _lock.locked():
return
execution_id = event.execution_id
if execution_id is None:
return
async with _lock:
try:
await run_long_reflection(llm, colony_memory_dir)
await _update_cache(execution_id)
except Exception:
logger.warning("reflect: worker compaction reflection failed", exc_info=True)
_write_error("worker compaction reflection")
async def _on_execution_started(event: Any) -> None:
if not _is_worker_event(event):
return
@@ -629,7 +652,7 @@ async def subscribe_worker_memory_triggers(
execution_id = event.execution_id
if execution_id is None:
return
async with _lock:
async with _terminal_lock:
try:
await run_long_reflection(llm, colony_memory_dir)
except Exception:
@@ -637,21 +660,12 @@ async def subscribe_worker_memory_triggers(
_write_error("worker final reflection")
finally:
recall_cache.pop(execution_id, None)
_short_counts.pop(execution_id, None)
return [
event_bus.subscribe(
event_types=[EventType.EXECUTION_STARTED],
handler=_on_execution_started,
),
event_bus.subscribe(
event_types=[EventType.LLM_TURN_COMPLETE],
handler=_on_turn_complete,
),
event_bus.subscribe(
event_types=[EventType.CONTEXT_COMPACTED],
handler=_on_compaction,
),
event_bus.subscribe(
event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
handler=_on_execution_terminal,
-286
View File
@@ -1,286 +0,0 @@
"""Worker per-run digest (run diary).
Storage layout:
~/.hive/agents/{agent_name}/runs/{run_id}/digest.md
Each completed or failed worker run gets one digest file. The queen reads
these via get_graph_status(focus='diary') before digging into live runtime
logs the diary is a cheap, persistent record that survives across sessions.
"""
from __future__ import annotations
import logging
import traceback
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from framework.runtime.event_bus import AgentEvent, EventBus
logger = logging.getLogger(__name__)
_DIGEST_SYSTEM = """\
You maintain run digests for a worker agent.
A run digest is a concise, factual record of a single task execution.
Write 3-6 sentences covering:
- What the worker was asked to do (the task/goal)
- What approach it took and what tools it used
- What the outcome was (success, partial, or failure and why if relevant)
- Any notable issues, retries, or escalations to the queen
Write in third person past tense. Be direct and specific.
Omit routine tool invocations unless the result matters.
Output only the digest prose no headings, no code fences.
"""
def _worker_runs_dir(agent_name: str) -> Path:
return Path.home() / ".hive" / "agents" / agent_name / "runs"
def digest_path(agent_name: str, run_id: str) -> Path:
return _worker_runs_dir(agent_name) / run_id / "digest.md"
def _collect_run_events(bus: EventBus, run_id: str, limit: int = 2000) -> list[AgentEvent]:
"""Collect all events belonging to *run_id* from the bus history.
Strategy: find the EXECUTION_STARTED event that carries ``run_id``,
extract its ``execution_id``, then query the bus by that execution_id.
This works because TOOL_CALL_*, EDGE_TRAVERSED, NODE_STALLED etc. carry
execution_id but not run_id.
Falls back to a full-scan run_id filter when EXECUTION_STARTED is not
found (e.g. bus was rotated).
"""
from framework.runtime.event_bus import EventType
# Pass 1: find execution_id via EXECUTION_STARTED with matching run_id
started = bus.get_history(event_type=EventType.EXECUTION_STARTED, limit=limit)
exec_id: str | None = None
for e in started:
if getattr(e, "run_id", None) == run_id and e.execution_id:
exec_id = e.execution_id
break
if exec_id:
return bus.get_history(execution_id=exec_id, limit=limit)
# Fallback: scan all events and match by run_id attribute
return [e for e in bus.get_history(limit=limit) if getattr(e, "run_id", None) == run_id]
def _build_run_context(
events: list[AgentEvent],
outcome_event: AgentEvent | None,
) -> str:
"""Assemble a plain-text run context string for the digest LLM call."""
from framework.runtime.event_bus import EventType
# Reverse so events are in chronological order
events_chron = list(reversed(events))
lines: list[str] = []
# Task input from EXECUTION_STARTED
started = [e for e in events_chron if e.type == EventType.EXECUTION_STARTED]
if started:
inp = started[0].data.get("input", {})
if inp:
lines.append(f"Task input: {str(inp)[:400]}")
# Duration (elapsed so far if no outcome yet)
ref_ts = outcome_event.timestamp if outcome_event else datetime.utcnow()
if started:
elapsed = (ref_ts - started[0].timestamp).total_seconds()
m, s = divmod(int(elapsed), 60)
lines.append(f"Duration so far: {m}m {s}s" if m else f"Duration so far: {s}s")
# Outcome
if outcome_event is None:
lines.append("Status: still running (mid-run snapshot)")
elif outcome_event.type == EventType.EXECUTION_COMPLETED:
out = outcome_event.data.get("output", {})
out_str = f"Outcome: completed. Output: {str(out)[:300]}"
lines.append(out_str if out else "Outcome: completed.")
else:
err = outcome_event.data.get("error", "")
lines.append(f"Outcome: failed. Error: {str(err)[:300]}" if err else "Outcome: failed.")
# Node path (edge traversals)
edges = [e for e in events_chron if e.type == EventType.EDGE_TRAVERSED]
if edges:
parts = [
f"{e.data.get('source_node', '?')}->{e.data.get('target_node', '?')}"
for e in edges[-20:]
]
lines.append(f"Node path: {', '.join(parts)}")
# Tools used
tool_events = [e for e in events_chron if e.type == EventType.TOOL_CALL_COMPLETED]
if tool_events:
names = [e.data.get("tool_name", "?") for e in tool_events]
counts = Counter(names)
summary = ", ".join(f"{name}×{n}" if n > 1 else name for name, n in counts.most_common())
lines.append(f"Tools used: {summary}")
# Note any tool errors
errors = [e for e in tool_events if e.data.get("is_error")]
if errors:
err_names = Counter(e.data.get("tool_name", "?") for e in errors)
lines.append(f"Tool errors: {dict(err_names)}")
# Issues
issue_map = {
EventType.NODE_STALLED: "stall",
EventType.NODE_TOOL_DOOM_LOOP: "doom loop",
EventType.CONSTRAINT_VIOLATION: "constraint violation",
EventType.NODE_RETRY: "retry",
}
issue_parts: list[str] = []
for evt_type, label in issue_map.items():
n = sum(1 for e in events_chron if e.type == evt_type)
if n:
issue_parts.append(f"{n} {label}(s)")
if issue_parts:
lines.append(f"Issues: {', '.join(issue_parts)}")
# Escalations to queen
escalations = [e for e in events_chron if e.type == EventType.ESCALATION_REQUESTED]
if escalations:
lines.append(f"Escalations to queen: {len(escalations)}")
# Final LLM output snippet (last LLM_TEXT_DELTA snapshot)
text_events = [e for e in reversed(events_chron) if e.type == EventType.LLM_TEXT_DELTA]
if text_events:
snapshot = text_events[0].data.get("snapshot", "") or ""
if snapshot:
lines.append(f"Final LLM output: {snapshot[-400:].strip()}")
return "\n".join(lines)
async def consolidate_worker_run(
agent_name: str,
run_id: str,
outcome_event: AgentEvent | None,
bus: EventBus,
llm: Any,
) -> None:
"""Write (or overwrite) the digest for a worker run.
Called fire-and-forget either:
- After EXECUTION_COMPLETED / EXECUTION_FAILED (outcome_event set, final write)
- Periodically during a run on a cooldown timer (outcome_event=None, mid-run snapshot)
The digest file is always overwritten so each call produces the freshest view.
The final completion/failure call supersedes any mid-run snapshot.
Args:
agent_name: Worker agent directory name (determines storage path).
run_id: The run ID.
outcome_event: EXECUTION_COMPLETED or EXECUTION_FAILED event, or None for
a mid-run snapshot.
bus: The session EventBus (shared queen + worker).
llm: LLMProvider with an acomplete() method.
"""
try:
events = _collect_run_events(bus, run_id)
run_context = _build_run_context(events, outcome_event)
if not run_context:
logger.debug("worker_memory: no events for run %s, skipping digest", run_id)
return
is_final = outcome_event is not None
logger.info(
"worker_memory: generating %s digest for run %s ...",
"final" if is_final else "mid-run",
run_id,
)
from framework.agents.queen.config import default_config
resp = await llm.acomplete(
messages=[{"role": "user", "content": run_context}],
system=_DIGEST_SYSTEM,
max_tokens=min(default_config.max_tokens, 512),
)
digest_text = (resp.content or "").strip()
if not digest_text:
logger.warning("worker_memory: LLM returned empty digest for run %s", run_id)
return
path = digest_path(agent_name, run_id)
path.parent.mkdir(parents=True, exist_ok=True)
from framework.runtime.event_bus import EventType
ts = (outcome_event.timestamp if outcome_event else datetime.utcnow()).strftime(
"%Y-%m-%d %H:%M"
)
if outcome_event is None:
status = "running"
elif outcome_event.type == EventType.EXECUTION_COMPLETED:
status = "completed"
else:
status = "failed"
path.write_text(
f"# {run_id}\n\n**{ts}** | {status}\n\n{digest_text}\n",
encoding="utf-8",
)
logger.info(
"worker_memory: %s digest written for run %s (%d chars)",
status,
run_id,
len(digest_text),
)
except Exception:
tb = traceback.format_exc()
logger.exception("worker_memory: digest failed for run %s", run_id)
# Persist the error so it's findable without log access
error_path = _worker_runs_dir(agent_name) / run_id / "digest_error.txt"
try:
error_path.parent.mkdir(parents=True, exist_ok=True)
error_path.write_text(
f"run_id: {run_id}\ntime: {datetime.now().isoformat()}\n\n{tb}",
encoding="utf-8",
)
except Exception:
pass
def read_recent_digests(agent_name: str, max_runs: int = 5) -> list[tuple[str, str]]:
"""Return recent run digests as [(run_id, content), ...], newest first.
Args:
agent_name: Worker agent directory name.
max_runs: Maximum number of digests to return.
Returns:
List of (run_id, digest_content) tuples, ordered newest first.
"""
runs_dir = _worker_runs_dir(agent_name)
if not runs_dir.exists():
return []
digest_files = sorted(
runs_dir.glob("*/digest.md"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)[:max_runs]
result: list[tuple[str, str]] = []
for f in digest_files:
try:
content = f.read_text(encoding="utf-8").strip()
if content:
result.append((f.parent.name, content))
except OSError:
continue
return result
+6
View File
@@ -67,6 +67,12 @@ class GraphContext:
# Retry tracking: worker_id → retry_count (for execution quality assessment)
retry_counts: dict[str, int] = field(default_factory=dict)
nodes_with_retries: set[str] = field(default_factory=set)
# Colony memory reflection at node handoff
colony_memory_dir: Any = None # Path | None
worker_sessions_dir: Any = None # Path | None
colony_recall_cache: dict[str, str] = field(default_factory=dict)
colony_reflect_llm: Any = None # LLMProvider for reflection
_colony_reflect_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
def build_scoped_buffer(buffer: DataBuffer, node_spec: NodeSpec) -> DataBuffer:
+7
View File
@@ -1147,6 +1147,9 @@ class NodeConversation:
Used in isolated mode so a node only sees its own
messages in the shared flat store. In continuous mode
pass ``None`` to load all parts.
run_id: If set, only load parts matching this run_id.
Ensures intentional restarts (new run_id) start fresh
while crash recovery (same run_id) resumes correctly.
Returns ``None`` if the store contains no metadata (i.e. the
conversation was never persisted).
@@ -1168,6 +1171,10 @@ class NodeConversation:
parts = await store.read_parts()
if phase_id:
parts = [p for p in parts if p.get("phase_id") == phase_id]
# Filter by run_id so intentional restarts (new run_id) start fresh
# while crash recovery (same run_id) loads prior parts.
if run_id and not is_legacy_run_id(run_id):
parts = [p for p in parts if p.get("run_id") == run_id]
conv._messages = [Message.from_storage_dict(p) for p in parts]
cursor = await store.read_cursor()
@@ -62,6 +62,12 @@ async def restore(
if conversation is None:
return None
# If run_id filtering removed all messages, this is an intentional
# restart (new run), not a crash recovery. Return None so the caller
# falls through to the fresh-conversation path.
if conversation.message_count == 0:
return None
accumulator = await OutputAccumulator.restore(conversation_store, run_id=ctx.effective_run_id)
accumulator.spillover_dir = config.spillover_dir
accumulator.max_value_chars = config.max_output_value_chars
+13 -7
View File
@@ -160,6 +160,10 @@ class GraphExecutor:
skill_dirs: list[str] | None = None,
context_warn_ratio: float | None = None,
batch_init_nudge: str | None = None,
colony_memory_dir: Any = None,
colony_worker_sessions_dir: Any = None,
colony_recall_cache: dict[str, str] | None = None,
colony_reflect_llm: Any = None,
):
"""
Initialize the executor.
@@ -221,6 +225,10 @@ class GraphExecutor:
self.skill_dirs: list[str] = skill_dirs or []
self.context_warn_ratio: float | None = context_warn_ratio
self.batch_init_nudge: str | None = batch_init_nudge
self.colony_memory_dir = colony_memory_dir
self.colony_worker_sessions_dir = colony_worker_sessions_dir
self.colony_recall_cache = colony_recall_cache or {}
self.colony_reflect_llm = colony_reflect_llm
if protocols_prompt:
self.logger.info(
@@ -1318,6 +1326,10 @@ class GraphExecutor:
iteration_metadata_provider=self.iteration_metadata_provider,
loop_config=self._loop_config,
node_visit_counts=dict(node_visit_counts),
colony_memory_dir=self.colony_memory_dir,
worker_sessions_dir=self.colony_worker_sessions_dir,
colony_recall_cache=self.colony_recall_cache,
colony_reflect_llm=self.colony_reflect_llm,
)
# Create one WorkerAgent per node
@@ -1581,13 +1593,7 @@ class GraphExecutor:
# Wait for completion — two strategies depending on event bus availability
if has_event_subscription and sub_completed is not None:
# Event-driven: wait for completion events
if terminal_worker_ids:
await completion_event.wait()
else:
for _ in range(graph.max_steps * 10):
if _check_graph_done():
break
await asyncio.sleep(0.1)
await completion_event.wait()
else:
# No event bus: wait on worker tasks directly and route completions inline.
pending_tasks: dict[str, asyncio.Task] = {
+56
View File
@@ -318,6 +318,8 @@ class WorkerAgent:
self.lifecycle = WorkerLifecycle.COMPLETED
self._last_result = result
self._last_activations = activations
# Colony memory reflection — runs before downstream activation
await self._reflect_colony_memory()
completion = WorkerCompletion(
worker_id=node_spec.id,
success=True,
@@ -338,6 +340,8 @@ class WorkerAgent:
self.lifecycle = WorkerLifecycle.FAILED
self._last_result = result
self._last_activations = activations
# Colony memory reflection — capture learnings even on failure
await self._reflect_colony_memory()
await self._publish_failure(result.error or "Unknown error")
except Exception as exc:
error = str(exc) or type(exc).__name__
@@ -649,6 +653,58 @@ class WorkerAgent:
pause_event=self._pause_requested,
)
async def _reflect_colony_memory(self) -> None:
"""Run colony memory reflection at node handoff.
Awaits the shared colony lock so parallel workers queue (never skip).
"""
gc = self._gc
if gc.colony_memory_dir is None or gc.colony_reflect_llm is None:
return
if gc.worker_sessions_dir is None:
return
from pathlib import Path
session_dir = Path(gc.worker_sessions_dir) / gc.execution_id
if not session_dir.exists():
return
# Await lock — serializes reflection but never skips
async with gc._colony_reflect_lock:
try:
from framework.agents.queen.reflection_agent import run_short_reflection
await run_short_reflection(
session_dir, gc.colony_reflect_llm, gc.colony_memory_dir
)
except Exception:
logger.warning(
"Worker %s: colony reflection failed",
self.node_spec.id,
exc_info=True,
)
# Update recall cache outside lock (per-execution key, no write races)
try:
from framework.agents.queen.recall_selector import update_recall_cache
await update_recall_cache(
session_dir,
gc.colony_reflect_llm,
memory_dir=gc.colony_memory_dir,
cache_setter=lambda block: gc.colony_recall_cache.__setitem__(
gc.execution_id, block
),
heading="Colony Memories",
)
except Exception:
logger.warning(
"Worker %s: recall cache update failed",
self.node_spec.id,
exc_info=True,
)
# ------------------------------------------------------------------
# Event publishing
# ------------------------------------------------------------------
+9
View File
@@ -239,6 +239,11 @@ class AgentRuntime:
self._tool_executor = tool_executor
self._accounts_prompt = accounts_prompt
self._dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = None
# Colony memory config for reflection-at-handoff (set by session_manager)
self._colony_memory_dir: Any = None
self._colony_worker_sessions_dir: Any = None
self._colony_recall_cache: dict[str, str] | None = None
self._colony_reflect_llm: Any = None
self._accounts_data = accounts_data
self._tool_provider_map = tool_provider_map
@@ -362,6 +367,10 @@ class AgentRuntime:
context_warn_ratio=self.context_warn_ratio,
batch_init_nudge=self.batch_init_nudge,
dynamic_memory_provider_factory=self._dynamic_memory_provider_factory,
colony_memory_dir=self._colony_memory_dir,
colony_worker_sessions_dir=self._colony_worker_sessions_dir,
colony_recall_cache=self._colony_recall_cache,
colony_reflect_llm=self._colony_reflect_llm,
)
await stream.start()
self._streams[ep_id] = stream
@@ -192,6 +192,10 @@ class ExecutionStream:
context_warn_ratio: float | None = None,
batch_init_nudge: str | None = None,
dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = None,
colony_memory_dir: Any = None,
colony_worker_sessions_dir: Any = None,
colony_recall_cache: dict[str, str] | None = None,
colony_reflect_llm: Any = None,
):
"""
Initialize execution stream.
@@ -247,6 +251,10 @@ class ExecutionStream:
self._context_warn_ratio: float | None = context_warn_ratio
self._batch_init_nudge: str | None = batch_init_nudge
self._dynamic_memory_provider_factory = dynamic_memory_provider_factory
self._colony_memory_dir = colony_memory_dir
self._colony_worker_sessions_dir = colony_worker_sessions_dir
self._colony_recall_cache = colony_recall_cache
self._colony_reflect_llm = colony_reflect_llm
_es_logger = logging.getLogger(__name__)
if protocols_prompt:
@@ -727,6 +735,10 @@ class ExecutionStream:
if self._dynamic_memory_provider_factory is not None
else None
),
colony_memory_dir=self._colony_memory_dir,
colony_worker_sessions_dir=self._colony_worker_sessions_dir,
colony_recall_cache=self._colony_recall_cache,
colony_reflect_llm=self._colony_reflect_llm,
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
+2 -2
View File
@@ -186,7 +186,6 @@ async def create_queen(
colony_memory_dir,
global_memory_dir,
init_memory_dir,
queen_colony_cursor_file,
)
colony_dir = colony_memory_dir(session.id)
@@ -334,6 +333,8 @@ async def create_queen(
dynamic_prompt_provider=phase_state.get_current_prompt,
iteration_metadata_provider=lambda: {"phase": phase_state.phase},
skill_dirs=_queen_skill_dirs,
protocols_prompt=phase_state.protocols_prompt,
skills_catalog_prompt=phase_state.skills_catalog_prompt,
)
session.queen_executor = executor
logger.debug("[_queen_loop] GraphExecutor created and stored in session.queen_executor")
@@ -401,7 +402,6 @@ async def create_queen(
queen_dir,
session.llm,
memory_dir=colony_dir,
cursor_file=queen_colony_cursor_file(queen_dir),
phase_state=phase_state,
)
+7 -1
View File
@@ -213,8 +213,14 @@ async def handle_chat(request: web.Request) -> web.Response:
manager: Any = request.app["manager"]
try:
logger.debug("[handle_chat] Calling manager.revive_queen()...")
await manager.revive_queen(session, initial_prompt=message)
await manager.revive_queen(session)
logger.debug("[handle_chat] revive_queen() completed successfully")
# Inject the user's message into the revived queen's queue so the
# event loop drains it and clears any restored pending_input_state.
_revived_executor = session.queen_executor
_revived_node = _revived_executor.node_registry.get("queen") if _revived_executor else None
if _revived_node is not None and hasattr(_revived_node, "inject_event"):
await _revived_node.inject_event(message, is_client_input=True, image_content=image_content)
return web.json_response(
{
"status": "queen_revived",
+8 -149
View File
@@ -51,8 +51,6 @@ class Session:
worker_memory_subs: list = field(default_factory=list) # list[str]
# Per-execution colony recall cache for worker prompts
worker_colony_recall_blocks: dict[str, str] = field(default_factory=dict)
# Worker run digest subscription (fires on EXECUTION_COMPLETED / EXECUTION_FAILED)
worker_digest_sub: str | None = None
# Trigger definitions loaded from agent's triggers.json (available but inactive)
available_triggers: dict[str, TriggerDefinition] = field(default_factory=dict)
# Active trigger tracking (IDs currently firing + their asyncio tasks)
@@ -370,11 +368,9 @@ class SessionManager:
session.graph_runtime = runtime
session.worker_info = info
# Subscribe to execution completion for per-run digest generation.
# Colony memory is additive; worker loading should still succeed if
# that optional subscription path hits an import/runtime issue while
# restoring an older session.
self._subscribe_worker_digest(session)
try:
await self._subscribe_worker_colony_memory(session)
except Exception:
@@ -621,13 +617,6 @@ class SessionManager:
await self._emit_trigger_events(session, "removed", session.available_triggers)
session.available_triggers.clear()
if session.worker_digest_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_digest_sub)
except Exception:
pass
session.worker_digest_sub = None
for sub_id in session.worker_memory_subs:
try:
session.event_bus.unsubscribe(sub_id)
@@ -673,13 +662,6 @@ class SessionManager:
pass
session.worker_handoff_sub = None
if session.worker_digest_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_digest_sub)
except Exception:
pass
session.worker_digest_sub = None
for sub_id in session.worker_memory_subs:
try:
session.event_bus.unsubscribe(sub_id)
@@ -773,135 +755,6 @@ class SessionManager:
else:
logger.warning("Worker handoff received but queen node not ready")
def _subscribe_worker_digest(self, session: Session) -> None:
"""Subscribe to worker events to write per-run digests.
Three triggers:
- NODE_LOOP_ITERATION: write a mid-run snapshot, throttled to at most
once every _DIGEST_COOLDOWN seconds per execution.
- TOOL_CALL_COMPLETED for delegate_to_sub_agent: same throttled snapshot.
Orchestrator nodes often run all subagent calls in a single LLM turn,
so NODE_LOOP_ITERATION only fires once at the end. Subagent
completions provide intermediate checkpoints.
- EXECUTION_COMPLETED / EXECUTION_FAILED: always write the final digest,
bypassing the cooldown.
"""
import time as _time
from framework.runtime.event_bus import EventType as _ET
_DIGEST_COOLDOWN = 300.0 # seconds between mid-run snapshots
if session.worker_digest_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_digest_sub)
except Exception:
pass
session.worker_digest_sub = None
agent_name = session.worker_path.name if session.worker_path else None
if not agent_name:
return
_agent_name = agent_name
_llm = session.llm
_bus = session.event_bus
# per-execution_id monotonic timestamp of last mid-run digest
_last_digest: dict[str, float] = {}
def _resolve_run_id(exec_id: str) -> str | None:
"""Look up the run_id for a given execution_id via EXECUTION_STARTED history."""
for e in _bus.get_history(event_type=_ET.EXECUTION_STARTED, limit=200):
if e.execution_id == exec_id and getattr(e, "run_id", None):
return e.run_id
return None
async def _inject_digest_to_queen(run_id: str) -> None:
"""Read the written digest and push it into the queen's conversation."""
from framework.agents.worker_memory import digest_path
try:
content = digest_path(_agent_name, run_id).read_text(encoding="utf-8").strip()
except OSError:
return
if not content:
return
executor = session.queen_executor
if executor is None:
return
node = executor.node_registry.get("queen")
if node is None or not hasattr(node, "inject_event"):
return
await node.inject_event(f"[WORKER_DIGEST]\n{content}")
async def _consolidate_and_notify(run_id: str, outcome_event: Any) -> None:
"""Write the digest then push it to the queen."""
from framework.agents.worker_memory import consolidate_worker_run
await consolidate_worker_run(_agent_name, run_id, outcome_event, _bus, _llm)
await _inject_digest_to_queen(run_id)
async def _on_worker_event(event: Any) -> None:
if event.stream_id == "queen":
return
exec_id = event.execution_id
if event.type == _ET.EXECUTION_STARTED:
# New run on this execution_id — start the cooldown timer so
# mid-run snapshots don't fire immediately at session start.
# The first snapshot will happen after _DIGEST_COOLDOWN seconds.
if exec_id:
_last_digest[exec_id] = _time.monotonic()
elif event.type in (
_ET.EXECUTION_COMPLETED,
_ET.EXECUTION_FAILED,
_ET.EXECUTION_PAUSED,
):
# Final digest — always fire, ignore cooldown.
# EXECUTION_PAUSED covers cancellation (queen re-triggering the
# worker cancels the previous execution, emitting paused).
run_id = getattr(event, "run_id", None) or _resolve_run_id(exec_id)
if run_id:
asyncio.create_task(
_consolidate_and_notify(run_id, event),
name=f"worker-digest-final-{run_id}",
)
elif event.type in (_ET.NODE_LOOP_ITERATION, _ET.TOOL_CALL_COMPLETED):
# Mid-run snapshot — respect 300 s cooldown per execution.
# TOOL_CALL_COMPLETED is only interesting for subagent calls;
# regular tool completions are too frequent and too cheap.
if event.type == _ET.TOOL_CALL_COMPLETED:
tool_name = (event.data or {}).get("tool_name", "")
if tool_name != "delegate_to_sub_agent":
return
if not exec_id:
return
now = _time.monotonic()
if now - _last_digest.get(exec_id, 0.0) < _DIGEST_COOLDOWN:
return
run_id = _resolve_run_id(exec_id)
if run_id:
_last_digest[exec_id] = now
asyncio.create_task(
_consolidate_and_notify(run_id, None),
name=f"worker-digest-{run_id}",
)
session.worker_digest_sub = session.event_bus.subscribe(
event_types=[
_ET.EXECUTION_STARTED,
_ET.NODE_LOOP_ITERATION,
_ET.TOOL_CALL_COMPLETED,
_ET.EXECUTION_COMPLETED,
_ET.EXECUTION_FAILED,
_ET.EXECUTION_PAUSED,
],
handler=_on_worker_event,
)
async def _subscribe_worker_colony_memory(self, session: Session) -> None:
"""Subscribe shared colony reflection/recall for top-level worker runs."""
for sub_id in session.worker_memory_subs:
@@ -936,6 +789,12 @@ class SessionManager:
)
)
# Colony memory config for reflection-at-handoff
runtime._colony_memory_dir = colony_dir
runtime._colony_worker_sessions_dir = worker_sessions_dir
runtime._colony_recall_cache = session.worker_colony_recall_blocks
runtime._colony_reflect_llm = session.llm
session.worker_memory_subs = await subscribe_worker_memory_triggers(
session.event_bus,
session.llm,
@@ -1226,7 +1085,7 @@ class SessionManager:
)
)
async def revive_queen(self, session: Session, initial_prompt: str | None = None) -> None:
async def revive_queen(self, session: Session) -> None:
"""Revive a dead queen executor on an existing session.
Restarts the queen with the same session context (worker profile, tools, etc.).
@@ -1246,7 +1105,7 @@ class SessionManager:
# Start queen with existing session context
logger.debug("[revive_queen] Calling _start_queen...")
await self._start_queen(
session, worker_identity=worker_identity, initial_prompt=initial_prompt
session, worker_identity=worker_identity
)
logger.info("Queen revived for session '%s', new queen_executor=%s", session.id, session.queen_executor)
+6 -37
View File
@@ -2466,30 +2466,6 @@ def register_queen_lifecycle_tools(
"""Get the session's event bus for querying history."""
return getattr(session, "event_bus", None)
def _get_graph_name() -> str | None:
"""Return the loaded agent directory name, used for diary lookups."""
p = getattr(session, "worker_path", None)
return p.name if p else None
def _format_diary(max_runs: int) -> str:
"""Read recent run digests from disk — no EventBus required."""
agent_name = _get_graph_name()
if not agent_name:
return "No worker loaded — diary unavailable."
from framework.agents.worker_memory import read_recent_digests
entries = read_recent_digests(agent_name, max_runs)
if not entries:
return (
f"No run digests for '{agent_name}' yet. "
"Digests are written at the end of each completed run."
)
lines = [f"Worker '{agent_name}'{len(entries)} recent run digest(s):", ""]
for _run_id, content in entries:
lines.append(content)
lines.append("")
return "\n".join(lines).rstrip()
# Tiered cooldowns: summary is free, detail has short cooldown, full keeps 30s
_COOLDOWN_FULL = 30.0
_COOLDOWN_DETAIL = 10.0
@@ -3092,14 +3068,14 @@ def register_queen_lifecycle_tools(
import time as _time
# --- Tiered cooldown ---
# diary is free (file reads only), summary is free, detail has 10s, full has 30s
# summary is free, detail has 10s, full keeps 30s
now = _time.monotonic()
if focus == "full":
cooldown = _COOLDOWN_FULL
tier = "full"
elif focus == "diary" or focus is None:
elif focus is None:
cooldown = 0.0
tier = focus or "summary"
tier = "summary"
else:
cooldown = _COOLDOWN_DETAIL
tier = "detail"
@@ -3118,10 +3094,6 @@ def register_queen_lifecycle_tools(
)
_status_last_called[tier] = now
# --- Diary: pure file reads, no runtime required ---
if focus == "diary":
return _format_diary(last_n)
# --- Runtime check ---
runtime = _get_runtime()
if runtime is None:
@@ -3171,7 +3143,7 @@ def register_queen_lifecycle_tools(
else:
return (
f"Unknown focus '{focus}'. "
"Valid options: diary, activity, memory, tools, issues, progress, full."
"Valid options: activity, memory, tools, issues, progress, full."
)
except Exception as exc:
logger.exception("get_graph_status error")
@@ -3182,8 +3154,6 @@ def register_queen_lifecycle_tools(
description=(
"Check on the loaded graph. Returns a brief prose summary by default. "
"Use 'focus' to drill into specifics:\n"
"- diary: persistent run digests from past executions — read this first "
"before digging into live runtime logs\n"
"- activity: current node, transitions, latest LLM output\n"
"- memory: worker's accumulated buffer state\n"
"- tools: running and recent tool calls\n"
@@ -3196,10 +3166,9 @@ def register_queen_lifecycle_tools(
"properties": {
"focus": {
"type": "string",
"enum": ["diary", "activity", "memory", "tools", "issues", "progress", "full"],
"enum": ["activity", "memory", "tools", "issues", "progress", "full"],
"description": (
"Aspect to inspect. Omit for a brief summary. "
"Use 'diary' to read persistent run history before checking live logs."
"Aspect to inspect. Omit for a brief summary."
),
},
"last_n": {
@@ -1,21 +0,0 @@
"""Deprecated — queen memory tools replaced by reflection agent + recall selector.
The write_to_diary and recall_diary tools are no longer registered.
Memory is now handled automatically:
- Save: reflection_agent.py extracts learnings after each turn
- Recall: recall_selector.py picks relevant memories before each turn
This file is kept only for import compatibility during migration.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
def register_queen_memory_tools(registry: ToolRegistry) -> None: # noqa: ARG001
"""No-op — memory tools replaced by automatic reflection + recall."""
pass
+68 -167
View File
@@ -182,29 +182,12 @@ def test_memory_freshness_text_old():
# ---------------------------------------------------------------------------
# Cursor
# read_conversation_parts
# ---------------------------------------------------------------------------
def test_cursor_read_write(tmp_path: Path):
cursor_file = tmp_path / ".cursor.json"
assert qm.read_cursor(cursor_file) == 0
qm.write_cursor(42, cursor_file)
assert qm.read_cursor(cursor_file) == 42
def test_cursor_read_corrupted(tmp_path: Path):
cursor_file = tmp_path / ".cursor.json"
cursor_file.write_text("not json", encoding="utf-8")
assert qm.read_cursor(cursor_file) == 0
# ---------------------------------------------------------------------------
# read_messages_since_cursor
# ---------------------------------------------------------------------------
def test_read_messages_since_cursor(tmp_path: Path):
@pytest.mark.asyncio
async def test_read_conversation_parts(tmp_path: Path):
parts_dir = tmp_path / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(5):
@@ -212,23 +195,16 @@ def test_read_messages_since_cursor(tmp_path: Path):
json.dumps({"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"})
)
msgs, max_seq = qm.read_messages_since_cursor(tmp_path, 2)
assert max_seq == 4
assert len(msgs) == 2 # seq 3 and 4
msgs = await qm.read_conversation_parts(tmp_path)
assert len(msgs) == 5
assert msgs[0]["content"] == "msg 0"
assert msgs[4]["content"] == "msg 4"
def test_read_messages_since_cursor_compaction_fallback(tmp_path: Path):
"""When cursor is ahead of all files (evicted), return everything."""
parts_dir = tmp_path / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(3):
(parts_dir / f"{i:010d}.json").write_text(
json.dumps({"role": "user", "content": f"msg {i}"})
)
msgs, max_seq = qm.read_messages_since_cursor(tmp_path, 999)
assert len(msgs) == 3 # Fallback: returns all
assert max_seq == 999 # Cursor stays (will be overwritten by caller)
@pytest.mark.asyncio
async def test_read_conversation_parts_empty(tmp_path: Path):
msgs = await qm.read_conversation_parts(tmp_path)
assert msgs == []
# ---------------------------------------------------------------------------
@@ -348,19 +324,12 @@ async def test_short_reflection(tmp_path: Path):
session_dir,
llm,
memory_dir=mem_dir,
cursor_file=tmp_path / "cursor.json",
)
# Verify the memory file was created.
written = mem_dir / "user-likes-tests.md"
assert written.exists()
assert "user-likes-tests" in written.read_text()
# Verify cursor was advanced.
cursor_file = qm.MEMORY_DIR / ".cursor.json"
# We passed a custom memory_dir, but cursor uses the default path.
# The function uses read_cursor()/write_cursor() with default CURSOR_FILE.
# Just verify the LLM was called.
assert llm.acomplete.call_count == 2
@@ -472,60 +441,6 @@ def test_safe_path_accepted(tmp_path: Path):
assert "Deleted" in result
# ---------------------------------------------------------------------------
# Bug 2: Failed reflections do not advance cursor
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cursor_not_advanced_on_llm_failure(tmp_path: Path):
from framework.agents.queen.reflection_agent import run_short_reflection
parts_dir = tmp_path / "session" / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(3):
(parts_dir / f"{i:010d}.json").write_text(
json.dumps({"role": "user", "content": f"message {i}"})
)
cursor_file = tmp_path / ".cursor.json"
qm.write_cursor(0, cursor_file)
llm = AsyncMock()
llm.acomplete.side_effect = RuntimeError("LLM down")
# Patch read_cursor/write_cursor to use our temp cursor file.
import unittest.mock as mock
with mock.patch("framework.agents.queen.reflection_agent.read_cursor", return_value=0), \
mock.patch("framework.agents.queen.reflection_agent.write_cursor") as mock_write:
await run_short_reflection(tmp_path / "session", llm, memory_dir=tmp_path / "mem")
# write_cursor should NOT have been called since the LLM failed.
mock_write.assert_not_called()
@pytest.mark.asyncio
async def test_cursor_advanced_on_success(tmp_path: Path):
from framework.agents.queen.reflection_agent import run_short_reflection
parts_dir = tmp_path / "session" / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(3):
(parts_dir / f"{i:010d}.json").write_text(
json.dumps({"role": "user", "content": f"message {i}"})
)
llm = AsyncMock()
llm.acomplete.return_value = MagicMock(content="Nothing to remember.", raw_response={})
import unittest.mock as mock
with mock.patch("framework.agents.queen.reflection_agent.read_cursor", return_value=0), \
mock.patch("framework.agents.queen.reflection_agent.write_cursor") as mock_write:
await run_short_reflection(tmp_path / "session", llm, memory_dir=tmp_path / "mem")
mock_write.assert_called_once_with(2, None)
def test_init_memory_dir_migrates_shared_memories_into_colony(tmp_path: Path):
source = tmp_path / "legacy-shared"
source.mkdir()
@@ -625,7 +540,13 @@ def test_queen_phase_state_appends_colony_and_global_memory_blocks():
@pytest.mark.asyncio
async def test_worker_colony_reflection_updates_shared_memory_and_cache(tmp_path: Path):
async def test_worker_colony_reflection_at_handoff(tmp_path: Path):
"""Colony reflection runs via WorkerAgent._reflect_colony_memory at node handoff."""
import asyncio
from framework.graph.context import GraphContext
from framework.graph.worker_agent import WorkerAgent
worker_sessions_dir = tmp_path / "worker-sessions"
execution_id = "exec-1"
session_dir = worker_sessions_dir / execution_id / "conversations" / "parts"
@@ -641,11 +562,11 @@ async def test_worker_colony_reflection_updates_shared_memory_and_cache(tmp_path
colony_dir = tmp_path / "colony"
colony_dir.mkdir()
recall_cache: dict[str, str] = {}
bus = EventBus()
recall_cache: dict[str, str] = {execution_id: ""}
llm = AsyncMock()
llm.acomplete.side_effect = [
reflect_llm = AsyncMock()
reflect_llm.acomplete.side_effect = [
# Short reflection: write a memory file
MagicMock(
content="",
raw_response={
@@ -668,84 +589,64 @@ async def test_worker_colony_reflection_updates_shared_memory_and_cache(tmp_path
]
},
),
# Short reflection done
MagicMock(content="done", raw_response={}),
# Recall selector picks the new memory
MagicMock(content=json.dumps({"selected_memories": ["user-prefers-terse-summaries.md"]})),
]
subs = await subscribe_worker_memory_triggers(
bus,
llm,
worker_sessions_dir=worker_sessions_dir,
colony_memory_dir=colony_dir,
recall_cache=recall_cache,
)
try:
await bus.publish(
AgentEvent(
type=EventType.EXECUTION_STARTED,
stream_id="default",
execution_id=execution_id,
)
)
await bus.publish(
AgentEvent(
type=EventType.LLM_TURN_COMPLETE,
stream_id="default",
execution_id=execution_id,
)
)
finally:
for sub_id in subs:
bus.unsubscribe(sub_id)
# Build a minimal GraphContext with colony memory fields
gc = MagicMock(spec=GraphContext)
gc.colony_memory_dir = colony_dir
gc.worker_sessions_dir = worker_sessions_dir
gc.colony_recall_cache = recall_cache
gc.colony_reflect_llm = reflect_llm
gc.execution_id = execution_id
gc._colony_reflect_lock = asyncio.Lock()
node_spec = SimpleNamespace(id="test-node")
worker = WorkerAgent.__new__(WorkerAgent)
worker._gc = gc
worker.node_spec = node_spec
await worker._reflect_colony_memory()
assert (colony_dir / "user-prefers-terse-summaries.md").exists()
assert "Colony Memories" in recall_cache[execution_id]
assert "terse summaries" in recall_cache[execution_id]
# ---------------------------------------------------------------------------
# Bug 3: Compaction fallback only when cursor > max_all_seq
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_worker_triggers_only_lifecycle_events(tmp_path: Path):
"""After simplification, worker triggers only subscribe to start and terminal events."""
colony_dir = tmp_path / "colony"
colony_dir.mkdir()
recall_cache: dict[str, str] = {}
bus = EventBus()
llm = AsyncMock()
subs = await subscribe_worker_memory_triggers(
bus,
llm,
worker_sessions_dir=tmp_path / "sessions",
colony_memory_dir=colony_dir,
recall_cache=recall_cache,
)
try:
# Should have exactly 2 subscriptions (start + terminal)
assert len(subs) == 2
def test_compaction_fallback_when_cursor_evicted(tmp_path: Path):
"""When cursor_seq > max file seq, fallback triggers (compaction happened)."""
parts_dir = tmp_path / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(3):
(parts_dir / f"{i:010d}.json").write_text(
json.dumps({"role": "user", "content": f"msg {i}"})
# EXECUTION_STARTED initialises cache
await bus.publish(
AgentEvent(
type=EventType.EXECUTION_STARTED,
stream_id="default",
execution_id="exec-1",
)
)
# Cursor is at 999, but max file seq is 2 → compation evicted files.
msgs, max_seq = qm.read_messages_since_cursor(tmp_path, 999)
assert len(msgs) == 3
assert recall_cache.get("exec-1") == ""
finally:
for sub_id in subs:
bus.unsubscribe(sub_id)
def test_no_compaction_fallback_when_up_to_date(tmp_path: Path):
"""When cursor_seq == max file seq, should return empty (not all files)."""
parts_dir = tmp_path / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(3):
(parts_dir / f"{i:010d}.json").write_text(
json.dumps({"role": "user", "content": f"msg {i}"})
)
# Cursor is at 2 (max seq) → already up-to-date, should return nothing.
msgs, max_seq = qm.read_messages_since_cursor(tmp_path, 2)
assert len(msgs) == 0
def test_no_compaction_fallback_when_behind(tmp_path: Path):
"""When cursor_seq < max file seq but no new_files, shouldn't happen normally.
But verify: cursor_seq=0 with files at 0,1,2 should return 1,2 (seq > 0)."""
parts_dir = tmp_path / "conversations" / "parts"
parts_dir.mkdir(parents=True)
for i in range(3):
(parts_dir / f"{i:010d}.json").write_text(
json.dumps({"role": "user", "content": f"msg {i}"})
)
msgs, max_seq = qm.read_messages_since_cursor(tmp_path, 0)
assert len(msgs) == 2 # seq 1 and 2
assert max_seq == 2