refactor(event-loop): slim event loop node orchestration

Sundaram Kumar Jha
2026-03-24 01:00:08 +05:30
parent b43044cf4d
commit d648f3d315
+84 -540
@@ -18,12 +18,78 @@ import os
import re
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal, Protocol, runtime_checkable
from typing import Any
from framework.graph.conversation import ConversationStore, NodeConversation
from framework.graph.event_loop import types as event_loop_types
from framework.graph.event_loop.compaction import (
_llm_compact_split,
build_emergency_summary,
build_llm_compaction_prompt,
compact,
format_messages_for_summary,
llm_compact,
)
from framework.graph.event_loop.cursor_persistence import (
RestoredState,
check_pause,
drain_injection_queue,
drain_trigger_queue,
restore,
write_cursor,
)
from framework.graph.event_loop.event_publishing import (
generate_action_plan,
log_skip_judge,
publish_context_usage,
publish_iteration,
publish_judge_verdict,
publish_llm_turn_complete,
publish_loop_completed,
publish_loop_started,
publish_output_key_set,
publish_stalled,
publish_text_delta,
publish_tool_completed,
publish_tool_started,
run_hooks,
)
from framework.graph.event_loop.judge_pipeline import (
SubagentJudge as SharedSubagentJudge,
judge_turn,
)
from framework.graph.event_loop.stall_detector import (
fingerprint_tool_calls,
is_stalled,
is_tool_doom_loop,
ngram_similarity,
)
from framework.graph.event_loop.subagent_executor import execute_subagent
from framework.graph.event_loop.synthetic_tools import (
build_ask_user_multiple_tool,
build_ask_user_tool,
build_delegate_tool,
build_escalate_tool,
build_report_to_parent_tool,
build_set_output_tool,
handle_set_output,
)
from framework.graph.event_loop.tool_result_handler import (
build_json_preview,
execute_tool,
extract_json_metadata,
is_transient_error,
record_learning,
restore_spill_counter,
truncate_tool_result,
)
from framework.graph.event_loop.types import (
JudgeProtocol,
JudgeVerdict,
TriggerEvent,
)
from framework.graph.node import NodeContext, NodeProtocol, NodeResult
from framework.llm.capabilities import supports_image_tool_results
from framework.llm.provider import Tool, ToolResult, ToolUse
@@ -73,7 +139,7 @@ async def _describe_images_as_text(image_content: list[dict[str, Any]]) -> str |
if description:
count = len(image_content)
label = "image" if count == 1 else f"{count} images"
return f"[{label} attached - description: {description}]"
return f"[{label} attached \u2014 description: {description}]"
except Exception as exc:
logger.debug("Vision fallback model '%s' failed: %s", model, exc)
continue
@@ -81,20 +147,6 @@ async def _describe_images_as_text(image_content: list[dict[str, Any]]) -> str |
return None
@dataclass
class TriggerEvent:
"""A framework-level trigger signal (timer tick or webhook hit).
Triggers are queued separately from user messages / external events
and drained atomically so the LLM sees all pending triggers at once.
"""
trigger_type: str # "timer" | "webhook"
source_id: str # entry point ID or webhook route ID
payload: dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
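# Constructing a trigger is a one-liner; the IDs and payload below are
# illustrative, not taken from this diff:
#
#     tick = TriggerEvent(trigger_type="timer", source_id="daily_report")
#     hit = TriggerEvent(
#         trigger_type="webhook", source_id="stripe_events",
#         payload={"event": "invoice.paid"},
#     )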
# Pattern for detecting context-window-exceeded errors across LLM providers.
_CONTEXT_TOO_LARGE_RE = re.compile(
r"context.{0,20}(length|window|limit|size)|"
@@ -162,268 +214,12 @@ class TurnCancelled(Exception):
pass
@dataclass
class JudgeVerdict:
"""Result of judge evaluation for the event loop."""
action: Literal["ACCEPT", "RETRY", "ESCALATE"]
# None = no evaluation happened (skip_judge, tool-continue); not logged.
# "" = evaluated but no feedback; logged with default text.
# "..." = evaluated with feedback; logged as-is.
feedback: str | None = None
@runtime_checkable
class JudgeProtocol(Protocol):
"""Protocol for event-loop judges.
Implementations evaluate the current state of the event loop and
decide whether to accept the output, retry with feedback, or escalate.
"""
async def evaluate(self, context: dict[str, Any]) -> JudgeVerdict: ...
class SubagentJudge:
"""Judge for subagent execution.
Accepts immediately when all required output keys are filled,
regardless of whether real tool calls were also made in the same turn.
On RETRY, reminds the subagent of its specific task with progressive
urgency based on remaining iterations.
"""
def __init__(self, task: str, max_iterations: int = 10):
self._task = task
self._max_iterations = max_iterations
async def evaluate(self, context: dict[str, Any]) -> JudgeVerdict:
missing = context.get("missing_keys", [])
if not missing:
return JudgeVerdict(action="ACCEPT", feedback="")
iteration = context.get("iteration", 0)
remaining = self._max_iterations - iteration - 1
if remaining <= 3:
urgency = (
f"URGENT: Only {remaining} iterations left. "
f"Stop all other work and call set_output NOW for: {missing}"
)
elif remaining <= self._max_iterations // 2:
urgency = (
f"WARNING: {remaining} iterations remaining. "
f"You must call set_output for: {missing}"
)
else:
urgency = f"Missing output keys: {missing}. Use set_output to provide them."
return JudgeVerdict(action="RETRY", feedback=f"Your task: {self._task}\n{urgency}")
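# A minimal usage sketch of the judge above, with a nearly exhausted
# iteration budget (task text and numbers are illustrative):
#
#     judge = SubagentJudge(task="Summarise the report", max_iterations=10)
#     verdict = await judge.evaluate({"missing_keys": ["summary"], "iteration": 8})
#     # remaining = 10 - 8 - 1 = 1 <= 3, so feedback carries the URGENT prefix
#     assert verdict.action == "RETRY"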
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class LoopConfig:
"""Configuration for the event loop."""
max_iterations: int = 50
max_tool_calls_per_turn: int = 30
judge_every_n_turns: int = 1
stall_detection_threshold: int = 3
stall_similarity_threshold: float = 0.85
max_context_tokens: int = 32_000
store_prefix: str = ""
# Overflow margin for max_tool_calls_per_turn. Tool calls are only
# discarded when the count exceeds max_tool_calls_per_turn * (1 + margin).
# Default 0.5 means 50% wiggle room (e.g. limit=10 → hard cutoff at 15).
tool_call_overflow_margin: float = 0.5
# --- Tool result context management ---
# When a tool result exceeds this character count, it is truncated in the
# conversation context. If *spillover_dir* is set the full result is
# written to a file and the truncated message includes the filename so
# the agent can retrieve it with load_data(). If *spillover_dir* is
# ``None`` the result is simply truncated with an explanatory note.
max_tool_result_chars: int = 30_000
spillover_dir: str | None = None # Path string; created on first use
# --- set_output value spilling ---
# When a set_output value exceeds this character count it is auto-saved
# to a file in *spillover_dir* and the stored value is replaced with a
# lightweight file reference. This keeps shared memory / adapt.md /
# transition markers small and forces the next node to load the full
# data from the file. Set to 0 to disable.
max_output_value_chars: int = 2_000
# --- Stream retry (transient error recovery within EventLoopNode) ---
# When _run_single_turn() raises a transient error (network, rate limit,
# server error), retry up to this many times with exponential backoff
# before re-raising. Set to 0 to disable.
max_stream_retries: int = 3
stream_retry_backoff_base: float = 2.0
stream_retry_max_delay: float = 60.0 # cap per-retry sleep
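# One plausible per-attempt sleep implied by the three knobs above (the
# formula is assumed for illustration; the real retry wrapper sits around
# _run_single_turn):
#
#     delay = min(stream_retry_backoff_base ** attempt, stream_retry_max_delay)
#     # attempt 1 → 2s, attempt 2 → 4s, attempt 3 → 8s, never above 60s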
# --- Tool doom loop detection ---
# Detect when the LLM calls the same tool(s) with identical args for
# N consecutive turns. For client-facing nodes, blocks for user input.
# For non-client-facing nodes, injects a warning into the conversation.
tool_doom_loop_threshold: int = 3
# --- Client-facing auto-block grace period ---
# When a client-facing node produces text-only turns (no tools, no
# set_output), the judge is skipped for this many consecutive auto-block
# turns. After the grace period, the judge runs to apply RETRY pressure
# on models stuck in a clarification loop. Explicit ask_user() calls
# always skip the judge regardless of this setting.
cf_grace_turns: int = 1
tool_doom_loop_enabled: bool = True
# --- Per-tool-call timeout ---
# Maximum seconds a single tool call may take before being killed.
# Prevents hung MCP servers (especially browser/GCU tools) from
# blocking the entire event loop indefinitely. 0 = no timeout.
tool_call_timeout_seconds: float = 60.0
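# Illustrative enforcement only (the actual mechanism lives in
# tool_result_handler.execute_tool; run_tool is a hypothetical stand-in):
#
#     result = await asyncio.wait_for(run_tool(tc), timeout=60.0)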
# --- Subagent delegation timeout ---
# Maximum seconds a delegate_to_sub_agent call may run before being
# killed. Subagents run a full event-loop so they naturally take
# longer than a single tool call — default is 10 minutes. 0 = no timeout.
subagent_timeout_seconds: float = 600.0
# --- Lifecycle hooks ---
# Hooks are async callables keyed by event name. Supported events:
# "session_start" — fires once after the first user message is added,
# before the first LLM turn. trigger = initial message.
# "external_message" — fires when inject_notification() delivers a message.
# trigger = injected message text.
# Each hook receives a HookContext and may return a HookResult to patch
# the system prompt and/or inject a follow-up user message.
hooks: dict[str, list] = None # dict[str, list[HookFn]] (None → no hooks)
def __post_init__(self) -> None:
if self.hooks is None:
object.__setattr__(self, "hooks", {})
# ---------------------------------------------------------------------------
# Hook types
# ---------------------------------------------------------------------------
@dataclass
class HookContext:
"""Context passed to every lifecycle hook."""
event: str # event name, e.g. "session_start"
trigger: str | None # message that triggered the hook, if any
system_prompt: str # current system prompt at hook invocation time
@dataclass
class HookResult:
"""What a hook may return to modify node state."""
system_prompt: str | None = None # replace current system prompt
inject: str | None = None # inject an additional user message
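# A minimal hook sketch wiring these two types together (the hook body is
# illustrative):
#
#     async def on_session_start(hctx: HookContext) -> HookResult:
#         return HookResult(
#             system_prompt=hctx.system_prompt + "\nBe concise.",
#             inject="Greet the user before anything else.",
#         )
#
#     config = LoopConfig(hooks={"session_start": [on_session_start]})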
# ---------------------------------------------------------------------------
# Output accumulator with write-through persistence
# ---------------------------------------------------------------------------
@dataclass
class OutputAccumulator:
"""Accumulates output key-value pairs with optional write-through persistence.
Values are stored in memory and optionally written through to a
ConversationStore's cursor data for crash recovery.
When *spillover_dir* and *max_value_chars* are set, large values are
automatically saved to files and replaced with lightweight file
references. This guarantees auto-spill fires on **every** ``set()``
call regardless of code path (resume, checkpoint restore, etc.).
"""
values: dict[str, Any] = field(default_factory=dict)
store: ConversationStore | None = None
spillover_dir: str | None = None
max_value_chars: int = 0 # 0 = disabled
async def set(self, key: str, value: Any) -> None:
"""Set a key-value pair, auto-spilling large values to files.
When the serialised value exceeds *max_value_chars*, the data is
saved to ``<spillover_dir>/output_<key>.<ext>`` and *value* is
replaced with a compact file-reference string.
"""
value = self._auto_spill(key, value)
self.values[key] = value
if self.store:
cursor = await self.store.read_cursor() or {}
outputs = cursor.get("outputs", {})
outputs[key] = value
cursor["outputs"] = outputs
await self.store.write_cursor(cursor)
def _auto_spill(self, key: str, value: Any) -> Any:
"""Save large values to a file and return a reference string."""
if self.max_value_chars <= 0 or not self.spillover_dir:
return value
val_str = json.dumps(value, ensure_ascii=False) if not isinstance(value, str) else value
if len(val_str) <= self.max_value_chars:
return value
spill_path = Path(self.spillover_dir)
spill_path.mkdir(parents=True, exist_ok=True)
ext = ".json" if isinstance(value, (dict, list)) else ".txt"
filename = f"output_{key}{ext}"
write_content = (
json.dumps(value, indent=2, ensure_ascii=False)
if isinstance(value, (dict, list))
else str(value)
)
(spill_path / filename).write_text(write_content, encoding="utf-8")
file_size = (spill_path / filename).stat().st_size
logger.info(
"set_output value auto-spilled: key=%s, %d chars → %s (%d bytes)",
key,
len(val_str),
filename,
file_size,
)
return (
f"[Saved to '{filename}' ({file_size:,} bytes). "
f"Use load_data(filename='{filename}') "
f"to access full data.]"
)
def get(self, key: str) -> Any | None:
"""Get a value by key, or None if not present."""
return self.values.get(key)
def to_dict(self) -> dict[str, Any]:
"""Return a copy of all accumulated values."""
return dict(self.values)
def has_all_keys(self, required: list[str]) -> bool:
"""Check if all required keys have been set (non-None)."""
return all(key in self.values and self.values[key] is not None for key in required)
@classmethod
async def restore(cls, store: ConversationStore) -> OutputAccumulator:
"""Restore an OutputAccumulator from a store's cursor data."""
cursor = await store.read_cursor()
values = {}
if cursor and "outputs" in cursor:
values = cursor["outputs"]
return cls(values=values, store=store)
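# A usage sketch of the auto-spill path (directory, data, and sizes are
# illustrative):
#
#     acc = OutputAccumulator(spillover_dir="/tmp/spill", max_value_chars=2_000)
#     await acc.set("report", {"rows": rows})  # serialises to > 2,000 chars
#     acc.get("report")
#     # → "[Saved to 'output_report.json' (54,321 bytes). Use
#     #    load_data(filename='output_report.json') to access full data.]"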
# Re-export shared event-loop types so they stay importable from the legacy parent module.
SubagentJudge = SharedSubagentJudge
LoopConfig = event_loop_types.LoopConfig
HookContext = event_loop_types.HookContext
HookResult = event_loop_types.HookResult
OutputAccumulator = event_loop_types.OutputAccumulator
# ---------------------------------------------------------------------------
@@ -2983,40 +2779,28 @@ class EventLoopNode(NodeProtocol):
def _build_ask_user_tool(self) -> Tool:
"""Build the synthetic ask_user tool. Delegates to synthetic_tools module."""
from framework.graph.event_loop.synthetic_tools import build_ask_user_tool
return build_ask_user_tool()
def _build_ask_user_multiple_tool(self) -> Tool:
"""Build the synthetic ask_user_multiple tool. Delegates to synthetic_tools module."""
from framework.graph.event_loop.synthetic_tools import build_ask_user_multiple_tool
return build_ask_user_multiple_tool()
def _build_set_output_tool(self, output_keys: list[str] | None) -> Tool | None:
"""Build the synthetic set_output tool. Delegates to synthetic_tools module."""
from framework.graph.event_loop.synthetic_tools import build_set_output_tool
return build_set_output_tool(output_keys)
def _build_escalate_tool(self) -> Tool:
"""Build the synthetic escalate tool. Delegates to synthetic_tools module."""
from framework.graph.event_loop.synthetic_tools import build_escalate_tool
return build_escalate_tool()
def _build_delegate_tool(
self, sub_agents: list[str], node_registry: dict[str, Any]
) -> Tool | None:
"""Build the synthetic delegate_to_sub_agent tool. Delegates to synthetic_tools module."""
from framework.graph.event_loop.synthetic_tools import build_delegate_tool
return build_delegate_tool(sub_agents, node_registry)
def _build_report_to_parent_tool(self) -> Tool:
"""Build the synthetic report_to_parent tool. Delegates to synthetic_tools module."""
from framework.graph.event_loop.synthetic_tools import build_report_to_parent_tool
return build_report_to_parent_tool()
def _handle_set_output(
@@ -3025,8 +2809,6 @@ class EventLoopNode(NodeProtocol):
output_keys: list[str] | None,
) -> ToolResult:
"""Handle set_output tool call. Delegates to synthetic_tools module."""
from framework.graph.event_loop.synthetic_tools import handle_set_output
return handle_set_output(tool_input, output_keys)
# -------------------------------------------------------------------
@@ -3043,8 +2825,6 @@ class EventLoopNode(NodeProtocol):
iteration: int,
) -> JudgeVerdict:
"""Evaluate the current state. Delegates to judge_pipeline module."""
from framework.graph.event_loop.judge_pipeline import judge_turn
return await judge_turn(
mark_complete_flag=self._mark_complete_flag,
judge=self._judge,
@@ -3114,14 +2894,10 @@ class EventLoopNode(NodeProtocol):
@staticmethod
def _ngram_similarity(s1: str, s2: str, n: int = 2) -> float:
"""Jaccard similarity of n-gram sets. Delegates to stall_detector module."""
from framework.graph.event_loop.stall_detector import ngram_similarity
return ngram_similarity(s1, s2, n)
def _is_stalled(self, recent_responses: list[str]) -> bool:
"""Detect stall using n-gram similarity. Delegates to stall_detector module."""
from framework.graph.event_loop.stall_detector import is_stalled
return is_stalled(
recent_responses,
self._config.stall_detection_threshold,
@@ -3131,8 +2907,6 @@ class EventLoopNode(NodeProtocol):
@staticmethod
def _is_transient_error(exc: BaseException) -> bool:
"""Classify whether an exception is transient. Delegates to tool_result_handler module."""
from framework.graph.event_loop.tool_result_handler import is_transient_error
return is_transient_error(exc)
@staticmethod
@@ -3140,8 +2914,6 @@ class EventLoopNode(NodeProtocol):
tool_results: list[dict],
) -> list[tuple[str, str]]:
"""Create deterministic fingerprints. Delegates to stall_detector module."""
from framework.graph.event_loop.stall_detector import fingerprint_tool_calls
return fingerprint_tool_calls(tool_results)
def _is_tool_doom_loop(
@@ -3149,8 +2921,6 @@ class EventLoopNode(NodeProtocol):
recent_tool_fingerprints: list[list[tuple[str, str]]],
) -> tuple[bool, str]:
"""Detect doom loop. Delegates to stall_detector module."""
from framework.graph.event_loop.stall_detector import is_tool_doom_loop
return is_tool_doom_loop(
recent_tool_fingerprints=recent_tool_fingerprints,
threshold=self._config.tool_doom_loop_threshold,
@@ -3166,8 +2936,6 @@ class EventLoopNode(NodeProtocol):
sync executors (MCP STDIO tools that block on ``future.result()``)
don't freeze the event loop.
"""
from framework.graph.event_loop.tool_result_handler import execute_tool
return await execute_tool(
tool_executor=self._tool_executor,
tc=tc,
@@ -3183,8 +2951,6 @@ class EventLoopNode(NodeProtocol):
adapt.md is injected into the system prompt, these persist through
any compaction.
"""
from framework.graph.event_loop.tool_result_handler import record_learning
return record_learning(
key=key,
value=value,
@@ -3200,8 +2966,6 @@ class EventLoopNode(NodeProtocol):
def _restore_spill_counter(self) -> None:
"""Scan spillover_dir for existing spill files and restore the counter."""
from framework.graph.event_loop.tool_result_handler import restore_spill_counter
self._spill_counter = restore_spill_counter(
spillover_dir=self._config.spillover_dir,
)
@@ -3219,8 +2983,6 @@ class EventLoopNode(NodeProtocol):
Returns an empty string for simple scalars.
"""
from framework.graph.event_loop.tool_result_handler import extract_json_metadata
return extract_json_metadata(
parsed=parsed,
)
@@ -3234,8 +2996,6 @@ class EventLoopNode(NodeProtocol):
Returns ``None`` if no truncation was needed (no large arrays).
"""
from framework.graph.event_loop.tool_result_handler import build_json_preview
return build_json_preview(
parsed=parsed,
max_chars=max_chars,
@@ -3258,8 +3018,6 @@ class EventLoopNode(NodeProtocol):
- Errors: pass through unchanged
- load_data results: truncate with pagination hint (no re-spill)
"""
from framework.graph.event_loop.tool_result_handler import truncate_tool_result
return truncate_tool_result(
result=result,
tool_name=tool_name,
@@ -3291,8 +3049,6 @@ class EventLoopNode(NodeProtocol):
does not fully resolve the budget.
4. Emergency deterministic summary only if LLM failed or unavailable.
"""
from framework.graph.event_loop.compaction import compact
return await compact(
ctx=ctx,
conversation=conversation,
@@ -3301,9 +3057,6 @@ class EventLoopNode(NodeProtocol):
event_bus=self._event_bus,
char_limit=self._LLM_COMPACT_CHAR_LIMIT,
max_depth=self._LLM_COMPACT_MAX_DEPTH,
build_message_inventory_fn=self._build_message_inventory,
publish_context_usage_fn=self._publish_context_usage,
write_debug_log_fn=self._write_compaction_debug_log,
)
# --- LLM compaction with binary-search splitting ----------------------
@@ -3322,8 +3075,6 @@ class EventLoopNode(NodeProtocol):
in half and each half is summarised independently. Tool history is
appended once at the top-level call (``_depth == 0``).
"""
from framework.graph.event_loop.compaction import llm_compact
return await llm_compact(
ctx=ctx,
messages=messages,
@@ -3342,8 +3093,6 @@ class EventLoopNode(NodeProtocol):
_depth: int,
) -> str:
"""Split messages in half and summarise each half independently."""
from framework.graph.event_loop.compaction import _llm_compact_split
return await _llm_compact_split(
ctx=ctx,
messages=messages,
@@ -3359,8 +3108,6 @@ class EventLoopNode(NodeProtocol):
@staticmethod
def _format_messages_for_summary(messages: list) -> str:
"""Format messages as text for LLM summarisation."""
from framework.graph.event_loop.compaction import format_messages_for_summary
return format_messages_for_summary(messages)
def _build_llm_compaction_prompt(
@@ -3370,8 +3117,6 @@ class EventLoopNode(NodeProtocol):
formatted_messages: str,
) -> str:
"""Build prompt for LLM compaction targeting 50% of token budget."""
from framework.graph.event_loop.compaction import build_llm_compaction_prompt
return build_llm_compaction_prompt(
ctx,
accumulator,
@@ -3379,143 +3124,6 @@ class EventLoopNode(NodeProtocol):
max_context_tokens=self._config.max_context_tokens,
)
@staticmethod
def _build_message_inventory(
conversation: NodeConversation,
) -> list[dict[str, Any]]:
"""Build a per-message size inventory for debug logging."""
inventory: list[dict[str, Any]] = []
for m in conversation.messages:
content_chars = len(m.content)
tc_chars = 0
tool_name = None
if m.tool_calls:
for tc in m.tool_calls:
args = tc.get("function", {}).get("arguments", "")
tc_chars += len(args) if isinstance(args, str) else len(json.dumps(args))
names = [tc.get("function", {}).get("name", "?") for tc in m.tool_calls]
tool_name = ", ".join(names)
elif m.role == "tool" and m.tool_use_id:
for prev in conversation.messages:
if prev.tool_calls:
for tc in prev.tool_calls:
if tc.get("id") == m.tool_use_id:
tool_name = tc.get("function", {}).get("name", "?")
break
if tool_name:
break
entry: dict[str, Any] = {
"seq": m.seq,
"role": m.role,
"content_chars": content_chars,
}
if tc_chars:
entry["tool_call_args_chars"] = tc_chars
if tool_name:
entry["tool"] = tool_name
if m.is_error:
entry["is_error"] = True
if m.phase_id:
entry["phase"] = m.phase_id
if content_chars > 2000:
entry["preview"] = m.content[:200] + ""
inventory.append(entry)
return inventory
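# Shape of one inventory entry produced above (field values are
# illustrative; the optional keys appear only when their branches fire):
#
#     {"seq": 42, "role": "tool", "content_chars": 31_000,
#      "tool": "search_web", "is_error": True,
#      "preview": "<first 200 chars>…"}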
async def _log_compaction(
self,
ctx: NodeContext,
conversation: NodeConversation,
ratio_before: float,
pre_inventory: list[dict[str, Any]] | None = None,
) -> None:
"""Log compaction result to runtime logger and event bus."""
from framework.graph.event_loop.compaction import log_compaction
return await log_compaction(
ctx,
conversation,
ratio_before,
self._event_bus,
pre_inventory=pre_inventory,
publish_context_usage_fn=self._publish_context_usage,
write_debug_log_fn=self._write_compaction_debug_log,
)
@staticmethod
def _write_compaction_debug_log(
ctx: NodeContext,
before_pct: int,
after_pct: int,
level: str,
inventory: list[dict[str, Any]] | None,
) -> None:
"""Write detailed compaction analysis to ~/.hive/compaction_log/."""
log_dir = Path.home() / ".hive" / "compaction_log"
log_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now(UTC).strftime("%Y%m%dT%H%M%S_%f")
node_label = ctx.node_id.replace("/", "_")
log_path = log_dir / f"{ts}_{node_label}.md"
lines: list[str] = [
f"# Compaction Debug — {ctx.node_id}",
f"**Time:** {datetime.now(UTC).isoformat()}",
f"**Node:** {ctx.node_spec.name} (`{ctx.node_id}`)",
]
if ctx.stream_id:
lines.append(f"**Stream:** {ctx.stream_id}")
lines.append(f"**Level:** {level}")
lines.append(f"**Usage:** {before_pct}% → {after_pct}%")
lines.append("")
if inventory:
total_chars = sum(
e.get("content_chars", 0) + e.get("tool_call_args_chars", 0) for e in inventory
)
lines.append(
f"## Pre-Compaction Message Inventory "
f"({len(inventory)} messages, {total_chars:,} total chars)"
)
lines.append("")
ranked = sorted(
inventory,
key=lambda e: e.get("content_chars", 0) + e.get("tool_call_args_chars", 0),
reverse=True,
)
lines.append("| # | seq | role | tool | chars | % of total | flags |")
lines.append("|---|-----|------|------|------:|------------|-------|")
for i, entry in enumerate(ranked, 1):
chars = entry.get("content_chars", 0) + entry.get("tool_call_args_chars", 0)
pct = (chars / total_chars * 100) if total_chars else 0
tool = entry.get("tool", "")
flags = []
if entry.get("is_error"):
flags.append("error")
if entry.get("phase"):
flags.append(f"phase={entry['phase']}")
lines.append(
f"| {i} | {entry['seq']} | {entry['role']} | {tool} "
f"| {chars:,} | {pct:.1f}% | {', '.join(flags)} |"
)
large = [e for e in ranked if e.get("preview")]
if large:
lines.append("")
lines.append("### Large message previews")
for entry in large:
lines.append(
f"\n**seq={entry['seq']}** ({entry['role']}, {entry.get('tool', '')}):"
)
lines.append(f"```\n{entry['preview']}\n```")
lines.append("")
try:
log_path.write_text("\n".join(lines), encoding="utf-8")
logger.debug("Compaction debug log written to %s", log_path)
except OSError:
logger.debug("Failed to write compaction debug log to %s", log_path)
def _build_emergency_summary(
self,
ctx: NodeContext,
@@ -3530,36 +3138,22 @@ class EventLoopNode(NodeProtocol):
node's known state so the LLM can continue working after
compaction without losing track of its task and inputs.
"""
from framework.graph.event_loop.compaction import build_emergency_summary
return build_emergency_summary(ctx, accumulator, conversation, self._config)
# -------------------------------------------------------------------
# Persistence: restore, cursor, injection, pause
# -------------------------------------------------------------------
@dataclass
class _RestoredState:
"""State recovered from a previous checkpoint."""
conversation: NodeConversation
accumulator: OutputAccumulator
start_iteration: int
recent_responses: list[str]
recent_tool_fingerprints: list[list[tuple[str, str]]]
async def _restore(
self,
ctx: NodeContext,
) -> _RestoredState | None:
) -> RestoredState | None:
"""Attempt to restore from a previous checkpoint.
Returns a ``_RestoredState`` with conversation, accumulator, iteration
Returns a ``RestoredState`` with conversation, accumulator, iteration
counter, and stall/doom-loop detection state: everything needed to
resume exactly where execution stopped.
"""
from framework.graph.event_loop.cursor_persistence import restore
return await restore(
conversation_store=self._conversation_store,
ctx=ctx,
@@ -3581,8 +3175,6 @@ class EventLoopNode(NodeProtocol):
Persists iteration counter, accumulator outputs, and stall/doom-loop
detection state so that resume picks up exactly where execution stopped.
"""
from framework.graph.event_loop.cursor_persistence import write_cursor
return await write_cursor(
conversation_store=self._conversation_store,
ctx=ctx,
@@ -3595,8 +3187,6 @@ class EventLoopNode(NodeProtocol):
async def _drain_injection_queue(self, conversation: NodeConversation, ctx: NodeContext) -> int:
"""Drain all pending injected events as user messages. Returns count."""
from framework.graph.event_loop.cursor_persistence import drain_injection_queue
return await drain_injection_queue(
queue=self._injection_queue,
conversation=conversation,
@@ -3610,8 +3200,6 @@ class EventLoopNode(NodeProtocol):
Multiple triggers are merged so the LLM sees them atomically and can
reason about all pending triggers before acting.
"""
from framework.graph.event_loop.cursor_persistence import drain_trigger_queue
return await drain_trigger_queue(
queue=self._trigger_queue,
conversation=conversation,
@@ -3629,8 +3217,6 @@ class EventLoopNode(NodeProtocol):
Note: This check happens BEFORE starting iteration N, after completing N-1.
If paused, the node exits having completed {iteration} iterations (0 to iteration-1).
"""
from framework.graph.event_loop.cursor_persistence import check_pause
return await check_pause(
ctx=ctx,
conversation=conversation,
@@ -3644,8 +3230,6 @@ class EventLoopNode(NodeProtocol):
async def _publish_loop_started(
self, stream_id: str, node_id: str, execution_id: str = ""
) -> None:
from framework.graph.event_loop.event_publishing import publish_loop_started
return await publish_loop_started(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3665,8 +3249,6 @@ class EventLoopNode(NodeProtocol):
Runs as a fire-and-forget task so it never blocks the main loop.
"""
from framework.graph.event_loop.event_publishing import generate_action_plan
return await generate_action_plan(
event_bus=self._event_bus,
ctx=ctx,
@@ -3689,8 +3271,6 @@ class EventLoopNode(NodeProtocol):
Hooks run in registration order; each sees the prompt as left by the
previous hook.
"""
from framework.graph.event_loop.event_publishing import run_hooks
return await run_hooks(
hooks_config=self._config.hooks,
event=event,
@@ -3705,27 +3285,11 @@ class EventLoopNode(NodeProtocol):
trigger: str,
) -> None:
"""Emit a CONTEXT_USAGE_UPDATED event with current context window state."""
if not self._event_bus:
return
from framework.runtime.event_bus import AgentEvent, EventType
estimated = conversation.estimate_tokens()
max_tokens = conversation._max_context_tokens
ratio = estimated / max_tokens if max_tokens > 0 else 0.0
await self._event_bus.publish(
AgentEvent(
type=EventType.CONTEXT_USAGE_UPDATED,
stream_id=ctx.stream_id or ctx.node_id,
node_id=ctx.node_id,
data={
"usage_ratio": round(ratio, 4),
"usage_pct": round(ratio * 100),
"message_count": conversation.message_count,
"estimated_tokens": estimated,
"max_context_tokens": max_tokens,
"trigger": trigger,
},
)
return await publish_context_usage(
event_bus=self._event_bus,
ctx=ctx,
conversation=conversation,
trigger=trigger,
)
async def _publish_iteration(
@@ -3736,8 +3300,6 @@ class EventLoopNode(NodeProtocol):
execution_id: str = "",
extra_data: dict | None = None,
) -> None:
from framework.graph.event_loop.event_publishing import publish_iteration
return await publish_iteration(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3759,8 +3321,6 @@ class EventLoopNode(NodeProtocol):
execution_id: str = "",
iteration: int | None = None,
) -> None:
from framework.graph.event_loop.event_publishing import publish_llm_turn_complete
return await publish_llm_turn_complete(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3786,8 +3346,6 @@ class EventLoopNode(NodeProtocol):
iter_start: float,
) -> None:
"""Log a CONTINUE step that skips judge evaluation (e.g., waiting for input)."""
from framework.graph.event_loop.event_publishing import log_skip_judge
return log_skip_judge(
ctx=ctx,
node_id=node_id,
@@ -3802,8 +3360,6 @@ class EventLoopNode(NodeProtocol):
async def _publish_loop_completed(
self, stream_id: str, node_id: str, iterations: int, execution_id: str = ""
) -> None:
from framework.graph.event_loop.event_publishing import publish_loop_completed
return await publish_loop_completed(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3813,8 +3369,6 @@ class EventLoopNode(NodeProtocol):
)
async def _publish_stalled(self, stream_id: str, node_id: str, execution_id: str = "") -> None:
from framework.graph.event_loop.event_publishing import publish_stalled
return await publish_stalled(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3833,8 +3387,6 @@ class EventLoopNode(NodeProtocol):
iteration: int | None = None,
inner_turn: int = 0,
) -> None:
from framework.graph.event_loop.event_publishing import publish_text_delta
return await publish_text_delta(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3856,8 +3408,6 @@ class EventLoopNode(NodeProtocol):
tool_input: dict,
execution_id: str = "",
) -> None:
from framework.graph.event_loop.event_publishing import publish_tool_started
return await publish_tool_started(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3878,8 +3428,6 @@ class EventLoopNode(NodeProtocol):
is_error: bool,
execution_id: str = "",
) -> None:
from framework.graph.event_loop.event_publishing import publish_tool_completed
return await publish_tool_completed(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3901,8 +3449,6 @@ class EventLoopNode(NodeProtocol):
iteration: int = 0,
execution_id: str = "",
) -> None:
from framework.graph.event_loop.event_publishing import publish_judge_verdict
return await publish_judge_verdict(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3921,8 +3467,6 @@ class EventLoopNode(NodeProtocol):
key: str,
execution_id: str = "",
) -> None:
from framework.graph.event_loop.event_publishing import publish_output_key_set
return await publish_output_key_set(
event_bus=self._event_bus,
stream_id=stream_id,
@@ -3965,8 +3509,6 @@ class EventLoopNode(NodeProtocol):
- data: Subagent's output (free-form JSON)
- metadata: Execution metadata (success, tokens, latency)
"""
from framework.graph.event_loop.subagent_executor import execute_subagent
return await execute_subagent(
ctx=ctx,
agent_id=agent_id,
@@ -3977,4 +3519,6 @@ class EventLoopNode(NodeProtocol):
tool_executor=self._tool_executor,
conversation_store=self._conversation_store,
subagent_instance_counter=self._subagent_instance_counter,
event_loop_node_cls=type(self),
escalation_receiver_cls=_EscalationReceiver,
)