refactor(graph): modularize event loop helpers

Extract EventLoopNode helper logic into focused event_loop modules while keeping the node responsible for orchestration.

Preserve the existing behavior and compatibility for compaction, event publishing, cursor persistence, synthetic tools, judge evaluation, stall detection, tool result handling, and subagent escalation wiring.
This commit is contained in:
Sundaram Kumar Jha
2026-03-19 09:07:19 +05:30
parent 47bfadaad9
commit 74c0a85e3f
10 changed files with 2865 additions and 1874 deletions
@@ -0,0 +1,6 @@
"""EventLoopNode subpackage — modular components of the event loop orchestrator.
All public symbols are re-exported by the parent ``event_loop_node.py`` for
backward compatibility. Internal consumers may import directly from these
submodules for clarity.
"""
@@ -0,0 +1,480 @@
"""Conversation compaction pipeline.
Implements the multi-level compaction strategy:
1. Prune old tool results
2. Structure-preserving compaction (spillover)
3. LLM summary compaction (with recursive splitting)
4. Emergency deterministic summary (no LLM)
"""
from __future__ import annotations
import logging
import re
from typing import Any
from framework.graph.conversation import NodeConversation
from framework.graph.node import NodeContext
logger = logging.getLogger(__name__)
# Limits for LLM compaction
LLM_COMPACT_CHAR_LIMIT: int = 240_000
LLM_COMPACT_MAX_DEPTH: int = 10
async def compact(
ctx: NodeContext,
conversation: NodeConversation,
accumulator: Any | None, # OutputAccumulator
*,
config: Any, # LoopConfig
event_bus: Any | None, # EventBus
char_limit: int = LLM_COMPACT_CHAR_LIMIT,
max_depth: int = LLM_COMPACT_MAX_DEPTH,
) -> None:
"""Run the full compaction pipeline if conversation needs compaction.
Pipeline stages (in order, short-circuits when budget is restored):
1. Prune old tool results
2. Structure-preserving compaction (free, no LLM)
3. LLM summary compaction (recursive split if too large)
4. Emergency deterministic summary (fallback)
"""
if not conversation.needs_compaction():
return
ratio_before = conversation.usage_ratio()
phase_grad = getattr(ctx, "phase_graduated", False)
# --- Step 1: Prune old tool results (free, fast) ---
conversation.prune_old_tool_results(keep_recent=4)
if not conversation.needs_compaction():
await log_compaction(ctx, conversation, ratio_before, event_bus)
return
# --- Step 2: Standard structure-preserving compaction (free, no LLM) ---
spill_dir = config.spillover_dir
if spill_dir:
await conversation.compact_preserving_structure(
spillover_dir=spill_dir,
keep_recent=4,
phase_graduated=phase_grad,
)
if not conversation.needs_compaction():
await log_compaction(ctx, conversation, ratio_before, event_bus)
return
# --- Step 3: LLM summary compaction ---
if ctx.llm is not None:
logger.info(
"LLM summary compaction triggered (%.0f%% usage)",
conversation.usage_ratio() * 100,
)
try:
summary = await llm_compact(
ctx,
list(conversation.messages),
accumulator,
char_limit=char_limit,
max_depth=max_depth,
max_context_tokens=config.max_context_tokens,
)
await conversation.compact(
summary,
keep_recent=2,
phase_graduated=phase_grad,
)
except Exception as e:
logger.warning("LLM compaction failed: %s", e)
if not conversation.needs_compaction():
await log_compaction(ctx, conversation, ratio_before, event_bus)
return
# --- Step 4: Emergency deterministic summary (LLM failed/unavailable) ---
logger.warning(
"Emergency compaction (%.0f%% usage)",
conversation.usage_ratio() * 100,
)
summary = build_emergency_summary(ctx, accumulator, conversation, config)
await conversation.compact(
summary,
keep_recent=1,
phase_graduated=phase_grad,
)
await log_compaction(ctx, conversation, ratio_before, event_bus)
# --- LLM compaction with binary-search splitting ----------------------
async def llm_compact(
ctx: NodeContext,
messages: list,
accumulator: Any | None = None,
_depth: int = 0,
*,
char_limit: int = LLM_COMPACT_CHAR_LIMIT,
max_depth: int = LLM_COMPACT_MAX_DEPTH,
max_context_tokens: int = 128_000,
) -> str:
"""Summarise *messages* with LLM, splitting recursively if too large.
If the formatted text exceeds ``LLM_COMPACT_CHAR_LIMIT`` or the LLM
rejects the call with a context-length error, the messages are split
in half and each half is summarised independently. Tool history is
appended once at the top-level call (``_depth == 0``).
"""
from framework.graph.conversation import extract_tool_call_history
from framework.graph.event_loop.tool_result_handler import is_context_too_large_error
if _depth > max_depth:
raise RuntimeError(f"LLM compaction recursion limit ({max_depth})")
formatted = format_messages_for_summary(messages)
# Proactive split: avoid wasting an API call on oversized input
if len(formatted) > char_limit and len(messages) > 1:
summary = await _llm_compact_split(
ctx,
messages,
accumulator,
_depth,
char_limit=char_limit,
max_depth=max_depth,
max_context_tokens=max_context_tokens,
)
else:
prompt = build_llm_compaction_prompt(
ctx,
accumulator,
formatted,
max_context_tokens=max_context_tokens,
)
summary_budget = max(1024, max_context_tokens // 2)
try:
response = await ctx.llm.acomplete(
messages=[{"role": "user", "content": prompt}],
system=(
"You are a conversation compactor for an AI agent. "
"Write a detailed summary that allows the agent to "
"continue its work. Preserve user-stated rules, "
"constraints, and account/identity preferences verbatim."
),
max_tokens=summary_budget,
)
summary = response.content
except Exception as e:
if is_context_too_large_error(e) and len(messages) > 1:
logger.info(
"LLM context too large (depth=%d, msgs=%d) — splitting",
_depth,
len(messages),
)
summary = await _llm_compact_split(
ctx,
messages,
accumulator,
_depth,
char_limit=char_limit,
max_depth=max_depth,
max_context_tokens=max_context_tokens,
)
else:
raise
# Append tool history at top level only
if _depth == 0:
tool_history = extract_tool_call_history(messages)
if tool_history and "TOOLS ALREADY CALLED" not in summary:
summary += "\n\n" + tool_history
return summary
async def _llm_compact_split(
ctx: NodeContext,
messages: list,
accumulator: Any | None,
_depth: int,
*,
char_limit: int = LLM_COMPACT_CHAR_LIMIT,
max_depth: int = LLM_COMPACT_MAX_DEPTH,
max_context_tokens: int = 128_000,
) -> str:
"""Split messages in half and summarise each half independently."""
mid = max(1, len(messages) // 2)
s1 = await llm_compact(
ctx,
messages[:mid],
None,
_depth + 1,
char_limit=char_limit,
max_depth=max_depth,
max_context_tokens=max_context_tokens,
)
s2 = await llm_compact(
ctx,
messages[mid:],
accumulator,
_depth + 1,
char_limit=char_limit,
max_depth=max_depth,
max_context_tokens=max_context_tokens,
)
return s1 + "\n\n" + s2
# --- Compaction helpers ------------------------------------------------
def format_messages_for_summary(messages: list) -> str:
"""Format messages as text for LLM summarisation."""
lines: list[str] = []
for m in messages:
if m.role == "tool":
content = m.content[:500]
if len(m.content) > 500:
content += "..."
lines.append(f"[tool result]: {content}")
elif m.role == "assistant" and m.tool_calls:
names = [tc.get("function", {}).get("name", "?") for tc in m.tool_calls]
text = m.content[:200] if m.content else ""
lines.append(f"[assistant (calls: {', '.join(names)})]: {text}")
else:
lines.append(f"[{m.role}]: {m.content}")
return "\n\n".join(lines)
def build_llm_compaction_prompt(
ctx: NodeContext,
accumulator: Any | None, # OutputAccumulator
formatted_messages: str,
*,
max_context_tokens: int = 128_000,
) -> str:
"""Build prompt for LLM compaction targeting 50% of token budget."""
spec = ctx.node_spec
ctx_lines = [f"NODE: {spec.name} (id={spec.id})"]
if spec.description:
ctx_lines.append(f"PURPOSE: {spec.description}")
if spec.success_criteria:
ctx_lines.append(f"SUCCESS CRITERIA: {spec.success_criteria}")
if accumulator:
acc = accumulator.to_dict()
done = {k: v for k, v in acc.items() if v is not None}
todo = [k for k, v in acc.items() if v is None]
if done:
ctx_lines.append(
"OUTPUTS ALREADY SET:\n"
+ "\n".join(f" {k}: {str(v)[:150]}" for k, v in done.items())
)
if todo:
ctx_lines.append(f"OUTPUTS STILL NEEDED: {', '.join(todo)}")
elif spec.output_keys:
ctx_lines.append(f"OUTPUTS STILL NEEDED: {', '.join(spec.output_keys)}")
target_tokens = max_context_tokens // 2
target_chars = target_tokens * 4
node_ctx = "\n".join(ctx_lines)
return (
"You are compacting an AI agent's conversation history. "
"The agent is still working and needs to continue.\n\n"
f"AGENT CONTEXT:\n{node_ctx}\n\n"
f"CONVERSATION MESSAGES:\n{formatted_messages}\n\n"
"INSTRUCTIONS:\n"
f"Write a summary of approximately {target_chars} characters "
f"(~{target_tokens} tokens).\n"
"1. Preserve ALL user-stated rules, constraints, and preferences "
"verbatim.\n"
"2. Preserve key decisions made and results obtained.\n"
"3. Preserve in-progress work state so the agent can continue.\n"
"4. Be detailed enough that the agent can resume without "
"re-doing work.\n"
)
async def log_compaction(
ctx: NodeContext,
conversation: NodeConversation,
ratio_before: float,
event_bus: Any | None,
) -> None:
"""Log compaction result to runtime logger and event bus."""
ratio_after = conversation.usage_ratio()
before_pct = round(ratio_before * 100)
after_pct = round(ratio_after * 100)
# Determine label from what happened
if after_pct >= before_pct - 1:
level = "prune_only"
elif ratio_after <= 0.6:
level = "llm"
else:
level = "structural"
logger.info(
"Compaction complete (%s): %d%% -> %d%%",
level,
before_pct,
after_pct,
)
if ctx.runtime_logger:
ctx.runtime_logger.log_step(
node_id=ctx.node_id,
node_type="event_loop",
step_index=-1,
llm_text=f"Context compacted ({level}): {before_pct}% \u2192 {after_pct}%",
verdict="COMPACTION",
verdict_feedback=f"level={level} before={before_pct}% after={after_pct}%",
)
if event_bus:
from framework.runtime.event_bus import AgentEvent, EventType
await event_bus.publish(
AgentEvent(
type=EventType.CONTEXT_COMPACTED,
stream_id=ctx.stream_id or ctx.node_id,
node_id=ctx.node_id,
data={
"level": level,
"usage_before": before_pct,
"usage_after": after_pct,
},
)
)
def build_emergency_summary(
ctx: NodeContext,
accumulator: Any | None = None, # OutputAccumulator
conversation: NodeConversation | None = None,
config: Any | None = None, # LoopConfig
) -> str:
"""Build a structured emergency compaction summary.
Unlike normal/aggressive compaction which uses an LLM summary,
emergency compaction cannot afford an LLM call (context is already
way over budget). Instead, build a deterministic summary from the
node's known state so the LLM can continue working after
compaction without losing track of its task and inputs.
"""
parts = [
"EMERGENCY COMPACTION — previous conversation was too large "
"and has been replaced with this summary.\n"
]
# 1. Node identity
spec = ctx.node_spec
parts.append(f"NODE: {spec.name} (id={spec.id})")
if spec.description:
parts.append(f"PURPOSE: {spec.description}")
# 2. Inputs the node received
input_lines = []
for key in spec.input_keys:
value = ctx.input_data.get(key) or ctx.memory.read(key)
if value is not None:
# Truncate long values but keep them recognisable
v_str = str(value)
if len(v_str) > 200:
v_str = v_str[:200] + ""
input_lines.append(f" {key}: {v_str}")
if input_lines:
parts.append("INPUTS:\n" + "\n".join(input_lines))
# 3. Output accumulator state (what's been set so far)
if accumulator:
acc_state = accumulator.to_dict()
set_keys = {k: v for k, v in acc_state.items() if v is not None}
missing = [k for k, v in acc_state.items() if v is None]
if set_keys:
lines = [f" {k}: {str(v)[:150]}" for k, v in set_keys.items()]
parts.append("OUTPUTS ALREADY SET:\n" + "\n".join(lines))
if missing:
parts.append(f"OUTPUTS STILL NEEDED: {', '.join(missing)}")
elif spec.output_keys:
parts.append(f"OUTPUTS STILL NEEDED: {', '.join(spec.output_keys)}")
# 4. Available tools reminder
if spec.tools:
parts.append(f"AVAILABLE TOOLS: {', '.join(spec.tools)}")
# 5. Spillover files — list actual files so the LLM can load
# them immediately instead of having to call list_data_files first.
# Inline adapt.md (agent memory) directly — it contains user rules
# and identity preferences that must survive emergency compaction.
spillover_dir = config.spillover_dir if config else None
if spillover_dir:
try:
from pathlib import Path
data_dir = Path(spillover_dir)
if data_dir.is_dir():
# Inline adapt.md content directly
adapt_path = data_dir / "adapt.md"
if adapt_path.is_file():
adapt_text = adapt_path.read_text(encoding="utf-8").strip()
if adapt_text:
parts.append(f"AGENT MEMORY (adapt.md):\n{adapt_text}")
all_files = sorted(
f.name for f in data_dir.iterdir() if f.is_file() and f.name != "adapt.md"
)
# Separate conversation history files from regular data files
conv_files = [f for f in all_files if re.match(r"conversation_\d+\.md$", f)]
data_files = [f for f in all_files if f not in conv_files]
if conv_files:
conv_list = "\n".join(
f" - {f} (full path: {data_dir / f})" for f in conv_files
)
parts.append(
"CONVERSATION HISTORY (freeform messages saved during compaction — "
"use load_data('<filename>') to review earlier dialogue):\n" + conv_list
)
if data_files:
file_list = "\n".join(
f" - {f} (full path: {data_dir / f})" for f in data_files[:30]
)
parts.append(
"DATA FILES (use load_data('<filename>') to read):\n" + file_list
)
if not all_files:
parts.append(
"NOTE: Large tool results may have been saved to files. "
"Use list_directory to check the data directory."
)
except Exception:
parts.append(
"NOTE: Large tool results were saved to files. "
"Use read_file(path='<path>') to read them."
)
# 6. Tool call history (prevent re-calling tools)
if conversation is not None:
tool_history = _extract_tool_call_history(conversation)
if tool_history:
parts.append(tool_history)
parts.append(
"\nContinue working towards setting the remaining outputs. "
"Use your tools and the inputs above."
)
return "\n\n".join(parts)
def _extract_tool_call_history(conversation: NodeConversation) -> str:
"""Extract tool call history from conversation messages.
This is the instance-level variant that operates on a NodeConversation
directly (vs. the module-level extract_tool_call_history in conversation.py
which works on raw message lists).
"""
from framework.graph.conversation import extract_tool_call_history
return extract_tool_call_history(list(conversation.messages))
@@ -0,0 +1,219 @@
"""Cursor persistence, queue draining, and pause detection.
Handles the checkpoint/resume cycle: restoring state from a previous
conversation store, writing cursor data, and managing injection/trigger
queues between iterations.
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Any
from framework.graph.conversation import NodeConversation
from framework.graph.node import NodeContext
logger = logging.getLogger(__name__)
@dataclass
class RestoredState:
"""State recovered from a previous checkpoint."""
conversation: NodeConversation
accumulator: Any # OutputAccumulator
start_iteration: int
recent_responses: list[str]
recent_tool_fingerprints: list[list[tuple[str, str]]]
async def restore(
conversation_store: Any | None, # ConversationStore
ctx: NodeContext,
config: Any, # LoopConfig
) -> RestoredState | None:
"""Attempt to restore from a previous checkpoint.
Returns a ``RestoredState`` with conversation, accumulator, iteration
counter, and stall/doom-loop detection state everything needed to
resume exactly where execution stopped.
"""
if conversation_store is None:
return None
# Import here to avoid circular imports at module level
from framework.graph.event_loop_node import OutputAccumulator
# In isolated mode, filter parts by phase_id so the node only sees
# its own messages in the shared flat conversation store. In
# continuous mode (or when _restore is called for timer-resume)
# load all parts — the full conversation threads across nodes.
_is_continuous = getattr(ctx, "continuous_mode", False)
phase_filter = None if _is_continuous else ctx.node_id
conversation = await NodeConversation.restore(
conversation_store,
phase_id=phase_filter,
)
if conversation is None:
return None
accumulator = await OutputAccumulator.restore(conversation_store)
accumulator.spillover_dir = config.spillover_dir
accumulator.max_value_chars = config.max_output_value_chars
cursor = await conversation_store.read_cursor()
start_iteration = cursor.get("iteration", 0) + 1 if cursor else 0
# Restore stall/doom-loop detection state
recent_responses: list[str] = cursor.get("recent_responses", []) if cursor else []
raw_fps = cursor.get("recent_tool_fingerprints", []) if cursor else []
recent_tool_fingerprints: list[list[tuple[str, str]]] = [
[tuple(pair) for pair in fps] # type: ignore[misc]
for fps in raw_fps
]
logger.info(
f"Restored event loop: iteration={start_iteration}, "
f"messages={conversation.message_count}, "
f"outputs={list(accumulator.values.keys())}, "
f"stall_window={len(recent_responses)}, "
f"doom_window={len(recent_tool_fingerprints)}"
)
return RestoredState(
conversation=conversation,
accumulator=accumulator,
start_iteration=start_iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
)
async def write_cursor(
conversation_store: Any | None, # ConversationStore
ctx: NodeContext,
conversation: NodeConversation,
accumulator: Any, # OutputAccumulator
iteration: int,
*,
recent_responses: list[str] | None = None,
recent_tool_fingerprints: list[list[tuple[str, str]]] | None = None,
) -> None:
"""Write checkpoint cursor for crash recovery.
Persists iteration counter, accumulator outputs, and stall/doom-loop
detection state so that resume picks up exactly where execution stopped.
"""
if conversation_store:
cursor = await conversation_store.read_cursor() or {}
cursor.update(
{
"iteration": iteration,
"node_id": ctx.node_id,
"next_seq": conversation.next_seq,
"outputs": accumulator.to_dict(),
}
)
# Persist stall/doom-loop detection state for reliable resume
if recent_responses is not None:
cursor["recent_responses"] = recent_responses
if recent_tool_fingerprints is not None:
# Convert list[list[tuple]] → list[list[list]] for JSON
cursor["recent_tool_fingerprints"] = [
[list(pair) for pair in fps] for fps in recent_tool_fingerprints
]
await conversation_store.write_cursor(cursor)
async def drain_injection_queue(
queue: asyncio.Queue,
conversation: NodeConversation,
) -> int:
"""Drain all pending injected events as user messages. Returns count."""
count = 0
while not queue.empty():
try:
content, is_client_input = queue.get_nowait()
logger.info(
"[drain] injected message (client_input=%s): %s",
is_client_input,
content[:200] if content else "(empty)",
)
# Real user input is stored as-is; external events get a prefix
if is_client_input:
await conversation.add_user_message(content, is_client_input=True)
else:
await conversation.add_user_message(f"[External event]: {content}")
count += 1
except asyncio.QueueEmpty:
break
return count
async def drain_trigger_queue(
queue: asyncio.Queue,
conversation: NodeConversation,
) -> int:
"""Drain all pending trigger events as a single batched user message.
Multiple triggers are merged so the LLM sees them atomically and can
reason about all pending triggers before acting.
"""
# Import here to avoid circular imports at module level
from framework.graph.event_loop_node import TriggerEvent
triggers: list[TriggerEvent] = []
while not queue.empty():
try:
triggers.append(queue.get_nowait())
except asyncio.QueueEmpty:
break
if not triggers:
return 0
parts: list[str] = []
for t in triggers:
task = t.payload.get("task", "")
task_line = f"\nTask: {task}" if task else ""
payload_str = json.dumps(t.payload, default=str)
parts.append(f"[TRIGGER: {t.trigger_type}/{t.source_id}]{task_line}\n{payload_str}")
combined = "\n\n".join(parts)
logger.info("[drain] %d trigger(s): %s", len(triggers), combined[:200])
await conversation.add_user_message(combined)
return len(triggers)
async def check_pause(
ctx: NodeContext,
conversation: NodeConversation,
iteration: int,
) -> bool:
"""
Check if pause has been requested. Returns True if paused.
Note: This check happens BEFORE starting iteration N, after completing N-1.
If paused, the node exits having completed {iteration} iterations (0 to iteration-1).
"""
# Check executor-level pause event (for /pause command, Ctrl+Z)
if ctx.pause_event and ctx.pause_event.is_set():
completed = iteration # 0-indexed: iteration=3 means 3 iterations completed (0,1,2)
logger.info(f"⏸ Pausing after {completed} iteration(s) completed (executor-level)")
return True
# Check context-level pause flags (legacy/alternative methods)
pause_requested = ctx.input_data.get("pause_requested", False)
if not pause_requested:
try:
pause_requested = ctx.memory.read("pause_requested") or False
except (PermissionError, KeyError):
pause_requested = False
if pause_requested:
completed = iteration
logger.info(f"⏸ Pausing after {completed} iteration(s) completed (context-level)")
return True
return False
@@ -0,0 +1,332 @@
"""EventBus publishing helpers for the event loop.
Thin wrappers around EventBus.emit_*() calls that check for bus existence
before publishing. Extracted to reduce noise in the main orchestrator.
"""
from __future__ import annotations
import logging
import time
from typing import Any
from framework.graph.node import NodeContext
from framework.runtime.event_bus import EventBus
logger = logging.getLogger(__name__)
async def publish_loop_started(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
max_iterations: int,
execution_id: str = "",
) -> None:
if event_bus:
await event_bus.emit_node_loop_started(
stream_id=stream_id,
node_id=node_id,
max_iterations=max_iterations,
execution_id=execution_id,
)
async def generate_action_plan(
event_bus: EventBus | None,
ctx: NodeContext,
stream_id: str,
node_id: str,
execution_id: str,
) -> None:
"""Generate a brief action plan via LLM and emit it as an SSE event.
Runs as a fire-and-forget task so it never blocks the main loop.
"""
try:
system_prompt = ctx.node_spec.system_prompt or ""
# Trim to keep the prompt small
prompt_summary = system_prompt[:500]
if len(system_prompt) > 500:
prompt_summary += "..."
tool_names = [t.name for t in ctx.available_tools]
output_keys = ctx.node_spec.output_keys or []
prompt = (
f'You are about to work on a task as node "{node_id}".\n\n'
f"System prompt:\n{prompt_summary}\n\n"
f"Tools available: {tool_names}\n"
f"Required outputs: {output_keys}\n\n"
f"Write a brief action plan (2-5 bullet points) describing "
f"what you will do to complete this task. Be specific and concise.\n"
f"Return ONLY the plan text, no preamble."
)
response = await ctx.llm.acomplete(
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
)
plan = response.content.strip()
if plan and event_bus:
await event_bus.emit_node_action_plan(
stream_id=stream_id,
node_id=node_id,
plan=plan,
execution_id=execution_id,
)
except Exception as e:
logger.warning("Action plan generation failed for node '%s': %s", node_id, e)
async def publish_iteration(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
iteration: int,
execution_id: str = "",
extra_data: dict | None = None,
) -> None:
if event_bus:
await event_bus.emit_node_loop_iteration(
stream_id=stream_id,
node_id=node_id,
iteration=iteration,
execution_id=execution_id,
extra_data=extra_data,
)
async def publish_llm_turn_complete(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
stop_reason: str,
model: str,
input_tokens: int,
output_tokens: int,
cached_tokens: int = 0,
execution_id: str = "",
iteration: int | None = None,
) -> None:
if event_bus:
await event_bus.emit_llm_turn_complete(
stream_id=stream_id,
node_id=node_id,
stop_reason=stop_reason,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cached_tokens=cached_tokens,
execution_id=execution_id,
iteration=iteration,
)
def log_skip_judge(
ctx: NodeContext,
node_id: str,
iteration: int,
feedback: str,
tool_calls: list[dict],
llm_text: str,
turn_tokens: dict[str, int],
iter_start: float,
) -> None:
"""Log a CONTINUE step that skips judge evaluation (e.g., waiting for input)."""
if ctx.runtime_logger:
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback=feedback,
tool_calls=tool_calls,
llm_text=llm_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=int((time.time() - iter_start) * 1000),
)
async def publish_loop_completed(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
iterations: int,
execution_id: str = "",
) -> None:
if event_bus:
await event_bus.emit_node_loop_completed(
stream_id=stream_id,
node_id=node_id,
iterations=iterations,
execution_id=execution_id,
)
async def publish_stalled(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
execution_id: str = "",
) -> None:
if event_bus:
await event_bus.emit_node_stalled(
stream_id=stream_id,
node_id=node_id,
reason="Consecutive similar responses detected",
execution_id=execution_id,
)
async def publish_text_delta(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
content: str,
snapshot: str,
ctx: NodeContext,
execution_id: str = "",
iteration: int | None = None,
inner_turn: int = 0,
) -> None:
if event_bus:
if ctx.node_spec.client_facing:
await event_bus.emit_client_output_delta(
stream_id=stream_id,
node_id=node_id,
content=content,
snapshot=snapshot,
execution_id=execution_id,
iteration=iteration,
inner_turn=inner_turn,
)
else:
await event_bus.emit_llm_text_delta(
stream_id=stream_id,
node_id=node_id,
content=content,
snapshot=snapshot,
execution_id=execution_id,
inner_turn=inner_turn,
)
async def publish_tool_started(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
tool_use_id: str,
tool_name: str,
tool_input: dict,
execution_id: str = "",
) -> None:
if event_bus:
await event_bus.emit_tool_call_started(
stream_id=stream_id,
node_id=node_id,
tool_use_id=tool_use_id,
tool_name=tool_name,
tool_input=tool_input,
execution_id=execution_id,
)
async def publish_tool_completed(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
tool_use_id: str,
tool_name: str,
result: str,
is_error: bool,
execution_id: str = "",
) -> None:
if event_bus:
await event_bus.emit_tool_call_completed(
stream_id=stream_id,
node_id=node_id,
tool_use_id=tool_use_id,
tool_name=tool_name,
result=result,
is_error=is_error,
execution_id=execution_id,
)
async def publish_judge_verdict(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
action: str,
feedback: str = "",
judge_type: str = "implicit",
iteration: int = 0,
execution_id: str = "",
) -> None:
if event_bus:
await event_bus.emit_judge_verdict(
stream_id=stream_id,
node_id=node_id,
action=action,
feedback=feedback,
judge_type=judge_type,
iteration=iteration,
execution_id=execution_id,
)
async def publish_output_key_set(
event_bus: EventBus | None,
stream_id: str,
node_id: str,
key: str,
execution_id: str = "",
) -> None:
if event_bus:
await event_bus.emit_output_key_set(
stream_id=stream_id, node_id=node_id, key=key, execution_id=execution_id
)
async def run_hooks(
hooks_config: dict[str, list],
event: str,
conversation: Any, # NodeConversation
trigger: str | None = None,
) -> None:
"""Run all registered hooks for *event*, applying their results.
Each hook receives a HookContext and may return a HookResult that:
- replaces the system prompt (result.system_prompt)
- injects an extra user message (result.inject)
Hooks run in registration order; each sees the prompt as left by the
previous hook.
"""
# Import here to avoid circular deps at module level
from framework.graph.event_loop_node import HookContext
hook_list = hooks_config.get(event, [])
if not hook_list:
return
for hook in hook_list:
ctx = HookContext(
event=event,
trigger=trigger,
system_prompt=conversation.system_prompt,
)
try:
result = await hook(ctx)
except Exception:
logger.warning(
"Hook '%s' raised an exception", event, exc_info=True
)
continue
if result is None:
continue
if result.system_prompt:
conversation.update_system_prompt(result.system_prompt)
if result.inject:
await conversation.add_user_message(result.inject)
@@ -0,0 +1,144 @@
"""Judge evaluation pipeline for the event loop.
Contains the turn evaluation logic (implicit judge + custom judge dispatch)
and the SubagentJudge implementation.
"""
from __future__ import annotations
import logging
from typing import Any
from framework.graph.conversation import NodeConversation
from framework.graph.node import NodeContext
logger = logging.getLogger(__name__)
async def judge_turn(
*,
mark_complete_flag: bool,
judge: Any | None, # JudgeProtocol
ctx: NodeContext,
conversation: NodeConversation,
accumulator: Any, # OutputAccumulator
assistant_text: str,
tool_results: list[dict],
iteration: int,
get_missing_output_keys_fn: Any, # Callable
max_context_tokens: int,
) -> Any: # JudgeVerdict
"""Evaluate the current state using judge or implicit logic.
Evaluation levels (in order):
0. Short-circuits: mark_complete, skip_judge, tool-continue.
1. Custom judge (JudgeProtocol) full authority when set.
2. Implicit judge output-key check + optional conversation-aware
quality gate (when ``success_criteria`` is defined).
Returns a JudgeVerdict. ``feedback=None`` means no real evaluation
happened (skip_judge, tool-continue); the caller must not inject a
feedback message. Any non-None feedback (including ``""``) means a
real evaluation occurred and will be logged into the conversation.
"""
from framework.graph.event_loop_node import JudgeVerdict
# --- Level 0: short-circuits (no evaluation) -----------------------
if mark_complete_flag:
return JudgeVerdict(action="ACCEPT")
if ctx.node_spec.skip_judge:
return JudgeVerdict(action="RETRY") # feedback=None → not logged
# --- Level 1: custom judge -----------------------------------------
if judge is not None:
context = {
"assistant_text": assistant_text,
"tool_calls": tool_results,
"output_accumulator": accumulator.to_dict(),
"accumulator": accumulator,
"iteration": iteration,
"conversation_summary": conversation.export_summary(),
"output_keys": ctx.node_spec.output_keys,
"missing_keys": get_missing_output_keys_fn(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
),
}
verdict = await judge.evaluate(context)
# Ensure evaluated RETRY always carries feedback for logging.
if verdict.action == "RETRY" and not verdict.feedback:
return JudgeVerdict(action="RETRY", feedback="Custom judge returned RETRY.")
return verdict
# --- Level 2: implicit judge ---------------------------------------
# Real tool calls were made — let the agent keep working.
if tool_results:
return JudgeVerdict(action="RETRY") # feedback=None → not logged
missing = get_missing_output_keys_fn(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if missing:
return JudgeVerdict(
action="RETRY",
feedback=(
f"Task incomplete. Required outputs not yet produced: {missing}. "
f"Follow your system prompt instructions to complete the work."
),
)
# All output keys present — run safety checks before accepting.
output_keys = ctx.node_spec.output_keys or []
nullable_keys = set(ctx.node_spec.nullable_output_keys or [])
# All-nullable with nothing set → node produced nothing useful.
all_nullable = output_keys and nullable_keys >= set(output_keys)
none_set = not any(accumulator.get(k) is not None for k in output_keys)
if all_nullable and none_set:
return JudgeVerdict(
action="RETRY",
feedback=(
f"No output keys have been set yet. "
f"Use set_output to set at least one of: {output_keys}"
),
)
# Client-facing with no output keys → continuous interaction node.
# Inject tool-use pressure instead of auto-accepting.
if not output_keys and ctx.node_spec.client_facing:
return JudgeVerdict(
action="RETRY",
feedback=(
"STOP describing what you will do. "
"You have FULL access to all tools — file creation, "
"shell commands, MCP tools — and you CAN call them "
"directly in your response. Respond ONLY with tool "
"calls, no prose. Execute the task now."
),
)
# Level 2b: conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
from framework.graph.conversation_judge import evaluate_phase_completion
verdict = await evaluate_phase_completion(
llm=ctx.llm,
conversation=conversation,
phase_name=ctx.node_spec.name,
phase_description=ctx.node_spec.description,
success_criteria=ctx.node_spec.success_criteria,
accumulator_state=accumulator.to_dict(),
max_context_tokens=max_context_tokens,
)
if verdict.action != "ACCEPT":
return JudgeVerdict(
action=verdict.action,
feedback=verdict.feedback or "Phase criteria not met.",
)
return JudgeVerdict(action="ACCEPT", feedback="")
@@ -0,0 +1,106 @@
"""Stall and doom-loop detection for the event loop.
Pure functions with no class dependencies safe to call from any context.
"""
from __future__ import annotations
import json
def ngram_similarity(s1: str, s2: str, n: int = 2) -> float:
"""Jaccard similarity of n-gram sets.
Returns 0.0-1.0, where 1.0 is exact match.
Fast: O(len(s) + len(s2)) using set operations.
"""
def _ngrams(s: str) -> set[str]:
return {s[i : i + n] for i in range(len(s) - n + 1) if s.strip()}
if not s1 or not s2:
return 0.0
ngrams1, ngrams2 = _ngrams(s1.lower()), _ngrams(s2.lower())
if not ngrams1 or not ngrams2:
return 0.0
intersection = len(ngrams1 & ngrams2)
union = len(ngrams1 | ngrams2)
return intersection / union if union else 0.0
def is_stalled(
recent_responses: list[str],
threshold: int,
similarity_threshold: float,
) -> bool:
"""Detect stall using n-gram similarity.
Detects when ALL N consecutive responses are mutually similar
(>= threshold). A single dissimilar response resets the signal.
This catches phrases like "I'm still stuck" vs "I'm stuck"
without false-positives on "attempt 1" vs "attempt 2".
"""
if len(recent_responses) < threshold:
return False
if not recent_responses[0]:
return False
# Every consecutive pair must be similar
for i in range(1, len(recent_responses)):
if ngram_similarity(recent_responses[i], recent_responses[i - 1]) < similarity_threshold:
return False
return True
def fingerprint_tool_calls(
tool_results: list[dict],
) -> list[tuple[str, str]]:
"""Create deterministic fingerprints for a turn's tool calls.
Each fingerprint is (tool_name, canonical_args_json). Order-sensitive
so [search("a"), fetch("b")] != [fetch("b"), search("a")].
"""
fingerprints = []
for tr in tool_results:
name = tr.get("tool_name", "")
args = tr.get("tool_input", {})
try:
canonical = json.dumps(args, sort_keys=True, default=str)
except (TypeError, ValueError):
canonical = str(args)
fingerprints.append((name, canonical))
return fingerprints
def is_tool_doom_loop(
recent_tool_fingerprints: list[list[tuple[str, str]]],
threshold: int,
enabled: bool = True,
) -> tuple[bool, str]:
"""Detect doom loop via exact fingerprint match.
Detects when N consecutive turns invoke the same tools with
identical (canonicalized) arguments. Different arguments mean
different work, so only exact matches count.
Returns (is_doom_loop, description).
"""
if not enabled:
return False, ""
if len(recent_tool_fingerprints) < threshold:
return False, ""
first = recent_tool_fingerprints[0]
if not first:
return False, ""
# All turns in the window must match the first exactly
if all(fp == first for fp in recent_tool_fingerprints[1:]):
tool_names = [name for name, _ in first]
desc = (
f"Doom loop detected: {len(recent_tool_fingerprints)} "
f"identical consecutive tool calls ({', '.join(tool_names)})"
)
return True, desc
return False, ""
@@ -0,0 +1,397 @@
"""Subagent execution for the event loop.
Handles the full subagent lifecycle: validation, context setup, tool filtering,
conversation store derivation, execution, and cleanup. Also includes the
_EscalationReceiver helper used for subagent queen escalation routing.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Any
from framework.llm.provider import ToolResult, ToolUse
logger = logging.getLogger(__name__)
class EscalationReceiver:
"""Temporary receiver registered in node_registry for subagent escalation routing.
When a subagent calls ``report_to_parent(wait_for_response=True)``, the callback
creates one of these, registers it under a unique escalation ID in the executor's
``node_registry``, and awaits ``wait()``. The TUI / runner calls
``inject_input(escalation_id, content)`` which the ``ExecutionStream`` routes here
via ``inject_event()`` matching the same ``hasattr(node, "inject_event")`` check
used for regular ``EventLoopNode`` instances.
"""
def __init__(self) -> None:
self._event = asyncio.Event()
self._response: str | None = None
self._awaiting_input = True # So inject_worker_message() can prefer us
async def inject_event(self, content: str, *, is_client_input: bool = False) -> None:
"""Called by ExecutionStream.inject_input() when the user responds."""
self._response = content
self._event.set()
async def wait(self) -> str | None:
"""Block until inject_event() delivers the user's response."""
await self._event.wait()
return self._response
async def execute_subagent(
ctx: Any, # NodeContext
agent_id: str,
task: str,
*,
accumulator: Any | None = None, # OutputAccumulator
event_bus: Any | None = None, # EventBus
config: Any = None, # LoopConfig
tool_executor: Any | None = None, # Callable
conversation_store: Any | None = None, # ConversationStore
subagent_instance_counter: dict[str, int] | None = None,
) -> ToolResult:
"""Execute a subagent and return the result as a ToolResult.
The subagent:
- Gets a fresh conversation with just the task
- Has read-only access to the parent's readable memory
- Cannot delegate to its own subagents (prevents recursion)
- Returns its output in structured JSON format
Args:
ctx: Parent node's context (for memory, tools, LLM access).
agent_id: The node ID of the subagent to invoke.
task: The task description to give the subagent.
accumulator: Parent's OutputAccumulator.
event_bus: EventBus for lifecycle events.
config: LoopConfig for iteration/tool limits.
tool_executor: Tool executor callable.
conversation_store: Parent conversation store (for deriving subagent store).
subagent_instance_counter: Mutable counter dict for unique subagent paths.
Returns:
ToolResult with structured JSON output.
"""
from framework.graph.node import NodeContext, SharedMemory
# Log subagent invocation start
logger.info(
"\n" + "=" * 60 + "\n"
"🤖 SUBAGENT INVOCATION\n"
"=" * 60 + "\n"
"Parent Node: %s\n"
"Subagent ID: %s\n"
"Task: %s\n" + "=" * 60,
ctx.node_id,
agent_id,
task[:500] + "..." if len(task) > 500 else task,
)
# 1. Validate agent exists in registry
if agent_id not in ctx.node_registry:
return ToolResult(
tool_use_id="",
content=json.dumps(
{
"message": f"Sub-agent '{agent_id}' not found in registry",
"data": None,
"metadata": {"agent_id": agent_id, "success": False, "error": "not_found"},
}
),
is_error=True,
)
subagent_spec = ctx.node_registry[agent_id]
# 2. Create read-only memory snapshot
parent_data = ctx.memory.read_all()
# Merge in-flight outputs from the parent's accumulator.
if accumulator:
for key, value in accumulator.to_dict().items():
if key not in parent_data:
parent_data[key] = value
subagent_memory = SharedMemory()
for key, value in parent_data.items():
subagent_memory.write(key, value, validate=False)
read_keys = set(parent_data.keys()) | set(subagent_spec.input_keys or [])
scoped_memory = subagent_memory.with_permissions(
read_keys=list(read_keys),
write_keys=[], # Read-only!
)
# 2b. Set up report callback (one-way channel to parent / event bus)
subagent_reports: list[dict] = []
async def _report_callback(
message: str,
data: dict | None = None,
*,
wait_for_response: bool = False,
) -> str | None:
subagent_reports.append({"message": message, "data": data, "timestamp": time.time()})
if event_bus:
await event_bus.emit_subagent_report(
stream_id=ctx.node_id,
node_id=f"{ctx.node_id}:subagent:{agent_id}",
subagent_id=agent_id,
message=message,
data=data,
execution_id=ctx.execution_id,
)
if not wait_for_response:
return None
if not event_bus:
logger.warning(
"Subagent '%s' requested user response but no event_bus available",
agent_id,
)
return None
# Create isolated receiver and register for input routing
import uuid
from framework.graph.event_loop_node import _EscalationReceiver
escalation_id = f"{ctx.node_id}:escalation:{uuid.uuid4().hex[:8]}"
receiver = _EscalationReceiver()
registry = ctx.shared_node_registry
registry[escalation_id] = receiver
try:
await event_bus.emit_escalation_requested(
stream_id=ctx.stream_id or ctx.node_id,
node_id=escalation_id,
reason=f"Subagent report (wait_for_response) from {agent_id}",
context=message,
execution_id=ctx.execution_id,
)
# Block until queen responds
return await receiver.wait()
finally:
registry.pop(escalation_id, None)
# 3. Filter tools for subagent
subagent_tool_names = set(subagent_spec.tools or [])
tool_source = ctx.all_tools if ctx.all_tools else ctx.available_tools
# GCU auto-population
if subagent_spec.node_type == "gcu" and not subagent_tool_names:
subagent_tools = [t for t in tool_source if t.name != "delegate_to_sub_agent"]
else:
subagent_tools = [
t
for t in tool_source
if t.name in subagent_tool_names and t.name != "delegate_to_sub_agent"
]
missing = subagent_tool_names - {t.name for t in subagent_tools}
if missing:
logger.warning(
"Subagent '%s' requested tools not found in catalog: %s",
agent_id,
sorted(missing),
)
logger.info(
"📦 Subagent '%s' configuration:\n"
" - System prompt: %s\n"
" - Tools available (%d): %s\n"
" - Memory keys inherited: %s",
agent_id,
(subagent_spec.system_prompt[:200] + "...")
if subagent_spec.system_prompt and len(subagent_spec.system_prompt) > 200
else subagent_spec.system_prompt,
len(subagent_tools),
[t.name for t in subagent_tools],
list(parent_data.keys()),
)
# 4. Build subagent context
max_iter = min(config.max_iterations, 10)
subagent_ctx = NodeContext(
runtime=ctx.runtime,
node_id=f"{ctx.node_id}:subagent:{agent_id}",
node_spec=subagent_spec,
memory=scoped_memory,
input_data={"task": task, **parent_data},
llm=ctx.llm,
available_tools=subagent_tools,
goal_context=(
f"Your specific task: {task}\n\n"
f"COMPLETION REQUIREMENTS:\n"
f"When your task is done, you MUST call set_output() "
f"for each required key: {subagent_spec.output_keys}\n"
f"Alternatively, call report_to_parent(mark_complete=true) "
f"with your findings in message/data.\n"
f"You have a maximum of {max_iter} turns to complete this task."
),
goal=ctx.goal,
max_tokens=ctx.max_tokens,
runtime_logger=ctx.runtime_logger,
is_subagent_mode=True, # Prevents nested delegation
report_callback=_report_callback,
node_registry={}, # Empty - no nested subagents
shared_node_registry=ctx.shared_node_registry, # For escalation routing
)
# 5. Create and execute subagent EventLoopNode
if subagent_instance_counter is not None:
subagent_instance_counter.setdefault(agent_id, 0)
subagent_instance_counter[agent_id] += 1
subagent_instance = str(subagent_instance_counter[agent_id])
else:
subagent_instance = "1"
subagent_conv_store = None
if conversation_store is not None:
from framework.storage.conversation_store import FileConversationStore
parent_base = getattr(conversation_store, "_base", None)
if parent_base is not None:
conversations_dir = parent_base.parent
subagent_dir_name = f"{agent_id}-{subagent_instance}"
subagent_store_path = conversations_dir / subagent_dir_name
subagent_conv_store = FileConversationStore(base_path=subagent_store_path)
# Derive a subagent-scoped spillover dir
subagent_spillover = None
if config.spillover_dir:
subagent_spillover = str(
Path(config.spillover_dir) / agent_id / subagent_instance
)
# Import here to avoid circular imports at module level
from framework.graph.event_loop_node import EventLoopNode, LoopConfig, SubagentJudge
subagent_node = EventLoopNode(
event_bus=event_bus,
judge=SubagentJudge(task=task, max_iterations=max_iter),
config=LoopConfig(
max_iterations=max_iter,
max_tool_calls_per_turn=config.max_tool_calls_per_turn,
tool_call_overflow_margin=config.tool_call_overflow_margin,
max_context_tokens=config.max_context_tokens,
stall_detection_threshold=config.stall_detection_threshold,
max_tool_result_chars=config.max_tool_result_chars,
spillover_dir=subagent_spillover,
),
tool_executor=tool_executor,
conversation_store=subagent_conv_store,
)
# Inject a unique GCU browser profile for this subagent
_profile_token = None
try:
from gcu.browser.session import set_active_profile as _set_gcu_profile
_profile_token = _set_gcu_profile(f"{agent_id}-{subagent_instance}")
except ImportError:
pass # GCU tools not installed; no-op
try:
logger.info("🚀 Starting subagent '%s' execution...", agent_id)
start_time = time.time()
result = await subagent_node.execute(subagent_ctx)
latency_ms = int((time.time() - start_time) * 1000)
separator = "-" * 60
logger.info(
"\n%s\n"
"✅ SUBAGENT '%s' COMPLETED\n"
"%s\n"
"Success: %s\n"
"Latency: %dms\n"
"Tokens used: %s\n"
"Output keys: %s\n"
"%s",
separator,
agent_id,
separator,
result.success,
latency_ms,
result.tokens_used,
list(result.output.keys()) if result.output else [],
separator,
)
result_json = {
"message": (
f"Sub-agent '{agent_id}' completed successfully"
if result.success
else f"Sub-agent '{agent_id}' failed: {result.error}"
),
"data": result.output,
"reports": subagent_reports if subagent_reports else None,
"metadata": {
"agent_id": agent_id,
"success": result.success,
"tokens_used": result.tokens_used,
"latency_ms": latency_ms,
"report_count": len(subagent_reports),
},
}
return ToolResult(
tool_use_id="",
content=json.dumps(result_json, indent=2, default=str),
is_error=not result.success,
)
except Exception as e:
logger.exception(
"\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\nError: %s\n" + "!" * 60,
agent_id,
str(e),
)
result_json = {
"message": f"Sub-agent '{agent_id}' raised exception: {e}",
"data": None,
"metadata": {
"agent_id": agent_id,
"success": False,
"error": str(e),
},
}
return ToolResult(
tool_use_id="",
content=json.dumps(result_json, indent=2),
is_error=True,
)
finally:
# Restore the GCU profile context
if _profile_token is not None:
from gcu.browser.session import _active_profile as _gcu_profile_var
_gcu_profile_var.reset(_profile_token)
# Stop the browser session for this subagent's profile
if tool_executor is not None:
_subagent_profile = f"{agent_id}-{subagent_instance}"
try:
_stop_use = ToolUse(
id="gcu-cleanup",
name="browser_stop",
input={"profile": _subagent_profile},
)
_stop_result = tool_executor(_stop_use)
if asyncio.iscoroutine(_stop_result) or asyncio.isfuture(_stop_result):
await _stop_result
except Exception as _gcu_exc:
logger.warning(
"GCU browser_stop failed for profile %r: %s",
_subagent_profile,
_gcu_exc,
)
@@ -0,0 +1,371 @@
"""Synthetic tool builders for the event loop.
Factory functions that create ``Tool`` definitions for framework-level
synthetic tools (set_output, ask_user, escalate, delegate, report_to_parent).
Also includes the ``handle_set_output`` validation logic.
All functions are pure they receive explicit parameters and return
``Tool`` or ``ToolResult`` objects with no side effects.
"""
from __future__ import annotations
from typing import Any
from framework.llm.provider import Tool, ToolResult
def build_ask_user_tool() -> Tool:
"""Build the synthetic ask_user tool for explicit user-input requests.
Client-facing nodes call ask_user() when they need to pause and wait
for user input. Text-only turns WITHOUT ask_user flow through without
blocking, allowing progress updates and summaries to stream freely.
"""
return Tool(
name="ask_user",
description=(
"You MUST call this tool whenever you need the user's response. "
"Always call it after greeting the user, asking a question, or "
"requesting approval. Do NOT call it for status updates or "
"summaries that don't require a response. "
"Always include 2-3 predefined options. The UI automatically "
"appends an 'Other' free-text input after your options, so NEVER "
"include catch-all options like 'Custom idea', 'Something else', "
"'Other', or 'None of the above' — the UI handles that. "
"When the question primarily needs a typed answer but you must "
"include options, make one option signal that typing is expected "
"(e.g. 'I\\'ll type my response'). This helps users discover the "
"free-text input. "
"The ONLY exception: omit options when the question demands a "
"free-form answer the user must type out (e.g. 'Describe your "
"agent idea', 'Paste the error message'). "
'{"question": "What would you like to do?", "options": '
'["Build a new agent", "Modify existing agent", "Run tests"]} '
"Free-form example: "
'{"question": "Describe the agent you want to build."}'
),
parameters={
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question or prompt shown to the user.",
},
"options": {
"type": "array",
"items": {"type": "string"},
"description": (
"2-3 specific predefined choices. Include in most cases. "
'Example: ["Option A", "Option B", "Option C"]. '
"The UI always appends an 'Other' free-text input, so "
"do NOT include catch-alls like 'Custom idea' or 'Other'. "
"Omit ONLY when the user must type a free-form answer."
),
"minItems": 2,
"maxItems": 3,
},
},
"required": ["question"],
},
)
def build_ask_user_multiple_tool() -> Tool:
"""Build the synthetic ask_user_multiple tool for batched questions.
Queen-only tool that presents multiple questions at once so the user
can answer them all in a single interaction rather than one at a time.
"""
return Tool(
name="ask_user_multiple",
description=(
"Ask the user multiple questions at once. Use this instead of "
"ask_user when you have 2 or more questions to ask in the same "
"turn — it lets the user answer everything in one go rather than "
"going back and forth. Each question can have its own predefined "
"options (2-3 choices) or be free-form. The UI renders all "
"questions together with a single Submit button. "
"ALWAYS prefer this over ask_user when you have multiple things "
"to clarify. "
"IMPORTANT: Do NOT repeat the questions in your text response — "
"the widget renders them. Keep your text to a brief intro only. "
'{"questions": ['
' {"id": "scope", "prompt": "What scope?", "options": ["Full", "Partial"]},'
' {"id": "format", "prompt": "Output format?", "options": ["PDF", "CSV", "JSON"]},'
' {"id": "details", "prompt": "Any special requirements?"}'
"]}"
),
parameters={
"type": "object",
"properties": {
"questions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string",
"description": (
"Short identifier for this question (used in the response)."
),
},
"prompt": {
"type": "string",
"description": "The question text shown to the user.",
},
"options": {
"type": "array",
"items": {"type": "string"},
"description": (
"2-3 predefined choices. The UI appends an "
"'Other' free-text input automatically. "
"Omit only when the user must type a free-form answer."
),
"minItems": 2,
"maxItems": 3,
},
},
"required": ["id", "prompt"],
},
"minItems": 2,
"maxItems": 8,
"description": "List of questions to present to the user.",
},
},
"required": ["questions"],
},
)
def build_set_output_tool(output_keys: list[str] | None) -> Tool | None:
"""Build the synthetic set_output tool for explicit output declaration."""
if not output_keys:
return None
return Tool(
name="set_output",
description=(
"Set an output value for this node. Call once per output key. "
"Use this for brief notes, counts, status, and file references — "
"NOT for large data payloads. When a tool result was saved to a "
"data file, pass the filename as the value "
"(e.g. 'google_sheets_get_values_1.txt') so the next phase can "
"load the full data. Values exceeding ~2000 characters are "
"auto-saved to data files. "
f"Valid keys: {output_keys}"
),
parameters={
"type": "object",
"properties": {
"key": {
"type": "string",
"description": f"Output key. Must be one of: {output_keys}",
"enum": output_keys,
},
"value": {
"type": "string",
"description": (
"The output value — a brief note, count, status, "
"or data filename reference."
),
},
},
"required": ["key", "value"],
},
)
def build_escalate_tool() -> Tool:
"""Build the synthetic escalate tool for worker -> queen handoff."""
return Tool(
name="escalate",
description=(
"Escalate to the queen when requesting user input, "
"blocked by errors, missing "
"credentials, or ambiguous constraints that require supervisor "
"guidance. Include a concise reason and optional context. "
"The node will pause until the queen injects guidance."
),
parameters={
"type": "object",
"properties": {
"reason": {
"type": "string",
"description": (
"Short reason for escalation (e.g. 'Tool repeatedly failing')."
),
},
"context": {
"type": "string",
"description": "Optional diagnostic details for the queen.",
},
},
"required": ["reason"],
},
)
def build_delegate_tool(
sub_agents: list[str], node_registry: dict[str, Any]
) -> Tool | None:
"""Build the synthetic delegate_to_sub_agent tool for subagent invocation.
Args:
sub_agents: List of node IDs that can be invoked as subagents.
node_registry: Map of node_id -> NodeSpec for looking up subagent descriptions.
Returns:
Tool definition if sub_agents is non-empty, None otherwise.
"""
if not sub_agents:
return None
agent_descriptions = []
for agent_id in sub_agents:
spec = node_registry.get(agent_id)
if spec:
desc = getattr(spec, "description", "(no description)")
agent_descriptions.append(f"- {agent_id}: {desc}")
else:
agent_descriptions.append(f"- {agent_id}: (not found in registry)")
return Tool(
name="delegate_to_sub_agent",
description=(
"Delegate a task to a specialized sub-agent. The sub-agent runs "
"autonomously with read-only access to current memory and returns "
"its result. Use this to parallelize work or leverage specialized capabilities.\n\n"
"Available sub-agents:\n" + "\n".join(agent_descriptions)
),
parameters={
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": f"The sub-agent to invoke. Must be one of: {sub_agents}",
"enum": sub_agents,
},
"task": {
"type": "string",
"description": (
"The task description for the sub-agent to execute. "
"Be specific about what you want the sub-agent to do and "
"what information to return."
),
},
},
"required": ["agent_id", "task"],
},
)
def build_report_to_parent_tool() -> Tool:
"""Build the synthetic report_to_parent tool for sub-agent progress reports.
Sub-agents call this to send one-way progress updates, partial findings,
or status reports to the parent node (and external observers via event bus)
without blocking execution.
When ``wait_for_response`` is True, the sub-agent blocks until the parent
relays the user's response — used for escalation (e.g. login pages, CAPTCHAs).
When ``mark_complete`` is True, the sub-agent terminates immediately after
sending the report no need to call set_output for each output key.
"""
return Tool(
name="report_to_parent",
description=(
"Send a report to the parent agent. By default this is fire-and-forget: "
"the parent receives the report but does not respond. "
"Set wait_for_response=true to BLOCK until the user replies — use this "
"when you need human intervention (e.g. login pages, CAPTCHAs, "
"authentication walls). The user's response is returned as the tool result. "
"Set mark_complete=true to finish your task and terminate immediately "
"after sending the report — use this when your findings are in the "
"message/data fields and you don't need to call set_output."
),
parameters={
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "A human-readable status or progress message.",
},
"data": {
"type": "object",
"description": "Optional structured data to include with the report.",
},
"wait_for_response": {
"type": "boolean",
"description": (
"If true, block execution until the user responds. "
"Use for escalation scenarios requiring human intervention."
),
"default": False,
},
"mark_complete": {
"type": "boolean",
"description": (
"If true, terminate the sub-agent immediately after sending "
"this report. The report message and data are delivered to the "
"parent as the final result. No set_output calls are needed."
),
"default": False,
},
},
"required": ["message"],
},
)
def handle_set_output(
tool_input: dict[str, Any],
output_keys: list[str] | None,
) -> ToolResult:
"""Handle set_output tool call. Returns ToolResult (sync)."""
import logging
import re
logger = logging.getLogger(__name__)
key = tool_input.get("key", "")
value = tool_input.get("value", "")
valid_keys = output_keys or []
# Recover from truncated JSON (max_tokens hit mid-argument).
# The _raw key is set by litellm when json.loads fails.
if not key and "_raw" in tool_input:
raw = tool_input["_raw"]
key_match = re.search(r'"key"\s*:\s*"(\w+)"', raw)
if key_match:
key = key_match.group(1)
val_match = re.search(r'"value"\s*:\s*"', raw)
if val_match:
start = val_match.end()
value = raw[start:].rstrip()
for suffix in ('"}\n', '"}', '"'):
if value.endswith(suffix):
value = value[: -len(suffix)]
break
if key:
logger.warning(
"Recovered set_output args from truncated JSON: key=%s, value_len=%d",
key,
len(value),
)
# Re-inject so the caller sees proper key/value
tool_input["key"] = key
tool_input["value"] = value
if key not in valid_keys:
return ToolResult(
tool_use_id="",
content=f"Invalid output key '{key}'. Valid keys: {valid_keys}",
is_error=True,
)
return ToolResult(
tool_use_id="",
content=f"Output '{key}' set successfully.",
is_error=False,
)
@@ -0,0 +1,515 @@
"""Tool result handling: truncation, spillover, JSON preview, and execution.
Manages tool result size limits, file spillover for large results, and
smart JSON previews. Also includes transient error classification and
the context-window-exceeded error detector.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
from pathlib import Path
from typing import Any
from framework.llm.provider import ToolResult, ToolUse
from framework.llm.stream_events import ToolCallEvent
logger = logging.getLogger(__name__)
# Pattern for detecting context-window-exceeded errors across LLM providers.
_CONTEXT_TOO_LARGE_RE = re.compile(
r"context.{0,20}(length|window|limit|size)|"
r"too.{0,10}(long|large|many.{0,10}tokens)|"
r"(exceed|exceeds|exceeded).{0,30}(limit|window|context|tokens)|"
r"maximum.{0,20}token|prompt.{0,20}too.{0,10}long",
re.IGNORECASE,
)
def is_context_too_large_error(exc: BaseException) -> bool:
"""Detect whether an exception indicates the LLM input was too large."""
cls = type(exc).__name__
if "ContextWindow" in cls:
return True
return bool(_CONTEXT_TOO_LARGE_RE.search(str(exc)))
def is_transient_error(exc: BaseException) -> bool:
"""Classify whether an exception is transient (retryable) vs permanent.
Transient: network errors, rate limits, server errors, timeouts.
Permanent: auth errors, bad requests, context window exceeded.
"""
try:
from litellm.exceptions import (
APIConnectionError,
BadGatewayError,
InternalServerError,
RateLimitError,
ServiceUnavailableError,
)
transient_types: tuple[type[BaseException], ...] = (
RateLimitError,
APIConnectionError,
InternalServerError,
BadGatewayError,
ServiceUnavailableError,
TimeoutError,
ConnectionError,
OSError,
)
except ImportError:
transient_types = (TimeoutError, ConnectionError, OSError)
if isinstance(exc, transient_types):
return True
# RuntimeError from StreamErrorEvent with "Stream error:" prefix
if isinstance(exc, RuntimeError):
error_str = str(exc).lower()
transient_keywords = [
"rate limit",
"429",
"timeout",
"connection",
"internal server",
"502",
"503",
"504",
"service unavailable",
"bad gateway",
"overloaded",
"failed to parse tool call",
]
return any(kw in error_str for kw in transient_keywords)
return False
def extract_json_metadata(parsed: Any, *, _depth: int = 0, _max_depth: int = 3) -> str:
"""Return a concise structural summary of parsed JSON.
Reports key names, value types, and crucially array lengths so
the LLM knows how much data exists beyond the preview.
Returns an empty string for simple scalars.
"""
if _depth >= _max_depth:
if isinstance(parsed, dict):
return f"dict with {len(parsed)} keys"
if isinstance(parsed, list):
return f"list of {len(parsed)} items"
return type(parsed).__name__
if isinstance(parsed, dict):
if not parsed:
return "empty dict"
lines: list[str] = []
indent = " " * (_depth + 1)
for key, value in list(parsed.items())[:20]:
if isinstance(value, list):
line = f'{indent}"{key}": list of {len(value)} items'
if value:
first = value[0]
if isinstance(first, dict):
sample_keys = list(first.keys())[:10]
line += f" (each item: dict with keys {sample_keys})"
elif isinstance(first, list):
line += f" (each item: list of {len(first)} elements)"
lines.append(line)
elif isinstance(value, dict):
child = extract_json_metadata(
value, _depth=_depth + 1, _max_depth=_max_depth
)
lines.append(f'{indent}"{key}": {child}')
else:
lines.append(f'{indent}"{key}": {type(value).__name__}')
if len(parsed) > 20:
lines.append(f"{indent}... and {len(parsed) - 20} more keys")
return "\n".join(lines)
if isinstance(parsed, list):
if not parsed:
return "empty list"
desc = f"list of {len(parsed)} items"
first = parsed[0]
if isinstance(first, dict):
sample_keys = list(first.keys())[:10]
desc += f" (each item: dict with keys {sample_keys})"
elif isinstance(first, list):
desc += f" (each item: list of {len(first)} elements)"
return desc
return ""
def build_json_preview(parsed: Any, *, max_chars: int = 5000) -> str | None:
"""Build a smart preview of parsed JSON, truncating large arrays.
Shows first 3 + last 1 items of large arrays with explicit count
markers so the LLM cannot mistake the preview for the full dataset.
Returns ``None`` if no truncation was needed (no large arrays).
"""
_LARGE_ARRAY_THRESHOLD = 10
def _truncate_arrays(obj: Any) -> tuple[Any, bool]:
"""Return (truncated_copy, was_truncated)."""
if isinstance(obj, list) and len(obj) > _LARGE_ARRAY_THRESHOLD:
n = len(obj)
head = obj[:3]
tail = obj[-1:]
marker = f"... ({n - 4} more items omitted, {n} total) ..."
return head + [marker] + tail, True
if isinstance(obj, dict):
changed = False
out: dict[str, Any] = {}
for k, v in obj.items():
new_v, did = _truncate_arrays(v)
out[k] = new_v
changed = changed or did
return (out, True) if changed else (obj, False)
return obj, False
preview_obj, was_truncated = _truncate_arrays(parsed)
if not was_truncated:
return None # No large arrays — caller should use raw slicing
try:
result = json.dumps(preview_obj, indent=2, ensure_ascii=False)
except (TypeError, ValueError):
return None
if len(result) > max_chars:
# Even 3+1 items too big — try just 1 item
def _minimal_arrays(obj: Any) -> Any:
if isinstance(obj, list) and len(obj) > _LARGE_ARRAY_THRESHOLD:
n = len(obj)
return obj[:1] + [f"... ({n - 1} more items omitted, {n} total) ..."]
if isinstance(obj, dict):
return {k: _minimal_arrays(v) for k, v in obj.items()}
return obj
preview_obj = _minimal_arrays(parsed)
try:
result = json.dumps(preview_obj, indent=2, ensure_ascii=False)
except (TypeError, ValueError):
return None
if len(result) > max_chars:
result = result[:max_chars] + ""
return result
def truncate_tool_result(
result: ToolResult,
tool_name: str,
*,
max_tool_result_chars: int,
spillover_dir: str | None,
next_spill_filename_fn: Any, # Callable[[str], str]
) -> ToolResult:
"""Persist tool result to file and optionally truncate for context.
When *spillover_dir* is configured, EVERY non-error tool result is
saved to a file (short filename like ``web_search_1.txt``). A
``[Saved to '...']`` annotation is appended so the reference
survives pruning and compaction.
- Small results ( limit): full content kept + file annotation
- Large results (> limit): preview + file reference
- Errors: pass through unchanged
- load_data results: truncate with pagination hint (no re-spill)
"""
limit = max_tool_result_chars
# Errors always pass through unchanged
if result.is_error:
return result
# load_data reads FROM spilled files — never re-spill (circular).
# Just truncate with a pagination hint if the result is too large.
if tool_name == "load_data":
if limit <= 0 or len(result.content) <= limit:
return result # Small load_data result — pass through as-is
# Large load_data result — truncate with smart preview
PREVIEW_CAP = min(5000, max(limit - 500, limit // 2))
metadata_str = ""
smart_preview: str | None = None
try:
parsed_ld = json.loads(result.content)
metadata_str = extract_json_metadata(parsed_ld)
smart_preview = build_json_preview(parsed_ld, max_chars=PREVIEW_CAP)
except (json.JSONDecodeError, TypeError, ValueError):
pass
if smart_preview is not None:
preview_block = smart_preview
else:
preview_block = result.content[:PREVIEW_CAP] + ""
header = (
f"[{tool_name} result: {len(result.content):,} chars — "
f"too large for context. Use offset_bytes/limit_bytes "
f"parameters to read smaller chunks.]"
)
if metadata_str:
header += f"\n\nData structure:\n{metadata_str}"
header += (
"\n\nWARNING: This is an INCOMPLETE preview. "
"Do NOT draw conclusions or counts from it."
)
truncated = f"{header}\n\nPreview (small sample only):\n{preview_block}"
logger.info(
"%s result truncated: %d%d chars (use offset/limit to paginate)",
tool_name,
len(result.content),
len(truncated),
)
return ToolResult(
tool_use_id=result.tool_use_id,
content=truncated,
is_error=False,
)
spill_dir = spillover_dir
if spill_dir:
spill_path = Path(spill_dir)
spill_path.mkdir(parents=True, exist_ok=True)
filename = next_spill_filename_fn(tool_name)
# Pretty-print JSON content so load_data's line-based
# pagination works correctly.
write_content = result.content
parsed_json: Any = None # track for metadata extraction
try:
parsed_json = json.loads(result.content)
write_content = json.dumps(parsed_json, indent=2, ensure_ascii=False)
except (json.JSONDecodeError, TypeError, ValueError):
pass # Not JSON — write as-is
(spill_path / filename).write_text(write_content, encoding="utf-8")
if limit > 0 and len(result.content) > limit:
# Large result: build a small, metadata-rich preview so the
# LLM cannot mistake it for the complete dataset.
PREVIEW_CAP = 5000
# Extract structural metadata (array lengths, key names)
metadata_str = ""
smart_preview: str | None = None
if parsed_json is not None:
metadata_str = extract_json_metadata(parsed_json)
smart_preview = build_json_preview(parsed_json, max_chars=PREVIEW_CAP)
if smart_preview is not None:
preview_block = smart_preview
else:
preview_block = result.content[:PREVIEW_CAP] + ""
# Assemble header with structural info + warning
header = (
f"[Result from {tool_name}: {len(result.content):,} chars — "
f"too large for context, saved to '{filename}'.]\n"
)
if metadata_str:
header += f"\nData structure:\n{metadata_str}"
header += (
f"\n\nWARNING: The preview below is INCOMPLETE. "
f"Do NOT draw conclusions or counts from it. "
f"Use load_data(filename='{filename}') to read the "
f"full data before analysis."
)
content = f"{header}\n\nPreview (small sample only):\n{preview_block}"
logger.info(
"Tool result spilled to file: %s (%d chars → %s)",
tool_name,
len(result.content),
filename,
)
else:
# Small result: keep full content + annotation
content = f"{result.content}\n\n[Saved to '{filename}']"
logger.info(
"Tool result saved to file: %s (%d chars → %s)",
tool_name,
len(result.content),
filename,
)
return ToolResult(
tool_use_id=result.tool_use_id,
content=content,
is_error=False,
)
# No spillover_dir — truncate in-place if needed
if limit > 0 and len(result.content) > limit:
PREVIEW_CAP = min(5000, max(limit - 500, limit // 2))
metadata_str = ""
smart_preview: str | None = None
try:
parsed_inline = json.loads(result.content)
metadata_str = extract_json_metadata(parsed_inline)
smart_preview = build_json_preview(parsed_inline, max_chars=PREVIEW_CAP)
except (json.JSONDecodeError, TypeError, ValueError):
pass
if smart_preview is not None:
preview_block = smart_preview
else:
preview_block = result.content[:PREVIEW_CAP] + ""
header = (
f"[Result from {tool_name}: {len(result.content):,} chars — "
f"truncated to fit context budget.]"
)
if metadata_str:
header += f"\n\nData structure:\n{metadata_str}"
header += (
"\n\nWARNING: This is an INCOMPLETE preview. "
"Do NOT draw conclusions or counts from the preview alone."
)
truncated = f"{header}\n\n{preview_block}"
logger.info(
"Tool result truncated in-place: %s (%d%d chars)",
tool_name,
len(result.content),
len(truncated),
)
return ToolResult(
tool_use_id=result.tool_use_id,
content=truncated,
is_error=False,
)
return result
async def execute_tool(
tool_executor: Any, # Callable[[ToolUse], ToolResult | Awaitable[ToolResult]] | None
tc: ToolCallEvent,
timeout: float,
) -> ToolResult:
"""Execute a tool call, handling both sync and async executors.
Applies ``tool_call_timeout_seconds`` to prevent hung MCP servers
from blocking the event loop indefinitely. The initial executor
call is offloaded to a thread pool so that sync executors don't
freeze the event loop.
"""
if tool_executor is None:
return ToolResult(
tool_use_id=tc.tool_use_id,
content=f"No tool executor configured for '{tc.tool_name}'",
is_error=True,
)
tool_use = ToolUse(id=tc.tool_use_id, name=tc.tool_name, input=tc.tool_input)
async def _run() -> ToolResult:
# Offload the executor call to a thread. Sync MCP executors
# block on future.result() — running in a thread keeps the
# event loop free so asyncio.wait_for can fire the timeout.
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, tool_executor, tool_use)
# Async executors return a coroutine — await it on the loop
if asyncio.iscoroutine(result) or asyncio.isfuture(result):
result = await result
return result
try:
if timeout > 0:
result = await asyncio.wait_for(_run(), timeout=timeout)
else:
result = await _run()
except TimeoutError:
logger.warning("Tool '%s' timed out after %.0fs", tc.tool_name, timeout)
return ToolResult(
tool_use_id=tc.tool_use_id,
content=(
f"Tool '{tc.tool_name}' timed out after {timeout:.0f}s. "
"The operation took too long and was cancelled. "
"Try a simpler request or a different approach."
),
is_error=True,
)
return result
def record_learning(key: str, value: Any, spillover_dir: str | None) -> None:
"""Append a set_output value to adapt.md as a learning entry.
Called at set_output time the moment knowledge is produced so that
adapt.md accumulates the agent's outputs across the session. Since
adapt.md is injected into the system prompt, these persist through
any compaction.
"""
if not spillover_dir:
return
try:
adapt_path = Path(spillover_dir) / "adapt.md"
adapt_path.parent.mkdir(parents=True, exist_ok=True)
content = adapt_path.read_text(encoding="utf-8") if adapt_path.exists() else ""
if "## Outputs" not in content:
content += "\n\n## Outputs\n"
# Truncate long values for memory (full value is in shared memory)
v_str = str(value)
if len(v_str) > 500:
v_str = v_str[:500] + ""
entry = f"- {key}: {v_str}\n"
# Replace existing entry for same key (update, not duplicate)
lines = content.splitlines(keepends=True)
replaced = False
for i, line in enumerate(lines):
if line.startswith(f"- {key}:"):
lines[i] = entry
replaced = True
break
if replaced:
content = "".join(lines)
else:
content += entry
adapt_path.write_text(content, encoding="utf-8")
except Exception as e:
logger.warning("Failed to record learning for key=%s: %s", key, e)
def next_spill_filename(tool_name: str, counter: int) -> str:
"""Return a short, monotonic filename for a tool result spill."""
# Shorten common tool name prefixes to save tokens
short = tool_name.removeprefix("tool_").removeprefix("mcp_")
return f"{short}_{counter}.txt"
def restore_spill_counter(spillover_dir: str | None) -> int:
"""Scan spillover_dir for existing spill files and return the max counter.
Returns the highest spill number found (or 0 if none).
"""
if not spillover_dir:
return 0
spill_path = Path(spillover_dir)
if not spill_path.is_dir():
return 0
max_n = 0
for f in spill_path.iterdir():
if not f.is_file():
continue
m = re.search(r"_(\d+)\.txt$", f.name)
if m:
max_n = max(max_n, int(m.group(1)))
return max_n
File diff suppressed because it is too large Load Diff