feat: worker agent memory

Richard Tang
2026-04-02 17:05:32 -07:00
parent 4006ee96b6
commit ec08ae7438
37 changed files with 1322 additions and 161 deletions
-5
@@ -603,11 +603,6 @@ from litellm import completion_cost
cost = completion_cost(model="claude-3-5-sonnet-20241022", messages=[...])
```
**Monitoring Dashboard** (`/core/framework/monitoring/`)
- WebSocket-based real-time monitoring
- Displays: active agents, tool calls, token usage, errors
- Access at: `http://localhost:8000/monitor`
### How to Add Performance Metrics
**1. Instrument your code**
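As an illustration of step 1, a framework-agnostic timing decorator (a sketch only; the `metric_sink` interface is a placeholder, since the framework's real metrics API is not shown in this diff):

```python
import time
from functools import wraps

def timed(metric_sink):
    """Record wall-clock latency per call to a hypothetical metric sink."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                elapsed_ms = int((time.perf_counter() - start) * 1000)
                metric_sink.record(fn.__name__, elapsed_ms)
        return wrapper
    return decorator
```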
+2 -2
@@ -70,7 +70,7 @@ Use Hive when the bottleneck is no longer the model but the harness around it:
- Long-running agents that need **state persistence and crash recovery**
- Production workloads requiring **cost enforcement, observability, and audit trails**
- Agents that **self-heal** through failure capture and graph evolution
- Multi-agent coordination with **session isolation and shared memory**
- Multi-agent coordination with **session isolation and shared buffers**
- A framework that **scales with model improvements** rather than fighting them
## Quick Links
@@ -146,7 +146,7 @@ Now you can run an agent by selecting the agent (either an existing agent or exa
- **[Goal-Driven Generation](docs/key_concepts/goals_outcome.md)** - Define objectives in natural language; the coding agent generates the agent graph and connection code to achieve them
- **[Adaptiveness](docs/key_concepts/evolution.md)** - Framework captures failures, calibrates according to the objectives, and evolves the agent graph
- **[Dynamic Node Connections](docs/key_concepts/graph.md)** - No predefined edges; connection code is generated by any capable LLM based on your goals
- **SDK-Wrapped Nodes** - Every node gets shared memory, local RLM memory, monitoring, tools, and LLM access out of the box
- **SDK-Wrapped Nodes** - Every node gets a shared data buffer, local RLM memory, monitoring, tools, and LLM access out of the box
- **[Human-in-the-Loop](docs/key_concepts/graph.md#human-in-the-loop)** - Intervention nodes that pause execution for human input with configurable timeouts and escalation
- **Real-time Observability** - WebSocket streaming for live monitoring of agent execution, decisions, and node-to-node communication
+29 -10
@@ -84,6 +84,7 @@ _QUEEN_PLANNING_TOOLS = [
"initialize_and_build_agent",
# Load existing agent (after user confirms)
"load_built_agent",
"save_global_memory",
]
# Building phase: full coding + agent construction tools.
@@ -94,6 +95,7 @@ _QUEEN_BUILDING_TOOLS = (
"list_credentials",
"replan_agent",
"save_agent_draft", # Re-draft during building → auto-dissolves + updates flowchart
"save_global_memory",
]
)
@@ -115,6 +117,7 @@ _QUEEN_STAGING_TOOLS = [
"set_trigger",
"remove_trigger",
"list_triggers",
"save_global_memory",
]
# Running phase: worker is executing — monitor and control.
@@ -138,6 +141,7 @@ _QUEEN_RUNNING_TOOLS = [
"set_trigger",
"remove_trigger",
"list_triggers",
"save_global_memory",
]
@@ -605,6 +609,8 @@ to fix the currently loaded agent (no draft required).
- load_built_agent(agent_path) Load an existing agent and switch to STAGING \
phase. Only use this when the user explicitly asks to work with an existing agent \
(e.g. "load my_agent", "run the research agent"). Confirm with the user first.
- save_global_memory(category, description, content, name?) Save durable \
cross-queen memory about the user only (profile, preferences, environment, feedback)
## Workflow summary
1. Understand requirements → discover tools → design graph
@@ -636,6 +642,8 @@ updated flowchart immediately. Use this when you make structural changes \
restored (with decision/browser nodes intact) so you can edit it. Use \
when the user wants to change integrations, swap tools, rethink the \
flow, or discuss any design changes before you build them.
- save_global_memory(category, description, content, name?) Save durable \
cross-queen memory about the user only
When you finish building an agent, call load_built_agent(path) to stage it.
"""
@@ -654,6 +662,8 @@ first (DEFAULT for most modification requests)
- set_trigger(trigger_id, trigger_type?, trigger_config?) Activate a trigger (timer)
- remove_trigger(trigger_id) Deactivate a trigger
- list_triggers() List all triggers and their active/inactive status
- save_global_memory(category, description, content, name?) Save durable \
cross-queen memory about the user only
You do NOT have write tools. To modify the agent, prefer \
stop_graph_and_plan() unless the user gave a specific instruction.
@@ -679,6 +689,8 @@ To just stop without modifying, call stop_graph().
- set_trigger(trigger_id, trigger_type?, trigger_config?) Activate a trigger (timer)
- remove_trigger(trigger_id) Deactivate a trigger
- list_triggers() List all triggers and their active/inactive status
- save_global_memory(category, description, content, name?) Save durable \
cross-queen memory about the user only
You do NOT have write tools or agent construction tools. \
If you need to modify the agent, call stop_graph_and_edit() to switch back \
@@ -836,21 +848,28 @@ diagnosis mode — you already have a built agent, you just need to fix it.
"""
_queen_memory_instructions = """
## Your Cross-Session Memory
## Your Memory
Relevant memories from past sessions may appear in context under \
"--- Selected Memories ---". These are automatically selected based on the \
user's current query. If you know this person from past sessions, pick up \
where you left off: reference what you built together, what they care about, \
how things went.
Relevant colony memories from this queen session may appear in context under \
"--- Colony Memories ---". Relevant global user memories may appear under \
"--- Global Memories ---".
Colony memories are shared with the worker for this queen session. Use them \
for continuity about what this user is trying to do, what has worked, and \
what the colony has learned together.
Global memories are shared across queens and are only for durable knowledge \
about the user: who they are, their preferences, their environment, and \
their feedback.
Memories older than 1 day include a staleness warning. Treat these as \
point-in-time observations; verify against current code before asserting
point-in-time observations; verify current details before asserting them
as fact.
You do NOT need to manually save or recall memories. A background \
reflection agent automatically extracts learnings from each conversation \
turn and organises them into persistent memory files.
You do NOT need to manually save or recall colony memories. A background \
reflection agent automatically extracts colony learnings from each \
conversation turn. Use `save_global_memory` only when you learn something \
durable about the user that should help future queens.
"""
_queen_behavior_always = _queen_behavior_always + _queen_memory_instructions
+219 -6
@@ -1,4 +1,4 @@
"""Queen memory v2 — file-per-memory architecture.
"""Shared memory helpers for queen/worker recall and reflection.
Each memory is an individual ``.md`` file in ``~/.hive/queen/memories/``
with optional YAML frontmatter (name, type, description). Frontmatter
@@ -14,6 +14,7 @@ from __future__ import annotations
import json
import logging
import re
import shutil
import time
from dataclasses import dataclass, field
from pathlib import Path
@@ -26,8 +27,10 @@ logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
MEMORY_TYPES: tuple[str, ...] = ("goal", "environment", "technique", "reference")
GLOBAL_MEMORY_CATEGORIES: tuple[str, ...] = ("profile", "preference", "environment", "feedback")
_HIVE_QUEEN_DIR = Path.home() / ".hive" / "queen"
# Legacy shared v2 root. Colony memory now lives under queen sessions.
MEMORY_DIR: Path = _HIVE_QUEEN_DIR / "memories"
CURSOR_FILE: Path = MEMORY_DIR / ".cursor.json"
@@ -36,6 +39,12 @@ MAX_FILE_SIZE_BYTES: int = 4096 # 4 KB hard limit per memory file
# How many lines of a memory file to read for header scanning.
_HEADER_LINE_LIMIT: int = 30
_MIGRATION_MARKER = ".migrated-from-shared-memory"
_GLOBAL_MEMORY_CODE_PATTERN = re.compile(
r"(/Users/|~/.hive|\.py\b|\.ts\b|\.tsx\b|\.js\b|"
r"\b(graph|node|runtime|session|execution|worker|queen|subagent|checkpoint|flowchart)\b)",
re.IGNORECASE,
)
# Frontmatter example provided to the reflection agent via prompt.
MEMORY_FRONTMATTER_EXAMPLE: list[str] = [
@@ -58,6 +67,31 @@ MEMORY_FRONTMATTER_EXAMPLE: list[str] = [
]
def colony_memory_dir(colony_id: str) -> Path:
"""Return the colony memory directory for a queen session."""
return _HIVE_QUEEN_DIR / "session" / colony_id / "memory" / "colony"
def global_memory_dir() -> Path:
"""Return the queen-global memory directory."""
return _HIVE_QUEEN_DIR / "global_memory"
def queen_colony_cursor_file(session_dir: Path) -> Path:
"""Return the queen colony cursor file for a session."""
return session_dir / "memory" / "cursors" / "queen-colony.cursor.json"
def queen_global_cursor_file(session_dir: Path) -> Path:
"""Return the queen global cursor file for a session."""
return session_dir / "memory" / "cursors" / "queen-global.cursor.json"
def worker_colony_cursor_file(worker_session_dir: Path) -> Path:
"""Return the worker colony cursor file for one top-level execution."""
return worker_session_dir / "memory" / "worker-colony.cursor.json"
# ---------------------------------------------------------------------------
# Frontmatter parsing (lenient)
# ---------------------------------------------------------------------------
@@ -90,11 +124,20 @@ def parse_frontmatter(text: str) -> dict[str, str]:
def parse_memory_type(raw: str | None) -> str | None:
"""Validate *raw* against ``MEMORY_TYPES``. Falls back to ``None``."""
"""Validate *raw* against supported memory categories."""
if raw is None:
return None
normalized = raw.strip().lower()
return normalized if normalized in MEMORY_TYPES else None
allowed = set(MEMORY_TYPES) | set(GLOBAL_MEMORY_CATEGORIES)
return normalized if normalized in allowed else None
def parse_global_memory_category(raw: str | None) -> str | None:
"""Validate *raw* against ``GLOBAL_MEMORY_CATEGORIES``."""
if raw is None:
return None
normalized = raw.strip().lower()
return normalized if normalized in GLOBAL_MEMORY_CATEGORIES else None
# ---------------------------------------------------------------------------
@@ -168,6 +211,106 @@ def scan_memory_files(memory_dir: Path | None = None) -> list[MemoryFile]:
return [MemoryFile.from_path(f) for f in md_files[:MAX_FILES]]
def slugify_memory_name(raw: str) -> str:
"""Create a filesystem-safe slug for a memory filename."""
slug = re.sub(r"[^a-z0-9]+", "-", raw.strip().lower()).strip("-")
return slug or "memory"
def allocate_memory_filename(
memory_dir: Path,
name: str,
*,
suffix: str = ".md",
) -> str:
"""Allocate a unique filename in *memory_dir* based on *name*."""
base = slugify_memory_name(name)
candidate = f"{base}{suffix}"
counter = 2
while (memory_dir / candidate).exists():
candidate = f"{base}-{counter}{suffix}"
counter += 1
return candidate
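For intuition, the slug and collision behavior these two helpers produce (values follow directly from the code above; `memory_dir` is illustrative):

```python
from pathlib import Path

memory_dir = Path("/tmp/memories")  # illustrative location
slugify_memory_name("Prefers Dark Mode!")  # -> "prefers-dark-mode"
slugify_memory_name("***")                 # -> "memory" (empty-slug fallback)

# If "prefers-dark-mode.md" already exists in memory_dir, the next
# allocation yields "prefers-dark-mode-2.md", then "-3", and so on.
allocate_memory_filename(memory_dir, "Prefers Dark Mode!")
```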
def build_memory_document(
*,
name: str,
description: str,
mem_type: str,
body: str,
) -> str:
"""Build one memory file with frontmatter and body."""
return (
f"---\n"
f"name: {name.strip()}\n"
f"description: {description.strip()}\n"
f"type: {mem_type.strip()}\n"
f"---\n\n"
f"{body.strip()}\n"
)
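For example (illustrative values, not from the repo), a `preference` entry produced by this helper serializes to:

```
---
name: prefers concise answers
description: Prefers concise answers
type: preference
---

User asked twice for shorter summaries; keep replies brief.
```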
def validate_global_memory_payload(
*,
category: str,
description: str,
content: str,
) -> str:
"""Validate a queen-global memory save request."""
parsed = parse_global_memory_category(category)
if parsed is None:
raise ValueError(
"Invalid global memory category. Use one of: "
+ ", ".join(GLOBAL_MEMORY_CATEGORIES)
)
if not description.strip():
raise ValueError("Global memory description cannot be empty.")
if not content.strip():
raise ValueError("Global memory content cannot be empty.")
probe = f"{description}\n{content}"
if _GLOBAL_MEMORY_CODE_PATTERN.search(probe):
raise ValueError(
"Global memory is only for durable user profile, preferences, "
"environment, or feedback — not task/code/runtime details."
)
return parsed
def save_global_memory(
*,
category: str,
description: str,
content: str,
name: str | None = None,
memory_dir: Path | None = None,
) -> tuple[str, Path]:
"""Persist one queen-global memory entry."""
parsed = validate_global_memory_payload(
category=category,
description=description,
content=content,
)
target_dir = memory_dir or global_memory_dir()
target_dir.mkdir(parents=True, exist_ok=True)
memory_name = (name or description).strip()
filename = allocate_memory_filename(target_dir, memory_name)
doc = build_memory_document(
name=memory_name,
description=description,
mem_type=parsed,
body=content,
)
if len(doc.encode("utf-8")) > MAX_FILE_SIZE_BYTES:
raise ValueError(
f"Global memory entry exceeds the {MAX_FILE_SIZE_BYTES} byte limit."
)
path = target_dir / filename
path.write_text(doc, encoding="utf-8")
return filename, path
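A usage sketch against the helper above (example values; the second call trips the `_GLOBAL_MEMORY_CODE_PATTERN` guard):

```python
# Accepted: durable user preference with no code/runtime vocabulary.
filename, path = save_global_memory(
    category="preference",
    description="Prefers concise answers",
    content="Asked twice for shorter summaries; keep replies brief.",
)

# Rejected: mentions runtime concepts ("graph", "checkpoint"), which the
# code-pattern guard treats as task detail rather than durable user memory.
save_global_memory(
    category="feedback",
    description="Graph feedback",
    content="The checkpoint step in the graph failed twice.",
)  # raises ValueError
```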
# ---------------------------------------------------------------------------
# Manifest formatting
# ---------------------------------------------------------------------------
@@ -314,12 +457,23 @@ def read_messages_since_cursor(
# ---------------------------------------------------------------------------
def init_memory_dir(memory_dir: Path | None = None) -> None:
"""Create the memory directory if missing. Migrates legacy files on first run."""
def init_memory_dir(
memory_dir: Path | None = None,
*,
migrate_legacy: bool = False,
) -> None:
"""Create the memory directory if missing.
When ``migrate_legacy`` is true, migrate both v1 memory files and the
previous shared v2 queen memory store into this directory.
"""
d = memory_dir or MEMORY_DIR
first_run = not d.exists()
d.mkdir(parents=True, exist_ok=True)
if first_run:
if migrate_legacy:
migrate_legacy_memories(d)
migrate_shared_v2_memories(d)
elif first_run and d == MEMORY_DIR:
migrate_legacy_memories(d)
@@ -379,6 +533,65 @@ def migrate_legacy_memories(memory_dir: Path | None = None) -> None:
logger.info("queen_memory_v2: migrated legacy memory files to %s", d)
def migrate_shared_v2_memories(
memory_dir: Path | None = None,
*,
source_dir: Path | None = None,
) -> None:
"""Move shared queen v2 memory files into a colony directory once."""
d = memory_dir or MEMORY_DIR
d.mkdir(parents=True, exist_ok=True)
src = source_dir or MEMORY_DIR
if d.resolve() == src.resolve():
return
marker = d / _MIGRATION_MARKER
if marker.exists():
return
if not src.is_dir():
return
md_files = sorted(
f for f in src.glob("*.md")
if f.is_file() and not f.name.startswith(".")
)
if not md_files:
marker.write_text("no shared memories found\n", encoding="utf-8")
return
archive = src / ".legacy_colony_migration"
archive.mkdir(parents=True, exist_ok=True)
migrated_any = False
for src_file in md_files:
target = d / src_file.name
if not target.exists():
try:
shutil.copy2(src_file, target)
migrated_any = True
except OSError:
logger.debug("shared memory migration copy failed for %s", src_file, exc_info=True)
continue
archived = archive / src_file.name
counter = 2
while archived.exists():
archived = archive / f"{src_file.stem}-{counter}{src_file.suffix}"
counter += 1
try:
src_file.rename(archived)
except OSError:
logger.debug("shared memory migration archive failed for %s", src_file, exc_info=True)
if migrated_any:
logger.info("queen_memory_v2: migrated shared queen memories to %s", d)
marker.write_text(
f"migrated_at={int(time.time())}\nsource={src}\n",
encoding="utf-8",
)
def _write_migration_file(
memory_dir: Path,
filename: str,
+23 -7
@@ -1,4 +1,4 @@
"""Recall selector — pre-turn memory selection for the queen.
"""Recall selector — pre-turn memory selection for queen and worker memory.
Before each conversation turn the system:
1. Scans the memory directory for ``.md`` files (cap: 200).
@@ -87,6 +87,8 @@ async def select_memories(
llm: Any,
memory_dir: Path | None = None,
active_tools: list[str] | None = None,
*,
max_results: int = 5,
) -> list[str]:
"""Select up to 5 relevant memory filenames for *query*.
@@ -118,7 +120,7 @@ async def select_memories(
selected = data.get("selected_memories", [])
# Validate: only return filenames that actually exist.
valid_names = {f.filename for f in files}
result = [s for s in selected if s in valid_names][:5]
result = [s for s in selected if s in valid_names][:max_results]
logger.debug("recall: selected %d memories: %s", len(result), result)
return result
except Exception:
@@ -129,6 +131,8 @@ async def select_memories(
def format_recall_injection(
filenames: list[str],
memory_dir: Path | None = None,
*,
heading: str = "Selected Memories",
) -> str:
"""Read selected memory files and format for system prompt injection.
@@ -164,7 +168,7 @@ def format_recall_injection(
body = "\n\n---\n\n".join(blocks)
logger.debug("recall: injecting %d memory blocks into context", len(blocks))
return f"--- Selected Memories ---\n\n{body}\n\n--- End Selected Memories ---"
return f"--- {heading} ---\n\n{body}\n\n--- End {heading} ---"
# ---------------------------------------------------------------------------
@@ -175,8 +179,12 @@ def format_recall_injection(
async def update_recall_cache(
session_dir: Path,
llm: Any,
phase_state: Any,
phase_state: Any | None = None,
memory_dir: Path | None = None,
*,
cache_setter: Any = None,
heading: str = "Selected Memories",
active_tools: list[str] | None = None,
) -> None:
"""Update the recall cache on *phase_state* for the next turn.
@@ -193,9 +201,17 @@ async def update_recall_cache(
logger.debug("recall: updating cache for query: %.80s", query)
try:
selected = await select_memories(query, llm, mem_dir)
injection = format_recall_injection(selected, mem_dir)
phase_state._cached_recall_block = injection
selected = await select_memories(
query,
llm,
mem_dir,
active_tools=active_tools,
)
injection = format_recall_injection(selected, mem_dir, heading=heading)
if cache_setter is not None:
cache_setter(injection)
elif phase_state is not None:
phase_state._cached_recall_block = injection
except Exception:
logger.debug("recall: cache update failed", exc_info=True)
@@ -110,7 +110,7 @@ This prevents premature set_output before user interaction.
**Hard limit: 3-6 nodes for most agents.** Never exceed 6 unless the user
explicitly requests a complex multi-phase pipeline.
Each node boundary serializes outputs to shared memory and **destroys** all
Each node boundary serializes outputs to the shared buffer and **destroys** all
in-context information: tool call results, intermediate reasoning, conversation
history. A research node that searches, fetches, and analyzes in ONE node keeps
all source material in its conversation context. Split across 3 nodes, each
@@ -173,7 +173,7 @@ Use `conversation_mode="continuous"` to preserve context across transitions.
### set_output
- Synthetic tool injected by framework
- Call separately from real tool calls (separate turn)
- `set_output("key", "value")` stores to shared memory
- `set_output("key", "value")` stores to the shared buffer
## Edge Conditions
@@ -247,7 +247,7 @@ For large data that exceeds context:
Multiple ON_SUCCESS edges from same source → parallel execution via asyncio.gather().
- Parallel nodes must have disjoint output_keys
- Only one branch may have client_facing nodes
- Fan-in node gets all outputs in shared memory
- Fan-in node gets all outputs in the shared buffer (see the sketch below)
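A minimal sketch of that fan-out/fan-in shape (framework plumbing elided; `asyncio.gather` is the mechanism named above):

```python
import asyncio

async def run_branch(name: str) -> dict:
    # Each parallel node must write disjoint output keys.
    return {f"{name}_result": f"output from {name}"}

async def fan_out_fan_in() -> dict:
    branch_outputs = await asyncio.gather(run_branch("search"), run_branch("fetch"))
    shared_buffer: dict = {}
    for out in branch_outputs:
        shared_buffer.update(out)  # disjoint keys, so no clobbering
    return shared_buffer           # the fan-in node sees all branch outputs

asyncio.run(fan_out_fan_in())
```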
## Judge System
+159 -7
@@ -1,4 +1,4 @@
"""Reflect agent — background memory extraction after each queen turn.
"""Reflect agent — background memory extraction for queen and worker memory.
A lightweight side agent that runs after each queen LLM turn. It
inspects recent conversation messages (cursor-based incremental
@@ -38,6 +38,7 @@ from framework.agents.queen.queen_memory_v2 import (
read_cursor,
read_messages_since_cursor,
scan_memory_files,
worker_colony_cursor_file,
write_cursor,
)
from framework.llm.provider import LLMResponse, Tool
@@ -343,11 +344,13 @@ async def run_short_reflection(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
*,
cursor_file: Path | None = None,
) -> None:
"""Run a short reflection: extract learnings from new messages."""
mem_dir = memory_dir or MEMORY_DIR
cursor_seq = read_cursor()
cursor_seq = read_cursor(cursor_file)
messages, max_seq = read_messages_since_cursor(session_dir, cursor_seq)
if not messages:
@@ -373,7 +376,7 @@ async def run_short_reflection(
if not transcript_lines:
# Only tool results in the new messages — still advance cursor.
write_cursor(max_seq)
write_cursor(max_seq, cursor_file)
return
transcript = "\n".join(transcript_lines)
@@ -387,7 +390,7 @@ async def run_short_reflection(
# Advance cursor only on success.
if success:
write_cursor(max_seq)
write_cursor(max_seq, cursor_file)
logger.debug("reflect: short reflection done, cursor → %d", max_seq)
else:
logger.warning("reflect: short reflection failed, cursor NOT advanced (stays at %d)", cursor_seq)
@@ -430,6 +433,7 @@ async def subscribe_reflection_triggers(
session_dir: Path,
llm: Any,
memory_dir: Path | None = None,
cursor_file: Path | None = None,
phase_state: Any = None,
) -> list[str]:
"""Subscribe to queen turn events and return subscription IDs.
@@ -459,10 +463,20 @@ async def subscribe_reflection_triggers(
_short_count += 1
logger.debug("reflect: turn complete — short count %d/%d", _short_count, _LONG_REFLECT_INTERVAL)
if _short_count % _LONG_REFLECT_INTERVAL == 0:
await run_short_reflection(session_dir, llm, mem_dir)
await run_short_reflection(
session_dir,
llm,
mem_dir,
cursor_file=cursor_file,
)
await run_long_reflection(llm, mem_dir)
else:
await run_short_reflection(session_dir, llm, mem_dir)
await run_short_reflection(
session_dir,
llm,
mem_dir,
cursor_file=cursor_file,
)
except Exception:
logger.warning("reflect: reflection failed", exc_info=True)
_write_error("short/long reflection")
@@ -472,7 +486,25 @@ async def subscribe_reflection_triggers(
if phase_state is not None:
try:
from framework.agents.queen.recall_selector import update_recall_cache
await update_recall_cache(session_dir, llm, phase_state)
await update_recall_cache(
session_dir,
llm,
cache_setter=lambda block: (
setattr(phase_state, "_cached_colony_recall_block", block),
setattr(phase_state, "_cached_recall_block", block),
),
memory_dir=mem_dir,
heading="Colony Memories",
)
await update_recall_cache(
session_dir,
llm,
cache_setter=lambda block: setattr(
phase_state, "_cached_global_recall_block", block
),
memory_dir=getattr(phase_state, "global_memory_dir", None),
heading="Global Memories",
)
except Exception:
logger.debug("recall: cache update failed", exc_info=True)
@@ -507,6 +539,126 @@ async def subscribe_reflection_triggers(
return sub_ids
async def subscribe_worker_memory_triggers(
event_bus: Any,
llm: Any,
*,
worker_sessions_dir: Path,
colony_memory_dir: Path,
recall_cache: dict[str, str],
) -> list[str]:
"""Subscribe shared colony memory reflection/recall for top-level worker runs."""
from framework.agents.queen.recall_selector import update_recall_cache
from framework.runtime.event_bus import EventType
_lock = asyncio.Lock()
_short_counts: dict[str, int] = {}
def _is_worker_event(event: Any) -> bool:
return bool(
getattr(event, "execution_id", None)
and getattr(event, "stream_id", None) not in ("queen", "judge")
)
async def _update_cache(execution_id: str) -> None:
session_dir = worker_sessions_dir / execution_id
await update_recall_cache(
session_dir,
llm,
memory_dir=colony_memory_dir,
cache_setter=lambda block, execution_id=execution_id: recall_cache.__setitem__(
execution_id, block
),
heading="Colony Memories",
)
async def _on_turn_complete(event: Any) -> None:
if not _is_worker_event(event):
return
if _lock.locked():
logger.debug("reflect: worker colony reflection skipped — lock busy")
return
execution_id = event.execution_id
if execution_id is None:
return
session_dir = worker_sessions_dir / execution_id
cursor_file = worker_colony_cursor_file(session_dir)
async with _lock:
try:
_short_counts[execution_id] = _short_counts.get(execution_id, 0) + 1
await run_short_reflection(
session_dir,
llm,
colony_memory_dir,
cursor_file=cursor_file,
)
if _short_counts[execution_id] % _LONG_REFLECT_INTERVAL == 0:
await run_long_reflection(llm, colony_memory_dir)
await _update_cache(execution_id)
except Exception:
logger.warning("reflect: worker colony reflection failed", exc_info=True)
_write_error("worker colony reflection")
async def _on_compaction(event: Any) -> None:
if not _is_worker_event(event):
return
if _lock.locked():
return
execution_id = event.execution_id
if execution_id is None:
return
async with _lock:
try:
await run_long_reflection(llm, colony_memory_dir)
await _update_cache(execution_id)
except Exception:
logger.warning("reflect: worker compaction reflection failed", exc_info=True)
_write_error("worker compaction reflection")
async def _on_execution_started(event: Any) -> None:
if not _is_worker_event(event):
return
if event.execution_id is not None:
recall_cache[event.execution_id] = ""
async def _on_execution_terminal(event: Any) -> None:
if not _is_worker_event(event):
return
execution_id = event.execution_id
if execution_id is None:
return
async with _lock:
try:
await run_long_reflection(llm, colony_memory_dir)
except Exception:
logger.warning("reflect: worker final reflection failed", exc_info=True)
_write_error("worker final reflection")
finally:
recall_cache.pop(execution_id, None)
_short_counts.pop(execution_id, None)
return [
event_bus.subscribe(
event_types=[EventType.EXECUTION_STARTED],
handler=_on_execution_started,
),
event_bus.subscribe(
event_types=[EventType.LLM_TURN_COMPLETE],
handler=_on_turn_complete,
),
event_bus.subscribe(
event_types=[EventType.CONTEXT_COMPACTED],
handler=_on_compaction,
),
event_bus.subscribe(
event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
handler=_on_execution_terminal,
),
]
def _write_error(context: str) -> None:
"""Best-effort write of the last traceback to an error file."""
try:
+6
@@ -55,6 +55,7 @@ class GraphContext:
batch_init_nudge: str | None = None
dynamic_tools_provider: Any = None
dynamic_prompt_provider: Any = None
dynamic_memory_provider: Any = None
iteration_metadata_provider: Any = None
loop_config: dict[str, Any] = field(default_factory=dict)
path: list[str] = field(default_factory=list)
@@ -178,12 +179,14 @@ def build_node_context(
shared_node_registry: dict[str, NodeProtocol] | None = None,
dynamic_tools_provider: Any = None,
dynamic_prompt_provider: Any = None,
dynamic_memory_provider: Any = None,
iteration_metadata_provider: Any = None,
skills_catalog_prompt: str = "",
protocols_prompt: str = "",
skill_dirs: list[str] | None = None,
default_skill_warn_ratio: float | None = None,
default_skill_batch_nudge: str | None = None,
memory_prompt: str = "",
) -> NodeContext:
"""Build a canonical `NodeContext` for graph execution."""
@@ -227,6 +230,7 @@ def build_node_context(
accounts_prompt=node_accounts_prompt,
identity_prompt=identity_prompt,
narrative=narrative,
memory_prompt=memory_prompt,
execution_id=execution_id,
run_id=run_id,
stream_id=stream_id,
@@ -235,6 +239,7 @@ def build_node_context(
shared_node_registry=shared_node_registry or {},
dynamic_tools_provider=dynamic_tools_provider,
dynamic_prompt_provider=dynamic_prompt_provider,
dynamic_memory_provider=dynamic_memory_provider,
iteration_metadata_provider=iteration_metadata_provider,
skills_catalog_prompt=skills_catalog_prompt,
protocols_prompt=protocols_prompt,
@@ -306,6 +311,7 @@ def build_node_context_from_graph_context(
shared_node_registry=gc.node_registry,
dynamic_tools_provider=gc.dynamic_tools_provider,
dynamic_prompt_provider=gc.dynamic_prompt_provider,
dynamic_memory_provider=gc.dynamic_memory_provider,
iteration_metadata_provider=gc.iteration_metadata_provider,
skills_catalog_prompt=gc.skills_catalog_prompt,
protocols_prompt=gc.protocols_prompt,
@@ -36,6 +36,7 @@ class RestoredState:
start_iteration: int
recent_responses: list[str]
recent_tool_fingerprints: list[list[tuple[str, str]]]
pending_input: dict[str, Any] | None
async def restore(
@@ -81,6 +82,9 @@ async def restore(
[tuple(pair) for pair in fps] # type: ignore[misc]
for fps in raw_fps
]
pending_input = run_cursor.get("pending_input") if run_cursor else None
if not isinstance(pending_input, dict):
pending_input = None
logger.info(
f"Restored event loop: iteration={start_iteration}, "
@@ -95,6 +99,7 @@ async def restore(
start_iteration=start_iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
pending_input=pending_input,
)
@@ -107,6 +112,7 @@ async def write_cursor(
*,
recent_responses: list[str] | None = None,
recent_tool_fingerprints: list[list[tuple[str, str]]] | None = None,
pending_input: dict[str, Any] | None = None,
) -> None:
"""Write checkpoint cursor for crash recovery.
@@ -128,6 +134,9 @@ async def write_cursor(
run_cursor["recent_tool_fingerprints"] = [
[list(pair) for pair in fps] for fps in recent_tool_fingerprints
]
# Persist blocked-input state so restored runs re-block instead of
# manufacturing a synthetic continuation turn.
run_cursor["pending_input"] = pending_input
await conversation_store.write_cursor(update_run_cursor(cursor, ctx.effective_run_id, run_cursor))
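The persisted `pending_input` payload carries the fields EventLoopNode writes later in this diff (a sketch of the shape with example values, not a stable schema):

```python
run_cursor["pending_input"] = {
    "prompt": "Which account should I use?",  # shown when re-entering ask_user
    "options": ["work", "personal"],          # optional choice list, or None
    "questions": None,                        # multi-question batch, if any
    "emit_client_request": True,              # False for silent queen-input waits
}
```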
+128 -7
@@ -381,6 +381,7 @@ class EventLoopNode(NodeProtocol):
start_iteration = restored.start_iteration
_restored_recent_responses = restored.recent_responses
_restored_tool_fingerprints = restored.recent_tool_fingerprints
_restored_pending_input = restored.pending_input
# Refresh the system prompt with full composition including
# execution preamble and node-type preamble. The stored
@@ -401,6 +402,7 @@ class EventLoopNode(NodeProtocol):
else:
_restored_recent_responses = []
_restored_tool_fingerprints = []
_restored_pending_input = None
# Fresh conversation: either isolated mode or first node in continuous mode.
from framework.graph.prompting import build_system_prompt_for_node_context
@@ -538,6 +540,7 @@ class EventLoopNode(NodeProtocol):
# 5. Stall / doom loop detection state (restored from cursor if resuming)
recent_responses: list[str] = _restored_recent_responses
recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
pending_input_state: dict[str, Any] | None = _restored_pending_input
_consecutive_empty_turns: int = 0
# 6. Main loop
@@ -573,9 +576,69 @@ class EventLoopNode(NodeProtocol):
)
# 6b. Drain injection queue
await self._drain_injection_queue(conversation, ctx)
drained_injections = await self._drain_injection_queue(conversation, ctx)
# 6b1. Drain trigger queue (framework-level signals)
await self._drain_trigger_queue(conversation)
drained_triggers = await self._drain_trigger_queue(conversation)
# Resume blocked ask_user/auto-block waits durably across restarts.
# If the node was parked for input and no new message has been
# injected yet, re-enter the wait instead of continuing the last
# assistant turn with a synthetic prompt.
if pending_input_state is not None:
if drained_injections > 0 or drained_triggers > 0:
pending_input_state = None
await self._write_cursor(
ctx,
conversation,
accumulator,
iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
pending_input=None,
)
else:
logger.info(
"[%s] iter=%d: restored pending input wait (emit_client_request=%s)",
node_id,
iteration,
pending_input_state.get("emit_client_request", True),
)
got_input = await self._await_user_input(
ctx,
prompt=str(pending_input_state.get("prompt", "")),
options=pending_input_state.get("options"),
questions=pending_input_state.get("questions"),
emit_client_request=bool(
pending_input_state.get("emit_client_request", True)
),
)
logger.info(
"[%s] iter=%d: restored wait unblocked, got_input=%s",
node_id,
iteration,
got_input,
)
if not got_input:
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
return NodeResult(
success=True,
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
if self._injection_queue.empty() and self._trigger_queue.empty():
logger.info(
"[%s] iter=%d: pending-input wait woke without queued input; re-waiting",
node_id,
iteration,
)
continue
pending_input_state = None
continue
# 6b2. Dynamic tool refresh (mode switching)
if ctx.dynamic_tools_provider is not None:
@@ -592,14 +655,19 @@ class EventLoopNode(NodeProtocol):
tools.extend(ctx.dynamic_tools_provider())
tools.extend(synthetic)
# 6b3. Dynamic prompt refresh (phase switching)
if ctx.dynamic_prompt_provider is not None:
from framework.graph.prompting import stamp_prompt_datetime
# 6b3. Dynamic prompt refresh (phase switching / memory refresh)
if ctx.dynamic_prompt_provider is not None or ctx.dynamic_memory_provider is not None:
if ctx.dynamic_prompt_provider is not None:
from framework.graph.prompting import stamp_prompt_datetime
_new_prompt = stamp_prompt_datetime(ctx.dynamic_prompt_provider())
_new_prompt = stamp_prompt_datetime(ctx.dynamic_prompt_provider())
else:
from framework.graph.prompting import build_system_prompt_for_node_context
_new_prompt = build_system_prompt_for_node_context(ctx)
if _new_prompt != conversation.system_prompt:
conversation.update_system_prompt(_new_prompt)
logger.info("[%s] Dynamic prompt updated (phase switch)", node_id)
logger.info("[%s] Dynamic prompt updated", node_id)
# 6c. Publish iteration event (with per-iteration metadata when available)
_iter_meta = None
@@ -1133,6 +1201,7 @@ class EventLoopNode(NodeProtocol):
iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
pending_input=None,
)
# 6h'. Queen input blocking
@@ -1267,6 +1336,21 @@ class EventLoopNode(NodeProtocol):
# Check for multi-question batch from ask_user_multiple
multi_qs = getattr(self, "_pending_multi_questions", None)
self._pending_multi_questions = None
pending_input_state = {
"prompt": _cf_prompt,
"options": ask_user_options,
"questions": multi_qs,
"emit_client_request": True,
}
await self._write_cursor(
ctx,
conversation,
accumulator,
iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
pending_input=pending_input_state,
)
got_input = await self._await_user_input(
ctx,
prompt=_cf_prompt,
@@ -1331,6 +1415,16 @@ class EventLoopNode(NodeProtocol):
conversation=conversation if _is_continuous else None,
)
if self._injection_queue.empty() and self._trigger_queue.empty():
logger.info(
"[%s] iter=%d: input wait woke without queued input; continuing to wait",
node_id,
iteration,
)
continue
pending_input_state = None
recent_responses.clear()
# -- Judge-skip decision after queen blocking --
@@ -1414,6 +1508,21 @@ class EventLoopNode(NodeProtocol):
)
logger.info("[%s] iter=%d: waiting for queen input...", node_id, iteration)
pending_input_state = {
"prompt": "",
"options": None,
"questions": None,
"emit_client_request": False,
}
await self._write_cursor(
ctx,
conversation,
accumulator,
iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
pending_input=pending_input_state,
)
got_input = await self._await_user_input(ctx, prompt="", emit_client_request=False)
logger.info(
"[%s] iter=%d: queen wait unblocked, got_input=%s",
@@ -1473,6 +1582,16 @@ class EventLoopNode(NodeProtocol):
conversation=conversation if _is_continuous else None,
)
if self._injection_queue.empty() and self._trigger_queue.empty():
logger.info(
"[%s] iter=%d: queen-input wait woke without queued guidance; re-waiting",
node_id,
iteration,
)
continue
pending_input_state = None
recent_responses.clear()
_cf_text_only_streak = 0
_continue_count += 1
@@ -3098,6 +3217,7 @@ class EventLoopNode(NodeProtocol):
*,
recent_responses: list[str] | None = None,
recent_tool_fingerprints: list[list[tuple[str, str]]] | None = None,
pending_input: dict[str, Any] | None = None,
) -> None:
"""Write checkpoint cursor for crash recovery.
@@ -3112,6 +3232,7 @@ class EventLoopNode(NodeProtocol):
iteration=iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
pending_input=pending_input,
)
async def _drain_injection_queue(self, conversation: NodeConversation, ctx: NodeContext) -> int:
+8 -13
@@ -153,6 +153,7 @@ class GraphExecutor:
tool_provider_map: dict[str, str] | None = None,
dynamic_tools_provider: Callable | None = None,
dynamic_prompt_provider: Callable | None = None,
dynamic_memory_provider: Callable | None = None,
iteration_metadata_provider: Callable | None = None,
skills_catalog_prompt: str = "",
protocols_prompt: str = "",
@@ -184,6 +185,8 @@ class GraphExecutor:
tool list (for mode switching)
dynamic_prompt_provider: Optional callback returning current
system prompt (for phase switching)
dynamic_memory_provider: Optional callback returning the current
memory block to inject into node prompts
skills_catalog_prompt: Available skills catalog for system prompt
protocols_prompt: Default skill operational protocols for system prompt
skill_dirs: Skill base directories for Tier 3 resource access
@@ -210,6 +213,7 @@ class GraphExecutor:
self.tool_provider_map = tool_provider_map
self.dynamic_tools_provider = dynamic_tools_provider
self.dynamic_prompt_provider = dynamic_prompt_provider
self.dynamic_memory_provider = dynamic_memory_provider
self.iteration_metadata_provider = iteration_metadata_provider
self.skills_catalog_prompt = skills_catalog_prompt
self.protocols_prompt = protocols_prompt
@@ -1021,6 +1025,7 @@ class GraphExecutor:
shared_node_registry=self.node_registry,
dynamic_tools_provider=self.dynamic_tools_provider,
dynamic_prompt_provider=self.dynamic_prompt_provider,
dynamic_memory_provider=self.dynamic_memory_provider,
iteration_metadata_provider=self.iteration_metadata_provider,
skills_catalog_prompt=self.skills_catalog_prompt,
protocols_prompt=self.protocols_prompt,
@@ -1305,6 +1310,7 @@ class GraphExecutor:
batch_init_nudge=self.batch_init_nudge,
dynamic_tools_provider=self.dynamic_tools_provider,
dynamic_prompt_provider=self.dynamic_prompt_provider,
dynamic_memory_provider=self.dynamic_memory_provider,
iteration_metadata_provider=self.iteration_metadata_provider,
loop_config=self._loop_config,
node_visit_counts=dict(node_visit_counts),
@@ -1600,12 +1606,13 @@ class GraphExecutor:
"Worker execution ended before terminal nodes completed: "
f"{unresolved_terminals}"
)
self.logger.error(execution_error)
else:
execution_error = (
"Worker execution ended before all workers reached "
"a terminal lifecycle state"
)
self.logger.error(execution_error)
self.logger.error(execution_error)
break
task_to_worker = {task: wid for wid, task in pending_tasks.items()}
@@ -1698,22 +1705,10 @@ class GraphExecutor:
# Route activations
for activation in outgoing_activations:
self.logger.info(
" Routing activation: %s -> %s (pending before: %s)",
activation.source_id,
activation.target_id,
list(pending_tasks.keys()),
)
_route_activation(
activation, workers, pending_tasks,
has_event_subscription=False,
)
self.logger.info(
" Routed activation: %s -> %s (pending after: %s)",
activation.source_id,
activation.target_id,
list(pending_tasks.keys()),
)
if wid in terminal_worker_ids:
completed_terminals.add(wid)
+8 -3
@@ -2,7 +2,7 @@
Node Protocol - The building block of agent graphs.
A Node is a unit of work that:
1. Receives context (goal, shared memory, input)
1. Receives context (goal, shared buffer, input)
2. Makes decisions (using LLM, tools, or logic)
3. Produces results (output, state changes)
4. Records everything to the Runtime
@@ -115,10 +115,10 @@ class NodeSpec(BaseModel):
# Data flow
input_keys: list[str] = Field(
default_factory=list, description="Keys this node reads from shared memory or input"
default_factory=list, description="Keys this node reads from the shared buffer or input"
)
output_keys: list[str] = Field(
default_factory=list, description="Keys this node writes to shared memory or output"
default_factory=list, description="Keys this node writes to the shared buffer or output"
)
nullable_output_keys: list[str] = Field(
default_factory=list,
@@ -502,6 +502,8 @@ class NodeContext:
# rebuilding the full system prompt when restoring from conversation store.
identity_prompt: str = ""
narrative: str = ""
# Static memory block injected into the system prompt.
memory_prompt: str = ""
# Event-triggered execution (no interactive user attached)
event_triggered: bool = False
@@ -548,6 +550,9 @@ class NodeContext:
# the queen to switch between phase-specific prompts (building /
# staging / running) without restarting the conversation.
dynamic_prompt_provider: Any = None # Callable[[], str] | None
# Dynamic memory provider — when set, EventLoopNode rebuilds the
# system prompt with the latest memory block each iteration.
dynamic_memory_provider: Any = None # Callable[[], str] | None
# Skill system prompts — injected by the skill discovery pipeline
skills_catalog_prompt: str = "" # Available skills XML catalog
+21
@@ -37,6 +37,7 @@ class NodePromptSpec:
accounts_prompt: str = ""
skills_catalog_prompt: str = ""
protocols_prompt: str = ""
memory_prompt: str = ""
node_type: str = "event_loop"
output_keys: tuple[str, ...] = ()
is_subagent_mode: bool = False
@@ -138,8 +139,18 @@ def build_prompt_spec_from_node_context(
*,
focus_prompt: str | None = None,
narrative: str | None = None,
memory_prompt: str | None = None,
) -> NodePromptSpec:
"""Convert a NodeContext-like object into structured prompt inputs."""
resolved_memory_prompt = memory_prompt
if resolved_memory_prompt is None:
resolved_memory_prompt = getattr(ctx, "memory_prompt", "") or ""
dynamic_memory_provider = getattr(ctx, "dynamic_memory_provider", None)
if dynamic_memory_provider is not None:
try:
resolved_memory_prompt = dynamic_memory_provider() or ""
except Exception:
resolved_memory_prompt = getattr(ctx, "memory_prompt", "") or ""
return NodePromptSpec(
identity_prompt=ctx.identity_prompt or "",
focus_prompt=focus_prompt if focus_prompt is not None else (ctx.node_spec.system_prompt or ""),
@@ -147,6 +158,7 @@ def build_prompt_spec_from_node_context(
accounts_prompt=ctx.accounts_prompt or "",
skills_catalog_prompt=ctx.skills_catalog_prompt or "",
protocols_prompt=ctx.protocols_prompt or "",
memory_prompt=resolved_memory_prompt,
node_type=ctx.node_spec.node_type,
output_keys=tuple(ctx.node_spec.output_keys or ()),
is_subagent_mode=bool(getattr(ctx, "is_subagent_mode", False)),
@@ -169,6 +181,13 @@ def build_system_prompt(spec: NodePromptSpec) -> str:
if spec.protocols_prompt:
parts.append(f"\n{spec.protocols_prompt}")
if spec.memory_prompt:
parts.append(
"\nRelevant recalled memories may appear below. Treat them as "
"point-in-time guidance and verify stale details against current context."
)
parts.append(f"\n{spec.memory_prompt}")
if spec.narrative:
parts.append(f"\n--- Context (what has happened so far) ---\n{spec.narrative}")
@@ -195,12 +214,14 @@ def build_system_prompt_for_node_context(
*,
focus_prompt: str | None = None,
narrative: str | None = None,
memory_prompt: str | None = None,
) -> str:
"""Build a canonical system prompt from a NodeContext-like object."""
spec = build_prompt_spec_from_node_context(
ctx,
focus_prompt=focus_prompt,
narrative=narrative,
memory_prompt=memory_prompt,
)
return build_system_prompt(spec)
-1
@@ -1 +0,0 @@
"""Framework-level worker monitoring package."""
+2 -2
@@ -356,11 +356,11 @@ Not currently emitted — reserved for future use when `NodeConversation` compac
### `state_changed`
A shared memory key has been modified.
A shared buffer key has been modified.
| Data Field | Type | Description |
| ----------- | ----- | ---------------------------------- |
| `key` | `str` | Memory key that changed |
| `key` | `str` | Buffer key that changed |
| `old_value` | `Any` | Previous value |
| `new_value` | `Any` | New value |
| `scope` | `str` | Scope of the change |
+11 -9
@@ -238,6 +238,7 @@ class AgentRuntime:
self._tools = tools or []
self._tool_executor = tool_executor
self._accounts_prompt = accounts_prompt
self._dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = None
self._accounts_data = accounts_data
self._tool_provider_map = tool_provider_map
@@ -360,6 +361,7 @@ class AgentRuntime:
skill_dirs=self.skill_dirs,
context_warn_ratio=self.context_warn_ratio,
batch_init_nudge=self.batch_init_nudge,
dynamic_memory_provider_factory=self._dynamic_memory_provider_factory,
)
await stream.start()
self._streams[ep_id] = stream
@@ -1446,12 +1448,12 @@ class AgentRuntime:
``session_state`` dict containing:
- ``resume_session_id``: reuse the same session directory
- ``memory``: only the keys that the async entry node declares
- ``data_buffer``: only the keys that the async entry node declares
as inputs (e.g. ``rules``, ``max_emails``). Stale outputs
from previous runs (``emails``, ``actions_taken``, ...) are
excluded so each trigger starts fresh.
The memory is read from the primary session's ``state.json``
The data buffer is read from the primary session's ``state.json``
which is kept up-to-date by ``GraphExecutor._write_progress()``
at every node transition.
@@ -1469,7 +1471,7 @@ class AgentRuntime:
"""
import json as _json
# Determine which memory keys the async entry node needs.
# Determine which data buffer keys the async entry node needs.
allowed_keys: set[str] | None = None
# Look up the entry node from the correct graph
src_graph_id = source_graph_id or self._graph_id
@@ -1505,19 +1507,19 @@ class AgentRuntime:
try:
if state_path.exists():
data = _json.loads(state_path.read_text(encoding="utf-8"))
full_memory = data.get("data_buffer", data.get("memory", {}))
if not full_memory:
full_buffer = data.get("data_buffer", data.get("memory", {}))
if not full_buffer:
continue
# Filter to only input keys so stale outputs
# from previous triggers don't leak through.
if allowed_keys is not None:
memory = {k: v for k, v in full_memory.items() if k in allowed_keys}
buffer_data = {k: v for k, v in full_buffer.items() if k in allowed_keys}
else:
memory = full_memory
if memory:
buffer_data = full_buffer
if buffer_data:
return {
"resume_session_id": exec_id,
"data_buffer": memory,
"data_buffer": buffer_data,
}
except Exception:
logger.debug(
+8 -1
@@ -191,6 +191,7 @@ class ExecutionStream:
skill_dirs: list[str] | None = None,
context_warn_ratio: float | None = None,
batch_init_nudge: str | None = None,
dynamic_memory_provider_factory: Callable[[str], Callable[[], str] | None] | None = None,
):
"""
Initialize execution stream.
@@ -245,6 +246,7 @@ class ExecutionStream:
self._skill_dirs: list[str] = skill_dirs or []
self._context_warn_ratio: float | None = context_warn_ratio
self._batch_init_nudge: str | None = batch_init_nudge
self._dynamic_memory_provider_factory = dynamic_memory_provider_factory
_es_logger = logging.getLogger(__name__)
if protocols_prompt:
@@ -720,6 +722,11 @@ class ExecutionStream:
skill_dirs=self._skill_dirs,
context_warn_ratio=self._context_warn_ratio,
batch_init_nudge=self._batch_init_nudge,
dynamic_memory_provider=(
self._dynamic_memory_provider_factory(execution_id)
if self._dynamic_memory_provider_factory is not None
else None
),
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
@@ -1084,7 +1091,7 @@ class ExecutionStream:
updated_at=now,
),
progress=progress,
memory=ss.get("data_buffer", ss.get("memory", {})),
data_buffer=ss.get("data_buffer", ss.get("memory", {})),
input_data=ctx.input_data,
current_run_id=ctx.run_id,
)
@@ -25,6 +25,7 @@ from framework.runtime.event_bus import AgentEvent, EventBus, EventType
from framework.runtime.execution_stream import EntryPointSpec
from framework.runtime.outcome_aggregator import OutcomeAggregator
from framework.runtime.shared_state import IsolationLevel, SharedBufferManager
from framework.schemas.session_state import SessionState, SessionTimestamps
# === Test Fixtures ===
@@ -190,6 +191,26 @@ class TestSharedBufferManager:
assert "exec-1" not in manager._execution_state
class TestSessionState:
"""Tests for session state data-buffer compatibility."""
def test_legacy_memory_alias_populates_data_buffer(self):
"""Legacy `memory` payloads should still hydrate the session buffer."""
state = SessionState(
session_id="session-1",
goal_id="goal-1",
timestamps=SessionTimestamps(
started_at="2026-01-01T00:00:00",
updated_at="2026-01-01T00:00:00",
),
memory={"rules": "keep starred mail"},
)
assert state.data_buffer == {"rules": "keep starred mail"}
assert state.memory == {"rules": "keep starred mail"}
assert state.to_session_state_dict()["data_buffer"] == {"rules": "keep starred mail"}
# === EventBus Tests ===
+18 -5
@@ -9,7 +9,7 @@ from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, Field, computed_field
from pydantic import AliasChoices, BaseModel, Field, computed_field
if TYPE_CHECKING:
from framework.graph.executor import ExecutionResult
@@ -119,8 +119,11 @@ class SessionState(BaseModel):
# Result
result: SessionResult = Field(default_factory=SessionResult)
# Memory (for resumability)
memory: dict[str, Any] = Field(default_factory=dict)
# Data buffer (for resumability)
data_buffer: dict[str, Any] = Field(
default_factory=dict,
validation_alias=AliasChoices("data_buffer", "memory"),
)
# Metrics
metrics: SessionMetrics = Field(default_factory=SessionMetrics)
@@ -154,6 +157,16 @@ class SessionState(BaseModel):
model_config = {"extra": "allow"}
@property
def memory(self) -> dict[str, Any]:
"""Backward-compatible alias for legacy callers."""
return self.data_buffer
@memory.setter
def memory(self, value: dict[str, Any]) -> None:
"""Backward-compatible alias for legacy callers."""
self.data_buffer = value
@computed_field
@property
def duration_ms(self) -> int:
@@ -243,7 +256,7 @@ class SessionState(BaseModel):
error=result.error,
output=result.output,
),
memory=result.session_state.get("data_buffer", result.session_state.get("memory", {})) if result.session_state else {},
data_buffer=result.session_state.get("data_buffer", result.session_state.get("memory", {})) if result.session_state else {},
input_data=input_data or {},
)
@@ -307,7 +320,7 @@ class SessionState(BaseModel):
return {
"paused_at": resume_from,
"resume_from": resume_from,
"data_buffer": self.memory,
"data_buffer": self.data_buffer,
"execution_path": self.progress.path,
"node_visit_counts": self.progress.node_visit_counts,
}
+16 -3
@@ -176,9 +176,18 @@ async def create_queen(
phase_state.running_tools = [t for t in queen_tools if t.name in running_names]
# ---- Cross-session memory ----------------------------------------
from framework.agents.queen.queen_memory_v2 import init_memory_dir
from framework.agents.queen.queen_memory_v2 import (
colony_memory_dir,
global_memory_dir,
init_memory_dir,
queen_colony_cursor_file,
)
init_memory_dir()
colony_dir = colony_memory_dir(session.id)
global_dir = global_memory_dir()
init_memory_dir(colony_dir, migrate_legacy=True)
init_memory_dir(global_dir)
phase_state.global_memory_dir = global_dir
# ---- Compose phase-specific prompts ------------------------------
_orig_node = _queen_graph.nodes[0]
@@ -367,7 +376,11 @@ async def create_queen(
from framework.agents.queen.reflection_agent import subscribe_reflection_triggers
_reflection_subs = await subscribe_reflection_triggers(
session.event_bus, queen_dir, session.llm,
session.event_bus,
queen_dir,
session.llm,
memory_dir=colony_dir,
cursor_file=queen_colony_cursor_file(queen_dir),
phase_state=phase_state,
)
+92 -36
@@ -47,6 +47,10 @@ class Session:
worker_handoff_sub: str | None = None
# Memory reflection + recall subscriptions
memory_reflection_subs: list = field(default_factory=list) # list[str]
# Worker colony memory subscriptions
worker_memory_subs: list = field(default_factory=list) # list[str]
# Per-execution colony recall cache for worker prompts
worker_colony_recall_blocks: dict[str, str] = field(default_factory=dict)
# Worker run digest subscription (fires on EXECUTION_COMPLETED / EXECUTION_FAILED)
worker_digest_sub: str | None = None
# Trigger definitions loaded from agent's triggers.json (available but inactive)
@@ -294,17 +298,10 @@ class SessionManager:
try:
# Blocking I/O — load in executor
loop = asyncio.get_running_loop()
# Prioritize: explicit model arg > worker-specific model > session default
from framework.config import (
get_preferred_worker_model,
get_worker_api_base,
get_worker_api_key,
get_worker_llm_extra_kwargs,
)
worker_model = get_preferred_worker_model()
resolved_model = model or worker_model or self._model
# By default, workers share the session's LLM with the queen so
# execution and memory reflection/recall stay on the same model.
session_model = getattr(session.llm, "model", None)
resolved_model = model or session_model or self._model
runner = await loop.run_in_executor(
None,
lambda: AgentRunner.load(
@@ -316,29 +313,8 @@ class SessionManager:
),
)
# If a worker-specific model is configured, build an LLM provider
# with the correct worker credentials so _setup() doesn't fall back
# to the queen's llm config (which may be a different provider).
if worker_model and not model:
from framework.config import get_hive_config
worker_llm_cfg = get_hive_config().get("worker_llm", {})
if worker_llm_cfg.get("use_antigravity_subscription"):
from framework.llm.antigravity import AntigravityProvider
runner._llm = AntigravityProvider(model=resolved_model)
else:
from framework.llm.litellm import LiteLLMProvider
worker_api_key = get_worker_api_key()
worker_api_base = get_worker_api_base()
worker_extra = get_worker_llm_extra_kwargs()
runner._llm = LiteLLMProvider(
model=resolved_model,
api_key=worker_api_key,
api_base=worker_api_base,
**worker_extra,
)
if model is None:
runner._llm = session.llm
# Setup with session's event bus
if runner._agent_runtime is None:
@@ -349,6 +325,16 @@ class SessionManager:
runtime = runner._agent_runtime
if runtime is not None:
runtime._dynamic_memory_provider_factory = (
lambda execution_id, session=session: (
lambda execution_id=execution_id, session=session: session.worker_colony_recall_blocks.get(
execution_id,
"",
)
)
)
# Load triggers from the agent's triggers.json definition file.
from framework.tools.queen_lifecycle_tools import _read_agent_triggers_json
@@ -384,8 +370,19 @@ class SessionManager:
session.graph_runtime = runtime
session.worker_info = info
# Subscribe to execution completion for per-run digest generation
# Subscribe to execution completion for per-run digest generation.
# Colony memory is additive; worker loading should still succeed if
# that optional subscription path hits an import/runtime issue while
# restoring an older session.
self._subscribe_worker_digest(session)
try:
await self._subscribe_worker_colony_memory(session)
except Exception:
logger.warning(
"Worker colony memory subscription failed for '%s'; continuing without it",
resolved_graph_id,
exc_info=True,
)
async with self._lock:
self._loading.discard(session.id)
@@ -631,6 +628,14 @@ class SessionManager:
pass
session.worker_digest_sub = None
for sub_id in session.worker_memory_subs:
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.worker_memory_subs.clear()
session.worker_colony_recall_blocks.clear()
graph_id = session.graph_id
session.graph_id = None
session.worker_path = None
@@ -675,6 +680,14 @@ class SessionManager:
pass
session.worker_digest_sub = None
for sub_id in session.worker_memory_subs:
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.worker_memory_subs.clear()
session.worker_colony_recall_blocks.clear()
# Stop queen and memory reflection/recall subscriptions
for sub_id in session.memory_reflection_subs:
try:
@@ -717,10 +730,11 @@ class SessionManager:
if _llm is not None:
import asyncio
from framework.agents.queen.queen_memory_v2 import colony_memory_dir
from framework.agents.queen.reflection_agent import run_long_reflection
asyncio.create_task(
run_long_reflection(_llm),
run_long_reflection(_llm, memory_dir=colony_memory_dir(_storage_id)),
name=f"queen-memory-long-reflection-{session_id}",
)
@@ -888,6 +902,48 @@ class SessionManager:
handler=_on_worker_event,
)
async def _subscribe_worker_colony_memory(self, session: Session) -> None:
"""Subscribe shared colony reflection/recall for top-level worker runs."""
for sub_id in session.worker_memory_subs:
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.worker_memory_subs.clear()
session.worker_colony_recall_blocks.clear()
runtime = session.graph_runtime
if runtime is None:
return
worker_sessions_dir = getattr(runtime, "_session_store", None)
worker_sessions_dir = getattr(worker_sessions_dir, "sessions_dir", None)
if worker_sessions_dir is None:
return
from framework.agents.queen.queen_memory_v2 import colony_memory_dir, init_memory_dir
from framework.agents.queen.reflection_agent import subscribe_worker_memory_triggers
colony_dir = colony_memory_dir(session.id)
init_memory_dir(colony_dir, migrate_legacy=True)
runtime._dynamic_memory_provider_factory = (
lambda execution_id, session=session: (
lambda execution_id=execution_id, session=session: session.worker_colony_recall_blocks.get(
execution_id,
"",
)
)
)
session.worker_memory_subs = await subscribe_worker_memory_triggers(
session.event_bus,
session.llm,
worker_sessions_dir=worker_sessions_dir,
colony_memory_dir=colony_dir,
recall_cache=session.worker_colony_recall_blocks,
)
def _subscribe_worker_handoffs(self, session: Session, executor: Any) -> None:
"""Subscribe queen to worker/subagent escalation handoff events."""
from framework.runtime.event_bus import EventType as _ET
@@ -8,7 +8,7 @@ metadata:
## Operational Protocol: Structured Note-Taking
Maintain structured working notes in shared memory key `_working_notes`.
Maintain structured working notes in shared buffer key `_working_notes`.
Update at these checkpoints:
- After completing each discrete subtask or batch item
+1 -1
View File
@@ -79,7 +79,7 @@ SKILL_REGISTRY: dict[str, str] = {
"hive.task-decomposition": "task-decomposition",
}
# All shared memory keys used by default skills (for permission auto-inclusion)
# All shared buffer keys used by default skills (for permission auto-inclusion)
DATA_BUFFER_KEYS: list[str] = [
# note-taking
"_working_notes",
+98 -10
View File
@@ -123,6 +123,9 @@ class QueenPhaseState:
# Cached recall block — populated async by recall_selector after each turn.
_cached_recall_block: str = ""
_cached_colony_recall_block: str = ""
_cached_global_recall_block: str = ""
global_memory_dir: Path | None = None
def get_current_tools(self) -> list:
"""Return tools for the current phase."""
@@ -145,14 +148,16 @@ class QueenPhaseState:
else:
base = self.prompt_building
memory = self._cached_recall_block
parts = [base]
if self.skills_catalog_prompt:
parts.append(self.skills_catalog_prompt)
if self.protocols_prompt:
parts.append(self.protocols_prompt)
if memory:
parts.append(memory)
colony_memory = self._cached_colony_recall_block or self._cached_recall_block
if colony_memory:
parts.append(colony_memory)
if self._cached_global_recall_block:
parts.append(self._cached_global_recall_block)
return "\n\n".join(parts)
async def _emit_phase_event(self) -> None:
@@ -2002,12 +2007,12 @@ def register_queen_lifecycle_tools(
"input_keys": {
"type": "array",
"items": {"type": "string"},
"description": "Expected input memory keys (hints)",
"description": "Expected input buffer keys (hints)",
},
"output_keys": {
"type": "array",
"items": {"type": "string"},
"description": "Expected output memory keys (hints)",
"description": "Expected output buffer keys (hints)",
},
"success_criteria": {
"type": "string",
@@ -2642,16 +2647,16 @@ def register_queen_lifecycle_tools(
return "\n".join(lines)
async def _format_memory(runtime: AgentRuntime) -> str:
"""Format the worker's shared memory snapshot and recent changes."""
"""Format the worker's shared buffer snapshot and recent changes."""
from framework.runtime.shared_state import IsolationLevel
lines = []
active_streams = runtime.get_active_streams()
if not active_streams:
return "Worker has no active executions. No memory to inspect."
return "Worker has no active executions. No buffer state to inspect."
# Read memory from the first active execution
# Read buffer state from the first active execution
stream_info = active_streams[0]
exec_ids = stream_info.get("active_execution_ids", [])
stream_id = stream_info.get("stream_id", "")
@@ -2665,7 +2670,7 @@ def register_queen_lifecycle_tools(
if not state:
lines.append("Worker's shared buffer is empty.")
else:
lines.append(f"Worker's shared memory ({len(state)} keys):")
lines.append(f"Worker's shared buffer ({len(state)} keys):")
for key, value in state.items():
lines.append(f" {key}: {_preview_value(value)}")
@@ -3133,7 +3138,7 @@ def register_queen_lifecycle_tools(
"- diary: persistent run digests from past executions — read this first "
"before digging into live runtime logs\n"
"- activity: current node, transitions, latest LLM output\n"
"- memory: worker's accumulated knowledge and state\n"
"- memory: worker's accumulated buffer state\n"
"- tools: running and recent tool calls\n"
"- issues: retries, stalls, constraint violations\n"
"- progress: goal criteria, token consumption\n"
@@ -4010,6 +4015,89 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- save_global_memory --------------------------------------------------
async def save_global_memory_entry(
category: str,
description: str,
content: str,
name: str | None = None,
) -> str:
"""Persist a queen-global memory entry about the user."""
from framework.agents.queen.queen_memory_v2 import (
global_memory_dir as _global_memory_dir,
init_memory_dir as _init_memory_dir,
save_global_memory as _save_global_memory,
)
target_dir = (
phase_state.global_memory_dir
if phase_state is not None and phase_state.global_memory_dir is not None
else _global_memory_dir()
)
_init_memory_dir(target_dir)
try:
filename, path = _save_global_memory(
category=category,
description=description,
content=content,
name=name,
memory_dir=target_dir,
)
return json.dumps(
{
"status": "saved",
"filename": filename,
"path": str(path),
"category": category,
}
)
except ValueError as exc:
return json.dumps({"error": str(exc)})
_save_global_memory_tool = Tool(
name="save_global_memory",
description=(
"Save durable global memory about the user. "
"Only use for user profile, preferences, environment, or feedback."
),
parameters={
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": ["profile", "preference", "environment", "feedback"],
},
"description": {
"type": "string",
"description": "Specific one-line description for future recall selection.",
},
"content": {
"type": "string",
"description": "Durable user-centric memory content.",
},
"name": {
"type": "string",
"description": "Optional short memory title.",
},
},
"required": ["category", "description", "content"],
"additionalProperties": False,
},
)
registry.register(
"save_global_memory",
_save_global_memory_tool,
lambda inputs: save_global_memory_entry(
inputs["category"],
inputs["description"],
inputs["content"],
inputs.get("name"),
),
)
tools_registered += 1
# --- list_triggers ---------------------------------------------------------
async def list_triggers() -> str:
@@ -335,8 +335,8 @@ async def test_negative_case_condition_false():
result = await executor.execute(graph, goal, {})
# Verify condition correctly evaluated to False
assert result.success, "Execution should succeed"
# Verify condition correctly evaluated to False — terminal never reached
assert not result.success, "Execution should fail when terminal is unreachable"
assert "high_score_handler" not in result.path, (
f"high_score_handler should NOT be in path. "
f"Condition 'score > 80' should be False (score=30). "
+77
View File
@@ -1327,6 +1327,83 @@ class TestCrashRecovery:
# Should have the restored output
assert result.output.get("result") == "partial_value"
@pytest.mark.asyncio
async def test_restore_reblocks_pending_user_input_instead_of_continuing(
self, tmp_path, runtime, buffer
):
"""A restored queen wait should re-emit the question, not self-continue."""
store = FileConversationStore(tmp_path / "conv")
conv = NodeConversation(
system_prompt="You are a test assistant.",
output_keys=[],
store=store,
)
conv.set_current_phase("queen")
await conv.add_user_message("Session started.")
await conv.add_assistant_message(
"",
tool_calls=[
{
"id": "ask_1",
"type": "function",
"function": {
"name": "ask_user",
"arguments": '{"question":"What city?","options":["Seattle","Chicago"]}',
},
}
],
)
await conv.add_tool_result("ask_1", "Waiting for user input...")
await conv.add_assistant_message("What city should I target?")
await store.write_cursor(
{
"iteration": 4,
"next_seq": conv.next_seq,
"pending_input": {
"prompt": "What city?",
"options": ["Seattle", "Chicago"],
"questions": None,
"emit_client_request": True,
},
}
)
spec = NodeSpec(
id="queen",
name="Queen",
description="interactive queen",
node_type="event_loop",
output_keys=[],
)
llm = MockStreamingLLM(scenarios=[text_scenario("This should not run.")])
bus = EventBus()
input_events = []
async def capture(event):
input_events.append(event)
bus.subscribe(event_types=[EventType.CLIENT_INPUT_REQUESTED], handler=capture)
node = EventLoopNode(
event_bus=bus,
conversation_store=store,
config=LoopConfig(max_iterations=10),
)
ctx = build_ctx(runtime, spec, buffer, llm, stream_id="queen")
async def shutdown_after_prompt():
await asyncio.sleep(0.05)
node.signal_shutdown()
task = asyncio.create_task(shutdown_after_prompt())
result = await node.execute(ctx)
await task
assert result.success is True
assert llm._call_index == 0
assert len(input_events) == 1
assert input_events[0].data["prompt"] == "What city?"
# ===========================================================================
# External event injection
+2 -2
View File
@@ -237,9 +237,9 @@ class TestExecutionQuality:
# Verify - this should be failed
assert result.success is False
assert result.execution_quality == "failed"
assert result.total_retries == 2
assert result.total_retries == 1
assert "fails" in result.nodes_with_failures
assert result.retry_details["fails"] == 2
assert result.retry_details["fails"] == 1
assert result.had_partial_failures is True
assert result.error is not None
assert "failed after 2 attempts" in result.error
+197 -2
View File
@@ -5,15 +5,20 @@ from __future__ import annotations
import json
import time
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from framework.agents.queen import queen_memory_v2 as qm
from framework.agents.queen.reflection_agent import subscribe_worker_memory_triggers
from framework.agents.queen.recall_selector import (
format_recall_injection,
select_memories,
)
from framework.graph.prompting import build_system_prompt_for_node_context
from framework.runtime.event_bus import AgentEvent, EventBus, EventType
from framework.tools.queen_lifecycle_tools import QueenPhaseState
# ---------------------------------------------------------------------------
# parse_frontmatter
@@ -52,6 +57,8 @@ def test_parse_memory_type_valid():
assert qm.parse_memory_type("environment") == "environment"
assert qm.parse_memory_type("technique") == "technique"
assert qm.parse_memory_type("reference") == "reference"
assert qm.parse_memory_type("profile") == "profile"
assert qm.parse_memory_type("feedback") == "feedback"
def test_parse_memory_type_case_insensitive():
@@ -285,6 +292,12 @@ def test_format_recall_injection_empty():
assert format_recall_injection([]) == ""
def test_format_recall_injection_custom_heading(tmp_path: Path):
(tmp_path / "a.md").write_text("---\nname: a\n---\nbody of a")
result = format_recall_injection(["a.md"], memory_dir=tmp_path, heading="Colony Memories")
assert "Colony Memories" in result
# ---------------------------------------------------------------------------
# reflection_agent
# ---------------------------------------------------------------------------
@@ -331,7 +344,12 @@ async def test_short_reflection(tmp_path: Path):
]
session_dir = tmp_path / "session"
await run_short_reflection(session_dir, llm, memory_dir=mem_dir)
await run_short_reflection(
session_dir,
llm,
memory_dir=mem_dir,
cursor_file=tmp_path / "cursor.json",
)
# Verify the memory file was created.
written = mem_dir / "user-likes-tests.md"
@@ -505,7 +523,184 @@ async def test_cursor_advanced_on_success(tmp_path: Path):
mock.patch("framework.agents.queen.reflection_agent.write_cursor") as mock_write:
await run_short_reflection(tmp_path / "session", llm, memory_dir=tmp_path / "mem")
mock_write.assert_called_once_with(2)
mock_write.assert_called_once_with(2, None)
def test_init_memory_dir_migrates_shared_memories_into_colony(tmp_path: Path):
source = tmp_path / "legacy-shared"
source.mkdir()
(source / "shared-memory.md").write_text(
"---\nname: shared\ndescription: old shared memory\ntype: goal\n---\nbody",
encoding="utf-8",
)
target = tmp_path / "colony"
qm.migrate_shared_v2_memories(target, source_dir=source)
assert (target / "shared-memory.md").exists()
assert not (source / "shared-memory.md").exists()
assert (target / ".migrated-from-shared-memory").exists()
def test_shared_memory_migration_marker_prevents_repeat(tmp_path: Path):
source = tmp_path / "legacy-shared"
source.mkdir()
target = tmp_path / "colony"
target.mkdir()
(target / ".migrated-from-shared-memory").write_text("done\n", encoding="utf-8")
(source / "shared-memory.md").write_text("body", encoding="utf-8")
qm.migrate_shared_v2_memories(target, source_dir=source)
assert not (target / "shared-memory.md").exists()
assert (source / "shared-memory.md").exists()
def test_global_memory_is_not_populated_by_colony_migration(tmp_path: Path):
source = tmp_path / "legacy-shared"
source.mkdir()
(source / "shared-memory.md").write_text("body", encoding="utf-8")
colony = tmp_path / "colony"
global_dir = tmp_path / "global"
qm.migrate_shared_v2_memories(colony, source_dir=source)
qm.init_memory_dir(global_dir)
assert list(global_dir.glob("*.md")) == []
def test_save_global_memory_rejects_runtime_details(tmp_path: Path):
with pytest.raises(ValueError):
qm.save_global_memory(
category="profile",
description="codebase preference",
content="The user wants the worker graph to use node retries.",
memory_dir=tmp_path,
)
def test_save_global_memory_persists_frontmatter(tmp_path: Path):
filename, path = qm.save_global_memory(
category="preference",
description="Prefers concise updates",
content="The user prefers concise, direct status updates.",
memory_dir=tmp_path,
)
assert filename.endswith(".md")
text = path.read_text(encoding="utf-8")
assert "type: preference" in text
assert "Prefers concise updates" in text
def test_build_system_prompt_injects_dynamic_memory():
ctx = SimpleNamespace(
identity_prompt="Identity",
node_spec=SimpleNamespace(system_prompt="Focus", node_type="event_loop", output_keys=["out"]),
narrative="Narrative",
accounts_prompt="",
skills_catalog_prompt="",
protocols_prompt="",
memory_prompt="",
dynamic_memory_provider=lambda: "--- Colony Memories ---\nremember this",
is_subagent_mode=False,
)
prompt = build_system_prompt_for_node_context(ctx)
assert "Colony Memories" in prompt
assert "remember this" in prompt
def test_queen_phase_state_appends_colony_and_global_memory_blocks():
phase = QueenPhaseState(
prompt_building="base prompt",
_cached_colony_recall_block="--- Colony Memories ---\ncolony",
_cached_global_recall_block="--- Global Memories ---\nglobal",
)
prompt = phase.get_current_prompt()
assert "base prompt" in prompt
assert "Colony Memories" in prompt
assert "Global Memories" in prompt
@pytest.mark.asyncio
async def test_worker_colony_reflection_updates_shared_memory_and_cache(tmp_path: Path):
worker_sessions_dir = tmp_path / "worker-sessions"
execution_id = "exec-1"
session_dir = worker_sessions_dir / execution_id / "conversations" / "parts"
session_dir.mkdir(parents=True)
(session_dir / "0000000000.json").write_text(
json.dumps({"role": "user", "content": "Please remember I like terse summaries."}),
encoding="utf-8",
)
(session_dir / "0000000001.json").write_text(
json.dumps({"role": "assistant", "content": "I'll keep that in mind."}),
encoding="utf-8",
)
colony_dir = tmp_path / "colony"
colony_dir.mkdir()
recall_cache: dict[str, str] = {}
bus = EventBus()
llm = AsyncMock()
llm.acomplete.side_effect = [
MagicMock(
content="",
raw_response={
"tool_calls": [
{
"id": "tc_1",
"name": "write_memory_file",
"input": {
"filename": "user-prefers-terse-summaries.md",
"content": (
"---\n"
"name: user-prefers-terse-summaries\n"
"description: Prefers terse summaries\n"
"type: preference\n"
"---\n\n"
"The user prefers terse summaries."
),
},
}
]
},
),
MagicMock(content="done", raw_response={}),
MagicMock(content=json.dumps({"selected_memories": ["user-prefers-terse-summaries.md"]})),
]
subs = await subscribe_worker_memory_triggers(
bus,
llm,
worker_sessions_dir=worker_sessions_dir,
colony_memory_dir=colony_dir,
recall_cache=recall_cache,
)
try:
await bus.publish(
AgentEvent(
type=EventType.EXECUTION_STARTED,
stream_id="default",
execution_id=execution_id,
)
)
await bus.publish(
AgentEvent(
type=EventType.LLM_TURN_COMPLETE,
stream_id="default",
execution_id=execution_id,
)
)
finally:
for sub_id in subs:
bus.unsubscribe(sub_id)
assert (colony_dir / "user-prefers-terse-summaries.md").exists()
assert "Colony Memories" in recall_cache[execution_id]
assert "terse summaries" in recall_cache[execution_id]
# ---------------------------------------------------------------------------
@@ -1,7 +1,7 @@
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, MagicMock
import pytest
@@ -123,3 +123,116 @@ async def test_stop_session_unsubscribes_worker_handoff() -> None:
reason="after stop",
)
assert queen_node.inject_event.await_count == 1
@pytest.mark.asyncio
async def test_load_worker_core_defaults_to_session_llm_model(monkeypatch, tmp_path) -> None:
bus = EventBus()
manager = SessionManager(model="manager-default")
session_llm = SimpleNamespace(model="queen-shared-model")
session = Session(id="session_worker", event_bus=bus, llm=session_llm, loaded_at=0.0)
runtime = SimpleNamespace(is_running=True)
runner = SimpleNamespace(
_llm=None,
_agent_runtime=runtime,
info=MagicMock(return_value={"id": "worker"}),
)
load_calls: list[dict[str, object]] = []
def fake_load(agent_path, model=None, **kwargs):
load_calls.append({"agent_path": agent_path, "model": model, "kwargs": kwargs})
return runner
monkeypatch.setattr("framework.runner.AgentRunner.load", fake_load)
monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None)
monkeypatch.setattr(manager, "_subscribe_worker_digest", lambda *_args: None)
monkeypatch.setattr(manager, "_subscribe_worker_colony_memory", AsyncMock())
monkeypatch.setattr(
"framework.tools.queen_lifecycle_tools._read_agent_triggers_json",
lambda *_args: [],
)
await manager._load_worker_core(session, tmp_path / "worker_agent")
assert load_calls[0]["model"] == "queen-shared-model"
assert session.runner is runner
assert session.runner._llm is session_llm
assert runtime._dynamic_memory_provider_factory is not None
@pytest.mark.asyncio
async def test_load_worker_core_keeps_explicit_worker_model_override(monkeypatch, tmp_path) -> None:
bus = EventBus()
manager = SessionManager(model="manager-default")
session_llm = SimpleNamespace(model="queen-shared-model")
session = Session(id="session_override", event_bus=bus, llm=session_llm, loaded_at=0.0)
runtime = SimpleNamespace(is_running=True)
runner = SimpleNamespace(
_llm=None,
_agent_runtime=runtime,
info=MagicMock(return_value={"id": "worker"}),
)
load_calls: list[dict[str, object]] = []
def fake_load(agent_path, model=None, **kwargs):
load_calls.append({"agent_path": agent_path, "model": model, "kwargs": kwargs})
return runner
monkeypatch.setattr("framework.runner.AgentRunner.load", fake_load)
monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None)
monkeypatch.setattr(manager, "_subscribe_worker_digest", lambda *_args: None)
monkeypatch.setattr(manager, "_subscribe_worker_colony_memory", AsyncMock())
monkeypatch.setattr(
"framework.tools.queen_lifecycle_tools._read_agent_triggers_json",
lambda *_args: [],
)
await manager._load_worker_core(
session,
tmp_path / "worker_agent",
model="explicit-worker-model",
)
assert load_calls[0]["model"] == "explicit-worker-model"
assert session.runner is runner
assert session.runner._llm is None
@pytest.mark.asyncio
async def test_load_worker_core_continues_when_colony_memory_subscription_fails(
monkeypatch, tmp_path
) -> None:
bus = EventBus()
manager = SessionManager(model="manager-default")
session_llm = SimpleNamespace(model="queen-shared-model")
session = Session(id="session_memory_warning", event_bus=bus, llm=session_llm, loaded_at=0.0)
runtime = SimpleNamespace(is_running=True)
runner = SimpleNamespace(
_llm=None,
_agent_runtime=runtime,
info=MagicMock(return_value={"id": "worker"}),
)
monkeypatch.setattr("framework.runner.AgentRunner.load", lambda *args, **kwargs: runner)
monkeypatch.setattr(manager, "_cleanup_stale_active_sessions", lambda *_args: None)
monkeypatch.setattr(manager, "_subscribe_worker_digest", lambda *_args: None)
monkeypatch.setattr(
manager,
"_subscribe_worker_colony_memory",
AsyncMock(side_effect=ImportError("optional memory hook unavailable")),
)
monkeypatch.setattr(
"framework.tools.queen_lifecycle_tools._read_agent_triggers_json",
lambda *_args: [],
)
await manager._load_worker_core(session, tmp_path / "worker_agent")
assert session.runner is runner
assert session.graph_runtime is runtime
assert session.worker_path == tmp_path / "worker_agent"
+3 -3
View File
@@ -406,15 +406,15 @@ flowchart TB
### How It Works
**1. Outputs are persisted via the accumulator.** When the LLM calls `set_output(key, value)`, the `OutputAccumulator` stores the value in memory and writes through to the `ConversationStore` cursor (for crash recovery).
**1. Outputs are persisted via the accumulator.** When the LLM calls `set_output(key, value)`, the `OutputAccumulator` stores the value in the data buffer and writes through to the `ConversationStore` cursor (for crash recovery).
**2. Judge feedback becomes conversation memory.** When the judge issues a RETRY verdict with feedback, that feedback is injected as a `[Judge feedback]: ...` user message into the conversation. On the next LLM turn, the agent sees its prior attempt, the judge's critique, and can adjust. This is the core reflexion mechanism — in-context learning without model retraining.
**3. The three-layer prompt onion refreshes each turn.** Layer 1 (identity) is static. Layer 2 (narrative) is rebuilt deterministically from `DataBuffer.read_all()` and the execution path — listing completed phases and current state values. Layer 3 (focus) is the current node's `system_prompt`. At phase transitions in continuous mode, Layer 3 swaps while Layers 1-2 and the full conversation history carry forward.
**4. Phase transitions inject structured reflection.** When execution moves between nodes, a transition marker is inserted into the conversation containing: what phase completed, all outputs in memory, available data files, available tools, and an explicit reflection prompt: *"Before proceeding, briefly reflect: what went well in the previous phase? Are there any gaps or surprises worth noting?"* This engineered metacognition surfaces issues before they compound.
**4. Phase transitions inject structured reflection.** When execution moves between nodes, a transition marker is inserted into the conversation containing: what phase completed, all outputs in the data buffer, available data files, available tools, and an explicit reflection prompt: *"Before proceeding, briefly reflect: what went well in the previous phase? Are there any gaps or surprises worth noting?"* This engineered metacognition surfaces issues before they compound.
**5. Data buffer connects phases.** On ACCEPT, the accumulator's outputs are written to `DataBuffer`. The narrative layer reads these values to describe progress. In continuous mode, subsequent nodes see both the conversation history (what was discussed) and the structured memory (what was decided). In isolated mode, a `ContextHandoff` summarizes the prior node's conversation for the next node's input.
**5. Data buffer connects phases.** On ACCEPT, the accumulator's outputs are written to `DataBuffer`. The narrative layer reads these values to describe progress. In continuous mode, subsequent nodes see both the conversation history (what was discussed) and the structured buffer state (what was decided). In isolated mode, a `ContextHandoff` summarizes the prior node's conversation for the next node's input.
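A minimal sketch of the accumulate-then-commit flow in points 1 and 5 — the class shapes and method bodies here are illustrative stand-ins, not the framework's actual signatures:
```python
# Illustrative sketch only; the real OutputAccumulator/DataBuffer APIs may differ.
class DataBuffer:
    """Session-scoped key-value store that connects phases."""

    def __init__(self) -> None:
        self._state: dict[str, object] = {}

    def write(self, key: str, value: object) -> None:
        self._state[key] = value

    def read_all(self) -> dict[str, object]:
        # The narrative prompt layer is rebuilt from this snapshot each turn.
        return dict(self._state)


class OutputAccumulator:
    """Stages a node's outputs until the judge accepts them."""

    def __init__(self) -> None:
        self._staged: dict[str, object] = {}

    def set_output(self, key: str, value: object) -> None:
        # The real accumulator also writes through to the ConversationStore
        # cursor at this point so a crash can recover staged outputs.
        self._staged[key] = value

    def commit(self, buffer: DataBuffer) -> None:
        # On an ACCEPT verdict, staged outputs become visible to later nodes.
        for key, value in self._staged.items():
            buffer.write(key, value)
```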
### The Judge Evaluation Pipeline
-1
View File
@@ -90,7 +90,6 @@ hive/ # Repository root
│ │ ├── graph/ # GraphExecutor - executes node graphs
│ │ ├── llm/ # LLM provider integrations (Anthropic, OpenAI, OpenRouter, Hive, etc.)
│ │ ├── mcp/ # MCP server integration
│ │ ├── monitoring/ # Runtime monitoring
│ │ ├── observability/ # Structured logging - human-readable and machine-parseable tracing
│ │ ├── runner/ # AgentRunner - loads and runs agents
│ │ ├── runtime/ # Runtime environment
+2 -2
View File
@@ -127,12 +127,12 @@ decisions │ │
"input_keys": {
"type": "array",
"items": { "type": "string" },
"description": "Expected input memory keys"
"description": "Expected input buffer keys"
},
"output_keys": {
"type": "array",
"items": { "type": "string" },
"description": "Expected output memory keys"
"description": "Expected output buffer keys"
},
"success_criteria": {
"type": "string",
+5 -5
View File
@@ -4,13 +4,13 @@
Real business processes aren't linear. A sales outreach might go: research a prospect, draft a message, realize the research is thin, go back and dig deeper, draft again, get human approval, send. There are loops, branches, fallbacks, and decision points.
Hive models this as a directed graph. Nodes do work, edges connect them, and shared memory lets them pass data. The framework walks this structure — running nodes, following edges, managing retries — until the agent reaches its goal or exhausts its step budget.
Hive models this as a directed graph. Nodes do work, edges connect them, and a shared data buffer lets them pass data. The framework walks this structure — running nodes, following edges, managing retries — until the agent reaches its goal or exhausts its step budget.
Edges can loop back, creating feedback cycles where an agent retries a step or takes a different path. That's intentional. A graph that only moves forward can't self-correct.
## Nodes
A node is a unit of work. Each node reads inputs from shared memory, does something, and writes outputs back.
A node is a unit of work. Each node reads inputs from the shared buffer, does something, and writes outputs back.
**`event_loop`** — This is the only node type in Hive. It's a multi-turn LLM loop where the model reasons about the current state, calls tools, observes results, and keeps going until it has produced the required outputs. All agent behavior happens in these nodes. They handle long-running tasks, manage their own context window, and can recover from crashes mid-conversation.
@@ -47,11 +47,11 @@ Edges also handle data plumbing between nodes — mapping one node's outputs to
When a node has multiple outgoing edges, the framework can run those branches in parallel and reconverge when they're all done. This is useful for tasks like researching a prospect from multiple sources simultaneously.
## Shared Memory
## Shared Buffer
Shared memory is how nodes communicate. It's a key-value store scoped to a single [session](./worker_agent.md). Every node declares which keys it reads and which it writes, and the framework enforces those boundaries — a node can't quietly access data it hasn't declared.
The shared buffer is how nodes communicate. It's a key-value store scoped to a single [session](./worker_agent.md). Every node declares which keys it reads and which it writes, and the framework enforces those boundaries — a node can't quietly access data it hasn't declared.
Data flows through the graph in a natural way: input arrives at the start, each node reads what it needs and writes what it produces, and edges map outputs to inputs as data moves between nodes. At the end, the full memory state is the execution result.
Data flows through the graph in a natural way: input arrives at the start, each node reads what it needs and writes what it produces, and edges map outputs to inputs as data moves between nodes. At the end, the full buffer state is the execution result.
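A rough sketch of how that declared-keys boundary behaves — `SharedBuffer` and its method names are hypothetical, chosen only to illustrate the contract:

```python
# Hypothetical sketch of declared-key enforcement; not the framework's real API.
class SharedBuffer:
    def __init__(self) -> None:
        self._data: dict[str, object] = {}

    def read(self, declared_inputs: set[str], key: str) -> object:
        if key not in declared_inputs:
            raise PermissionError(f"node did not declare input key {key!r}")
        return self._data[key]

    def write(self, declared_outputs: set[str], key: str, value: object) -> None:
        if key not in declared_outputs:
            raise PermissionError(f"node did not declare output key {key!r}")
        self._data[key] = value


buffer = SharedBuffer()
buffer.write({"campaign_config"}, "campaign_config", {"dm_cap": 25})
buffer.read({"campaign_config"}, "campaign_config")  # ok
# buffer.read(set(), "campaign_config")  # raises PermissionError
```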
## Human-in-the-Loop
+3 -3
View File
@@ -10,7 +10,7 @@ In Hive, a **Coding Agent** (like Claude Code or Cursor) generates worker agents
A session is a single execution of a worker agent against a specific input. If your outreach agent processes 50 prospects, that's 50 sessions.
Each session is isolated — it has its own shared memory, its own execution state, and its own history. This matters because sessions can be long-running. An agent might start researching a prospect, pause for human approval, wait hours or days, and then resume to send the message. The session preserves everything across that gap.
Each session is isolated — it has its own shared buffer, its own execution state, and its own history. This matters because sessions can be long-running. An agent might start researching a prospect, pause for human approval, wait hours or days, and then resume to send the message. The session preserves everything across that gap.
Sessions also make debugging straightforward. Every decision the agent made, every tool it called, every retry it attempted — it's all captured in the session. When something goes wrong, you can trace exactly what happened.
@@ -32,7 +32,7 @@ This is the operational model Hive is designed for: agents that run 24/7 as part
## The Runtime
The worker agent runtime manages the lifecycle: starting sessions, executing the graph, handling pauses and resumes, tracking costs, and collecting metrics. It coordinates everything the agent needs — LLM access, tool execution, shared memory, credential management — so individual nodes can focus on their specific job.
The worker agent runtime manages the lifecycle: starting sessions, executing the graph, handling pauses and resumes, tracking costs, and collecting metrics. It coordinates everything the agent needs — LLM access, tool execution, shared buffer state, credential management — so individual nodes can focus on their specific job.
Key things the runtime handles:
@@ -42,7 +42,7 @@ Key things the runtime handles:
**Event streaming** — The runtime emits events as the agent works. You can wire these up to dashboards, logs, or alerting systems to monitor agents in real time.
**Crash recovery** — If execution is interrupted (process crash, deployment, anything), the runtime can resume from the last checkpoint. Conversation state and memory are persisted, so the agent picks up where it left off rather than starting over.
**Crash recovery** — If execution is interrupted (process crash, deployment, anything), the runtime can resume from the last checkpoint. Conversation state and buffer state are persisted, so the agent picks up where it left off rather than starting over.
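For example, a monitoring hook can subscribe to the runtime's event bus. This is a sketch: `EventBus` and `EventType` come from `framework.runtime.event_bus`, but the event types shown and the assumption that `subscribe` returns an id usable with `unsubscribe` are illustrative, not a complete API reference.

```python
from framework.runtime.event_bus import EventBus, EventType

async def on_event(event) -> None:
    # Forward to a dashboard, log sink, or alerting pipeline.
    print(event.type, event.execution_id, event.data)

bus = EventBus()
sub_id = bus.subscribe(
    event_types=[EventType.EXECUTION_STARTED, EventType.LLM_TURN_COMPLETE],
    handler=on_event,
)
# ... the agent runs; events stream to on_event ...
bus.unsubscribe(sub_id)
```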
## The Big Picture
+6 -6
View File
@@ -268,7 +268,7 @@ Default skills differ from community skills in how they integrate:
| Aspect | Default Skills | Community Skills |
| ------------ | ---------------------------------------------- | ----------------------------------------------------- |
| Loaded by | Framework automatically | Agent decides at runtime (or pre-activated in config) |
| Integration | System prompt injection + shared memory hooks | Instruction-following (standard Agent Skills) |
| Integration | System prompt injection + shared buffer hooks | Instruction-following (standard Agent Skills) |
| Graph impact | No dedicated nodes — woven into existing nodes | None (just context) |
| Overridable | Yes (disable, configure, or replace) | N/A |
@@ -294,7 +294,7 @@ Six default skills ship with Hive:
```markdown
## Operational Protocol: Structured Note-Taking
Maintain structured working notes in shared memory key `_working_notes`.
Maintain structured working notes in shared buffer key `_working_notes`.
Update at these checkpoints:
- After completing each discrete subtask or batch item
@@ -503,7 +503,7 @@ All default skill protocols combined must total under **2000 tokens** to minimiz
### 5.6 Shared Memory Convention
All default skill shared memory keys use the `_` prefix (`_working_notes`, `_batch_ledger`, etc.) to avoid collisions with domain-level keys. These keys are:
All default skill shared buffer keys use the `_` prefix (`_working_notes`, `_batch_ledger`, etc.) to avoid collisions with domain-level keys. These keys are:
- Visible to the agent (for self-reference)
- Visible to the judge (for evaluation context)
@@ -651,7 +651,7 @@ CI runs these evals on submitted skills to validate quality.
| DS-2 | Default skills are valid Agent Skills packages (`SKILL.md` format) in the framework install directory | P0 |
| DS-3 | All default skills loaded automatically for every worker agent unless explicitly disabled | P0 |
| DS-4 | Default skills integrate via system prompt injection — no additional graph nodes | P0 |
| DS-5 | Default skills use `_`-prefixed shared memory keys to avoid domain collisions | P0 |
| DS-5 | Default skills use `_`-prefixed shared buffer keys to avoid domain collisions | P0 |
| DS-6 | Each default skill independently configurable via `default_skills` in agent config | P0 |
| DS-7 | All defaults disableable at once: `{"_all": {"enabled": false}}` | P0 |
| DS-8 | Default skill protocols appended in a `## Operational Protocols` system prompt section | P0 |
@@ -853,7 +853,7 @@ Skills and MCP servers are complementary:
| "Install and use your first skill" | Users | From `hive skill search` to skill activating in a session |
| "Write your first skill" | Contributors | Step-by-step: `hive skill init` → write SKILL.md → validate → submit PR |
| "Port a skill from Claude Code/Cursor" | Contributors | Usually just install it — guide explains verification |
| "Default skills reference" | All users | All 6 defaults: purpose, config, shared memory keys, tuning |
| "Default skills reference" | All users | All 6 defaults: purpose, config, shared buffer keys, tuning |
| "Tuning default skills" | Advanced builders | When to disable vs. configure; per-agent overrides; measuring impact |
| Skill cookbook | Contributors | Annotated examples: research, triage, draft, review, outreach, data extraction |
| "Evaluating skill quality" | Contributors | Setting up evals, writing assertions, iterating with the eval-driven loop |
@@ -865,7 +865,7 @@ Skills and MCP servers are complementary:
| Phase | Scope | Depends On |
| --------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------- |
| **Phase 0: Default Skills** | Implement 6 default skills as `SKILL.md` packages; `DefaultSkillManager` with system prompt injection, iteration callbacks, node completion hooks, phase transition hooks; `DefaultSkillConfig` in Python API and `agent.json`; `_`-prefixed shared memory convention; startup logging | — |
| **Phase 0: Default Skills** | Implement 6 default skills as `SKILL.md` packages; `DefaultSkillManager` with system prompt injection, iteration callbacks, node completion hooks, phase transition hooks; `DefaultSkillConfig` in Python API and `agent.json`; `_`-prefixed shared buffer convention; startup logging | — |
| **Phase 1: Agent Skills Standard** | `SkillDiscovery` scanning `.agents/skills/` and `.hive/skills/`; `SKILL.md` parsing with lenient validation; progressive disclosure (catalog injection, activation, resource loading); model-driven and user-driven activation; context protection; deduplication; pre-activated skills config; compatibility tests against `github.com/anthropics/skills` | — |
| **Phase 2: CLI & Contributor Tooling** | `hive skill init`, `validate`, `test`, `fork`; `hive skill doctor`; `hive skill install/remove/list/search/info/update`; version pinning; `skills-ref` integration for validation | Phase 1 |
| **Phase 3: Registry Repo** | Create `hive-skill-registry` GitHub repo; CI validation using `skills-ref`; `_template/`; `CONTRIBUTING.md`; seed with 10+ skills (extracted from templates + ported from anthropics/skills); eval CI | Phase 1 |
+2 -2
View File
@@ -249,7 +249,7 @@ Hive ships with six built-in operational skills that provide runtime resilience.
| Skill | Purpose |
|-------|---------|
| `hive.note-taking` | Structured working notes in shared memory |
| `hive.note-taking` | Structured working notes in the shared buffer |
| `hive.batch-ledger` | Track per-item status in batch operations |
| `hive.context-preservation` | Save context before context window pruning |
| `hive.quality-monitor` | Self-assess output quality periodically |
@@ -287,4 +287,4 @@ Skills written for any Agent Skills-compatible agent work in Hive:
- The `SKILL.md` format is identical across Claude Code, Cursor, Gemini CLI, and others.
- Skills installed at `~/.agents/skills/` are visible to all compatible agents on your machine.
See the [Agent Skills specification](https://agentskills.io/specification) for the full format reference.
+26
View File
@@ -0,0 +1,26 @@
### Objective — restate the goal
Parse campaign configuration and load existing contacted users from Google Sheets to prevent duplicates.
### Current Plan — numbered steps, mark completed with ✓
1. Parse task input into `campaign_config` ✓
2. Validate required fields in `campaign_config` ✓
3. Read existing outreach data from Google Sheets
4. Build deduplication set of usernames
5. Output `campaign_config` and `contacted_set`
### Key Decisions — decisions made and WHY
- Default `dm_cap` set to 25 as per instructions.
- Default `min_successful_dms` set to 5.
- Qualification rule hardcoded as "recent activity in target subreddit".
- Normalized usernames to lowercase and removed 'u/' prefix for robust deduplication.
### Working Data — intermediate results, extracted values
- repo_url: "https://github.com/acho-io/reddit-star-growth" (assuming from context/test)
- subreddits: ["python", "opensource"] (assuming from context/test)
- spreadsheet_id: "1_234567890abcdefghijklmnopqrstuvwxyz" (placeholder)
### Open Questions — uncertainties to verify
- Need the specific spreadsheet_id and worksheet name from the task context.
### Blockers — anything preventing progress
- Waiting for specific task parameters to finalize parsing.