Compare commits

...

1 Commits

Author SHA1 Message Date
Timothy 2268c4964b feat(architecture): super judge that escalates strategically 2026-02-22 19:58:59 -08:00
10 changed files with 1277 additions and 6 deletions
+8 -5
@@ -12,7 +12,8 @@ from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import coder_node
from .nodes import coder_node, ticket_triage_node
from .ticket_receiver import TICKET_RECEIVER_ENTRY_POINT
# Goal definition
goal = Goal(
@@ -90,8 +91,9 @@ goal = Goal(
],
)
# Nodes — single coder node (guardian is now auto-attached by the framework)
nodes = [coder_node]
# Nodes: primary coder node + ticket_triage for queen role when loaded as
# a secondary graph alongside a worker.
nodes = [coder_node, ticket_triage_node]
# No edges needed — single forever-alive event_loop node
edges = []
@@ -102,8 +104,9 @@ entry_points = {"start": "coder"}
pause_nodes = []
terminal_nodes = [] # Forever-alive: loops until user exits
# No async entry points — guardian is now auto-attached via attach_guardian()
async_entry_points = []
# Async entry points: queen receives escalation tickets from the health judge
# when loaded as a secondary graph alongside a worker runtime.
async_entry_points = [TICKET_RECEIVER_ENTRY_POINT]
# Module-level variables read by AgentRunner.load()
conversation_mode = "continuous"
@@ -553,4 +553,113 @@ forbidden action attempted, output format invalid).
)
__all__ = ["coder_node", "guardian_node", "ALL_GUARDIAN_TOOLS"]
# ---------------------------------------------------------------------------
# Ticket Triage Node (Queen's secondary role)
# ---------------------------------------------------------------------------
# Tools available to the queen during triage — only notify_operator needed.
# The queen does NOT need file I/O or shell tools for triage; those are for
# interactive fix sessions (deferred to a future phase).
ALL_QUEEN_TRIAGE_TOOLS = ["notify_operator"]
ticket_triage_node = NodeSpec(
id="ticket_triage",
name="Ticket Triage",
description=(
"Queen's ticket triage node. Fires when the Worker Health Judge emits "
"an EscalationTicket. Reads the ticket, deliberates on whether to dismiss "
"or notify the human operator, and calls notify_operator if warranted."
),
node_type="event_loop",
client_facing=True, # Operator can connect and chat with queen once notified
max_node_visits=0,
input_keys=["ticket"],
output_keys=["intervention_decision"],
nullable_output_keys=["intervention_decision"],
tools=ALL_QUEEN_TRIAGE_TOOLS,
success_criteria=(
"A clear, documented decision is reached: either 'dismissed: <reason>' "
"explaining why the ticket doesn't warrant operator involvement, or the "
"operator has been notified via notify_operator with a specific, actionable "
"analysis of the issue."
),
system_prompt="""\
You are the Queen Hive Coder. The Worker Health Judge has escalated an issue \
to you via a structured EscalationTicket. Your job: read it carefully and decide \
whether to notify the human operator or dismiss it.
# Reading the Ticket
The ticket is provided in your context under the memory key "ticket". It contains:
- severity: low|medium|high|critical
- cause: what the judge observed (concrete, specific)
- judge_reasoning: why the judge decided to escalate
- suggested_action: judge's recommendation
- recent_verdicts: last N judge verdicts (ACCEPT/RETRY/CONTINUE/ESCALATE)
- steps_since_last_accept: consecutive non-ACCEPT steps
- stall_minutes: wall-clock since last log step (null if worker is active)
- evidence_snippet: recent LLM output from the worker
# Dismiss Criteria (do NOT call notify_operator)
Dismiss when the ticket does NOT require human involvement:
- severity is "low" AND steps_since_last_accept < 8
- The cause describes a clearly transient issue (single API timeout, brief network
hiccup) and evidence shows the worker is otherwise progressing
- The evidence_snippet shows the agent is actively reasoning about a hard problem
(complex tool use, exploring solutions) and is simply taking time, not stuck
- A single anomalous check that contradicts prior healthy behavior
Dismiss with a specific reason, not a vague "looks fine."
# Intervene Criteria (call notify_operator)
Notify the operator when human judgment or action is likely needed:
- severity is "high" or "critical"
- steps_since_last_accept >= 10 with no sign of recovery in evidence
- stall_minutes >= 4 (worker has produced no new iterations in 4+ minutes)
- evidence_snippet shows a clear doom loop (same error repeated, same tool,
no variation in approach, no new information being processed)
- Cause suggests a configuration error, missing credential, or logic bug
that the agent cannot self-correct
# How to Notify
When intervening, call:
notify_operator(
ticket_id=<ticket["ticket_id"]>,
analysis="<2-3 sentences: what is wrong, why it matters, what to do>",
urgency="<low|medium|high|critical>"
)
Your analysis should be specific and actionable. Example:
"The worker agent has produced 18 consecutive RETRY verdicts with identical
tool errors in get_calendar_events. The OAuth token appears expired based
on the 401 errors in evidence. Recommend: check calendar API credentials
or restart with fresh auth token."
# After Deciding
Always set_output at the end:
set_output("intervention_decision", "dismissed: <reason>")
OR
set_output("intervention_decision", "escalated: <one-line summary>")
# Rules
- Be concise. You are a second quality gate, not a rubber stamp.
- One false dismissal is less bad than two false alarms.
- But do not dismiss high/critical severity tickets without strong reason.
- Read the ticket completely before deciding. Do not skim.
- You are the last automated line before the human is disturbed. Take it seriously.
""",
)
__all__ = [
"coder_node",
"guardian_node",
"ticket_triage_node",
"ALL_GUARDIAN_TOOLS",
"ALL_QUEEN_TRIAGE_TOOLS",
]
@@ -0,0 +1,27 @@
"""Queen's ticket receiver entry point.
When the Worker Health Judge emits a WORKER_ESCALATION_TICKET event on the
shared EventBus, this entry point fires and routes to the ``ticket_triage``
node, where the Queen deliberates and decides whether to notify the operator.
Isolation level is ``isolated``: the queen's triage memory is kept separate
from the worker's shared memory. Each ticket triage runs in its own context.
"""
from __future__ import annotations
from framework.graph.edge import AsyncEntryPointSpec
TICKET_RECEIVER_ENTRY_POINT = AsyncEntryPointSpec(
id="ticket_receiver",
name="Worker Escalation Ticket Receiver",
entry_node="ticket_triage",
trigger_type="event",
trigger_config={
"event_types": ["worker_escalation_ticket"],
# Do not fire on our own graph's events (prevents loops if queen
# somehow emits a worker_escalation_ticket for herself)
"exclude_own_graph": True,
},
isolation_level="isolated",
)
+24
@@ -0,0 +1,24 @@
"""Framework-level monitoring components.
Provides reusable monitoring graphs that can be attached to any worker
agent runtime via ``runtime.add_graph()``.
Currently included:
- ``worker_health_judge``: a timer-driven health monitor that reads worker
session logs, detects degradation patterns, and emits structured
EscalationTickets to the shared EventBus.
"""
from framework.monitoring.worker_health_judge import (
HEALTH_JUDGE_ENTRY_POINT,
judge_goal,
judge_graph,
judge_node,
)
__all__ = [
"HEALTH_JUDGE_ENTRY_POINT",
"judge_goal",
"judge_graph",
"judge_node",
]
@@ -0,0 +1,267 @@
"""Worker Health Judge — framework-level reusable monitoring graph.
Attaches to any worker agent runtime as a secondary graph. Fires on a
2-minute timer, reads the worker's session logs via ``get_worker_health_summary``,
accumulates observations in a continuous conversation context, and emits a
structured ``EscalationTicket`` when it detects a degradation pattern.
Usage::
from framework.monitoring import judge_graph, judge_goal, HEALTH_JUDGE_ENTRY_POINT
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
# Register tools bound to the worker runtime's event bus
monitoring_registry = ToolRegistry()
register_worker_monitoring_tools(
monitoring_registry, worker_runtime._event_bus, storage_path
)
monitoring_tools = list(monitoring_registry.get_tools().values())
monitoring_executor = monitoring_registry.get_executor()
# Load judge as secondary graph on the worker runtime
await worker_runtime.add_graph(
graph_id="worker_health_judge",
graph=judge_graph,
goal=judge_goal,
entry_points=[HEALTH_JUDGE_ENTRY_POINT],
tools=monitoring_tools,
executor=monitoring_executor,
storage_subpath="graphs/worker_health_judge",
)
Design:
- ``isolation_level="isolated"``: the judge has its own memory, not
polluting the worker's shared memory namespace.
- ``conversation_mode="continuous"``: the judge's conversation carries
across timer ticks. The conversation IS the judge's memory. It tracks
trends by referring to its own prior messages ("Last check I saw 47
steps; now 52; 5 new steps, 3 RETRY").
- No shared memory keys. No external state files.
"""
from __future__ import annotations
from framework.graph import Constraint, Goal, NodeSpec, SuccessCriterion
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
# ---------------------------------------------------------------------------
# Goal
# ---------------------------------------------------------------------------
judge_goal = Goal(
id="worker-health-monitor",
name="Worker Health Monitor",
description=(
"Periodically assess the health of the worker agent by reading its "
"execution logs. Detect degradation patterns (excessive retries, "
"stalls, doom loops) and emit structured EscalationTickets when the "
"worker needs attention."
),
success_criteria=[
SuccessCriterion(
id="accurate-detection",
description="Only escalates genuine degradation, not normal retry cycles",
metric="false_positive_rate",
target="low",
weight=0.5,
),
SuccessCriterion(
id="timely-detection",
description="Detects genuine stalls within 2 timer ticks (≤4 minutes)",
metric="detection_latency_minutes",
target="<=4",
weight=0.5,
),
],
constraints=[
Constraint(
id="conservative-escalation",
description=(
"Do not escalate on a single bad verdict or a brief stall. "
"Require clear patterns (10+ consecutive bad verdicts or 4+ minute stall) "
"before creating a ticket."
),
constraint_type="hard",
category="quality",
),
Constraint(
id="complete-ticket",
description=(
"Every EscalationTicket must have all required fields filled. "
"Do not emit partial or placeholder tickets."
),
constraint_type="hard",
category="correctness",
),
],
)
# ---------------------------------------------------------------------------
# Node
# ---------------------------------------------------------------------------
judge_node = NodeSpec(
id="judge",
name="Worker Health Judge",
description=(
"Autonomous health monitor for worker agents. Reads execution logs "
"on each timer tick, compares to prior observations (via conversation "
"history), and emits a structured EscalationTicket when a genuine "
"degradation pattern is detected."
),
node_type="event_loop",
client_facing=False, # Autonomous monitor, not interactive
max_node_visits=0, # Unbounded — runs on every timer tick
input_keys=[],
output_keys=["health_verdict"],
nullable_output_keys=["health_verdict"],
success_criteria=(
"A clear health verdict is produced each check: either 'healthy' with "
"a brief observation, or a complete EscalationTicket is emitted via "
"emit_escalation_ticket and health_verdict describes the issue."
),
tools=[
"get_worker_health_summary",
"emit_escalation_ticket",
],
system_prompt="""\
You are the Worker Health Judge. You run every 2 minutes alongside a worker \
agent to monitor its execution health.
# Your Role
You observe the worker's iteration patterns over time and escalate only when \
you see genuine degradation, not normal retry cycles. Your conversation history \
IS your memory. On each check, refer to your previous observations to track trends.
# Required Context
At the start of your first check, your context will include:
- worker_agent_id: the agent ID you are watching
- worker_session_id: the active session to monitor
- worker_node_id: the node currently executing (usually the primary node)
- worker_graph_id: the graph ID
These are provided in your initial system context. Use them when calling tools \
and building tickets.
# Check Procedure
On each timer tick (every 2 minutes):
## Step 1: Read health snapshot
Call get_worker_health_summary(session_id=<worker_session_id>) to get:
- total_steps: how many log steps have been recorded
- recent_verdicts: list of recent ACCEPT/RETRY/CONTINUE verdicts
- steps_since_last_accept: consecutive non-ACCEPT steps
- stall_minutes: wall-clock since last step (null if active)
- evidence_snippet: recent LLM output
## Step 2: Compare to prior check
Look at your conversation history. What was total_steps last time?
- If total_steps is UNCHANGED from prior check AND prior check was also unchanged:
STALL confirmed (worker has produced no new iterations in 4+ minutes).
Escalate with severity="high" or "critical" depending on stall duration.
- If total_steps increased: worker is making progress. Examine verdicts.
## Step 3: Analyze verdict pattern
- Healthy: Mix of ACCEPT and RETRY, steps_since_last_accept < 5. No action.
- Warning: steps_since_last_accept is 5-9. Note it, no escalation yet.
- Degraded: steps_since_last_accept >= 10. Examine evidence_snippet.
- If evidence shows the agent is making real progress (complex reasoning,
exploring solutions, productive tool use): may be a hard problem. Note it.
- If evidence shows a loop (same error, same tool call, no new information):
Escalate with severity="medium" or "high".
- Critical: steps_since_last_accept >= 20, OR stall_minutes >= 4.
Escalate with severity="critical".
## Step 4: Decide
### If healthy:
set_output("health_verdict", "healthy: <brief observation>")
Done.
### If escalating:
Build an EscalationTicket JSON string with ALL required fields:
{
"worker_agent_id": "<from context>",
"worker_session_id": "<from context>",
"worker_node_id": "<from context>",
"worker_graph_id": "<from context>",
"severity": "<low|medium|high|critical>",
"cause": "<what you observed — concrete, specific>",
"judge_reasoning": "<why you decided to escalate, not just dismiss>",
"suggested_action": "<what you recommend: restart, human review, etc.>",
"recent_verdicts": [<list from get_worker_health_summary>],
"total_steps_checked": <int>,
"steps_since_last_accept": <int>,
"stall_minutes": <float or null>,
"evidence_snippet": "<from get_worker_health_summary>"
}
Call: emit_escalation_ticket(ticket_json=<the JSON string above>)
Then: set_output("health_verdict", "escalated: <one-line summary>")
# Severity Guide
- low: Mild concern, worth noting. 5-9 consecutive bad verdicts.
- medium: Clear degradation pattern. 10-15 bad verdicts or brief stall (1-2 min).
- high: Serious issue. 15+ bad verdicts or stall 2-4 minutes or clear doom loop.
- critical: Worker is definitively stuck. 20+ bad verdicts or stall > 4 minutes.
# Conservative Bias
You MUST resist the urge to escalate prematurely. Worker agents naturally retry.
A node may legitimately need 5-8 retries before succeeding. Do not escalate unless:
1. The pattern is clear and sustained across your observation window, AND
2. The evidence shows no genuine progress
One missed escalation is less costly than two false alarms. The Queen will filter \
further. But do not be passive: genuine stalls and doom loops must be caught.
# Rules
- Never escalate on the FIRST check unless stall_minutes > 4
- Always call get_worker_health_summary FIRST before deciding anything
- All ticket fields are REQUIRED; do not submit partial tickets
- After any emit_escalation_ticket call, always set_output to complete the check
""",
)
# ---------------------------------------------------------------------------
# Entry Point
# ---------------------------------------------------------------------------
HEALTH_JUDGE_ENTRY_POINT = AsyncEntryPointSpec(
id="health_check",
name="Worker Health Check",
entry_node="judge",
trigger_type="timer",
trigger_config={
"interval_minutes": 2,
"run_immediately": False, # Give worker time to start producing logs
},
isolation_level="isolated", # Own memory namespace, not polluting worker's
)
# ---------------------------------------------------------------------------
# Graph
# ---------------------------------------------------------------------------
judge_graph = GraphSpec(
id="worker-health-judge-graph",
goal_id=judge_goal.id,
version="1.0.0",
entry_node="judge",
entry_points={"health_check": "judge"},
terminal_nodes=[], # Forever-alive: fires on every timer tick
pause_nodes=[],
nodes=[judge_node],
edges=[],
conversation_mode="continuous", # Conversation persists across timer ticks
async_entry_points=[HEALTH_JUDGE_ENTRY_POINT],
loop_config={
"max_iterations": 10, # One check shouldn't take many turns
"max_tool_calls_per_turn": 3, # get_summary + optionally emit_ticket
"max_history_tokens": 16000, # Compact — judge only needs recent context
},
)
@@ -0,0 +1,83 @@
"""Structured escalation ticket emitted by the Worker Health Judge.
The ticket is the required artifact that the judge must fill out before
escalating an issue to the Queen. Requiring this structured form prevents
impulsive escalations: the judge must articulate the cause, evidence,
severity, and a suggested action before anything is emitted.
"""
from __future__ import annotations
from datetime import UTC, datetime
from typing import Literal
from uuid import uuid4
from pydantic import BaseModel, Field
class EscalationTicket(BaseModel):
"""A structured escalation report from the Worker Health Judge.
The judge fills this out when it observes a degradation pattern in
the worker agent's execution. The Queen receives this ticket and
decides whether to notify the human operator.
"""
# Identity
ticket_id: str = Field(default_factory=lambda: str(uuid4()))
created_at: str = Field(default_factory=lambda: datetime.now(UTC).isoformat())
# Worker identification
worker_agent_id: str = Field(description="Agent ID of the monitored worker")
worker_session_id: str = Field(description="Active session ID being monitored")
worker_node_id: str = Field(description="Node ID where the issue is occurring")
worker_graph_id: str = Field(description="Graph ID of the worker")
# Problem characterization — filled by judge via LLM reasoning
severity: Literal["low", "medium", "high", "critical"] = Field(
description="Severity of the detected issue"
)
cause: str = Field(
description=(
"Human-readable description of what the judge observed. "
"Example: 'Node has produced 18 consecutive RETRY verdicts with no progress.'"
)
)
judge_reasoning: str = Field(
description="The judge's deliberation chain: why it decided to escalate vs dismiss."
)
suggested_action: str = Field(
description=(
"What the judge recommends. Examples: "
"'Restart the node', 'Check API credentials', "
"'Review system prompt for logic errors', 'Kill session'."
)
)
# Evidence — quantitative signals the Queen can evaluate
recent_verdicts: list[str] = Field(
default_factory=list,
description="Last N judge verdicts (ACCEPT/RETRY/CONTINUE/ESCALATE) in order.",
)
total_steps_checked: int = Field(
default=0,
description="Total number of log steps the judge examined in this check.",
)
steps_since_last_accept: int = Field(
default=0,
description="How many consecutive steps have passed without an ACCEPT verdict.",
)
stall_minutes: float | None = Field(
default=None,
description=(
"Wall-clock minutes since the last new log step was written. "
"None if the worker is actively producing steps."
),
)
evidence_snippet: str = Field(
default="",
description=(
"Brief excerpt from the worker's most recent LLM output or error message. "
"Used by the Queen to assess context without reading the full log."
),
)
+59
@@ -86,6 +86,10 @@ class EventType(StrEnum):
# Escalation (agent requests handoff to hive_coder)
ESCALATION_REQUESTED = "escalation_requested"
# Worker health monitoring (judge → queen → operator pipeline)
WORKER_ESCALATION_TICKET = "worker_escalation_ticket"
QUEEN_INTERVENTION_REQUESTED = "queen_intervention_requested"
@dataclass
class AgentEvent:
@@ -852,6 +856,61 @@ class EventBus:
)
)
async def emit_worker_escalation_ticket(
self,
stream_id: str,
node_id: str,
ticket: dict,
execution_id: str | None = None,
) -> None:
"""Emit a structured escalation ticket from the Worker Health Judge.
The ticket dict should be a serialized EscalationTicket. The Queen's
event-driven entry point fires on this event to triage the ticket.
"""
await self.publish(
AgentEvent(
type=EventType.WORKER_ESCALATION_TICKET,
stream_id=stream_id,
node_id=node_id,
execution_id=execution_id,
data={"ticket": ticket},
)
)
async def emit_queen_intervention_requested(
self,
stream_id: str,
node_id: str,
ticket_id: str,
analysis: str,
severity: str,
queen_graph_id: str,
queen_stream_id: str,
execution_id: str | None = None,
) -> None:
"""Emit a human operator notification from the Queen.
The TUI subscribes to this event to surface a non-disruptive overlay.
queen_graph_id and queen_stream_id allow the TUI to route the operator
directly into the Queen's active session for a follow-up conversation.
"""
await self.publish(
AgentEvent(
type=EventType.QUEEN_INTERVENTION_REQUESTED,
stream_id=stream_id,
node_id=node_id,
execution_id=execution_id,
data={
"ticket_id": ticket_id,
"analysis": analysis,
"severity": severity,
"queen_graph_id": queen_graph_id,
"queen_stream_id": queen_stream_id,
},
)
)
# === QUERY OPERATIONS ===
def get_history(
@@ -0,0 +1,357 @@
"""Worker monitoring tools for the Health Judge and Queen triage agents.
Three tools are registered by ``register_worker_monitoring_tools()``:
- ``get_worker_health_summary`` reads the worker's session log files and
returns a compact health snapshot (recent verdicts, step count, timing).
Used by the Health Judge on every timer tick.
- ``emit_escalation_ticket`` validates and publishes an EscalationTicket
to the shared EventBus as a WORKER_ESCALATION_TICKET event.
Used by the Health Judge when it decides to escalate.
- ``notify_operator`` emits a QUEEN_INTERVENTION_REQUESTED event so the TUI
can surface a non-disruptive operator notification.
Used by the Queen's ticket_triage_node when it decides to intervene.
Usage::
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
register_worker_monitoring_tools(tool_registry, event_bus, storage_path)
"""
from __future__ import annotations
import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.event_bus import EventBus
logger = logging.getLogger(__name__)
# How many tool_log steps to include in the health summary
_DEFAULT_LAST_N_STEPS = 40
def register_worker_monitoring_tools(
registry: "ToolRegistry",
event_bus: "EventBus",
storage_path: Path,
stream_id: str = "worker_health_judge",
) -> int:
"""Register worker monitoring tools bound to *event_bus* and *storage_path*.
Args:
registry: ToolRegistry to register tools on.
event_bus: The shared EventBus for the worker runtime.
storage_path: Root storage path of the worker runtime
(e.g. ``~/.hive/agents/{name}``).
stream_id: Stream ID used when emitting events; defaults to judge's stream.
Returns:
Number of tools registered.
"""
from framework.llm.provider import Tool
tools_registered = 0
# -------------------------------------------------------------------------
# get_worker_health_summary
# -------------------------------------------------------------------------
async def get_worker_health_summary(
session_id: str,
last_n_steps: int = _DEFAULT_LAST_N_STEPS,
) -> str:
"""Read the worker's execution logs and return a compact health snapshot.
Returns a JSON object with:
- session_status: "running"|"completed"|"failed"|"in_progress"|"unknown"
- total_steps: total number of log steps recorded so far
- recent_verdicts: list of last N verdict strings (ACCEPT/RETRY/CONTINUE/ESCALATE)
- steps_since_last_accept: consecutive non-ACCEPT steps from the end
- last_step_time_iso: ISO timestamp of the most recent step (or null)
- stall_minutes: wall-clock minutes since last step (null if < 1 min)
- evidence_snippet: last LLM text from the most recent step (truncated)
- session_id: echoed back for reference
"""
# Resolve log paths
session_dir = storage_path / "sessions" / session_id
tool_logs_path = session_dir / "logs" / "tool_logs.jsonl"
state_path = session_dir / "state.json"
# Read session status
session_status = "unknown"
if state_path.exists():
try:
state = json.loads(state_path.read_text(encoding="utf-8"))
session_status = state.get("status", "unknown")
except Exception:
pass
# Read tool logs
steps: list[dict] = []
if tool_logs_path.exists():
try:
with open(tool_logs_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
steps.append(json.loads(line))
except json.JSONDecodeError:
continue
except OSError as e:
return json.dumps({"error": f"Could not read tool logs: {e}"})
total_steps = len(steps)
recent = steps[-last_n_steps:] if len(steps) > last_n_steps else steps
# Extract verdict sequence
recent_verdicts = [s.get("verdict", "") for s in recent if s.get("verdict")]
# Count consecutive non-ACCEPT from the end
steps_since_last_accept = 0
for v in reversed(recent_verdicts):
if v == "ACCEPT":
break
steps_since_last_accept += 1
# Timing: last step timestamp
last_step_time_iso: str | None = None
stall_minutes: float | None = None
if steps:
# tool_log steps don't have timestamps; use file mtime as proxy
try:
mtime = tool_logs_path.stat().st_mtime
last_step_time_iso = datetime.fromtimestamp(mtime, UTC).isoformat()
elapsed = (datetime.now(UTC).timestamp() - mtime) / 60
stall_minutes = round(elapsed, 1) if elapsed >= 1.0 else None
except OSError:
pass
# Evidence snippet: last LLM text
evidence_snippet = ""
for step in reversed(recent):
text = step.get("llm_text", "")
if text:
evidence_snippet = text[:500]
break
return json.dumps(
{
"session_id": session_id,
"session_status": session_status,
"total_steps": total_steps,
"recent_verdicts": recent_verdicts,
"steps_since_last_accept": steps_since_last_accept,
"last_step_time_iso": last_step_time_iso,
"stall_minutes": stall_minutes,
"evidence_snippet": evidence_snippet,
},
ensure_ascii=False,
)
_health_summary_tool = Tool(
name="get_worker_health_summary",
description=(
"Read the worker agent's execution logs and return a compact health snapshot. "
"Returns recent judge verdicts, step count, time since last step, and "
"a snippet of the most recent LLM output. "
"Use this on every health check to observe trends."
),
parameters={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "The worker's active session ID (e.g. 'session_20250101T120000_abc123')",
},
"last_n_steps": {
"type": "integer",
"description": f"How many recent log steps to include (default {_DEFAULT_LAST_N_STEPS})",
},
},
"required": ["session_id"],
},
)
registry.register(
"get_worker_health_summary",
_health_summary_tool,
lambda inputs: get_worker_health_summary(**inputs),
)
tools_registered += 1
# -------------------------------------------------------------------------
# emit_escalation_ticket
# -------------------------------------------------------------------------
async def emit_escalation_ticket(ticket_json: str) -> str:
"""Validate and publish an EscalationTicket to the shared EventBus.
ticket_json must be a JSON string containing all required EscalationTicket
fields. The ticket is validated before publishing; this ensures the judge
has genuinely filled out all required evidence fields.
Returns a confirmation JSON with the ticket_id on success, or an error.
"""
from framework.runtime.escalation_ticket import EscalationTicket
try:
raw = json.loads(ticket_json) if isinstance(ticket_json, str) else ticket_json
ticket = EscalationTicket(**raw)
except Exception as e:
return json.dumps({"error": f"Invalid ticket: {e}"})
try:
await event_bus.emit_worker_escalation_ticket(
stream_id=stream_id,
node_id="judge",
ticket=ticket.model_dump(),
)
logger.info(
"EscalationTicket emitted: ticket_id=%s severity=%s cause=%r",
ticket.ticket_id,
ticket.severity,
ticket.cause[:80],
)
return json.dumps(
{
"status": "emitted",
"ticket_id": ticket.ticket_id,
"severity": ticket.severity,
}
)
except Exception as e:
return json.dumps({"error": f"Failed to emit ticket: {e}"})
_emit_ticket_tool = Tool(
name="emit_escalation_ticket",
description=(
"Validate and publish a structured EscalationTicket to the shared EventBus. "
"The Queen's ticket_receiver entry point will fire and triage the ticket. "
"ticket_json must be a JSON string with all required EscalationTicket fields: "
"worker_agent_id, worker_session_id, worker_node_id, worker_graph_id, "
"severity (low/medium/high/critical), cause, judge_reasoning, suggested_action, "
"recent_verdicts (list), total_steps_checked, steps_since_last_accept, "
"stall_minutes (float or null), evidence_snippet."
),
parameters={
"type": "object",
"properties": {
"ticket_json": {
"type": "string",
"description": "JSON string of the complete EscalationTicket",
},
},
"required": ["ticket_json"],
},
)
registry.register(
"emit_escalation_ticket",
_emit_ticket_tool,
lambda inputs: emit_escalation_ticket(**inputs),
)
tools_registered += 1
# -------------------------------------------------------------------------
# notify_operator
# -------------------------------------------------------------------------
async def notify_operator(
ticket_id: str,
analysis: str,
urgency: str,
) -> str:
"""Emit a QUEEN_INTERVENTION_REQUESTED event to notify the human operator.
The TUI subscribes to this event and surfaces a non-disruptive dismissable
overlay. The worker agent is NOT paused. The operator can choose to open
a split-pane conversation with the Queen about the issue.
Args:
ticket_id: The ticket_id from the original EscalationTicket.
analysis: 2-3 sentence description of what is wrong, why it matters,
and what action is suggested.
urgency: Severity level: "low", "medium", "high", or "critical".
Returns:
Confirmation JSON.
"""
valid_urgencies = {"low", "medium", "high", "critical"}
if urgency not in valid_urgencies:
return json.dumps(
{"error": f"urgency must be one of {sorted(valid_urgencies)}, got {urgency!r}"}
)
try:
await event_bus.emit_queen_intervention_requested(
stream_id=stream_id,
node_id="ticket_triage",
ticket_id=ticket_id,
analysis=analysis,
severity=urgency,
queen_graph_id="hive_coder_queen",
queen_stream_id="hive_coder_queen::ticket_receiver",
)
logger.info(
"Queen intervention requested: ticket_id=%s urgency=%s",
ticket_id,
urgency,
)
return json.dumps(
{
"status": "operator_notified",
"ticket_id": ticket_id,
"urgency": urgency,
}
)
except Exception as e:
return json.dumps({"error": f"Failed to notify operator: {e}"})
_notify_tool = Tool(
name="notify_operator",
description=(
"Notify the human operator that a worker agent needs attention. "
"This emits a QUEEN_INTERVENTION_REQUESTED event that the TUI surfaces "
"as a non-disruptive overlay. The worker keeps running. "
"Only call this when you (the Queen) have decided the issue warrants "
"human attention after reading the escalation ticket."
),
parameters={
"type": "object",
"properties": {
"ticket_id": {
"type": "string",
"description": "The ticket_id from the EscalationTicket being triaged",
},
"analysis": {
"type": "string",
"description": (
"2-3 sentence analysis: what is wrong, why it matters, "
"and what action you suggest."
),
},
"urgency": {
"type": "string",
"enum": ["low", "medium", "high", "critical"],
"description": "Severity level for the operator notification",
},
},
"required": ["ticket_id", "analysis", "urgency"],
},
)
registry.register(
"notify_operator",
_notify_tool,
lambda inputs: notify_operator(**inputs),
)
tools_registered += 1
return tools_registered
+172
@@ -215,6 +215,7 @@ class AdenTUI(App):
Binding("ctrl+a", "show_agent_picker", "Agents", show=True, priority=True),
Binding("ctrl+e", "escalate_to_coder", "Coder", show=True, priority=True),
Binding("ctrl+e", "return_from_coder", "← Back", show=True, priority=True),
Binding("ctrl+q", "connect_to_queen", "Queen", show=True, priority=True),
Binding("tab", "focus_next", "Next Panel", show=True),
Binding("shift+tab", "focus_previous", "Previous Panel", show=False),
]
@@ -239,6 +240,10 @@ class AdenTUI(App):
# Escalation stack: stores worker state when coder is in foreground
self._escalation_stack: list[dict] = []
# Health judge + queen monitoring graphs (loaded alongside worker agents)
self._queen_graph_id: str | None = None
self._judge_graph_id: str | None = None
# Widgets are created lazily when runtime is available
self.graph_view = None
self.chat_repl = None
@@ -399,6 +404,10 @@ class AdenTUI(App):
"""Complete agent setup, guardian attach, and widget mount."""
import asyncio
# Reset health monitoring state from any prior load
self._queen_graph_id = None
self._judge_graph_id = None
loop = asyncio.get_event_loop()
try:
if runner._agent_runtime is None:
@@ -429,6 +438,117 @@ class AdenTUI(App):
agent_name = runner.agent_path.name
self.notify(f"Agent loaded: {agent_name}", severity="information", timeout=3)
# Load health judge + queen for worker agents (skip for hive_coder itself)
if agent_name != "hive_coder":
await self._load_judge_and_queen(runner._storage_path)
async def _load_judge_and_queen(self, storage_path) -> None:
"""Load the health judge and queen's ticket_triage as secondary monitoring graphs.
Both are added to the current worker runtime so they share its EventBus.
The health judge fires every 2 minutes via a timer; the queen fires on
WORKER_ESCALATION_TICKET events emitted by the judge.
"""
from pathlib import Path
from framework.agents.hive_coder.nodes import ticket_triage_node
from framework.agents.hive_coder.ticket_receiver import TICKET_RECEIVER_ENTRY_POINT
from framework.graph import Goal
from framework.graph.edge import GraphSpec
from framework.monitoring import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph
from framework.runner.tool_registry import ToolRegistry
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
try:
storage_path = Path(storage_path)
# 1. Register monitoring tools bound to this runtime's EventBus + storage
monitoring_registry = ToolRegistry()
register_worker_monitoring_tools(
monitoring_registry,
self.runtime._event_bus,
storage_path,
)
monitoring_tools = list(monitoring_registry.get_tools().values())
monitoring_executor = monitoring_registry.get_executor()
monitoring_tool_names = {t.name for t in monitoring_tools}
# Inject monitoring tools into runtime so secondary graphs can call them.
# Secondary graph streams inherit self.runtime._tools / _tool_executor.
original_executor = self.runtime._tool_executor
def merged_executor(tool_use):
if tool_use.name in monitoring_tool_names:
return monitoring_executor(tool_use)
if original_executor is not None:
return original_executor(tool_use)
from framework.llm.provider import ToolResult
import json as _json
return ToolResult(
tool_use_id=tool_use.id,
content=_json.dumps({"error": f"Unknown tool: {tool_use.name}"}),
is_error=True,
)
self.runtime._tools = self.runtime._tools + monitoring_tools
self.runtime._tool_executor = merged_executor
# 2. Load health judge as a secondary graph on the worker runtime
await self.runtime.add_graph(
graph_id="worker_health_judge",
graph=judge_graph,
goal=judge_goal,
entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
storage_subpath="graphs/worker_health_judge",
)
self._judge_graph_id = "worker_health_judge"
# 3. Build minimal queen graph (ticket_triage only, no coder node)
queen_triage_goal = Goal(
id="queen-ticket-triage",
name="Queen Ticket Triage",
description=(
"Receive escalation tickets from the Health Judge and decide "
"whether to notify the operator."
),
)
queen_graph = GraphSpec(
id="hive-coder-queen-triage",
goal_id=queen_triage_goal.id,
version="1.0.0",
entry_node="ticket_triage",
entry_points={"ticket_receiver": "ticket_triage"},
terminal_nodes=[],
pause_nodes=[],
nodes=[ticket_triage_node],
edges=[],
conversation_mode="continuous",
async_entry_points=[TICKET_RECEIVER_ENTRY_POINT],
)
await self.runtime.add_graph(
graph_id="hive_coder_queen",
graph=queen_graph,
goal=queen_triage_goal,
entry_points={"ticket_receiver": TICKET_RECEIVER_ENTRY_POINT},
storage_subpath="graphs/hive_coder_queen",
)
self._queen_graph_id = "hive_coder_queen"
self.notify(
"Health judge + queen monitoring active",
severity="information",
timeout=3,
)
except Exception as e:
logging.getLogger("tui.judge").error(
"Failed to load health monitoring: %s", e, exc_info=True
)
self.notify(
f"Health monitoring unavailable: {e}",
severity="warning",
timeout=5,
)
def _show_account_selection(self, runner, accounts: list[dict]) -> None:
"""Show the account selection screen and continue loading on selection."""
from framework.tui.screens.account_selection import AccountSelectionScreen
@@ -851,6 +971,8 @@ class AdenTUI(App):
EventType.EXECUTION_PAUSED,
EventType.EXECUTION_RESUMED,
EventType.ESCALATION_REQUESTED,
EventType.WORKER_ESCALATION_TICKET,
EventType.QUEEN_INTERVENTION_REQUESTED,
]
_LOG_PANE_EVENTS = frozenset(_EVENT_TYPES) - {
@@ -922,6 +1044,15 @@ class AdenTUI(App):
severity="information",
timeout=5,
)
elif et == EventType.WORKER_ESCALATION_TICKET:
# Judge ticket emitted from background graph — still update status bar
ticket = event.data.get("ticket", {})
severity = ticket.get("severity", "")
if severity:
self.status_bar.set_node_detail(f"judge: {severity} ticket")
elif et == EventType.QUEEN_INTERVENTION_REQUESTED:
# Queen intervention always surfaced to operator regardless of active graph
self._handle_queen_intervention(event.data)
# All other background events are silently dropped (visible in logs)
return
@@ -958,6 +1089,13 @@ class AdenTUI(App):
context=event.data.get("context", ""),
node_id=event.node_id or "",
)
elif et == EventType.WORKER_ESCALATION_TICKET:
ticket = event.data.get("ticket", {})
severity = ticket.get("severity", "")
if severity:
self.status_bar.set_node_detail(f"judge: {severity} ticket")
elif et == EventType.QUEEN_INTERVENTION_REQUESTED:
self._handle_queen_intervention(event.data)
elif et == EventType.NODE_LOOP_STARTED:
self.chat_repl.handle_node_started(event.node_id or "")
elif et == EventType.NODE_LOOP_ITERATION:
@@ -1082,6 +1220,30 @@ class AdenTUI(App):
exc_info=True,
)
def _handle_queen_intervention(self, data: dict) -> None:
"""Notify the operator of a queen escalation — non-disruptively.
The worker keeps running. The operator can dismiss or switch to the
queen's graph view via Ctrl+Q.
"""
severity = data.get("severity", "unknown")
analysis = data.get("analysis", "(no analysis)")
severity_markup = {
"low": "[dim]low[/dim]",
"medium": "[yellow]medium[/yellow]",
"high": "[bold red]high[/bold red]",
"critical": "[bold red]CRITICAL[/bold red]",
}
sev_label = severity_markup.get(severity, severity)
msg = f"Queen escalation ({sev_label}): {analysis}"
if self._queen_graph_id:
msg += "\nPress [bold]Ctrl+Q[/bold] to chat with queen."
textual_severity = "error" if severity in ("high", "critical") else "warning"
self.notify(msg, severity=textual_severity, timeout=30)
# -- Actions --
def action_switch_graph(self, graph_id: str) -> None:
@@ -1244,13 +1406,23 @@ class AdenTUI(App):
Both escalate_to_coder and return_from_coder are bound to Ctrl+E.
check_action toggles which one is active based on escalation state,
so the footer shows "Coder" or "← Back" accordingly.
connect_to_queen is only shown when a queen monitoring graph is active.
"""
if action == "escalate_to_coder":
return not self._escalation_stack
if action == "return_from_coder":
return bool(self._escalation_stack)
if action == "connect_to_queen":
return bool(self._queen_graph_id and self.runtime is not None)
return True
def action_connect_to_queen(self) -> None:
"""Switch to the queen's ticket-triage graph view (Ctrl+Q)."""
if not self._queen_graph_id:
self.notify("No queen monitoring active", severity="warning", timeout=3)
return
self.action_switch_graph(self._queen_graph_id)
def action_escalate_to_coder(self) -> None:
"""Escalate to Hive Coder (bound to Ctrl+E)."""
if self.runtime is None:
+170
@@ -0,0 +1,170 @@
# Worker Escalation Design: Judge → Queen → Operator
## Problem
The previous guardian-subgraph approach had two failure modes:
1. **Never fires** — worker agents ran in a separate TUI context with their own EventBus. The guardian attached to hive_coder's runtime never saw the worker's events.
2. **Too trigger-happy** — when it did fire (e.g. when hive_coder's own execution stalled waiting for user input), it reacted to normal wait states as if they were failures. `exclude_own_graph: False` meant the guardian also fired on hive_coder's own events.
The root cause: reactive event-based monitoring (fire on hard failures) cannot distinguish between "agent is genuinely broken" and "agent is momentarily blocked on user input."
## New Design
Three agents share **one runtime and one EventBus** when a worker is started:
```
One Runtime (shared EventBus)
├── Worker Graph (primary)
│ └── EventLoopNode — runs the actual work
│ ↑ logs per step to sessions/{id}/logs/tool_logs.jsonl
├── Health Judge Graph (secondary, framework-level reusable)
│ └── Entry point: timer every 2 min → judge_node (event_loop)
│ ├── Reads worker session logs via get_worker_health_summary()
│ ├── Accumulates observations in continuous conversation context
│ ├── Detects: too many RETRYs, stall (no new log steps), doom loops
│ └── If problematic: calls emit_escalation_ticket tool
│ → emits WORKER_ESCALATION_TICKET on shared EventBus
└── Queen Graph (secondary, hive_coder ticket_triage)
└── Entry point: event-driven on WORKER_ESCALATION_TICKET
├── ticket_triage_node receives EscalationTicket in memory
├── LLM decides: dismiss or intervene
└── If intervene: calls notify_operator tool
→ emits QUEEN_INTERVENTION_REQUESTED on shared EventBus
TUI
├── Subscribes to QUEEN_INTERVENTION_REQUESTED
├── Shows non-disruptive dismissable overlay (worker keeps running)
└── "Chat with Queen" → split-pane view, queen is interactive
```
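The wiring is sketched below, condensed from the `worker_health_judge` module docstring and the TUI's `_load_judge_and_queen()`; `worker_runtime` and `storage_path` are stand-ins for objects the host application already holds, and the `entry_points` dict shape follows the TUI call shown earlier in this diff.

```python
from framework.monitoring import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph
from framework.runner.tool_registry import ToolRegistry
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools

# Bind the monitoring tools to the worker runtime's EventBus and storage path, so the
# judge reads this worker's logs and publishes tickets on this worker's bus.
monitoring_registry = ToolRegistry()
register_worker_monitoring_tools(monitoring_registry, worker_runtime._event_bus, storage_path)

# Attach the health judge as a secondary graph; its timer entry point wakes it every 2 minutes.
await worker_runtime.add_graph(
    graph_id="worker_health_judge",
    graph=judge_graph,
    goal=judge_goal,
    entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
    storage_subpath="graphs/worker_health_judge",
)
```

The TUI then merges the registered tools into the runtime's tool list and executor, and attaches the queen's triage-only graph the same way with `TICKET_RECEIVER_ENTRY_POINT` (see `app.py` above).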
## Key Design Decisions
### Judge as event_loop node agent (not inline code)
The health monitor is a proper `event_loop` NodeSpec — same as every other node in the system. This means:
- Its reasoning is transparent (conversation is observable)
- It can deliberate over patterns, not just threshold checks
- It's tunable via its system prompt
- It accumulates state across timer ticks via continuous conversation mode
The conversation IS the judge's memory. No shared memory keys, no external state. The judge reads its own prior messages to track trends: "Last check I saw 47 steps; now I see 52; 5 new steps, 3 RETRY."
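In concrete terms, a cross-tick comparison looks like the sketch below; the snapshot values are hypothetical, the field names come from `get_worker_health_summary`.

```python
# Tick N: snapshot the judge summarized in an earlier conversation turn.
previous = {"total_steps": 47, "steps_since_last_accept": 2, "stall_minutes": None}

# Tick N+1: current snapshot returned by get_worker_health_summary.
current = {"total_steps": 52, "steps_since_last_accept": 3, "stall_minutes": None}

# 5 new steps since the last check and recent ACCEPTs: healthy, no escalation.
# If total_steps were unchanged for two consecutive ticks, that would be the 4+ minute stall case.
new_steps = current["total_steps"] - previous["total_steps"]  # 5
```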
### Judge conversation: isolated + continuous
- `isolation_level="isolated"` — judge has its own memory namespace, not polluting the worker's shared memory
- `conversation_mode="continuous"` — conversation persists between timer ticks in `graphs/worker_health_judge/conversations/judge/`
### Structured escalation ticket (not a string)
The judge must fill out a rigorous `EscalationTicket` before escalating. This prevents impulsive escalations. Required fields:
```
ticket_id, created_at
worker_agent_id, worker_session_id, worker_node_id, worker_graph_id
severity: "low"|"medium"|"high"|"critical"
cause: str # what the judge observed
judge_reasoning: str # why it decided to escalate
suggested_action: str # restart? human review? kill session?
recent_verdicts: list[str] # last N judge verdicts
total_steps_checked: int
steps_since_last_accept: int
stall_minutes: float|None # wall-clock since last new log step
evidence_snippet: str # recent LLM output excerpt
```
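For illustration, a ticket as the judge might assemble it before calling `emit_escalation_ticket`; every field value here is hypothetical, but the schema is the `EscalationTicket` model added in `escalation_ticket.py`.

```python
from framework.runtime.escalation_ticket import EscalationTicket

ticket = EscalationTicket(
    worker_agent_id="calendar_worker",                      # hypothetical worker identifiers
    worker_session_id="session_20250101T120000_abc123",
    worker_node_id="planner",
    worker_graph_id="calendar_worker_graph",
    severity="high",
    cause="18 consecutive RETRY verdicts, each with a 401 error from get_calendar_events.",
    judge_reasoning="No ACCEPT for 18 steps and the same call keeps failing; not a hard-problem slowdown.",
    suggested_action="Check calendar API credentials or restart with a fresh auth token.",
    recent_verdicts=["RETRY"] * 18,
    total_steps_checked=52,
    steps_since_last_accept=18,
    stall_minutes=None,                                     # worker is still producing steps
    evidence_snippet="HTTP 401 Unauthorized from get_calendar_events",
)

# ticket_id and created_at are filled by default factories.
# emit_escalation_ticket takes a JSON string and re-validates it before publishing.
ticket_json = ticket.model_dump_json()
```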
### Queen as deliberating intermediary
The queen (hive_coder's `ticket_triage_node`) receives every ticket and must decide: **dismiss or notify the operator**. This adds a second quality gate — the judge might be overly sensitive, and the queen provides a second opinion.
The queen only notifies the operator if the ticket meets intervention criteria. Conservative by design: one unnecessary alert is less costly than alert fatigue.
### Non-disruptive operator notification
`QUEEN_INTERVENTION_REQUESTED` does NOT tear down the worker or force a context switch. The TUI shows a dismissable overlay. The worker keeps running. The operator can choose to connect to the queen's active session for a conversation about the issue.
## New Event Types
| Event | Emitted by | Received by | Purpose |
|-------|-----------|-------------|---------|
| `WORKER_ESCALATION_TICKET` | Health Judge | Queen (event-driven entry point) | Structured ticket delivery |
| `QUEEN_INTERVENTION_REQUESTED` | Queen (notify_operator tool) | TUI | Human operator notification |
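For reference, the payloads these events carry in `AgentEvent.data`, taken from the two new `EventBus` emit helpers in this diff (placeholder values):

```python
# WORKER_ESCALATION_TICKET: the full serialized ticket, i.e. EscalationTicket.model_dump().
worker_escalation_data = {"ticket": {"ticket_id": "<uuid>", "severity": "high", "cause": "..."}}

# QUEEN_INTERVENTION_REQUESTED: the queen's analysis plus routing info so the TUI can open her session.
queen_intervention_data = {
    "ticket_id": "<uuid>",
    "analysis": "2-3 sentences: what is wrong, why it matters, what to do",
    "severity": "high",
    "queen_graph_id": "hive_coder_queen",
    "queen_stream_id": "hive_coder_queen::ticket_receiver",
}
```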
## New Files
| File | Purpose |
|------|---------|
| `core/framework/runtime/escalation_ticket.py` | `EscalationTicket` Pydantic schema |
| `core/framework/monitoring/__init__.py` | New framework monitoring package |
| `core/framework/monitoring/worker_health_judge.py` | Health judge graph, node, entry point, goal |
| `core/framework/agents/hive_coder/ticket_receiver.py` | Queen's `TICKET_RECEIVER_ENTRY_POINT` |
| `core/framework/tools/worker_monitoring_tools.py` | `get_worker_health_summary`, `emit_escalation_ticket`, `notify_operator`; `register_worker_monitoring_tools()` |
## Modified Files
| File | Changes |
|------|---------|
| `core/framework/runtime/event_bus.py` | +2 EventTypes, +2 emit methods |
| `core/framework/agents/hive_coder/nodes/__init__.py` | +`ticket_triage_node`, +`ALL_QUEEN_TRIAGE_TOOLS` |
| `core/framework/agents/hive_coder/agent.py` | +`ticket_triage_node` to nodes, +`TICKET_RECEIVER_ENTRY_POINT` to async_entry_points |
| `core/framework/tui/app.py` | `_load_judge_and_queen()`, `QUEEN_INTERVENTION_REQUESTED` handler, split-pane view |
## NOT Modified
- `core/framework/graph/event_loop_node.py` — no inline health check code; monitoring is fully external
- `core/framework/graph/conversation_judge.py` — existing per-iteration quality judge unchanged
- `core/framework/agents/hive_coder/guardian.py` — kept; user-triggered Ctrl+E escalation path still works
## Known Gaps
**Gap 1 — Queen tool starvation.**
The queen's `ticket_triage_node` only has `notify_operator`. If the operator connects and wants the queen to actually fix the worker's code, she'd need file I/O + shell tools. Deferred to a follow-up: dynamically registering the full hive-tools MCP when operator connects.
**Gap 2 — LLM-hang detection latency.**
If the worker's LLM call hangs (never returns), no new log entries are written and no iterations complete. The judge detects this on its next timer tick (≤2 min later). Not zero-latency, but bounded.
**Gap 3 — Worker session_id in judge's context.**
The judge needs the worker's agent_id + session_id to query logs. The timer entry fires with empty input_data. Solution: TUI passes this via the initial session state when loading the judge graph. The worker also writes its session_id to a well-known shared memory key early in execution. Gap: this write doesn't happen automatically today.
**Gap 4 — Dual monitoring channels coexist.**
The old guardian still exists. The new health judge + queen is the primary path. Both can run simultaneously if old code isn't cleaned up. Not a correctness issue, but noisy.
**Gap 5 — Single shared EventBus requirement.**
All three agents (worker, judge, queen) share one EventBus only when loaded in the same runtime via `add_graph`. A separately-started hive_coder session in another TUI window is NOT connected.
## Data Flow
```
Worker iterates → logs ACCEPT/RETRY/CONTINUE to tool_logs.jsonl
Every 2 minutes:
Health Judge wakes (timer)
→ calls get_worker_health_summary(agent_id, session_id)
→ reads recent verdicts, step counts, timing
→ compares to prior check (from conversation history)
→ if healthy: set_output("health_verdict", "healthy"), done
→ if degraded:
builds EscalationTicket JSON
calls emit_escalation_ticket(ticket_json)
→ WORKER_ESCALATION_TICKET published to shared EventBus
Queen's ticket_receiver fires:
→ ticket_triage_node receives ticket in memory key "ticket"
→ LLM reads ticket, applies dismiss/intervene criteria
→ if dismiss: set_output("intervention_decision", "dismissed: ...")
→ if intervene:
calls notify_operator(ticket_id, analysis, urgency)
→ QUEEN_INTERVENTION_REQUESTED published to shared EventBus
TUI receives QUEEN_INTERVENTION_REQUESTED:
→ shows dismissable overlay (worker is NOT paused)
→ operator chooses:
"Dismiss" → overlay gone, worker continues
"Chat with Queen" → split-pane view appears
worker stream on left, queen conversation on right
operator can type to queen
queen can further analyze and provide guidance
```