Compare commits

...

5 Commits

Author SHA1 Message Date
Timothy cacae9f290 fix: compaction logics 2026-03-03 13:33:01 -08:00
Timothy b59094d35f fix: queen should not return on empty stream 2026-03-03 09:44:15 -08:00
Timothy 97cbcf7658 fix: adapt path guarantee 2026-03-03 08:11:49 -08:00
Timothy 4b4abb47b0 Merge branch 'feature/queen-worker-comm' into fix/queen-recovery 2026-03-03 08:02:59 -08:00
Timothy bc9a43d5a9 fix: execution recovery 2026-03-03 07:43:05 -08:00
6 changed files with 927 additions and 282 deletions
+180 -37
View File
@@ -152,6 +152,74 @@ def _compact_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]
return compact
def extract_tool_call_history(messages: list[Message], max_entries: int = 30) -> str:
"""Build a compact tool call history from a list of messages.
Used in compaction summaries to prevent the LLM from re-calling
tools it already called. Extracts tool call details, files saved,
outputs set, and errors encountered.
"""
tool_calls_detail: dict[str, list[str]] = {}
files_saved: list[str] = []
outputs_set: list[str] = []
errors: list[str] = []
def _summarize_input(name: str, args: dict) -> str:
if name == "web_search":
return args.get("query", "")
if name == "web_scrape":
return args.get("url", "")
if name in ("load_data", "save_data"):
return args.get("filename", "")
return ""
for msg in messages:
if msg.role == "assistant" and msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
name = func.get("name", "unknown")
try:
args = json.loads(func.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
args = {}
summary = _summarize_input(name, args)
tool_calls_detail.setdefault(name, []).append(summary)
if name == "save_data" and args.get("filename"):
files_saved.append(args["filename"])
if name == "set_output" and args.get("key"):
outputs_set.append(args["key"])
if msg.role == "tool" and msg.is_error:
preview = msg.content[:120].replace("\n", " ")
errors.append(preview)
parts: list[str] = []
if tool_calls_detail:
lines: list[str] = []
for name, inputs in list(tool_calls_detail.items())[:max_entries]:
count = len(inputs)
non_empty = [s for s in inputs if s]
if non_empty:
detail_lines = [f" - {s[:120]}" for s in non_empty[:8]]
lines.append(f" {name} ({count}x):\n" + "\n".join(detail_lines))
else:
lines.append(f" {name} ({count}x)")
parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines))
if files_saved:
unique = list(dict.fromkeys(files_saved))
parts.append("FILES SAVED: " + ", ".join(unique))
if outputs_set:
unique = list(dict.fromkeys(outputs_set))
parts.append("OUTPUTS SET: " + ", ".join(unique))
if errors:
parts.append(
"ERRORS (do NOT retry these):\n" + "\n".join(f" - {e}" for e in errors[:10])
)
return "\n\n".join(parts)
# ---------------------------------------------------------------------------
# ConversationStore protocol (Phase 2)
# ---------------------------------------------------------------------------
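For orientation, a minimal usage sketch of the new helper. The Message construction mirrors the test fixtures added at the bottom of this diff; the printed layout is approximate:

import json
from framework.graph.conversation import Message, extract_tool_call_history

def make_call(call_id: str, name: str, args: dict) -> dict:
    # Same dict shape the framework stores for assistant tool calls.
    return {
        "id": call_id,
        "type": "function",
        "function": {"name": name, "arguments": json.dumps(args)},
    }

msgs = [
    Message(seq=0, role="assistant", content="",
            tool_calls=[make_call("c1", "web_search", {"query": "python asyncio"})]),
    Message(seq=1, role="tool", content="results...", tool_use_id="c1"),
    Message(seq=2, role="assistant", content="",
            tool_calls=[make_call("c2", "save_data",
                                  {"filename": "notes.txt", "content": "..."})]),
    Message(seq=3, role="tool", content="saved", tool_use_id="c2"),
]
print(extract_tool_call_history(msgs))
# Prints something like:
#   TOOLS ALREADY CALLED:
#     web_search (1x):
#       - python asyncio
#     save_data (1x):
#       - notes.txt
#
#   FILES SAVED: notes.txt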
@@ -653,6 +721,7 @@ class NodeConversation:
spillover_dir: str,
keep_recent: int = 4,
phase_graduated: bool = False,
aggressive: bool = False,
) -> None:
"""Structure-preserving compaction: save freeform text to file, keep tool messages.
@@ -662,6 +731,11 @@ class NodeConversation:
after pruning. Only freeform text exchanges (user messages,
text-only assistant messages) are saved to a file and removed.
When *aggressive* is True, non-essential tool call pairs are also
collapsed into a compact summary instead of being kept individually.
Only ``set_output`` calls and error results are preserved; all other
old tool pairs are replaced by a tool-call history summary.
The result: the agent retains exact knowledge of what tools it called,
where each result is stored, and can load the conversation text if
needed. No LLM summary call. No heuristics. Nothing lost.
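The protected/collapsible split implemented below is easiest to see in isolation. A condensed restatement of the pairing rule (illustrative only; the shipped code additionally spills freeform text to disk and rewrites the store):

from dataclasses import dataclass

@dataclass
class Msg:
    # Stand-in for the framework Message; only the fields the rule inspects.
    role: str
    tool_calls: list | None = None
    tool_use_id: str = ""
    is_error: bool = False

def split_protected(old: list[Msg]) -> tuple[list[Msg], list[Msg]]:
    protected: set[str] = set()
    for m in old:  # protect set_output calls and errored calls up front
        if m.role == "assistant" and m.tool_calls and any(
            t["function"]["name"] == "set_output" for t in m.tool_calls
        ):
            protected |= {t["id"] for t in m.tool_calls}
        if m.role == "tool" and m.is_error:
            protected.add(m.tool_use_id)
    keep: list[Msg] = []
    collapse: list[Msg] = []  # later summarized via extract_tool_call_history
    for m in old:
        if m.role == "tool":
            (keep if (m.tool_use_id in protected or m.is_error) else collapse).append(m)
        elif m.role == "assistant" and m.tool_calls:
            ids = {t["id"] for t in m.tool_calls}
            (keep if ids & protected else collapse).append(m)
    return keep, collapse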
@@ -693,35 +767,92 @@ class NodeConversation:
# Classify old messages: structural (keep) vs freeform (save to file)
kept_structural: list[Message] = []
freeform_lines: list[str] = []
collapsed_msgs: list[Message] = []
if aggressive:
# Aggressive: only keep set_output tool pairs and error results.
# Everything else is collapsed into a tool-call history summary.
# We need to track tool_call IDs to pair assistant messages with
# their tool results.
protected_tc_ids: set[str] = set()
collapsible_tc_ids: set[str] = set()
# First pass: classify assistant messages
for msg in old_messages:
if msg.role != "assistant" or not msg.tool_calls:
continue
has_protected = any(
tc.get("function", {}).get("name") == "set_output"
for tc in msg.tool_calls
)
tc_ids = {tc.get("id", "") for tc in msg.tool_calls}
if has_protected:
protected_tc_ids |= tc_ids
else:
collapsible_tc_ids |= tc_ids
# Error tool results must also protect their parent assistant
# message. Results appear *after* their call in the message list,
# so collect them up front rather than during the second pass.
for msg in old_messages:
    if msg.role == "tool" and msg.is_error and msg.tool_use_id:
        protected_tc_ids.add(msg.tool_use_id)
# Second pass: classify all messages
for msg in old_messages:
if msg.role == "tool":
tc_id = msg.tool_use_id or ""
if tc_id in protected_tc_ids:
kept_structural.append(msg)
elif msg.is_error:
# Error results are always protected
kept_structural.append(msg)
# Protect the parent assistant message too
protected_tc_ids.add(tc_id)
else:
collapsed_msgs.append(msg)
elif msg.role == "assistant" and msg.tool_calls:
tc_ids = {tc.get("id", "") for tc in msg.tool_calls}
if tc_ids & protected_tc_ids:
# Has at least one protected tool call — keep entire msg
compact_tcs = _compact_tool_calls(msg.tool_calls)
kept_structural.append(
Message(
seq=msg.seq,
role=msg.role,
content="",
tool_calls=compact_tcs,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
)
else:
collapsed_msgs.append(msg)
else:
# Freeform text — save to file
role_label = msg.role
text = msg.content
if len(text) > 2000:
text = text[:2000] + "…"
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
else:
# Standard mode: keep all tool call pairs as structural
for msg in old_messages:
if msg.role == "tool":
kept_structural.append(msg)
elif msg.role == "assistant" and msg.tool_calls:
compact_tcs = _compact_tool_calls(msg.tool_calls)
kept_structural.append(
Message(
seq=msg.seq,
role=msg.role,
content="",
tool_calls=compact_tcs,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
)
else:
role_label = msg.role
text = msg.content
if len(text) > 2000:
text = text[:2000] + "…"
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
# Write freeform text to a numbered conversation file
spill_path = Path(spillover_dir)
@@ -741,13 +872,25 @@ class NodeConversation:
conv_filename = ""
# Build reference message
ref_parts: list[str] = []
if conv_filename:
ref_parts.append(
f"[Previous conversation saved to '{conv_filename}'. "
f"Use load_data('{conv_filename}') to review if needed.]"
)
elif not collapsed_msgs:
ref_parts.append("[Previous freeform messages compacted.]")
# Aggressive: add collapsed tool-call history to the reference
if collapsed_msgs:
tool_history = extract_tool_call_history(collapsed_msgs)
if tool_history:
ref_parts.append(tool_history)
elif not ref_parts:
ref_parts.append("[Previous tool calls compacted.]")
ref_content = "\n\n".join(ref_parts)
# Use a seq just before the first kept message
recent_messages = list(self._messages[split:])
if kept_structural:
@@ -760,15 +903,15 @@ class NodeConversation:
ref_msg = Message(seq=ref_seq, role="user", content=ref_content)
# Persist: delete old messages from store, write reference + kept structural.
# In aggressive mode, collapsed messages may be interspersed with kept
# messages, so we delete everything before the recent boundary and
# rewrite only what we want to keep.
if self._store:
recent_boundary = (
recent_messages[0].seq if recent_messages else self._next_seq
)
await self._store.delete_parts_before(recent_boundary)
# Write the reference message
await self._store.write_part(ref_msg.seq, ref_msg.to_storage_dict())
# Write kept structural messages (they may have been modified)
+330 -219
View File
@@ -36,6 +36,23 @@ from framework.runtime.llm_debug_logger import log_llm_turn
logger = logging.getLogger(__name__)
# Pattern for detecting context-window-exceeded errors across LLM providers.
_CONTEXT_TOO_LARGE_RE = re.compile(
r"context.{0,20}(length|window|limit|size)|"
r"too.{0,10}(long|large|many.{0,10}tokens)|"
r"(exceed|exceeds|exceeded).{0,30}(limit|window|context|tokens)|"
r"maximum.{0,20}token|prompt.{0,20}too.{0,10}long",
re.IGNORECASE,
)
def _is_context_too_large_error(exc: BaseException) -> bool:
"""Detect whether an exception indicates the LLM input was too large."""
cls = type(exc).__name__
if "ContextWindow" in cls:
return True
return bool(_CONTEXT_TOO_LARGE_RE.search(str(exc)))
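The new pattern is easy to sanity-check in isolation; a quick sketch (the error strings are illustrative, not verbatim provider output):

import re

pat = re.compile(
    r"context.{0,20}(length|window|limit|size)|"
    r"too.{0,10}(long|large|many.{0,10}tokens)|"
    r"(exceed|exceeds|exceeded).{0,30}(limit|window|context|tokens)|"
    r"maximum.{0,20}token|prompt.{0,20}too.{0,10}long",
    re.IGNORECASE,
)
for msg in (
    "This model's maximum context length is 128000 tokens",
    "prompt is too long: 210543 tokens > 200000 maximum",
    "Request exceeded the token limit for this model",
    "429 Too Many Requests",  # rate limit, must NOT match
):
    print(bool(pat.search(msg)), msg)
# True, True, True, False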
# ---------------------------------------------------------------------------
# Escalation receiver (temporary routing target for subagent → user input)
@@ -553,7 +570,7 @@ class EventLoopNode(NodeProtocol):
# 6d. Pre-turn compaction check (tiered)
if conversation.needs_compaction():
await self._compact(ctx, conversation, accumulator)
# 6e. Run single LLM turn (with transient error retry)
logger.info(
@@ -753,7 +770,7 @@ class EventLoopNode(NodeProtocol):
# 6e''. Post-turn compaction check (catches tool-result bloat)
if conversation.needs_compaction():
await self._compact(ctx, conversation, accumulator)
# Reset auto-block grace streak when real work happens
if real_tool_results or outputs_set:
@@ -774,7 +791,13 @@ class EventLoopNode(NodeProtocol):
missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
# Only accept on empty response if the node actually has
# output_keys that are all satisfied. Nodes with NO
# output_keys (e.g. the forever-alive queen) should never
# be terminated by a ghost empty stream — "missing" is
# trivially empty when there are no required outputs.
has_real_outputs = bool(ctx.node_spec.output_keys)
if not missing and has_real_outputs:
logger.info(
"[%s] iter=%d: empty response but all outputs set — accepting",
node_id,
@@ -791,7 +814,7 @@ class EventLoopNode(NodeProtocol):
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
elif missing:
# Ghost empty stream: LLM returned nothing and outputs
# are still missing. The conversation hasn't changed, so
# repeating the same call will produce the same empty
@@ -840,6 +863,37 @@ class EventLoopNode(NodeProtocol):
"progress.]"
)
continue
else:
# No output_keys and empty response — forever-alive node
# got a ghost empty stream. Nudge like the missing-outputs
# path but without failing (no outputs to demand).
_consecutive_empty_turns += 1
logger.warning(
"[%s] iter=%d: empty response on node with no output_keys "
"(consecutive=%d)",
node_id,
iteration,
_consecutive_empty_turns,
)
if _consecutive_empty_turns >= self._config.stall_detection_threshold:
# Persistent ghost — but since this is a forever-alive
# node, block for user input instead of crashing.
logger.warning(
"[%s] iter=%d: %d consecutive empty responses, "
"blocking for user input",
node_id,
iteration,
_consecutive_empty_turns,
)
await self._await_user_input(ctx, prompt="")
_consecutive_empty_turns = 0
else:
await conversation.add_user_message(
"[System: Your response was empty. Review the "
"conversation and respond to the user or take "
"action with your tools.]"
)
continue
else:
_consecutive_empty_turns = 0
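Condensed, the empty-response handling above reduces to a three-way decision (illustrative restatement, not the shipped code):

def on_empty_response(missing: set[str], has_real_outputs: bool) -> str:
    if not missing and has_real_outputs:
        return "accept"  # every required output is already set
    if missing:
        return "nudge, fail if persistent"  # ghost empty stream, work remains
    return "nudge, block for user input"  # forever-alive node: never terminate

assert on_empty_response(set(), True) == "accept"
assert on_empty_response({"report"}, True).startswith("nudge, fail")
assert on_empty_response(set(), False).endswith("block for user input")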
@@ -1519,7 +1573,7 @@ class EventLoopNode(NodeProtocol):
"Pre-send guard: context at %.0f%% of budget, compacting",
conversation.usage_ratio() * 100,
)
await self._compact(ctx, conversation, accumulator)
messages = conversation.to_llm_messages()
@@ -2406,78 +2460,11 @@ class EventLoopNode(NodeProtocol):
) -> str:
"""Build a compact tool call history from the conversation.
Delegates to :func:`extract_tool_call_history` in conversation.py.
"""
from framework.graph.conversation import extract_tool_call_history
return extract_tool_call_history(conversation.messages, max_entries=max_entries)
def _build_initial_message(self, ctx: NodeContext) -> str:
"""Build the initial user message from input data and memory.
@@ -2648,6 +2635,7 @@ class EventLoopNode(NodeProtocol):
return
try:
adapt_path = Path(self._config.spillover_dir) / "adapt.md"
adapt_path.parent.mkdir(parents=True, exist_ok=True)
content = adapt_path.read_text(encoding="utf-8") if adapt_path.exists() else ""
if "## Outputs" not in content:
@@ -2825,131 +2813,304 @@ class EventLoopNode(NodeProtocol):
return result
# --- Compaction -----------------------------------------------------------
# Threshold above which LLM compaction is invoked (structural handles 80-95%).
_LLM_COMPACT_THRESHOLD = 0.95
# Max chars of formatted messages before proactively splitting for LLM.
_LLM_COMPACT_CHAR_LIMIT = 240_000
# Max recursion depth for binary-search splitting.
_LLM_COMPACT_MAX_DEPTH = 10
async def _compact(
self,
ctx: NodeContext,
conversation: NodeConversation,
accumulator: OutputAccumulator | None = None,
) -> None:
"""Run compaction with aggressiveness scaled to usage level.
"""Compact conversation history to stay within token budget.
1. Prune old tool results (always, free).
2. Structure-preserving compaction at >=80% (standard, then aggressive).
3. LLM compaction at >95% with recursive binary-search splitting.
4. Emergency deterministic summary only if LLM failed or unavailable.
"""
ratio_before = conversation.usage_ratio()
phase_grad = getattr(ctx, "continuous_mode", False)
# --- Step 1: Prune old tool results (free, no LLM) ---
protect = max(2000, self._config.max_history_tokens // 12)
pruned = await conversation.prune_old_tool_results(
protect_tokens=protect,
min_prune_tokens=max(1000, protect // 3),
)
if pruned > 0:
logger.info(
"Pruned %d old tool results: %.0f%% -> %.0f%%",
pruned,
ratio_before * 100,
conversation.usage_ratio() * 100,
)
if not conversation.needs_compaction():
await self._log_compaction(ctx, conversation, ratio_before)
return
# --- Step 2: Structure-preserving compaction (>=80%) ---
spill_dir = self._config.spillover_dir
if spill_dir:
pre_structural = conversation.usage_ratio()
await conversation.compact_preserving_structure(
spillover_dir=spill_dir,
keep_recent=4,
phase_graduated=phase_grad,
)
if conversation.usage_ratio() >= 0.9 * pre_structural:
logger.info(
"Standard structural compaction ineffective "
"(%.0f%% -> %.0f%%), trying aggressive",
pre_structural * 100,
conversation.usage_ratio() * 100,
)
await conversation.compact_preserving_structure(
spillover_dir=spill_dir,
keep_recent=4,
phase_graduated=phase_grad,
aggressive=True,
)
if not conversation.needs_compaction():
await self._log_compaction(ctx, conversation, ratio_before)
return
# --- Step 3: LLM compaction at >95% (recursive binary-search) ---
if (
conversation.usage_ratio() > self._LLM_COMPACT_THRESHOLD
and ctx.llm is not None
):
logger.info(
"LLM compaction triggered (%.0f%% usage)",
conversation.usage_ratio() * 100,
)
try:
summary = await self._llm_compact(
ctx, list(conversation.messages), accumulator,
)
await conversation.compact(
summary,
keep_recent=2,
phase_graduated=phase_grad,
)
except Exception as e:
logger.warning("LLM compaction failed: %s", e)
if not conversation.needs_compaction():
await self._log_compaction(ctx, conversation, ratio_before)
return
# --- Step 4: Emergency deterministic summary (LLM failed/unavailable) ---
logger.warning(
"Emergency compaction (%.0f%% usage)",
conversation.usage_ratio() * 100,
)
summary = self._build_emergency_summary(ctx, accumulator, conversation)
await conversation.compact(
summary, keep_recent=1, phase_graduated=phase_grad,
)
await self._log_compaction(ctx, conversation, ratio_before)
# --- LLM compaction with binary-search splitting ----------------------
async def _llm_compact(
self,
ctx: NodeContext,
messages: list,
accumulator: OutputAccumulator | None = None,
_depth: int = 0,
) -> str:
"""Summarise *messages* with LLM, splitting recursively if too large.
If the formatted text exceeds ``_LLM_COMPACT_CHAR_LIMIT`` or the LLM
rejects the call with a context-length error, the messages are split
in half and each half is summarised independently. Tool history is
appended once at the top-level call (``_depth == 0``).
"""
from framework.graph.conversation import extract_tool_call_history
if _depth > self._LLM_COMPACT_MAX_DEPTH:
raise RuntimeError(
f"LLM compaction recursion limit ({self._LLM_COMPACT_MAX_DEPTH})"
)
formatted = self._format_messages_for_summary(messages)
# Proactive split: avoid wasting an API call on oversized input
if len(formatted) > self._LLM_COMPACT_CHAR_LIMIT and len(messages) > 1:
summary = await self._llm_compact_split(
ctx, messages, accumulator, _depth,
)
else:
prompt = self._build_llm_compaction_prompt(
ctx, accumulator, formatted,
)
summary_budget = max(1024, self._config.max_history_tokens // 2)
try:
response = await ctx.llm.acomplete(
messages=[{"role": "user", "content": prompt}],
system=(
"You are a conversation compactor for an AI agent. "
"Write a detailed summary that allows the agent to "
"continue its work. Preserve user-stated rules, "
"constraints, and account/identity preferences verbatim."
),
max_tokens=summary_budget,
)
summary = response.content
except Exception as e:
if _is_context_too_large_error(e) and len(messages) > 1:
logger.info(
"LLM context too large (depth=%d, msgs=%d) — splitting",
_depth,
len(messages),
)
summary = await self._llm_compact_split(
ctx, messages, accumulator, _depth,
)
else:
raise
# Append tool history at top level only
if _depth == 0:
tool_history = extract_tool_call_history(messages)
if tool_history and "TOOLS ALREADY CALLED" not in summary:
summary += "\n\n" + tool_history
return summary
async def _llm_compact_split(
self,
ctx: NodeContext,
messages: list,
accumulator: OutputAccumulator | None,
_depth: int,
) -> str:
"""Split messages in half and summarise each half independently."""
mid = max(1, len(messages) // 2)
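# Each level of splitting doubles the chunk count; with
# _LLM_COMPACT_MAX_DEPTH = 10 the tree is capped at 2**10 = 1024 leaf summaries.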
s1 = await self._llm_compact(ctx, messages[:mid], None, _depth + 1)
s2 = await self._llm_compact(
ctx, messages[mid:], accumulator, _depth + 1,
)
return s1 + "\n\n" + s2
# --- Compaction helpers ------------------------------------------------
@staticmethod
def _format_messages_for_summary(messages: list) -> str:
"""Format messages as text for LLM summarisation."""
lines: list[str] = []
for m in messages:
if m.role == "tool":
content = m.content[:500]
if len(m.content) > 500:
content += "..."
lines.append(f"[tool result]: {content}")
elif m.role == "assistant" and m.tool_calls:
names = [
tc.get("function", {}).get("name", "?")
for tc in m.tool_calls
]
text = m.content[:200] if m.content else ""
lines.append(f"[assistant (calls: {', '.join(names)})]: {text}")
else:
lines.append(f"[{m.role}]: {m.content}")
return "\n\n".join(lines)
def _build_llm_compaction_prompt(
self,
ctx: NodeContext,
accumulator: OutputAccumulator | None,
formatted_messages: str,
) -> str:
"""Build prompt for LLM compaction targeting 50% of token budget."""
spec = ctx.node_spec
ctx_lines = [f"NODE: {spec.name} (id={spec.id})"]
if spec.description:
ctx_lines.append(f"PURPOSE: {spec.description}")
if spec.success_criteria:
ctx_lines.append(f"SUCCESS CRITERIA: {spec.success_criteria}")
if accumulator:
acc = accumulator.to_dict()
done = {k: v for k, v in acc.items() if v is not None}
todo = [k for k, v in acc.items() if v is None]
if done:
ctx_lines.append(
"OUTPUTS ALREADY SET:\n"
+ "\n".join(
f" {k}: {str(v)[:150]}" for k, v in done.items()
)
)
if todo:
ctx_lines.append(f"OUTPUTS STILL NEEDED: {', '.join(todo)}")
elif spec.output_keys:
ctx_lines.append(
f"OUTPUTS STILL NEEDED: {', '.join(spec.output_keys)}"
)
target_tokens = self._config.max_history_tokens // 2
target_chars = target_tokens * 4
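# e.g. with max_history_tokens=32000: target ~16000 tokens, ~64000 chars
# (the usual ~4 chars/token heuristic).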
node_ctx = "\n".join(ctx_lines)
return (
"You are compacting an AI agent's conversation history. "
"The agent is still working and needs to continue.\n\n"
f"AGENT CONTEXT:\n{node_ctx}\n\n"
f"CONVERSATION MESSAGES:\n{formatted_messages}\n\n"
"INSTRUCTIONS:\n"
f"Write a summary of approximately {target_chars} characters "
f"(~{target_tokens} tokens).\n"
"1. Preserve ALL user-stated rules, constraints, and preferences "
"verbatim.\n"
"2. Preserve key decisions made and results obtained.\n"
"3. Preserve in-progress work state so the agent can continue.\n"
"4. Be detailed enough that the agent can resume without "
"re-doing work.\n"
)
async def _log_compaction(
self,
ctx: NodeContext,
conversation: NodeConversation,
ratio_before: float,
) -> None:
"""Log compaction result to runtime logger and event bus."""
ratio_after = conversation.usage_ratio()
before_pct = round(ratio_before * 100)
after_pct = round(ratio_after * 100)
# Determine label from what happened
if after_pct >= before_pct - 1:
level = "prune_only"
elif ratio_after <= 0.6:
level = "llm"
else:
level = "structural"
logger.info(
"Compaction complete (%s): %d%% -> %d%%",
level,
before_pct,
after_pct,
)
# Log compaction to session logs (tool_logs.jsonl)
if ctx.runtime_logger:
ctx.runtime_logger.log_step(
node_id=ctx.node_id,
node_type="event_loop",
step_index=-1,
llm_text=f"Context compacted ({level}): "
f"{before_pct}% \u2192 {after_pct}%",
verdict="COMPACTION",
verdict_feedback=f"level={level} before={before_pct}% after={after_pct}%",
verdict_feedback=f"level={level} "
f"before={before_pct}% after={after_pct}%",
)
if self._event_bus:
@@ -2962,62 +3123,12 @@ class EventLoopNode(NodeProtocol):
node_id=ctx.node_id,
data={
"level": level,
"usage_before": round(ratio * 100),
"usage_after": round(new_ratio * 100),
"usage_before": before_pct,
"usage_after": after_pct,
},
)
)
async def _generate_compaction_summary(
self,
ctx: NodeContext,
conversation: NodeConversation,
) -> str:
"""Use LLM to generate a conversation summary for compaction."""
tool_history = self._extract_tool_call_history(conversation)
messages_text = "\n".join(
f"[{m.role}]: {m.content[:200]}" for m in conversation.messages[-10:]
)
prompt = (
"Summarize this conversation so far in 2-3 sentences, "
"preserving key decisions and results.\n\n"
"IMPORTANT: Always preserve any user-stated rules, constraints, "
"or preferences — especially which account/identity to use, "
"formatting preferences, and behavioral instructions. "
"These MUST appear verbatim or near-verbatim in your summary.\n\n"
f"{messages_text}"
)
if tool_history:
prompt += (
"\n\nINCLUDE this tool history verbatim in your summary "
"(the agent needs it to avoid re-calling tools):\n\n"
f"{tool_history}"
)
# Dynamic budget: reasoning models (o1, gpt-5-mini) spend max_tokens on
# internal thinking. 500 leaves nothing for the actual summary.
summary_budget = max(1024, self._config.max_history_tokens // 10)
try:
response = await ctx.llm.acomplete(
messages=[{"role": "user", "content": prompt}],
system=(
"Summarize conversations concisely. Always preserve the tool "
"history section. Always preserve user-stated rules, constraints, "
"and account/identity preferences verbatim."
),
max_tokens=summary_budget,
)
summary = response.content
# Ensure tool history is present even if LLM dropped it
if tool_history and "TOOLS ALREADY CALLED" not in summary:
summary += "\n\n" + tool_history
return summary
except Exception as e:
logger.warning(f"Compaction summary generation failed: {e}")
if tool_history:
return f"Previous conversation context (summary unavailable).\n\n{tool_history}"
return "Previous conversation context (summary unavailable)."
def _build_emergency_summary(
self,
+154 -22
View File
@@ -286,6 +286,114 @@ class GraphExecutor:
return errors
# Max chars of formatted messages before proactively splitting for LLM.
_PHASE_LLM_CHAR_LIMIT = 240_000
_PHASE_LLM_MAX_DEPTH = 10
async def _phase_llm_compact(
self,
conversation: Any,
next_spec: NodeSpec,
messages: list,
_depth: int = 0,
) -> str:
"""Summarise messages for phase-boundary compaction.
Uses the same recursive binary-search splitting as EventLoopNode.
"""
from framework.graph.conversation import extract_tool_call_history
from framework.graph.event_loop_node import _is_context_too_large_error
if _depth > self._PHASE_LLM_MAX_DEPTH:
raise RuntimeError("Phase LLM compaction recursion limit")
# Format messages
lines: list[str] = []
for m in messages:
if m.role == "tool":
c = m.content[:500] + ("..." if len(m.content) > 500 else "")
lines.append(f"[tool result]: {c}")
elif m.role == "assistant" and m.tool_calls:
names = [
tc.get("function", {}).get("name", "?")
for tc in m.tool_calls
]
lines.append(f"[assistant (calls: {', '.join(names)})]: "
f"{m.content[:200] if m.content else ''}")
else:
lines.append(f"[{m.role}]: {m.content}")
formatted = "\n\n".join(lines)
# Proactive split
if len(formatted) > self._PHASE_LLM_CHAR_LIMIT and len(messages) > 1:
summary = await self._phase_llm_compact_split(
conversation, next_spec, messages, _depth,
)
else:
max_tokens = getattr(conversation, "_max_history_tokens", 32000)
target_tokens = max_tokens // 2
target_chars = target_tokens * 4
prompt = (
"You are compacting an AI agent's conversation history "
"at a phase boundary.\n\n"
f"NEXT PHASE: {next_spec.name}\n"
)
if next_spec.description:
prompt += f"NEXT PHASE PURPOSE: {next_spec.description}\n"
prompt += (
f"\nCONVERSATION MESSAGES:\n{formatted}\n\n"
"INSTRUCTIONS:\n"
f"Write a summary of approximately {target_chars} characters "
f"(~{target_tokens} tokens).\n"
"Preserve user-stated rules, constraints, and preferences "
"verbatim. Preserve key decisions and results from earlier "
"phases. Preserve context needed for the next phase.\n"
)
summary_budget = max(1024, max_tokens // 2)
try:
response = await self._llm.acomplete(
messages=[{"role": "user", "content": prompt}],
system=(
"You are a conversation compactor. Write a detailed "
"summary preserving context for the next phase."
),
max_tokens=summary_budget,
)
summary = response.content
except Exception as e:
if _is_context_too_large_error(e) and len(messages) > 1:
summary = await self._phase_llm_compact_split(
conversation, next_spec, messages, _depth,
)
else:
raise
# Append tool history at top level only
if _depth == 0:
tool_history = extract_tool_call_history(messages)
if tool_history and "TOOLS ALREADY CALLED" not in summary:
summary += "\n\n" + tool_history
return summary
async def _phase_llm_compact_split(
self,
conversation: Any,
next_spec: NodeSpec,
messages: list,
_depth: int,
) -> str:
"""Split messages in half and summarise each half."""
mid = max(1, len(messages) // 2)
s1 = await self._phase_llm_compact(
conversation, next_spec, messages[:mid], _depth + 1,
)
s2 = await self._phase_llm_compact(
conversation, next_spec, messages[mid:], _depth + 1,
)
return s1 + "\n\n" + s2
async def execute(
self,
graph: GraphSpec,
@@ -1291,9 +1399,7 @@ class GraphExecutor:
# Set current phase for phase-aware compaction
continuous_conversation.set_current_phase(next_spec.id)
# Phase-boundary compaction (same flow as EventLoopNode._compact)
if continuous_conversation.usage_ratio() > 0.5:
await continuous_conversation.prune_old_tool_results(
protect_tokens=2000,
@@ -1305,40 +1411,66 @@ class GraphExecutor:
_phase_ratio * 100,
)
_data_dir = (
str(self._storage_path / "data") if self._storage_path else None
str(self._storage_path / "data")
if self._storage_path
else None
)
# Step 1: Structural compaction (>=80%)
if _data_dir:
_pre = continuous_conversation.usage_ratio()
await continuous_conversation.compact_preserving_structure(
spillover_dir=_data_dir,
keep_recent=4,
phase_graduated=True,
)
if continuous_conversation.usage_ratio() >= 0.9 * _pre:
await continuous_conversation.compact_preserving_structure(
spillover_dir=_data_dir,
keep_recent=4,
phase_graduated=True,
aggressive=True,
)
# Step 2: LLM compaction (>95%)
if (
continuous_conversation.usage_ratio() > 0.95
and self._llm is not None
):
self.logger.info(
" LLM phase-boundary compaction "
"(%.0f%% usage)",
continuous_conversation.usage_ratio() * 100,
)
try:
_llm_summary = await self._phase_llm_compact(
continuous_conversation,
next_spec,
list(continuous_conversation.messages),
)
await continuous_conversation.compact(
_llm_summary,
keep_recent=2,
phase_graduated=True,
)
except Exception as e:
self.logger.warning(
" Phase LLM compaction failed: %s", e,
)
# Step 3: Emergency (only if still over budget)
if continuous_conversation.needs_compaction():
self.logger.warning(
" Emergency phase compaction (%.0f%%)",
continuous_conversation.usage_ratio() * 100,
)
summary = (
f"Summary of earlier phases (before {next_spec.name}). "
f"Summary of earlier phases "
f"(before {next_spec.name}). "
"See transition markers for phase details."
)
await continuous_conversation.compact(
summary,
keep_recent=1,
phase_graduated=True,
)
+9 -2
View File
@@ -511,9 +511,11 @@ class ExecutionStream:
logger.debug(f"Queued execution {execution_id} for stream {self.stream_id}")
return execution_id
# Errors that indicate resurrection won't help — the same error will recur.
# Includes both configuration/environment errors and deterministic node
# failures where the conversation/state hasn't changed.
_FATAL_ERROR_PATTERNS: tuple[str, ...] = (
# Configuration / environment
"credential",
"authentication",
"unauthorized",
@@ -525,6 +527,11 @@ class ExecutionStream:
"permission denied",
"invalid api",
"configuration error",
# Deterministic node failures — resurrecting at the same node with
# the same conversation produces the same result.
"node stalled",
"ghost empty stream",
"max iterations",
)
@classmethod
+5 -1
View File
@@ -605,7 +605,11 @@ export default function Workspace() {
const result = await sessionsApi.get(existingSessionId);
if (result.loading) continue;
return result as LiveSession;
} catch (pollErr) {
// 404 = agent failed to load and was cleaned up — stop immediately
if (pollErr instanceof ApiError && pollErr.status === 404) {
throw new Error("Agent failed to load");
}
if (i === maxAttempts - 1) throw loadErr;
}
}
+249 -1
View File
@@ -2,11 +2,12 @@
from __future__ import annotations
import json
from typing import Any
import pytest
from framework.graph.conversation import Message, NodeConversation, extract_tool_call_history
from framework.storage.conversation_store import FileConversationStore
# ---------------------------------------------------------------------------
@@ -930,3 +931,250 @@ class TestConversationIntegration:
assert restored.next_seq == 4
assert restored.messages[0].content == "new msg"
assert restored.messages[0].seq == 2
# ---------------------------------------------------------------------------
# Helpers for aggressive compaction tests
# ---------------------------------------------------------------------------
def _make_tool_call(call_id: str, name: str, args: dict) -> dict:
return {
"id": call_id,
"type": "function",
"function": {"name": name, "arguments": json.dumps(args)},
}
async def _build_tool_heavy_conversation(
store: MockConversationStore | None = None,
) -> NodeConversation:
"""Build a conversation with many tool call pairs.
Layout: user msg, then 5x (assistant with append_data tool_call + tool result),
then 1x (assistant with set_output tool_call + tool result), then user msg + assistant msg.
"""
conv = NodeConversation(store=store)
await conv.add_user_message("Process the data") # seq 0
for i in range(5):
args = {"filename": "output.html", "content": "x" * 500}
tc = [_make_tool_call(f"call_{i}", "append_data", args)]
conv._messages.append(Message(
seq=conv._next_seq, role="assistant",
content=f"Appending part {i}", tool_calls=tc,
))
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
conv._messages.append(Message(
seq=conv._next_seq, role="tool",
content='{"success": true}', tool_use_id=f"call_{i}",
))
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
# set_output call — must be protected
so_tc = [_make_tool_call("call_so", "set_output", {"key": "result", "value": "done"})]
conv._messages.append(
Message(seq=conv._next_seq, role="assistant", content="Setting output", tool_calls=so_tc)
)
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
conv._messages.append(Message(
seq=conv._next_seq, role="tool",
content="Output 'result' set successfully.",
tool_use_id="call_so",
))
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
# Recent messages
await conv.add_user_message("Continue")
await conv.add_assistant_message("Working on it")
return conv
# ---------------------------------------------------------------------------
# Tests: aggressive structural compaction
# ---------------------------------------------------------------------------
class TestAggressiveStructuralCompaction:
@pytest.mark.asyncio
async def test_aggressive_collapses_tool_pairs(self, tmp_path):
"""Aggressive mode should collapse non-essential tool pairs into a summary."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# The 5 append_data pairs (10 msgs) + 1 user msg should be collapsed.
# Remaining: ref_msg + set_output pair (2 msgs) + 2 recent = 5
assert conv.message_count == 5
assert conv.messages[0].role == "user" # ref message
assert "TOOLS ALREADY CALLED" in conv.messages[0].content
assert "append_data (5x)" in conv.messages[0].content
# set_output pair should be preserved
assert conv.messages[1].role == "assistant"
assert conv.messages[1].tool_calls is not None
assert conv.messages[1].tool_calls[0]["function"]["name"] == "set_output"
assert conv.messages[2].role == "tool"
# Recent messages intact
assert conv.messages[3].content == "Continue"
assert conv.messages[4].content == "Working on it"
@pytest.mark.asyncio
async def test_aggressive_preserves_set_output(self, tmp_path):
"""set_output tool calls are always protected in aggressive mode."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# Find all tool calls in remaining messages
tool_names = []
for msg in conv.messages:
if msg.tool_calls:
for tc in msg.tool_calls:
tool_names.append(tc["function"]["name"])
assert "set_output" in tool_names
# append_data should NOT be in remaining messages (collapsed)
assert "append_data" not in tool_names
@pytest.mark.asyncio
async def test_aggressive_preserves_errors(self, tmp_path):
"""Error tool results are always protected in aggressive mode."""
conv = NodeConversation()
await conv.add_user_message("Start")
# Regular tool call
tc1 = [_make_tool_call("call_ok", "web_search", {"query": "test"})]
conv._messages.append(
Message(seq=conv._next_seq, role="assistant", content="", tool_calls=tc1)
)
conv._next_seq += 1
conv._messages.append(
Message(seq=conv._next_seq, role="tool", content="results", tool_use_id="call_ok")
)
conv._next_seq += 1
# Error tool call
tc2 = [_make_tool_call("call_err", "web_scrape", {"url": "http://broken.com"})]
conv._messages.append(
Message(seq=conv._next_seq, role="assistant", content="", tool_calls=tc2)
)
conv._next_seq += 1
conv._messages.append(
Message(
seq=conv._next_seq, role="tool", content="Connection timeout",
tool_use_id="call_err", is_error=True,
)
)
conv._next_seq += 1
await conv.add_user_message("Next")
await conv.add_assistant_message("OK")
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# Error pair should be preserved
error_msgs = [m for m in conv.messages if m.role == "tool" and m.is_error]
assert len(error_msgs) == 1
assert error_msgs[0].content == "Connection timeout"
@pytest.mark.asyncio
async def test_standard_mode_keeps_all_tool_pairs(self, tmp_path):
"""Non-aggressive mode should keep all tool pairs (existing behavior)."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=False,
)
# All 6 tool pairs (12 msgs) should be kept as structural.
# Removed: 1 user msg (freeform). Remaining: ref + 12 structural + 2 recent = 15
assert conv.message_count == 15
@pytest.mark.asyncio
async def test_two_pass_sequence(self, tmp_path):
"""Standard pass then aggressive pass produces valid result."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
# Pass 1: standard
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2,
)
after_standard = conv.message_count
assert after_standard == 15 # all structural kept
# Pass 2: aggressive
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
after_aggressive = conv.message_count
assert after_aggressive < after_standard
# ref + set_output pair + 2 recent = 5
assert after_aggressive == 5
@pytest.mark.asyncio
async def test_aggressive_persists_correctly(self, tmp_path):
"""Aggressive compaction correctly updates the store."""
store = MockConversationStore()
conv = await _build_tool_heavy_conversation(store=store)
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# Verify store state matches in-memory state
parts = await store.read_parts()
assert len(parts) == conv.message_count
class TestExtractToolCallHistory:
def test_basic_extraction(self):
msgs = [
Message(seq=0, role="assistant", content="", tool_calls=[
_make_tool_call("c1", "web_search", {"query": "python async"}),
]),
Message(seq=1, role="tool", content="results", tool_use_id="c1"),
Message(seq=2, role="assistant", content="", tool_calls=[
_make_tool_call("c2", "save_data", {"filename": "output.txt", "content": "data"}),
]),
Message(seq=3, role="tool", content="saved", tool_use_id="c2"),
]
result = extract_tool_call_history(msgs)
assert "web_search (1x)" in result
assert "save_data (1x)" in result
assert "FILES SAVED: output.txt" in result
def test_errors_included(self):
msgs = [
Message(
seq=0, role="tool", content="Connection refused",
is_error=True, tool_use_id="c1",
),
]
result = extract_tool_call_history(msgs)
assert "ERRORS" in result
assert "Connection refused" in result
def test_empty_messages(self):
assert extract_tool_call_history([]) == ""