From fd95f8da28d2fd920292f29ac7906576bda21f31 Mon Sep 17 00:00:00 2001 From: Timothy Date: Tue, 24 Feb 2026 09:03:21 -0800 Subject: [PATCH] feat: active streams and waiting nodes --- core/framework/graph/event_loop_node.py | 67 ++- core/framework/graph/executor.py | 24 +- core/framework/graph/node.py | 4 + core/framework/monitoring/__init__.py | 6 +- .../{worker_health_judge.py => judge.py} | 6 +- core/framework/runtime/EVENT_TYPES.md | 539 ++++++++++++++++++ core/framework/runtime/agent_runtime.py | 39 ++ core/framework/runtime/execution_stream.py | 29 +- core/framework/tools/queen_lifecycle_tools.py | 41 +- .../tools/worker_monitoring_tools.py | 2 +- core/framework/tui/app.py | 22 +- docs/server-cli-arch.md | 66 ++- 12 files changed, 769 insertions(+), 76 deletions(-) rename core/framework/monitoring/{worker_health_judge.py => judge.py} (98%) create mode 100644 core/framework/runtime/EVENT_TYPES.md diff --git a/core/framework/graph/event_loop_node.py b/core/framework/graph/event_loop_node.py index f1f1baaa..2145d3a0 100644 --- a/core/framework/graph/event_loop_node.py +++ b/core/framework/graph/event_loop_node.py @@ -229,8 +229,9 @@ class EventLoopNode(NodeProtocol): start_time = time.time() total_input_tokens = 0 total_output_tokens = 0 - stream_id = ctx.node_id + stream_id = ctx.stream_id or ctx.node_id node_id = ctx.node_id + execution_id = ctx.execution_id or "" # Verdict counters for runtime logging _accept_count = _retry_count = _escalate_count = _continue_count = 0 @@ -369,7 +370,7 @@ class EventLoopNode(NodeProtocol): ) # 4. Publish loop started - await self._publish_loop_started(stream_id, node_id) + await self._publish_loop_started(stream_id, node_id, execution_id) # 5. Stall / doom loop detection state (restored from cursor if resuming) recent_responses: list[str] = _restored_recent_responses @@ -411,7 +412,7 @@ class EventLoopNode(NodeProtocol): await self._drain_injection_queue(conversation) # 6c. Publish iteration event - await self._publish_iteration(stream_id, node_id, iteration) + await self._publish_iteration(stream_id, node_id, iteration, execution_id) # 6d. Pre-turn compaction check (tiered) if conversation.needs_compaction(): @@ -484,6 +485,7 @@ class EventLoopNode(NodeProtocol): retry_count=_stream_retry_count, max_retries=self._config.max_stream_retries, error=str(e)[:500], + execution_id=execution_id, ) await asyncio.sleep(delay) continue # retry same iteration @@ -560,7 +562,7 @@ class EventLoopNode(NodeProtocol): node_id, iteration, ) - await self._publish_loop_completed(stream_id, node_id, iteration + 1) + await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id) latency_ms = int((time.time() - start_time) * 1000) return NodeResult( success=True, @@ -575,7 +577,7 @@ class EventLoopNode(NodeProtocol): if len(recent_responses) > self._config.stall_detection_threshold: recent_responses.pop(0) if self._is_stalled(recent_responses): - await self._publish_stalled(stream_id, node_id) + await self._publish_stalled(stream_id, node_id, execution_id) latency_ms = int((time.time() - start_time) * 1000) _continue_count += 1 if ctx.runtime_logger: @@ -646,6 +648,7 @@ class EventLoopNode(NodeProtocol): stream_id=stream_id, node_id=node_id, description=doom_desc, + execution_id=execution_id, ) warning_msg = ( f"[SYSTEM] {doom_desc}. You are repeating the " @@ -660,6 +663,7 @@ class EventLoopNode(NodeProtocol): stream_id=stream_id, node_id=node_id, prompt=doom_desc, + execution_id=execution_id, ) self._awaiting_input = True try: @@ -709,7 +713,7 @@ class EventLoopNode(NodeProtocol): if _cf_block: if self._shutdown: - await self._publish_loop_completed(stream_id, node_id, iteration + 1) + await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id) latency_ms = int((time.time() - start_time) * 1000) _continue_count += 1 if ctx.runtime_logger: @@ -759,7 +763,7 @@ class EventLoopNode(NodeProtocol): got_input = await self._await_user_input(ctx) logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input) if not got_input: - await self._publish_loop_completed(stream_id, node_id, iteration + 1) + await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id) latency_ms = int((time.time() - start_time) * 1000) _continue_count += 1 if ctx.runtime_logger: @@ -878,6 +882,7 @@ class EventLoopNode(NodeProtocol): feedback=fb_preview, judge_type=judge_type, iteration=iteration, + execution_id=execution_id, ) if verdict.action == "ACCEPT": @@ -919,7 +924,7 @@ class EventLoopNode(NodeProtocol): for key, value in accumulator.to_dict().items(): ctx.memory.write(key, value, validate=False) - await self._publish_loop_completed(stream_id, node_id, iteration + 1) + await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id) latency_ms = int((time.time() - start_time) * 1000) _accept_count += 1 if ctx.runtime_logger: @@ -962,7 +967,7 @@ class EventLoopNode(NodeProtocol): elif verdict.action == "ESCALATE": # Exit point 6: Judge ESCALATE — log step + log_node_complete - await self._publish_loop_completed(stream_id, node_id, iteration + 1) + await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id) latency_ms = int((time.time() - start_time) * 1000) _escalate_count += 1 if ctx.runtime_logger: @@ -1026,7 +1031,7 @@ class EventLoopNode(NodeProtocol): continue # 7. Max iterations exhausted - await self._publish_loop_completed(stream_id, node_id, self._config.max_iterations) + await self._publish_loop_completed(stream_id, node_id, self._config.max_iterations, execution_id) latency_ms = int((time.time() - start_time) * 1000) if ctx.runtime_logger: ctx.runtime_logger.log_node_complete( @@ -1093,9 +1098,10 @@ class EventLoopNode(NodeProtocol): if self._event_bus: await self._event_bus.emit_client_input_requested( - stream_id=ctx.node_id, + stream_id=ctx.stream_id or ctx.node_id, node_id=ctx.node_id, prompt="", + execution_id=ctx.execution_id or "", ) self._awaiting_input = True @@ -1134,8 +1140,9 @@ class EventLoopNode(NodeProtocol): ``real_tool_results`` which resets each inner iteration, this list grows across the entire turn. """ - stream_id = ctx.node_id + stream_id = ctx.stream_id or ctx.node_id node_id = ctx.node_id + execution_id = ctx.execution_id or "" token_counts: dict[str, int] = {"input": 0, "output": 0} tool_call_count = 0 final_text = "" @@ -1187,7 +1194,8 @@ class EventLoopNode(NodeProtocol): if isinstance(event, TextDeltaEvent): accumulated_text = event.snapshot await self._publish_text_delta( - stream_id, node_id, event.content, event.snapshot, ctx + stream_id, node_id, event.content, event.snapshot, ctx, + execution_id, ) elif isinstance(event, ToolCallEvent): @@ -1271,7 +1279,8 @@ class EventLoopNode(NodeProtocol): executed_in_batch += 1 await self._publish_tool_started( - stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input + stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input, + execution_id, ) logger.info( "[%s] tool_call: %s(%s)", @@ -1303,7 +1312,7 @@ class EventLoopNode(NodeProtocol): key = tc.tool_input.get("key", "") await accumulator.set(key, value) outputs_set_this_turn.append(key) - await self._publish_output_key_set(stream_id, node_id, key) + await self._publish_output_key_set(stream_id, node_id, key, execution_id) logged_tool_calls.append( { "tool_use_id": tc.tool_use_id, @@ -1414,6 +1423,7 @@ class EventLoopNode(NodeProtocol): tc.tool_name, result.content, result.is_error, + execution_id, ) # If the limit was hit, add error results for every remaining @@ -2133,7 +2143,7 @@ class EventLoopNode(NodeProtocol): await self._event_bus.publish( AgentEvent( type=EventType.CONTEXT_COMPACTED, - stream_id=ctx.node_id, + stream_id=ctx.stream_id or ctx.node_id, node_id=ctx.node_id, data={ "level": "prune_only", @@ -2189,7 +2199,7 @@ class EventLoopNode(NodeProtocol): await self._event_bus.publish( AgentEvent( type=EventType.CONTEXT_COMPACTED, - stream_id=ctx.node_id, + stream_id=ctx.stream_id or ctx.node_id, node_id=ctx.node_id, data={ "level": level, @@ -2505,36 +2515,40 @@ class EventLoopNode(NodeProtocol): # EventBus publishing helpers # ------------------------------------------------------------------- - async def _publish_loop_started(self, stream_id: str, node_id: str) -> None: + async def _publish_loop_started(self, stream_id: str, node_id: str, execution_id: str = "") -> None: if self._event_bus: await self._event_bus.emit_node_loop_started( stream_id=stream_id, node_id=node_id, max_iterations=self._config.max_iterations, + execution_id=execution_id, ) - async def _publish_iteration(self, stream_id: str, node_id: str, iteration: int) -> None: + async def _publish_iteration(self, stream_id: str, node_id: str, iteration: int, execution_id: str = "") -> None: if self._event_bus: await self._event_bus.emit_node_loop_iteration( stream_id=stream_id, node_id=node_id, iteration=iteration, + execution_id=execution_id, ) - async def _publish_loop_completed(self, stream_id: str, node_id: str, iterations: int) -> None: + async def _publish_loop_completed(self, stream_id: str, node_id: str, iterations: int, execution_id: str = "") -> None: if self._event_bus: await self._event_bus.emit_node_loop_completed( stream_id=stream_id, node_id=node_id, iterations=iterations, + execution_id=execution_id, ) - async def _publish_stalled(self, stream_id: str, node_id: str) -> None: + async def _publish_stalled(self, stream_id: str, node_id: str, execution_id: str = "") -> None: if self._event_bus: await self._event_bus.emit_node_stalled( stream_id=stream_id, node_id=node_id, reason="Consecutive identical responses detected", + execution_id=execution_id, ) async def _publish_text_delta( @@ -2544,6 +2558,7 @@ class EventLoopNode(NodeProtocol): content: str, snapshot: str, ctx: NodeContext, + execution_id: str = "", ) -> None: if self._event_bus: if ctx.node_spec.client_facing: @@ -2552,6 +2567,7 @@ class EventLoopNode(NodeProtocol): node_id=node_id, content=content, snapshot=snapshot, + execution_id=execution_id, ) else: await self._event_bus.emit_llm_text_delta( @@ -2559,6 +2575,7 @@ class EventLoopNode(NodeProtocol): node_id=node_id, content=content, snapshot=snapshot, + execution_id=execution_id, ) async def _publish_tool_started( @@ -2568,6 +2585,7 @@ class EventLoopNode(NodeProtocol): tool_use_id: str, tool_name: str, tool_input: dict, + execution_id: str = "", ) -> None: if self._event_bus: await self._event_bus.emit_tool_call_started( @@ -2576,6 +2594,7 @@ class EventLoopNode(NodeProtocol): tool_use_id=tool_use_id, tool_name=tool_name, tool_input=tool_input, + execution_id=execution_id, ) async def _publish_tool_completed( @@ -2586,6 +2605,7 @@ class EventLoopNode(NodeProtocol): tool_name: str, result: str, is_error: bool, + execution_id: str = "", ) -> None: if self._event_bus: await self._event_bus.emit_tool_call_completed( @@ -2595,6 +2615,7 @@ class EventLoopNode(NodeProtocol): tool_name=tool_name, result=result, is_error=is_error, + execution_id=execution_id, ) async def _publish_judge_verdict( @@ -2605,6 +2626,7 @@ class EventLoopNode(NodeProtocol): feedback: str = "", judge_type: str = "implicit", iteration: int = 0, + execution_id: str = "", ) -> None: if self._event_bus: await self._event_bus.emit_judge_verdict( @@ -2614,6 +2636,7 @@ class EventLoopNode(NodeProtocol): feedback=feedback, judge_type=judge_type, iteration=iteration, + execution_id=execution_id, ) async def _publish_output_key_set( @@ -2621,10 +2644,12 @@ class EventLoopNode(NodeProtocol): stream_id: str, node_id: str, key: str, + execution_id: str = "", ) -> None: if self._event_bus: await self._event_bus.emit_output_key_set( stream_id=stream_id, node_id=node_id, key=key, + execution_id=execution_id, ) diff --git a/core/framework/graph/executor.py b/core/framework/graph/executor.py index c7b0e74a..1b2252c9 100644 --- a/core/framework/graph/executor.py +++ b/core/framework/graph/executor.py @@ -131,6 +131,7 @@ class GraphExecutor: parallel_config: ParallelExecutionConfig | None = None, event_bus: Any | None = None, stream_id: str = "", + execution_id: str = "", runtime_logger: Any = None, storage_path: str | Path | None = None, loop_config: dict[str, Any] | None = None, @@ -170,6 +171,7 @@ class GraphExecutor: self.logger = logging.getLogger(__name__) self._event_bus = event_bus self._stream_id = stream_id + self._execution_id = execution_id or getattr(runtime, "execution_id", "") self.runtime_logger = runtime_logger self._storage_path = Path(storage_path) if storage_path else None self._loop_config = loop_config or {} @@ -553,6 +555,7 @@ class GraphExecutor: await self._event_bus.emit_execution_resumed( stream_id=self._stream_id, node_id=current_node_id, + execution_id=self._execution_id, ) # Start run @@ -597,6 +600,7 @@ class GraphExecutor: stream_id=self._stream_id, node_id=current_node_id, reason="User requested pause (Ctrl+Z)", + execution_id=self._execution_id, ) # Create session state for pause @@ -767,7 +771,8 @@ class GraphExecutor: # Emit node-started event (skip event_loop nodes — they emit their own) if self._event_bus and node_spec.node_type != "event_loop": await self._event_bus.emit_node_loop_started( - stream_id=self._stream_id, node_id=current_node_id + stream_id=self._stream_id, node_id=current_node_id, + execution_id=self._execution_id, ) # Execute node @@ -777,7 +782,8 @@ class GraphExecutor: # Emit node-completed event (skip event_loop nodes) if self._event_bus and node_spec.node_type != "event_loop": await self._event_bus.emit_node_loop_completed( - stream_id=self._stream_id, node_id=current_node_id, iterations=1 + stream_id=self._stream_id, node_id=current_node_id, iterations=1, + execution_id=self._execution_id, ) # Ensure runtime logging has an L2 entry for this node @@ -897,6 +903,7 @@ class GraphExecutor: retry_count=retry_count, max_retries=max_retries, error=result.error or "", + execution_id=self._execution_id, ) _is_retry = True @@ -992,6 +999,7 @@ class GraphExecutor: stream_id=self._stream_id, node_id=node_spec.id, reason="HITL pause node", + execution_id=self._execution_id, ) saved_memory = memory.read_all() @@ -1057,6 +1065,7 @@ class GraphExecutor: source_node=current_node_id, target_node=result.next_node, edge_condition="router", + execution_id=self._execution_id, ) current_node_id = result.next_node @@ -1092,6 +1101,7 @@ class GraphExecutor: edge_condition=edge.condition.value if hasattr(edge.condition, "value") else str(edge.condition), + execution_id=self._execution_id, ) # Execute branches in parallel @@ -1143,6 +1153,7 @@ class GraphExecutor: stream_id=self._stream_id, source_node=current_node_id, target_node=next_node, + execution_id=self._execution_id, ) # CHECKPOINT: node_complete (after determining next node) @@ -1577,7 +1588,8 @@ class GraphExecutor: cumulative_output_keys=cumulative_output_keys or [], event_triggered=event_triggered, accounts_prompt=node_accounts_prompt, - execution_id=self.runtime.execution_id, + execution_id=self._execution_id, + stream_id=self._stream_id, ) VALID_NODE_TYPES = { @@ -1937,7 +1949,8 @@ class GraphExecutor: # Emit node-started event (skip event_loop nodes) if self._event_bus and node_spec.node_type != "event_loop": await self._event_bus.emit_node_loop_started( - stream_id=self._stream_id, node_id=branch.node_id + stream_id=self._stream_id, node_id=branch.node_id, + execution_id=self._execution_id, ) self.logger.info( @@ -1961,7 +1974,8 @@ class GraphExecutor: # Emit node-completed event (skip event_loop nodes) if self._event_bus and node_spec.node_type != "event_loop": await self._event_bus.emit_node_loop_completed( - stream_id=self._stream_id, node_id=branch.node_id, iterations=1 + stream_id=self._stream_id, node_id=branch.node_id, iterations=1, + execution_id=self._execution_id, ) if result.success: diff --git a/core/framework/graph/node.py b/core/framework/graph/node.py index c4a133b3..966bf72f 100644 --- a/core/framework/graph/node.py +++ b/core/framework/graph/node.py @@ -511,6 +511,10 @@ class NodeContext: # Execution ID (from StreamRuntimeAdapter) execution_id: str = "" + # Stream identity — the ExecutionStream this node runs within. + # Falls back to node_id when not set (legacy / standalone executor). + stream_id: str = "" + @dataclass class NodeResult: diff --git a/core/framework/monitoring/__init__.py b/core/framework/monitoring/__init__.py index 4fc357b4..9d2ab343 100644 --- a/core/framework/monitoring/__init__.py +++ b/core/framework/monitoring/__init__.py @@ -15,15 +15,15 @@ Usage:: # Load judge as secondary graph on the worker runtime await worker_runtime.add_graph( - graph_id="worker_health_judge", + graph_id="judge", graph=judge_graph, goal=judge_goal, entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT}, - storage_subpath="graphs/worker_health_judge", + storage_subpath="graphs/judge", ) """ -from .worker_health_judge import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph, judge_node +from .judge import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph, judge_node __all__ = [ "HEALTH_JUDGE_ENTRY_POINT", diff --git a/core/framework/monitoring/worker_health_judge.py b/core/framework/monitoring/judge.py similarity index 98% rename from core/framework/monitoring/worker_health_judge.py rename to core/framework/monitoring/judge.py index 62df3fc7..f3271d5c 100644 --- a/core/framework/monitoring/worker_health_judge.py +++ b/core/framework/monitoring/judge.py @@ -20,11 +20,11 @@ Usage:: # Load judge as secondary graph on the worker runtime await worker_runtime.add_graph( - graph_id="worker_health_judge", + graph_id="judge", graph=judge_graph, goal=judge_goal, entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT}, - storage_subpath="graphs/worker_health_judge", + storage_subpath="graphs/judge", ) Design: @@ -239,7 +239,7 @@ HEALTH_JUDGE_ENTRY_POINT = AsyncEntryPointSpec( # --------------------------------------------------------------------------- judge_graph = GraphSpec( - id="worker-health-judge-graph", + id="judge-graph", goal_id=judge_goal.id, version="1.0.0", entry_node="judge", diff --git a/core/framework/runtime/EVENT_TYPES.md b/core/framework/runtime/EVENT_TYPES.md new file mode 100644 index 00000000..cc74a563 --- /dev/null +++ b/core/framework/runtime/EVENT_TYPES.md @@ -0,0 +1,539 @@ +# Event Types and Schema Reference + +The Hive runtime uses a pub/sub `EventBus` for inter-component communication and observability. Every event is an `AgentEvent` dataclass published through `EventBus.publish()`. + +## Event Envelope (`AgentEvent`) + +Every event shares a common envelope: + +| Field | Type | Description | +| ---------------- | ----------------- | ------------------------------------------------------------ | +| `type` | `EventType` (str) | Event type identifier (see below) | +| `stream_id` | `str` | Entry point / pipeline that emitted the event | +| `node_id` | `str \| None` | Graph node that emitted the event | +| `execution_id` | `str \| None` | Unique execution run ID (UUID, set by `ExecutionStream`) | +| `graph_id` | `str \| None` | Graph that emitted the event (set by `GraphScopedEventBus`) | +| `data` | `dict` | Event-type-specific payload (see individual schemas below) | +| `timestamp` | `datetime` | When the event was created | +| `correlation_id` | `str \| None` | Optional ID for tracking related events across streams | + +### Identity Fields + +The identity tuple `(graph_id, stream_id, node_id, execution_id)` uniquely locates any event: + +- **`graph_id`** — Which graph produced the event. Set automatically by `GraphScopedEventBus` (a subclass that stamps `graph_id` on every `publish()` call). Values: `"worker"`, `"judge"`, `"queen"`, or the graph spec ID. +- **`stream_id`** — Which entry point / pipeline. Corresponds to `EntryPointSpec.id` in the graph definition. For single-entry-point graphs, this equals the entry point name (e.g. `"default"`, `"health_check"`, `"ticket_receiver"`). +- **`node_id`** — Which specific node emitted the event. For `EventLoopNode` events, this is the node spec ID. +- **`execution_id`** — UUID identifying a specific execution run. Multiple concurrent executions of the same entry point each get a unique `execution_id`. + +--- + +## Execution Lifecycle + +### `execution_started` + +A new graph execution has begun. + +| Data Field | Type | Description | +| ---------- | ------ | ------------------------------- | +| `input` | `dict` | Input data passed to the graph | + +**Emitted by:** `ExecutionStream._run_execution()` + +--- + +### `execution_completed` + +A graph execution finished successfully. + +| Data Field | Type | Description | +| ---------- | ------ | ----------------- | +| `output` | `dict` | Final output data | + +**Emitted by:** `ExecutionStream._run_execution()` + +--- + +### `execution_failed` + +A graph execution failed with an error. + +| Data Field | Type | Description | +| ---------- | ----- | ------------- | +| `error` | `str` | Error message | + +**Emitted by:** `ExecutionStream._run_execution()` + +--- + +### `execution_paused` + +Execution has been paused (Ctrl+Z or HITL approval). + +| Data Field | Type | Description | +| ---------- | ----- | ----------------- | +| `reason` | `str` | Why it was paused | + +**Emitted by:** `GraphExecutor.execute()` + +--- + +### `execution_resumed` + +Execution has resumed from a paused state. + +| Data Field | Type | Description | +| ---------- | ---- | ----------- | +| *(none)* | | | + +**Emitted by:** `GraphExecutor.execute()` + +--- + +## Node Event-Loop Lifecycle + +These events track the inner loop of `EventLoopNode` — the multi-turn LLM streaming loop that powers most agent nodes. + +### `node_loop_started` + +An EventLoopNode has begun its execution loop. + +| Data Field | Type | Description | +| ---------------- | ---------- | ------------------------------- | +| `max_iterations` | `int\|null`| Maximum iterations configured | + +**Emitted by:** `EventLoopNode._publish_loop_started()`, `GraphExecutor` (for function nodes in parallel branches) + +--- + +### `node_loop_iteration` + +An EventLoopNode has started a new iteration (one LLM turn). + +| Data Field | Type | Description | +| ----------- | ----- | ------------------------- | +| `iteration` | `int` | Zero-based iteration index | + +**Emitted by:** `EventLoopNode._publish_iteration()` + +--- + +### `node_loop_completed` + +An EventLoopNode has finished its execution loop. + +| Data Field | Type | Description | +| ------------ | ----- | -------------------------------------- | +| `iterations` | `int` | Total number of iterations completed | + +**Emitted by:** `EventLoopNode._publish_loop_completed()`, `GraphExecutor` (for function nodes in parallel branches) + +--- + +## LLM Streaming + +### `llm_text_delta` + +Incremental text output from the LLM (non-client-facing nodes only). + +| Data Field | Type | Description | +| ---------- | ----- | ---------------------------------------- | +| `content` | `str` | New text chunk (delta) | +| `snapshot` | `str` | Full accumulated text so far | + +**Emitted by:** `EventLoopNode._publish_text_delta()` when `client_facing=False` + +--- + +### `llm_reasoning_delta` + +Incremental reasoning/thinking output from the LLM. + +| Data Field | Type | Description | +| ---------- | ----- | ------------------- | +| `content` | `str` | New reasoning chunk | + +**Emitted by:** Not currently wired in `EventLoopNode` (reserved for extended thinking models). + +--- + +## Tool Lifecycle + +### `tool_call_started` + +The LLM has requested a tool call and execution is about to begin. + +| Data Field | Type | Description | +| ------------ | ------ | ------------------------------------ | +| `tool_use_id`| `str` | Unique ID for this tool invocation | +| `tool_name` | `str` | Name of the tool being called | +| `tool_input` | `dict` | Arguments passed to the tool | + +**Emitted by:** `EventLoopNode._publish_tool_started()` + +--- + +### `tool_call_completed` + +A tool call has finished executing. + +| Data Field | Type | Description | +| ------------ | ------ | -------------------------------------- | +| `tool_use_id`| `str` | Same ID from `tool_call_started` | +| `tool_name` | `str` | Name of the tool | +| `result` | `str` | Tool execution result (may be truncated)| +| `is_error` | `bool` | Whether the tool returned an error | + +**Emitted by:** `EventLoopNode._publish_tool_completed()` + +--- + +## Client I/O + +These events are emitted only by nodes with `client_facing=True`. They drive the TUI's chat interface. + +### `client_output_delta` + +Incremental text output meant for the human operator. + +| Data Field | Type | Description | +| ---------- | ----- | ---------------------------- | +| `content` | `str` | New text chunk (delta) | +| `snapshot` | `str` | Full accumulated text so far | + +**Emitted by:** `EventLoopNode._publish_text_delta()` when `client_facing=True` + +--- + +### `client_input_requested` + +The node is waiting for human input (via `ask_user` tool or auto-block on text-only turns). + +| Data Field | Type | Description | +| ---------- | ----- | ------------------------------------------------- | +| `prompt` | `str` | Optional prompt/question shown to the user | + +**Emitted by:** `EventLoopNode._await_user_input()`, doom loop handler + +The TUI subscribes to this event to show the input prompt and focus the chat input. After the user types, `inject_event()` is called on the node to unblock it. + +--- + +## Internal Node Observability + +### `node_internal_output` + +Output from a non-client-facing node (for debugging/monitoring). + +| Data Field | Type | Description | +| ---------- | ----- | ---------------- | +| `content` | `str` | Output text | + +**Emitted by:** Available via `emit_node_internal_output()` — not currently wired in the default `EventLoopNode`. + +--- + +### `node_input_blocked` + +A non-client-facing node is blocked waiting for input. + +| Data Field | Type | Description | +| ---------- | ----- | --------------- | +| `prompt` | `str` | Block reason | + +**Emitted by:** Available via `emit_node_input_blocked()` — reserved for future use. + +--- + +### `node_stalled` + +The node's LLM has produced identical responses for several consecutive turns (stall detection). + +| Data Field | Type | Description | +| ---------- | ----- | ------------------------------------------------- | +| `reason` | `str` | Always `"Consecutive identical responses detected"`| + +**Emitted by:** `EventLoopNode._publish_stalled()` + +--- + +### `node_tool_doom_loop` + +The LLM is calling the same tool(s) with identical arguments repeatedly (doom loop detection). + +| Data Field | Type | Description | +| ------------- | ----- | ------------------------------------ | +| `description` | `str` | Human-readable doom loop description | + +**Emitted by:** `EventLoopNode` doom loop handler + +--- + +## Judge Decisions + +### `judge_verdict` + +The judge (custom or implicit) has evaluated the current iteration. + +| Data Field | Type | Description | +| ------------ | ----- | ---------------------------------------------------- | +| `action` | `str` | `"ACCEPT"`, `"RETRY"`, `"ESCALATE"`, or `"CONTINUE"` | +| `feedback` | `str` | Judge feedback (empty for ACCEPT/CONTINUE) | +| `judge_type` | `str` | `"custom"` (explicit JudgeProtocol) or `"implicit"` (stop-reason heuristic) | +| `iteration` | `int` | Which iteration this verdict applies to | + +**Emitted by:** `EventLoopNode._publish_judge_verdict()` + +**Verdict meanings:** +- **ACCEPT** — Output meets requirements; node exits successfully. +- **RETRY** — Output needs improvement; loop continues with feedback injected. +- **ESCALATE** — Problem cannot be solved at this level; triggers escalation. +- **CONTINUE** — Implicit verdict: LLM called tools, so it's making progress — let it keep going. + +--- + +## Output Tracking + +### `output_key_set` + +A node has set an output key via the `set_output` synthetic tool. + +| Data Field | Type | Description | +| ---------- | ----- | ----------------- | +| `key` | `str` | Output key name | + +**Emitted by:** `EventLoopNode._publish_output_key_set()` + +--- + +## Retry & Edge Tracking + +### `node_retry` + +A transient error occurred during an LLM call and the node is retrying. + +| Data Field | Type | Description | +| ------------- | ----- | ---------------------------------- | +| `retry_count` | `int` | Current retry attempt number | +| `max_retries` | `int` | Maximum retries configured | +| `error` | `str` | Error message (truncated to 500ch) | + +**Emitted by:** `EventLoopNode` (stream retry handler), `GraphExecutor` (node-level retry) + +--- + +### `edge_traversed` + +The executor has traversed an edge from one node to another. + +| Data Field | Type | Description | +| ---------------- | ----- | ---------------------------------------------- | +| `source_node` | `str` | Node ID the edge starts from | +| `target_node` | `str` | Node ID the edge goes to | +| `edge_condition` | `str` | Edge condition: `"router"`, `"on_success"`, etc. | + +**Emitted by:** `GraphExecutor.execute()` — after router decisions, condition-based edges, and fallback edges. + +--- + +## Context Management + +### `context_compacted` + +Not currently emitted — reserved for future use when `NodeConversation` compacts history. + +--- + +## State Changes + +### `state_changed` + +A shared memory key has been modified. + +| Data Field | Type | Description | +| ----------- | ----- | ---------------------------------- | +| `key` | `str` | Memory key that changed | +| `old_value` | `Any` | Previous value | +| `new_value` | `Any` | New value | +| `scope` | `str` | Scope of the change | + +**Emitted by:** Available via `emit_state_changed()` — not currently wired in default execution. + +--- + +### `state_conflict` + +Not currently emitted — reserved for concurrent write conflict detection. + +--- + +## Goal Tracking + +### `goal_progress` + +Goal completion progress update. + +| Data Field | Type | Description | +| ----------------- | ------- | ------------------------------------ | +| `progress` | `float` | 0.0–1.0 completion fraction | +| `criteria_status` | `dict` | Per-criterion status | + +**Emitted by:** Available via `emit_goal_progress()` — not currently wired in default execution. + +--- + +### `goal_achieved` + +Not currently emitted — reserved for explicit goal completion signals. + +--- + +### `constraint_violation` + +A goal constraint has been violated. + +| Data Field | Type | Description | +| --------------- | ----- | ------------------------ | +| `constraint_id` | `str` | Which constraint failed | +| `description` | `str` | What went wrong | + +**Emitted by:** Available via `emit_constraint_violation()`. + +--- + +## Stream Lifecycle + +### `stream_started` / `stream_stopped` + +Not currently emitted — reserved for `ExecutionStream` lifecycle tracking. + +--- + +## External Triggers + +### `webhook_received` + +An external webhook has been received. + +| Data Field | Type | Description | +| -------------- | ------ | ---------------------------- | +| `path` | `str` | Webhook URL path | +| `method` | `str` | HTTP method | +| `headers` | `dict` | HTTP headers | +| `payload` | `dict` | Request body | +| `query_params` | `dict` | URL query parameters | + +**Emitted by:** Webhook server integration. + +Note: `node_id` is not set on this event; `stream_id` is the webhook source ID. + +--- + +## Escalation + +### `escalation_requested` + +An agent has requested handoff to the Hive Coder (via the `escalate_to_coder` synthetic tool). + +| Data Field | Type | Description | +| ---------- | ----- | ------------------------------- | +| `reason` | `str` | Why escalation is needed | +| `context` | `str` | Additional context for the coder| + +**Emitted by:** `EventLoopNode` when the LLM calls `escalate_to_coder`. + +--- + +## Worker Health Monitoring + +These events form the **judge → queen → operator** escalation pipeline. + +### `worker_escalation_ticket` + +The Worker Health Judge has detected a degradation pattern and is escalating to the Queen. + +| Data Field | Type | Description | +| ---------- | ------ | ------------------------------------ | +| `ticket` | `dict` | Full `EscalationTicket` (see below) | + +**Emitted by:** `emit_escalation_ticket` tool (in `worker_monitoring_tools.py`) + +#### EscalationTicket Schema + +| Field | Type | Description | +| ------------------------- | ------------------ | -------------------------------------------------------- | +| `ticket_id` | `str` | Auto-generated UUID | +| `created_at` | `str` | ISO timestamp | +| `worker_agent_id` | `str` | Which worker agent | +| `worker_session_id` | `str` | Which session | +| `worker_node_id` | `str` | Which node is struggling | +| `worker_graph_id` | `str` | Which graph | +| `severity` | `str` | `"low"`, `"medium"`, `"high"`, or `"critical"` | +| `cause` | `str` | Human-readable problem description | +| `judge_reasoning` | `str` | Judge's deliberation chain | +| `suggested_action` | `str` | e.g. `"Restart node"`, `"Human review"`, `"Kill session"`| +| `recent_verdicts` | `list[str]` | e.g. `["RETRY", "RETRY", "CONTINUE", "RETRY"]` | +| `total_steps_checked` | `int` | Steps the judge inspected | +| `steps_since_last_accept` | `int` | Consecutive non-ACCEPT steps | +| `stall_minutes` | `float \| null` | Minutes since last activity (null if active) | +| `evidence_snippet` | `str` | Excerpt from recent LLM output | + +--- + +### `queen_intervention_requested` + +The Queen has triaged an escalation ticket and decided the human operator should be involved. + +| Data Field | Type | Description | +| ----------------- | ----- | ---------------------------------------------------- | +| `ticket_id` | `str` | From the original `EscalationTicket` | +| `analysis` | `str` | Queen's 2–3 sentence analysis | +| `severity` | `str` | `"low"`, `"medium"`, `"high"`, or `"critical"` | +| `queen_graph_id` | `str` | Queen's graph ID (for TUI navigation) | +| `queen_stream_id` | `str` | Queen's stream ID | + +**Emitted by:** `notify_operator` tool (in `worker_monitoring_tools.py`) + +The TUI subscribes to this event and shows a non-disruptive notification. The worker continues running. + +--- + +## Custom Events + +### `custom` + +User-defined events with arbitrary payloads. No schema enforced. + +--- + +## Subscription & Filtering + +Events can be filtered when subscribing: + +```python +bus.subscribe( + event_types=[EventType.TOOL_CALL_STARTED, EventType.TOOL_CALL_COMPLETED], + handler=my_handler, + filter_stream="default", # Only events from this stream + filter_node="planner", # Only events from this node + filter_execution="exec-uuid", # Only events from this execution + filter_graph="worker", # Only events from this graph +) +``` + +## Debug Event Logging + +Set `HIVE_DEBUG_EVENTS=1` to write every published event to a JSONL file at `~/.hive/event_logs/.jsonl`. Each line is the full JSON serialization of an `AgentEvent`: + +```json +{ + "type": "tool_call_started", + "stream_id": "default", + "node_id": "planner", + "execution_id": "a1b2c3d4-...", + "graph_id": "worker", + "data": {"tool_use_id": "tu_1", "tool_name": "web_search", "tool_input": {"query": "..."}}, + "timestamp": "2026-02-24T12:00:00.000000", + "correlation_id": null +} +``` diff --git a/core/framework/runtime/agent_runtime.py b/core/framework/runtime/agent_runtime.py index dd4003a1..605cca90 100644 --- a/core/framework/runtime/agent_runtime.py +++ b/core/framework/runtime/agent_runtime.py @@ -1272,6 +1272,45 @@ class AgentRuntime: "state_manager": self._state_manager.get_stats(), } + def get_active_streams(self) -> list[dict[str, Any]]: + """Return metadata for every stream that has active executions. + + Each dict contains: ``graph_id``, ``stream_id``, ``entry_point_id``, + ``active_execution_ids``, ``is_awaiting_input``, ``waiting_nodes``. + """ + result: list[dict[str, Any]] = [] + for graph_id, reg in self._graphs.items(): + for ep_id, stream in reg.streams.items(): + active = stream.active_execution_ids + if not active: + continue + result.append({ + "graph_id": graph_id, + "stream_id": stream.stream_id, + "entry_point_id": ep_id, + "active_execution_ids": active, + "is_awaiting_input": stream.is_awaiting_input, + "waiting_nodes": stream.get_waiting_nodes(), + }) + return result + + def get_waiting_nodes(self) -> list[dict[str, Any]]: + """Return all nodes currently blocked waiting for client input. + + Each dict contains: ``graph_id``, ``stream_id``, ``node_id``, + ``execution_id``. + """ + result: list[dict[str, Any]] = [] + for graph_id, reg in self._graphs.items(): + for _ep_id, stream in reg.streams.items(): + for waiting in stream.get_waiting_nodes(): + result.append({ + "graph_id": graph_id, + "stream_id": stream.stream_id, + **waiting, + }) + return result + # === PROPERTIES === @property diff --git a/core/framework/runtime/execution_stream.py b/core/framework/runtime/execution_stream.py index 298b60e4..285c3d10 100644 --- a/core/framework/runtime/execution_stream.py +++ b/core/framework/runtime/execution_stream.py @@ -35,7 +35,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class _GraphScopedEventBus(EventBus): +class GraphScopedEventBus(EventBus): """Proxy that stamps ``graph_id`` on every published event. The ``GraphExecutor`` and ``EventLoopNode`` emit events via the @@ -235,7 +235,7 @@ class ExecutionStream: # Graph-scoped event bus (stamps graph_id on published events) self._scoped_event_bus = self._event_bus if self._event_bus and self.graph_id: - self._scoped_event_bus = _GraphScopedEventBus(self._event_bus, self.graph_id) + self._scoped_event_bus = GraphScopedEventBus(self._event_bus, self.graph_id) # State self._running = False @@ -276,6 +276,30 @@ class ExecutionStream: return True return False + def get_waiting_nodes(self) -> list[dict[str, str]]: + """Return nodes currently blocked waiting for client input. + + Each entry is ``{"node_id": ..., "execution_id": ...}``. + """ + waiting: list[dict[str, str]] = [] + for exec_id, executor in self._active_executors.items(): + for node_id, node in executor.node_registry.items(): + if getattr(node, "_awaiting_input", False): + waiting.append({"node_id": node_id, "execution_id": exec_id}) + return waiting + + def get_injectable_nodes(self) -> list[dict[str, str]]: + """Return nodes that support message injection (have ``inject_event``). + + Each entry is ``{"node_id": ..., "execution_id": ...}``. + """ + injectable: list[dict[str, str]] = [] + for exec_id, executor in self._active_executors.items(): + for node_id, node in executor.node_registry.items(): + if hasattr(node, "inject_event"): + injectable.append({"node_id": node_id, "execution_id": exec_id}) + return injectable + def _record_execution_result(self, execution_id: str, result: ExecutionResult) -> None: """Record a completed execution result with retention pruning.""" self._execution_results[execution_id] = result @@ -488,6 +512,7 @@ class ExecutionStream: tool_executor=self._tool_executor, event_bus=self._scoped_event_bus, stream_id=self.stream_id, + execution_id=execution_id, storage_path=exec_storage, runtime_logger=runtime_logger, loop_config=self.graph.loop_config, diff --git a/core/framework/tools/queen_lifecycle_tools.py b/core/framework/tools/queen_lifecycle_tools.py index 72478acf..9b63701a 100644 --- a/core/framework/tools/queen_lifecycle_tools.py +++ b/core/framework/tools/queen_lifecycle_tools.py @@ -164,25 +164,18 @@ def register_queen_lifecycle_tools( }) # Check if the worker is waiting for user input - waiting_for_input = False - waiting_node_id = None - for ep_id, stream in reg.streams.items(): - # Check active executors for pending input - for executor in stream._active_executors.values(): - for node_id, node in executor.node_registry.items(): - if hasattr(node, "_waiting_for_input") and node._waiting_for_input: - waiting_for_input = True - waiting_node_id = node_id - break + waiting_nodes = [] + for _ep_id, stream in reg.streams.items(): + waiting_nodes.extend(stream.get_waiting_nodes()) - status = "waiting_for_input" if waiting_for_input else "running" + status = "waiting_for_input" if waiting_nodes else "running" result = { **base, "status": status, "active_executions": active_execs, } - if waiting_node_id: - result["waiting_node_id"] = waiting_node_id + if waiting_nodes: + result["waiting_node_id"] = waiting_nodes[0]["node_id"] return json.dumps(result) _status_tool = Tool( @@ -212,18 +205,16 @@ def register_queen_lifecycle_tools( # Find an active node that can accept injected input for stream in reg.streams.values(): - for executor in stream._active_executors.values(): - for node_id, node in executor.node_registry.items(): - if hasattr(node, "inject_event"): - try: - await node.inject_event(content) - return json.dumps({ - "status": "delivered", - "node_id": node_id, - "content_preview": content[:100], - }) - except Exception as e: - return json.dumps({"error": f"Injection failed: {e}"}) + injectable = stream.get_injectable_nodes() + if injectable: + target_node_id = injectable[0]["node_id"] + ok = await stream.inject_input(target_node_id, content) + if ok: + return json.dumps({ + "status": "delivered", + "node_id": target_node_id, + "content_preview": content[:100], + }) return json.dumps({ "error": "No active worker node found — worker may be idle.", diff --git a/core/framework/tools/worker_monitoring_tools.py b/core/framework/tools/worker_monitoring_tools.py index d5f4b582..88070510 100644 --- a/core/framework/tools/worker_monitoring_tools.py +++ b/core/framework/tools/worker_monitoring_tools.py @@ -45,7 +45,7 @@ def register_worker_monitoring_tools( registry: "ToolRegistry", event_bus: "EventBus", storage_path: Path, - stream_id: str = "worker_health_judge", + stream_id: str = "judge", worker_graph_id: str | None = None, ) -> int: """Register worker monitoring tools bound to *event_bus* and *storage_path*. diff --git a/core/framework/tui/app.py b/core/framework/tui/app.py index ecd22e09..d6fdce68 100644 --- a/core/framework/tui/app.py +++ b/core/framework/tui/app.py @@ -500,7 +500,7 @@ class AdenTUI(App): # 2. Storage dirs — under worker's base path but completely owned # by the judge/queen. Worker never writes here. - judge_dir = storage_path / "graphs" / "worker_health_judge" / "session" + judge_dir = storage_path / "graphs" / "judge" / "session" judge_dir.mkdir(parents=True, exist_ok=True) queen_dir = storage_path / "graphs" / "queen" / "session" queen_dir.mkdir(parents=True, exist_ok=True) @@ -508,10 +508,18 @@ class AdenTUI(App): # --------------------------------------------------------------- # 3. Health judge — background task, fires every 2 minutes. # --------------------------------------------------------------- - judge_runtime = Runtime(storage_path / "graphs" / "worker_health_judge") + judge_runtime = Runtime(storage_path / "graphs" / "judge") monitoring_tools = list(monitoring_registry.get_tools().values()) monitoring_executor = monitoring_registry.get_executor() + # Scoped event buses — stamp graph_id on every event so + # downstream routing (queen-primary mode) can distinguish + # queen/judge/worker events. + from framework.runtime.execution_stream import GraphScopedEventBus + + judge_event_bus = GraphScopedEventBus(event_bus, "judge") + queen_event_bus = GraphScopedEventBus(event_bus, "queen") + async def _judge_loop(): interval = 120 # seconds first = True @@ -525,8 +533,8 @@ class AdenTUI(App): llm=llm, tools=monitoring_tools, tool_executor=monitoring_executor, - event_bus=event_bus, - stream_id="worker_health_judge", + event_bus=judge_event_bus, + stream_id="judge", storage_path=judge_dir, loop_config=judge_graph.loop_config, ) @@ -544,7 +552,7 @@ class AdenTUI(App): self._judge_task = asyncio.run_coroutine_threadsafe( _judge_loop(), agent_loop, ) - self._judge_graph_id = "worker_health_judge" + self._judge_graph_id = "judge" # --------------------------------------------------------------- # 4. Queen — persistent interactive conversation. @@ -632,7 +640,7 @@ class AdenTUI(App): llm=llm, tools=queen_tools, tool_executor=queen_tool_executor, - event_bus=event_bus, + event_bus=queen_event_bus, stream_id="queen", storage_path=queen_dir, loop_config=queen_graph.loop_config, @@ -1288,7 +1296,7 @@ class AdenTUI(App): # The judge runs as a silent background task. Only surface # escalation ticket events on the status bar; everything else # (LLM deltas, tool calls, node iterations) goes to logs only. - if event.stream_id == "worker_health_judge": + if event.stream_id == "judge": if et == EventType.WORKER_ESCALATION_TICKET: ticket = event.data.get("ticket", {}) severity = ticket.get("severity", "") diff --git a/docs/server-cli-arch.md b/docs/server-cli-arch.md index 386f2ec9..02c9e098 100644 --- a/docs/server-cli-arch.md +++ b/docs/server-cli-arch.md @@ -297,6 +297,17 @@ One AgentRuntime (shared EventBus) isolation_level: isolated ``` +### GraphScopedEventBus and Event Identity Fields + +Every event carries four identity fields: `(graph_id, stream_id, node_id, execution_id)`. + +- **`graph_id`** — Set automatically by `GraphScopedEventBus`, a public subclass of `EventBus` that stamps `graph_id` on every `publish()` call. All three components (worker, judge, queen) use a scoped bus so their events are distinguishable. +- **`stream_id`** — The entry point pipeline. Flows from `EntryPointSpec.id` through `ExecutionStream` → `GraphExecutor` → `NodeContext` → `EventLoopNode`. +- **`node_id`** — The graph node emitting the event. +- **`execution_id`** — UUID for a specific execution run, set by `ExecutionStream` and wired through `GraphExecutor` → `EventLoopNode` → all `emit_*` calls. + +See [EVENT_TYPES.md](../core/framework/runtime/EVENT_TYPES.md) for the complete event type and schema reference. + ### New EventBus Event Types Two new events added to `EventType` enum: @@ -307,7 +318,7 @@ Emitted by the health judge's `emit_escalation_ticket` tool when the judge detec ```python EventBus.emit_worker_escalation_ticket( - stream_id: str, # Judge's stream ID (e.g. "worker_health_judge::health_check") + stream_id: str, # Judge's stream ID (e.g. "judge") node_id: str, # "judge" ticket: dict, # Full EscalationTicket (see data model below) execution_id: str | None = None, @@ -351,8 +362,8 @@ EventBus.emit_queen_intervention_requested( ticket_id: str, # References the original EscalationTicket analysis: str, # Queen's 2-3 sentence analysis severity: str, # "low" | "medium" | "high" | "critical" - queen_graph_id: str, # "hive_coder_queen" - queen_stream_id: str, # "hive_coder_queen::ticket_receiver" + queen_graph_id: str, # "queen" + queen_stream_id: str, # "queen" execution_id: str | None = None, ) ``` @@ -364,8 +375,8 @@ EventBus.emit_queen_intervention_requested( "ticket_id": "uuid", "analysis": "Worker is stuck in a rate-limit retry loop for 6+ minutes. Suggest pausing and retrying with backoff.", "severity": "high", - "queen_graph_id": "hive_coder_queen", - "queen_stream_id": "hive_coder_queen::ticket_receiver" + "queen_graph_id": "queen", + "queen_stream_id": "queen" } ``` @@ -417,6 +428,15 @@ The following existing methods gained a `graph_id` parameter to support multi-gr |---|---|---| | `get_active_graph()` | `-> GraphSpec` | Returns the `GraphSpec` for the currently active graph (used by TUI/chat routing) | | `active_graph_id` (property) | `str` (get/set) | The graph that receives user input. Set by TUI when switching between worker and queen views | +| `get_active_streams()` | `-> list[dict]` | Returns metadata for every stream with active executions across all graphs. Each dict contains `graph_id`, `stream_id`, `entry_point_id`, `active_execution_ids`, `is_awaiting_input`, `waiting_nodes`. | +| `get_waiting_nodes()` | `-> list[dict]` | Flat list of all nodes currently blocked waiting for client input across all graphs/streams. Each dict contains `graph_id`, `stream_id`, `node_id`, `execution_id`. | + +### New ExecutionStream APIs + +| Method | Signature | Description | +|---|---|---| +| `get_waiting_nodes()` | `-> list[dict]` | Returns `[{"node_id": str, "execution_id": str}]` for every `EventLoopNode` with `_awaiting_input == True`. | +| `get_injectable_nodes()` | `-> list[dict]` | Returns `[{"node_id": str, "execution_id": str}]` for every node that supports message injection (has `inject_event` method). | ### Proposed HTTP Endpoints @@ -433,6 +453,13 @@ These endpoints are not yet implemented. They expose the new multi-graph and mon | `PUT /api/agents/{id}/active-graph` | Switch active graph | `runtime.active_graph_id = graph_id` | | `GET /api/agents/{id}/active-graph` | Get active graph | `runtime.active_graph_id` | +#### Stream Introspection + +| HTTP Endpoint | Method | Runtime Primitive | +|---|---|---| +| `GET /api/agents/{id}/streams` | Active streams | `runtime.get_active_streams()` — all streams with active executions | +| `GET /api/agents/{id}/waiting-nodes` | Waiting nodes | `runtime.get_waiting_nodes()` — all nodes blocked on client input | + #### Worker Health Monitoring | HTTP Endpoint | Method | Runtime Primitive | @@ -476,7 +503,7 @@ Secondary graphs have fully isolated storage under `graphs/{graph_id}/` to preve | +-- conversations/ | +-- logs/ +-- graphs/ -| +-- worker_health_judge/ # Health judge (secondary) +| +-- judge/ # Health judge (secondary) | | +-- sessions/ | | | +-- session_YYYYMMDD_HHMMSS_{uuid}/ # ONE persistent session | | | +-- state.json @@ -485,7 +512,7 @@ Secondary graphs have fully isolated storage under `graphs/{graph_id}/` to preve | | | +-- tool_logs.jsonl | | | +-- details.jsonl | | +-- runtime_logs/ -| +-- hive_coder_queen/ # Queen triage (secondary) +| +-- queen/ # Queen triage (secondary) | +-- sessions/ | | +-- session_YYYYMMDD_HHMMSS_{uuid}/ # ONE persistent session | | +-- state.json @@ -498,7 +525,7 @@ Secondary graphs have fully isolated storage under `graphs/{graph_id}/` to preve Each secondary graph gets its own `SessionStore` and `RuntimeLogStore` scoped to `graphs/{graph_id}/`. This is set up in `AgentRuntime.add_graph()`: ```python -graph_base = self._session_store.base_path / subpath # e.g. .../graphs/worker_health_judge +graph_base = self._session_store.base_path / subpath # e.g. .../graphs/judge graph_session_store = SessionStore(graph_base) graph_log_store = RuntimeLogStore(graph_base / "runtime_logs") ``` @@ -513,14 +540,27 @@ Three tools registered via `register_worker_monitoring_tools(registry, event_bus | `emit_escalation_ticket(ticket_json)` | Health Judge | Validates JSON against `EscalationTicket` schema (Pydantic rejects partial tickets), then calls `EventBus.emit_worker_escalation_ticket()`. | | `notify_operator(ticket_id, analysis, urgency)` | Queen | Calls `EventBus.emit_queen_intervention_requested()` so the TUI/frontend surfaces a notification. | +### Queen Lifecycle Tools + +Four tools registered via `register_queen_lifecycle_tools(registry, worker_runtime, event_bus)`. These close over the worker's `AgentRuntime` to give the Queen control over the worker agent's lifecycle. + +| Tool | Description | +|---|---| +| `start_worker(task)` | Trigger the worker's default entry point with a task description. Returns an `execution_id`. | +| `stop_worker()` | Cancel all active worker executions. Returns IDs of cancelled executions. | +| `get_worker_status()` | Check if the worker is idle, running, or waiting for input. Returns execution details and waiting node ID if applicable. Uses `stream.get_waiting_nodes()` for accurate detection. | +| `inject_worker_message(content)` | Send a message to the running worker agent by finding an injectable node via `stream.get_injectable_nodes()` and calling `stream.inject_input()`. | + ### New File Reference | Component | Path | |---|---| | EscalationTicket model | `core/framework/runtime/escalation_ticket.py` | -| Worker Health Judge graph | `core/framework/monitoring/worker_health_judge.py` | +| Worker Health Judge graph | `core/framework/monitoring/judge.py` | | Worker monitoring tools | `core/framework/tools/worker_monitoring_tools.py` | +| Queen lifecycle tools | `core/framework/tools/queen_lifecycle_tools.py` | | Monitoring package init | `core/framework/monitoring/__init__.py` | +| Event types reference | `core/framework/runtime/EVENT_TYPES.md` | --- @@ -544,3 +584,11 @@ Three tools registered via `register_worker_monitoring_tools(registry, event_bus | SessionStore | `core/framework/storage/session_store.py` | | CheckpointStore | `core/framework/storage/checkpoint_store.py` | | Runtime logger | `core/framework/runtime/core.py` | +| EventBus | `core/framework/runtime/event_bus.py` | +| ExecutionStream | `core/framework/runtime/execution_stream.py` | +| GraphScopedEventBus | `core/framework/runtime/execution_stream.py` | +| EscalationTicket | `core/framework/runtime/escalation_ticket.py` | +| Queen lifecycle tools | `core/framework/tools/queen_lifecycle_tools.py` | +| Worker monitoring tools | `core/framework/tools/worker_monitoring_tools.py` | +| Health Judge graph | `core/framework/monitoring/judge.py` | +| Event types reference | `core/framework/runtime/EVENT_TYPES.md` |