From 02edd44283229f5f6ba1198618844224b262d4c9 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Thu, 2 Apr 2026 13:00:52 -0700 Subject: [PATCH] feat: First-Class Worker Agents with Event-Driven Dependency Execution --- core/framework/graph/__init__.py | 17 + core/framework/graph/executor.py | 1368 +++++++------------------- core/framework/graph/worker_agent.py | 809 +++++++++++++++ core/framework/runtime/event_bus.py | 52 + docs/developer-guide.md | 5 +- docs/environment-setup.md | 2 +- docs/getting-started.md | 8 +- docs/roadmap-developer-success.md | 6 +- docs/roadmap.md | 2 +- hive | 2 +- 10 files changed, 1231 insertions(+), 1040 deletions(-) create mode 100644 core/framework/graph/worker_agent.py diff --git a/core/framework/graph/__init__.py b/core/framework/graph/__init__.py index 677bb459..d2b836f7 100644 --- a/core/framework/graph/__init__.py +++ b/core/framework/graph/__init__.py @@ -13,6 +13,15 @@ from framework.graph.event_loop_node import ( from framework.graph.executor import GraphExecutor from framework.graph.goal import Constraint, Goal, GoalStatus, SuccessCriterion from framework.graph.node import NodeContext, NodeProtocol, NodeResult, NodeSpec +from framework.graph.worker_agent import ( + Activation, + FanOutTag, + FanOutTracker, + GraphContext, + WorkerAgent, + WorkerCompletion, + WorkerLifecycle, +) __all__ = [ # Goal @@ -45,4 +54,12 @@ __all__ = [ # Context Handoff "ContextHandoff", "HandoffContext", + # Worker Agent + "WorkerAgent", + "WorkerLifecycle", + "WorkerCompletion", + "Activation", + "FanOutTag", + "FanOutTracker", + "GraphContext", ] diff --git a/core/framework/graph/executor.py b/core/framework/graph/executor.py index 47c71d77..96b87c85 100644 --- a/core/framework/graph/executor.py +++ b/core/framework/graph/executor.py @@ -687,1030 +687,17 @@ class GraphExecutor: ) try: - while steps < graph.max_steps: - steps += 1 - - # Check for pause request - if self._pause_requested.is_set(): - self.logger.info("⏸ Pause detected - stopping at node boundary") - - # Emit pause event - if self._event_bus: - await self._event_bus.emit_execution_paused( - stream_id=self._stream_id, - node_id=current_node_id, - reason="User requested pause (Ctrl+Z)", - execution_id=self._execution_id, - ) - - # Create session state for pause - saved_buffer = buffer.read_all() - pause_session_state: dict[str, Any] = { - "data_buffer": saved_buffer, # Include buffer for resume - "execution_path": list(path), - "node_visit_counts": dict(node_visit_counts), - "run_id": self._run_id, - } - - # Create a pause checkpoint - if checkpoint_store: - pause_checkpoint = self._create_checkpoint( - checkpoint_type="pause", - current_node=current_node_id, - execution_path=path, - buffer=buffer, - next_node=current_node_id, - is_clean=True, - ) - await checkpoint_store.save_checkpoint(pause_checkpoint) - pause_session_state["latest_checkpoint_id"] = pause_checkpoint.checkpoint_id - pause_session_state["resume_from_checkpoint"] = ( - pause_checkpoint.checkpoint_id - ) - - # Return with paused status - return ExecutionResult( - success=False, - output=saved_buffer, - path=path, - paused_at=current_node_id, - error="Execution paused by user request", - session_state=pause_session_state, - node_visit_counts=dict(node_visit_counts), - ) - - # Get current node - node_spec = graph.get_node(current_node_id) - if node_spec is None: - raise RuntimeError(f"Node not found: {current_node_id}") - - # Enforce max_node_visits (feedback/callback edge support) - # Don't increment visit count on retries — retries are not new 
visits - if not _is_retry: - cnt = node_visit_counts.get(current_node_id, 0) + 1 - node_visit_counts[current_node_id] = cnt - _is_retry = False - max_visits = getattr(node_spec, "max_node_visits", 0) - if max_visits > 0 and node_visit_counts[current_node_id] > max_visits: - self.logger.warning( - f" ⊘ Node '{node_spec.name}' visit limit reached " - f"({node_visit_counts[current_node_id]}/{max_visits}), skipping" - ) - # Skip execution — follow outgoing edges using current buffer - skip_result = NodeResult(success=True, output=buffer.read_all()) - next_node = await self._follow_edges( - graph=graph, - goal=goal, - current_node_id=current_node_id, - current_node_spec=node_spec, - result=skip_result, - buffer=buffer, - ) - if next_node is None: - self.logger.info(" → No more edges after visit limit, ending") - break - current_node_id = next_node - continue - - path.append(current_node_id) - - # Clear stale nullable outputs from previous visits. - # When a node is re-visited (e.g. review → process-batch → review), - # nullable outputs from the PREVIOUS visit linger in the data buffer. - # This causes stale edge conditions to fire (e.g. "feedback is not None" - # from visit 1 triggers even when visit 2 sets "final_summary" instead). - # Clearing them ensures only the CURRENT visit's outputs affect routing. - if node_visit_counts.get(current_node_id, 0) > 1: - nullable_keys = getattr(node_spec, "nullable_output_keys", None) or [] - for key in nullable_keys: - if buffer.read(key) is not None: - buffer.write(key, None, validate=False) - self.logger.info( - f" 🧹 Cleared stale nullable output '{key}' from previous visit" - ) - - # Check if pause (HITL) before execution - if current_node_id in graph.pause_nodes: - self.logger.info(f"⏸ Paused at HITL node: {node_spec.name}") - # Execute this node, then pause - # (We'll check again after execution and save state) - - # Expose current node for external injection routing - self.current_node_id = current_node_id - - self.logger.info(f"\n▶ Step {steps}: {node_spec.name} ({node_spec.node_type})") - self.logger.info(f" Inputs: {node_spec.input_keys}") - self.logger.info(f" Outputs: {node_spec.output_keys}") - - # Continuous mode: accumulate tools and output keys from this node - if is_continuous and node_spec.tools: - for t in self.tools: - if t.name in node_spec.tools and t.name not in cumulative_tool_names: - cumulative_tools.append(t) - cumulative_tool_names.add(t.name) - if is_continuous and node_spec.output_keys: - for k in node_spec.output_keys: - if k not in cumulative_output_keys: - cumulative_output_keys.append(k) - - # Build resume narrative (Layer 2) when restoring a session - # so the EventLoopNode can rebuild the full 3-layer system prompt. 
- _resume_narrative = "" - if _is_resuming and path: - from framework.graph.prompt_composer import build_narrative - - _resume_narrative = build_narrative(buffer, path, graph) - - # Build context for node - ctx = self._build_context( - node_spec=node_spec, - buffer=buffer, - goal=goal, - input_data=input_data or {}, - max_tokens=graph.max_tokens, - continuous_mode=is_continuous, - inherited_conversation=continuous_conversation if is_continuous else None, - override_tools=cumulative_tools if is_continuous else None, - cumulative_output_keys=cumulative_output_keys if is_continuous else None, - event_triggered=_event_triggered, - node_registry=node_registry, - identity_prompt=getattr(graph, "identity_prompt", ""), - narrative=_resume_narrative, - graph=graph, - ) - - # Log actual input data being read - if node_spec.input_keys: - self.logger.info(" Reading from data buffer:") - for key in node_spec.input_keys: - value = buffer.read(key) - if value is not None: - # Truncate long values for readability - value_str = str(value) - if len(value_str) > 200: - value_str = value_str[:200] + "..." - self.logger.info(f" {key}: {value_str}") - - # Get or create node implementation - node_impl = self._get_node_implementation(node_spec, graph.cleanup_llm_model) - - # Validate inputs - validation_errors = node_impl.validate_input(ctx) - if validation_errors: - self.logger.warning(f"⚠ Validation warnings: {validation_errors}") - self.runtime.report_problem( - severity="warning", - description=f"Validation errors for {current_node_id}: {validation_errors}", - ) - - # CHECKPOINT: node_start - if ( - checkpoint_store - and checkpoint_config - and checkpoint_config.should_checkpoint_node_start() - ): - checkpoint = self._create_checkpoint( - checkpoint_type="node_start", - current_node=node_spec.id, - execution_path=list(path), - buffer=buffer, - is_clean=(sum(node_retry_counts.values()) == 0), - ) - - if checkpoint_config.async_checkpoint: - # Non-blocking checkpoint save - asyncio.create_task(checkpoint_store.save_checkpoint(checkpoint)) - else: - # Blocking checkpoint save - await checkpoint_store.save_checkpoint(checkpoint) - - # Emit node-started event (skip event_loop nodes — they emit their own) - if self._event_bus and node_spec.node_type != "event_loop": - await self._event_bus.emit_node_loop_started( - stream_id=self._stream_id, - node_id=current_node_id, - execution_id=self._execution_id, - ) - - # Execute node - self.logger.info(" Executing...") - result = await node_impl.execute(ctx) - - # GCU tab cleanup: stop the browser profile after a top-level GCU node - # finishes so tabs don't accumulate. Mirrors the subagent cleanup in - # EventLoopNode._execute_subagent(). 
- if node_spec.node_type == "gcu" and self.tool_executor is not None: - try: - from gcu.browser.session import ( - _active_profile as _gcu_profile_var, - ) - - _gcu_profile = _gcu_profile_var.get() - _stop_use = ToolUse( - id="gcu-cleanup", - name="browser_stop", - input={"profile": _gcu_profile}, - ) - _stop_result = self.tool_executor(_stop_use) - if asyncio.iscoroutine(_stop_result) or asyncio.isfuture(_stop_result): - await _stop_result - except ImportError: - pass # GCU not installed - except Exception as _gcu_exc: - logger.warning( - "GCU browser_stop failed for profile %r: %s", - _gcu_profile, - _gcu_exc, - ) - - # Emit node-completed event (skip event_loop nodes) - if self._event_bus and node_spec.node_type != "event_loop": - await self._event_bus.emit_node_loop_completed( - stream_id=self._stream_id, - node_id=current_node_id, - iterations=1, - execution_id=self._execution_id, - ) - - # Ensure runtime logging has an L2 entry for this node - if self.runtime_logger: - self.runtime_logger.ensure_node_logged( - node_id=node_spec.id, - node_name=node_spec.name, - node_type=node_spec.node_type, - success=result.success, - error=result.error, - tokens_used=result.tokens_used, - latency_ms=result.latency_ms, - ) - - if result.success: - # Validate output before accepting it. - # Skip for event_loop nodes — their judge system is - # the sole acceptance mechanism (see WP-8). Empty - # strings and other flexible outputs are legitimate - # for LLM-driven nodes that already passed the judge. - if ( - result.output - and node_spec.output_keys - and node_spec.node_type != "event_loop" - ): - validation = self.validator.validate_all( - output=result.output, - expected_keys=node_spec.output_keys, - check_hallucination=True, - nullable_keys=node_spec.nullable_output_keys, - ) - if not validation.success: - self.logger.error(f" ✗ Output validation failed: {validation.error}") - result = NodeResult( - success=False, - error=f"Output validation failed: {validation.error}", - output={}, - tokens_used=result.tokens_used, - latency_ms=result.latency_ms, - ) - - if result.success: - self.logger.info( - f" ✓ Success (tokens: {result.tokens_used}, " - f"latency: {result.latency_ms}ms)" - ) - - # Generate and log human-readable summary - summary = result.to_summary(node_spec) - self.logger.info(f" 📝 Summary: {summary}") - - # Log what was written to buffer (detailed view) - if result.output: - self.logger.info(" Written to data buffer:") - for key, value in result.output.items(): - value_str = str(value) - if len(value_str) > 200: - value_str = value_str[:200] + "..." - self.logger.info(f" {key}: {value_str}") - - # Write node outputs to buffer BEFORE edge evaluation - # This enables direct key access in conditional expressions (e.g., "score > 80") - # Without this, conditional edges can only use output['key'] syntax - if result.output: - for key, value in result.output.items(): - buffer.write(key, value, validate=False) - else: - self.logger.error(f" ✗ Failed: {result.error}") - - total_tokens += result.tokens_used - total_latency += result.latency_ms - - # Handle failure - if not result.success: - # Track retries per node - node_retry_counts[current_node_id] = ( - node_retry_counts.get(current_node_id, 0) + 1 - ) - - # [CORRECTED] Use node_spec.max_retries instead of hardcoded 3 - max_retries = getattr(node_spec, "max_retries", 3) - - # EventLoopNode instances handle retry internally via judge — - # executor retry would cause catastrophic retry multiplication. 
- # Only override for actual EventLoopNode instances, not custom - # NodeProtocol implementations that happen to use node_type="event_loop" - from framework.graph.event_loop_node import EventLoopNode - - if isinstance(node_impl, EventLoopNode) and max_retries > 0: - self.logger.warning( - f"EventLoopNode '{node_spec.id}' has max_retries={max_retries}. " - "Overriding to 0 — event loop nodes handle retry internally via judge." - ) - max_retries = 0 - - if node_retry_counts[current_node_id] < max_retries: - # Retry - don't increment steps for retries - steps -= 1 - - # --- EXPONENTIAL BACKOFF --- - retry_count = node_retry_counts[current_node_id] - # Backoff formula: 1.0 * (2^(retry - 1)) -> 1s, 2s, 4s... - delay = 1.0 * (2 ** (retry_count - 1)) - self.logger.info(f" Using backoff: Sleeping {delay}s before retry...") - await asyncio.sleep(delay) - # -------------------------------------- - - self.logger.info( - f" ↻ Retrying ({node_retry_counts[current_node_id]}/{max_retries})..." - ) - - # Emit retry event - if self._event_bus: - await self._event_bus.emit_node_retry( - stream_id=self._stream_id, - node_id=current_node_id, - retry_count=retry_count, - max_retries=max_retries, - error=result.error or "", - execution_id=self._execution_id, - ) - - _is_retry = True - continue - else: - # Max retries exceeded - check for failure handlers - self.logger.error( - f" ✗ Max retries ({max_retries}) exceeded for node {current_node_id}" - ) - - # Check if there's an ON_FAILURE edge to follow - next_node = await self._follow_edges( - graph=graph, - goal=goal, - current_node_id=current_node_id, - current_node_spec=node_spec, - result=result, # result.success=False triggers ON_FAILURE - buffer=buffer, - ) - - if next_node: - # Found a failure handler - route to it - self.logger.info(f" → Routing to failure handler: {next_node}") - current_node_id = next_node - continue # Continue execution with handler - else: - # No failure handler - terminate execution - self.runtime.report_problem( - severity="critical", - description=( - f"Node {current_node_id} failed after " - f"{max_retries} attempts: {result.error}" - ), - ) - self.runtime.end_run( - success=False, - output_data=buffer.read_all(), - narrative=( - f"Failed at {node_spec.name} after " - f"{max_retries} retries: {result.error}" - ), - ) - - # Calculate quality metrics - total_retries_count = sum(node_retry_counts.values()) - nodes_failed = list(node_retry_counts.keys()) - - if self.runtime_logger: - await self.runtime_logger.end_run( - status="failure", - duration_ms=total_latency, - node_path=path, - execution_quality="failed", - ) - - # Save buffer for potential resume - saved_buffer = buffer.read_all() - failure_session_state = { - "data_buffer": saved_buffer, - "execution_path": list(path), - "node_visit_counts": dict(node_visit_counts), - "resume_from": current_node_id, - "run_id": self._run_id, - } - - return ExecutionResult( - success=False, - error=( - f"Node '{node_spec.name}' failed after " - f"{max_retries} attempts: {result.error}" - ), - output=saved_buffer, - steps_executed=steps, - total_tokens=total_tokens, - total_latency_ms=total_latency, - path=path, - total_retries=total_retries_count, - nodes_with_failures=nodes_failed, - retry_details=dict(node_retry_counts), - had_partial_failures=len(nodes_failed) > 0, - execution_quality="failed", - node_visit_counts=dict(node_visit_counts), - session_state=failure_session_state, - ) - - # Check if we just executed a pause node - if so, save state and return - # This must happen BEFORE 
determining next node, since pause nodes may have no edges - if node_spec.id in graph.pause_nodes: - self.logger.info("💾 Saving session state after pause node") - - # Emit pause event - if self._event_bus: - await self._event_bus.emit_execution_paused( - stream_id=self._stream_id, - node_id=node_spec.id, - reason="HITL pause node", - execution_id=self._execution_id, - ) - - saved_buffer = buffer.read_all() - session_state_out = { - "paused_at": node_spec.id, - "resume_from": f"{node_spec.id}_resume", # Resume key - "data_buffer": saved_buffer, - "execution_path": list(path), - "node_visit_counts": dict(node_visit_counts), - "next_node": None, # Will resume from entry point - "run_id": self._run_id, - } - - self.runtime.end_run( - success=True, - output_data=saved_buffer, - narrative=f"Paused at {node_spec.name} after {steps} steps", - ) - - # Calculate quality metrics - total_retries_count = sum(node_retry_counts.values()) - nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0] - exec_quality = "degraded" if total_retries_count > 0 else "clean" - - if self.runtime_logger: - await self.runtime_logger.end_run( - status="success", - duration_ms=total_latency, - node_path=path, - execution_quality=exec_quality, - ) - - return ExecutionResult( - success=True, - output=saved_buffer, - steps_executed=steps, - total_tokens=total_tokens, - total_latency_ms=total_latency, - path=path, - paused_at=node_spec.id, - session_state=session_state_out, - total_retries=total_retries_count, - nodes_with_failures=nodes_failed, - retry_details=dict(node_retry_counts), - had_partial_failures=len(nodes_failed) > 0, - execution_quality=exec_quality, - node_visit_counts=dict(node_visit_counts), - ) - - # Check if this is a terminal node - if so, we're done - if node_spec.id in graph.terminal_nodes: - self.logger.info(f"✓ Reached terminal node: {node_spec.name}") - break - - # Determine next node - if result.next_node: - # Router explicitly set next node - self.logger.info(f" → Router directing to: {result.next_node}") - - # Emit edge traversed event for router-directed edge - if self._event_bus: - await self._event_bus.emit_edge_traversed( - stream_id=self._stream_id, - source_node=current_node_id, - target_node=result.next_node, - edge_condition="router", - execution_id=self._execution_id, - ) - - current_node_id = result.next_node - self._write_progress(current_node_id, path, buffer, node_visit_counts) - else: - # Get all traversable edges for fan-out detection - traversable_edges = await self._get_all_traversable_edges( - graph=graph, - goal=goal, - current_node_id=current_node_id, - current_node_spec=node_spec, - result=result, - buffer=buffer, - ) - - if not traversable_edges: - self.logger.info(" → No more edges, ending execution") - break # No valid edge, end execution - - # Check for fan-out (multiple traversable edges) - if self.enable_parallel_execution and len(traversable_edges) > 1: - # Find convergence point (fan-in node) - targets = [e.target for e in traversable_edges] - fan_in_node = self._find_convergence_node(graph, targets) - - # Emit edge traversed events for fan-out branches - if self._event_bus: - for edge in traversable_edges: - await self._event_bus.emit_edge_traversed( - stream_id=self._stream_id, - source_node=current_node_id, - target_node=edge.target, - edge_condition=edge.condition.value - if hasattr(edge.condition, "value") - else str(edge.condition), - execution_id=self._execution_id, - ) - - # Execute branches in parallel - ( - _branch_results, - 
branch_tokens, - branch_latency, - ) = await self._execute_parallel_branches( - graph=graph, - goal=goal, - edges=traversable_edges, - buffer=buffer, - source_result=result, - source_node_spec=node_spec, - path=path, - node_registry=node_registry, - ) - - total_tokens += branch_tokens - total_latency += branch_latency - - # Continue from fan-in node - if fan_in_node: - self.logger.info(f" ⑃ Fan-in: converging at {fan_in_node}") - current_node_id = fan_in_node - self._write_progress(current_node_id, path, buffer, node_visit_counts) - else: - # No convergence point - branches are terminal - self.logger.info(" → Parallel branches completed (no convergence)") - break - else: - # Sequential: follow single edge (existing logic via _follow_edges) - next_node = await self._follow_edges( - graph=graph, - goal=goal, - current_node_id=current_node_id, - current_node_spec=node_spec, - result=result, - buffer=buffer, - ) - if next_node is None: - self.logger.info(" → No more edges, ending execution") - break - next_spec = graph.get_node(next_node) - self.logger.info(f" → Next: {next_spec.name if next_spec else next_node}") - - # Emit edge traversed event for sequential edge - if self._event_bus: - await self._event_bus.emit_edge_traversed( - stream_id=self._stream_id, - source_node=current_node_id, - target_node=next_node, - execution_id=self._execution_id, - ) - - # CHECKPOINT: node_complete (after determining next node) - if ( - checkpoint_store - and checkpoint_config - and checkpoint_config.should_checkpoint_node_complete() - ): - checkpoint = self._create_checkpoint( - checkpoint_type="node_complete", - current_node=node_spec.id, - execution_path=list(path), - buffer=buffer, - next_node=next_node, - is_clean=(sum(node_retry_counts.values()) == 0), - ) - - if checkpoint_config.async_checkpoint: - asyncio.create_task(checkpoint_store.save_checkpoint(checkpoint)) - else: - await checkpoint_store.save_checkpoint(checkpoint) - - # Periodic checkpoint pruning - if ( - checkpoint_store - and checkpoint_config - and checkpoint_config.should_prune_checkpoints(len(path)) - ): - asyncio.create_task( - checkpoint_store.prune_checkpoints( - max_age_days=checkpoint_config.checkpoint_max_age_days - ) - ) - - current_node_id = next_node - - # Write progress snapshot at node transition - self._write_progress(current_node_id, path, buffer, node_visit_counts) - - # Continuous mode: thread conversation forward with transition marker - if is_continuous and result.conversation is not None: - continuous_conversation = result.conversation - - # Look up the next node spec for the transition marker - next_spec = graph.get_node(current_node_id) - if next_spec and next_spec.node_type == "event_loop": - from framework.graph.prompt_composer import ( - EXECUTION_SCOPE_PREAMBLE, - build_accounts_prompt, - build_narrative, - build_transition_marker, - compose_system_prompt, - ) - - # Build Layer 2 (narrative) from current state - narrative = build_narrative(buffer, path, graph) - - # Build per-node accounts prompt for the next node - _node_accounts = self.accounts_prompt or None - if self.accounts_data and self.tool_provider_map: - _node_accounts = ( - build_accounts_prompt( - self.accounts_data, - self.tool_provider_map, - node_tool_names=next_spec.tools, - ) - or None - ) - - # Compose new system prompt (Layer 1 + 2 + 3 + accounts) - # Prepend scope preamble to focus so the LLM stays - # within this node's responsibility. 
- _focus = next_spec.system_prompt - if next_spec.output_keys and _focus: - _focus = f"{EXECUTION_SCOPE_PREAMBLE}\n\n{_focus}" - new_system = compose_system_prompt( - identity_prompt=getattr(graph, "identity_prompt", None), - focus_prompt=_focus, - narrative=narrative, - accounts_prompt=_node_accounts, - ) - continuous_conversation.update_system_prompt(new_system) - - # Insert transition marker into conversation - data_dir = str(self._storage_path / "data") if self._storage_path else None - marker = build_transition_marker( - previous_node=node_spec, - next_node=next_spec, - buffer=buffer, - cumulative_tool_names=sorted(cumulative_tool_names), - data_dir=data_dir, - ) - await continuous_conversation.add_user_message( - marker, - is_transition_marker=True, - ) - - # Set current phase for phase-aware compaction - continuous_conversation.set_current_phase(next_spec.id) - - # Phase-boundary compaction (same flow as EventLoopNode._compact) - if continuous_conversation.usage_ratio() > 0.5: - await continuous_conversation.prune_old_tool_results( - protect_tokens=2000, - ) - if continuous_conversation.needs_compaction(): - _phase_ratio = continuous_conversation.usage_ratio() - self.logger.info( - " Phase-boundary compaction (%.0f%% usage)", - _phase_ratio * 100, - ) - _data_dir = ( - str(self._storage_path / "data") if self._storage_path else None - ) - # Step 1: Structural compaction (>=80%) - if _data_dir: - _pre = continuous_conversation.usage_ratio() - await continuous_conversation.compact_preserving_structure( - spillover_dir=_data_dir, - keep_recent=4, - phase_graduated=True, - ) - if continuous_conversation.usage_ratio() >= 0.9 * _pre: - await continuous_conversation.compact_preserving_structure( - spillover_dir=_data_dir, - keep_recent=4, - phase_graduated=True, - aggressive=True, - ) - - # Step 2: LLM compaction (>95%) - if ( - continuous_conversation.usage_ratio() > 0.95 - and self._llm is not None - ): - self.logger.info( - " LLM phase-boundary compaction (%.0f%% usage)", - continuous_conversation.usage_ratio() * 100, - ) - try: - _llm_summary = await self._phase_llm_compact( - continuous_conversation, - next_spec, - list(continuous_conversation.messages), - ) - await continuous_conversation.compact( - _llm_summary, - keep_recent=2, - phase_graduated=True, - ) - except Exception as e: - self.logger.warning( - " Phase LLM compaction failed: %s", - e, - ) - - # Step 3: Emergency (only if still over budget) - if continuous_conversation.needs_compaction(): - self.logger.warning( - " Emergency phase compaction (%.0f%%)", - continuous_conversation.usage_ratio() * 100, - ) - summary = ( - f"Summary of earlier phases " - f"(before {next_spec.name}). " - "See transition markers for phase details." 
- ) - await continuous_conversation.compact( - summary, - keep_recent=1, - phase_graduated=True, - ) - - # Update input_data for next node - input_data = result.output - - # Collect output - output = buffer.read_all() - - self.logger.info("\n✓ Execution complete!") - self.logger.info(f" Steps: {steps}") - self.logger.info(f" Path: {' → '.join(path)}") - self.logger.info(f" Total tokens: {total_tokens}") - self.logger.info(f" Total latency: {total_latency}ms") - - # Calculate execution quality metrics - total_retries_count = sum(node_retry_counts.values()) - nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0] - exec_quality = "degraded" if total_retries_count > 0 else "clean" - - # Update narrative to reflect execution quality - quality_suffix = "" - if exec_quality == "degraded": - retries = total_retries_count - failed = len(nodes_failed) - quality_suffix = f" ({retries} retries across {failed} nodes)" - - self.runtime.end_run( - success=True, - output_data=output, - narrative=( - f"Executed {steps} steps through path: {' -> '.join(path)}{quality_suffix}" - ), - ) - - if self.runtime_logger: - await self.runtime_logger.end_run( - status="success" if exec_quality != "failed" else "failure", - duration_ms=total_latency, - node_path=path, - execution_quality=exec_quality, - ) - - return ExecutionResult( - success=True, - output=output, - steps_executed=steps, - total_tokens=total_tokens, - total_latency_ms=total_latency, - path=path, - total_retries=total_retries_count, - nodes_with_failures=nodes_failed, - retry_details=dict(node_retry_counts), - had_partial_failures=len(nodes_failed) > 0, - execution_quality=exec_quality, - node_visit_counts=dict(node_visit_counts), - session_state={ - "data_buffer": output, # output IS buffer.read_all() - "execution_path": list(path), - "node_visit_counts": dict(node_visit_counts), - "run_id": self._run_id, - }, - ) - - except asyncio.CancelledError: - # Handle cancellation (e.g., TUI quit) - save as paused instead of failed - self.logger.info("⏸ Execution cancelled - saving state for resume") - - # Flush WIP accumulator outputs from the interrupted node's - # cursor.json into DataBuffer so they survive resume. The - # accumulator writes to cursor.json on every set() call, but - # only writes to DataBuffer when the judge ACCEPTs. Without - # this, edge conditions checking these keys see None on resume. 
- if current_node_id and self._storage_path: - try: - import json as _json - - cursor_path = self._storage_path / "conversations" / "cursor.json" - if cursor_path.exists(): - cursor_data = _json.loads(cursor_path.read_text(encoding="utf-8")) - run_cursor = get_run_cursor(cursor_data, self._run_id or None) or {} - wip_outputs = run_cursor.get("outputs", {}) - for key, value in wip_outputs.items(): - if value is not None: - buffer.write(key, value, validate=False) - if wip_outputs: - self.logger.info( - "Flushed %d WIP accumulator outputs to buffer: %s", - len(wip_outputs), - list(wip_outputs.keys()), - ) - except Exception: - self.logger.debug( - "Could not flush accumulator outputs from cursor", - exc_info=True, - ) - - # Save buffer and state for resume - saved_buffer = buffer.read_all() - session_state_out: dict[str, Any] = { - "data_buffer": saved_buffer, - "execution_path": list(path), - "node_visit_counts": dict(node_visit_counts), - "run_id": self._run_id, - } - - # Calculate quality metrics - total_retries_count = sum(node_retry_counts.values()) - nodes_failed = [nid for nid, count in node_retry_counts.items() if count > 0] - exec_quality = "degraded" if total_retries_count > 0 else "clean" - - if self.runtime_logger: - await self.runtime_logger.end_run( - status="paused", - duration_ms=total_latency, - node_path=path, - execution_quality=exec_quality, - ) - - # Return with paused status - return ExecutionResult( - success=False, - error="Execution cancelled", - output=saved_buffer, - steps_executed=steps, - total_tokens=total_tokens, - total_latency_ms=total_latency, - path=path, - paused_at=current_node_id, # Save where we were - session_state=session_state_out, - total_retries=total_retries_count, - nodes_with_failures=nodes_failed, - retry_details=dict(node_retry_counts), - had_partial_failures=len(nodes_failed) > 0, - execution_quality=exec_quality, - node_visit_counts=dict(node_visit_counts), - ) - - except Exception as e: - import traceback - - stack_trace = traceback.format_exc() - - self.runtime.report_problem( - severity="critical", - description=str(e), - ) - self.runtime.end_run( - success=False, - narrative=f"Failed at step {steps}: {e}", - ) - - # Log the crashing node to L2 with full stack trace - if self.runtime_logger and node_spec is not None: - self.runtime_logger.ensure_node_logged( - node_id=node_spec.id, - node_name=node_spec.name, - node_type=node_spec.node_type, - success=False, - error=str(e), - stacktrace=stack_trace, - ) - - # Calculate quality metrics even for exceptions - total_retries_count = sum(node_retry_counts.values()) - nodes_failed = list(node_retry_counts.keys()) - - if self.runtime_logger: - await self.runtime_logger.end_run( - status="failure", - duration_ms=total_latency, - node_path=path, - execution_quality="failed", - ) - - # Flush WIP accumulator outputs (same as CancelledError path) - if current_node_id and self._storage_path: - try: - import json as _json - - cursor_path = self._storage_path / "conversations" / "cursor.json" - if cursor_path.exists(): - cursor_data = _json.loads(cursor_path.read_text(encoding="utf-8")) - run_cursor = get_run_cursor(cursor_data, self._run_id or None) or {} - for key, value in run_cursor.get("outputs", {}).items(): - if value is not None: - buffer.write(key, value, validate=False) - except Exception: - self.logger.debug( - "Could not flush accumulator outputs from cursor", - exc_info=True, - ) - - # Save buffer and state for potential resume - saved_buffer = buffer.read_all() - session_state_out: 
dict[str, Any] = { - "data_buffer": saved_buffer, - "execution_path": list(path), - "node_visit_counts": dict(node_visit_counts), - "resume_from": current_node_id, - "run_id": self._run_id, - } - - # Mark latest checkpoint for resume on failure - if checkpoint_store: - try: - checkpoints = await checkpoint_store.list_checkpoints() - if checkpoints: - # Find latest clean checkpoint - index = await checkpoint_store.load_index() - if index: - latest_clean = index.get_latest_clean_checkpoint() - if latest_clean: - session_state_out["resume_from_checkpoint"] = ( - latest_clean.checkpoint_id - ) - session_state_out["latest_checkpoint_id"] = ( - latest_clean.checkpoint_id - ) - self.logger.info( - f"💾 Marked checkpoint for resume: {latest_clean.checkpoint_id}" - ) - except Exception as checkpoint_err: - self.logger.warning(f"Failed to mark checkpoint for resume: {checkpoint_err}") - - return ExecutionResult( - success=False, - error=str(e), - output=saved_buffer, - steps_executed=steps, - path=path, - total_retries=total_retries_count, - nodes_with_failures=nodes_failed, - retry_details=dict(node_retry_counts), - had_partial_failures=len(nodes_failed) > 0, - execution_quality="failed", - node_visit_counts=dict(node_visit_counts), - session_state=session_state_out, + return await self._execute_with_workers( + graph=graph, + goal=goal, + buffer=buffer, + input_data=input_data or {}, + session_state=session_state, + node_visit_counts=node_visit_counts, + is_continuous=is_continuous, + checkpoint_store=checkpoint_store, + checkpoint_config=checkpoint_config, + _ctx_token=_ctx_token, ) finally: @@ -1719,6 +706,7 @@ class GraphExecutor: ToolRegistry.reset_execution_context(_ctx_token) + def _build_context( self, node_spec: NodeSpec, @@ -2335,3 +1323,335 @@ class GraphExecutor: next_node=next_node, is_clean=is_clean, ) + + # ------------------------------------------------------------------ + # Worker-based execution + # ------------------------------------------------------------------ + + async def _execute_with_workers( + self, + graph: GraphSpec, + goal: Goal, + buffer: DataBuffer, + input_data: dict[str, Any], + session_state: dict[str, Any] | None, + node_visit_counts: dict[str, int], + is_continuous: bool, + checkpoint_store: CheckpointStore | None, + checkpoint_config: CheckpointConfig | None, + _ctx_token: Any, + ) -> ExecutionResult: + """Execute a graph using event-driven WorkerAgents. + + Replaces the imperative while-loop with autonomous workers that + self-activate based on edge conditions and fan-out tracking. 
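+
+        Rough sequence (sketch of this method's own flow; see the handlers
+        defined below):
+
+        1. Build one WorkerAgent per node and activate the entry workers
+           (those with zero incoming edges).
+        2. Each finished worker emits WORKER_COMPLETED carrying serialized
+           activations; ``_on_worker_completed`` routes them with
+           ``receive_activation()`` and activates any target whose fan-out
+           groups are satisfied (``check_readiness()``).
+        3. ``completion_event`` fires once every terminal worker has
+           completed or failed.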
+ """ + from framework.graph.worker_agent import ( + Activation, + FanOutTag, + GraphContext, + WorkerAgent, + WorkerCompletion, + WorkerLifecycle, + ) + from framework.runtime.event_bus import AgentEvent, EventType + + # Build shared graph context + gc = GraphContext( + graph=graph, + goal=goal, + buffer=buffer, + runtime=self.runtime, + llm=self.llm, + tools=self.tools, + tool_executor=self.tool_executor, + event_bus=self._event_bus, + execution_id=self._execution_id, + stream_id=self._stream_id, + run_id=self._run_id, + storage_path=self._storage_path, + runtime_logger=self.runtime_logger, + node_registry=dict(self.node_registry), + node_spec_registry={node.id: node for node in graph.nodes}, + parallel_config=self._parallel_config, + is_continuous=is_continuous, + accounts_prompt=self.accounts_prompt, + accounts_data=self.accounts_data, + tool_provider_map=self.tool_provider_map, + skills_catalog_prompt=self.skills_catalog_prompt, + protocols_prompt=self.protocols_prompt, + skill_dirs=self.skill_dirs, + context_warn_ratio=self.context_warn_ratio, + batch_init_nudge=self.batch_init_nudge, + dynamic_tools_provider=self.dynamic_tools_provider, + dynamic_prompt_provider=self.dynamic_prompt_provider, + iteration_metadata_provider=self.iteration_metadata_provider, + loop_config=self._loop_config, + node_visit_counts=dict(node_visit_counts), + ) + + # Create one WorkerAgent per node + workers: dict[str, WorkerAgent] = {} + for node_spec in graph.nodes: + workers[node_spec.id] = WorkerAgent(node_spec=node_spec, graph_context=gc) + + # Identify entry workers (zero incoming edges) and terminal workers + entry_worker_ids = [wid for wid, w in workers.items() if w.is_entry] + terminal_worker_ids = set(graph.terminal_nodes or []) + + self.logger.info( + f"🚀 Worker execution: {len(workers)} workers, " + f"{len(entry_worker_ids)} entry, {len(terminal_worker_ids)} terminal" + ) + + # Completion tracking + completed_terminals: set[str] = set() + failed_workers: dict[str, str] = {} # worker_id -> error + all_completions: dict[str, WorkerCompletion] = {} + completion_event = asyncio.Event() + + # Total metrics + total_tokens = 0 + total_latency = 0 + + def _deserialize_activations(data_list: list[dict]) -> list[Activation]: + """Reconstruct Activation objects from event data.""" + activations = [] + for act_data in data_list: + edge_id = act_data["edge_id"] + edge = None + for e in graph.edges: + if e.id == edge_id: + edge = e + break + if not edge: + continue + + fan_out_tags = [] + for tag_data in act_data.get("fan_out_tags", []): + fan_out_tags.append( + FanOutTag( + fan_out_id=tag_data["fan_out_id"], + fan_out_source=tag_data["fan_out_source"], + branches=frozenset(tag_data["branches"]), + via_branch=tag_data["via_branch"], + ) + ) + + activations.append( + Activation( + source_id=act_data["source_id"], + target_id=act_data["target_id"], + edge_id=edge_id, + edge=edge, + mapped_inputs=act_data.get("mapped_inputs", {}), + fan_out_tags=fan_out_tags, + ) + ) + return activations + + def _check_graph_done() -> bool: + """Check if all terminal workers have completed or failed.""" + if not terminal_worker_ids: + # No terminals: check if all workers are done + return all( + w.lifecycle in (WorkerLifecycle.COMPLETED, WorkerLifecycle.FAILED) + for w in workers.values() + ) + for tid in terminal_worker_ids: + if tid not in completed_terminals and tid not in failed_workers: + return False + return True + + # Subscribe to worker events + sub_completed = None + sub_failed = None + + async def 
_on_worker_completed(event: AgentEvent) -> None:
+            nonlocal total_tokens, total_latency
+
+            data = event.data
+            worker_id = data["worker_id"]
+
+            # Accumulate metrics
+            total_tokens += data.get("tokens_used", 0)
+            total_latency += data.get("latency_ms", 0)
+
+            # Deserialize activations
+            activations = _deserialize_activations(data.get("activations", []))
+
+            completion = WorkerCompletion(
+                worker_id=worker_id,
+                success=data.get("success", True),
+                output=data.get("output", {}),
+                tokens_used=data.get("tokens_used", 0),
+                latency_ms=data.get("latency_ms", 0),
+                conversation=data.get("conversation"),
+                activations=activations,
+            )
+            all_completions[worker_id] = completion
+
+            # Update cumulative tools/keys for continuous mode
+            if is_continuous:
+                src_spec = graph.get_node(worker_id)
+                if src_spec and src_spec.tools:
+                    for t in self.tools:
+                        if t.name in src_spec.tools and t.name not in gc.cumulative_tool_names:
+                            gc.cumulative_tools.append(t)
+                            gc.cumulative_tool_names.add(t.name)
+                if src_spec and src_spec.output_keys:
+                    for k in src_spec.output_keys:
+                        if k not in gc.cumulative_output_keys:
+                            gc.cumulative_output_keys.append(k)
+
+                # Thread conversation
+                if completion.conversation is not None:
+                    gc.continuous_conversation = completion.conversation
+
+            self.logger.info(
+                f"  ✓ Worker completed: {worker_id} "
+                f"({len(activations)} outgoing activation(s))"
+            )
+
+            # Route activations to target workers
+            for activation in activations:
+                target_worker = workers.get(activation.target_id)
+                if not target_worker:
+                    continue
+                if target_worker.lifecycle != WorkerLifecycle.PENDING:
+                    continue
+
+                target_worker.receive_activation(activation)
+
+                if target_worker.check_readiness():
+                    inherited = activation.fan_out_tags
+                    target_worker.activate(inherited_tags=inherited)
+
+            # Track terminal completion
+            if worker_id in terminal_worker_ids:
+                completed_terminals.add(worker_id)
+
+            # Note: visit counts are incremented by the worker itself in
+            # _execute_self() (under gc._visits_lock); incrementing here as
+            # well would double-count every visit.
+
+            # Write progress
+            self._write_progress(
+                current_node_id=worker_id,
+                path=gc.path,
+                buffer=buffer,
+                node_visit_counts=gc.node_visit_counts,
+            )
+
+            if _check_graph_done():
+                completion_event.set()
+
+        async def _on_worker_failed(event: AgentEvent) -> None:
+            data = event.data
+            worker_id = data["worker_id"]
+            error = data.get("error", "Unknown error")
+
+            failed_workers[worker_id] = error
+            self.logger.error(f"  ✗ Worker failed: {worker_id} - {error}")
+
+            if worker_id in terminal_worker_ids:
+                completed_terminals.add(worker_id)
+
+            if _check_graph_done():
+                completion_event.set()
+
+        # Subscribe to events
+        if self._event_bus:
+            sub_completed = self._event_bus.subscribe(
+                event_types=[EventType.WORKER_COMPLETED],
+                handler=_on_worker_completed,
+                filter_stream=self._stream_id,
+                filter_execution=self._execution_id,
+            )
+            sub_failed = self._event_bus.subscribe(
+                event_types=[EventType.WORKER_FAILED],
+                handler=_on_worker_failed,
+                filter_stream=self._stream_id,
+                filter_execution=self._execution_id,
+            )
+
+        try:
+            # Activate entry workers
+            for wid in entry_worker_ids:
+                workers[wid].activate(inherited_tags=[])
+                self.logger.info(f"  → Activated entry worker: {wid}")
+
+            # Wait for all terminal workers to complete
+            if terminal_worker_ids:
+                await completion_event.wait()
+            else:
+                # No terminal nodes defined — wait for all workers
+                await asyncio.sleep(0.1)
+                for _ in range(graph.max_steps * 10):  # Safety bound
+                    if _check_graph_done():
+                        break
+                    await asyncio.sleep(0.1)
+
+            # Assemble result
+            terminal_output: 
dict[str, Any] = {} + for tid in terminal_worker_ids: + if tid in all_completions: + terminal_output.update(all_completions[tid].output) + + if not terminal_output and all_completions: + last_id = gc.path[-1] if gc.path else None + if last_id and last_id in all_completions: + terminal_output = all_completions[last_id].output + + # Quality assessment + has_failures = bool(failed_workers) + exec_quality = "failed" if has_failures else "clean" + + saved_buffer = buffer.read_all() + session_state_out = { + "data_buffer": saved_buffer, + "execution_path": list(gc.path), + "node_visit_counts": dict(gc.node_visit_counts), + "run_id": self._run_id, + } + + success = not has_failures + self.runtime.end_run( + success=success, + narrative=f"Completed {len(gc.path)} steps via {len(workers)} workers", + ) + + if self.runtime_logger: + await self.runtime_logger.end_run( + status="success" if success else "failure", + duration_ms=total_latency, + node_path=gc.path, + execution_quality=exec_quality, + ) + + return ExecutionResult( + success=success, + output=terminal_output or saved_buffer, + error=( + "; ".join(f"{k}: {v}" for k, v in failed_workers.items()) + if failed_workers + else None + ), + steps_executed=len(gc.path), + total_tokens=total_tokens, + total_latency_ms=total_latency, + path=gc.path, + session_state=session_state_out, + total_retries=0, + nodes_with_failures=list(failed_workers.keys()), + retry_details={}, + had_partial_failures=has_failures, + execution_quality=exec_quality, + node_visit_counts=dict(gc.node_visit_counts), + ) + + finally: + if self._event_bus: + if sub_completed: + self._event_bus.unsubscribe(sub_completed) + if sub_failed: + self._event_bus.unsubscribe(sub_failed) diff --git a/core/framework/graph/worker_agent.py b/core/framework/graph/worker_agent.py new file mode 100644 index 00000000..e14bd324 --- /dev/null +++ b/core/framework/graph/worker_agent.py @@ -0,0 +1,809 @@ +""" +WorkerAgent — First-class autonomous worker for event-driven graph execution. + +Each node in a graph becomes a WorkerAgent that: +- Owns its lifecycle, retry logic, memory scope, and LLM config +- Receives activations from upstream workers (via GraphExecutor routing) +- Self-checks readiness (fan-out group tracking) +- Self-triggers when ready +- Evaluates outgoing edges and publishes activations for downstream workers +""" + +from __future__ import annotations + +import asyncio +import logging +import time +import uuid +from dataclasses import dataclass, field +from enum import StrEnum +from typing import Any + +from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec +from framework.graph.goal import Goal +from framework.graph.node import ( + DataBuffer, + NodeContext, + NodeProtocol, + NodeResult, + NodeSpec, +) +from framework.graph.validator import OutputValidator +from framework.runtime.core import Runtime + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Enums & data types +# --------------------------------------------------------------------------- + + +class WorkerLifecycle(StrEnum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +@dataclass +class FanOutTag: + """Carried in activations, propagated through the worker chain. + + When a source activates multiple targets (fan-out), each activation + receives a FanOutTag. Downstream convergence workers track these tags + to determine when all parallel branches have reached them. 
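+
+    Illustrative sketch (types from this module; node IDs are made up).
+    A fan-out from ``plan`` to ``fetch`` and ``scan`` tags each activation,
+    and the convergence worker's tracker completes once both branches
+    arrive::
+
+        tag = FanOutTag("fo1", "plan", frozenset({"fetch", "scan"}), via_branch="fetch")
+        tracker = FanOutTracker(fan_out_id="fo1", branches=tag.branches)
+        tracker.reached.add(tag.via_branch)
+        assert not tracker.is_complete  # "scan" has not arrived yet
+        tracker.reached.add("scan")
+        assert tracker.is_complete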
+ """ + + fan_out_id: str # Unique ID for this fan-out event + fan_out_source: str # Node that performed the fan-out + branches: frozenset[str] # All target node IDs in this fan-out + via_branch: str # Which branch this activation passed through + + +@dataclass +class FanOutTracker: + """Per fan-out group, tracked by the target worker.""" + + fan_out_id: str + branches: frozenset[str] + reached: set[str] = field(default_factory=set) + + @property + def is_complete(self) -> bool: + return self.reached == self.branches + + +@dataclass +class Activation: + """Payload sent from a completed source to a target worker.""" + + source_id: str + target_id: str + edge_id: str + edge: EdgeSpec + mapped_inputs: dict[str, Any] + fan_out_tags: list[FanOutTag] = field(default_factory=list) + + +@dataclass +class WorkerCompletion: + """Payload in WORKER_COMPLETED event.""" + + worker_id: str + success: bool + output: dict[str, Any] + tokens_used: int = 0 + latency_ms: int = 0 + conversation: Any = None # NodeConversation for continuous mode + activations: list[Activation] = field(default_factory=list) + + +@dataclass +class RetryState: + attempt: int = 0 + max_retries: int = 3 + is_event_loop: bool = False + + +@dataclass +class GraphContext: + """Shared state for one graph execution run. + + Consolidates the 20+ constructor params on ``GraphExecutor.__init__`` + into a single object shared by reference across all workers. + """ + + graph: GraphSpec + goal: Goal + buffer: DataBuffer + runtime: Runtime + llm: Any # LLMProvider + tools: list[Any] # list[Tool] + tool_executor: Any # Callable + event_bus: Any # GraphScopedEventBus + execution_id: str + stream_id: str + run_id: str + storage_path: Any # Path | None + runtime_logger: Any = None + node_registry: dict[str, NodeProtocol] = field(default_factory=dict) + node_spec_registry: dict[str, NodeSpec] = field(default_factory=dict) + # Parallel execution config + parallel_config: Any = None # ParallelExecutionConfig | None + # Continuous mode + is_continuous: bool = False + continuous_conversation: Any = None + cumulative_tools: list[Any] = field(default_factory=list) + cumulative_tool_names: set[str] = field(default_factory=set) + cumulative_output_keys: list[str] = field(default_factory=list) + # Accounts / skills / dynamic providers + accounts_prompt: str = "" + accounts_data: list[dict] | None = None + tool_provider_map: dict[str, str] | None = None + skills_catalog_prompt: str = "" + protocols_prompt: str = "" + skill_dirs: list[str] = field(default_factory=list) + context_warn_ratio: float | None = None + batch_init_nudge: str | None = None + dynamic_tools_provider: Any = None + dynamic_prompt_provider: Any = None + iteration_metadata_provider: Any = None + # Loop config for EventLoopNode creation + loop_config: dict[str, Any] = field(default_factory=dict) + # Thread-safe execution state + path: list[str] = field(default_factory=list) + node_visit_counts: dict[str, int] = field(default_factory=dict) + _path_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + _visits_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + +# --------------------------------------------------------------------------- +# WorkerAgent +# --------------------------------------------------------------------------- + + +class WorkerAgent: + """First-class autonomous worker for one node in the graph. 
+ + Lifecycle: + PENDING – waiting for activations + RUNNING – executing the node + COMPLETED – finished successfully, activations published + FAILED – failed after retries exhausted + """ + + def __init__( + self, + node_spec: NodeSpec, + graph_context: GraphContext, + ) -> None: + self.node_spec = node_spec + self._gc = graph_context + + # Edge topology (resolved at construction, immutable) + self.incoming_edges: list[EdgeSpec] = graph_context.graph.get_incoming_edges(node_spec.id) + self.outgoing_edges: list[EdgeSpec] = graph_context.graph.get_outgoing_edges(node_spec.id) + + # Lifecycle + self.lifecycle: WorkerLifecycle = WorkerLifecycle.PENDING + self._task: asyncio.Task | None = None + + # Retry state + self.retry_state = RetryState( + max_retries=node_spec.max_retries, + is_event_loop=node_spec.node_type == "event_loop", + ) + + # Activation tracking + self._inherited_fan_out_tags: list[FanOutTag] = [] + self._active_fan_outs: dict[str, FanOutTracker] = {} + self._received_activations: list[Activation] = [] + self._has_been_activated = False + + # Pause support + self._pause_event: asyncio.Event = asyncio.Event() + self._pause_event.set() # Not paused by default + + # Validator + self._validator = OutputValidator() + + # Node implementation (lazy) + self._node_impl: NodeProtocol | None = None + + # Metrics for this worker + self._tokens_used: int = 0 + self._latency_ms: int = 0 + + # ------------------------------------------------------------------ + # Public activation interface + # ------------------------------------------------------------------ + + def activate(self, inherited_tags: list[FanOutTag] | None = None) -> None: + """Activate this worker — launch execution as an asyncio.Task.""" + if self.lifecycle != WorkerLifecycle.PENDING: + return + + self._inherited_fan_out_tags = inherited_tags or [] + self._has_been_activated = True + self.lifecycle = WorkerLifecycle.RUNNING + self._task = asyncio.ensure_future(self._execute_self()) + + def receive_activation(self, activation: Activation) -> None: + """Receive an activation from an upstream worker. + + Called by GraphExecutor when routing a WORKER_COMPLETED event's + activations to their target workers. 
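+
+        Typical routing sequence (sketch; mirrors the executor's
+        ``_on_worker_completed`` handler)::
+
+            worker.receive_activation(activation)
+            if worker.check_readiness():
+                worker.activate(inherited_tags=activation.fan_out_tags)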
+ """ + if self.lifecycle != WorkerLifecycle.PENDING: + return + + self._received_activations.append(activation) + + # Update fan-out trackers from this activation's tags + for tag in activation.fan_out_tags: + if tag.fan_out_id not in self._active_fan_outs: + self._active_fan_outs[tag.fan_out_id] = FanOutTracker( + fan_out_id=tag.fan_out_id, + branches=tag.branches, + ) + self._active_fan_outs[tag.fan_out_id].reached.add(tag.via_branch) + + def check_readiness(self) -> bool: + """Check if all fan-out groups have been satisfied.""" + if self._has_been_activated: + return True + if not self._active_fan_outs: + # No fan-out tracking — ready on first activation + return bool(self._received_activations) + return all(t.is_complete for t in self._active_fan_outs.values()) + + # ------------------------------------------------------------------ + # Execution + # ------------------------------------------------------------------ + + async def _execute_self(self) -> None: + """Main execution loop: run node, handle retries, publish result.""" + gc = self._gc + node_spec = self.node_spec + + # Write all mapped inputs from received activations to buffer + for activation in self._received_activations: + for key, value in activation.mapped_inputs.items(): + gc.buffer.write(key, value, validate=False) + + # Clear stale nullable outputs on re-visit + async with gc._visits_lock: + visit_count = gc.node_visit_counts.get(node_spec.id, 0) + 1 + gc.node_visit_counts[node_spec.id] = visit_count + + if visit_count > 1: + nullable_keys = getattr(node_spec, "nullable_output_keys", None) or [] + for key in nullable_keys: + if gc.buffer.read(key) is not None: + gc.buffer.write(key, None, validate=False) + + # Continuous mode: accumulate tools and output keys + if gc.is_continuous and node_spec.tools: + for t in gc.tools: + if t.name in node_spec.tools and t.name not in gc.cumulative_tool_names: + gc.cumulative_tools.append(t) + gc.cumulative_tool_names.add(t.name) + if gc.is_continuous and node_spec.output_keys: + for k in node_spec.output_keys: + if k not in gc.cumulative_output_keys: + gc.cumulative_output_keys.append(k) + + # Append to execution path + async with gc._path_lock: + gc.path.append(node_spec.id) + + # Get node implementation + node_impl = self._get_node_implementation() + + # Build context + ctx = self._build_node_context() + + # Execute with retry + result = await self._execute_with_retries(node_impl, ctx) + + # Handle result + if result.success: + # Validate and write outputs + self._write_outputs(result) + + # Evaluate outgoing edges + activations = await self._evaluate_outgoing_edges(result) + + # Publish completion + self.lifecycle = WorkerLifecycle.COMPLETED + completion = WorkerCompletion( + worker_id=node_spec.id, + success=True, + output=result.output, + tokens_used=result.tokens_used, + latency_ms=result.latency_ms, + conversation=result.conversation, + activations=activations, + ) + await self._publish_completion(completion) + else: + self.lifecycle = WorkerLifecycle.FAILED + await self._publish_failure(result.error or "Unknown error") + + async def _execute_with_retries( + self, node_impl: NodeProtocol, ctx: NodeContext + ) -> NodeResult: + """Execute node with exponential backoff retry.""" + gc = self._gc + max_retries = 0 if self.retry_state.is_event_loop else self.retry_state.max_retries + + for attempt in range(max_retries + 1): + # Check pause + await self._pause_event.wait() + + ctx.attempt = attempt + 1 + start = time.monotonic() + + try: + result = await node_impl.execute(ctx) + 
result.latency_ms = int((time.monotonic() - start) * 1000)
+
+                if result.success:
+                    return result
+
+                # Failure
+                if attempt < max_retries:
+                    delay = 1.0 * (2**attempt)
+                    logger.warning(
+                        "Worker %s failed (attempt %d/%d), retrying in %.1fs: %s",
+                        self.node_spec.id,
+                        attempt + 1,
+                        max_retries + 1,
+                        delay,
+                        result.error,
+                    )
+                    # Emit retry event (same signature the executor's retry
+                    # path used: retry_count and error, not attempt)
+                    if gc.event_bus:
+                        await gc.event_bus.emit_node_retry(
+                            stream_id=gc.stream_id,
+                            node_id=self.node_spec.id,
+                            retry_count=attempt + 1,
+                            max_retries=max_retries,
+                            error=result.error or "",
+                            execution_id=gc.execution_id,
+                        )
+                    await asyncio.sleep(delay)
+                    continue
+                else:
+                    return result
+
+            except Exception as exc:
+                if attempt < max_retries:
+                    delay = 1.0 * (2**attempt)
+                    logger.warning(
+                        "Worker %s raised %s (attempt %d/%d), retrying in %.1fs",
+                        self.node_spec.id,
+                        type(exc).__name__,
+                        attempt + 1,
+                        max_retries + 1,
+                        delay,
+                    )
+                    await asyncio.sleep(delay)
+                    continue
+                return NodeResult(success=False, error=str(exc))
+
+        return NodeResult(success=False, error="Max retries exceeded")
+
+    # ------------------------------------------------------------------
+    # Edge evaluation (source-side)
+    # ------------------------------------------------------------------
+
+    async def _evaluate_outgoing_edges(
+        self, result: NodeResult
+    ) -> list[Activation]:
+        """Evaluate outgoing edges and create activations for downstream.
+
+        Same logic as the executor's _get_all_traversable_edges() plus
+        priority filtering for CONDITIONAL edges.
+        """
+        gc = self._gc
+        edges = gc.graph.get_outgoing_edges(self.node_spec.id)
+
+        traversable: list[EdgeSpec] = []
+        for edge in edges:
+            target_spec = gc.graph.get_node(edge.target)
+            if await edge.should_traverse(
+                source_success=result.success,
+                source_output=result.output,
+                buffer_data=gc.buffer.read_all(),
+                llm=gc.llm,
+                goal=gc.goal,
+                source_node_name=self.node_spec.name,
+                target_node_name=target_spec.name if target_spec else edge.target,
+            ):
+                traversable.append(edge)
+
+        # Priority filtering for CONDITIONAL edges
+        if len(traversable) > 1:
+            conditionals = [e for e in traversable if e.condition == EdgeCondition.CONDITIONAL]
+            if len(conditionals) > 1:
+                max_prio = max(e.priority for e in conditionals)
+                traversable = [
+                    e
+                    for e in traversable
+                    if e.condition != EdgeCondition.CONDITIONAL or e.priority == max_prio
+                ]
+
+        # Build activations
+        is_fan_out = len(traversable) > 1
+        fan_out_id = f"{self.node_spec.id}_{uuid.uuid4().hex[:8]}" if is_fan_out else None
+
+        activations: list[Activation] = []
+        for edge in traversable:
+            mapped = edge.map_inputs(result.output, gc.buffer.read_all())
+
+            # Build fan-out tags: inherited + new
+            tags = list(self._inherited_fan_out_tags)
+            if is_fan_out:
+                tags.append(
+                    FanOutTag(
+                        fan_out_id=fan_out_id,
+                        fan_out_source=self.node_spec.id,
+                        branches=frozenset(e.target for e in traversable),
+                        via_branch=edge.target,
+                    )
+                )
+
+            activations.append(
+                Activation(
+                    source_id=self.node_spec.id,
+                    target_id=edge.target,
+                    edge_id=edge.id,
+                    edge=edge,
+                    mapped_inputs=mapped,
+                    fan_out_tags=tags,
+                )
+            )
+
+        if traversable:
+            logger.info(
+                "Worker %s → %d outgoing activation(s)%s",
+                self.node_spec.id,
+                len(activations),
+                f" (fan-out: {[a.target_id for a in activations]})" if is_fan_out else "",
+            )
+
+        return activations
+
+    # ------------------------------------------------------------------
+    # Output handling
+    # ------------------------------------------------------------------
+
+    def _write_outputs(self, result: NodeResult) -> None:
+        """Validate and write node outputs to buffer."""
+        gc = self._gc
+        node_spec = self.node_spec
+
+        # Event loop nodes skip executor-level validation (judge is the authority)
+        if node_spec.node_type != "event_loop":
+            # Use OutputValidator.validate_all's existing signature (see the
+            # former executor call site): expected_keys, not output_keys.
+            validation = self._validator.validate_all(
+                output=result.output,
+                expected_keys=node_spec.output_keys,
+                check_hallucination=True,
+                nullable_keys=getattr(node_spec, "nullable_output_keys", None),
+            )
+            if not validation.success:
+                logger.warning(
+                    "Worker %s output validation failed: %s",
+                    node_spec.id,
+                    validation.error,
+                )
+
+        # Write all output keys to buffer
+        for key in node_spec.output_keys:
+            value = result.output.get(key)
+            if value is not None:
+                gc.buffer.write(key, value, validate=False)
+
+    # ------------------------------------------------------------------
+    # Context building
+    # ------------------------------------------------------------------
+
+    def _get_node_implementation(self) -> NodeProtocol:
+        """Get or create node implementation."""
+        gc = self._gc
+        if self._node_impl is not None:
+            return self._node_impl
+
+        # Check shared registry first
+        if self.node_spec.id in gc.node_registry:
+            self._node_impl = gc.node_registry[self.node_spec.id]
+            return self._node_impl
+
+        # Auto-create EventLoopNode
+        if self.node_spec.node_type in ("event_loop", "gcu"):
+            from framework.graph.event_loop_node import EventLoopNode
+            from framework.graph.event_loop.types import LoopConfig
+
+            conv_store = None
+            if gc.storage_path:
+                from framework.storage.conversation_store import FileConversationStore
+
+                conv_store = FileConversationStore(base_path=gc.storage_path / "conversations")
+
+            spillover = str(gc.storage_path / "data") if gc.storage_path else None
+            lc = gc.loop_config
+            default_max_iter = 100 if self.node_spec.client_facing else 50
+
+            node = EventLoopNode(
+                event_bus=gc.event_bus,
+                judge=None,
+                config=LoopConfig(
+                    max_iterations=lc.get("max_iterations", default_max_iter),
+                    max_tool_calls_per_turn=lc.get("max_tool_calls_per_turn", 30),
+                    tool_call_overflow_margin=lc.get("tool_call_overflow_margin", 0.5),
+                    stall_detection_threshold=lc.get("stall_detection_threshold", 3),
+                    max_context_tokens=lc.get(
+                        "max_context_tokens",
+                        _default_max_context_tokens(),
+                    ),
+                    max_tool_result_chars=lc.get("max_tool_result_chars", 30_000),
+                    spillover_dir=spillover,
+                    hooks=lc.get("hooks", {}),
+                ),
+                tool_executor=gc.tool_executor,
+                conversation_store=conv_store,
+            )
+            gc.node_registry[self.node_spec.id] = node
+            self._node_impl = node
+            return node
+
+        raise RuntimeError(
+            f"No implementation for node '{self.node_spec.id}' "
+            f"(type: {self.node_spec.node_type})"
+        )
+
+    def _build_node_context(self) -> NodeContext:
+        """Build NodeContext for this worker's execution."""
+        gc = self._gc
+        node_spec = self.node_spec
+
+        # Filter tools
+        if gc.is_continuous and gc.cumulative_tools:
+            available_tools = list(gc.cumulative_tools)
+        else:
+            available_tools = []
+            if node_spec.tools:
+                available_tools = [t for t in gc.tools if t.name in node_spec.tools]
+
+        # Scoped buffer
+        read_keys = list(node_spec.input_keys)
+        write_keys = list(node_spec.output_keys)
+        if read_keys or write_keys:
+            from framework.skills.defaults import DATA_BUFFER_KEYS as _skill_keys
+
+            existing_underscore = [k for k in gc.buffer._data if k.startswith("_")]
+            extra_keys = set(_skill_keys) | set(existing_underscore)
+            for k in extra_keys:
+                if read_keys and k not in read_keys:
+                    read_keys.append(k)
+                if write_keys and k not in write_keys:
+                    write_keys.append(k)
+
+        scoped_buffer = 
gc.buffer.with_permissions(read_keys=read_keys, write_keys=write_keys) + + # Per-node accounts prompt + node_accounts_prompt = gc.accounts_prompt + if gc.accounts_data and gc.tool_provider_map: + from framework.graph.prompt_composer import build_accounts_prompt + + node_accounts_prompt = build_accounts_prompt( + gc.accounts_data, + gc.tool_provider_map, + node_tool_names=node_spec.tools, + ) or gc.accounts_prompt + + # Input data from buffer + input_data: dict[str, Any] = {} + for key in node_spec.input_keys: + val = gc.buffer.read(key) + if val is not None: + input_data[key] = val + + # Continuous mode: thread conversation + inherited_conversation = None + if gc.is_continuous and gc.continuous_conversation: + inherited_conversation = gc.continuous_conversation + + return NodeContext( + runtime=gc.runtime, + node_id=node_spec.id, + node_spec=node_spec, + buffer=scoped_buffer, + input_data=input_data, + llm=gc.llm, + available_tools=available_tools, + goal_context=gc.goal.to_prompt_context(), + goal=gc.goal, + max_tokens=gc.graph.max_tokens, + runtime_logger=gc.runtime_logger, + pause_event=self._pause_event, + continuous_mode=gc.is_continuous, + inherited_conversation=inherited_conversation, + cumulative_output_keys=list(gc.cumulative_output_keys) if gc.is_continuous else [], + accounts_prompt=node_accounts_prompt, + identity_prompt=getattr(gc.graph, "identity_prompt", "") or "", + execution_id=gc.execution_id, + run_id=gc.run_id, + stream_id=gc.stream_id, + node_registry=gc.node_spec_registry, + all_tools=list(gc.tools), + shared_node_registry=gc.node_registry, + dynamic_tools_provider=gc.dynamic_tools_provider, + dynamic_prompt_provider=gc.dynamic_prompt_provider, + iteration_metadata_provider=gc.iteration_metadata_provider, + skills_catalog_prompt=gc.skills_catalog_prompt, + protocols_prompt=gc.protocols_prompt, + skill_dirs=list(gc.skill_dirs), + default_skill_warn_ratio=gc.context_warn_ratio, + default_skill_batch_nudge=gc.batch_init_nudge, + ) + + # ------------------------------------------------------------------ + # Event publishing + # ------------------------------------------------------------------ + + async def _publish_completion(self, completion: WorkerCompletion) -> None: + """Publish WORKER_COMPLETED event via the graph-scoped event bus.""" + gc = self._gc + if not gc.event_bus: + return + + # Serialize activations to dicts for event data + activations_data = [] + for act in completion.activations: + activations_data.append({ + "source_id": act.source_id, + "target_id": act.target_id, + "edge_id": act.edge_id, + "mapped_inputs": act.mapped_inputs, + "fan_out_tags": [ + { + "fan_out_id": t.fan_out_id, + "fan_out_source": t.fan_out_source, + "branches": list(t.branches), + "via_branch": t.via_branch, + } + for t in act.fan_out_tags + ], + }) + + await gc.event_bus.emit_worker_completed( + stream_id=gc.stream_id, + node_id=self.node_spec.id, + worker_id=self.node_spec.id, + success=completion.success, + output=completion.output, + activations=activations_data, + execution_id=gc.execution_id, + tokens_used=completion.tokens_used, + latency_ms=completion.latency_ms, + conversation=completion.conversation, + ) + + # Update continuous mode state + if gc.is_continuous and completion.conversation is not None: + gc.continuous_conversation = completion.conversation + self._apply_continuous_transition() + + async def _publish_failure(self, error: str) -> None: + """Publish WORKER_FAILED event.""" + gc = self._gc + if not gc.event_bus: + return + + await 
gc.event_bus.emit_worker_failed(
+            stream_id=gc.stream_id,
+            node_id=self.node_spec.id,
+            worker_id=self.node_spec.id,
+            error=error,
+            execution_id=gc.execution_id,
+        )
+
+    def _apply_continuous_transition(self) -> None:
+        """Apply continuous mode conversation threading for the next node.
+
+        Uses existing prompt_composer functions for onion-model system
+        prompt composition, transition markers, and phase-boundary compaction.
+        """
+        gc = self._gc
+        if not gc.is_continuous or not gc.continuous_conversation:
+            return
+
+        # Find the next node from outgoing edges (best guess from current state).
+        # The actual next node is determined at activation time, but for continuous
+        # mode prompt composition we need it now.
+        next_node_id = None
+        for edge in self.outgoing_edges:
+            if edge.condition in (EdgeCondition.ALWAYS, EdgeCondition.ON_SUCCESS):
+                next_node_id = edge.target
+                break
+        if not next_node_id:
+            return
+
+        next_spec = gc.graph.get_node(next_node_id)
+        if not next_spec or next_spec.node_type != "event_loop":
+            return
+
+        from framework.graph.prompt_composer import (
+            EXECUTION_SCOPE_PREAMBLE,
+            build_accounts_prompt,
+            build_narrative,
+            build_transition_marker,
+            compose_system_prompt,
+        )
+
+        # Layer 2: narrative
+        narrative = build_narrative(gc.buffer, gc.path, gc.graph)
+
+        # Per-node accounts prompt
+        _node_accounts = gc.accounts_prompt or None
+        if gc.accounts_data and gc.tool_provider_map:
+            _node_accounts = (
+                build_accounts_prompt(
+                    gc.accounts_data,
+                    gc.tool_provider_map,
+                    node_tool_names=next_spec.tools,
+                )
+                or None
+            )
+
+        # Compose system prompt (Layer 1 + 2 + 3 + accounts)
+        _focus = next_spec.system_prompt
+        if next_spec.output_keys and _focus:
+            _focus = f"{EXECUTION_SCOPE_PREAMBLE}\n\n{_focus}"
+        new_system = compose_system_prompt(
+            identity_prompt=getattr(gc.graph, "identity_prompt", None),
+            focus_prompt=_focus,
+            narrative=narrative,
+            accounts_prompt=_node_accounts,
+        )
+        gc.continuous_conversation.update_system_prompt(new_system)
+
+        # Build the transition marker
+        data_dir = str(gc.storage_path / "data") if gc.storage_path else None
+        marker = build_transition_marker(
+            previous_node=self.node_spec,
+            next_node=next_spec,
+            buffer=gc.buffer,
+            cumulative_tool_names=sorted(gc.cumulative_tool_names),
+            data_dir=data_dir,
+        )
+        # Appending the marker to the conversation requires async context,
+        # which this sync method does not have; the actual insertion is
+        # performed in the GraphExecutor's event handler.
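+        #
+        # A hedged sketch of that async half (the handler name, the
+        # subscription wiring, and append_message() are illustrative
+        # assumptions — this patch defines only the emit side):
+        #
+        #   async def _on_worker_completed(self, event: AgentEvent) -> None:
+        #       conv = gc.continuous_conversation
+        #       if conv is not None:
+        #           marker = build_transition_marker(...)  # same inputs as above
+        #           await conv.append_message(marker)      # assumed async API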
+ + # ------------------------------------------------------------------ + # Utility + # ------------------------------------------------------------------ + + def pause(self) -> None: + self._pause_event.clear() + + def resume(self) -> None: + self._pause_event.set() + + @property + def is_terminal(self) -> bool: + return self.node_spec.id in (self._gc.graph.terminal_nodes or []) + + @property + def is_entry(self) -> bool: + return len(self.incoming_edges) == 0 + + +def _default_max_context_tokens() -> int: + """Resolve max_context_tokens from global config, falling back to 32000.""" + try: + from framework.config import get_max_context_tokens # type: ignore[import-untyped] + + return get_max_context_tokens() + except Exception: + return 32_000 diff --git a/core/framework/runtime/event_bus.py b/core/framework/runtime/event_bus.py index 9f710e56..cabc3c4e 100644 --- a/core/framework/runtime/event_bus.py +++ b/core/framework/runtime/event_bus.py @@ -115,6 +115,10 @@ class EventType(StrEnum): NODE_RETRY = "node_retry" EDGE_TRAVERSED = "edge_traversed" + # Worker agent lifecycle (event-driven graph execution) + WORKER_COMPLETED = "worker_completed" + WORKER_FAILED = "worker_failed" + # Context management CONTEXT_COMPACTED = "context_compacted" CONTEXT_USAGE_UPDATED = "context_usage_updated" @@ -1090,6 +1094,54 @@ class EventBus: ) ) + async def emit_worker_completed( + self, + stream_id: str, + node_id: str, + worker_id: str, + success: bool, + output: dict[str, Any], + activations: list[dict[str, Any]] | None = None, + execution_id: str | None = None, + **extra_data: Any, + ) -> None: + """Emit worker completed event with outgoing activations.""" + data: dict[str, Any] = { + "worker_id": worker_id, + "success": success, + "output": output, + "activations": activations or [], + **extra_data, + } + await self.publish( + AgentEvent( + type=EventType.WORKER_COMPLETED, + stream_id=stream_id, + node_id=node_id, + execution_id=execution_id, + data=data, + ) + ) + + async def emit_worker_failed( + self, + stream_id: str, + node_id: str, + worker_id: str, + error: str, + execution_id: str | None = None, + ) -> None: + """Emit worker failed event.""" + await self.publish( + AgentEvent( + type=EventType.WORKER_FAILED, + stream_id=stream_id, + node_id=node_id, + execution_id=execution_id, + data={"worker_id": worker_id, "error": error}, + ) + ) + async def emit_execution_paused( self, stream_id: str, diff --git a/docs/developer-guide.md b/docs/developer-guide.md index 2d5549ef..725b3941 100644 --- a/docs/developer-guide.md +++ b/docs/developer-guide.md @@ -246,8 +246,8 @@ If you prefer to build agents manually: ### Using the `hive` CLI ```bash -# Browse and run agents interactively (Recommended) -hive tui +# Open the browser dashboard (Recommended for interactive use) +hive open # Run a specific agent hive run exports/my_agent --input '{"ticket_content": "My login is broken", "customer_id": "CUST-123"}' @@ -272,7 +272,6 @@ hive run exports/my_agent --model claude-sonnet-4-20250514 | Command | Description | | ---------------------- | ----------------------------------------------------------------------- | -| `hive tui` | Browse agents and launch TUI dashboard | | `hive run ` | Execute an agent (see flags below) | | `hive shell [path]` | Interactive REPL (`--no-approve`) | | `hive serve` | Start HTTP API server | diff --git a/docs/environment-setup.md b/docs/environment-setup.md index d1c23cbe..01f7b25e 100644 --- a/docs/environment-setup.md +++ b/docs/environment-setup.md @@ -373,7 +373,7 @@ claude> 
test workflow ```bash # Interactive dashboard -hive tui +hive open # Or run directly hive run exports/your_agent_name --input '{"task": "..."}' diff --git a/docs/getting-started.md b/docs/getting-started.md index 954991fa..d2d3d0a9 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -147,15 +147,9 @@ hive/ # Launch the web dashboard in your browser hive open -# Browse and run agents in terminal -hive tui - # Run a specific agent hive run exports/my_agent --input '{"task": "Your input here"}' -# Run with TUI dashboard -hive run exports/my_agent --tui - ``` ## API Keys Setup @@ -194,7 +188,7 @@ PYTHONPATH=exports uv run python -m my_agent test --type success ## Next Steps -1. **Dashboard**: Run `hive open` to launch the web dashboard, or `hive tui` for the terminal UI +1. **Dashboard**: Run `hive open` to launch the web dashboard 2. **Detailed Setup**: See [environment-setup.md](./environment-setup.md) 3. **Developer Guide**: See [developer-guide.md](./developer-guide.md) 4. **Build Agents**: Use the coder-tools `initialize_and_build_agent` tool in Claude Code diff --git a/docs/roadmap-developer-success.md b/docs/roadmap-developer-success.md index 6739b932..b7c3f46d 100644 --- a/docs/roadmap-developer-success.md +++ b/docs/roadmap-developer-success.md @@ -130,9 +130,9 @@ Install the prerequisites like Python, then install the quickstart package. To run an existing agent: -1. Run `hive run ` or `hive tui ` +1. Run `hive open` to use the browser dashboard, or `hive run ` for direct execution 2. Hive automatically validates that your agent has all required prerequisites -3. Type something in the TUI or trigger an event source (like receiving an email) +3. Provide input in the dashboard or trigger an event source (like receiving an email) 4. Your agent runs, and the outcome is recorded 5. If something fails, you'll see where the logs are saved @@ -146,7 +146,7 @@ If you find something you can't accomplish with the framework, you can contribut ## Agent Testing -**Interactive testing:** Run `hive tui` to test your agent in a terminal UI. +**Interactive testing:** Run `hive open` to test your agent in the browser UI. **Autonomous testing:** Run `hive run --debug` and trigger the event source. Testing scheduled events can be tricky—Hive provides developer tools to help you simulate them. diff --git a/docs/roadmap.md b/docs/roadmap.md index 40f890ac..45c88c18 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -776,7 +776,7 @@ Implement an interactive, drag-and-drop canvas (using libraries like React Flow) ### TUI to GUI Upgrade Port the existing Terminal User Interface (TUI) into a rich web application, allowing users to interact directly with the Queen Bee / Coding Agent via a browser chat interface. -> **Note:** The TUI (`hive tui` / `tui/app.py`) is deprecated and no longer maintained (see AGENTS.md). The items below reflect legacy work completed before deprecation. New development should target the browser-based GUI (`hive open`). +> **Note:** The legacy TUI (`tui/app.py`) is deprecated and no longer maintained (see AGENTS.md). The items below reflect legacy work completed before deprecation. New development should target the browser-based GUI (`hive open`). - [x] ~~**TUI Foundation**~~ *(deprecated)* - [x] ~~Terminal chat interface (tui/app.py)~~ diff --git a/hive b/hive index 8a71ca5f..e1244625 100755 --- a/hive +++ b/hive @@ -4,7 +4,7 @@ # Uses uv to run the hive command in the project's virtual environment. 
#
# Usage:
-#   ./hive tui            - Launch interactive agent dashboard
+#   ./hive open           - Launch the web dashboard
#   ./hive run <agent>    - Run an agent
#   ./hive --help         - Show all commands
#
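
A minimal sketch of the fan-out bookkeeping introduced in `worker_agent.py`, assuming the `Activation` and `FanOutTag` constructors shown above (the `edge=None` shortcut and the literal `fan_out_id` are illustrative only; real activations carry the traversed `EdgeSpec` and an id generated from the node id plus a short uuid):

```python
from framework.graph import Activation, FanOutTag

# Fan-out from "plan" to two branches. Every branch's activation carries a
# FanOutTag with the same fan_out_id, so a downstream join can tell when
# all branches of this particular fan-out have completed.
branches = frozenset({"research", "draft"})

activations = [
    Activation(
        source_id="plan",
        target_id=target,
        edge_id=f"plan->{target}",
        edge=None,  # EdgeSpec omitted in this sketch
        mapped_inputs={"topic": "Q3 report"},
        fan_out_tags=[
            FanOutTag(
                fan_out_id="plan_a1b2c3d4",  # node id + short uuid in practice
                fan_out_source="plan",
                branches=branches,
                via_branch=target,
            )
        ],
    )
    for target in sorted(branches)
]

# Each branch is reachable via exactly one activation.
assert {a.fan_out_tags[0].via_branch for a in activations} == branches
```

The `frozenset` of branch targets mirrors `_evaluate_outgoing_edges()`, which builds `branches=frozenset(e.target for e in traversable)` so a downstream join can check off every branch of the fan-out as its completion event arrives.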