feat(arch): architecture patches to support multi-entry agents consuming external events
This commit is contained in:
@@ -400,9 +400,13 @@ class GraphBuilder:
|
||||
if not terminal_candidates and self.session.nodes:
|
||||
warnings.append("No terminal nodes found (all nodes have outgoing edges)")
|
||||
|
||||
# Check reachability
|
||||
# Check reachability from ALL entry candidates (not just the first one).
|
||||
# Agents with async entry points have multiple nodes with no incoming
|
||||
# edges (e.g., a primary entry node and an event-driven entry node).
|
||||
if entry_candidates and self.session.nodes:
|
||||
reachable = self._compute_reachable(entry_candidates[0])
|
||||
reachable = set()
|
||||
for candidate in entry_candidates:
|
||||
reachable |= self._compute_reachable(candidate)
|
||||
unreachable = [n.id for n in self.session.nodes if n.id not in reachable]
|
||||
if unreachable:
|
||||
errors.append(f"Unreachable nodes: {unreachable}")
|
||||
|
||||
@@ -1108,6 +1108,7 @@ class EventLoopNode(NodeProtocol):
|
||||
|
||||
accumulated_text = ""
|
||||
tool_calls: list[ToolCallEvent] = []
|
||||
_stream_error: StreamErrorEvent | None = None
|
||||
|
||||
# Stream LLM response
|
||||
async for event in ctx.llm.stream(
|
||||
@@ -1132,7 +1133,16 @@ class EventLoopNode(NodeProtocol):
|
||||
elif isinstance(event, StreamErrorEvent):
|
||||
if not event.recoverable:
|
||||
raise RuntimeError(f"Stream error: {event.error}")
|
||||
logger.warning(f"Recoverable stream error: {event.error}")
|
||||
_stream_error = event
|
||||
logger.warning("Recoverable stream error: %s", event.error)
|
||||
|
||||
# If a recoverable stream error produced an empty response,
|
||||
# raise so the outer transient-error retry can handle it
|
||||
# with proper backoff instead of burning judge iterations.
|
||||
if _stream_error and not accumulated_text and not tool_calls:
|
||||
raise ConnectionError(
|
||||
f"Stream failed with recoverable error: {_stream_error.error}"
|
||||
)
|
||||
|
||||
final_text = accumulated_text
|
||||
logger.info(
|
||||
|
||||
@@ -459,6 +459,66 @@ class GraphExecutor:
|
||||
|
||||
steps = 0
|
||||
|
||||
# Fresh shared-session execution: clear stale cursor so the entry
|
||||
# node doesn't restore a filled OutputAccumulator from the previous
|
||||
# webhook run (which would cause the judge to accept immediately).
|
||||
# The conversation history is preserved (continuous memory).
|
||||
_is_fresh_shared = bool(
|
||||
session_state
|
||||
and session_state.get("resume_session_id")
|
||||
and not session_state.get("paused_at")
|
||||
and not session_state.get("resume_from_checkpoint")
|
||||
)
|
||||
if _is_fresh_shared and is_continuous and self._storage_path:
|
||||
try:
|
||||
from framework.storage.conversation_store import FileConversationStore
|
||||
|
||||
entry_conv_path = self._storage_path / "conversations" / current_node_id
|
||||
if entry_conv_path.exists():
|
||||
_store = FileConversationStore(base_path=entry_conv_path)
|
||||
|
||||
# Read cursor to find next seq for the transition marker.
|
||||
_cursor = await _store.read_cursor() or {}
|
||||
_next_seq = _cursor.get("next_seq", 0)
|
||||
if _next_seq == 0:
|
||||
# Fallback: scan part files for max seq
|
||||
_parts = await _store.read_parts()
|
||||
if _parts:
|
||||
_next_seq = max(p.get("seq", 0) for p in _parts) + 1
|
||||
|
||||
# Reset cursor — clears stale accumulator outputs and
|
||||
# iteration counter so the node starts fresh work while
|
||||
# the conversation thread carries forward.
|
||||
await _store.write_cursor({})
|
||||
|
||||
# Append a transition marker so the LLM knows a new
|
||||
# event arrived and previous results are outdated.
|
||||
await _store.write_part(
|
||||
_next_seq,
|
||||
{
|
||||
"role": "user",
|
||||
"content": (
|
||||
"--- NEW EVENT TRIGGER ---\n"
|
||||
"A new event has been received. "
|
||||
"Process this as a fresh request — "
|
||||
"previous outputs are no longer valid."
|
||||
),
|
||||
"seq": _next_seq,
|
||||
"is_transition_marker": True,
|
||||
},
|
||||
)
|
||||
self.logger.info(
|
||||
"🔄 Cleared stale cursor and added transition marker "
|
||||
"for shared-session entry node '%s'",
|
||||
current_node_id,
|
||||
)
|
||||
except Exception:
|
||||
self.logger.debug(
|
||||
"Could not prepare conversation store for shared-session entry node '%s'",
|
||||
current_node_id,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
if session_state and current_node_id != graph.entry_node:
|
||||
self.logger.info(f"🔄 Resuming from: {current_node_id}")
|
||||
|
||||
|
||||
@@ -726,6 +726,16 @@ class LiteLLMProvider(LLMProvider):
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
if _is_stream_transient_error(e) and attempt < RATE_LIMIT_MAX_RETRIES:
|
||||
wait = _compute_retry_delay(attempt, exception=e)
|
||||
logger.warning(
|
||||
f"[stream-retry] {self.model} transient error "
|
||||
f"({type(e).__name__}): {e!s}. "
|
||||
f"Retrying in {wait:.1f}s "
|
||||
f"(attempt {attempt + 1}/{RATE_LIMIT_MAX_RETRIES})"
|
||||
)
|
||||
await asyncio.sleep(wait)
|
||||
continue
|
||||
recoverable = _is_stream_transient_error(e)
|
||||
yield StreamErrorEvent(error=str(e), recoverable=recoverable)
|
||||
return
|
||||
|
||||
@@ -1112,11 +1112,11 @@ def validate_graph() -> str:
|
||||
if entry_candidates:
|
||||
reachable = set()
|
||||
|
||||
# For pause/resume agents, start from ALL entry points (including resume)
|
||||
if is_pause_resume_agent:
|
||||
to_visit = list(entry_candidates) # All nodes without incoming edges
|
||||
else:
|
||||
to_visit = [entry_candidates[0]] # Just the primary entry
|
||||
# Start from ALL entry candidates (nodes without incoming edges).
|
||||
# This handles both pause/resume agents and async entry point agents
|
||||
# where multiple nodes have no incoming edges (e.g., a primary entry
|
||||
# node and an event-driven entry node).
|
||||
to_visit = list(entry_candidates)
|
||||
|
||||
while to_visit:
|
||||
current = to_visit.pop()
|
||||
|
||||
@@ -25,7 +25,7 @@ from framework.graph.executor import ExecutionResult
|
||||
from framework.graph.node import NodeSpec
|
||||
from framework.llm.provider import LLMProvider, Tool
|
||||
from framework.runner.tool_registry import ToolRegistry
|
||||
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
|
||||
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig, create_agent_runtime
|
||||
from framework.runtime.execution_stream import EntryPointSpec
|
||||
from framework.runtime.runtime_log_store import RuntimeLogStore
|
||||
|
||||
@@ -246,6 +246,7 @@ class AgentRunner:
|
||||
storage_path: Path | None = None,
|
||||
model: str | None = None,
|
||||
intro_message: str = "",
|
||||
runtime_config: "AgentRuntimeConfig | None" = None,
|
||||
):
|
||||
"""
|
||||
Initialize the runner (use AgentRunner.load() instead).
|
||||
@@ -258,6 +259,7 @@ class AgentRunner:
|
||||
storage_path: Path for runtime storage (defaults to temp)
|
||||
model: Model to use (reads from agent config or ~/.hive/configuration.json if None)
|
||||
intro_message: Optional greeting shown to user on TUI load
|
||||
runtime_config: Optional AgentRuntimeConfig (webhook settings, etc.)
|
||||
"""
|
||||
self.agent_path = agent_path
|
||||
self.graph = graph
|
||||
@@ -265,6 +267,7 @@ class AgentRunner:
|
||||
self.mock_mode = mock_mode
|
||||
self.model = model or self._resolve_default_model()
|
||||
self.intro_message = intro_message
|
||||
self.runtime_config = runtime_config
|
||||
|
||||
# Set up storage
|
||||
if storage_path:
|
||||
@@ -408,19 +411,32 @@ class AgentRunner:
|
||||
intro_message = agent_metadata.intro_message
|
||||
|
||||
# Build GraphSpec from module-level variables
|
||||
graph = GraphSpec(
|
||||
id=f"{agent_path.name}-graph",
|
||||
goal_id=goal.id,
|
||||
version="1.0.0",
|
||||
entry_node=getattr(agent_module, "entry_node", nodes[0].id),
|
||||
entry_points=getattr(agent_module, "entry_points", {}),
|
||||
terminal_nodes=getattr(agent_module, "terminal_nodes", []),
|
||||
pause_nodes=getattr(agent_module, "pause_nodes", []),
|
||||
nodes=nodes,
|
||||
edges=edges,
|
||||
max_tokens=max_tokens,
|
||||
loop_config=getattr(agent_module, "loop_config", {}),
|
||||
)
|
||||
graph_kwargs: dict = {
|
||||
"id": f"{agent_path.name}-graph",
|
||||
"goal_id": goal.id,
|
||||
"version": "1.0.0",
|
||||
"entry_node": getattr(agent_module, "entry_node", nodes[0].id),
|
||||
"entry_points": getattr(agent_module, "entry_points", {}),
|
||||
"async_entry_points": getattr(agent_module, "async_entry_points", []),
|
||||
"terminal_nodes": getattr(agent_module, "terminal_nodes", []),
|
||||
"pause_nodes": getattr(agent_module, "pause_nodes", []),
|
||||
"nodes": nodes,
|
||||
"edges": edges,
|
||||
"max_tokens": max_tokens,
|
||||
"loop_config": getattr(agent_module, "loop_config", {}),
|
||||
}
|
||||
# Only pass optional fields if explicitly defined by the agent module
|
||||
conversation_mode = getattr(agent_module, "conversation_mode", None)
|
||||
if conversation_mode is not None:
|
||||
graph_kwargs["conversation_mode"] = conversation_mode
|
||||
identity_prompt = getattr(agent_module, "identity_prompt", None)
|
||||
if identity_prompt is not None:
|
||||
graph_kwargs["identity_prompt"] = identity_prompt
|
||||
|
||||
graph = GraphSpec(**graph_kwargs)
|
||||
|
||||
# Read runtime config (webhook settings, etc.) if defined
|
||||
agent_runtime_config = getattr(agent_module, "runtime_config", None)
|
||||
|
||||
return cls(
|
||||
agent_path=agent_path,
|
||||
@@ -430,6 +446,7 @@ class AgentRunner:
|
||||
storage_path=storage_path,
|
||||
model=model,
|
||||
intro_message=intro_message,
|
||||
runtime_config=agent_runtime_config,
|
||||
)
|
||||
|
||||
# Fallback: load from agent.json (legacy JSON-based agents)
|
||||
@@ -685,17 +702,19 @@ class AgentRunner:
|
||||
)
|
||||
entry_points.append(ep)
|
||||
|
||||
# Single-entry agent with no async entry points: create a default entry point
|
||||
if not entry_points and self.graph.entry_node:
|
||||
logger.info("Creating default entry point for single-entry agent")
|
||||
entry_points.append(
|
||||
# Always create a primary entry point for the graph's entry node.
|
||||
# For multi-entry-point agents this ensures the primary path (e.g.
|
||||
# user-facing rule setup) is reachable alongside async entry points.
|
||||
if self.graph.entry_node:
|
||||
entry_points.insert(
|
||||
0,
|
||||
EntryPointSpec(
|
||||
id="default",
|
||||
name="Default",
|
||||
entry_node=self.graph.entry_node,
|
||||
trigger_type="manual",
|
||||
isolation_level="shared",
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
# Create AgentRuntime with all entry points
|
||||
@@ -722,6 +741,7 @@ class AgentRunner:
|
||||
tool_executor=tool_executor,
|
||||
runtime_log_store=log_store,
|
||||
checkpoint_config=checkpoint_config,
|
||||
config=self.runtime_config,
|
||||
)
|
||||
|
||||
# Pass intro_message through for TUI display
|
||||
|
||||
@@ -289,7 +289,17 @@ class AgentRuntime:
|
||||
def _make_handler(entry_point_id: str):
|
||||
async def _on_event(event):
|
||||
if self._running and entry_point_id in self._streams:
|
||||
await self.trigger(entry_point_id, {"event": event.to_dict()})
|
||||
# Run in the same session as the primary entry
|
||||
# point so memory (e.g. user-defined rules) is
|
||||
# shared and logs land in one session directory.
|
||||
session_state = self._get_primary_session_state(
|
||||
exclude_entry_point=entry_point_id
|
||||
)
|
||||
await self.trigger(
|
||||
entry_point_id,
|
||||
{"event": event.to_dict()},
|
||||
session_state=session_state,
|
||||
)
|
||||
|
||||
return _on_event
|
||||
|
||||
@@ -391,6 +401,66 @@ class AgentRuntime:
|
||||
raise ValueError(f"Entry point '{entry_point_id}' not found")
|
||||
return await stream.wait_for_completion(exec_id, timeout)
|
||||
|
||||
def _get_primary_session_state(self, exclude_entry_point: str) -> dict[str, Any] | None:
|
||||
"""Build session_state so an async entry point runs in the primary session.
|
||||
|
||||
Looks for an active execution from another stream (the "primary"
|
||||
session, e.g. the user-facing intake loop) and returns a
|
||||
``session_state`` dict containing:
|
||||
|
||||
- ``resume_session_id``: reuse the same session directory
|
||||
- ``memory``: only the keys that the async entry node declares
|
||||
as inputs (e.g. ``rules``, ``max_emails``). Stale outputs
|
||||
from previous runs (``emails``, ``actions_taken``, …) are
|
||||
excluded so each trigger starts fresh.
|
||||
|
||||
The memory is read from the primary session's ``state.json``
|
||||
which is kept up-to-date by ``GraphExecutor._write_progress()``
|
||||
at every node transition.
|
||||
|
||||
Returns ``None`` if no primary session is active (the webhook
|
||||
execution will just create its own session).
|
||||
"""
|
||||
import json as _json
|
||||
|
||||
# Determine which memory keys the async entry node needs.
|
||||
allowed_keys: set[str] | None = None
|
||||
ep_spec = self._entry_points.get(exclude_entry_point)
|
||||
if ep_spec:
|
||||
entry_node = self.graph.get_node(ep_spec.entry_node)
|
||||
if entry_node and entry_node.input_keys:
|
||||
allowed_keys = set(entry_node.input_keys)
|
||||
|
||||
for ep_id, stream in self._streams.items():
|
||||
if ep_id == exclude_entry_point:
|
||||
continue
|
||||
for exec_id in stream.active_execution_ids:
|
||||
state_path = self._storage.base_path / "sessions" / exec_id / "state.json"
|
||||
try:
|
||||
if state_path.exists():
|
||||
data = _json.loads(state_path.read_text(encoding="utf-8"))
|
||||
full_memory = data.get("memory", {})
|
||||
if not full_memory:
|
||||
continue
|
||||
# Filter to only input keys so stale outputs
|
||||
# from previous triggers don't leak through.
|
||||
if allowed_keys is not None:
|
||||
memory = {k: v for k, v in full_memory.items() if k in allowed_keys}
|
||||
else:
|
||||
memory = full_memory
|
||||
if memory:
|
||||
return {
|
||||
"resume_session_id": exec_id,
|
||||
"memory": memory,
|
||||
}
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Could not read state.json for %s: skipping",
|
||||
exec_id,
|
||||
exc_info=True,
|
||||
)
|
||||
return None
|
||||
|
||||
async def inject_input(self, node_id: str, content: str) -> bool:
|
||||
"""Inject user input into a running client-facing node.
|
||||
|
||||
|
||||
@@ -196,6 +196,11 @@ class ExecutionStream:
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def active_execution_ids(self) -> list[str]:
|
||||
"""Return IDs of all currently active executions."""
|
||||
return list(self._active_executions.keys())
|
||||
|
||||
def _record_execution_result(self, execution_id: str, result: ExecutionResult) -> None:
|
||||
"""Record a completed execution result with retention pruning."""
|
||||
self._execution_results[execution_id] = result
|
||||
@@ -342,6 +347,11 @@ class ExecutionStream:
|
||||
"""Run a single execution within the stream."""
|
||||
execution_id = ctx.id
|
||||
|
||||
# When sharing a session with another entry point (resume_session_id),
|
||||
# skip writing initial/final session state — the primary execution
|
||||
# owns the state.json and _write_progress() keeps memory up-to-date.
|
||||
_is_shared_session = bool(ctx.session_state and ctx.session_state.get("resume_session_id"))
|
||||
|
||||
# Acquire semaphore to limit concurrency
|
||||
async with self._semaphore:
|
||||
ctx.status = "running"
|
||||
@@ -404,7 +414,8 @@ class ExecutionStream:
|
||||
self._active_executors[execution_id] = executor
|
||||
|
||||
# Write initial session state
|
||||
await self._write_session_state(execution_id, ctx)
|
||||
if not _is_shared_session:
|
||||
await self._write_session_state(execution_id, ctx)
|
||||
|
||||
# Create modified graph with entry point
|
||||
# We need to override the entry_node to use our entry point
|
||||
@@ -438,8 +449,9 @@ class ExecutionStream:
|
||||
if result.paused_at:
|
||||
ctx.status = "paused"
|
||||
|
||||
# Write final session state
|
||||
await self._write_session_state(execution_id, ctx, result=result)
|
||||
# Write final session state (skip for shared-session executions)
|
||||
if not _is_shared_session:
|
||||
await self._write_session_state(execution_id, ctx, result=result)
|
||||
|
||||
# Emit completion/failure event
|
||||
if self._event_bus:
|
||||
@@ -490,11 +502,14 @@ class ExecutionStream:
|
||||
# Store result with retention
|
||||
self._record_execution_result(execution_id, result)
|
||||
|
||||
# Write session state
|
||||
if has_result and result.paused_at:
|
||||
await self._write_session_state(execution_id, ctx, result=result)
|
||||
else:
|
||||
await self._write_session_state(execution_id, ctx, error="Execution cancelled")
|
||||
# Write session state (skip for shared-session executions)
|
||||
if not _is_shared_session:
|
||||
if has_result and result.paused_at:
|
||||
await self._write_session_state(execution_id, ctx, result=result)
|
||||
else:
|
||||
await self._write_session_state(
|
||||
execution_id, ctx, error="Execution cancelled"
|
||||
)
|
||||
|
||||
# Don't re-raise - we've handled it and saved state
|
||||
|
||||
@@ -511,8 +526,9 @@ class ExecutionStream:
|
||||
),
|
||||
)
|
||||
|
||||
# Write error session state
|
||||
await self._write_session_state(execution_id, ctx, error=str(e))
|
||||
# Write error session state (skip for shared-session executions)
|
||||
if not _is_shared_session:
|
||||
await self._write_session_state(execution_id, ctx, error=str(e))
|
||||
|
||||
# End run with failure (for observability)
|
||||
try:
|
||||
@@ -648,20 +664,35 @@ class ExecutionStream:
|
||||
logger.error(f"Failed to write state.json for {execution_id}: {e}")
|
||||
|
||||
def _create_modified_graph(self) -> "GraphSpec":
|
||||
"""Create a graph with the entry point overridden."""
|
||||
# Use the existing graph but override entry_node
|
||||
"""Create a graph with the entry point overridden.
|
||||
|
||||
Preserves the original graph's entry_points and async_entry_points
|
||||
so that validation correctly considers ALL entry nodes reachable.
|
||||
Each stream only executes from its own entry_node, but the full
|
||||
graph must validate with all entry points accounted for.
|
||||
"""
|
||||
from framework.graph.edge import GraphSpec
|
||||
|
||||
# Create a copy with modified entry node
|
||||
# Merge entry points: this stream's entry + original graph's primary
|
||||
# entry + any other entry points. This ensures all nodes are
|
||||
# reachable during validation even though this stream only starts
|
||||
# from self.entry_spec.entry_node.
|
||||
merged_entry_points = {
|
||||
"start": self.entry_spec.entry_node,
|
||||
}
|
||||
# Preserve the original graph's primary entry node
|
||||
if self.graph.entry_node:
|
||||
merged_entry_points["primary"] = self.graph.entry_node
|
||||
# Include any explicitly defined entry points from the graph
|
||||
merged_entry_points.update(self.graph.entry_points)
|
||||
|
||||
return GraphSpec(
|
||||
id=self.graph.id,
|
||||
goal_id=self.graph.goal_id,
|
||||
version=self.graph.version,
|
||||
entry_node=self.entry_spec.entry_node, # Use our entry point
|
||||
entry_points={
|
||||
"start": self.entry_spec.entry_node,
|
||||
**self.graph.entry_points,
|
||||
},
|
||||
entry_points=merged_entry_points,
|
||||
async_entry_points=self.graph.async_entry_points,
|
||||
terminal_nodes=self.graph.terminal_nodes,
|
||||
pause_nodes=self.graph.pause_nodes,
|
||||
nodes=self.graph.nodes,
|
||||
@@ -671,6 +702,8 @@ class ExecutionStream:
|
||||
max_steps=self.graph.max_steps,
|
||||
cleanup_llm_model=self.graph.cleanup_llm_model,
|
||||
loop_config=self.graph.loop_config,
|
||||
conversation_mode=self.graph.conversation_mode,
|
||||
identity_prompt=self.graph.identity_prompt,
|
||||
)
|
||||
|
||||
async def wait_for_completion(
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import platform
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
|
||||
from textual.app import App, ComposeResult
|
||||
@@ -358,9 +359,20 @@ class AdenTUI(App):
|
||||
pass
|
||||
|
||||
async def _handle_event(self, event: AgentEvent) -> None:
|
||||
"""Called from the agent thread — bridge to Textual's main thread."""
|
||||
"""Bridge events to Textual's main thread for UI updates.
|
||||
|
||||
Events may arrive from the agent-execution thread (normal LLM/tool
|
||||
work) or from the Textual thread itself (e.g. webhook server events).
|
||||
``call_from_thread`` requires a *different* thread, so we detect
|
||||
which thread we're on and act accordingly.
|
||||
"""
|
||||
try:
|
||||
self.call_from_thread(self._route_event, event)
|
||||
if threading.get_ident() == self._thread_id:
|
||||
# Already on Textual's thread — call directly.
|
||||
self._route_event(event)
|
||||
else:
|
||||
# On a different thread — bridge via call_from_thread.
|
||||
self.call_from_thread(self._route_event, event)
|
||||
except Exception as e:
|
||||
logging.getLogger("tui.events").error(
|
||||
"call_from_thread failed for %s (node=%s): %s",
|
||||
|
||||
@@ -1188,6 +1188,65 @@ class TestTransientErrorRetry:
|
||||
assert len(retry_events) == 1
|
||||
assert retry_events[0].data["retry_count"] == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_recoverable_stream_error_retried_not_silent(self, runtime, node_spec, memory):
|
||||
"""Recoverable StreamErrorEvent with empty response should raise ConnectionError.
|
||||
|
||||
Previously, recoverable stream errors were silently swallowed,
|
||||
producing empty responses that the judge retried — creating an
|
||||
infinite loop of 50+ empty-response iterations. Now they raise
|
||||
ConnectionError so the outer transient-error retry handles them
|
||||
with proper backoff.
|
||||
"""
|
||||
node_spec.output_keys = ["result"]
|
||||
|
||||
call_index = 0
|
||||
|
||||
class RecoverableErrorThenSuccessLLM(LLMProvider):
|
||||
async def stream(self, messages, system="", tools=None, max_tokens=4096):
|
||||
nonlocal call_index
|
||||
idx = call_index
|
||||
call_index += 1
|
||||
if idx == 0:
|
||||
# Recoverable error with no content
|
||||
yield StreamErrorEvent(
|
||||
error="503 service unavailable",
|
||||
recoverable=True,
|
||||
)
|
||||
elif idx == 1:
|
||||
# Success: set output
|
||||
for event in tool_call_scenario(
|
||||
"set_output", {"key": "result", "value": "done"}
|
||||
):
|
||||
yield event
|
||||
else:
|
||||
# Subsequent calls: text-only (no more tool calls)
|
||||
for event in text_scenario("done"):
|
||||
yield event
|
||||
|
||||
def complete(self, messages, system="", **kwargs):
|
||||
return LLMResponse(content="ok", model="mock", stop_reason="stop")
|
||||
|
||||
def complete_with_tools(self, messages, system, tools, tool_executor, **kwargs):
|
||||
return LLMResponse(content="", model="mock", stop_reason="stop")
|
||||
|
||||
llm = RecoverableErrorThenSuccessLLM()
|
||||
ctx = build_ctx(runtime, node_spec, memory, llm)
|
||||
node = EventLoopNode(
|
||||
config=LoopConfig(
|
||||
max_iterations=5,
|
||||
max_stream_retries=3,
|
||||
stream_retry_backoff_base=0.01,
|
||||
),
|
||||
)
|
||||
result = await node.execute(ctx)
|
||||
assert result.success is True
|
||||
assert result.output.get("result") == "done"
|
||||
# call 0: recoverable error → ConnectionError raised → outer retry
|
||||
# call 1: set_output tool call succeeds
|
||||
# call 2: inner tool loop re-invokes LLM after tool result → text "done"
|
||||
assert call_index == 3
|
||||
|
||||
|
||||
class TestIsTransientError:
|
||||
"""Unit tests for _is_transient_error() classification."""
|
||||
|
||||
@@ -120,3 +120,146 @@ async def test_execution_stream_retention(tmp_path):
|
||||
|
||||
await stream.stop()
|
||||
await storage.stop()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shared_session_reuses_directory_and_memory(tmp_path):
|
||||
"""When an async entry point uses resume_session_id, it should:
|
||||
1. Run in the same session directory as the primary execution
|
||||
2. Have access to the primary session's memory
|
||||
3. NOT overwrite the primary session's state.json
|
||||
"""
|
||||
goal = Goal(
|
||||
id="test-goal",
|
||||
name="Test",
|
||||
description="Shared session test",
|
||||
success_criteria=[
|
||||
SuccessCriterion(
|
||||
id="result",
|
||||
description="Result present",
|
||||
metric="output_contains",
|
||||
target="result",
|
||||
)
|
||||
],
|
||||
constraints=[],
|
||||
)
|
||||
|
||||
node = NodeSpec(
|
||||
id="hello",
|
||||
name="Hello",
|
||||
description="Return a result",
|
||||
node_type="llm_generate",
|
||||
input_keys=["user_name"],
|
||||
output_keys=["result"],
|
||||
system_prompt='Return JSON: {"result": "ok"}',
|
||||
)
|
||||
|
||||
graph = GraphSpec(
|
||||
id="test-graph",
|
||||
goal_id=goal.id,
|
||||
version="1.0.0",
|
||||
entry_node="hello",
|
||||
entry_points={"start": "hello"},
|
||||
terminal_nodes=["hello"],
|
||||
pause_nodes=[],
|
||||
nodes=[node],
|
||||
edges=[],
|
||||
default_model="dummy",
|
||||
max_tokens=10,
|
||||
)
|
||||
|
||||
storage = ConcurrentStorage(tmp_path)
|
||||
await storage.start()
|
||||
|
||||
from framework.storage.session_store import SessionStore
|
||||
|
||||
session_store = SessionStore(tmp_path)
|
||||
|
||||
# Primary stream
|
||||
primary_stream = ExecutionStream(
|
||||
stream_id="primary",
|
||||
entry_spec=EntryPointSpec(
|
||||
id="primary",
|
||||
name="Primary",
|
||||
entry_node="hello",
|
||||
trigger_type="manual",
|
||||
isolation_level="shared",
|
||||
),
|
||||
graph=graph,
|
||||
goal=goal,
|
||||
state_manager=SharedStateManager(),
|
||||
storage=storage,
|
||||
outcome_aggregator=OutcomeAggregator(goal, EventBus()),
|
||||
event_bus=None,
|
||||
llm=DummyLLMProvider(),
|
||||
tools=[],
|
||||
tool_executor=None,
|
||||
session_store=session_store,
|
||||
)
|
||||
|
||||
await primary_stream.start()
|
||||
|
||||
# Run primary execution — creates session directory and state.json
|
||||
primary_exec_id = await primary_stream.execute({"user_name": "alice"})
|
||||
primary_result = await primary_stream.wait_for_completion(primary_exec_id, timeout=5)
|
||||
assert primary_result is not None
|
||||
assert primary_result.success
|
||||
|
||||
# Verify primary session's state.json exists and has the primary entry_point
|
||||
primary_state_path = tmp_path / "sessions" / primary_exec_id / "state.json"
|
||||
assert primary_state_path.exists()
|
||||
primary_state = json.loads(primary_state_path.read_text())
|
||||
assert primary_state["entry_point"] == "primary"
|
||||
|
||||
# Async stream — simulates a webhook entry point sharing the session
|
||||
async_stream = ExecutionStream(
|
||||
stream_id="webhook",
|
||||
entry_spec=EntryPointSpec(
|
||||
id="webhook",
|
||||
name="Webhook",
|
||||
entry_node="hello",
|
||||
trigger_type="event",
|
||||
isolation_level="shared",
|
||||
),
|
||||
graph=graph,
|
||||
goal=goal,
|
||||
state_manager=SharedStateManager(),
|
||||
storage=storage,
|
||||
outcome_aggregator=OutcomeAggregator(goal, EventBus()),
|
||||
event_bus=None,
|
||||
llm=DummyLLMProvider(),
|
||||
tools=[],
|
||||
tool_executor=None,
|
||||
session_store=session_store,
|
||||
)
|
||||
|
||||
await async_stream.start()
|
||||
|
||||
# Run async execution with resume_session_id pointing to primary session
|
||||
session_state = {
|
||||
"resume_session_id": primary_exec_id,
|
||||
"memory": {"rules": "star important emails"},
|
||||
}
|
||||
async_exec_id = await async_stream.execute({"event": "new_email"}, session_state=session_state)
|
||||
|
||||
# Should reuse the primary session ID
|
||||
assert async_exec_id == primary_exec_id
|
||||
|
||||
async_result = await async_stream.wait_for_completion(async_exec_id, timeout=5)
|
||||
assert async_result is not None
|
||||
assert async_result.success
|
||||
|
||||
# State.json should NOT have been overwritten by the async execution
|
||||
# (it should still show the primary entry point)
|
||||
final_state = json.loads(primary_state_path.read_text())
|
||||
assert final_state["entry_point"] == "primary"
|
||||
|
||||
# Verify only ONE session directory exists (not two)
|
||||
sessions_dir = tmp_path / "sessions"
|
||||
session_dirs = [d for d in sessions_dir.iterdir() if d.is_dir()]
|
||||
assert len(session_dirs) == 1
|
||||
assert session_dirs[0].name == primary_exec_id
|
||||
|
||||
await primary_stream.stop()
|
||||
await async_stream.stop()
|
||||
await storage.stop()
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
# fix: multi-entry event-driven agent — transient errors, shared sessions, and TUI stability
|
||||
|
||||
## Summary
|
||||
|
||||
Fixes six interconnected bugs that prevented multi-entry-point event-driven agents (like Gmail Inbox Guardian) from working correctly. These agents have a primary user-facing entry point (e.g. rule setup) and async event-driven entry points (e.g. webhook triggers) that share a single session.
|
||||
|
||||
## Bugs Fixed
|
||||
|
||||
### 1. Transient LLM errors cause infinite retry loops (0 tokens, 50 iterations)
|
||||
|
||||
**Root cause:** `LiteLLMProvider.stream()` only retried `RateLimitError`. Other transient errors (`ConnectionError`, `InternalServerError`, `APIConnectionError`) were caught by the generic `except Exception` handler, which yielded a `StreamErrorEvent(recoverable=True)` without retrying. The `EventLoopNode` logged the warning and continued with an empty response — the judge saw no outputs and returned RETRY, burning all 50 iterations doing nothing.
|
||||
|
||||
**Fix (two layers):**
|
||||
|
||||
- **`litellm.py`**: Added retry with exponential backoff for transient errors in the `except Exception` handler, matching the existing `RateLimitError` retry logic.
|
||||
- **`event_loop_node.py`**: After the stream completes, if a recoverable error occurred AND the response is empty (no text, no tool calls), raise `ConnectionError` so the outer transient-error handler catches it with proper backoff instead of silently burning judge iterations.
|
||||
|
||||
### 2. Webhook triggers create separate sessions instead of sharing the primary session
|
||||
|
||||
**Root cause:** `AgentRuntime._make_handler()` called `self.trigger(entry_point_id, {...})` without any `session_state`, so each webhook execution created a brand new session directory. The webhook execution couldn't access user-defined rules from the primary session, and logs were scattered across multiple session directories.
|
||||
|
||||
**Fix:** Added `_get_primary_session_state()` to `AgentRuntime`. When a webhook fires, it finds the active primary session, reads its `state.json`, and passes `resume_session_id` + filtered memory as `session_state`. The webhook execution now runs in the same session directory, with access to shared memory (rules, config) while stale outputs from previous runs are filtered out based on the async entry node's declared `input_keys`.
|
||||
|
||||
### 3. Shared-session executions overwrite the primary session's state.json
|
||||
|
||||
**Root cause:** `ExecutionStream._run_execution()` unconditionally called `_write_session_state()` at start, completion, error, and cancellation. When a webhook execution shared the primary session via `resume_session_id`, these writes would overwrite the primary session's state.json with the webhook execution's state.
|
||||
|
||||
**Fix:** Added `_is_shared_session` guard. When `session_state` contains `resume_session_id`, all `_write_session_state()` calls are skipped — the primary execution owns `state.json`, and `_write_progress()` in the executor keeps memory up-to-date at every node transition.
|
||||
|
||||
### 4. TUI crashes with `call_from_thread` errors on webhook events
|
||||
|
||||
**Root cause:** The webhook HTTP server runs on Textual's main event loop thread. When webhook events fire, `_handle_event` called `call_from_thread()` — but this method requires being called from a *different* thread. Calling it from Textual's own thread raises an exception, flooding the logs with errors for every event bus emission.
|
||||
|
||||
**Fix:** Check `threading.get_ident() == self._thread_id` before deciding how to route the event. If already on Textual's thread, call `_route_event()` directly. Otherwise, use `call_from_thread()` as before.
|
||||
|
||||
### 5. Stale memory from previous webhook runs leaks into new executions
|
||||
|
||||
**Root cause:** `_get_primary_session_state()` initially passed ALL memory from the primary session's `state.json`, including outputs from previous webhook runs (`emails`, `actions_taken`, `summary_report`). When `fetch-emails` received these stale outputs in memory, edge conditions and the node's own logic treated them as already-complete work.
|
||||
|
||||
**Fix:** Filter memory to only the keys declared in the async entry node's `input_keys` (e.g. `rules`, `max_emails`). Stale outputs from previous runs are excluded so each webhook trigger starts with clean execution state.
|
||||
|
||||
### 6. fetch-emails skipped on subsequent webhook triggers (stale conversation restore)
|
||||
|
||||
**Root cause:** Even after fixing stale memory, `fetch-emails` was still skipped. The `EventLoopNode._restore()` crash-recovery mechanism loaded the previous webhook run's conversation from `FileConversationStore`, which included a stale `OutputAccumulator` (stored in the cursor). The judge saw outputs already filled and accepted immediately without the LLM doing any work.
|
||||
|
||||
**Fix:** In the executor, detect fresh shared-session executions (`resume_session_id` present, no `paused_at`, no `resume_from_checkpoint`). Before the node runs, clear the stale cursor (resets the `OutputAccumulator` and iteration counter) and append a transition marker message to the conversation. The conversation history is preserved (continuous memory), but the execution state is fresh. The LLM sees the full thread plus a "NEW EVENT TRIGGER" marker and processes the new event from scratch.
|
||||
|
||||
## Other Changes
|
||||
|
||||
### AgentRunner.load() now supports multi-entry agents
|
||||
|
||||
- Reads `conversation_mode`, `identity_prompt`, `async_entry_points`, and `runtime_config` from agent modules
|
||||
- Always creates a primary "default" entry point alongside any async entry points
|
||||
- Passes `AgentRuntimeConfig` (webhook settings) through to `create_agent_runtime()`
|
||||
|
||||
### Graph validation supports multiple entry candidates
|
||||
|
||||
- `workflow.py` and `agent_builder_server.py`: Reachability check now starts from ALL entry candidates (nodes with no incoming edges), not just the first one. Fixes false "unreachable nodes" errors for agents with async entry points.
|
||||
|
||||
### ExecutionStream preserves graph metadata
|
||||
|
||||
- `_create_modified_graph()` now copies `conversation_mode`, `identity_prompt`, and `async_entry_points` from the original graph. Previously these were silently dropped, breaking continuous conversation mode and event routing for webhook-triggered executions.
|
||||
|
||||
## Test Plan
|
||||
|
||||
- [x] `test_execution_stream.py` — existing tests pass + new `test_shared_session_reuses_directory_and_memory` verifies session sharing, memory access, and state.json ownership
|
||||
- [x] `test_event_loop_node.py` — existing tests pass + new `test_recoverable_stream_error_retried_not_silent` verifies recoverable stream errors raise `ConnectionError` instead of silently producing empty results
|
||||
- [x] `test_executor*.py` — all executor tests pass
|
||||
- [x] Gmail Inbox Guardian agent validates successfully
|
||||
Reference in New Issue
Block a user