Compare commits
1 Commit
| Author | SHA1 | Date | |
|---|---|---|---|
| | 2268c4964b | | |
@@ -12,7 +12,8 @@ from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
|
||||
from framework.runtime.execution_stream import EntryPointSpec
|
||||
|
||||
from .config import default_config, metadata
|
||||
from .nodes import coder_node
|
||||
from .nodes import coder_node, ticket_triage_node
|
||||
from .ticket_receiver import TICKET_RECEIVER_ENTRY_POINT
|
||||
|
||||
# Goal definition
|
||||
goal = Goal(
|
||||
@@ -90,8 +91,9 @@ goal = Goal(
|
||||
],
|
||||
)
|
||||
|
||||
# Nodes — single coder node (guardian is now auto-attached by the framework)
|
||||
nodes = [coder_node]
|
||||
# Nodes: primary coder node + ticket_triage for queen role when loaded as
|
||||
# a secondary graph alongside a worker.
|
||||
nodes = [coder_node, ticket_triage_node]
|
||||
|
||||
# No edges needed — single forever-alive event_loop node
|
||||
edges = []
|
||||
@@ -102,8 +104,9 @@ entry_points = {"start": "coder"}
|
||||
pause_nodes = []
|
||||
terminal_nodes = [] # Forever-alive: loops until user exits
|
||||
|
||||
# No async entry points — guardian is now auto-attached via attach_guardian()
|
||||
async_entry_points = []
|
||||
# Async entry points: queen receives escalation tickets from the health judge
|
||||
# when loaded as a secondary graph alongside a worker runtime.
|
||||
async_entry_points = [TICKET_RECEIVER_ENTRY_POINT]
|
||||
|
||||
# Module-level variables read by AgentRunner.load()
|
||||
conversation_mode = "continuous"
|
||||
|
||||
@@ -553,4 +553,113 @@ forbidden action attempted, output format invalid).
|
||||
)
|
||||
|
||||
|
||||
__all__ = ["coder_node", "guardian_node", "ALL_GUARDIAN_TOOLS"]
|
||||
# ---------------------------------------------------------------------------
|
||||
# Ticket Triage Node (Queen's secondary role)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Tools available to the queen during triage — only notify_operator needed.
|
||||
# The queen does NOT need file I/O or shell tools for triage; those are for
|
||||
# interactive fix sessions (deferred to a future phase).
|
||||
ALL_QUEEN_TRIAGE_TOOLS = ["notify_operator"]
|
||||
|
||||
ticket_triage_node = NodeSpec(
|
||||
id="ticket_triage",
|
||||
name="Ticket Triage",
|
||||
description=(
|
||||
"Queen's ticket triage node. Fires when the Worker Health Judge emits "
|
||||
"an EscalationTicket. Reads the ticket, deliberates on whether to dismiss "
|
||||
"or notify the human operator, and calls notify_operator if warranted."
|
||||
),
|
||||
node_type="event_loop",
|
||||
client_facing=True, # Operator can connect and chat with queen once notified
|
||||
max_node_visits=0,
|
||||
input_keys=["ticket"],
|
||||
output_keys=["intervention_decision"],
|
||||
nullable_output_keys=["intervention_decision"],
|
||||
tools=ALL_QUEEN_TRIAGE_TOOLS,
|
||||
success_criteria=(
|
||||
"A clear, documented decision is reached: either 'dismissed: <reason>' "
|
||||
"explaining why the ticket doesn't warrant operator involvement, or the "
|
||||
"operator has been notified via notify_operator with a specific, actionable "
|
||||
"analysis of the issue."
|
||||
),
|
||||
system_prompt="""\
|
||||
You are the Queen — Hive Coder. The Worker Health Judge has escalated an issue \
|
||||
to you via a structured EscalationTicket. Your job: read it carefully and decide \
|
||||
whether to notify the human operator or dismiss it.
|
||||
|
||||
# Reading the Ticket
|
||||
|
||||
The ticket is provided in your context under the memory key "ticket". It contains:
|
||||
- severity: low|medium|high|critical
|
||||
- cause: what the judge observed (concrete, specific)
|
||||
- judge_reasoning: why the judge decided to escalate
|
||||
- suggested_action: judge's recommendation
|
||||
- recent_verdicts: last N judge verdicts (ACCEPT/RETRY/CONTINUE/ESCALATE)
|
||||
- steps_since_last_accept: consecutive non-ACCEPT steps
|
||||
- stall_minutes: wall-clock since last log step (null if worker is active)
|
||||
- evidence_snippet: recent LLM output from the worker
|
||||
|
||||
# Dismiss Criteria (do NOT call notify_operator)
|
||||
|
||||
Dismiss when the ticket does NOT require human involvement:
|
||||
- severity is "low" AND steps_since_last_accept < 8
|
||||
- The cause describes a clearly transient issue (single API timeout, brief network
|
||||
hiccup) and evidence shows the worker is otherwise progressing
|
||||
- The evidence_snippet shows the agent is actively reasoning about a hard problem
|
||||
(complex tool use, exploring solutions) — just taking time, not stuck
|
||||
- A single anomalous check that contradicts prior healthy behavior
|
||||
|
||||
Dismiss with a specific reason, not a vague "looks fine."
|
||||
|
||||
# Intervene Criteria (call notify_operator)
|
||||
|
||||
Notify the operator when human judgment or action is likely needed:
|
||||
- severity is "high" or "critical"
|
||||
- steps_since_last_accept >= 10 with no sign of recovery in evidence
|
||||
- stall_minutes >= 4 (worker has produced no new iterations in 4+ minutes)
|
||||
- evidence_snippet shows a clear doom loop (same error repeated, same tool,
|
||||
no variation in approach, no new information being processed)
|
||||
- Cause suggests a configuration error, missing credential, or logic bug
|
||||
that the agent cannot self-correct
|
||||
|
||||
# How to Notify
|
||||
|
||||
When intervening, call:
|
||||
notify_operator(
|
||||
ticket_id=<ticket["ticket_id"]>,
|
||||
analysis="<2-3 sentences: what is wrong, why it matters, what to do>",
|
||||
urgency="<low|medium|high|critical>"
|
||||
)
|
||||
|
||||
Your analysis should be specific and actionable. Example:
|
||||
"The worker agent has produced 18 consecutive RETRY verdicts with identical
|
||||
tool errors in get_calendar_events. The OAuth token appears expired based
|
||||
on the 401 errors in evidence. Recommend: check calendar API credentials
|
||||
or restart with fresh auth token."
|
||||
|
||||
# After Deciding
|
||||
|
||||
Always set_output at the end:
|
||||
set_output("intervention_decision", "dismissed: <reason>")
|
||||
OR
|
||||
set_output("intervention_decision", "escalated: <one-line summary>")
|
||||
|
||||
# Rules
|
||||
|
||||
- Be concise. You are a second quality gate, not a rubber stamp.
|
||||
- One false dismissal is less bad than two false alarms.
|
||||
- But do not dismiss high/critical severity tickets without strong reason.
|
||||
- Read the ticket completely before deciding. Do not skim.
|
||||
- You are the last automated line before the human is disturbed. Take it seriously.
|
||||
""",
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"coder_node",
|
||||
"guardian_node",
|
||||
"ticket_triage_node",
|
||||
"ALL_GUARDIAN_TOOLS",
|
||||
"ALL_QUEEN_TRIAGE_TOOLS",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
"""Queen's ticket receiver entry point.
|
||||
|
||||
When the Worker Health Judge emits a WORKER_ESCALATION_TICKET event on the
|
||||
shared EventBus, this entry point fires and routes to the ``ticket_triage``
|
||||
node, where the Queen deliberates and decides whether to notify the operator.
|
||||
|
||||
Isolation level is ``isolated`` — the queen's triage memory is kept separate
|
||||
from the worker's shared memory. Each ticket triage runs in its own context.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from framework.graph.edge import AsyncEntryPointSpec
|
||||
|
||||
TICKET_RECEIVER_ENTRY_POINT = AsyncEntryPointSpec(
|
||||
id="ticket_receiver",
|
||||
name="Worker Escalation Ticket Receiver",
|
||||
entry_node="ticket_triage",
|
||||
trigger_type="event",
|
||||
trigger_config={
|
||||
"event_types": ["worker_escalation_ticket"],
|
||||
# Do not fire on our own graph's events (prevents loops if queen
|
||||
# somehow emits a worker_escalation_ticket for herself)
|
||||
"exclude_own_graph": True,
|
||||
},
|
||||
isolation_level="isolated",
|
||||
)
|
||||
@@ -0,0 +1,24 @@
|
||||
"""Framework-level monitoring components.
|
||||
|
||||
Provides reusable monitoring graphs that can be attached to any worker
|
||||
agent runtime via ``runtime.add_graph()``.
|
||||
|
||||
Currently included:
|
||||
- ``worker_health_judge`` — timer-driven health monitor that reads worker
|
||||
session logs, detects degradation patterns, and emits structured
|
||||
EscalationTickets to the shared EventBus.
|
||||
"""
|
||||
|
||||
from framework.monitoring.worker_health_judge import (
|
||||
HEALTH_JUDGE_ENTRY_POINT,
|
||||
judge_goal,
|
||||
judge_graph,
|
||||
judge_node,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"HEALTH_JUDGE_ENTRY_POINT",
|
||||
"judge_goal",
|
||||
"judge_graph",
|
||||
"judge_node",
|
||||
]
|
||||
@@ -0,0 +1,267 @@
|
||||
"""Worker Health Judge — framework-level reusable monitoring graph.
|
||||
|
||||
Attaches to any worker agent runtime as a secondary graph. Fires on a
|
||||
2-minute timer, reads the worker's session logs via ``get_worker_health_summary``,
|
||||
accumulates observations in a continuous conversation context, and emits a
|
||||
structured ``EscalationTicket`` when it detects a degradation pattern.
|
||||
|
||||
Usage::
|
||||
|
||||
from framework.monitoring import judge_graph, judge_goal, HEALTH_JUDGE_ENTRY_POINT
|
||||
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
|
||||
|
||||
# Register tools bound to the worker runtime's event bus
|
||||
monitoring_registry = ToolRegistry()
|
||||
register_worker_monitoring_tools(
|
||||
monitoring_registry, worker_runtime._event_bus, storage_path
|
||||
)
|
||||
monitoring_tools = list(monitoring_registry.get_tools().values())
|
||||
monitoring_executor = monitoring_registry.get_executor()
|
||||
|
||||
# Load judge as secondary graph on the worker runtime
|
||||
await worker_runtime.add_graph(
|
||||
graph_id="worker_health_judge",
|
||||
graph=judge_graph,
|
||||
goal=judge_goal,
|
||||
entry_points=[HEALTH_JUDGE_ENTRY_POINT],
|
||||
tools=monitoring_tools,
|
||||
executor=monitoring_executor,
|
||||
storage_subpath="graphs/worker_health_judge",
|
||||
)
|
||||
|
||||
Design:
|
||||
- ``isolation_level="isolated"`` — the judge has its own memory, not
|
||||
polluting the worker's shared memory namespace.
|
||||
- ``conversation_mode="continuous"`` — the judge's conversation carries
|
||||
across timer ticks. The conversation IS the judge's memory. It tracks
|
||||
trends by referring to its own prior messages ("Last check I saw 47
|
||||
steps; now 52; 5 new steps, 3 RETRY").
|
||||
- No shared memory keys. No external state files.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from framework.graph import Constraint, Goal, NodeSpec, SuccessCriterion
|
||||
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Goal
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
judge_goal = Goal(
|
||||
id="worker-health-monitor",
|
||||
name="Worker Health Monitor",
|
||||
description=(
|
||||
"Periodically assess the health of the worker agent by reading its "
|
||||
"execution logs. Detect degradation patterns (excessive retries, "
|
||||
"stalls, doom loops) and emit structured EscalationTickets when the "
|
||||
"worker needs attention."
|
||||
),
|
||||
success_criteria=[
|
||||
SuccessCriterion(
|
||||
id="accurate-detection",
|
||||
description="Only escalates genuine degradation, not normal retry cycles",
|
||||
metric="false_positive_rate",
|
||||
target="low",
|
||||
weight=0.5,
|
||||
),
|
||||
SuccessCriterion(
|
||||
id="timely-detection",
|
||||
description="Detects genuine stalls within 2 timer ticks (≤4 minutes)",
|
||||
metric="detection_latency_minutes",
|
||||
target="<=4",
|
||||
weight=0.5,
|
||||
),
|
||||
],
|
||||
constraints=[
|
||||
Constraint(
|
||||
id="conservative-escalation",
|
||||
description=(
|
||||
"Do not escalate on a single bad verdict or a brief stall. "
|
||||
"Require clear patterns (10+ consecutive bad verdicts or 4+ minute stall) "
|
||||
"before creating a ticket."
|
||||
),
|
||||
constraint_type="hard",
|
||||
category="quality",
|
||||
),
|
||||
Constraint(
|
||||
id="complete-ticket",
|
||||
description=(
|
||||
"Every EscalationTicket must have all required fields filled. "
|
||||
"Do not emit partial or placeholder tickets."
|
||||
),
|
||||
constraint_type="hard",
|
||||
category="correctness",
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Node
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
judge_node = NodeSpec(
|
||||
id="judge",
|
||||
name="Worker Health Judge",
|
||||
description=(
|
||||
"Autonomous health monitor for worker agents. Reads execution logs "
|
||||
"on each timer tick, compares to prior observations (via conversation "
|
||||
"history), and emits a structured EscalationTicket when a genuine "
|
||||
"degradation pattern is detected."
|
||||
),
|
||||
node_type="event_loop",
|
||||
client_facing=False, # Autonomous monitor, not interactive
|
||||
max_node_visits=0, # Unbounded — runs on every timer tick
|
||||
input_keys=[],
|
||||
output_keys=["health_verdict"],
|
||||
nullable_output_keys=["health_verdict"],
|
||||
success_criteria=(
|
||||
"A clear health verdict is produced each check: either 'healthy' with "
|
||||
"a brief observation, or a complete EscalationTicket is emitted via "
|
||||
"emit_escalation_ticket and health_verdict describes the issue."
|
||||
),
|
||||
tools=[
|
||||
"get_worker_health_summary",
|
||||
"emit_escalation_ticket",
|
||||
],
|
||||
system_prompt="""\
|
||||
You are the Worker Health Judge. You run every 2 minutes alongside a worker \
|
||||
agent to monitor its execution health.
|
||||
|
||||
# Your Role
|
||||
|
||||
You observe the worker's iteration patterns over time and escalate only when \
|
||||
you see genuine degradation — not normal retry cycles. Your conversation history \
|
||||
IS your memory. On each check, refer to your previous observations to track trends.
|
||||
|
||||
# Required Context
|
||||
|
||||
At the start of your first check, your context will include:
|
||||
- worker_agent_id: the agent ID you are watching
|
||||
- worker_session_id: the active session to monitor
|
||||
- worker_node_id: the node currently executing (usually the primary node)
|
||||
- worker_graph_id: the graph ID
|
||||
|
||||
These are provided in your initial system context. Use them when calling tools \
|
||||
and building tickets.
|
||||
|
||||
# Check Procedure
|
||||
|
||||
On each timer tick (every 2 minutes):
|
||||
|
||||
## Step 1: Read health snapshot
|
||||
Call get_worker_health_summary(session_id=<worker_session_id>) to get:
|
||||
- total_steps: how many log steps have been recorded
|
||||
- recent_verdicts: list of recent ACCEPT/RETRY/CONTINUE verdicts
|
||||
- steps_since_last_accept: consecutive non-ACCEPT steps
|
||||
- stall_minutes: wall-clock since last step (null if active)
|
||||
- evidence_snippet: recent LLM output
|
||||
|
||||
## Step 2: Compare to prior check
|
||||
Look at your conversation history. What was total_steps last time?
|
||||
- If total_steps is UNCHANGED from prior check AND prior check was also unchanged:
|
||||
→ STALL confirmed (worker has produced no new iterations in 4+ minutes).
|
||||
→ Escalate with severity="high" or "critical" depending on stall duration.
|
||||
- If total_steps increased: worker is making progress. Examine verdicts.
|
||||
|
||||
## Step 3: Analyze verdict pattern
|
||||
- Healthy: Mix of ACCEPT and RETRY, steps_since_last_accept < 5. No action.
|
||||
- Warning: steps_since_last_accept is 5-9. Note it, no escalation yet.
|
||||
- Degraded: steps_since_last_accept >= 10. Examine evidence_snippet.
|
||||
- If evidence shows the agent is making real progress (complex reasoning,
|
||||
exploring solutions, productive tool use): may be a hard problem. Note it.
|
||||
- If evidence shows a loop (same error, same tool call, no new information):
|
||||
→ Escalate with severity="medium" or "high".
|
||||
- Critical: steps_since_last_accept >= 20, OR stall_minutes >= 4.
|
||||
→ Escalate with severity="critical".
|
||||
|
||||
## Step 4: Decide
|
||||
|
||||
### If healthy:
|
||||
set_output("health_verdict", "healthy: <brief observation>")
|
||||
Done.
|
||||
|
||||
### If escalating:
|
||||
Build an EscalationTicket JSON string with ALL required fields:
|
||||
{
|
||||
"worker_agent_id": "<from context>",
|
||||
"worker_session_id": "<from context>",
|
||||
"worker_node_id": "<from context>",
|
||||
"worker_graph_id": "<from context>",
|
||||
"severity": "<low|medium|high|critical>",
|
||||
"cause": "<what you observed — concrete, specific>",
|
||||
"judge_reasoning": "<why you decided to escalate, not just dismiss>",
|
||||
"suggested_action": "<what you recommend: restart, human review, etc.>",
|
||||
"recent_verdicts": [<list from get_worker_health_summary>],
|
||||
"total_steps_checked": <int>,
|
||||
"steps_since_last_accept": <int>,
|
||||
"stall_minutes": <float or null>,
|
||||
"evidence_snippet": "<from get_worker_health_summary>"
|
||||
}
|
||||
|
||||
Call: emit_escalation_ticket(ticket_json=<the JSON string above>)
|
||||
Then: set_output("health_verdict", "escalated: <one-line summary>")
|
||||
|
||||
# Severity Guide
|
||||
|
||||
- low: Mild concern, worth noting. 5-9 consecutive bad verdicts.
|
||||
- medium: Clear degradation pattern. 10-15 bad verdicts or brief stall (1-2 min).
|
||||
- high: Serious issue. 15+ bad verdicts or stall 2-4 minutes or clear doom loop.
|
||||
- critical: Worker is definitively stuck. 20+ bad verdicts or stall > 4 minutes.
|
||||
|
||||
# Conservative Bias
|
||||
|
||||
You MUST resist the urge to escalate prematurely. Worker agents naturally retry.
|
||||
A node may legitimately need 5-8 retries before succeeding. Do not escalate unless:
|
||||
1. The pattern is clear and sustained across your observation window, AND
|
||||
2. The evidence shows no genuine progress
|
||||
|
||||
One missed escalation is less costly than two false alarms. The Queen will filter \
|
||||
further. But do not be passive — genuine stalls and doom loops must be caught.
|
||||
|
||||
# Rules
|
||||
- Never escalate on the FIRST check unless stall_minutes > 4
|
||||
- Always call get_worker_health_summary FIRST before deciding anything
|
||||
- All ticket fields are REQUIRED — do not submit partial tickets
|
||||
- After any emit_escalation_ticket call, always set_output to complete the check
|
||||
""",
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry Point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
HEALTH_JUDGE_ENTRY_POINT = AsyncEntryPointSpec(
|
||||
id="health_check",
|
||||
name="Worker Health Check",
|
||||
entry_node="judge",
|
||||
trigger_type="timer",
|
||||
trigger_config={
|
||||
"interval_minutes": 2,
|
||||
"run_immediately": False, # Give worker time to start producing logs
|
||||
},
|
||||
isolation_level="isolated", # Own memory namespace, not polluting worker's
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Graph
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
judge_graph = GraphSpec(
|
||||
id="worker-health-judge-graph",
|
||||
goal_id=judge_goal.id,
|
||||
version="1.0.0",
|
||||
entry_node="judge",
|
||||
entry_points={"health_check": "judge"},
|
||||
terminal_nodes=[], # Forever-alive: fires on every timer tick
|
||||
pause_nodes=[],
|
||||
nodes=[judge_node],
|
||||
edges=[],
|
||||
conversation_mode="continuous", # Conversation persists across timer ticks
|
||||
async_entry_points=[HEALTH_JUDGE_ENTRY_POINT],
|
||||
loop_config={
|
||||
"max_iterations": 10, # One check shouldn't take many turns
|
||||
"max_tool_calls_per_turn": 3, # get_summary + optionally emit_ticket
|
||||
"max_history_tokens": 16000, # Compact — judge only needs recent context
|
||||
},
|
||||
)
|
||||
@@ -0,0 +1,83 @@
|
||||
"""Structured escalation ticket emitted by the Worker Health Judge.
|
||||
|
||||
The ticket is the required artifact that the judge must fill out before
|
||||
escalating an issue to the Queen. Requiring this structured form prevents
|
||||
impulsive escalations — the judge must articulate the cause, evidence,
|
||||
severity, and a suggested action before anything is emitted.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from typing import Literal
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class EscalationTicket(BaseModel):
|
||||
"""A structured escalation report from the Worker Health Judge.
|
||||
|
||||
The judge fills this out when it observes a degradation pattern in
|
||||
the worker agent's execution. The Queen receives this ticket and
|
||||
decides whether to notify the human operator.
|
||||
"""
|
||||
|
||||
# Identity
|
||||
ticket_id: str = Field(default_factory=lambda: str(uuid4()))
|
||||
created_at: str = Field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
|
||||
# Worker identification
|
||||
worker_agent_id: str = Field(description="Agent ID of the monitored worker")
|
||||
worker_session_id: str = Field(description="Active session ID being monitored")
|
||||
worker_node_id: str = Field(description="Node ID where the issue is occurring")
|
||||
worker_graph_id: str = Field(description="Graph ID of the worker")
|
||||
|
||||
# Problem characterization — filled by judge via LLM reasoning
|
||||
severity: Literal["low", "medium", "high", "critical"] = Field(
|
||||
description="Severity of the detected issue"
|
||||
)
|
||||
cause: str = Field(
|
||||
description=(
|
||||
"Human-readable description of what the judge observed. "
|
||||
"Example: 'Node has produced 18 consecutive RETRY verdicts with no progress.'"
|
||||
)
|
||||
)
|
||||
judge_reasoning: str = Field(
|
||||
description="The judge's deliberation chain: why it decided to escalate vs dismiss."
|
||||
)
|
||||
suggested_action: str = Field(
|
||||
description=(
|
||||
"What the judge recommends. Examples: "
|
||||
"'Restart the node', 'Check API credentials', "
|
||||
"'Review system prompt for logic errors', 'Kill session'."
|
||||
)
|
||||
)
|
||||
|
||||
# Evidence — quantitative signals the Queen can evaluate
|
||||
recent_verdicts: list[str] = Field(
|
||||
default_factory=list,
|
||||
description="Last N judge verdicts (ACCEPT/RETRY/CONTINUE/ESCALATE) in order.",
|
||||
)
|
||||
total_steps_checked: int = Field(
|
||||
default=0,
|
||||
description="Total number of log steps the judge examined in this check.",
|
||||
)
|
||||
steps_since_last_accept: int = Field(
|
||||
default=0,
|
||||
description="How many consecutive steps have passed without an ACCEPT verdict.",
|
||||
)
|
||||
stall_minutes: float | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Wall-clock minutes since the last new log step was written. "
|
||||
"None if the worker is actively producing steps."
|
||||
),
|
||||
)
|
||||
evidence_snippet: str = Field(
|
||||
default="",
|
||||
description=(
|
||||
"Brief excerpt from the worker's most recent LLM output or error message. "
|
||||
"Used by the Queen to assess context without reading the full log."
|
||||
),
|
||||
)
|
||||
@@ -86,6 +86,10 @@ class EventType(StrEnum):
|
||||
# Escalation (agent requests handoff to hive_coder)
|
||||
ESCALATION_REQUESTED = "escalation_requested"
|
||||
|
||||
# Worker health monitoring (judge → queen → operator pipeline)
|
||||
WORKER_ESCALATION_TICKET = "worker_escalation_ticket"
|
||||
QUEEN_INTERVENTION_REQUESTED = "queen_intervention_requested"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentEvent:
|
||||
@@ -852,6 +856,61 @@ class EventBus:
|
||||
)
|
||||
)
|
||||
|
||||
async def emit_worker_escalation_ticket(
|
||||
self,
|
||||
stream_id: str,
|
||||
node_id: str,
|
||||
ticket: dict,
|
||||
execution_id: str | None = None,
|
||||
) -> None:
|
||||
"""Emit a structured escalation ticket from the Worker Health Judge.
|
||||
|
||||
The ticket dict should be a serialized EscalationTicket. The Queen's
|
||||
event-driven entry point fires on this event to triage the ticket.
|
||||
"""
|
||||
await self.publish(
|
||||
AgentEvent(
|
||||
type=EventType.WORKER_ESCALATION_TICKET,
|
||||
stream_id=stream_id,
|
||||
node_id=node_id,
|
||||
execution_id=execution_id,
|
||||
data={"ticket": ticket},
|
||||
)
|
||||
)
|
||||
|
||||
async def emit_queen_intervention_requested(
|
||||
self,
|
||||
stream_id: str,
|
||||
node_id: str,
|
||||
ticket_id: str,
|
||||
analysis: str,
|
||||
severity: str,
|
||||
queen_graph_id: str,
|
||||
queen_stream_id: str,
|
||||
execution_id: str | None = None,
|
||||
) -> None:
|
||||
"""Emit a human operator notification from the Queen.
|
||||
|
||||
The TUI subscribes to this event to surface a non-disruptive overlay.
|
||||
queen_graph_id and queen_stream_id allow the TUI to route the operator
|
||||
directly into the Queen's active session for a follow-up conversation.
|
||||
"""
|
||||
await self.publish(
|
||||
AgentEvent(
|
||||
type=EventType.QUEEN_INTERVENTION_REQUESTED,
|
||||
stream_id=stream_id,
|
||||
node_id=node_id,
|
||||
execution_id=execution_id,
|
||||
data={
|
||||
"ticket_id": ticket_id,
|
||||
"analysis": analysis,
|
||||
"severity": severity,
|
||||
"queen_graph_id": queen_graph_id,
|
||||
"queen_stream_id": queen_stream_id,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# === QUERY OPERATIONS ===
|
||||
|
||||
def get_history(
|
||||
|
||||
@@ -0,0 +1,357 @@
|
||||
"""Worker monitoring tools for the Health Judge and Queen triage agents.
|
||||
|
||||
Three tools are registered by ``register_worker_monitoring_tools()``:
|
||||
|
||||
- ``get_worker_health_summary`` — reads the worker's session log files and
|
||||
returns a compact health snapshot (recent verdicts, step count, timing).
|
||||
Used by the Health Judge on every timer tick.
|
||||
|
||||
- ``emit_escalation_ticket`` — validates and publishes an EscalationTicket
|
||||
to the shared EventBus as a WORKER_ESCALATION_TICKET event.
|
||||
Used by the Health Judge when it decides to escalate.
|
||||
|
||||
- ``notify_operator`` — emits a QUEEN_INTERVENTION_REQUESTED event so the TUI
|
||||
can surface a non-disruptive operator notification.
|
||||
Used by the Queen's ticket_triage_node when it decides to intervene.
|
||||
|
||||
Usage::
|
||||
|
||||
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
|
||||
|
||||
register_worker_monitoring_tools(tool_registry, event_bus, storage_path)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from framework.runner.tool_registry import ToolRegistry
|
||||
from framework.runtime.event_bus import EventBus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# How many tool_log steps to include in the health summary
|
||||
_DEFAULT_LAST_N_STEPS = 40
|
||||
|
||||
|
||||
def register_worker_monitoring_tools(
|
||||
registry: "ToolRegistry",
|
||||
event_bus: "EventBus",
|
||||
storage_path: Path,
|
||||
stream_id: str = "worker_health_judge",
|
||||
) -> int:
|
||||
"""Register worker monitoring tools bound to *event_bus* and *storage_path*.
|
||||
|
||||
Args:
|
||||
registry: ToolRegistry to register tools on.
|
||||
event_bus: The shared EventBus for the worker runtime.
|
||||
storage_path: Root storage path of the worker runtime
|
||||
(e.g. ``~/.hive/agents/{name}``).
|
||||
stream_id: Stream ID used when emitting events; defaults to judge's stream.
|
||||
|
||||
Returns:
|
||||
Number of tools registered.
|
||||
"""
|
||||
from framework.llm.provider import Tool
|
||||
|
||||
tools_registered = 0
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# get_worker_health_summary
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def get_worker_health_summary(
|
||||
session_id: str,
|
||||
last_n_steps: int = _DEFAULT_LAST_N_STEPS,
|
||||
) -> str:
|
||||
"""Read the worker's execution logs and return a compact health snapshot.
|
||||
|
||||
Returns a JSON object with:
|
||||
- session_status: "running"|"completed"|"failed"|"in_progress"|"unknown"
|
||||
- total_steps: total number of log steps recorded so far
|
||||
- recent_verdicts: list of last N verdict strings (ACCEPT/RETRY/CONTINUE/ESCALATE)
|
||||
- steps_since_last_accept: consecutive non-ACCEPT steps from the end
|
||||
- last_step_time_iso: ISO timestamp of the most recent step (or null)
|
||||
- stall_minutes: wall-clock minutes since last step (null if < 1 min)
|
||||
- evidence_snippet: last LLM text from the most recent step (truncated)
|
||||
- session_id: echoed back for reference
|
||||
"""
|
||||
# Resolve log paths
|
||||
session_dir = storage_path / "sessions" / session_id
|
||||
tool_logs_path = session_dir / "logs" / "tool_logs.jsonl"
|
||||
state_path = session_dir / "state.json"
|
||||
|
||||
# Read session status
|
||||
session_status = "unknown"
|
||||
if state_path.exists():
|
||||
try:
|
||||
state = json.loads(state_path.read_text(encoding="utf-8"))
|
||||
session_status = state.get("status", "unknown")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Read tool logs
|
||||
steps: list[dict] = []
|
||||
if tool_logs_path.exists():
|
||||
try:
|
||||
with open(tool_logs_path, encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
steps.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except OSError as e:
|
||||
return json.dumps({"error": f"Could not read tool logs: {e}"})
|
||||
|
||||
total_steps = len(steps)
|
||||
recent = steps[-last_n_steps:] if len(steps) > last_n_steps else steps
|
||||
|
||||
# Extract verdict sequence
|
||||
recent_verdicts = [s.get("verdict", "") for s in recent if s.get("verdict")]
|
||||
|
||||
# Count consecutive non-ACCEPT from the end
|
||||
steps_since_last_accept = 0
|
||||
for v in reversed(recent_verdicts):
|
||||
if v == "ACCEPT":
|
||||
break
|
||||
steps_since_last_accept += 1
|
||||
|
||||
# Timing: last step timestamp
|
||||
last_step_time_iso: str | None = None
|
||||
stall_minutes: float | None = None
|
||||
if steps:
|
||||
# tool_log steps don't have timestamps; use file mtime as proxy
|
||||
try:
|
||||
mtime = tool_logs_path.stat().st_mtime
|
||||
last_step_time_iso = datetime.fromtimestamp(mtime, UTC).isoformat()
|
||||
elapsed = (datetime.now(UTC).timestamp() - mtime) / 60
|
||||
stall_minutes = round(elapsed, 1) if elapsed >= 1.0 else None
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Evidence snippet: last LLM text
|
||||
evidence_snippet = ""
|
||||
for step in reversed(recent):
|
||||
text = step.get("llm_text", "")
|
||||
if text:
|
||||
evidence_snippet = text[:500]
|
||||
break
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"session_id": session_id,
|
||||
"session_status": session_status,
|
||||
"total_steps": total_steps,
|
||||
"recent_verdicts": recent_verdicts,
|
||||
"steps_since_last_accept": steps_since_last_accept,
|
||||
"last_step_time_iso": last_step_time_iso,
|
||||
"stall_minutes": stall_minutes,
|
||||
"evidence_snippet": evidence_snippet,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
_health_summary_tool = Tool(
|
||||
name="get_worker_health_summary",
|
||||
description=(
|
||||
"Read the worker agent's execution logs and return a compact health snapshot. "
|
||||
"Returns recent judge verdicts, step count, time since last step, and "
|
||||
"a snippet of the most recent LLM output. "
|
||||
"Use this on every health check to observe trends."
|
||||
),
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"session_id": {
|
||||
"type": "string",
|
||||
"description": "The worker's active session ID (e.g. 'session_20250101T120000_abc123')",
|
||||
},
|
||||
"last_n_steps": {
|
||||
"type": "integer",
|
||||
"description": f"How many recent log steps to include (default {_DEFAULT_LAST_N_STEPS})",
|
||||
},
|
||||
},
|
||||
"required": ["session_id"],
|
||||
},
|
||||
)
|
||||
registry.register(
|
||||
"get_worker_health_summary",
|
||||
_health_summary_tool,
|
||||
lambda inputs: get_worker_health_summary(**inputs),
|
||||
)
|
||||
tools_registered += 1
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# emit_escalation_ticket
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def emit_escalation_ticket(ticket_json: str) -> str:
|
||||
"""Validate and publish an EscalationTicket to the shared EventBus.
|
||||
|
||||
ticket_json must be a JSON string containing all required EscalationTicket
|
||||
fields. The ticket is validated before publishing — this ensures the judge
|
||||
has genuinely filled out all required evidence fields.
|
||||
|
||||
Returns a confirmation JSON with the ticket_id on success, or an error.
|
||||
"""
|
||||
from framework.runtime.escalation_ticket import EscalationTicket
|
||||
|
||||
try:
|
||||
raw = json.loads(ticket_json) if isinstance(ticket_json, str) else ticket_json
|
||||
ticket = EscalationTicket(**raw)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": f"Invalid ticket: {e}"})
|
||||
|
||||
try:
|
||||
await event_bus.emit_worker_escalation_ticket(
|
||||
stream_id=stream_id,
|
||||
node_id="judge",
|
||||
ticket=ticket.model_dump(),
|
||||
)
|
||||
logger.info(
|
||||
"EscalationTicket emitted: ticket_id=%s severity=%s cause=%r",
|
||||
ticket.ticket_id,
|
||||
ticket.severity,
|
||||
ticket.cause[:80],
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "emitted",
|
||||
"ticket_id": ticket.ticket_id,
|
||||
"severity": ticket.severity,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": f"Failed to emit ticket: {e}"})
|
||||
|
||||
_emit_ticket_tool = Tool(
|
||||
name="emit_escalation_ticket",
|
||||
description=(
|
||||
"Validate and publish a structured EscalationTicket to the shared EventBus. "
|
||||
"The Queen's ticket_receiver entry point will fire and triage the ticket. "
|
||||
"ticket_json must be a JSON string with all required EscalationTicket fields: "
|
||||
"worker_agent_id, worker_session_id, worker_node_id, worker_graph_id, "
|
||||
"severity (low/medium/high/critical), cause, judge_reasoning, suggested_action, "
|
||||
"recent_verdicts (list), total_steps_checked, steps_since_last_accept, "
|
||||
"stall_minutes (float or null), evidence_snippet."
|
||||
),
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"ticket_json": {
|
||||
"type": "string",
|
||||
"description": "JSON string of the complete EscalationTicket",
|
||||
},
|
||||
},
|
||||
"required": ["ticket_json"],
|
||||
},
|
||||
)
|
||||
registry.register(
|
||||
"emit_escalation_ticket",
|
||||
_emit_ticket_tool,
|
||||
lambda inputs: emit_escalation_ticket(**inputs),
|
||||
)
|
||||
tools_registered += 1
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# notify_operator
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def notify_operator(
|
||||
ticket_id: str,
|
||||
analysis: str,
|
||||
urgency: str,
|
||||
) -> str:
|
||||
"""Emit a QUEEN_INTERVENTION_REQUESTED event to notify the human operator.
|
||||
|
||||
The TUI subscribes to this event and surfaces a non-disruptive dismissable
|
||||
overlay. The worker agent is NOT paused. The operator can choose to open
|
||||
a split-pane conversation with the Queen about the issue.
|
||||
|
||||
Args:
|
||||
ticket_id: The ticket_id from the original EscalationTicket.
|
||||
analysis: 2-3 sentence description of what is wrong, why it matters,
|
||||
and what action is suggested.
|
||||
urgency: Severity level: "low", "medium", "high", or "critical".
|
||||
|
||||
Returns:
|
||||
Confirmation JSON.
|
||||
"""
|
||||
valid_urgencies = {"low", "medium", "high", "critical"}
|
||||
if urgency not in valid_urgencies:
|
||||
return json.dumps(
|
||||
{"error": f"urgency must be one of {sorted(valid_urgencies)}, got {urgency!r}"}
|
||||
)
|
||||
|
||||
try:
|
||||
await event_bus.emit_queen_intervention_requested(
|
||||
stream_id=stream_id,
|
||||
node_id="ticket_triage",
|
||||
ticket_id=ticket_id,
|
||||
analysis=analysis,
|
||||
severity=urgency,
|
||||
queen_graph_id="hive_coder_queen",
|
||||
queen_stream_id="hive_coder_queen::ticket_receiver",
|
||||
)
|
||||
logger.info(
|
||||
"Queen intervention requested: ticket_id=%s urgency=%s",
|
||||
ticket_id,
|
||||
urgency,
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "operator_notified",
|
||||
"ticket_id": ticket_id,
|
||||
"urgency": urgency,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": f"Failed to notify operator: {e}"})
|
||||
|
||||
_notify_tool = Tool(
|
||||
name="notify_operator",
|
||||
description=(
|
||||
"Notify the human operator that a worker agent needs attention. "
|
||||
"This emits a QUEEN_INTERVENTION_REQUESTED event that the TUI surfaces "
|
||||
"as a non-disruptive overlay. The worker keeps running. "
|
||||
"Only call this when you (the Queen) have decided the issue warrants "
|
||||
"human attention after reading the escalation ticket."
|
||||
),
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"ticket_id": {
|
||||
"type": "string",
|
||||
"description": "The ticket_id from the EscalationTicket being triaged",
|
||||
},
|
||||
"analysis": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"2-3 sentence analysis: what is wrong, why it matters, "
|
||||
"and what action you suggest."
|
||||
),
|
||||
},
|
||||
"urgency": {
|
||||
"type": "string",
|
||||
"enum": ["low", "medium", "high", "critical"],
|
||||
"description": "Severity level for the operator notification",
|
||||
},
|
||||
},
|
||||
"required": ["ticket_id", "analysis", "urgency"],
|
||||
},
|
||||
)
|
||||
registry.register(
|
||||
"notify_operator",
|
||||
_notify_tool,
|
||||
lambda inputs: notify_operator(**inputs),
|
||||
)
|
||||
tools_registered += 1
|
||||
|
||||
return tools_registered
|
||||
@@ -215,6 +215,7 @@ class AdenTUI(App):
|
||||
Binding("ctrl+a", "show_agent_picker", "Agents", show=True, priority=True),
|
||||
Binding("ctrl+e", "escalate_to_coder", "Coder", show=True, priority=True),
|
||||
Binding("ctrl+e", "return_from_coder", "← Back", show=True, priority=True),
|
||||
Binding("ctrl+q", "connect_to_queen", "Queen", show=True, priority=True),
|
||||
Binding("tab", "focus_next", "Next Panel", show=True),
|
||||
Binding("shift+tab", "focus_previous", "Previous Panel", show=False),
|
||||
]
|
||||
@@ -239,6 +240,10 @@ class AdenTUI(App):
|
||||
# Escalation stack: stores worker state when coder is in foreground
|
||||
self._escalation_stack: list[dict] = []
|
||||
|
||||
# Health judge + queen monitoring graphs (loaded alongside worker agents)
|
||||
self._queen_graph_id: str | None = None
|
||||
self._judge_graph_id: str | None = None
|
||||
|
||||
# Widgets are created lazily when runtime is available
|
||||
self.graph_view = None
|
||||
self.chat_repl = None
|
||||
@@ -399,6 +404,10 @@ class AdenTUI(App):
|
||||
"""Complete agent setup, guardian attach, and widget mount."""
|
||||
import asyncio
|
||||
|
||||
# Reset health monitoring state from any prior load
|
||||
self._queen_graph_id = None
|
||||
self._judge_graph_id = None
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
if runner._agent_runtime is None:
|
||||
@@ -429,6 +438,117 @@ class AdenTUI(App):
|
||||
agent_name = runner.agent_path.name
|
||||
self.notify(f"Agent loaded: {agent_name}", severity="information", timeout=3)
|
||||
|
||||
# Load health judge + queen for worker agents (skip for hive_coder itself)
|
||||
if agent_name != "hive_coder":
|
||||
await self._load_judge_and_queen(runner._storage_path)
|
||||
|
||||
async def _load_judge_and_queen(self, storage_path) -> None:
|
||||
"""Load the health judge and queen's ticket_triage as secondary monitoring graphs.
|
||||
|
||||
Both are added to the current worker runtime so they share its EventBus.
|
||||
The health judge fires every 2 minutes via a timer; the queen fires on
|
||||
WORKER_ESCALATION_TICKET events emitted by the judge.
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
from framework.agents.hive_coder.nodes import ticket_triage_node
|
||||
from framework.agents.hive_coder.ticket_receiver import TICKET_RECEIVER_ENTRY_POINT
|
||||
from framework.graph import Goal
|
||||
from framework.graph.edge import GraphSpec
|
||||
from framework.monitoring import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph
|
||||
from framework.runner.tool_registry import ToolRegistry
|
||||
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
|
||||
|
||||
try:
|
||||
storage_path = Path(storage_path)
|
||||
|
||||
# 1. Register monitoring tools bound to this runtime's EventBus + storage
|
||||
monitoring_registry = ToolRegistry()
|
||||
register_worker_monitoring_tools(
|
||||
monitoring_registry,
|
||||
self.runtime._event_bus,
|
||||
storage_path,
|
||||
)
|
||||
monitoring_tools = list(monitoring_registry.get_tools().values())
|
||||
monitoring_executor = monitoring_registry.get_executor()
|
||||
monitoring_tool_names = {t.name for t in monitoring_tools}
|
||||
|
||||
# Inject monitoring tools into runtime so secondary graphs can call them.
|
||||
# Secondary graph streams inherit self.runtime._tools / _tool_executor.
|
||||
original_executor = self.runtime._tool_executor
|
||||
|
||||
def merged_executor(tool_use):
|
||||
if tool_use.name in monitoring_tool_names:
|
||||
return monitoring_executor(tool_use)
|
||||
if original_executor is not None:
|
||||
return original_executor(tool_use)
|
||||
from framework.llm.provider import ToolResult
|
||||
import json as _json
|
||||
return ToolResult(
|
||||
tool_use_id=tool_use.id,
|
||||
content=_json.dumps({"error": f"Unknown tool: {tool_use.name}"}),
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
self.runtime._tools = self.runtime._tools + monitoring_tools
|
||||
self.runtime._tool_executor = merged_executor
|
||||
|
||||
# 2. Load health judge as a secondary graph on the worker runtime
|
||||
await self.runtime.add_graph(
|
||||
graph_id="worker_health_judge",
|
||||
graph=judge_graph,
|
||||
goal=judge_goal,
|
||||
entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
|
||||
storage_subpath="graphs/worker_health_judge",
|
||||
)
|
||||
self._judge_graph_id = "worker_health_judge"
|
||||
|
||||
# 3. Build minimal queen graph (ticket_triage only, no coder node)
|
||||
queen_triage_goal = Goal(
|
||||
id="queen-ticket-triage",
|
||||
name="Queen Ticket Triage",
|
||||
description=(
|
||||
"Receive escalation tickets from the Health Judge and decide "
|
||||
"whether to notify the operator."
|
||||
),
|
||||
)
|
||||
queen_graph = GraphSpec(
|
||||
id="hive-coder-queen-triage",
|
||||
goal_id=queen_triage_goal.id,
|
||||
version="1.0.0",
|
||||
entry_node="ticket_triage",
|
||||
entry_points={"ticket_receiver": "ticket_triage"},
|
||||
terminal_nodes=[],
|
||||
pause_nodes=[],
|
||||
nodes=[ticket_triage_node],
|
||||
edges=[],
|
||||
conversation_mode="continuous",
|
||||
async_entry_points=[TICKET_RECEIVER_ENTRY_POINT],
|
||||
)
|
||||
await self.runtime.add_graph(
|
||||
graph_id="hive_coder_queen",
|
||||
graph=queen_graph,
|
||||
goal=queen_triage_goal,
|
||||
entry_points={"ticket_receiver": TICKET_RECEIVER_ENTRY_POINT},
|
||||
storage_subpath="graphs/hive_coder_queen",
|
||||
)
|
||||
self._queen_graph_id = "hive_coder_queen"
|
||||
|
||||
self.notify(
|
||||
"Health judge + queen monitoring active",
|
||||
severity="information",
|
||||
timeout=3,
|
||||
)
|
||||
except Exception as e:
|
||||
logging.getLogger("tui.judge").error(
|
||||
"Failed to load health monitoring: %s", e, exc_info=True
|
||||
)
|
||||
self.notify(
|
||||
f"Health monitoring unavailable: {e}",
|
||||
severity="warning",
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
def _show_account_selection(self, runner, accounts: list[dict]) -> None:
|
||||
"""Show the account selection screen and continue loading on selection."""
|
||||
from framework.tui.screens.account_selection import AccountSelectionScreen
|
||||
@@ -851,6 +971,8 @@ class AdenTUI(App):
|
||||
EventType.EXECUTION_PAUSED,
|
||||
EventType.EXECUTION_RESUMED,
|
||||
EventType.ESCALATION_REQUESTED,
|
||||
EventType.WORKER_ESCALATION_TICKET,
|
||||
EventType.QUEEN_INTERVENTION_REQUESTED,
|
||||
]
|
||||
|
||||
_LOG_PANE_EVENTS = frozenset(_EVENT_TYPES) - {
|
||||
@@ -922,6 +1044,15 @@ class AdenTUI(App):
|
||||
severity="information",
|
||||
timeout=5,
|
||||
)
|
||||
elif et == EventType.WORKER_ESCALATION_TICKET:
|
||||
# Judge ticket emitted from background graph — still update status bar
|
||||
ticket = event.data.get("ticket", {})
|
||||
severity = ticket.get("severity", "")
|
||||
if severity:
|
||||
self.status_bar.set_node_detail(f"judge: {severity} ticket")
|
||||
elif et == EventType.QUEEN_INTERVENTION_REQUESTED:
|
||||
# Queen intervention always surfaced to operator regardless of active graph
|
||||
self._handle_queen_intervention(event.data)
|
||||
# All other background events are silently dropped (visible in logs)
|
||||
return
|
||||
|
||||
@@ -958,6 +1089,13 @@ class AdenTUI(App):
|
||||
context=event.data.get("context", ""),
|
||||
node_id=event.node_id or "",
|
||||
)
|
||||
elif et == EventType.WORKER_ESCALATION_TICKET:
|
||||
ticket = event.data.get("ticket", {})
|
||||
severity = ticket.get("severity", "")
|
||||
if severity:
|
||||
self.status_bar.set_node_detail(f"judge: {severity} ticket")
|
||||
elif et == EventType.QUEEN_INTERVENTION_REQUESTED:
|
||||
self._handle_queen_intervention(event.data)
|
||||
elif et == EventType.NODE_LOOP_STARTED:
|
||||
self.chat_repl.handle_node_started(event.node_id or "")
|
||||
elif et == EventType.NODE_LOOP_ITERATION:
|
||||
@@ -1082,6 +1220,30 @@ class AdenTUI(App):
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
def _handle_queen_intervention(self, data: dict) -> None:
|
||||
"""Notify the operator of a queen escalation — non-disruptively.
|
||||
|
||||
The worker keeps running. The operator can dismiss or switch to the
|
||||
queen's graph view via Ctrl+Q.
|
||||
"""
|
||||
severity = data.get("severity", "unknown")
|
||||
analysis = data.get("analysis", "(no analysis)")
|
||||
|
||||
severity_markup = {
|
||||
"low": "[dim]low[/dim]",
|
||||
"medium": "[yellow]medium[/yellow]",
|
||||
"high": "[bold red]high[/bold red]",
|
||||
"critical": "[bold red]CRITICAL[/bold red]",
|
||||
}
|
||||
sev_label = severity_markup.get(severity, severity)
|
||||
|
||||
msg = f"Queen escalation ({sev_label}): {analysis}"
|
||||
if self._queen_graph_id:
|
||||
msg += "\nPress [bold]Ctrl+Q[/bold] to chat with queen."
|
||||
|
||||
textual_severity = "error" if severity in ("high", "critical") else "warning"
|
||||
self.notify(msg, severity=textual_severity, timeout=30)
|
||||
|
||||
# -- Actions --
|
||||
|
||||
def action_switch_graph(self, graph_id: str) -> None:
|
||||
@@ -1244,13 +1406,23 @@ class AdenTUI(App):
|
||||
Both escalate_to_coder and return_from_coder are bound to Ctrl+E.
|
||||
check_action toggles which one is active based on escalation state,
|
||||
so the footer shows "Coder" or "← Back" accordingly.
|
||||
connect_to_queen is only shown when a queen monitoring graph is active.
|
||||
"""
|
||||
if action == "escalate_to_coder":
|
||||
return not self._escalation_stack
|
||||
if action == "return_from_coder":
|
||||
return bool(self._escalation_stack)
|
||||
if action == "connect_to_queen":
|
||||
return bool(self._queen_graph_id and self.runtime is not None)
|
||||
return True
|
||||
|
||||
def action_connect_to_queen(self) -> None:
|
||||
"""Switch to the queen's ticket-triage graph view (Ctrl+Q)."""
|
||||
if not self._queen_graph_id:
|
||||
self.notify("No queen monitoring active", severity="warning", timeout=3)
|
||||
return
|
||||
self.action_switch_graph(self._queen_graph_id)
|
||||
|
||||
def action_escalate_to_coder(self) -> None:
|
||||
"""Escalate to Hive Coder (bound to Ctrl+E)."""
|
||||
if self.runtime is None:
|
||||
|
||||
@@ -0,0 +1,170 @@
|
||||
# Worker Escalation Design: Judge → Queen → Operator
|
||||
|
||||
## Problem
|
||||
|
||||
The previous guardian-subgraph approach had two failure modes:
|
||||
|
||||
1. **Never fires** — worker agents ran in a separate TUI context with their own EventBus. The guardian attached to hive_coder's runtime never saw the worker's events.
|
||||
2. **Too trigger-happy** — when it did fire (e.g. when hive_coder's own execution stalled waiting for user input), it reacted to normal wait states as if they were failures. `exclude_own_graph: False` meant the guardian also fired on hive_coder's own events.
|
||||
|
||||
The root cause: reactive event-based monitoring (fire on hard failures) cannot distinguish between "agent is genuinely broken" and "agent is momentarily blocked on user input."
|
||||
|
||||
## New Design
|
||||
|
||||
Three agents share **one runtime and one EventBus** when a worker is started:
|
||||
|
||||
```
|
||||
One Runtime (shared EventBus)
|
||||
│
|
||||
├── Worker Graph (primary)
|
||||
│ └── EventLoopNode — runs the actual work
|
||||
│ ↑ logs per step to sessions/{id}/logs/tool_logs.jsonl
|
||||
│
|
||||
├── Health Judge Graph (secondary, framework-level reusable)
|
||||
│ └── Entry point: timer every 2 min → judge_node (event_loop)
|
||||
│ ├── Reads worker session logs via get_worker_health_summary()
|
||||
│ ├── Accumulates observations in continuous conversation context
|
||||
│ ├── Detects: too many RETRYs, stall (no new log steps), doom loops
|
||||
│ └── If problematic: calls emit_escalation_ticket tool
|
||||
│ → emits WORKER_ESCALATION_TICKET on shared EventBus
|
||||
│
|
||||
└── Queen Graph (secondary, hive_coder ticket_triage)
|
||||
└── Entry point: event-driven on WORKER_ESCALATION_TICKET
|
||||
├── ticket_triage_node receives EscalationTicket in memory
|
||||
├── LLM decides: dismiss or intervene
|
||||
└── If intervene: calls notify_operator tool
|
||||
→ emits QUEEN_INTERVENTION_REQUESTED on shared EventBus
|
||||
|
||||
TUI
|
||||
├── Subscribes to QUEEN_INTERVENTION_REQUESTED
|
||||
├── Shows non-disruptive dismissable overlay (worker keeps running)
|
||||
└── "Chat with Queen" → split-pane view, queen is interactive
|
||||
```
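All of this is wired together with two `add_graph()` calls on the worker's runtime. A condensed sketch, adapted from the TUI's `_load_judge_and_queen()` (`worker_runtime` and `storage_path` are illustrative names; `queen_graph` / `queen_triage_goal` stand for the minimal GraphSpec and Goal that method builds around `ticket_triage_node`):

```
from framework.agents.hive_coder.ticket_receiver import TICKET_RECEIVER_ENTRY_POINT
from framework.monitoring import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph
from framework.runner.tool_registry import ToolRegistry
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools

# 1. Bind the monitoring tools to the worker runtime's EventBus and storage path.
registry = ToolRegistry()
register_worker_monitoring_tools(registry, worker_runtime._event_bus, storage_path)

# 2. Attach the health judge; its timer entry point then fires every 2 minutes.
await worker_runtime.add_graph(
    graph_id="worker_health_judge",
    graph=judge_graph,
    goal=judge_goal,
    entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
    storage_subpath="graphs/worker_health_judge",
)

# 3. Attach the queen's triage graph, keyed to the event-driven entry point so it
#    wakes only when a WORKER_ESCALATION_TICKET event arrives on the shared bus.
await worker_runtime.add_graph(
    graph_id="hive_coder_queen",
    graph=queen_graph,
    goal=queen_triage_goal,
    entry_points={"ticket_receiver": TICKET_RECEIVER_ENTRY_POINT},
    storage_subpath="graphs/hive_coder_queen",
)
```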
|
||||
|
||||
## Key Design Decisions
|
||||
|
||||
### Judge as event_loop node agent (not inline code)
|
||||
|
||||
The health monitor is a proper `event_loop` NodeSpec — same as every other node in the system. This means:
|
||||
- Its reasoning is transparent (conversation is observable)
|
||||
- It can deliberate over patterns, not just threshold checks
|
||||
- It's tunable via its system prompt
|
||||
- It accumulates state across timer ticks via continuous conversation mode
|
||||
|
||||
The conversation IS the judge's memory. No shared memory keys, no external state. The judge reads its own prior messages to track trends: "Last check I saw 47 steps; now I see 52; 5 new steps, 3 RETRY."
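As a worked illustration (numbers hypothetical, shape as returned by `get_worker_health_summary`), two consecutive snapshots are enough to confirm a stall:

```
# Tick N — worker recently wrote steps, but nothing new for ~2 minutes: note it, don't escalate.
{"total_steps": 47, "recent_verdicts": ["ACCEPT", "RETRY", "RETRY"],
 "steps_since_last_accept": 2, "stall_minutes": 2.3, "evidence_snippet": "..."}

# Tick N+1 — total_steps unchanged for a second check and the stall has passed the
# 4-minute threshold: the judge emits a ticket with severity "critical".
{"total_steps": 47, "recent_verdicts": ["ACCEPT", "RETRY", "RETRY"],
 "steps_since_last_accept": 2, "stall_minutes": 4.3, "evidence_snippet": "..."}
```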
|
||||
|
||||
### Judge conversation: isolated + continuous
|
||||
|
||||
- `isolation_level="isolated"` — judge has its own memory namespace, not polluting the worker's shared memory
|
||||
- `conversation_mode="continuous"` — conversation persists between timer ticks in `graphs/worker_health_judge/conversations/judge/`
|
||||
|
||||
### Structured escalation ticket (not a string)
|
||||
|
||||
The judge must fill out a rigorous `EscalationTicket` before escalating. This prevents impulsive escalations. Required fields:
|
||||
|
||||
```
|
||||
ticket_id, created_at
|
||||
worker_agent_id, worker_session_id, worker_node_id, worker_graph_id
|
||||
severity: "low"|"medium"|"high"|"critical"
|
||||
cause: str # what the judge observed
|
||||
judge_reasoning: str # why it decided to escalate
|
||||
suggested_action: str # restart? human review? kill session?
|
||||
recent_verdicts: list[str] # last N judge verdicts
|
||||
total_steps_checked: int
|
||||
steps_since_last_accept: int
|
||||
stall_minutes: float|None # wall-clock since last new log step
|
||||
evidence_snippet: str # recent LLM output excerpt
|
||||
```
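For concreteness, a minimal sketch of building one of these tickets with the Pydantic model (all worker identifiers and evidence here are hypothetical):

```
from framework.runtime.escalation_ticket import EscalationTicket

ticket = EscalationTicket(
    worker_agent_id="calendar_worker",                 # hypothetical worker
    worker_session_id="session_20250101T120000_abc123",
    worker_node_id="planner",
    worker_graph_id="calendar_worker_graph",
    severity="high",
    cause="18 consecutive RETRY verdicts, all failing in get_calendar_events.",
    judge_reasoning="Same 401 error on every attempt across two checks; no new information processed.",
    suggested_action="Check calendar API credentials or restart with a fresh auth token.",
    recent_verdicts=["RETRY"] * 18,
    total_steps_checked=40,
    steps_since_last_accept=18,
    stall_minutes=None,
    evidence_snippet="401 Unauthorized returned by get_calendar_events",
)

# ticket_id and created_at are filled by the schema defaults. The judge hands the
# serialized form to the emit_escalation_ticket tool, which re-validates it before
# publishing the WORKER_ESCALATION_TICKET event.
ticket_json = ticket.model_dump_json()
```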
|
||||
|
||||
### Queen as deliberating intermediary
|
||||
|
||||
The queen (hive_coder's `ticket_triage_node`) receives every ticket and must decide: **dismiss or notify the operator**. This adds a second quality gate — the judge might be overly sensitive, and the queen provides a second opinion.
|
||||
|
||||
The queen only notifies the operator if the ticket meets intervention criteria. Conservative by design: one unnecessary alert is less costly than alert fatigue.
|
||||
|
||||
### Non-disruptive operator notification
|
||||
|
||||
`QUEEN_INTERVENTION_REQUESTED` does NOT tear down the worker or force a context switch. The TUI shows a dismissable overlay. The worker keeps running. The operator can choose to connect to the queen's active session for a conversation about the issue.
|
||||
|
||||
## New Event Types
|
||||
|
||||
| Event | Emitted by | Received by | Purpose |
|
||||
|-------|-----------|-------------|---------|
|
||||
| `WORKER_ESCALATION_TICKET` | Health Judge | Queen (event-driven entry point) | Structured ticket delivery |
|
||||
| `QUEEN_INTERVENTION_REQUESTED` | Queen (notify_operator tool) | TUI | Human operator notification |
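Both events carry their payload under `event.data`. The shapes, taken from the new EventBus emit methods (placeholder values):

```
# WORKER_ESCALATION_TICKET — data holds the full EscalationTicket.model_dump()
{"ticket": {"ticket_id": "...", "severity": "high", "cause": "...", "evidence_snippet": "..."}}

# QUEEN_INTERVENTION_REQUESTED — data holds the queen's routing info for the TUI
{"ticket_id": "...", "analysis": "...", "severity": "high",
 "queen_graph_id": "hive_coder_queen", "queen_stream_id": "hive_coder_queen::ticket_receiver"}
```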
|
||||
|
||||
## New Files
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `core/framework/runtime/escalation_ticket.py` | `EscalationTicket` Pydantic schema |
|
||||
| `core/framework/monitoring/__init__.py` | New framework monitoring package |
|
||||
| `core/framework/monitoring/worker_health_judge.py` | Health judge graph, node, entry point, goal |
|
||||
| `core/framework/agents/hive_coder/ticket_receiver.py` | Queen's `TICKET_RECEIVER_ENTRY_POINT` |
|
||||
| `core/framework/tools/worker_monitoring_tools.py` | `get_worker_health_summary`, `emit_escalation_ticket`, `notify_operator`; `register_worker_monitoring_tools()` |
|
||||
|
||||
## Modified Files
|
||||
|
||||
| File | Changes |
|
||||
|------|---------|
|
||||
| `core/framework/runtime/event_bus.py` | +2 EventTypes, +2 emit methods |
|
||||
| `core/framework/agents/hive_coder/nodes/__init__.py` | +`ticket_triage_node`, +`ALL_QUEEN_TRIAGE_TOOLS` |
|
||||
| `core/framework/agents/hive_coder/agent.py` | +`ticket_triage_node` to nodes, +`TICKET_RECEIVER_ENTRY_POINT` to async_entry_points |
|
||||
| `core/framework/tui/app.py` | `_load_judge_and_queen()`, `QUEEN_INTERVENTION_REQUESTED` handler, split-pane view |
|
||||
|
||||
## NOT Modified
|
||||
|
||||
- `core/framework/graph/event_loop_node.py` — no inline health check code; monitoring is fully external
|
||||
- `core/framework/graph/conversation_judge.py` — existing per-iteration quality judge unchanged
|
||||
- `core/framework/agents/hive_coder/guardian.py` — kept; user-triggered Ctrl+E escalation path still works
|
||||
|
||||
## Known Gaps
|
||||
|
||||
**Gap 1 — Queen tool starvation.**
|
||||
The queen's `ticket_triage_node` only has `notify_operator`. If the operator connects and wants the queen to actually fix the worker's code, she'd need file I/O + shell tools. Deferred to a follow-up: dynamically registering the full hive-tools MCP when operator connects.
|
||||
|
||||
**Gap 2 — LLM-hang detection latency.**
|
||||
If the worker's LLM call hangs (never returns), no new log entries are written and no iterations complete. The judge detects this on its next timer tick (≤2 min later). Not zero-latency, but bounded.
|
||||
|
||||
**Gap 3 — Worker session_id in judge's context.**
|
||||
The judge needs the worker's agent_id + session_id to query logs. The timer entry fires with empty input_data. Solution: TUI passes this via the initial session state when loading the judge graph. The worker also writes its session_id to a well-known shared memory key early in execution. Gap: this write doesn't happen automatically today.
|
||||
|
||||
**Gap 4 — Dual monitoring channels coexist.**
|
||||
The old guardian still exists. The new health judge + queen is the primary path. Both can run simultaneously if old code isn't cleaned up. Not a correctness issue, but noisy.
|
||||
|
||||
**Gap 5 — Single shared EventBus requirement.**
|
||||
All three agents (worker, judge, queen) share one EventBus only when loaded in the same runtime via `add_graph`. A separately-started hive_coder session in another TUI window is NOT connected.
|
||||
|
||||
## Data Flow
|
||||
|
||||
```
|
||||
Worker iterates → logs ACCEPT/RETRY/CONTINUE to tool_logs.jsonl
|
||||
|
||||
Every 2 minutes:
|
||||
Health Judge wakes (timer)
|
||||
→ calls get_worker_health_summary(session_id)
|
||||
→ reads recent verdicts, step counts, timing
|
||||
→ compares to prior check (from conversation history)
|
||||
→ if healthy: set_output("health_verdict", "healthy"), done
|
||||
→ if degraded:
|
||||
builds EscalationTicket JSON
|
||||
calls emit_escalation_ticket(ticket_json)
|
||||
→ WORKER_ESCALATION_TICKET published to shared EventBus
|
||||
|
||||
Queen's ticket_receiver fires:
|
||||
→ ticket_triage_node receives ticket in memory key "ticket"
|
||||
→ LLM reads ticket, applies dismiss/intervene criteria
|
||||
→ if dismiss: set_output("intervention_decision", "dismissed: ...")
|
||||
→ if intervene:
|
||||
calls notify_operator(ticket_id, analysis, urgency)
|
||||
→ QUEEN_INTERVENTION_REQUESTED published to shared EventBus
|
||||
|
||||
TUI receives QUEEN_INTERVENTION_REQUESTED:
|
||||
→ shows dismissable overlay (worker is NOT paused)
|
||||
→ operator chooses:
|
||||
"Dismiss" → overlay gone, worker continues
|
||||
"Chat with Queen" → split-pane view appears
|
||||
worker stream on left, queen conversation on right
|
||||
operator can type to queen
|
||||
queen can further analyze and provide guidance
|
||||
```