Merge pull request #4647 from TimothyZhang7/feature/memory-inheritance

feat: phased compaction and event bus integration
Timothy @aden
2026-02-13 09:25:44 -08:00
committed by GitHub
8 changed files with 164 additions and 31 deletions
+52 -5
@@ -512,22 +512,38 @@ class NodeConversation:
self._last_api_input_tokens = None
return count
async def compact(self, summary: str, keep_recent: int = 2) -> None:
async def compact(
self,
summary: str,
keep_recent: int = 2,
phase_graduated: bool = False,
) -> None:
"""Replace old messages with a summary, optionally keeping recent ones.
Args:
summary: Caller-provided summary text.
keep_recent: Number of recent messages to preserve (default 2).
Clamped to [0, len(messages) - 1].
phase_graduated: When True and messages have phase_id metadata,
split at phase boundaries instead of using keep_recent.
Keeps current + previous phase intact; compacts older phases.
"""
if not self._messages:
return
# Clamp: must discard at least 1 message
keep_recent = max(0, min(keep_recent, len(self._messages) - 1))
total = len(self._messages)
split = total - keep_recent if keep_recent > 0 else total
# Phase-graduated: find the split point based on phase boundaries.
# Keeps current phase + previous phase intact, compacts older phases.
if phase_graduated and self._current_phase:
split = self._find_phase_graduated_split()
else:
split = None
if split is None:
# Fallback: use keep_recent (fewer than 3 phases, or no phase tracking)
keep_recent = max(0, min(keep_recent, total - 1))
split = total - keep_recent if keep_recent > 0 else total
# Advance split past orphaned tool results at the boundary.
# Tool-role messages reference a tool_use from the preceding
@@ -536,6 +552,10 @@ class NodeConversation:
while split < total and self._messages[split].role == "tool":
split += 1
# Nothing to compact
if split == 0:
return
old_messages = list(self._messages[:split])
recent_messages = list(self._messages[split:])
@@ -570,6 +590,33 @@ class NodeConversation:
self._messages = [summary_msg] + recent_messages
self._last_api_input_tokens = None # reset; next LLM call will recalibrate
def _find_phase_graduated_split(self) -> int | None:
"""Find split point that preserves current + previous phase.
Returns the index of the first message in the protected set,
or None if phase graduation doesn't apply (< 3 phases).
"""
# Collect distinct phases in order of first appearance
phases_seen: list[str] = []
for msg in self._messages:
if msg.phase_id and msg.phase_id not in phases_seen:
phases_seen.append(msg.phase_id)
# Need at least 3 phases for graduation to be meaningful
# (current + previous are protected, older get compacted)
if len(phases_seen) < 3:
return None
# Protect: current phase + previous phase
protected_phases = {phases_seen[-1], phases_seen[-2]}
# Find split: first message belonging to a protected phase
for i, msg in enumerate(self._messages):
if msg.phase_id in protected_phases:
return i
return None
async def clear(self) -> None:
"""Remove all messages, keep system prompt, preserve ``_next_seq``."""
if self._store:
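The split rule above protects the two most recent phases and compacts everything older. A minimal sketch of the behavior, where Msg is an illustrative stand-in for the framework's message type, not its real class:

from dataclasses import dataclass

@dataclass
class Msg:
    role: str
    phase_id: str | None = None

def find_phase_graduated_split(messages: list[Msg]) -> int | None:
    """Index of the first message in a protected phase (current + previous),
    or None when fewer than 3 distinct phases exist."""
    phases_seen: list[str] = []
    for msg in messages:
        if msg.phase_id and msg.phase_id not in phases_seen:
            phases_seen.append(msg.phase_id)
    if len(phases_seen) < 3:
        return None  # graduation not meaningful; caller falls back to keep_recent
    protected = {phases_seen[-1], phases_seen[-2]}
    for i, msg in enumerate(messages):
        if msg.phase_id in protected:
            return i
    return None

msgs = [Msg("user", "plan"), Msg("assistant", "plan"),
        Msg("user", "build"), Msg("assistant", "build"),
        Msg("user", "review")]
print(find_phase_graduated_split(msgs))  # 2: "plan" gets compacted,
                                         # "build" and "review" stay intact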
+45 -13
@@ -252,7 +252,11 @@ class EventLoopNode(NodeProtocol):
# already inserted by executor. Fresh accumulator for this phase.
# Phase already set by executor via set_current_phase().
conversation = ctx.inherited_conversation
conversation._output_keys = ctx.node_spec.output_keys or None
# Use cumulative output keys for compaction protection (all phases),
# falling back to current node's keys if not in continuous mode.
conversation._output_keys = (
ctx.cumulative_output_keys or ctx.node_spec.output_keys or None
)
accumulator = OutputAccumulator(store=self._conversation_store)
start_iteration = 0
else:
@@ -1706,40 +1710,53 @@ class EventLoopNode(NodeProtocol):
)
if not conversation.needs_compaction():
# Pruning freed enough — skip full compaction entirely
prune_before = round(ratio * 100)
prune_after = round(new_ratio * 100)
if ctx.runtime_logger:
ctx.runtime_logger.log_step(
node_id=ctx.node_id,
node_type="event_loop",
step_index=-1,
llm_text=f"Context pruned (tool results): "
f"{prune_before}% \u2192 {prune_after}%",
verdict="COMPACTION",
verdict_feedback=f"level=prune_only "
f"before={prune_before}% after={prune_after}%",
)
if self._event_bus:
from framework.runtime.event_bus import AgentEvent, EventType
await self._event_bus.publish(
AgentEvent(
type=EventType.CUSTOM,
type=EventType.CONTEXT_COMPACTED,
stream_id=ctx.node_id,
node_id=ctx.node_id,
data={
"custom_type": "node_compaction",
"node_id": ctx.node_id,
"level": "prune_only",
"usage_before": round(ratio * 100),
"usage_after": round(new_ratio * 100),
"usage_before": prune_before,
"usage_after": prune_after,
},
)
)
return
ratio = new_ratio
_phase_grad = getattr(ctx, "continuous_mode", False)
if ratio >= 1.2:
level = "emergency"
logger.warning("Emergency compaction triggered (usage %.0f%%)", ratio * 100)
summary = self._build_emergency_summary(ctx, accumulator, conversation)
await conversation.compact(summary, keep_recent=1)
await conversation.compact(summary, keep_recent=1, phase_graduated=_phase_grad)
elif ratio >= 1.0:
level = "aggressive"
logger.info("Aggressive compaction triggered (usage %.0f%%)", ratio * 100)
summary = await self._generate_compaction_summary(ctx, conversation)
await conversation.compact(summary, keep_recent=2)
await conversation.compact(summary, keep_recent=2, phase_graduated=_phase_grad)
else:
level = "normal"
summary = await self._generate_compaction_summary(ctx, conversation)
await conversation.compact(summary, keep_recent=4)
await conversation.compact(summary, keep_recent=4, phase_graduated=_phase_grad)
new_ratio = conversation.usage_ratio()
logger.info(
@@ -1748,17 +1765,29 @@ class EventLoopNode(NodeProtocol):
ratio * 100,
new_ratio * 100,
)
# Log compaction to session logs (tool_logs.jsonl)
before_pct = round(ratio * 100)
after_pct = round(new_ratio * 100)
if ctx.runtime_logger:
ctx.runtime_logger.log_step(
node_id=ctx.node_id,
node_type="event_loop",
step_index=-1, # Not a regular LLM step
llm_text=f"Context compacted ({level}): {before_pct}% \u2192 {after_pct}%",
verdict="COMPACTION",
verdict_feedback=f"level={level} before={before_pct}% after={after_pct}%",
)
if self._event_bus:
from framework.runtime.event_bus import AgentEvent, EventType
await self._event_bus.publish(
AgentEvent(
type=EventType.CUSTOM,
type=EventType.CONTEXT_COMPACTED,
stream_id=ctx.node_id,
node_id=ctx.node_id,
data={
"custom_type": "node_compaction",
"node_id": ctx.node_id,
"level": level,
"usage_before": round(ratio * 100),
"usage_after": round(new_ratio * 100),
@@ -1789,13 +1818,16 @@ class EventLoopNode(NodeProtocol):
f"{tool_history}"
)
# Dynamic budget: reasoning models (o1, gpt-5-mini) spend max_tokens on
# internal thinking. 500 leaves nothing for the actual summary.
summary_budget = max(1024, self._config.max_history_tokens // 10)
try:
response = ctx.llm.complete(
messages=[{"role": "user", "content": prompt}],
system=(
"Summarize conversations concisely. Always preserve the tool history section."
),
max_tokens=500,
max_tokens=summary_budget,
)
summary = response.content
# Ensure tool history is present even if LLM dropped it
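For reference, the escalation ladder in the hunk above maps the usage ratio to a compaction level and keep_recent value. A small sketch using the thresholds from the diff (the helper itself is illustrative, not framework API):

def select_compaction(ratio: float) -> tuple[str, int]:
    """Map context-usage ratio to (level, keep_recent)."""
    if ratio >= 1.2:
        return "emergency", 1   # keep only the last message
    if ratio >= 1.0:
        return "aggressive", 2
    return "normal", 4          # reached via needs_compaction()

for r in (1.25, 1.05, 0.85):
    print(r, select_compaction(r))
# 1.25 ('emergency', 1)
# 1.05 ('aggressive', 2)
# 0.85 ('normal', 4)

# The summary budget scales with the history window so reasoning models
# do not spend the whole allowance on internal thinking. With an assumed
# max_history_tokens of 200_000:
print(max(1024, 200_000 // 10))  # 20000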
+38 -4
@@ -225,8 +225,11 @@ class GraphExecutor:
timestamps = state_data.setdefault("timestamps", {})
timestamps["updated_at"] = datetime.now().isoformat()
# Memory keys (lightweight — just keys, not values)
state_data["memory_keys"] = list(memory.read_all().keys())
# Persist full memory so state.json is sufficient for resume
# even if the process dies before the final write.
memory_snapshot = memory.read_all()
state_data["memory"] = memory_snapshot
state_data["memory_keys"] = list(memory_snapshot.keys())
state_path.write_text(_json.dumps(state_data, indent=2), encoding="utf-8")
except Exception:
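A self-contained sketch of the snapshot write above (json is aliased as _json in the surrounding module; field names follow the hunk):

import json
from datetime import datetime
from pathlib import Path

def write_state_snapshot(state_path: Path, state_data: dict, memory: dict) -> None:
    """Persist full memory plus its keys so state.json alone can drive a resume."""
    state_data.setdefault("timestamps", {})["updated_at"] = datetime.now().isoformat()
    state_data["memory"] = memory                  # full values, not just keys
    state_data["memory_keys"] = list(memory.keys())
    state_path.write_text(json.dumps(state_data, indent=2), encoding="utf-8")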
@@ -308,6 +311,7 @@ class GraphExecutor:
continuous_conversation = None # NodeConversation threaded across nodes
cumulative_tools: list = [] # Tools accumulate, never removed
cumulative_tool_names: set[str] = set()
cumulative_output_keys: list[str] = [] # Output keys from all visited nodes
# Initialize checkpoint store if checkpointing is enabled
checkpoint_store: CheckpointStore | None = None
@@ -589,12 +593,16 @@ class GraphExecutor:
self.logger.info(f" Inputs: {node_spec.input_keys}")
self.logger.info(f" Outputs: {node_spec.output_keys}")
# Continuous mode: accumulate tools from this node
# Continuous mode: accumulate tools and output keys from this node
if is_continuous and node_spec.tools:
for t in self.tools:
if t.name in node_spec.tools and t.name not in cumulative_tool_names:
cumulative_tools.append(t)
cumulative_tool_names.add(t.name)
if is_continuous and node_spec.output_keys:
for k in node_spec.output_keys:
if k not in cumulative_output_keys:
cumulative_output_keys.append(k)
# Build context for node
ctx = self._build_context(
@@ -606,6 +614,7 @@ class GraphExecutor:
continuous_mode=is_continuous,
inherited_conversation=continuous_conversation if is_continuous else None,
override_tools=cumulative_tools if is_continuous else None,
cumulative_output_keys=cumulative_output_keys if is_continuous else None,
)
# Log actual input data being read
@@ -864,6 +873,8 @@ class GraphExecutor:
"paused_at": node_spec.id,
"resume_from": f"{node_spec.id}_resume", # Resume key
"memory": saved_memory,
"execution_path": list(path),
"node_visit_counts": dict(node_visit_counts),
"next_node": None, # Will resume from entry point
}
@@ -1068,11 +1079,27 @@ class GraphExecutor:
# Set current phase for phase-aware compaction
continuous_conversation.set_current_phase(next_spec.id)
# Opportunistic compaction at transition
# Opportunistic compaction at transition:
# 1. Prune old tool results (free, no LLM call)
# 2. If still over 80%, do a phase-graduated compact
if continuous_conversation.usage_ratio() > 0.5:
await continuous_conversation.prune_old_tool_results(
protect_tokens=2000,
)
if continuous_conversation.needs_compaction():
self.logger.info(
" Phase-boundary compaction (%.0f%% usage)",
continuous_conversation.usage_ratio() * 100,
)
summary = (
f"Summary of earlier phases (before {next_spec.name}). "
"See transition markers for phase details."
)
await continuous_conversation.compact(
summary,
keep_recent=4,
phase_graduated=True,
)
# Update input_data for next node
input_data = result.output
@@ -1127,6 +1154,11 @@ class GraphExecutor:
had_partial_failures=len(nodes_failed) > 0,
execution_quality=exec_quality,
node_visit_counts=dict(node_visit_counts),
session_state={
"memory": output, # output IS memory.read_all()
"execution_path": list(path),
"node_visit_counts": dict(node_visit_counts),
},
)
except asyncio.CancelledError:
@@ -1272,6 +1304,7 @@ class GraphExecutor:
continuous_mode: bool = False,
inherited_conversation: Any = None,
override_tools: list | None = None,
cumulative_output_keys: list[str] | None = None,
) -> NodeContext:
"""Build execution context for a node."""
# Filter tools to those available to this node
@@ -1304,6 +1337,7 @@ class GraphExecutor:
pause_event=self._pause_requested, # Pass pause event for granular control
continuous_mode=continuous_mode,
inherited_conversation=inherited_conversation,
cumulative_output_keys=cumulative_output_keys or [],
)
# Valid node types - no ambiguous "llm" type allowed
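The cumulative output-key bookkeeping is an order-preserving dedup across visited nodes. A minimal sketch mirroring the loop in the hunk above (the key lists are made up for illustration):

cumulative_output_keys: list[str] = []

def accumulate(output_keys: list[str] | None) -> None:
    for k in output_keys or []:
        if k not in cumulative_output_keys:
            cumulative_output_keys.append(k)

accumulate(["plan", "notes"])   # first node
accumulate(["notes", "draft"])  # later node; "notes" deduplicated, order kept
print(cumulative_output_keys)   # ['plan', 'notes', 'draft']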
+1
@@ -496,6 +496,7 @@ class NodeContext:
# Continuous conversation mode
continuous_mode: bool = False # True when graph has conversation_mode="continuous"
inherited_conversation: Any = None # NodeConversation | None (from prior node)
cumulative_output_keys: list[str] = field(default_factory=list) # All output keys from path
@dataclass
+6 -1
@@ -302,7 +302,12 @@ class LiteLLMProvider(LLMProvider):
# Extract content
content = response.choices[0].message.content or ""
# Get usage info
# Get usage info.
# NOTE: completion_tokens includes reasoning/thinking tokens for models
# that use them (o1, gpt-5-mini, etc.). LiteLLM does not reliably expose
# usage.completion_tokens_details.reasoning_tokens across all providers.
# This means output_tokens may be inflated for reasoning models.
# Compaction is unaffected — it uses prompt_tokens (input-side only).
usage = response.usage
input_tokens = usage.prompt_tokens if usage else 0
output_tokens = usage.completion_tokens if usage else 0
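A defensive sketch of the caveat above. completion_tokens_details.reasoning_tokens is probed with getattr because, as the note says, LiteLLM does not expose it on every provider (the SimpleNamespace stub stands in for a real response.usage):

from types import SimpleNamespace

def split_output_tokens(usage) -> tuple[int, int | None]:
    """Return (completion_tokens, reasoning_tokens or None when unavailable)."""
    if usage is None:
        return 0, None
    completion = getattr(usage, "completion_tokens", 0) or 0
    details = getattr(usage, "completion_tokens_details", None)
    reasoning = getattr(details, "reasoning_tokens", None) if details else None
    return completion, reasoning

usage = SimpleNamespace(
    prompt_tokens=900,
    completion_tokens=450,
    completion_tokens_details=SimpleNamespace(reasoning_tokens=300),
)
print(split_output_tokens(usage))  # (450, 300): only 150 tokens were visible output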
+3
@@ -63,6 +63,9 @@ class EventType(StrEnum):
NODE_INPUT_BLOCKED = "node_input_blocked"
NODE_STALLED = "node_stalled"
# Context management
CONTEXT_COMPACTED = "context_compacted"
# Custom events
CUSTOM = "custom"
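Downstream consumers can switch on the new event type. The diff only shows the publish side, so this subscriber is a hypothetical sketch; the handler shape just matches the AgentEvent fields used above:

from framework.runtime.event_bus import EventType

async def on_event(event) -> None:
    if event.type == EventType.CONTEXT_COMPACTED:
        d = event.data
        print(f"[{event.node_id}] compaction level={d.get('level')}: "
              f"{d.get('usage_before')}% -> {d.get('usage_after')}%")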
+14 -8
@@ -158,12 +158,11 @@ class SessionState(BaseModel):
def is_resumable(self) -> bool:
"""Can this session be resumed?
A session is resumable when it stopped mid-execution (paused or
failed) and we know which node to resume from.
Every non-completed session is resumable. If resume_from/paused_at
aren't set, the executor falls back to the graph entry point —
so we don't gate on those. Even catastrophic failures are resumable.
"""
if self.status not in (SessionStatus.PAUSED, SessionStatus.FAILED):
return False
return self.progress.resume_from is not None or self.progress.paused_at is not None
return self.status != SessionStatus.COMPLETED
@computed_field
@property
@@ -285,9 +284,16 @@ class SessionState(BaseModel):
def to_session_state_dict(self) -> dict[str, Any]:
"""Convert to session_state format for GraphExecutor.execute()."""
# Derive resume target: resume_from > paused_at > last node in path
resume_from = (
self.progress.resume_from
or self.progress.paused_at
or (self.progress.path[-1] if self.progress.path else None)
)
return {
"paused_at": self.progress.paused_at,
"resume_from": self.progress.resume_from,
"paused_at": resume_from,
"resume_from": resume_from,
"memory": self.memory,
"next_node": None,
"execution_path": self.progress.path,
"node_visit_counts": self.progress.node_visit_counts,
}
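Together with the relaxed is_resumable rule, this derivation means resume never dead-ends. A behavior sketch of the fallback chain (pure function, values illustrative):

def derive_resume_from(resume_from, paused_at, path):
    # explicit target, then pause point, then last visited node;
    # the executor falls back to the graph entry point on None
    return resume_from or paused_at or (path[-1] if path else None)

print(derive_resume_from(None, None, ["plan", "build"]))  # 'build'
print(derive_resume_from("fix", "build", ["plan"]))       # 'fix' (explicit wins)
print(derive_resume_from(None, None, []))                 # None -> entry point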
+5
@@ -350,6 +350,7 @@ class AdenTUI(App):
EventType.CONSTRAINT_VIOLATION,
EventType.STATE_CHANGED,
EventType.NODE_INPUT_BLOCKED,
EventType.CONTEXT_COMPACTED,
]
_LOG_PANE_EVENTS = frozenset(_EVENT_TYPES) - {
@@ -464,6 +465,10 @@ class AdenTUI(App):
self.status_bar.set_node_detail("thinking...")
elif et == EventType.NODE_STALLED:
self.status_bar.set_node_detail(f"stalled: {event.data.get('reason', '')}")
elif et == EventType.CONTEXT_COMPACTED:
before = event.data.get("usage_before", "?")
after = event.data.get("usage_after", "?")
self.status_bar.set_node_detail(f"compacted: {before}% \u2192 {after}%")
# --- Log pane events ---
if et in self._LOG_PANE_EVENTS: