fix: compaction logics
This commit is contained in:
@@ -152,6 +152,74 @@ def _compact_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]
|
||||
return compact
|
||||
|
||||
|
||||
def extract_tool_call_history(messages: list[Message], max_entries: int = 30) -> str:
    """Build a compact tool call history from a list of messages.

    Used in compaction summaries to prevent the LLM from re-calling
    tools it already called. Extracts tool call details, files saved,
    outputs set, and errors encountered.

    Args:
        messages: Messages to scan. Assistant messages contribute their
            ``tool_calls``; tool messages flagged ``is_error`` contribute
            a one-line error preview.
        max_entries: Maximum number of distinct tool names listed in the
            "TOOLS ALREADY CALLED" section.

    Returns:
        The non-empty sections joined by blank lines, or ``""`` when no
        tool calls, saved files, outputs, or errors were found.
    """
    tool_calls_detail: dict[str, list[str]] = {}
    files_saved: list[str] = []
    outputs_set: list[str] = []
    errors: list[str] = []

    # Which argument is worth echoing back for each known tool.
    summary_keys = {
        "web_search": "query",
        "web_scrape": "url",
        "load_data": "filename",
        "save_data": "filename",
    }

    def _parse_args(raw: object) -> dict:
        """Return tool arguments as a dict, or ``{}`` if they are unusable."""
        if isinstance(raw, dict):
            # Arguments were already decoded upstream; keep them as-is.
            return raw
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            # Malformed/truncated JSON or a non-string payload (e.g. None).
            return {}
        # Valid JSON that is not an object (list, string, number) has no
        # named arguments to report.
        return parsed if isinstance(parsed, dict) else {}

    def _summarize_input(name: str, args: dict) -> str:
        """Return the key input of a call as text, or ``""`` if none."""
        key = summary_keys.get(name)
        if key is None:
            return ""
        value = args.get(key)
        return "" if value is None else str(value)

    for msg in messages:
        if msg.role == "assistant" and msg.tool_calls:
            for tc in msg.tool_calls:
                func = tc.get("function") or {}
                name = func.get("name", "unknown")
                args = _parse_args(func.get("arguments", "{}"))

                summary = _summarize_input(name, args)
                tool_calls_detail.setdefault(name, []).append(summary)

                # str() keeps the later ", ".join(...) safe for non-string values.
                if name == "save_data" and args.get("filename"):
                    files_saved.append(str(args["filename"]))
                if name == "set_output" and args.get("key"):
                    outputs_set.append(str(args["key"]))

        if msg.role == "tool" and msg.is_error:
            preview = str(msg.content or "")[:120].replace("\n", " ")
            errors.append(preview)

    parts: list[str] = []
    if tool_calls_detail:
        lines: list[str] = []
        for name, inputs in list(tool_calls_detail.items())[:max_entries]:
            count = len(inputs)
            non_empty = [s for s in inputs if s]
            if non_empty:
                # At most 8 inputs per tool, each clipped to 120 characters.
                detail_lines = [f" - {s[:120]}" for s in non_empty[:8]]
                lines.append(f" {name} ({count}x):\n" + "\n".join(detail_lines))
            else:
                lines.append(f" {name} ({count}x)")
        parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines))
    if files_saved:
        # dict.fromkeys de-duplicates while preserving first-seen order.
        unique = list(dict.fromkeys(files_saved))
        parts.append("FILES SAVED: " + ", ".join(unique))
    if outputs_set:
        unique = list(dict.fromkeys(outputs_set))
        parts.append("OUTPUTS SET: " + ", ".join(unique))
    if errors:
        parts.append(
            "ERRORS (do NOT retry these):\n" + "\n".join(f" - {e}" for e in errors[:10])
        )
    return "\n\n".join(parts)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ConversationStore protocol (Phase 2)
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -653,6 +721,7 @@ class NodeConversation:
|
||||
spillover_dir: str,
|
||||
keep_recent: int = 4,
|
||||
phase_graduated: bool = False,
|
||||
aggressive: bool = False,
|
||||
) -> None:
|
||||
"""Structure-preserving compaction: save freeform text to file, keep tool messages.
|
||||
|
||||
@@ -662,6 +731,11 @@ class NodeConversation:
|
||||
after pruning. Only freeform text exchanges (user messages,
|
||||
text-only assistant messages) are saved to a file and removed.
|
||||
|
||||
When *aggressive* is True, non-essential tool call pairs are also
|
||||
collapsed into a compact summary instead of being kept individually.
|
||||
Only ``set_output`` calls and error results are preserved; all other
|
||||
old tool pairs are replaced by a tool-call history summary.
|
||||
|
||||
The result: the agent retains exact knowledge of what tools it called,
|
||||
where each result is stored, and can load the conversation text if
|
||||
needed. No LLM summary call. No heuristics. Nothing lost.
|
||||
@@ -693,35 +767,92 @@ class NodeConversation:
|
||||
# Classify old messages: structural (keep) vs freeform (save to file)
|
||||
kept_structural: list[Message] = []
|
||||
freeform_lines: list[str] = []
|
||||
collapsed_msgs: list[Message] = []
|
||||
|
||||
for msg in old_messages:
|
||||
if msg.role == "tool":
|
||||
# Tool results — already pruned to ~30 tokens (file reference).
|
||||
# Keep in conversation.
|
||||
kept_structural.append(msg)
|
||||
elif msg.role == "assistant" and msg.tool_calls:
|
||||
# Assistant message with tool_calls — keep the tool_calls
|
||||
# with truncated arguments, clear the freeform text content.
|
||||
compact_tcs = _compact_tool_calls(msg.tool_calls)
|
||||
kept_structural.append(
|
||||
Message(
|
||||
seq=msg.seq,
|
||||
role=msg.role,
|
||||
content="",
|
||||
tool_calls=compact_tcs,
|
||||
is_error=msg.is_error,
|
||||
phase_id=msg.phase_id,
|
||||
is_transition_marker=msg.is_transition_marker,
|
||||
)
|
||||
if aggressive:
|
||||
# Aggressive: only keep set_output tool pairs and error results.
|
||||
# Everything else is collapsed into a tool-call history summary.
|
||||
# We need to track tool_call IDs to pair assistant messages with
|
||||
# their tool results.
|
||||
protected_tc_ids: set[str] = set()
|
||||
collapsible_tc_ids: set[str] = set()
|
||||
|
||||
# First pass: classify assistant messages
|
||||
for msg in old_messages:
|
||||
if msg.role != "assistant" or not msg.tool_calls:
|
||||
continue
|
||||
has_protected = any(
|
||||
tc.get("function", {}).get("name") == "set_output"
|
||||
for tc in msg.tool_calls
|
||||
)
|
||||
else:
|
||||
# Freeform text (user messages, text-only assistant messages)
|
||||
# — save to file and remove from conversation.
|
||||
role_label = msg.role
|
||||
text = msg.content
|
||||
if len(text) > 2000:
|
||||
text = text[:2000] + "…"
|
||||
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
|
||||
tc_ids = {tc.get("id", "") for tc in msg.tool_calls}
|
||||
if has_protected:
|
||||
protected_tc_ids |= tc_ids
|
||||
else:
|
||||
collapsible_tc_ids |= tc_ids
|
||||
|
||||
# Second pass: classify all messages
|
||||
for msg in old_messages:
|
||||
if msg.role == "tool":
|
||||
tc_id = msg.tool_use_id or ""
|
||||
if tc_id in protected_tc_ids:
|
||||
kept_structural.append(msg)
|
||||
elif msg.is_error:
|
||||
# Error results are always protected
|
||||
kept_structural.append(msg)
|
||||
# Protect the parent assistant message too
|
||||
protected_tc_ids.add(tc_id)
|
||||
else:
|
||||
collapsed_msgs.append(msg)
|
||||
elif msg.role == "assistant" and msg.tool_calls:
|
||||
tc_ids = {tc.get("id", "") for tc in msg.tool_calls}
|
||||
if tc_ids & protected_tc_ids:
|
||||
# Has at least one protected tool call — keep entire msg
|
||||
compact_tcs = _compact_tool_calls(msg.tool_calls)
|
||||
kept_structural.append(
|
||||
Message(
|
||||
seq=msg.seq,
|
||||
role=msg.role,
|
||||
content="",
|
||||
tool_calls=compact_tcs,
|
||||
is_error=msg.is_error,
|
||||
phase_id=msg.phase_id,
|
||||
is_transition_marker=msg.is_transition_marker,
|
||||
)
|
||||
)
|
||||
else:
|
||||
collapsed_msgs.append(msg)
|
||||
else:
|
||||
# Freeform text — save to file
|
||||
role_label = msg.role
|
||||
text = msg.content
|
||||
if len(text) > 2000:
|
||||
text = text[:2000] + "…"
|
||||
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
|
||||
else:
|
||||
# Standard mode: keep all tool call pairs as structural
|
||||
for msg in old_messages:
|
||||
if msg.role == "tool":
|
||||
kept_structural.append(msg)
|
||||
elif msg.role == "assistant" and msg.tool_calls:
|
||||
compact_tcs = _compact_tool_calls(msg.tool_calls)
|
||||
kept_structural.append(
|
||||
Message(
|
||||
seq=msg.seq,
|
||||
role=msg.role,
|
||||
content="",
|
||||
tool_calls=compact_tcs,
|
||||
is_error=msg.is_error,
|
||||
phase_id=msg.phase_id,
|
||||
is_transition_marker=msg.is_transition_marker,
|
||||
)
|
||||
)
|
||||
else:
|
||||
role_label = msg.role
|
||||
text = msg.content
|
||||
if len(text) > 2000:
|
||||
text = text[:2000] + "…"
|
||||
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
|
||||
|
||||
# Write freeform text to a numbered conversation file
|
||||
spill_path = Path(spillover_dir)
|
||||
@@ -741,13 +872,25 @@ class NodeConversation:
|
||||
conv_filename = ""
|
||||
|
||||
# Build reference message
|
||||
ref_parts: list[str] = []
|
||||
if conv_filename:
|
||||
ref_content = (
|
||||
ref_parts.append(
|
||||
f"[Previous conversation saved to '{conv_filename}'. "
|
||||
f"Use load_data('{conv_filename}') to review if needed.]"
|
||||
)
|
||||
else:
|
||||
ref_content = "[Previous freeform messages compacted.]"
|
||||
elif not collapsed_msgs:
|
||||
ref_parts.append("[Previous freeform messages compacted.]")
|
||||
|
||||
# Aggressive: add collapsed tool-call history to the reference
|
||||
if collapsed_msgs:
|
||||
tool_history = extract_tool_call_history(collapsed_msgs)
|
||||
if tool_history:
|
||||
ref_parts.append(tool_history)
|
||||
elif not ref_parts:
|
||||
ref_parts.append("[Previous tool calls compacted.]")
|
||||
|
||||
ref_content = "\n\n".join(ref_parts)
|
||||
|
||||
# Use a seq just before the first kept message
|
||||
recent_messages = list(self._messages[split:])
|
||||
if kept_structural:
|
||||
@@ -760,15 +903,15 @@ class NodeConversation:
|
||||
|
||||
ref_msg = Message(seq=ref_seq, role="user", content=ref_content)
|
||||
|
||||
# Persist: delete old messages from store, write reference + kept structural
|
||||
# Persist: delete old messages from store, write reference + kept structural.
|
||||
# In aggressive mode, collapsed messages may be interspersed with kept
|
||||
# messages, so we delete everything before the recent boundary and
|
||||
# rewrite only what we want to keep.
|
||||
if self._store:
|
||||
first_kept_seq = (
|
||||
kept_structural[0].seq
|
||||
if kept_structural
|
||||
else (recent_messages[0].seq if recent_messages else self._next_seq)
|
||||
recent_boundary = (
|
||||
recent_messages[0].seq if recent_messages else self._next_seq
|
||||
)
|
||||
# Delete everything before the first structural message we're keeping
|
||||
await self._store.delete_parts_before(first_kept_seq)
|
||||
await self._store.delete_parts_before(recent_boundary)
|
||||
# Write the reference message
|
||||
await self._store.write_part(ref_msg.seq, ref_msg.to_storage_dict())
|
||||
# Write kept structural messages (they may have been modified)
|
||||
|
||||
@@ -36,6 +36,23 @@ from framework.runtime.llm_debug_logger import log_llm_turn
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Pattern for detecting context-window-exceeded errors across LLM providers.
|
||||
_CONTEXT_TOO_LARGE_RE = re.compile(
|
||||
r"context.{0,20}(length|window|limit|size)|"
|
||||
r"too.{0,10}(long|large|many.{0,10}tokens)|"
|
||||
r"(exceed|exceeds|exceeded).{0,30}(limit|window|context|tokens)|"
|
||||
r"maximum.{0,20}token|prompt.{0,20}too.{0,10}long",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _is_context_too_large_error(exc: BaseException) -> bool:
|
||||
"""Detect whether an exception indicates the LLM input was too large."""
|
||||
cls = type(exc).__name__
|
||||
if "ContextWindow" in cls:
|
||||
return True
|
||||
return bool(_CONTEXT_TOO_LARGE_RE.search(str(exc)))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Escalation receiver (temporary routing target for subagent → user input)
|
||||
@@ -553,7 +570,7 @@ class EventLoopNode(NodeProtocol):
|
||||
|
||||
# 6d. Pre-turn compaction check (tiered)
|
||||
if conversation.needs_compaction():
|
||||
await self._compact_tiered(ctx, conversation, accumulator)
|
||||
await self._compact(ctx, conversation, accumulator)
|
||||
|
||||
# 6e. Run single LLM turn (with transient error retry)
|
||||
logger.info(
|
||||
@@ -753,7 +770,7 @@ class EventLoopNode(NodeProtocol):
|
||||
|
||||
# 6e''. Post-turn compaction check (catches tool-result bloat)
|
||||
if conversation.needs_compaction():
|
||||
await self._compact_tiered(ctx, conversation, accumulator)
|
||||
await self._compact(ctx, conversation, accumulator)
|
||||
|
||||
# Reset auto-block grace streak when real work happens
|
||||
if real_tool_results or outputs_set:
|
||||
@@ -1556,7 +1573,7 @@ class EventLoopNode(NodeProtocol):
|
||||
"Pre-send guard: context at %.0f%% of budget, compacting",
|
||||
conversation.usage_ratio() * 100,
|
||||
)
|
||||
await self._compact_tiered(ctx, conversation, accumulator)
|
||||
await self._compact(ctx, conversation, accumulator)
|
||||
|
||||
messages = conversation.to_llm_messages()
|
||||
|
||||
@@ -2443,78 +2460,11 @@ class EventLoopNode(NodeProtocol):
|
||||
) -> str:
|
||||
"""Build a compact tool call history from the conversation.
|
||||
|
||||
Used in compaction summaries to prevent the LLM from re-calling
|
||||
tools it already called. Extracts:
|
||||
- Tool call details: name, count, and *inputs* for key tools
|
||||
(search queries, scrape URLs, loaded filenames)
|
||||
- Files saved via save_data
|
||||
- Outputs set via set_output
|
||||
- Errors encountered
|
||||
Delegates to :func:`extract_tool_call_history` in conversation.py.
|
||||
"""
|
||||
# Per-tool: list of input summaries (one per call)
|
||||
tool_calls_detail: dict[str, list[str]] = {}
|
||||
files_saved: list[str] = []
|
||||
outputs_set: list[str] = []
|
||||
errors: list[str] = []
|
||||
from framework.graph.conversation import extract_tool_call_history
|
||||
|
||||
# Tool-specific input extractors: return a short summary string
|
||||
def _summarize_input(name: str, args: dict) -> str:
|
||||
if name == "web_search":
|
||||
return args.get("query", "")
|
||||
if name == "web_scrape":
|
||||
return args.get("url", "")
|
||||
if name == "load_data":
|
||||
return args.get("filename", "")
|
||||
if name == "save_data":
|
||||
return args.get("filename", "")
|
||||
return ""
|
||||
|
||||
for msg in conversation.messages:
|
||||
if msg.role == "assistant" and msg.tool_calls:
|
||||
for tc in msg.tool_calls:
|
||||
func = tc.get("function", {})
|
||||
name = func.get("name", "unknown")
|
||||
try:
|
||||
args = json.loads(func.get("arguments", "{}"))
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
args = {}
|
||||
|
||||
summary = _summarize_input(name, args)
|
||||
tool_calls_detail.setdefault(name, []).append(summary)
|
||||
|
||||
if name == "save_data" and args.get("filename"):
|
||||
files_saved.append(args["filename"])
|
||||
if name == "set_output" and args.get("key"):
|
||||
outputs_set.append(args["key"])
|
||||
|
||||
if msg.role == "tool" and msg.is_error:
|
||||
preview = msg.content[:120].replace("\n", " ")
|
||||
errors.append(preview)
|
||||
|
||||
parts: list[str] = []
|
||||
if tool_calls_detail:
|
||||
lines: list[str] = []
|
||||
for name, inputs in list(tool_calls_detail.items())[:max_entries]:
|
||||
count = len(inputs)
|
||||
# Include input details for tools where inputs matter
|
||||
non_empty = [s for s in inputs if s]
|
||||
if non_empty:
|
||||
detail_lines = [f" - {s[:120]}" for s in non_empty[:8]]
|
||||
lines.append(f" {name} ({count}x):\n" + "\n".join(detail_lines))
|
||||
else:
|
||||
lines.append(f" {name} ({count}x)")
|
||||
parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines))
|
||||
if files_saved:
|
||||
unique = list(dict.fromkeys(files_saved))
|
||||
parts.append("FILES SAVED: " + ", ".join(unique))
|
||||
if outputs_set:
|
||||
unique = list(dict.fromkeys(outputs_set))
|
||||
parts.append("OUTPUTS SET: " + ", ".join(unique))
|
||||
if errors:
|
||||
parts.append(
|
||||
"ERRORS (do NOT retry these):\n" + "\n".join(f" - {e}" for e in errors[:10])
|
||||
)
|
||||
return "\n\n".join(parts)
|
||||
return extract_tool_call_history(conversation.messages, max_entries=max_entries)
|
||||
|
||||
def _build_initial_message(self, ctx: NodeContext) -> str:
|
||||
"""Build the initial user message from input data and memory.
|
||||
@@ -2863,131 +2813,304 @@ class EventLoopNode(NodeProtocol):
|
||||
|
||||
return result
|
||||
|
||||
async def _compact_tiered(
|
||||
# --- Compaction -----------------------------------------------------------
|
||||
|
||||
# Threshold above which LLM compaction is invoked (structural handles 80-95%).
|
||||
_LLM_COMPACT_THRESHOLD = 0.95
|
||||
# Max chars of formatted messages before proactively splitting for LLM.
|
||||
_LLM_COMPACT_CHAR_LIMIT = 240_000
|
||||
# Max recursion depth for binary-search splitting.
|
||||
_LLM_COMPACT_MAX_DEPTH = 10
|
||||
|
||||
async def _compact(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
conversation: NodeConversation,
|
||||
accumulator: OutputAccumulator | None = None,
|
||||
) -> None:
|
||||
"""Run compaction with aggressiveness scaled to usage level.
|
||||
"""Compact conversation history to stay within token budget.
|
||||
|
||||
| Usage | Strategy |
|
||||
|----------------|---------------------------------------------|
|
||||
| 80-100% | Normal: LLM summary, keep 4 recent messages |
|
||||
| 100-120% | Aggressive: LLM summary, keep 2 recent |
|
||||
| >= 120% | Emergency: static summary, keep 1 recent |
|
||||
1. Prune old tool results (always, free).
|
||||
2. Structure-preserving compaction at >=80% (standard, then aggressive).
|
||||
3. LLM compaction at >95% with recursive binary-search splitting.
|
||||
4. Emergency deterministic summary only if LLM failed or unavailable.
|
||||
"""
|
||||
ratio = conversation.usage_ratio()
|
||||
ratio_before = conversation.usage_ratio()
|
||||
phase_grad = getattr(ctx, "continuous_mode", False)
|
||||
|
||||
# --- Tier 0: Prune old tool results (zero-cost, no LLM call) ---
|
||||
# --- Step 1: Prune old tool results (free, no LLM) ---
|
||||
protect = max(2000, self._config.max_history_tokens // 12)
|
||||
pruned = await conversation.prune_old_tool_results(
|
||||
protect_tokens=protect,
|
||||
min_prune_tokens=max(1000, protect // 3),
|
||||
)
|
||||
if pruned > 0:
|
||||
new_ratio = conversation.usage_ratio()
|
||||
logger.info(
|
||||
"Pruned %d old tool results: %.0f%% -> %.0f%%",
|
||||
pruned,
|
||||
ratio * 100,
|
||||
new_ratio * 100,
|
||||
ratio_before * 100,
|
||||
conversation.usage_ratio() * 100,
|
||||
)
|
||||
if not conversation.needs_compaction():
|
||||
# Pruning freed enough — skip full compaction entirely
|
||||
prune_before = round(ratio * 100)
|
||||
prune_after = round(new_ratio * 100)
|
||||
if ctx.runtime_logger:
|
||||
ctx.runtime_logger.log_step(
|
||||
node_id=ctx.node_id,
|
||||
node_type="event_loop",
|
||||
step_index=-1,
|
||||
llm_text=f"Context pruned (tool results): "
|
||||
f"{prune_before}% \u2192 {prune_after}%",
|
||||
verdict="COMPACTION",
|
||||
verdict_feedback=f"level=prune_only "
|
||||
f"before={prune_before}% after={prune_after}%",
|
||||
)
|
||||
if self._event_bus:
|
||||
from framework.runtime.event_bus import AgentEvent, EventType
|
||||
|
||||
await self._event_bus.publish(
|
||||
AgentEvent(
|
||||
type=EventType.CONTEXT_COMPACTED,
|
||||
stream_id=ctx.stream_id or ctx.node_id,
|
||||
node_id=ctx.node_id,
|
||||
data={
|
||||
"level": "prune_only",
|
||||
"usage_before": prune_before,
|
||||
"usage_after": prune_after,
|
||||
},
|
||||
)
|
||||
)
|
||||
return
|
||||
ratio = new_ratio
|
||||
|
||||
_phase_grad = getattr(ctx, "continuous_mode", False)
|
||||
|
||||
if ratio >= 1.2:
|
||||
level = "emergency"
|
||||
keep = 1
|
||||
logger.warning("Emergency compaction triggered (usage %.0f%%)", ratio * 100)
|
||||
elif ratio >= 1.0:
|
||||
level = "aggressive"
|
||||
keep = 2
|
||||
logger.info("Aggressive compaction triggered (usage %.0f%%)", ratio * 100)
|
||||
else:
|
||||
level = "normal"
|
||||
keep = 4
|
||||
if not conversation.needs_compaction():
|
||||
await self._log_compaction(ctx, conversation, ratio_before)
|
||||
return
|
||||
|
||||
# --- Step 2: Structure-preserving compaction (>=80%) ---
|
||||
spill_dir = self._config.spillover_dir
|
||||
if spill_dir:
|
||||
# Structure-preserving: save freeform text to file, keep tool messages
|
||||
pre_structural = conversation.usage_ratio()
|
||||
await conversation.compact_preserving_structure(
|
||||
spillover_dir=spill_dir,
|
||||
keep_recent=keep,
|
||||
phase_graduated=_phase_grad,
|
||||
keep_recent=4,
|
||||
phase_graduated=phase_grad,
|
||||
)
|
||||
# Circuit breaker: if structure-preserving compaction barely helped
|
||||
# (still over budget), fall back to destructive compact() which
|
||||
# replaces everything with a summary.
|
||||
mid_ratio = conversation.usage_ratio()
|
||||
if mid_ratio >= 0.9 * ratio:
|
||||
logger.warning(
|
||||
"Structure-preserving compaction ineffective "
|
||||
"(%.0f%% -> %.0f%%), falling back to summary compaction",
|
||||
ratio * 100,
|
||||
mid_ratio * 100,
|
||||
if conversation.usage_ratio() >= 0.9 * pre_structural:
|
||||
logger.info(
|
||||
"Standard structural compaction ineffective "
|
||||
"(%.0f%% -> %.0f%%), trying aggressive",
|
||||
pre_structural * 100,
|
||||
conversation.usage_ratio() * 100,
|
||||
)
|
||||
summary = self._build_emergency_summary(ctx, accumulator, conversation)
|
||||
await conversation.compact(summary, keep_recent=keep, phase_graduated=_phase_grad)
|
||||
else:
|
||||
# Fallback: LLM-based summary (no spillover dir available)
|
||||
if level == "emergency":
|
||||
summary = self._build_emergency_summary(ctx, accumulator, conversation)
|
||||
else:
|
||||
summary = await self._generate_compaction_summary(ctx, conversation)
|
||||
await conversation.compact(summary, keep_recent=keep, phase_graduated=_phase_grad)
|
||||
await conversation.compact_preserving_structure(
|
||||
spillover_dir=spill_dir,
|
||||
keep_recent=4,
|
||||
phase_graduated=phase_grad,
|
||||
aggressive=True,
|
||||
)
|
||||
if not conversation.needs_compaction():
|
||||
await self._log_compaction(ctx, conversation, ratio_before)
|
||||
return
|
||||
|
||||
new_ratio = conversation.usage_ratio()
|
||||
logger.info(
|
||||
"Compaction complete (%s): %.0f%% -> %.0f%%",
|
||||
level,
|
||||
ratio * 100,
|
||||
new_ratio * 100,
|
||||
# --- Step 3: LLM compaction at >95% (recursive binary-search) ---
|
||||
if (
|
||||
conversation.usage_ratio() > self._LLM_COMPACT_THRESHOLD
|
||||
and ctx.llm is not None
|
||||
):
|
||||
logger.info(
|
||||
"LLM compaction triggered (%.0f%% usage)",
|
||||
conversation.usage_ratio() * 100,
|
||||
)
|
||||
try:
|
||||
summary = await self._llm_compact(
|
||||
ctx, list(conversation.messages), accumulator,
|
||||
)
|
||||
await conversation.compact(
|
||||
summary,
|
||||
keep_recent=2,
|
||||
phase_graduated=phase_grad,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("LLM compaction failed: %s", e)
|
||||
|
||||
if not conversation.needs_compaction():
|
||||
await self._log_compaction(ctx, conversation, ratio_before)
|
||||
return
|
||||
|
||||
# --- Step 4: Emergency deterministic summary (LLM failed/unavailable) ---
|
||||
logger.warning(
|
||||
"Emergency compaction (%.0f%% usage)",
|
||||
conversation.usage_ratio() * 100,
|
||||
)
|
||||
summary = self._build_emergency_summary(ctx, accumulator, conversation)
|
||||
await conversation.compact(
|
||||
summary, keep_recent=1, phase_graduated=phase_grad,
|
||||
)
|
||||
await self._log_compaction(ctx, conversation, ratio_before)
|
||||
|
||||
# --- LLM compaction with binary-search splitting ----------------------
|
||||
|
||||
async def _llm_compact(
    self,
    ctx: NodeContext,
    messages: list,
    accumulator: OutputAccumulator | None = None,
    _depth: int = 0,
) -> str:
    """Summarise *messages* through the LLM, halving the input when needed.

    The messages are split in half and each half summarised separately when
    the formatted text is longer than ``_LLM_COMPACT_CHAR_LIMIT``, or when
    the LLM call fails with an error recognised as context-too-large. A
    single message is never split. The tool-call history is appended only
    by the outermost call (``_depth == 0``).

    Raises:
        RuntimeError: if ``_depth`` exceeds ``_LLM_COMPACT_MAX_DEPTH``.
        Exception: any LLM failure that is not handled by splitting.
    """
    from framework.graph.conversation import extract_tool_call_history

    depth_cap = self._LLM_COMPACT_MAX_DEPTH
    if _depth > depth_cap:
        raise RuntimeError(f"LLM compaction recursion limit ({depth_cap})")

    rendered = self._format_messages_for_summary(messages)
    splittable = len(messages) > 1

    if splittable and len(rendered) > self._LLM_COMPACT_CHAR_LIMIT:
        # Oversized up front: skip the API call and go straight to halves.
        digest = await self._llm_compact_split(ctx, messages, accumulator, _depth)
    else:
        request_text = self._build_llm_compaction_prompt(ctx, accumulator, rendered)
        reply_cap = max(1024, self._config.max_history_tokens // 2)
        system_text = (
            "You are a conversation compactor for an AI agent. "
            "Write a detailed summary that allows the agent to "
            "continue its work. Preserve user-stated rules, "
            "constraints, and account/identity preferences verbatim."
        )
        try:
            reply = await ctx.llm.acomplete(
                messages=[{"role": "user", "content": request_text}],
                system=system_text,
                max_tokens=reply_cap,
            )
            digest = reply.content
        except Exception as err:
            # Only a context-size rejection on a splittable input is recoverable.
            if not (_is_context_too_large_error(err) and splittable):
                raise
            logger.info(
                "LLM context too large (depth=%d, msgs=%d) — splitting",
                _depth,
                len(messages),
            )
            digest = await self._llm_compact_split(ctx, messages, accumulator, _depth)

    if _depth != 0:
        return digest

    # Outermost call: attach the tool history unless the summary already has it.
    history = extract_tool_call_history(messages)
    if history and "TOOLS ALREADY CALLED" not in digest:
        digest += "\n\n" + history
    return digest
|
||||
|
||||
async def _llm_compact_split(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
messages: list,
|
||||
accumulator: OutputAccumulator | None,
|
||||
_depth: int,
|
||||
) -> str:
|
||||
"""Split messages in half and summarise each half independently."""
|
||||
mid = max(1, len(messages) // 2)
|
||||
s1 = await self._llm_compact(ctx, messages[:mid], None, _depth + 1)
|
||||
s2 = await self._llm_compact(
|
||||
ctx, messages[mid:], accumulator, _depth + 1,
|
||||
)
|
||||
return s1 + "\n\n" + s2
|
||||
|
||||
# --- Compaction helpers ------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _format_messages_for_summary(messages: list) -> str:
|
||||
"""Format messages as text for LLM summarisation."""
|
||||
lines: list[str] = []
|
||||
for m in messages:
|
||||
if m.role == "tool":
|
||||
content = m.content[:500]
|
||||
if len(m.content) > 500:
|
||||
content += "..."
|
||||
lines.append(f"[tool result]: {content}")
|
||||
elif m.role == "assistant" and m.tool_calls:
|
||||
names = [
|
||||
tc.get("function", {}).get("name", "?")
|
||||
for tc in m.tool_calls
|
||||
]
|
||||
text = m.content[:200] if m.content else ""
|
||||
lines.append(f"[assistant (calls: {', '.join(names)})]: {text}")
|
||||
else:
|
||||
lines.append(f"[{m.role}]: {m.content}")
|
||||
return "\n\n".join(lines)
|
||||
|
||||
def _build_llm_compaction_prompt(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
accumulator: OutputAccumulator | None,
|
||||
formatted_messages: str,
|
||||
) -> str:
|
||||
"""Build prompt for LLM compaction targeting 50% of token budget."""
|
||||
spec = ctx.node_spec
|
||||
ctx_lines = [f"NODE: {spec.name} (id={spec.id})"]
|
||||
if spec.description:
|
||||
ctx_lines.append(f"PURPOSE: {spec.description}")
|
||||
if spec.success_criteria:
|
||||
ctx_lines.append(f"SUCCESS CRITERIA: {spec.success_criteria}")
|
||||
|
||||
if accumulator:
|
||||
acc = accumulator.to_dict()
|
||||
done = {k: v for k, v in acc.items() if v is not None}
|
||||
todo = [k for k, v in acc.items() if v is None]
|
||||
if done:
|
||||
ctx_lines.append(
|
||||
"OUTPUTS ALREADY SET:\n"
|
||||
+ "\n".join(
|
||||
f" {k}: {str(v)[:150]}" for k, v in done.items()
|
||||
)
|
||||
)
|
||||
if todo:
|
||||
ctx_lines.append(f"OUTPUTS STILL NEEDED: {', '.join(todo)}")
|
||||
elif spec.output_keys:
|
||||
ctx_lines.append(
|
||||
f"OUTPUTS STILL NEEDED: {', '.join(spec.output_keys)}"
|
||||
)
|
||||
|
||||
target_tokens = self._config.max_history_tokens // 2
|
||||
target_chars = target_tokens * 4
|
||||
node_ctx = "\n".join(ctx_lines)
|
||||
|
||||
return (
|
||||
"You are compacting an AI agent's conversation history. "
|
||||
"The agent is still working and needs to continue.\n\n"
|
||||
f"AGENT CONTEXT:\n{node_ctx}\n\n"
|
||||
f"CONVERSATION MESSAGES:\n{formatted_messages}\n\n"
|
||||
"INSTRUCTIONS:\n"
|
||||
f"Write a summary of approximately {target_chars} characters "
|
||||
f"(~{target_tokens} tokens).\n"
|
||||
"1. Preserve ALL user-stated rules, constraints, and preferences "
|
||||
"verbatim.\n"
|
||||
"2. Preserve key decisions made and results obtained.\n"
|
||||
"3. Preserve in-progress work state so the agent can continue.\n"
|
||||
"4. Be detailed enough that the agent can resume without "
|
||||
"re-doing work.\n"
|
||||
)
|
||||
|
||||
async def _log_compaction(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
conversation: NodeConversation,
|
||||
ratio_before: float,
|
||||
) -> None:
|
||||
"""Log compaction result to runtime logger and event bus."""
|
||||
ratio_after = conversation.usage_ratio()
|
||||
before_pct = round(ratio_before * 100)
|
||||
after_pct = round(ratio_after * 100)
|
||||
|
||||
# Determine label from what happened
|
||||
if after_pct >= before_pct - 1:
|
||||
level = "prune_only"
|
||||
elif ratio_after <= 0.6:
|
||||
level = "llm"
|
||||
else:
|
||||
level = "structural"
|
||||
|
||||
logger.info(
|
||||
"Compaction complete (%s): %d%% -> %d%%",
|
||||
level,
|
||||
before_pct,
|
||||
after_pct,
|
||||
)
|
||||
|
||||
# Log compaction to session logs (tool_logs.jsonl)
|
||||
before_pct = round(ratio * 100)
|
||||
after_pct = round(new_ratio * 100)
|
||||
if ctx.runtime_logger:
|
||||
ctx.runtime_logger.log_step(
|
||||
node_id=ctx.node_id,
|
||||
node_type="event_loop",
|
||||
step_index=-1, # Not a regular LLM step
|
||||
llm_text=f"Context compacted ({level}): {before_pct}% \u2192 {after_pct}%",
|
||||
step_index=-1,
|
||||
llm_text=f"Context compacted ({level}): "
|
||||
f"{before_pct}% \u2192 {after_pct}%",
|
||||
verdict="COMPACTION",
|
||||
verdict_feedback=f"level={level} before={before_pct}% after={after_pct}%",
|
||||
verdict_feedback=f"level={level} "
|
||||
f"before={before_pct}% after={after_pct}%",
|
||||
)
|
||||
|
||||
if self._event_bus:
|
||||
@@ -3000,62 +3123,12 @@ class EventLoopNode(NodeProtocol):
|
||||
node_id=ctx.node_id,
|
||||
data={
|
||||
"level": level,
|
||||
"usage_before": round(ratio * 100),
|
||||
"usage_after": round(new_ratio * 100),
|
||||
"usage_before": before_pct,
|
||||
"usage_after": after_pct,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
async def _generate_compaction_summary(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
conversation: NodeConversation,
|
||||
) -> str:
|
||||
"""Use LLM to generate a conversation summary for compaction."""
|
||||
tool_history = self._extract_tool_call_history(conversation)
|
||||
|
||||
messages_text = "\n".join(
|
||||
f"[{m.role}]: {m.content[:200]}" for m in conversation.messages[-10:]
|
||||
)
|
||||
prompt = (
|
||||
"Summarize this conversation so far in 2-3 sentences, "
|
||||
"preserving key decisions and results.\n\n"
|
||||
"IMPORTANT: Always preserve any user-stated rules, constraints, "
|
||||
"or preferences — especially which account/identity to use, "
|
||||
"formatting preferences, and behavioral instructions. "
|
||||
"These MUST appear verbatim or near-verbatim in your summary.\n\n"
|
||||
f"{messages_text}"
|
||||
)
|
||||
if tool_history:
|
||||
prompt += (
|
||||
"\n\nINCLUDE this tool history verbatim in your summary "
|
||||
"(the agent needs it to avoid re-calling tools):\n\n"
|
||||
f"{tool_history}"
|
||||
)
|
||||
|
||||
# Dynamic budget: reasoning models (o1, gpt-5-mini) spend max_tokens on
|
||||
# internal thinking. 500 leaves nothing for the actual summary.
|
||||
summary_budget = max(1024, self._config.max_history_tokens // 10)
|
||||
try:
|
||||
response = await ctx.llm.acomplete(
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
system=(
|
||||
"Summarize conversations concisely. Always preserve the tool "
|
||||
"history section. Always preserve user-stated rules, constraints, "
|
||||
"and account/identity preferences verbatim."
|
||||
),
|
||||
max_tokens=summary_budget,
|
||||
)
|
||||
summary = response.content
|
||||
# Ensure tool history is present even if LLM dropped it
|
||||
if tool_history and "TOOLS ALREADY CALLED" not in summary:
|
||||
summary += "\n\n" + tool_history
|
||||
return summary
|
||||
except Exception as e:
|
||||
logger.warning(f"Compaction summary generation failed: {e}")
|
||||
if tool_history:
|
||||
return f"Previous conversation context (summary unavailable).\n\n{tool_history}"
|
||||
return "Previous conversation context (summary unavailable)."
|
||||
|
||||
def _build_emergency_summary(
|
||||
self,
|
||||
|
||||
@@ -286,6 +286,114 @@ class GraphExecutor:
|
||||
|
||||
return errors
|
||||
|
||||
# Max chars of formatted messages before proactively splitting for LLM.
# _phase_llm_compact compares the length of its formatted transcript to this.
_PHASE_LLM_CHAR_LIMIT = 240_000
# Largest _depth value _phase_llm_compact accepts; a deeper call raises
# RuntimeError instead of recursing further.
_PHASE_LLM_MAX_DEPTH = 10
|
||||
|
||||
async def _phase_llm_compact(
    self,
    conversation: Any,
    next_spec: NodeSpec,
    messages: list,
    _depth: int = 0,
) -> str:
    """Summarise messages for phase-boundary compaction.

    Uses the same recursive binary-search splitting as EventLoopNode.

    The messages are rendered into a plain-text transcript (tool results
    truncated to 500 chars, assistant tool-call turns to 200 chars). If the
    transcript exceeds ``_PHASE_LLM_CHAR_LIMIT``, or the LLM rejects the
    prompt as too large, the message list is halved via
    ``_phase_llm_compact_split`` and each half is summarised recursively.

    Args:
        conversation: Conversation being compacted; only its
            ``_max_history_tokens`` attribute is read (default 32000).
        next_spec: Spec of the phase about to start; its ``name`` and
            ``description`` are embedded in the prompt.
        messages: Messages to summarise.
        _depth: Current recursion depth (internal; callers leave it at 0).

    Returns:
        The summary text. At depth 0 the extracted tool-call history is
        appended when the summary does not already contain it.

    Raises:
        RuntimeError: If ``_depth`` exceeds ``_PHASE_LLM_MAX_DEPTH``.
        Exception: Any LLM error that is not a context-too-large error
            (or that occurs with a single message) is re-raised unchanged.
    """
    from framework.graph.conversation import extract_tool_call_history
    from framework.graph.event_loop_node import _is_context_too_large_error

    if _depth > self._PHASE_LLM_MAX_DEPTH:
        raise RuntimeError("Phase LLM compaction recursion limit")

    # Format messages
    lines: list[str] = []
    for m in messages:
        # A message may carry no text (e.g. a pure tool-call turn); treat a
        # missing body as empty in every branch so slicing/len never fail.
        content = m.content or ""
        if m.role == "tool":
            c = content[:500] + ("..." if len(content) > 500 else "")
            lines.append(f"[tool result]: {c}")
        elif m.role == "assistant" and m.tool_calls:
            names = [
                tc.get("function", {}).get("name", "?")
                for tc in m.tool_calls
            ]
            lines.append(
                f"[assistant (calls: {', '.join(names)})]: {content[:200]}"
            )
        else:
            lines.append(f"[{m.role}]: {content}")
    formatted = "\n\n".join(lines)

    # Proactive split
    if len(formatted) > self._PHASE_LLM_CHAR_LIMIT and len(messages) > 1:
        summary = await self._phase_llm_compact_split(
            conversation, next_spec, messages, _depth,
        )
    else:
        max_tokens = getattr(conversation, "_max_history_tokens", 32000)
        target_tokens = max_tokens // 2
        # Rough chars-per-token conversion used only for the prompt hint.
        target_chars = target_tokens * 4
        # NOTE(review): the target does not shrink with _depth, so the
        # concatenation of split halves can exceed it — confirm this is
        # acceptable for callers.

        prompt = (
            "You are compacting an AI agent's conversation history "
            "at a phase boundary.\n\n"
            f"NEXT PHASE: {next_spec.name}\n"
        )
        if next_spec.description:
            prompt += f"NEXT PHASE PURPOSE: {next_spec.description}\n"
        prompt += (
            f"\nCONVERSATION MESSAGES:\n{formatted}\n\n"
            "INSTRUCTIONS:\n"
            f"Write a summary of approximately {target_chars} characters "
            f"(~{target_tokens} tokens).\n"
            "Preserve user-stated rules, constraints, and preferences "
            "verbatim. Preserve key decisions and results from earlier "
            "phases. Preserve context needed for the next phase.\n"
        )
        summary_budget = max(1024, max_tokens // 2)
        try:
            response = await self._llm.acomplete(
                messages=[{"role": "user", "content": prompt}],
                system=(
                    "You are a conversation compactor. Write a detailed "
                    "summary preserving context for the next phase."
                ),
                max_tokens=summary_budget,
            )
            # Guard against a reply without text so the membership test and
            # the parent's string concatenation below cannot raise TypeError.
            summary = response.content or ""
        except Exception as e:
            # Only an oversized prompt is recoverable, and only if there is
            # still something left to split.
            if _is_context_too_large_error(e) and len(messages) > 1:
                summary = await self._phase_llm_compact_split(
                    conversation, next_spec, messages, _depth,
                )
            else:
                raise

    # Append tool history at top level only
    if _depth == 0:
        tool_history = extract_tool_call_history(messages)
        if tool_history and "TOOLS ALREADY CALLED" not in summary:
            summary += "\n\n" + tool_history

    return summary
|
||||
|
||||
async def _phase_llm_compact_split(
    self,
    conversation: Any,
    next_spec: NodeSpec,
    messages: list,
    _depth: int,
) -> str:
    """Summarise the two halves of ``messages`` and join the results.

    The list is cut at its midpoint (at least one message goes to the first
    half). Each half is summarised, one after the other, by
    ``_phase_llm_compact`` at ``_depth + 1``; the two summaries are returned
    separated by a blank line.
    """
    cut = max(1, len(messages) // 2)
    child_depth = _depth + 1

    pieces: list[str] = []
    # Slice lazily so the second half is taken only after the first summary
    # has been awaited, exactly as a pair of sequential calls would do.
    for window in (slice(None, cut), slice(cut, None)):
        piece = await self._phase_llm_compact(
            conversation, next_spec, messages[window], child_depth,
        )
        pieces.append(piece)

    first, second = pieces
    return first + "\n\n" + second
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
graph: GraphSpec,
|
||||
@@ -1291,9 +1399,7 @@ class GraphExecutor:
|
||||
# Set current phase for phase-aware compaction
|
||||
continuous_conversation.set_current_phase(next_spec.id)
|
||||
|
||||
# Opportunistic compaction at transition:
|
||||
# 1. Prune old tool results (free, no LLM call)
|
||||
# 2. If still over 80%, do a phase-graduated compact
|
||||
# Phase-boundary compaction (same flow as EventLoopNode._compact)
|
||||
if continuous_conversation.usage_ratio() > 0.5:
|
||||
await continuous_conversation.prune_old_tool_results(
|
||||
protect_tokens=2000,
|
||||
@@ -1305,40 +1411,66 @@ class GraphExecutor:
|
||||
_phase_ratio * 100,
|
||||
)
|
||||
_data_dir = (
|
||||
str(self._storage_path / "data") if self._storage_path else None
|
||||
str(self._storage_path / "data")
|
||||
if self._storage_path
|
||||
else None
|
||||
)
|
||||
# Step 1: Structural compaction (>=80%)
|
||||
if _data_dir:
|
||||
_pre = continuous_conversation.usage_ratio()
|
||||
await continuous_conversation.compact_preserving_structure(
|
||||
spillover_dir=_data_dir,
|
||||
keep_recent=4,
|
||||
phase_graduated=True,
|
||||
)
|
||||
# Circuit breaker: if still over budget, fall back
|
||||
_post_ratio = continuous_conversation.usage_ratio()
|
||||
if _post_ratio >= 0.9 * _phase_ratio:
|
||||
self.logger.warning(
|
||||
" Structure-preserving compaction ineffective "
|
||||
"(%.0f%% -> %.0f%%), falling back to summary",
|
||||
_phase_ratio * 100,
|
||||
_post_ratio * 100,
|
||||
)
|
||||
summary = (
|
||||
f"Summary of earlier phases (before {next_spec.name}). "
|
||||
"See transition markers for phase details."
|
||||
)
|
||||
await continuous_conversation.compact(
|
||||
summary,
|
||||
if continuous_conversation.usage_ratio() >= 0.9 * _pre:
|
||||
await continuous_conversation.compact_preserving_structure(
|
||||
spillover_dir=_data_dir,
|
||||
keep_recent=4,
|
||||
phase_graduated=True,
|
||||
aggressive=True,
|
||||
)
|
||||
else:
|
||||
|
||||
# Step 2: LLM compaction (>95%)
|
||||
if (
|
||||
continuous_conversation.usage_ratio() > 0.95
|
||||
and self._llm is not None
|
||||
):
|
||||
self.logger.info(
|
||||
" LLM phase-boundary compaction "
|
||||
"(%.0f%% usage)",
|
||||
continuous_conversation.usage_ratio() * 100,
|
||||
)
|
||||
try:
|
||||
_llm_summary = await self._phase_llm_compact(
|
||||
continuous_conversation,
|
||||
next_spec,
|
||||
list(continuous_conversation.messages),
|
||||
)
|
||||
await continuous_conversation.compact(
|
||||
_llm_summary,
|
||||
keep_recent=2,
|
||||
phase_graduated=True,
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.warning(
|
||||
" Phase LLM compaction failed: %s", e,
|
||||
)
|
||||
|
||||
# Step 3: Emergency (only if still over budget)
|
||||
if continuous_conversation.needs_compaction():
|
||||
self.logger.warning(
|
||||
" Emergency phase compaction (%.0f%%)",
|
||||
continuous_conversation.usage_ratio() * 100,
|
||||
)
|
||||
summary = (
|
||||
f"Summary of earlier phases (before {next_spec.name}). "
|
||||
f"Summary of earlier phases "
|
||||
f"(before {next_spec.name}). "
|
||||
"See transition markers for phase details."
|
||||
)
|
||||
await continuous_conversation.compact(
|
||||
summary,
|
||||
keep_recent=4,
|
||||
keep_recent=1,
|
||||
phase_graduated=True,
|
||||
)
|
||||
|
||||
|
||||
@@ -2,11 +2,12 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from framework.graph.conversation import Message, NodeConversation
|
||||
from framework.graph.conversation import Message, NodeConversation, extract_tool_call_history
|
||||
from framework.storage.conversation_store import FileConversationStore
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -930,3 +931,250 @@ class TestConversationIntegration:
|
||||
assert restored.next_seq == 4
|
||||
assert restored.messages[0].content == "new msg"
|
||||
assert restored.messages[0].seq == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers for aggressive compaction tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_tool_call(call_id: str, name: str, args: dict) -> dict:
    """Return a function-style tool-call dict with JSON-encoded arguments."""
    function_payload = {"name": name, "arguments": json.dumps(args)}
    tool_call = {"id": call_id, "type": "function"}
    tool_call["function"] = function_payload
    return tool_call
|
||||
|
||||
|
||||
async def _build_tool_heavy_conversation(
    store: MockConversationStore | None = None,
) -> NodeConversation:
    """Create a conversation dominated by tool call / tool result pairs.

    Message order: one user message, five ``append_data`` call+result
    pairs, one ``set_output`` call+result pair, then a trailing user
    message and assistant message. When ``store`` is given, every directly
    appended message is also written to it under its sequence number.
    """
    conv = NodeConversation(store=store)
    await conv.add_user_message("Process the data")  # seq 0

    async def _record(**fields):
        # Append straight to the internal list so tool_calls / tool_use_id
        # can be set, mirror the write to the store, then advance the seq.
        message = Message(seq=conv._next_seq, **fields)
        conv._messages.append(message)
        if store:
            await store.write_part(conv._next_seq, message.to_storage_dict())
        conv._next_seq += 1

    for i in range(5):
        args = {"filename": "output.html", "content": "x" * 500}
        await _record(
            role="assistant",
            content=f"Appending part {i}",
            tool_calls=[_make_tool_call(f"call_{i}", "append_data", args)],
        )
        await _record(
            role="tool",
            content='{"success": true}',
            tool_use_id=f"call_{i}",
        )

    # set_output call — must be protected
    await _record(
        role="assistant",
        content="Setting output",
        tool_calls=[
            _make_tool_call("call_so", "set_output", {"key": "result", "value": "done"})
        ],
    )
    await _record(
        role="tool",
        content="Output 'result' set successfully.",
        tool_use_id="call_so",
    )

    # Recent messages
    await conv.add_user_message("Continue")
    await conv.add_assistant_message("Working on it")
    return conv
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: aggressive structural compaction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAggressiveStructuralCompaction:
    """Checks for ``compact_preserving_structure`` with and without ``aggressive``."""

    @pytest.mark.asyncio
    async def test_aggressive_collapses_tool_pairs(self, tmp_path):
        """Aggressive mode folds non-essential tool pairs into the reference message."""
        conv = await _build_tool_heavy_conversation()

        await conv.compact_preserving_structure(
            spillover_dir=str(tmp_path), keep_recent=2, aggressive=True,
        )

        # 5 append_data pairs (10 msgs) + the first user msg are collapsed,
        # leaving: ref message + set_output pair + 2 recent = 5.
        assert conv.message_count == 5
        ref_msg = conv.messages[0]
        assert ref_msg.role == "user"  # ref message
        assert "TOOLS ALREADY CALLED" in ref_msg.content
        assert "append_data (5x)" in ref_msg.content

        # The set_output pair survives untouched.
        call_msg = conv.messages[1]
        assert call_msg.role == "assistant"
        assert call_msg.tool_calls is not None
        assert call_msg.tool_calls[0]["function"]["name"] == "set_output"
        assert conv.messages[2].role == "tool"

        # The two most recent messages are intact.
        assert conv.messages[3].content == "Continue"
        assert conv.messages[4].content == "Working on it"

    @pytest.mark.asyncio
    async def test_aggressive_preserves_set_output(self, tmp_path):
        """set_output calls stay in the conversation under aggressive mode."""
        conv = await _build_tool_heavy_conversation()

        await conv.compact_preserving_structure(
            spillover_dir=str(tmp_path), keep_recent=2, aggressive=True,
        )

        # Names of every tool call still present after compaction.
        tool_names = [
            tc["function"]["name"]
            for msg in conv.messages
            if msg.tool_calls
            for tc in msg.tool_calls
        ]

        assert "set_output" in tool_names
        # append_data calls were collapsed and must be gone.
        assert "append_data" not in tool_names

    @pytest.mark.asyncio
    async def test_aggressive_preserves_errors(self, tmp_path):
        """Tool results flagged as errors stay in the conversation under aggressive mode."""
        conv = NodeConversation()
        await conv.add_user_message("Start")

        def _push(**fields):
            # Append directly so tool_calls / tool_use_id / is_error can be set.
            conv._messages.append(Message(seq=conv._next_seq, **fields))
            conv._next_seq += 1

        # Successful tool call.
        _push(
            role="assistant",
            content="",
            tool_calls=[_make_tool_call("call_ok", "web_search", {"query": "test"})],
        )
        _push(role="tool", content="results", tool_use_id="call_ok")

        # Failing tool call.
        _push(
            role="assistant",
            content="",
            tool_calls=[
                _make_tool_call("call_err", "web_scrape", {"url": "http://broken.com"})
            ],
        )
        _push(
            role="tool",
            content="Connection timeout",
            tool_use_id="call_err",
            is_error=True,
        )

        await conv.add_user_message("Next")
        await conv.add_assistant_message("OK")

        await conv.compact_preserving_structure(
            spillover_dir=str(tmp_path), keep_recent=2, aggressive=True,
        )

        # Exactly the one error result must remain, content unchanged.
        error_msgs = [m for m in conv.messages if m.role == "tool" and m.is_error]
        assert len(error_msgs) == 1
        assert error_msgs[0].content == "Connection timeout"

    @pytest.mark.asyncio
    async def test_standard_mode_keeps_all_tool_pairs(self, tmp_path):
        """Without ``aggressive`` every tool pair is retained (existing behavior)."""
        conv = await _build_tool_heavy_conversation()

        await conv.compact_preserving_structure(
            spillover_dir=str(tmp_path), keep_recent=2, aggressive=False,
        )

        # Only the freeform first user msg is removed:
        # ref + 6 tool pairs (12 msgs) + 2 recent = 15.
        assert conv.message_count == 15

    @pytest.mark.asyncio
    async def test_two_pass_sequence(self, tmp_path):
        """A standard pass followed by an aggressive pass yields the collapsed layout."""
        conv = await _build_tool_heavy_conversation()
        spill_dir = str(tmp_path)

        # Pass 1: standard — all structural messages kept.
        await conv.compact_preserving_structure(
            spillover_dir=spill_dir, keep_recent=2,
        )
        count_after_standard = conv.message_count
        assert count_after_standard == 15

        # Pass 2: aggressive — ref + set_output pair + 2 recent = 5.
        await conv.compact_preserving_structure(
            spillover_dir=spill_dir, keep_recent=2, aggressive=True,
        )
        count_after_aggressive = conv.message_count
        assert count_after_aggressive < count_after_standard
        assert count_after_aggressive == 5

    @pytest.mark.asyncio
    async def test_aggressive_persists_correctly(self, tmp_path):
        """After aggressive compaction the store holds as many parts as the conversation has messages."""
        store = MockConversationStore()
        conv = await _build_tool_heavy_conversation(store=store)

        await conv.compact_preserving_structure(
            spillover_dir=str(tmp_path), keep_recent=2, aggressive=True,
        )

        # Store state must match in-memory state.
        stored_parts = await store.read_parts()
        assert len(stored_parts) == conv.message_count
|
||||
|
||||
|
||||
class TestExtractToolCallHistory:
    """Checks for the text produced by ``extract_tool_call_history``."""

    def test_basic_extraction(self):
        """Each tool is listed with its call count, and saved files are reported."""
        search_call = _make_tool_call("c1", "web_search", {"query": "python async"})
        save_call = _make_tool_call(
            "c2", "save_data", {"filename": "output.txt", "content": "data"}
        )
        history = extract_tool_call_history([
            Message(seq=0, role="assistant", content="", tool_calls=[search_call]),
            Message(seq=1, role="tool", content="results", tool_use_id="c1"),
            Message(seq=2, role="assistant", content="", tool_calls=[save_call]),
            Message(seq=3, role="tool", content="saved", tool_use_id="c2"),
        ])

        for expected in (
            "web_search (1x)",
            "save_data (1x)",
            "FILES SAVED: output.txt",
        ):
            assert expected in history

    def test_errors_included(self):
        """An error tool result appears under an ERRORS section."""
        failed_result = Message(
            seq=0,
            role="tool",
            content="Connection refused",
            is_error=True,
            tool_use_id="c1",
        )
        history = extract_tool_call_history([failed_result])

        assert "ERRORS" in history
        assert "Connection refused" in history

    def test_empty_messages(self):
        """No messages produce an empty string."""
        assert extract_tool_call_history([]) == ""
|
||||
|
||||
Reference in New Issue
Block a user