fix: add more events to event bus

This commit is contained in:
Timothy
2026-02-12 20:42:20 -08:00
parent 83140a1398
commit d1d5f49c5a
7 changed files with 405 additions and 19 deletions
+106 -15
View File
@@ -703,6 +703,17 @@ class EventLoopNode(NodeProtocol):
fb_preview,
)
# Publish judge verdict event
judge_type = "custom" if self._judge is not None else "implicit"
await self._publish_judge_verdict(
stream_id,
node_id,
action=verdict.action,
feedback=fb_preview,
judge_type=judge_type,
iteration=iteration,
)
if verdict.action == "ACCEPT":
# Check for missing output keys
missing = self._get_missing_output_keys(
@@ -1058,13 +1069,20 @@ class EventLoopNode(NodeProtocol):
user_input_requested,
)
# Execute tool calls — separate real tools from set_output
# Execute tool calls — framework tools (set_output, ask_user)
# run inline; real MCP tools run in parallel.
real_tool_results: list[dict] = []
limit_hit = False
executed_in_batch = 0
hard_limit = int(
self._config.max_tool_calls_per_turn * (1 + self._config.tool_call_overflow_margin)
)
# Phase 1: triage — handle framework tools immediately,
# queue real tools for parallel execution.
results_by_id: dict[str, ToolResult] = {}
pending_real: list[ToolCallEvent] = []
for tc in tool_calls:
tool_call_count += 1
if tool_call_count > hard_limit:
@@ -1072,11 +1090,9 @@ class EventLoopNode(NodeProtocol):
break
executed_in_batch += 1
# Publish tool call started
await self._publish_tool_started(
stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input
)
logger.info(
"[%s] tool_call: %s(%s)",
node_id,
@@ -1107,6 +1123,7 @@ class EventLoopNode(NodeProtocol):
key = tc.tool_input.get("key", "")
await accumulator.set(key, value)
outputs_set_this_turn.append(key)
await self._publish_output_key_set(stream_id, node_id, key)
logged_tool_calls.append(
{
"tool_use_id": tc.tool_use_id,
@@ -1116,6 +1133,8 @@ class EventLoopNode(NodeProtocol):
"is_error": result.is_error,
}
)
results_by_id[tc.tool_use_id] = result
elif tc.tool_name == "ask_user":
# --- Framework-level ask_user handling ---
user_input_requested = True
@@ -1124,10 +1143,10 @@ class EventLoopNode(NodeProtocol):
content="Waiting for user input...",
is_error=False,
)
results_by_id[tc.tool_use_id] = result
else:
# --- Real tool execution ---
# Guard: detect truncated tool arguments (_raw fallback
# from litellm when json.loads fails on max_tokens hit).
# --- Real tool: check for truncated args, else queue ---
if "_raw" in tc.tool_input:
result = ToolResult(
tool_use_id=tc.tool_use_id,
@@ -1143,9 +1162,36 @@ class EventLoopNode(NodeProtocol):
node_id,
tc.tool_name,
)
results_by_id[tc.tool_use_id] = result
else:
result = await self._execute_tool(tc)
result = self._truncate_tool_result(result, tc.tool_name)
pending_real.append(tc)
# Phase 2: execute real tools in parallel.
if pending_real:
raw_results = await asyncio.gather(
*(self._execute_tool(tc) for tc in pending_real),
return_exceptions=True,
)
for tc, raw in zip(pending_real, raw_results, strict=True):
if isinstance(raw, BaseException):
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=f"Tool '{tc.tool_name}' raised: {raw}",
is_error=True,
)
else:
result = raw
results_by_id[tc.tool_use_id] = self._truncate_tool_result(result, tc.tool_name)
# Phase 3: record results into conversation in original order,
# build logged/real lists, and publish completed events.
for tc in tool_calls[:executed_in_batch]:
result = results_by_id.get(tc.tool_use_id)
if result is None:
continue # shouldn't happen
# Build log entries for real tools
if tc.tool_name not in ("set_output", "ask_user"):
tool_entry = {
"tool_use_id": tc.tool_use_id,
"tool_name": tc.tool_name,
@@ -1156,15 +1202,11 @@ class EventLoopNode(NodeProtocol):
real_tool_results.append(tool_entry)
logged_tool_calls.append(tool_entry)
# Record tool result in conversation (both real and set_output
# go into the conversation for LLM context continuity)
await conversation.add_tool_result(
tool_use_id=tc.tool_use_id,
content=result.content,
is_error=result.is_error,
)
# Publish tool call completed
await self._publish_tool_completed(
stream_id,
node_id,
@@ -1617,10 +1659,27 @@ class EventLoopNode(NodeProtocol):
return result
# load_data is the designated mechanism for reading spilled files.
# The LLM controls chunk size via offset/limit — re-spilling its
# result would create a circular loop.
# Don't re-spill (circular), but DO truncate with a pagination hint.
if tool_name == "load_data":
return result
preview_chars = max(limit - 300, limit // 2)
preview = result.content[:preview_chars]
truncated = (
f"[load_data result: {len(result.content)} chars — "
f"too large for context. Use offset and limit parameters "
f"to read smaller chunks, e.g. "
f"load_data(filename=..., offset=0, limit=50).]\n\n"
f"Preview:\n{preview}"
)
logger.info(
"load_data result truncated: %d%d chars (use offset/limit to paginate)",
len(result.content),
len(truncated),
)
return ToolResult(
tool_use_id=result.tool_use_id,
content=truncated,
is_error=False,
)
# Determine a preview size — leave room for the metadata wrapper
preview_chars = max(limit - 300, limit // 2)
@@ -2121,3 +2180,35 @@ class EventLoopNode(NodeProtocol):
result=result,
is_error=is_error,
)
async def _publish_judge_verdict(
self,
stream_id: str,
node_id: str,
action: str,
feedback: str = "",
judge_type: str = "implicit",
iteration: int = 0,
) -> None:
if self._event_bus:
await self._event_bus.emit_judge_verdict(
stream_id=stream_id,
node_id=node_id,
action=action,
feedback=feedback,
judge_type=judge_type,
iteration=iteration,
)
async def _publish_output_key_set(
self,
stream_id: str,
node_id: str,
key: str,
) -> None:
if self._event_bus:
await self._event_bus.emit_output_key_set(
stream_id=stream_id,
node_id=node_id,
key=key,
)
+65
View File
@@ -462,6 +462,13 @@ class GraphExecutor:
if session_state and current_node_id != graph.entry_node:
self.logger.info(f"🔄 Resuming from: {current_node_id}")
# Emit resume event
if self._event_bus:
await self._event_bus.emit_execution_resumed(
stream_id=self._stream_id,
node_id=current_node_id,
)
# Start run
_run_id = self.runtime.start_run(
goal_id=goal.id,
@@ -498,6 +505,14 @@ class GraphExecutor:
if self._pause_requested.is_set():
self.logger.info("⏸ Pause detected - stopping at node boundary")
# Emit pause event
if self._event_bus:
await self._event_bus.emit_execution_paused(
stream_id=self._stream_id,
node_id=current_node_id,
reason="User requested pause (Ctrl+Z)",
)
# Create session state for pause
saved_memory = memory.read_all()
pause_session_state: dict[str, Any] = {
@@ -782,6 +797,17 @@ class GraphExecutor:
self.logger.info(
f" ↻ Retrying ({node_retry_counts[current_node_id]}/{max_retries})..."
)
# Emit retry event
if self._event_bus:
await self._event_bus.emit_node_retry(
stream_id=self._stream_id,
node_id=current_node_id,
retry_count=retry_count,
max_retries=max_retries,
error=result.error or "",
)
_is_retry = True
continue
else:
@@ -868,6 +894,15 @@ class GraphExecutor:
# This must happen BEFORE determining next node, since pause nodes may have no edges
if node_spec.id in graph.pause_nodes:
self.logger.info("💾 Saving session state after pause node")
# Emit pause event
if self._event_bus:
await self._event_bus.emit_execution_paused(
stream_id=self._stream_id,
node_id=node_spec.id,
reason="HITL pause node",
)
saved_memory = memory.read_all()
session_state_out = {
"paused_at": node_spec.id,
@@ -923,6 +958,16 @@ class GraphExecutor:
if result.next_node:
# Router explicitly set next node
self.logger.info(f" → Router directing to: {result.next_node}")
# Emit edge traversed event for router-directed edge
if self._event_bus:
await self._event_bus.emit_edge_traversed(
stream_id=self._stream_id,
source_node=current_node_id,
target_node=result.next_node,
edge_condition="router",
)
current_node_id = result.next_node
self._write_progress(current_node_id, path, memory, node_visit_counts)
else:
@@ -946,6 +991,18 @@ class GraphExecutor:
targets = [e.target for e in traversable_edges]
fan_in_node = self._find_convergence_node(graph, targets)
# Emit edge traversed events for fan-out branches
if self._event_bus:
for edge in traversable_edges:
await self._event_bus.emit_edge_traversed(
stream_id=self._stream_id,
source_node=current_node_id,
target_node=edge.target,
edge_condition=edge.condition.value
if hasattr(edge.condition, "value")
else str(edge.condition),
)
# Execute branches in parallel
(
_branch_results,
@@ -989,6 +1046,14 @@ class GraphExecutor:
next_spec = graph.get_node(next_node)
self.logger.info(f" → Next: {next_spec.name if next_spec else next_node}")
# Emit edge traversed event
if self._event_bus:
await self._event_bus.emit_edge_traversed(
stream_id=self._stream_id,
source_node=current_node_id,
target_node=next_node,
)
# CHECKPOINT: node_complete (after determining next node)
if (
checkpoint_store
+138
View File
@@ -63,6 +63,16 @@ class EventType(StrEnum):
NODE_INPUT_BLOCKED = "node_input_blocked"
NODE_STALLED = "node_stalled"
# Judge decisions
JUDGE_VERDICT = "judge_verdict"
# Output tracking
OUTPUT_KEY_SET = "output_key_set"
# Retry / edge tracking
NODE_RETRY = "node_retry"
EDGE_TRAVERSED = "edge_traversed"
# Context management
CONTEXT_COMPACTED = "context_compacted"
@@ -639,6 +649,134 @@ class EventBus:
)
)
# === JUDGE / OUTPUT / RETRY / EDGE PUBLISHERS ===
async def emit_judge_verdict(
    self,
    stream_id: str,
    node_id: str,
    action: str,
    feedback: str = "",
    judge_type: str = "implicit",
    iteration: int = 0,
    execution_id: str | None = None,
) -> None:
    """Publish a JUDGE_VERDICT event describing a judge decision."""
    payload = {
        "action": action,
        "feedback": feedback,
        "judge_type": judge_type,
        "iteration": iteration,
    }
    event = AgentEvent(
        type=EventType.JUDGE_VERDICT,
        stream_id=stream_id,
        node_id=node_id,
        execution_id=execution_id,
        data=payload,
    )
    await self.publish(event)
async def emit_output_key_set(
    self,
    stream_id: str,
    node_id: str,
    key: str,
    execution_id: str | None = None,
) -> None:
    """Publish an OUTPUT_KEY_SET event for a newly written output key."""
    event = AgentEvent(
        type=EventType.OUTPUT_KEY_SET,
        stream_id=stream_id,
        node_id=node_id,
        execution_id=execution_id,
        data={"key": key},
    )
    await self.publish(event)
async def emit_node_retry(
    self,
    stream_id: str,
    node_id: str,
    retry_count: int,
    max_retries: int,
    error: str = "",
    execution_id: str | None = None,
) -> None:
    """Publish a NODE_RETRY event describing one retry attempt."""
    payload = {
        "retry_count": retry_count,
        "max_retries": max_retries,
        "error": error,
    }
    event = AgentEvent(
        type=EventType.NODE_RETRY,
        stream_id=stream_id,
        node_id=node_id,
        execution_id=execution_id,
        data=payload,
    )
    await self.publish(event)
async def emit_edge_traversed(
    self,
    stream_id: str,
    source_node: str,
    target_node: str,
    edge_condition: str = "",
    execution_id: str | None = None,
) -> None:
    """Publish an EDGE_TRAVERSED event.

    The event's ``node_id`` is the *source* node; the full edge is in the
    payload so subscribers can draw the transition.
    """
    payload = {
        "source_node": source_node,
        "target_node": target_node,
        "edge_condition": edge_condition,
    }
    event = AgentEvent(
        type=EventType.EDGE_TRAVERSED,
        stream_id=stream_id,
        node_id=source_node,
        execution_id=execution_id,
        data=payload,
    )
    await self.publish(event)
async def emit_execution_paused(
    self,
    stream_id: str,
    node_id: str,
    reason: str = "",
    execution_id: str | None = None,
) -> None:
    """Publish an EXECUTION_PAUSED event with an optional human-readable reason."""
    event = AgentEvent(
        type=EventType.EXECUTION_PAUSED,
        stream_id=stream_id,
        node_id=node_id,
        execution_id=execution_id,
        data={"reason": reason},
    )
    await self.publish(event)
async def emit_execution_resumed(
    self,
    stream_id: str,
    node_id: str,
    execution_id: str | None = None,
) -> None:
    """Publish an EXECUTION_RESUMED event (no extra payload)."""
    event = AgentEvent(
        type=EventType.EXECUTION_RESUMED,
        stream_id=stream_id,
        node_id=node_id,
        execution_id=execution_id,
        data={},
    )
    await self.publish(event)
async def emit_webhook_received(
self,
source_id: str,
+47
View File
@@ -351,6 +351,13 @@ class AdenTUI(App):
EventType.STATE_CHANGED,
EventType.NODE_INPUT_BLOCKED,
EventType.CONTEXT_COMPACTED,
EventType.NODE_INTERNAL_OUTPUT,
EventType.JUDGE_VERDICT,
EventType.OUTPUT_KEY_SET,
EventType.NODE_RETRY,
EventType.EDGE_TRAVERSED,
EventType.EXECUTION_PAUSED,
EventType.EXECUTION_RESUMED,
]
_LOG_PANE_EVENTS = frozenset(_EVENT_TYPES) - {
@@ -415,6 +422,25 @@ class AdenTUI(App):
elif et == EventType.NODE_LOOP_COMPLETED:
self.chat_repl.handle_node_completed(event.node_id or "")
# Non-client-facing node output → chat repl
if et == EventType.NODE_INTERNAL_OUTPUT:
content = event.data.get("content", "")
if content.strip():
self.chat_repl.handle_internal_output(event.node_id or "", content)
# Execution paused/resumed → chat repl
if et == EventType.EXECUTION_PAUSED:
reason = event.data.get("reason", "")
self.chat_repl.handle_execution_paused(event.node_id or "", reason)
elif et == EventType.EXECUTION_RESUMED:
self.chat_repl.handle_execution_resumed(event.node_id or "")
# Goal achieved / constraint violation → chat repl
if et == EventType.GOAL_ACHIEVED:
self.chat_repl.handle_goal_achieved(event.data)
elif et == EventType.CONSTRAINT_VIOLATION:
self.chat_repl.handle_constraint_violation(event.data)
# --- Graph view events ---
if et in (
EventType.EXECUTION_STARTED,
@@ -451,6 +477,13 @@ class AdenTUI(App):
started=False,
)
# Edge traversal → graph view
if et == EventType.EDGE_TRAVERSED:
self.graph_view.handle_edge_traversed(
event.data.get("source_node", ""),
event.data.get("target_node", ""),
)
# --- Status bar events ---
if et == EventType.EXECUTION_STARTED:
entry_node = event.data.get("entry_node") or (
@@ -475,6 +508,20 @@ class AdenTUI(App):
before = event.data.get("usage_before", "?")
after = event.data.get("usage_after", "?")
self.status_bar.set_node_detail(f"compacted: {before}% \u2192 {after}%")
elif et == EventType.JUDGE_VERDICT:
action = event.data.get("action", "?")
self.status_bar.set_node_detail(f"judge: {action}")
elif et == EventType.OUTPUT_KEY_SET:
key = event.data.get("key", "?")
self.status_bar.set_node_detail(f"set: {key}")
elif et == EventType.NODE_RETRY:
retry = event.data.get("retry_count", "?")
max_r = event.data.get("max_retries", "?")
self.status_bar.set_node_detail(f"retry {retry}/{max_r}")
elif et == EventType.EXECUTION_PAUSED:
self.status_bar.set_node_detail("paused")
elif et == EventType.EXECUTION_RESUMED:
self.status_bar.set_node_detail("resumed")
# --- Log pane events ---
if et in self._LOG_PANE_EVENTS:
+24
View File
@@ -999,3 +999,27 @@ class ChatRepl(Vertical):
"""Clear active node when it finishes."""
if self._active_node_id == node_id:
self._active_node_id = None
def handle_internal_output(self, node_id: str, content: str) -> None:
    """Render output produced by a non-client-facing node in the history pane."""
    line = f"[dim cyan]⟨{node_id}⟩[/dim cyan] {content}"
    self._write_history(line)
def handle_execution_paused(self, node_id: str, reason: str) -> None:
    """Render a pause notification; append the reason only when it is non-empty."""
    parts = [f"[bold yellow]⏸ Paused[/bold yellow] at [cyan]{node_id}[/cyan]"]
    if reason:
        parts.append(f"[dim]({reason})[/dim]")
    self._write_history(" ".join(parts))
def handle_execution_resumed(self, node_id: str) -> None:
    """Render a resume notification naming the node execution restarts from."""
    message = f"[bold green]▶ Resumed[/bold green] from [cyan]{node_id}[/cyan]"
    self._write_history(message)
def handle_goal_achieved(self, data: dict[str, Any]) -> None:
    """Render a prominent goal-achieved banner.

    The event payload is accepted for handler-signature parity with the
    other event handlers but is not displayed.
    """
    self._write_history("[bold green]★ Goal achieved![/bold green]")
def handle_constraint_violation(self, data: dict[str, Any]) -> None:
    """Render a warning for a violated constraint, with a fallback description."""
    description = data.get("description", "Unknown constraint")
    self._write_history(f"[bold red]⚠ Constraint violation:[/bold red] {description}")
+5
View File
@@ -192,3 +192,8 @@ class GraphOverview(Vertical):
"""Highlight a stalled node."""
self._node_status[node_id] = f"[red]stalled: {reason}[/red]"
self._display_graph()
def handle_edge_traversed(self, source_node: str, target_node: str) -> None:
    """Mark the edge leaving *source_node* toward *target_node*, then redraw."""
    label = f"[dim]→ {target_node}[/dim]"
    self._node_status[source_node] = label
    self._display_graph()
+20 -4
View File
@@ -143,6 +143,18 @@ class FakeEventBus:
async def emit_node_loop_completed(self, **kwargs):
    """Record a node-loop-completed emission for later assertions."""
    entry = ("completed", kwargs)
    self.events.append(entry)
async def emit_edge_traversed(self, **kwargs):
    """Record an edge-traversed emission for later assertions."""
    entry = ("edge_traversed", kwargs)
    self.events.append(entry)
async def emit_execution_paused(self, **kwargs):
    """Record an execution-paused emission for later assertions."""
    entry = ("execution_paused", kwargs)
    self.events.append(entry)
async def emit_execution_resumed(self, **kwargs):
    """Record an execution-resumed emission for later assertions."""
    entry = ("execution_resumed", kwargs)
    self.events.append(entry)
async def emit_node_retry(self, **kwargs):
    """Record a node-retry emission for later assertions."""
    entry = ("node_retry", kwargs)
    self.events.append(entry)
@pytest.mark.asyncio
async def test_executor_emits_node_events():
@@ -201,15 +213,19 @@ async def test_executor_emits_node_events():
assert result.success is True
assert result.path == ["n1", "n2"]
# Should have 4 events: started/completed for n1, then started/completed for n2
assert len(event_bus.events) == 4
# Should have 5 events: started/completed for n1, edge_traversed, then started/completed for n2
assert len(event_bus.events) == 5
assert event_bus.events[0] == ("started", {"stream_id": "test-stream", "node_id": "n1"})
assert event_bus.events[1] == (
"completed",
{"stream_id": "test-stream", "node_id": "n1", "iterations": 1},
)
assert event_bus.events[2] == ("started", {"stream_id": "test-stream", "node_id": "n2"})
assert event_bus.events[3] == (
assert event_bus.events[2] == (
"edge_traversed",
{"stream_id": "test-stream", "source_node": "n1", "target_node": "n2"},
)
assert event_bus.events[3] == ("started", {"stream_id": "test-stream", "node_id": "n2"})
assert event_bus.events[4] == (
"completed",
{"stream_id": "test-stream", "node_id": "n2", "iterations": 1},
)