Merge branch 'feature/concurrent-judge-runtime' into feat/open-hive

This commit is contained in:
bryan
2026-02-24 09:11:42 -08:00
12 changed files with 768 additions and 77 deletions
+46 -21
View File
@@ -229,8 +229,9 @@ class EventLoopNode(NodeProtocol):
start_time = time.time()
total_input_tokens = 0
total_output_tokens = 0
stream_id = ctx.node_id
stream_id = ctx.stream_id or ctx.node_id
node_id = ctx.node_id
execution_id = ctx.execution_id or ""
# Verdict counters for runtime logging
_accept_count = _retry_count = _escalate_count = _continue_count = 0
@@ -369,7 +370,7 @@ class EventLoopNode(NodeProtocol):
)
# 4. Publish loop started
await self._publish_loop_started(stream_id, node_id)
await self._publish_loop_started(stream_id, node_id, execution_id)
# 5. Stall / doom loop detection state (restored from cursor if resuming)
recent_responses: list[str] = _restored_recent_responses
@@ -411,7 +412,7 @@ class EventLoopNode(NodeProtocol):
await self._drain_injection_queue(conversation)
# 6c. Publish iteration event
await self._publish_iteration(stream_id, node_id, iteration)
await self._publish_iteration(stream_id, node_id, iteration, execution_id)
# 6d. Pre-turn compaction check (tiered)
if conversation.needs_compaction():
@@ -484,6 +485,7 @@ class EventLoopNode(NodeProtocol):
retry_count=_stream_retry_count,
max_retries=self._config.max_stream_retries,
error=str(e)[:500],
execution_id=execution_id,
)
await asyncio.sleep(delay)
continue # retry same iteration
@@ -560,7 +562,7 @@ class EventLoopNode(NodeProtocol):
node_id,
iteration,
)
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
latency_ms = int((time.time() - start_time) * 1000)
return NodeResult(
success=True,
@@ -575,7 +577,7 @@ class EventLoopNode(NodeProtocol):
if len(recent_responses) > self._config.stall_detection_threshold:
recent_responses.pop(0)
if self._is_stalled(recent_responses):
await self._publish_stalled(stream_id, node_id)
await self._publish_stalled(stream_id, node_id, execution_id)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
if ctx.runtime_logger:
@@ -646,6 +648,7 @@ class EventLoopNode(NodeProtocol):
stream_id=stream_id,
node_id=node_id,
description=doom_desc,
execution_id=execution_id,
)
warning_msg = (
f"[SYSTEM] {doom_desc}. You are repeating the "
@@ -660,6 +663,7 @@ class EventLoopNode(NodeProtocol):
stream_id=stream_id,
node_id=node_id,
prompt=doom_desc,
execution_id=execution_id,
)
self._awaiting_input = True
try:
@@ -709,7 +713,7 @@ class EventLoopNode(NodeProtocol):
if _cf_block:
if self._shutdown:
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
if ctx.runtime_logger:
@@ -759,7 +763,7 @@ class EventLoopNode(NodeProtocol):
got_input = await self._await_user_input(ctx)
logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
if not got_input:
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
if ctx.runtime_logger:
@@ -878,6 +882,7 @@ class EventLoopNode(NodeProtocol):
feedback=fb_preview,
judge_type=judge_type,
iteration=iteration,
execution_id=execution_id,
)
if verdict.action == "ACCEPT":
@@ -919,7 +924,7 @@ class EventLoopNode(NodeProtocol):
for key, value in accumulator.to_dict().items():
ctx.memory.write(key, value, validate=False)
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
latency_ms = int((time.time() - start_time) * 1000)
_accept_count += 1
if ctx.runtime_logger:
@@ -962,7 +967,7 @@ class EventLoopNode(NodeProtocol):
elif verdict.action == "ESCALATE":
# Exit point 6: Judge ESCALATE — log step + log_node_complete
await self._publish_loop_completed(stream_id, node_id, iteration + 1)
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
latency_ms = int((time.time() - start_time) * 1000)
_escalate_count += 1
if ctx.runtime_logger:
@@ -1026,7 +1031,7 @@ class EventLoopNode(NodeProtocol):
continue
# 7. Max iterations exhausted
await self._publish_loop_completed(stream_id, node_id, self._config.max_iterations)
await self._publish_loop_completed(stream_id, node_id, self._config.max_iterations, execution_id)
latency_ms = int((time.time() - start_time) * 1000)
if ctx.runtime_logger:
ctx.runtime_logger.log_node_complete(
@@ -1098,9 +1103,10 @@ class EventLoopNode(NodeProtocol):
if self._event_bus:
await self._event_bus.emit_client_input_requested(
stream_id=ctx.node_id,
stream_id=ctx.stream_id or ctx.node_id,
node_id=ctx.node_id,
prompt="",
execution_id=ctx.execution_id or "",
)
self._awaiting_input = True
@@ -1139,8 +1145,9 @@ class EventLoopNode(NodeProtocol):
``real_tool_results`` which resets each inner iteration, this list grows
across the entire turn.
"""
stream_id = ctx.node_id
stream_id = ctx.stream_id or ctx.node_id
node_id = ctx.node_id
execution_id = ctx.execution_id or ""
token_counts: dict[str, int] = {"input": 0, "output": 0}
tool_call_count = 0
final_text = ""
@@ -1192,7 +1199,8 @@ class EventLoopNode(NodeProtocol):
if isinstance(event, TextDeltaEvent):
accumulated_text = event.snapshot
await self._publish_text_delta(
stream_id, node_id, event.content, event.snapshot, ctx
stream_id, node_id, event.content, event.snapshot, ctx,
execution_id,
)
elif isinstance(event, ToolCallEvent):
@@ -1276,7 +1284,8 @@ class EventLoopNode(NodeProtocol):
executed_in_batch += 1
await self._publish_tool_started(
stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input
stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input,
execution_id,
)
logger.info(
"[%s] tool_call: %s(%s)",
@@ -1308,7 +1317,7 @@ class EventLoopNode(NodeProtocol):
key = tc.tool_input.get("key", "")
await accumulator.set(key, value)
outputs_set_this_turn.append(key)
await self._publish_output_key_set(stream_id, node_id, key)
await self._publish_output_key_set(stream_id, node_id, key, execution_id)
logged_tool_calls.append(
{
"tool_use_id": tc.tool_use_id,
@@ -1419,6 +1428,7 @@ class EventLoopNode(NodeProtocol):
tc.tool_name,
result.content,
result.is_error,
execution_id,
)
# If the limit was hit, add error results for every remaining
@@ -2138,7 +2148,7 @@ class EventLoopNode(NodeProtocol):
await self._event_bus.publish(
AgentEvent(
type=EventType.CONTEXT_COMPACTED,
stream_id=ctx.node_id,
stream_id=ctx.stream_id or ctx.node_id,
node_id=ctx.node_id,
data={
"level": "prune_only",
@@ -2194,7 +2204,7 @@ class EventLoopNode(NodeProtocol):
await self._event_bus.publish(
AgentEvent(
type=EventType.CONTEXT_COMPACTED,
stream_id=ctx.node_id,
stream_id=ctx.stream_id or ctx.node_id,
node_id=ctx.node_id,
data={
"level": level,
@@ -2516,36 +2526,40 @@ class EventLoopNode(NodeProtocol):
# EventBus publishing helpers
# -------------------------------------------------------------------
async def _publish_loop_started(self, stream_id: str, node_id: str) -> None:
async def _publish_loop_started(self, stream_id: str, node_id: str, execution_id: str = "") -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_started(
stream_id=stream_id,
node_id=node_id,
max_iterations=self._config.max_iterations,
execution_id=execution_id,
)
async def _publish_iteration(self, stream_id: str, node_id: str, iteration: int) -> None:
async def _publish_iteration(self, stream_id: str, node_id: str, iteration: int, execution_id: str = "") -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_iteration(
stream_id=stream_id,
node_id=node_id,
iteration=iteration,
execution_id=execution_id,
)
async def _publish_loop_completed(self, stream_id: str, node_id: str, iterations: int) -> None:
async def _publish_loop_completed(self, stream_id: str, node_id: str, iterations: int, execution_id: str = "") -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_completed(
stream_id=stream_id,
node_id=node_id,
iterations=iterations,
execution_id=execution_id,
)
async def _publish_stalled(self, stream_id: str, node_id: str) -> None:
async def _publish_stalled(self, stream_id: str, node_id: str, execution_id: str = "") -> None:
if self._event_bus:
await self._event_bus.emit_node_stalled(
stream_id=stream_id,
node_id=node_id,
reason="Consecutive identical responses detected",
execution_id=execution_id,
)
async def _publish_text_delta(
@@ -2555,6 +2569,7 @@ class EventLoopNode(NodeProtocol):
content: str,
snapshot: str,
ctx: NodeContext,
execution_id: str = "",
) -> None:
if self._event_bus:
if ctx.node_spec.client_facing:
@@ -2563,6 +2578,7 @@ class EventLoopNode(NodeProtocol):
node_id=node_id,
content=content,
snapshot=snapshot,
execution_id=execution_id,
)
else:
await self._event_bus.emit_llm_text_delta(
@@ -2570,6 +2586,7 @@ class EventLoopNode(NodeProtocol):
node_id=node_id,
content=content,
snapshot=snapshot,
execution_id=execution_id,
)
async def _publish_tool_started(
@@ -2579,6 +2596,7 @@ class EventLoopNode(NodeProtocol):
tool_use_id: str,
tool_name: str,
tool_input: dict,
execution_id: str = "",
) -> None:
if self._event_bus:
await self._event_bus.emit_tool_call_started(
@@ -2587,6 +2605,7 @@ class EventLoopNode(NodeProtocol):
tool_use_id=tool_use_id,
tool_name=tool_name,
tool_input=tool_input,
execution_id=execution_id,
)
async def _publish_tool_completed(
@@ -2597,6 +2616,7 @@ class EventLoopNode(NodeProtocol):
tool_name: str,
result: str,
is_error: bool,
execution_id: str = "",
) -> None:
if self._event_bus:
await self._event_bus.emit_tool_call_completed(
@@ -2606,6 +2626,7 @@ class EventLoopNode(NodeProtocol):
tool_name=tool_name,
result=result,
is_error=is_error,
execution_id=execution_id,
)
async def _publish_judge_verdict(
@@ -2616,6 +2637,7 @@ class EventLoopNode(NodeProtocol):
feedback: str = "",
judge_type: str = "implicit",
iteration: int = 0,
execution_id: str = "",
) -> None:
if self._event_bus:
await self._event_bus.emit_judge_verdict(
@@ -2625,6 +2647,7 @@ class EventLoopNode(NodeProtocol):
feedback=feedback,
judge_type=judge_type,
iteration=iteration,
execution_id=execution_id,
)
async def _publish_output_key_set(
@@ -2632,10 +2655,12 @@ class EventLoopNode(NodeProtocol):
stream_id: str,
node_id: str,
key: str,
execution_id: str = "",
) -> None:
if self._event_bus:
await self._event_bus.emit_output_key_set(
stream_id=stream_id,
node_id=node_id,
key=key,
execution_id=execution_id,
)
+19 -5
View File
@@ -131,6 +131,7 @@ class GraphExecutor:
parallel_config: ParallelExecutionConfig | None = None,
event_bus: Any | None = None,
stream_id: str = "",
execution_id: str = "",
runtime_logger: Any = None,
storage_path: str | Path | None = None,
loop_config: dict[str, Any] | None = None,
@@ -170,6 +171,7 @@ class GraphExecutor:
self.logger = logging.getLogger(__name__)
self._event_bus = event_bus
self._stream_id = stream_id
self._execution_id = execution_id or getattr(runtime, "execution_id", "")
self.runtime_logger = runtime_logger
self._storage_path = Path(storage_path) if storage_path else None
self._loop_config = loop_config or {}
@@ -553,6 +555,7 @@ class GraphExecutor:
await self._event_bus.emit_execution_resumed(
stream_id=self._stream_id,
node_id=current_node_id,
execution_id=self._execution_id,
)
# Start run
@@ -597,6 +600,7 @@ class GraphExecutor:
stream_id=self._stream_id,
node_id=current_node_id,
reason="User requested pause (Ctrl+Z)",
execution_id=self._execution_id,
)
# Create session state for pause
@@ -767,7 +771,8 @@ class GraphExecutor:
# Emit node-started event (skip event_loop nodes — they emit their own)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_started(
stream_id=self._stream_id, node_id=current_node_id
stream_id=self._stream_id, node_id=current_node_id,
execution_id=self._execution_id,
)
# Execute node
@@ -777,7 +782,8 @@ class GraphExecutor:
# Emit node-completed event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_completed(
stream_id=self._stream_id, node_id=current_node_id, iterations=1
stream_id=self._stream_id, node_id=current_node_id, iterations=1,
execution_id=self._execution_id,
)
# Ensure runtime logging has an L2 entry for this node
@@ -897,6 +903,7 @@ class GraphExecutor:
retry_count=retry_count,
max_retries=max_retries,
error=result.error or "",
execution_id=self._execution_id,
)
_is_retry = True
@@ -992,6 +999,7 @@ class GraphExecutor:
stream_id=self._stream_id,
node_id=node_spec.id,
reason="HITL pause node",
execution_id=self._execution_id,
)
saved_memory = memory.read_all()
@@ -1057,6 +1065,7 @@ class GraphExecutor:
source_node=current_node_id,
target_node=result.next_node,
edge_condition="router",
execution_id=self._execution_id,
)
current_node_id = result.next_node
@@ -1092,6 +1101,7 @@ class GraphExecutor:
edge_condition=edge.condition.value
if hasattr(edge.condition, "value")
else str(edge.condition),
execution_id=self._execution_id,
)
# Execute branches in parallel
@@ -1143,6 +1153,7 @@ class GraphExecutor:
stream_id=self._stream_id,
source_node=current_node_id,
target_node=next_node,
execution_id=self._execution_id,
)
# CHECKPOINT: node_complete (after determining next node)
@@ -1573,7 +1584,8 @@ class GraphExecutor:
cumulative_output_keys=cumulative_output_keys or [],
event_triggered=event_triggered,
accounts_prompt=node_accounts_prompt,
execution_id=self.runtime.execution_id,
execution_id=self._execution_id,
stream_id=self._stream_id,
)
VALID_NODE_TYPES = {
@@ -1933,7 +1945,8 @@ class GraphExecutor:
# Emit node-started event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_started(
stream_id=self._stream_id, node_id=branch.node_id
stream_id=self._stream_id, node_id=branch.node_id,
execution_id=self._execution_id,
)
self.logger.info(
@@ -1957,7 +1970,8 @@ class GraphExecutor:
# Emit node-completed event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_completed(
stream_id=self._stream_id, node_id=branch.node_id, iterations=1
stream_id=self._stream_id, node_id=branch.node_id, iterations=1,
execution_id=self._execution_id,
)
if result.success:
+4
View File
@@ -518,6 +518,10 @@ class NodeContext:
# Execution ID (from StreamRuntimeAdapter)
execution_id: str = ""
# Stream identity — the ExecutionStream this node runs within.
# Falls back to node_id when not set (legacy / standalone executor).
stream_id: str = ""
@dataclass
class NodeResult:
+3 -3
View File
@@ -15,15 +15,15 @@ Usage::
# Load judge as secondary graph on the worker runtime
await worker_runtime.add_graph(
graph_id="worker_health_judge",
graph_id="judge",
graph=judge_graph,
goal=judge_goal,
entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
storage_subpath="graphs/worker_health_judge",
storage_subpath="graphs/judge",
)
"""
from .worker_health_judge import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph, judge_node
from .judge import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph, judge_node
__all__ = [
"HEALTH_JUDGE_ENTRY_POINT",
@@ -20,11 +20,11 @@ Usage::
# Load judge as secondary graph on the worker runtime
await worker_runtime.add_graph(
graph_id="worker_health_judge",
graph_id="judge",
graph=judge_graph,
goal=judge_goal,
entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
storage_subpath="graphs/worker_health_judge",
storage_subpath="graphs/judge",
)
Design:
@@ -239,7 +239,7 @@ HEALTH_JUDGE_ENTRY_POINT = AsyncEntryPointSpec(
# ---------------------------------------------------------------------------
judge_graph = GraphSpec(
id="worker-health-judge-graph",
id="judge-graph",
goal_id=judge_goal.id,
version="1.0.0",
entry_node="judge",
+539
View File
@@ -0,0 +1,539 @@
# Event Types and Schema Reference
The Hive runtime uses a pub/sub `EventBus` for inter-component communication and observability. Every event is an `AgentEvent` dataclass published through `EventBus.publish()`.
## Event Envelope (`AgentEvent`)
Every event shares a common envelope:
| Field | Type | Description |
| ---------------- | ----------------- | ------------------------------------------------------------ |
| `type` | `EventType` (str) | Event type identifier (see below) |
| `stream_id` | `str` | Entry point / pipeline that emitted the event |
| `node_id` | `str \| None` | Graph node that emitted the event |
| `execution_id` | `str \| None` | Unique execution run ID (UUID, set by `ExecutionStream`) |
| `graph_id` | `str \| None` | Graph that emitted the event (set by `GraphScopedEventBus`) |
| `data` | `dict` | Event-type-specific payload (see individual schemas below) |
| `timestamp` | `datetime` | When the event was created |
| `correlation_id` | `str \| None` | Optional ID for tracking related events across streams |
### Identity Fields
The identity tuple `(graph_id, stream_id, node_id, execution_id)` uniquely locates any event:
- **`graph_id`** — Which graph produced the event. Set automatically by `GraphScopedEventBus` (a subclass that stamps `graph_id` on every `publish()` call). Values: `"worker"`, `"judge"`, `"queen"`, or the graph spec ID.
- **`stream_id`** — Which entry point / pipeline. Corresponds to `EntryPointSpec.id` in the graph definition. For single-entry-point graphs, this equals the entry point name (e.g. `"default"`, `"health_check"`, `"ticket_receiver"`).
- **`node_id`** — Which specific node emitted the event. For `EventLoopNode` events, this is the node spec ID.
- **`execution_id`** — UUID identifying a specific execution run. Multiple concurrent executions of the same entry point each get a unique `execution_id`.
---
## Execution Lifecycle
### `execution_started`
A new graph execution has begun.
| Data Field | Type | Description |
| ---------- | ------ | ------------------------------- |
| `input` | `dict` | Input data passed to the graph |
**Emitted by:** `ExecutionStream._run_execution()`
---
### `execution_completed`
A graph execution finished successfully.
| Data Field | Type | Description |
| ---------- | ------ | ----------------- |
| `output` | `dict` | Final output data |
**Emitted by:** `ExecutionStream._run_execution()`
---
### `execution_failed`
A graph execution failed with an error.
| Data Field | Type | Description |
| ---------- | ----- | ------------- |
| `error` | `str` | Error message |
**Emitted by:** `ExecutionStream._run_execution()`
---
### `execution_paused`
Execution has been paused (Ctrl+Z or HITL approval).
| Data Field | Type | Description |
| ---------- | ----- | ----------------- |
| `reason` | `str` | Why it was paused |
**Emitted by:** `GraphExecutor.execute()`
---
### `execution_resumed`
Execution has resumed from a paused state.
| Data Field | Type | Description |
| ---------- | ---- | ----------- |
| *(none)* | | |
**Emitted by:** `GraphExecutor.execute()`
---
## Node Event-Loop Lifecycle
These events track the inner loop of `EventLoopNode` — the multi-turn LLM streaming loop that powers most agent nodes.
### `node_loop_started`
An EventLoopNode has begun its execution loop.
| Data Field | Type | Description |
| ---------------- | ---------- | ------------------------------- |
| `max_iterations` | `int\|null`| Maximum iterations configured |
**Emitted by:** `EventLoopNode._publish_loop_started()`, `GraphExecutor` (for function nodes in parallel branches)
---
### `node_loop_iteration`
An EventLoopNode has started a new iteration (one LLM turn).
| Data Field | Type | Description |
| ----------- | ----- | ------------------------- |
| `iteration` | `int` | Zero-based iteration index |
**Emitted by:** `EventLoopNode._publish_iteration()`
---
### `node_loop_completed`
An EventLoopNode has finished its execution loop.
| Data Field | Type | Description |
| ------------ | ----- | -------------------------------------- |
| `iterations` | `int` | Total number of iterations completed |
**Emitted by:** `EventLoopNode._publish_loop_completed()`, `GraphExecutor` (for function nodes in parallel branches)
---
## LLM Streaming
### `llm_text_delta`
Incremental text output from the LLM (non-client-facing nodes only).
| Data Field | Type | Description |
| ---------- | ----- | ---------------------------------------- |
| `content` | `str` | New text chunk (delta) |
| `snapshot` | `str` | Full accumulated text so far |
**Emitted by:** `EventLoopNode._publish_text_delta()` when `client_facing=False`
---
### `llm_reasoning_delta`
Incremental reasoning/thinking output from the LLM.
| Data Field | Type | Description |
| ---------- | ----- | ------------------- |
| `content` | `str` | New reasoning chunk |
**Emitted by:** Not currently wired in `EventLoopNode` (reserved for extended thinking models).
---
## Tool Lifecycle
### `tool_call_started`
The LLM has requested a tool call and execution is about to begin.
| Data Field | Type | Description |
| ------------ | ------ | ------------------------------------ |
| `tool_use_id`| `str` | Unique ID for this tool invocation |
| `tool_name` | `str` | Name of the tool being called |
| `tool_input` | `dict` | Arguments passed to the tool |
**Emitted by:** `EventLoopNode._publish_tool_started()`
---
### `tool_call_completed`
A tool call has finished executing.
| Data Field | Type | Description |
| ------------ | ------ | -------------------------------------- |
| `tool_use_id`| `str` | Same ID from `tool_call_started` |
| `tool_name` | `str` | Name of the tool |
| `result` | `str` | Tool execution result (may be truncated)|
| `is_error` | `bool` | Whether the tool returned an error |
**Emitted by:** `EventLoopNode._publish_tool_completed()`
---
## Client I/O
These events are emitted only by nodes with `client_facing=True`. They drive the TUI's chat interface.
### `client_output_delta`
Incremental text output meant for the human operator.
| Data Field | Type | Description |
| ---------- | ----- | ---------------------------- |
| `content` | `str` | New text chunk (delta) |
| `snapshot` | `str` | Full accumulated text so far |
**Emitted by:** `EventLoopNode._publish_text_delta()` when `client_facing=True`
---
### `client_input_requested`
The node is waiting for human input (via `ask_user` tool or auto-block on text-only turns).
| Data Field | Type | Description |
| ---------- | ----- | ------------------------------------------------- |
| `prompt` | `str` | Optional prompt/question shown to the user |
**Emitted by:** `EventLoopNode._await_user_input()`, doom loop handler
The TUI subscribes to this event to show the input prompt and focus the chat input. After the user types, `inject_event()` is called on the node to unblock it.
---
## Internal Node Observability
### `node_internal_output`
Output from a non-client-facing node (for debugging/monitoring).
| Data Field | Type | Description |
| ---------- | ----- | ---------------- |
| `content` | `str` | Output text |
**Emitted by:** Available via `emit_node_internal_output()` — not currently wired in the default `EventLoopNode`.
---
### `node_input_blocked`
A non-client-facing node is blocked waiting for input.
| Data Field | Type | Description |
| ---------- | ----- | --------------- |
| `prompt` | `str` | Block reason |
**Emitted by:** Available via `emit_node_input_blocked()` — reserved for future use.
---
### `node_stalled`
The node's LLM has produced identical responses for several consecutive turns (stall detection).
| Data Field | Type | Description |
| ---------- | ----- | ------------------------------------------------- |
| `reason` | `str` | Always `"Consecutive identical responses detected"`|
**Emitted by:** `EventLoopNode._publish_stalled()`
---
### `node_tool_doom_loop`
The LLM is calling the same tool(s) with identical arguments repeatedly (doom loop detection).
| Data Field | Type | Description |
| ------------- | ----- | ------------------------------------ |
| `description` | `str` | Human-readable doom loop description |
**Emitted by:** `EventLoopNode` doom loop handler
---
## Judge Decisions
### `judge_verdict`
The judge (custom or implicit) has evaluated the current iteration.
| Data Field | Type | Description |
| ------------ | ----- | ---------------------------------------------------- |
| `action` | `str` | `"ACCEPT"`, `"RETRY"`, `"ESCALATE"`, or `"CONTINUE"` |
| `feedback` | `str` | Judge feedback (empty for ACCEPT/CONTINUE) |
| `judge_type` | `str` | `"custom"` (explicit JudgeProtocol) or `"implicit"` (stop-reason heuristic) |
| `iteration` | `int` | Which iteration this verdict applies to |
**Emitted by:** `EventLoopNode._publish_judge_verdict()`
**Verdict meanings:**
- **ACCEPT** — Output meets requirements; node exits successfully.
- **RETRY** — Output needs improvement; loop continues with feedback injected.
- **ESCALATE** — Problem cannot be solved at this level; triggers escalation.
- **CONTINUE** — Implicit verdict: LLM called tools, so it's making progress — let it keep going.
---
## Output Tracking
### `output_key_set`
A node has set an output key via the `set_output` synthetic tool.
| Data Field | Type | Description |
| ---------- | ----- | ----------------- |
| `key` | `str` | Output key name |
**Emitted by:** `EventLoopNode._publish_output_key_set()`
---
## Retry & Edge Tracking
### `node_retry`
A transient error occurred during an LLM call and the node is retrying.
| Data Field | Type | Description |
| ------------- | ----- | ---------------------------------- |
| `retry_count` | `int` | Current retry attempt number |
| `max_retries` | `int` | Maximum retries configured |
| `error` | `str` | Error message (truncated to 500ch) |
**Emitted by:** `EventLoopNode` (stream retry handler), `GraphExecutor` (node-level retry)
---
### `edge_traversed`
The executor has traversed an edge from one node to another.
| Data Field | Type | Description |
| ---------------- | ----- | ---------------------------------------------- |
| `source_node` | `str` | Node ID the edge starts from |
| `target_node` | `str` | Node ID the edge goes to |
| `edge_condition` | `str` | Edge condition: `"router"`, `"on_success"`, etc. |
**Emitted by:** `GraphExecutor.execute()` — after router decisions, condition-based edges, and fallback edges.
---
## Context Management
### `context_compacted`
Emitted when the node's conversation context is compacted or pruned. The `data` payload includes a `level` field (e.g. `"prune_only"` or the tiered compaction level) plus compaction statistics. Emitted by `EventLoopNode`'s compaction paths.
---
## State Changes
### `state_changed`
A shared memory key has been modified.
| Data Field | Type | Description |
| ----------- | ----- | ---------------------------------- |
| `key` | `str` | Memory key that changed |
| `old_value` | `Any` | Previous value |
| `new_value` | `Any` | New value |
| `scope` | `str` | Scope of the change |
**Emitted by:** Available via `emit_state_changed()` — not currently wired in default execution.
---
### `state_conflict`
Not currently emitted — reserved for concurrent write conflict detection.
---
## Goal Tracking
### `goal_progress`
Goal completion progress update.
| Data Field | Type | Description |
| ----------------- | ------- | ------------------------------------ |
| `progress` | `float` | 0.0–1.0 completion fraction |
| `criteria_status` | `dict` | Per-criterion status |
**Emitted by:** Available via `emit_goal_progress()` — not currently wired in default execution.
---
### `goal_achieved`
Not currently emitted — reserved for explicit goal completion signals.
---
### `constraint_violation`
A goal constraint has been violated.
| Data Field | Type | Description |
| --------------- | ----- | ------------------------ |
| `constraint_id` | `str` | Which constraint failed |
| `description` | `str` | What went wrong |
**Emitted by:** Available via `emit_constraint_violation()`.
---
## Stream Lifecycle
### `stream_started` / `stream_stopped`
Not currently emitted — reserved for `ExecutionStream` lifecycle tracking.
---
## External Triggers
### `webhook_received`
An external webhook has been received.
| Data Field | Type | Description |
| -------------- | ------ | ---------------------------- |
| `path` | `str` | Webhook URL path |
| `method` | `str` | HTTP method |
| `headers` | `dict` | HTTP headers |
| `payload` | `dict` | Request body |
| `query_params` | `dict` | URL query parameters |
**Emitted by:** Webhook server integration.
Note: `node_id` is not set on this event; `stream_id` is the webhook source ID.
---
## Escalation
### `escalation_requested`
An agent has requested handoff to the Hive Coder (via the `escalate_to_coder` synthetic tool).
| Data Field | Type | Description |
| ---------- | ----- | ------------------------------- |
| `reason` | `str` | Why escalation is needed |
| `context` | `str` | Additional context for the coder|
**Emitted by:** `EventLoopNode` when the LLM calls `escalate_to_coder`.
---
## Worker Health Monitoring
These events form the **judge → queen → operator** escalation pipeline.
### `worker_escalation_ticket`
The Worker Health Judge has detected a degradation pattern and is escalating to the Queen.
| Data Field | Type | Description |
| ---------- | ------ | ------------------------------------ |
| `ticket` | `dict` | Full `EscalationTicket` (see below) |
**Emitted by:** `emit_escalation_ticket` tool (in `worker_monitoring_tools.py`)
#### EscalationTicket Schema
| Field | Type | Description |
| ------------------------- | ------------------ | -------------------------------------------------------- |
| `ticket_id` | `str` | Auto-generated UUID |
| `created_at` | `str` | ISO timestamp |
| `worker_agent_id` | `str` | Which worker agent |
| `worker_session_id` | `str` | Which session |
| `worker_node_id` | `str` | Which node is struggling |
| `worker_graph_id` | `str` | Which graph |
| `severity` | `str` | `"low"`, `"medium"`, `"high"`, or `"critical"` |
| `cause` | `str` | Human-readable problem description |
| `judge_reasoning` | `str` | Judge's deliberation chain |
| `suggested_action` | `str` | e.g. `"Restart node"`, `"Human review"`, `"Kill session"`|
| `recent_verdicts` | `list[str]` | e.g. `["RETRY", "RETRY", "CONTINUE", "RETRY"]` |
| `total_steps_checked` | `int` | Steps the judge inspected |
| `steps_since_last_accept` | `int` | Consecutive non-ACCEPT steps |
| `stall_minutes` | `float \| null` | Minutes since last activity (null if active) |
| `evidence_snippet` | `str` | Excerpt from recent LLM output |
---
### `queen_intervention_requested`
The Queen has triaged an escalation ticket and decided the human operator should be involved.
| Data Field | Type | Description |
| ----------------- | ----- | ---------------------------------------------------- |
| `ticket_id` | `str` | From the original `EscalationTicket` |
| `analysis` | `str` | Queen's 2–3 sentence analysis |
| `severity` | `str` | `"low"`, `"medium"`, `"high"`, or `"critical"` |
| `queen_graph_id` | `str` | Queen's graph ID (for TUI navigation) |
| `queen_stream_id` | `str` | Queen's stream ID |
**Emitted by:** `notify_operator` tool (in `worker_monitoring_tools.py`)
The TUI subscribes to this event and shows a non-disruptive notification. The worker continues running.
---
## Custom Events
### `custom`
User-defined events with arbitrary payloads. No schema enforced.
---
## Subscription & Filtering
Events can be filtered when subscribing:
```python
bus.subscribe(
event_types=[EventType.TOOL_CALL_STARTED, EventType.TOOL_CALL_COMPLETED],
handler=my_handler,
filter_stream="default", # Only events from this stream
filter_node="planner", # Only events from this node
filter_execution="exec-uuid", # Only events from this execution
filter_graph="worker", # Only events from this graph
)
```
## Debug Event Logging
Set `HIVE_DEBUG_EVENTS=1` to write every published event to a JSONL file at `~/.hive/event_logs/<timestamp>.jsonl`. Each line is the full JSON serialization of an `AgentEvent`:
```json
{
"type": "tool_call_started",
"stream_id": "default",
"node_id": "planner",
"execution_id": "a1b2c3d4-...",
"graph_id": "worker",
"data": {"tool_use_id": "tu_1", "tool_name": "web_search", "tool_input": {"query": "..."}},
"timestamp": "2026-02-24T12:00:00.000000",
"correlation_id": null
}
```
+39
View File
@@ -1318,6 +1318,45 @@ class AgentRuntime:
"state_manager": self._state_manager.get_stats(),
}
def get_active_streams(self) -> list[dict[str, Any]]:
    """Return metadata for every stream that has active executions.

    Each dict contains: ``graph_id``, ``stream_id``, ``entry_point_id``,
    ``active_execution_ids``, ``is_awaiting_input``, ``waiting_nodes``.
    """
    streams_info: list[dict[str, Any]] = []
    for graph_id, registration in self._graphs.items():
        for entry_point_id, stream in registration.streams.items():
            execution_ids = stream.active_execution_ids
            # Streams with no running executions are omitted entirely.
            if execution_ids:
                streams_info.append(
                    {
                        "graph_id": graph_id,
                        "stream_id": stream.stream_id,
                        "entry_point_id": entry_point_id,
                        "active_execution_ids": execution_ids,
                        "is_awaiting_input": stream.is_awaiting_input,
                        "waiting_nodes": stream.get_waiting_nodes(),
                    }
                )
    return streams_info
def get_waiting_nodes(self) -> list[dict[str, Any]]:
    """Return all nodes currently blocked waiting for client input.

    Each dict contains: ``graph_id``, ``stream_id``, ``node_id``,
    ``execution_id``.
    """
    # Flatten every graph's streams into one list; each waiting entry
    # is annotated with its owning graph and stream identifiers.
    return [
        {"graph_id": graph_id, "stream_id": stream.stream_id, **entry}
        for graph_id, registration in self._graphs.items()
        for stream in registration.streams.values()
        for entry in stream.get_waiting_nodes()
    ]
# === PROPERTIES ===
@property
+27 -2
View File
@@ -35,7 +35,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class _GraphScopedEventBus(EventBus):
class GraphScopedEventBus(EventBus):
"""Proxy that stamps ``graph_id`` on every published event.
The ``GraphExecutor`` and ``EventLoopNode`` emit events via the
@@ -235,7 +235,7 @@ class ExecutionStream:
# Graph-scoped event bus (stamps graph_id on published events)
self._scoped_event_bus = self._event_bus
if self._event_bus and self.graph_id:
self._scoped_event_bus = _GraphScopedEventBus(self._event_bus, self.graph_id)
self._scoped_event_bus = GraphScopedEventBus(self._event_bus, self.graph_id)
# State
self._running = False
@@ -276,6 +276,30 @@ class ExecutionStream:
return True
return False
def get_waiting_nodes(self) -> list[dict[str, str]]:
    """Return nodes currently blocked waiting for client input.

    Each entry is ``{"node_id": ..., "execution_id": ...}``.
    """
    # A node signals it is blocked by setting the private
    # ``_awaiting_input`` flag; absent attribute means not waiting.
    return [
        {"node_id": node_id, "execution_id": execution_id}
        for execution_id, executor in self._active_executors.items()
        for node_id, node in executor.node_registry.items()
        if getattr(node, "_awaiting_input", False)
    ]
def get_injectable_nodes(self) -> list[dict[str, str]]:
    """Return nodes that support message injection (have ``inject_event``).

    Each entry is ``{"node_id": ..., "execution_id": ...}``.
    """
    # Duck-typed check: any node exposing an ``inject_event`` attribute
    # is considered injectable.
    return [
        {"node_id": node_id, "execution_id": execution_id}
        for execution_id, executor in self._active_executors.items()
        for node_id, node in executor.node_registry.items()
        if hasattr(node, "inject_event")
    ]
def _record_execution_result(self, execution_id: str, result: ExecutionResult) -> None:
"""Record a completed execution result with retention pruning."""
self._execution_results[execution_id] = result
@@ -494,6 +518,7 @@ class ExecutionStream:
tool_executor=self._tool_executor,
event_bus=self._scoped_event_bus,
stream_id=self.stream_id,
execution_id=execution_id,
storage_path=exec_storage,
runtime_logger=runtime_logger,
loop_config=self.graph.loop_config,
+15 -26
View File
@@ -172,25 +172,18 @@ def register_queen_lifecycle_tools(
)
# Check if the worker is waiting for user input
waiting_for_input = False
waiting_node_id = None
waiting_nodes = []
for _ep_id, stream in reg.streams.items():
# Check active executors for pending input
for executor in stream._active_executors.values():
for node_id, node in executor.node_registry.items():
if hasattr(node, "_waiting_for_input") and node._waiting_for_input:
waiting_for_input = True
waiting_node_id = node_id
break
waiting_nodes.extend(stream.get_waiting_nodes())
status = "waiting_for_input" if waiting_for_input else "running"
status = "waiting_for_input" if waiting_nodes else "running"
result = {
**base,
"status": status,
"active_executions": active_execs,
}
if waiting_node_id:
result["waiting_node_id"] = waiting_node_id
if waiting_nodes:
result["waiting_node_id"] = waiting_nodes[0]["node_id"]
return json.dumps(result)
_status_tool = Tool(
@@ -220,20 +213,16 @@ def register_queen_lifecycle_tools(
# Find an active node that can accept injected input
for stream in reg.streams.values():
for executor in stream._active_executors.values():
for node_id, node in executor.node_registry.items():
if hasattr(node, "inject_event"):
try:
await node.inject_event(content)
return json.dumps(
{
"status": "delivered",
"node_id": node_id,
"content_preview": content[:100],
}
)
except Exception as e:
return json.dumps({"error": f"Injection failed: {e}"})
injectable = stream.get_injectable_nodes()
if injectable:
target_node_id = injectable[0]["node_id"]
ok = await stream.inject_input(target_node_id, content)
if ok:
return json.dumps({
"status": "delivered",
"node_id": target_node_id,
"content_preview": content[:100],
})
return json.dumps(
{
@@ -45,7 +45,7 @@ def register_worker_monitoring_tools(
registry: ToolRegistry,
event_bus: EventBus,
storage_path: Path,
stream_id: str = "worker_health_judge",
stream_id: str = "judge",
worker_graph_id: str | None = None,
) -> int:
"""Register worker monitoring tools bound to *event_bus* and *storage_path*.
+15 -7
View File
@@ -498,7 +498,7 @@ class AdenTUI(App):
# 2. Storage dirs — under worker's base path but completely owned
# by the judge/queen. Worker never writes here.
judge_dir = storage_path / "graphs" / "worker_health_judge" / "session"
judge_dir = storage_path / "graphs" / "judge" / "session"
judge_dir.mkdir(parents=True, exist_ok=True)
queen_dir = storage_path / "graphs" / "queen" / "session"
queen_dir.mkdir(parents=True, exist_ok=True)
@@ -506,10 +506,18 @@ class AdenTUI(App):
# ---------------------------------------------------------------
# 3. Health judge — background task, fires every 2 minutes.
# ---------------------------------------------------------------
judge_runtime = Runtime(storage_path / "graphs" / "worker_health_judge")
judge_runtime = Runtime(storage_path / "graphs" / "judge")
monitoring_tools = list(monitoring_registry.get_tools().values())
monitoring_executor = monitoring_registry.get_executor()
# Scoped event buses — stamp graph_id on every event so
# downstream routing (queen-primary mode) can distinguish
# queen/judge/worker events.
from framework.runtime.execution_stream import GraphScopedEventBus
judge_event_bus = GraphScopedEventBus(event_bus, "judge")
queen_event_bus = GraphScopedEventBus(event_bus, "queen")
async def _judge_loop():
interval = 120 # seconds
first = True
@@ -523,8 +531,8 @@ class AdenTUI(App):
llm=llm,
tools=monitoring_tools,
tool_executor=monitoring_executor,
event_bus=event_bus,
stream_id="worker_health_judge",
event_bus=judge_event_bus,
stream_id="judge",
storage_path=judge_dir,
loop_config=judge_graph.loop_config,
)
@@ -543,7 +551,7 @@ class AdenTUI(App):
_judge_loop(),
agent_loop,
)
self._judge_graph_id = "worker_health_judge"
self._judge_graph_id = "judge"
# ---------------------------------------------------------------
# 4. Queen — persistent interactive conversation.
@@ -631,7 +639,7 @@ class AdenTUI(App):
llm=llm,
tools=queen_tools,
tool_executor=queen_tool_executor,
event_bus=event_bus,
event_bus=queen_event_bus,
stream_id="queen",
storage_path=queen_dir,
loop_config=queen_graph.loop_config,
@@ -1286,7 +1294,7 @@ class AdenTUI(App):
# The judge runs as a silent background task. Only surface
# escalation ticket events on the status bar; everything else
# (LLM deltas, tool calls, node iterations) goes to logs only.
if event.stream_id == "worker_health_judge":
if event.stream_id == "judge":
if et == EventType.WORKER_ESCALATION_TICKET:
ticket = event.data.get("ticket", {})
severity = ticket.get("severity", "")
+57 -9
View File
@@ -297,6 +297,17 @@ One AgentRuntime (shared EventBus)
isolation_level: isolated
```
### GraphScopedEventBus and Event Identity Fields
Every event carries four identity fields: `(graph_id, stream_id, node_id, execution_id)`.
- **`graph_id`** — Set automatically by `GraphScopedEventBus`, a public subclass of `EventBus` that stamps `graph_id` on every `publish()` call. All three components (worker, judge, queen) use a scoped bus so their events are distinguishable.
- **`stream_id`** — The entry point pipeline. Flows from `EntryPointSpec.id` through `ExecutionStream` → `GraphExecutor` → `NodeContext` → `EventLoopNode`.
- **`node_id`** — The graph node emitting the event.
- **`execution_id`** — UUID for a specific execution run, set by `ExecutionStream` and wired through `GraphExecutor` → `EventLoopNode` → all `emit_*` calls.
See [EVENT_TYPES.md](../core/framework/runtime/EVENT_TYPES.md) for the complete event type and schema reference.
### New EventBus Event Types
Two new events added to `EventType` enum:
@@ -307,7 +318,7 @@ Emitted by the health judge's `emit_escalation_ticket` tool when the judge detec
```python
EventBus.emit_worker_escalation_ticket(
stream_id: str, # Judge's stream ID (e.g. "worker_health_judge::health_check")
stream_id: str, # Judge's stream ID (e.g. "judge")
node_id: str, # "judge"
ticket: dict, # Full EscalationTicket (see data model below)
execution_id: str | None = None,
@@ -351,8 +362,8 @@ EventBus.emit_queen_intervention_requested(
ticket_id: str, # References the original EscalationTicket
analysis: str, # Queen's 2-3 sentence analysis
severity: str, # "low" | "medium" | "high" | "critical"
queen_graph_id: str, # "hive_coder_queen"
queen_stream_id: str, # "hive_coder_queen::ticket_receiver"
queen_graph_id: str, # "queen"
queen_stream_id: str, # "queen"
execution_id: str | None = None,
)
```
@@ -364,8 +375,8 @@ EventBus.emit_queen_intervention_requested(
"ticket_id": "uuid",
"analysis": "Worker is stuck in a rate-limit retry loop for 6+ minutes. Suggest pausing and retrying with backoff.",
"severity": "high",
"queen_graph_id": "hive_coder_queen",
"queen_stream_id": "hive_coder_queen::ticket_receiver"
"queen_graph_id": "queen",
"queen_stream_id": "queen"
}
```
@@ -417,6 +428,15 @@ The following existing methods gained a `graph_id` parameter to support multi-gr
|---|---|---|
| `get_active_graph()` | `-> GraphSpec` | Returns the `GraphSpec` for the currently active graph (used by TUI/chat routing) |
| `active_graph_id` (property) | `str` (get/set) | The graph that receives user input. Set by TUI when switching between worker and queen views |
| `get_active_streams()` | `-> list[dict]` | Returns metadata for every stream with active executions across all graphs. Each dict contains `graph_id`, `stream_id`, `entry_point_id`, `active_execution_ids`, `is_awaiting_input`, `waiting_nodes`. |
| `get_waiting_nodes()` | `-> list[dict]` | Flat list of all nodes currently blocked waiting for client input across all graphs/streams. Each dict contains `graph_id`, `stream_id`, `node_id`, `execution_id`. |
### New ExecutionStream APIs
| Method | Signature | Description |
|---|---|---|
| `get_waiting_nodes()` | `-> list[dict]` | Returns `[{"node_id": str, "execution_id": str}]` for every `EventLoopNode` with `_awaiting_input == True`. |
| `get_injectable_nodes()` | `-> list[dict]` | Returns `[{"node_id": str, "execution_id": str}]` for every node that supports message injection (has `inject_event` method). |
### Proposed HTTP Endpoints
@@ -433,6 +453,13 @@ These endpoints are not yet implemented. They expose the new multi-graph and mon
| `PUT /api/agents/{id}/active-graph` | Switch active graph | `runtime.active_graph_id = graph_id` |
| `GET /api/agents/{id}/active-graph` | Get active graph | `runtime.active_graph_id` |
#### Stream Introspection
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `GET /api/agents/{id}/streams` | Active streams | `runtime.get_active_streams()` — all streams with active executions |
| `GET /api/agents/{id}/waiting-nodes` | Waiting nodes | `runtime.get_waiting_nodes()` — all nodes blocked on client input |
#### Worker Health Monitoring
| HTTP Endpoint | Method | Runtime Primitive |
@@ -476,7 +503,7 @@ Secondary graphs have fully isolated storage under `graphs/{graph_id}/` to preve
| +-- conversations/
| +-- logs/
+-- graphs/
| +-- worker_health_judge/ # Health judge (secondary)
| +-- judge/ # Health judge (secondary)
| | +-- sessions/
| | | +-- session_YYYYMMDD_HHMMSS_{uuid}/ # ONE persistent session
| | | +-- state.json
@@ -485,7 +512,7 @@ Secondary graphs have fully isolated storage under `graphs/{graph_id}/` to preve
| | | +-- tool_logs.jsonl
| | | +-- details.jsonl
| | +-- runtime_logs/
| +-- hive_coder_queen/ # Queen triage (secondary)
| +-- queen/ # Queen triage (secondary)
| +-- sessions/
| | +-- session_YYYYMMDD_HHMMSS_{uuid}/ # ONE persistent session
| | +-- state.json
@@ -498,7 +525,7 @@ Secondary graphs have fully isolated storage under `graphs/{graph_id}/` to preve
Each secondary graph gets its own `SessionStore` and `RuntimeLogStore` scoped to `graphs/{graph_id}/`. This is set up in `AgentRuntime.add_graph()`:
```python
graph_base = self._session_store.base_path / subpath # e.g. .../graphs/worker_health_judge
graph_base = self._session_store.base_path / subpath # e.g. .../graphs/judge
graph_session_store = SessionStore(graph_base)
graph_log_store = RuntimeLogStore(graph_base / "runtime_logs")
```
@@ -513,14 +540,27 @@ Three tools registered via `register_worker_monitoring_tools(registry, event_bus
| `emit_escalation_ticket(ticket_json)` | Health Judge | Validates JSON against `EscalationTicket` schema (Pydantic rejects partial tickets), then calls `EventBus.emit_worker_escalation_ticket()`. |
| `notify_operator(ticket_id, analysis, urgency)` | Queen | Calls `EventBus.emit_queen_intervention_requested()` so the TUI/frontend surfaces a notification. |
### Queen Lifecycle Tools
Four tools registered via `register_queen_lifecycle_tools(registry, worker_runtime, event_bus)`. These close over the worker's `AgentRuntime` to give the Queen control over the worker agent's lifecycle.
| Tool | Description |
|---|---|
| `start_worker(task)` | Trigger the worker's default entry point with a task description. Returns an `execution_id`. |
| `stop_worker()` | Cancel all active worker executions. Returns IDs of cancelled executions. |
| `get_worker_status()` | Check if the worker is idle, running, or waiting for input. Returns execution details and waiting node ID if applicable. Uses `stream.get_waiting_nodes()` for accurate detection. |
| `inject_worker_message(content)` | Send a message to the running worker agent by finding an injectable node via `stream.get_injectable_nodes()` and calling `stream.inject_input()`. |
### New File Reference
| Component | Path |
|---|---|
| EscalationTicket model | `core/framework/runtime/escalation_ticket.py` |
| Worker Health Judge graph | `core/framework/monitoring/worker_health_judge.py` |
| Worker Health Judge graph | `core/framework/monitoring/judge.py` |
| Worker monitoring tools | `core/framework/tools/worker_monitoring_tools.py` |
| Queen lifecycle tools | `core/framework/tools/queen_lifecycle_tools.py` |
| Monitoring package init | `core/framework/monitoring/__init__.py` |
| Event types reference | `core/framework/runtime/EVENT_TYPES.md` |
---
@@ -544,3 +584,11 @@ Three tools registered via `register_worker_monitoring_tools(registry, event_bus
| SessionStore | `core/framework/storage/session_store.py` |
| CheckpointStore | `core/framework/storage/checkpoint_store.py` |
| Runtime logger | `core/framework/runtime/core.py` |
| EventBus | `core/framework/runtime/event_bus.py` |
| ExecutionStream | `core/framework/runtime/execution_stream.py` |
| GraphScopedEventBus | `core/framework/runtime/execution_stream.py` |
| EscalationTicket | `core/framework/runtime/escalation_ticket.py` |
| Queen lifecycle tools | `core/framework/tools/queen_lifecycle_tools.py` |
| Worker monitoring tools | `core/framework/tools/worker_monitoring_tools.py` |
| Health Judge graph | `core/framework/monitoring/judge.py` |
| Event types reference | `core/framework/runtime/EVENT_TYPES.md` |