Merge remote-tracking branch 'origin/feat/queen-responsibility' into feat/queen-responsibility

This commit is contained in:
Richard Tang
2026-03-05 19:03:27 -08:00
3 changed files with 703 additions and 218 deletions
@@ -199,8 +199,8 @@ After writing agent code, validate structurally AND run tests:
## Debugging Built Agents
When a user says "my agent is failing" or "debug this agent":
1. list_agent_sessions("{agent_name}") find the session
2. get_worker_status(focus="issues") check for problems
3. list_agent_checkpoints / get_agent_checkpoint trace execution
# Agent Building Workflow
@@ -584,7 +584,7 @@ _queen_tools_staging = """
The agent is loaded and ready to run. You can inspect it and launch it:
- Read-only: read_file, list_directory, search_files, run_command
- list_credentials(credential_id?) Verify credentials are configured
- get_worker_status(focus?) Brief status. Drill in with focus: memory, tools, issues, progress
- run_agent_with_input(task) Start the worker and switch to RUNNING phase
- stop_worker_and_edit() Go back to BUILDING phase
@@ -597,7 +597,7 @@ _queen_tools_running = """
The worker is running. You have monitoring and lifecycle tools:
- Read-only: read_file, list_directory, search_files, run_command
- get_worker_status(focus?) Brief status. Drill in: activity, memory, tools, issues, progress
- inject_worker_message(content) Send a message to the running worker
- get_worker_health_summary() Read the latest health data
- notify_operator(ticket_id, analysis, urgency) Alert the user (use sparingly)
@@ -763,13 +763,14 @@ You wake up when:
- An escalation ticket arrives from the judge
- The worker finishes
If the user asks for progress, call get_worker_status() ONCE and report. If the summary mentions issues, follow up with get_worker_status(focus="issues").
## Handling worker escalations
When a worker escalation arrives:
1. Read reason/context from the escalation message.
2. Call get_worker_status(focus="issues") or get_worker_status(focus="activity") for details.
3. Decide the next action:
- Quick unblock guidance inject_worker_message(...)
- Requires worker code/graph changes stop_worker_and_edit()
+57 -11
View File
@@ -165,6 +165,7 @@ class LoopConfig:
max_tool_calls_per_turn: int = 30
judge_every_n_turns: int = 1
stall_detection_threshold: int = 3
stall_similarity_threshold: float = 0.7
max_history_tokens: int = 32_000
store_prefix: str = ""
@@ -978,8 +979,8 @@ class EventLoopNode(NodeProtocol):
return NodeResult(
success=False,
error=(
f"Node stalled: {self._config.stall_detection_threshold} "
"consecutive identical responses"
f"Node stalled: {self._config.stall_detection_threshold} similar "
f"responses ({self._config.stall_similarity_threshold*100:.0f}+ threshold)"
),
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
@@ -2850,13 +2851,46 @@ class EventLoopNode(NodeProtocol):
skip = set(nullable_keys) if nullable_keys else set()
return [k for k in output_keys if k not in skip and accumulator.get(k) is None]
@staticmethod
def _ngram_similarity(s1: str, s2: str, n: int = 2) -> float:
"""Jaccard similarity of n-gram sets.
Returns 0.0-1.0, where 1.0 is exact match.
Fast: O(len(s) + len(s2)) using set operations.
"""
def _ngrams(s: str) -> set[str]:
return {s[i:i+n] for i in range(len(s) - n + 1) if s.strip()}
if not s1 or not s2:
return 0.0
ngrams1, ngrams2 = _ngrams(s1.lower()), _ngrams(s2.lower())
if not ngrams1 or not ngrams2:
return 0.0
intersection = len(ngrams1 & ngrams2)
union = len(ngrams1 | ngrams2)
return intersection / union if union else 0.0
def _is_stalled(self, recent_responses: list[str]) -> bool:
    """Detect stall using n-gram similarity.

    Returns True when, among the recent responses, any pair is more
    similar than ``stall_similarity_threshold``. This catches paraphrase
    loops such as "I'm still stuck" vs "I'm stuck" that an exact-match
    check would miss.

    Args:
        recent_responses: Most recent assistant responses, oldest first.
            Must contain at least ``stall_detection_threshold`` entries
            before a stall can be reported.
    """
    # Not enough history to judge yet.
    if len(recent_responses) < self._config.stall_detection_threshold:
        return False
    threshold = self._config.stall_similarity_threshold
    # NOTE(review): this flags the first similar *pair*, not N similar
    # responses in a row — confirm that is the intended sensitivity.
    for i, resp in enumerate(recent_responses):
        # Compare against all earlier responses (excluding self).
        for prev in recent_responses[:i]:
            if self._ngram_similarity(resp, prev) >= threshold:
                return True
    return False
@staticmethod
def _is_transient_error(exc: BaseException) -> bool:
@@ -2935,7 +2969,10 @@ class EventLoopNode(NodeProtocol):
self,
recent_tool_fingerprints: list[list[tuple[str, str]]],
) -> tuple[bool, str]:
"""Detect doom loop: N consecutive turns with identical tool calls.
"""Detect doom loop using n-gram similarity on tool inputs.
Detects when N consecutive turns have similar tool calls.
Similarity applies to the canonicalized tool input strings.
Returns (is_doom_loop, description).
"""
@@ -2944,15 +2981,24 @@ class EventLoopNode(NodeProtocol):
threshold = self._config.tool_doom_loop_threshold
if len(recent_tool_fingerprints) < threshold:
return False, ""
# All entries must be non-empty and identical
first = recent_tool_fingerprints[0]
if not first:
return False, ""
if all(fp == first for fp in recent_tool_fingerprints):
tool_names = [name for name, _ in first]
# Check similarity against all recent fingerprints
similarity_threshold = self._config.stall_similarity_threshold
similar_count = sum(
1
for fp in recent_tool_fingerprints
# Compare canonicalized tool input strings using n-gram similarity
if self._ngram_similarity(fp[1], first[1]) >= similarity_threshold
)
if similar_count >= threshold:
tool_names = [name for name, _ in recent_tool_fingerprints]
desc = (
f"Doom loop detected: {threshold} consecutive identical "
f"tool calls ({', '.join(tool_names)})"
f"Doom loop detected: {similar_count}/{len(recent_tool_fingerprints)} "
f"consecutive similar tool calls ({', '.join(tool_names)})"
)
return True, desc
return False, ""
@@ -3883,7 +3929,7 @@ class EventLoopNode(NodeProtocol):
await self._event_bus.emit_node_stalled(
stream_id=stream_id,
node_id=node_id,
reason="Consecutive identical responses detected",
reason="Consecutive similar responses detected",
execution_id=execution_id,
)
+639 -201
View File
@@ -555,58 +555,72 @@ def register_queen_lifecycle_tools(
"""Get the session's event bus for querying history."""
return getattr(session, "event_bus", None)
_status_last_called: dict[str, float] = {} # {"ts": monotonic time}
_STATUS_COOLDOWN = 30.0 # seconds between full status checks
# Tiered cooldowns: summary is free, detail has short cooldown, full keeps 30s
_COOLDOWN_FULL = 30.0
_COOLDOWN_DETAIL = 10.0
_status_last_called: dict[str, float] = {} # tier -> monotonic time
async def get_worker_status(last_n: int = 20) -> str:
"""Comprehensive worker status: state, execution details, and recent activity.
def _format_elapsed(seconds: float) -> str:
"""Format seconds as human-readable duration."""
s = int(seconds)
if s < 60:
return f"{s}s"
m, rem = divmod(s, 60)
if m < 60:
return f"{m}m {rem}s"
h, m = divmod(m, 60)
return f"{h}h {m}m"
Returns everything the queen needs in a single call:
- Identity and high-level state (idle / running / waiting_for_input)
- Active execution details (elapsed time, current node, iteration)
- Running tool calls (started but not yet completed)
- Recent completed tool calls (name, success/error)
- Node transitions (execution path)
- Retries, stalls, and constraint violations
- Goal progress and token consumption
def _format_time_ago(ts) -> str:
"""Format a datetime as relative time ago."""
from datetime import datetime, timezone
Args:
last_n: Number of recent events to include per category (default 20).
now = datetime.now(timezone.utc)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
delta = (now - ts).total_seconds()
if delta < 60:
return f"{int(delta)}s ago"
if delta < 3600:
return f"{int(delta / 60)}m ago"
return f"{int(delta / 3600)}h ago"
def _preview_value(value: Any, max_len: int = 120) -> str:
"""Format a memory value for display, truncating if needed."""
if value is None:
return "null (not yet set)"
if isinstance(value, list):
preview = str(value)[:max_len]
return f"[{len(value)} items] {preview}"
if isinstance(value, dict):
preview = str(value)[:max_len]
return f"{{{len(value)} keys}} {preview}"
s = str(value)
if len(s) > max_len:
return s[:max_len] + "..."
return s
def _build_preamble(
runtime: "AgentRuntime",
) -> dict[str, Any]:
"""Build the lightweight preamble: status, node, elapsed, iteration.
Always cheap to compute. Returns a dict with:
- status: idle / running / waiting_for_input
- current_node, current_iteration, elapsed_seconds (when applicable)
- pending_question (when waiting)
- _active_execs (internal, stripped before return)
"""
import time as _time
now = _time.monotonic()
last = _status_last_called.get("ts", 0.0)
if now - last < _STATUS_COOLDOWN:
remaining = int(_STATUS_COOLDOWN - (now - last))
return json.dumps(
{
"status": "cooldown",
"message": (
f"Status was checked {int(now - last)}s ago. "
f"Wait {remaining}s before checking again. "
"Do NOT call this tool in a loop — wait for user input instead."
),
}
)
_status_last_called["ts"] = now
runtime = _get_runtime()
if runtime is None:
return json.dumps({"status": "not_loaded", "message": "No worker loaded."})
from datetime import datetime
graph_id = runtime.graph_id
goal = runtime.goal
reg = runtime.get_graph_registration(graph_id)
if reg is None:
return json.dumps({"status": "not_loaded"})
return {"status": "not_loaded"}
result: dict[str, Any] = {
"worker_graph_id": graph_id,
"worker_goal": getattr(goal, "name", graph_id),
}
preamble: dict[str, Any] = {}
# --- Execution state ---
# Execution state
active_execs = []
for ep_id, stream in reg.streams.items():
for exec_id in stream.active_execution_ids:
@@ -616,214 +630,638 @@ def register_queen_lifecycle_tools(
}
ctx = stream.get_context(exec_id)
if ctx:
from datetime import datetime
elapsed = (datetime.now() - ctx.started_at).total_seconds()
exec_info["elapsed_seconds"] = round(elapsed, 1)
exec_info["exec_status"] = ctx.status
active_execs.append(exec_info)
preamble["_active_execs"] = active_execs
if not active_execs:
result["status"] = "idle"
result["message"] = "Worker has no active executions."
preamble["status"] = "idle"
else:
waiting_nodes = []
for _ep_id, stream in reg.streams.items():
waiting_nodes.extend(stream.get_waiting_nodes())
preamble["status"] = "waiting_for_input" if waiting_nodes else "running"
if active_execs:
preamble["elapsed_seconds"] = active_execs[0].get("elapsed_seconds", 0)
result["status"] = "waiting_for_input" if waiting_nodes else "running"
result["active_executions"] = active_execs
if waiting_nodes:
result["waiting_node_id"] = waiting_nodes[0]["node_id"]
result["agent_idle_seconds"] = round(runtime.agent_idle_seconds, 1)
# --- EventBus enrichment ---
# Enrich with EventBus basics (cheap limit=1 queries)
bus = _get_event_bus()
if not bus:
return json.dumps(result)
try:
# Pending user question (from ask_user tool)
if result.get("status") == "waiting_for_input":
input_events = bus.get_history(event_type=EventType.CLIENT_INPUT_REQUESTED, limit=1)
if bus:
if preamble["status"] == "waiting_for_input":
input_events = bus.get_history(
event_type=EventType.CLIENT_INPUT_REQUESTED, limit=1
)
if input_events:
prompt = input_events[0].data.get("prompt", "")
if prompt:
result["pending_question"] = prompt
# Current node
preamble["pending_question"] = prompt[:200]
edge_events = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=1)
if edge_events:
target = edge_events[0].data.get("target_node")
if target:
result["current_node"] = target
preamble["current_node"] = target
# Current iteration
iter_events = bus.get_history(event_type=EventType.NODE_LOOP_ITERATION, limit=1)
if iter_events:
result["current_iteration"] = iter_events[0].data.get("iteration")
preamble["current_iteration"] = iter_events[0].data.get("iteration")
# Running tool calls (started but not yet completed)
tool_started = bus.get_history(event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2)
tool_completed = bus.get_history(
event_type=EventType.TOOL_CALL_COMPLETED, limit=last_n * 2
return preamble
def _detect_red_flags(bus: "EventBus") -> int:
    """Count issue categories with cheap limit=1 queries."""
    # One probe per category; a non-empty history means the category fired.
    flagged_types = (
        EventType.NODE_STALLED,
        EventType.NODE_TOOL_DOOM_LOOP,
        EventType.CONSTRAINT_VIOLATION,
    )
    return sum(
        1
        for evt_type in flagged_types
        if bus.get_history(event_type=evt_type, limit=1)
    )
def _format_summary(preamble: dict[str, Any], red_flags: int) -> str:
    """Generate a 1-2 sentence prose summary from the preamble.

    Args:
        preamble: Lightweight status dict (must contain "status"; may
            contain pending_question, elapsed_seconds, current_node,
            current_iteration).
        red_flags: Number of issue categories detected on the event bus.
    """
    status = preamble["status"]
    if status == "idle":
        return "Worker is idle. No active executions."
    if status == "not_loaded":
        return "No worker loaded."
    if status == "waiting_for_input":
        q = preamble.get("pending_question", "")
        if q:
            return f'Worker is waiting for input: "{q}"'
        return "Worker is waiting for input."
    # Running: assemble "state. location. issues." sentences.
    parts = []
    elapsed = preamble.get("elapsed_seconds", 0)
    parts.append(f"Worker is running ({_format_elapsed(elapsed)})")
    node = preamble.get("current_node")
    iteration = preamble.get("current_iteration")
    if node:
        node_part = f"Currently in {node}"
        if iteration is not None:
            node_part += f", iteration {iteration}"
        parts.append(node_part)
    if red_flags:
        parts.append(
            f"{red_flags} issue type(s) detected — use focus='issues' for details"
        )
    else:
        parts.append("No issues detected")
    return ". ".join(parts) + "."
def _format_activity(bus: "EventBus", preamble: dict[str, Any], last_n: int) -> str:
    """Format current activity: node, iteration, transitions, LLM output.

    Args:
        bus: Event bus used to query LLM-output and edge-traversal history.
        preamble: Lightweight status dict (current_node, current_iteration,
            elapsed_seconds).
        last_n: Maximum number of recent node transitions to list.

    Returns:
        Human-readable multi-line activity report.
    """
    lines = []
    node = preamble.get("current_node", "unknown")
    iteration = preamble.get("current_iteration")
    elapsed = preamble.get("elapsed_seconds", 0)
    node_desc = f"Current node: {node}"
    # Iteration may legitimately be absent (no loop iteration event yet).
    if iteration is not None:
        node_desc += f" (iteration {iteration}, {_format_elapsed(elapsed)} elapsed)"
    else:
        node_desc += f" ({_format_elapsed(elapsed)} elapsed)"
    lines.append(node_desc)
    # Latest LLM output snippet
    text_events = bus.get_history(event_type=EventType.LLM_TEXT_DELTA, limit=1)
    if text_events:
        # "or ''" guards against an explicit None snapshot value.
        snapshot = text_events[0].data.get("snapshot", "") or ""
        snippet = snapshot[-300:].strip()
        if snippet:
            # Show last meaningful chunk
            lines.append(f'Last LLM output: "{snippet}"')
    # Recent node transitions
    edges = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=last_n)
    if edges:
        lines.append("")
        lines.append("Recent transitions:")
        for evt in edges:
            src = evt.data.get("source_node", "?")
            tgt = evt.data.get("target_node", "?")
            cond = evt.data.get("edge_condition", "")
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" {src} -> {tgt} ({cond}, {ago})")
    return "\n".join(lines)
async def _format_memory(runtime: "AgentRuntime") -> str:
    """Format the worker's shared memory snapshot and recent changes.

    Reads the SHARED-isolation memory of the first active execution and
    lists each key with a truncated preview, then appends up to five of
    the most recent state changes (most recent first).

    Args:
        runtime: The loaded agent runtime whose state to inspect.
    """
    from framework.runtime.shared_state import IsolationLevel

    lines = []
    active_streams = runtime.get_active_streams()
    if not active_streams:
        return "Worker has no active executions. No memory to inspect."
    # Read memory from the first active execution
    stream_info = active_streams[0]
    exec_ids = stream_info.get("active_execution_ids", [])
    stream_id = stream_info.get("stream_id", "")
    if not exec_ids:
        return "No active execution found."
    exec_id = exec_ids[0]
    memory = runtime.state_manager.create_memory(
        exec_id, stream_id, IsolationLevel.SHARED
    )
    state = await memory.read_all()
    if not state:
        lines.append("Worker's shared memory is empty.")
    else:
        lines.append(f"Worker's shared memory ({len(state)} keys):")
        for key, value in state.items():
            lines.append(f" {key}: {_preview_value(value)}")
    # Recent state changes
    changes = runtime.state_manager.get_recent_changes(limit=5)
    if changes:
        lines.append("")
        lines.append(f"Recent changes (last {len(changes)}):")
        for change in reversed(changes):  # most recent first
            # Re-import inside the loop is cheap (module cache) but kept
            # as-is; change.timestamp is a POSIX timestamp here.
            from datetime import datetime, timezone

            ago = _format_time_ago(
                datetime.fromtimestamp(change.timestamp, tz=timezone.utc)
            )
            if change.old_value is None:
                lines.append(f" {change.key} set ({ago})")
            else:
                old_preview = _preview_value(change.old_value, 40)
                new_preview = _preview_value(change.new_value, 40)
                lines.append(f" {change.key}: {old_preview} -> {new_preview} ({ago})")
    return "\n".join(lines)
def _format_tools(bus: "EventBus", last_n: int) -> str:
    """Format running and recent tool calls.

    Args:
        bus: Event bus used to query tool-call start/completion history.
        last_n: Maximum number of recent completed calls to list.

    Returns:
        Human-readable multi-line report of in-flight and completed tools.
    """
    lines = []
    # Running tools (started but not yet completed)
    # Fetch 2x last_n of each so matching start/completion pairs by
    # tool_use_id has enough overlapping history.
    tool_started = bus.get_history(
        event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2
    )
    tool_completed = bus.get_history(
        event_type=EventType.TOOL_CALL_COMPLETED, limit=last_n * 2
    )
    completed_ids = {
        evt.data.get("tool_use_id")
        for evt in tool_completed
        if evt.data.get("tool_use_id")
    }
    # A call counts as "running" when it started but its id never completed.
    running = [
        evt
        for evt in tool_started
        if evt.data.get("tool_use_id")
        and evt.data.get("tool_use_id") not in completed_ids
    ]
    if running:
        names = [evt.data.get("tool_name", "?") for evt in running]
        lines.append(f"{len(running)} tool(s) running: {', '.join(names)}.")
        for evt in running:
            name = evt.data.get("tool_name", "?")
            node = evt.node_id or "?"
            ago = _format_time_ago(evt.timestamp)
            # Truncate tool input so one huge payload can't flood the report.
            inp = str(evt.data.get("tool_input", ""))[:150]
            lines.append(f" {name} ({node}, started {ago})")
            if inp:
                lines.append(f" Input: {inp}")
    else:
        lines.append("No tools currently running.")
    # Recent completed calls
    if tool_completed:
        lines.append("")
        lines.append(f"Recent calls (last {min(last_n, len(tool_completed))}):")
        for evt in tool_completed[:last_n]:
            name = evt.data.get("tool_name", "?")
            node = evt.node_id or "?"
            is_error = bool(evt.data.get("is_error"))
            status = "error" if is_error else "ok"
            duration = evt.data.get("duration_s")
            # Duration is optional; falsy (absent or 0) suppresses the suffix.
            dur_str = f", {duration:.1f}s" if duration else ""
            lines.append(f" {name} ({node}) — {status}{dur_str}")
    else:
        lines.append("No recent tool calls.")
    return "\n".join(lines)
def _format_issues(bus: "EventBus") -> str:
"""Format retries, stalls, doom loops, and constraint violations."""
lines = []
total = 0
# Retries
retries = bus.get_history(event_type=EventType.NODE_RETRY, limit=20)
if retries:
total += len(retries)
lines.append(f"{len(retries)} retry event(s):")
for evt in retries[:5]:
node = evt.node_id or "?"
count = evt.data.get("retry_count", "?")
error = evt.data.get("error", "")[:120]
ago = _format_time_ago(evt.timestamp)
lines.append(f" {node} (attempt {count}, {ago}): {error}")
# Stalls
stalls = bus.get_history(event_type=EventType.NODE_STALLED, limit=5)
if stalls:
total += len(stalls)
lines.append(f"{len(stalls)} stall(s):")
for evt in stalls:
node = evt.node_id or "?"
reason = evt.data.get("reason", "")[:150]
ago = _format_time_ago(evt.timestamp)
lines.append(f" {node} ({ago}): {reason}")
# Doom loops
doom_loops = bus.get_history(event_type=EventType.NODE_TOOL_DOOM_LOOP, limit=5)
if doom_loops:
total += len(doom_loops)
lines.append(f"{len(doom_loops)} tool doom loop(s):")
for evt in doom_loops:
node = evt.node_id or "?"
desc = evt.data.get("description", "")[:150]
ago = _format_time_ago(evt.timestamp)
lines.append(f" {node} ({ago}): {desc}")
# Constraint violations
violations = bus.get_history(event_type=EventType.CONSTRAINT_VIOLATION, limit=5)
if violations:
total += len(violations)
lines.append(f"{len(violations)} constraint violation(s):")
for evt in violations:
cid = evt.data.get("constraint_id", "?")
desc = evt.data.get("description", "")[:150]
ago = _format_time_ago(evt.timestamp)
lines.append(f" {cid} ({ago}): {desc}")
if total == 0:
return "No issues detected. No retries, stalls, or constraint violations."
header = f"{total} issue(s) detected."
return header + "\n\n" + "\n".join(lines)
async def _format_progress(runtime: "AgentRuntime", bus: "EventBus") -> str:
    """Format goal progress, token consumption, and execution outcomes.

    Args:
        runtime: The loaded agent runtime (goal evaluation, active streams).
        bus: Event bus used to query LLM-turn and execution events.

    Returns:
        Human-readable multi-line progress report.
    """
    lines = []
    # Goal progress
    try:
        progress = await runtime.get_goal_progress()
        if progress:
            criteria = progress.get("criteria_status", {})
            if criteria:
                met = sum(1 for c in criteria.values() if c.get("met"))
                total_c = len(criteria)
                lines.append(f"Goal: {met}/{total_c} criteria met.")
                for cid, cdata in criteria.items():
                    marker = "met" if cdata.get("met") else "not met"
                    desc = cdata.get("description", cid)
                    evidence = cdata.get("evidence", [])
                    # NOTE(review): ev_str is appended directly after desc
                    # with no separator — confirm that is intended.
                    ev_str = f"{evidence[0]}" if evidence else ""
                    lines.append(f" [{marker}] {desc}{ev_str}")
            rec = progress.get("recommendation")
            if rec:
                lines.append(f"Recommendation: {rec}.")
    except Exception:
        # Best-effort: goal evaluation can fail independently of status
        # reporting; degrade to a placeholder line rather than raising.
        lines.append("Goal progress unavailable.")
    # Token summary
    llm_events = bus.get_history(event_type=EventType.LLM_TURN_COMPLETE, limit=200)
    if llm_events:
        # "or 0" guards against explicit None token counts in event data.
        total_in = sum(evt.data.get("input_tokens", 0) or 0 for evt in llm_events)
        total_out = sum(evt.data.get("output_tokens", 0) or 0 for evt in llm_events)
        total_tok = total_in + total_out
        lines.append("")
        lines.append(
            f"Tokens: {len(llm_events)} LLM turns, "
            f"{total_tok:,} total ({total_in:,} in + {total_out:,} out)."
        )
    # Execution outcomes
    exec_completed = bus.get_history(event_type=EventType.EXECUTION_COMPLETED, limit=5)
    exec_failed = bus.get_history(event_type=EventType.EXECUTION_FAILED, limit=5)
    completed_n = len(exec_completed)
    failed_n = len(exec_failed)
    active_n = len(runtime.get_active_streams())
    lines.append(
        f"Executions: {completed_n} completed, {failed_n} failed"
        + (f" ({active_n} active)." if active_n else ".")
    )
    if exec_failed:
        for evt in exec_failed[:3]:
            error = evt.data.get("error", "")[:150]
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" Failed ({ago}): {error}")
    return "\n".join(lines)
def _build_full_json(
runtime: "AgentRuntime",
bus: "EventBus",
preamble: dict[str, Any],
last_n: int,
) -> dict[str, Any]:
"""Build the legacy full JSON response (backward compat for focus='full')."""
from datetime import datetime
graph_id = runtime.graph_id
goal = runtime.goal
result: dict[str, Any] = {
"worker_graph_id": graph_id,
"worker_goal": getattr(goal, "name", graph_id),
"status": preamble["status"],
}
active_execs = preamble.get("_active_execs", [])
if active_execs:
result["active_executions"] = active_execs
if preamble.get("pending_question"):
result["pending_question"] = preamble["pending_question"]
result["agent_idle_seconds"] = round(runtime.agent_idle_seconds, 1)
for key in ("current_node", "current_iteration"):
if key in preamble:
result[key] = preamble[key]
# Running + completed tool calls
tool_started = bus.get_history(
event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2
)
tool_completed = bus.get_history(
event_type=EventType.TOOL_CALL_COMPLETED, limit=last_n * 2
)
completed_ids = {
evt.data.get("tool_use_id")
for evt in tool_completed
if evt.data.get("tool_use_id")
}
running = [
evt
for evt in tool_started
if evt.data.get("tool_use_id")
and evt.data.get("tool_use_id") not in completed_ids
]
if running:
result["running_tools"] = [
{
"tool": evt.data.get("tool_name"),
"node": evt.node_id,
"started_at": evt.timestamp.isoformat(),
"input_preview": str(evt.data.get("tool_input", ""))[:200],
}
for evt in running
]
if tool_completed:
result["recent_tool_calls"] = [
{
"tool": evt.data.get("tool_name"),
"error": bool(evt.data.get("is_error")),
"node": evt.node_id,
"time": evt.timestamp.isoformat(),
}
for evt in tool_completed[:last_n]
]
if running:
result["running_tools"] = [
{
"tool": evt.data.get("tool_name"),
"node": evt.node_id,
"started_at": evt.timestamp.isoformat(),
"input_preview": str(evt.data.get("tool_input", ""))[:200],
}
for evt in running
]
# Recent completed tool calls
if tool_completed:
result["recent_tool_calls"] = [
# Node transitions
edges = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=last_n)
if edges:
result["node_transitions"] = [
{
"from": evt.data.get("source_node"),
"to": evt.data.get("target_node"),
"condition": evt.data.get("edge_condition"),
"time": evt.timestamp.isoformat(),
}
for evt in edges
]
# Retries
retries = bus.get_history(event_type=EventType.NODE_RETRY, limit=last_n)
if retries:
result["retries"] = [
{
"node": evt.node_id,
"retry_count": evt.data.get("retry_count"),
"error": evt.data.get("error", "")[:200],
"time": evt.timestamp.isoformat(),
}
for evt in retries
]
# Stalls and doom loops
stalls = bus.get_history(event_type=EventType.NODE_STALLED, limit=5)
doom_loops = bus.get_history(event_type=EventType.NODE_TOOL_DOOM_LOOP, limit=5)
issues = []
for evt in stalls:
issues.append(
{
"type": "stall",
"node": evt.node_id,
"reason": evt.data.get("reason", "")[:200],
"time": evt.timestamp.isoformat(),
}
)
for evt in doom_loops:
issues.append(
{
"type": "tool_doom_loop",
"node": evt.node_id,
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
}
)
if issues:
result["issues"] = issues
# Constraint violations
violations = bus.get_history(event_type=EventType.CONSTRAINT_VIOLATION, limit=5)
if violations:
result["constraint_violations"] = [
{
"constraint": evt.data.get("constraint_id"),
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
}
for evt in violations
]
# Token summary
llm_events = bus.get_history(event_type=EventType.LLM_TURN_COMPLETE, limit=200)
if llm_events:
total_in = sum(evt.data.get("input_tokens", 0) or 0 for evt in llm_events)
total_out = sum(evt.data.get("output_tokens", 0) or 0 for evt in llm_events)
result["token_summary"] = {
"llm_turns": len(llm_events),
"input_tokens": total_in,
"output_tokens": total_out,
"total_tokens": total_in + total_out,
}
# Execution outcomes
exec_completed = bus.get_history(
event_type=EventType.EXECUTION_COMPLETED, limit=5
)
exec_failed = bus.get_history(event_type=EventType.EXECUTION_FAILED, limit=5)
if exec_completed or exec_failed:
result["execution_outcomes"] = []
for evt in exec_completed:
result["execution_outcomes"].append(
{
"tool": evt.data.get("tool_name"),
"error": bool(evt.data.get("is_error")),
"node": evt.node_id,
"outcome": "completed",
"execution_id": evt.execution_id,
"time": evt.timestamp.isoformat(),
}
for evt in tool_completed[:last_n]
]
# Node transitions
edges = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=last_n)
if edges:
result["node_transitions"] = [
)
for evt in exec_failed:
result["execution_outcomes"].append(
{
"from": evt.data.get("source_node"),
"to": evt.data.get("target_node"),
"condition": evt.data.get("edge_condition"),
"time": evt.timestamp.isoformat(),
}
for evt in edges
]
# Retries
retries = bus.get_history(event_type=EventType.NODE_RETRY, limit=last_n)
if retries:
result["retries"] = [
{
"node": evt.node_id,
"retry_count": evt.data.get("retry_count"),
"outcome": "failed",
"execution_id": evt.execution_id,
"error": evt.data.get("error", "")[:200],
"time": evt.timestamp.isoformat(),
}
for evt in retries
]
# Stalls and doom loops
stalls = bus.get_history(event_type=EventType.NODE_STALLED, limit=5)
doom_loops = bus.get_history(event_type=EventType.NODE_TOOL_DOOM_LOOP, limit=5)
issues = []
for evt in stalls:
issues.append(
{
"type": "stall",
"node": evt.node_id,
"reason": evt.data.get("reason", "")[:200],
"time": evt.timestamp.isoformat(),
}
)
for evt in doom_loops:
issues.append(
{
"type": "tool_doom_loop",
"node": evt.node_id,
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
}
)
if issues:
result["issues"] = issues
# Constraint violations
violations = bus.get_history(event_type=EventType.CONSTRAINT_VIOLATION, limit=5)
if violations:
result["constraint_violations"] = [
{
"constraint": evt.data.get("constraint_id"),
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
}
for evt in violations
]
return result
# Goal progress
try:
progress = await runtime.get_goal_progress()
if progress:
result["goal_progress"] = progress
except Exception:
pass
async def get_worker_status(focus: str | None = None, last_n: int = 20) -> str:
    """Check on the worker with progressive disclosure.

    Without arguments, returns a brief prose summary. Use ``focus`` to
    drill into specifics: activity, memory, tools, issues, progress,
    or full (JSON dump).

    Args:
        focus: Aspect to inspect (activity/memory/tools/issues/progress/full).
            Omit for a brief summary.
        last_n: Recent events per category (default 20). Used by the
            activity, tools, and full views.
    """
    import time as _time

    # --- Tiered cooldown: summary is free, detail is cheap, full is
    # rate-limited, each tier tracked independently ---
    now = _time.monotonic()
    if focus == "full":
        cooldown = _COOLDOWN_FULL
        tier = "full"
    elif focus is not None:
        cooldown = _COOLDOWN_DETAIL
        tier = "detail"
    else:
        cooldown = 0.0
        tier = "summary"
    elapsed_since = now - _status_last_called.get(tier, 0.0)
    if elapsed_since < cooldown:
        remaining = int(cooldown - elapsed_since)
        return json.dumps(
            {
                "status": "cooldown",
                "message": (
                    f"Status '{focus or 'summary'}' was checked {int(elapsed_since)}s ago. "
                    f"Wait {remaining}s or try a different focus."
                ),
            }
        )
    _status_last_called[tier] = now

    # --- Runtime check ---
    runtime = _get_runtime()
    if runtime is None:
        return "No worker loaded."
    reg = runtime.get_graph_registration(runtime.graph_id)
    if reg is None:
        return "No worker loaded."

    # --- Build preamble (always cheap) ---
    preamble = _build_preamble(runtime)
    bus = _get_event_bus()
    try:
        if focus is None:
            # Default: brief prose summary
            red_flags = _detect_red_flags(bus) if bus else 0
            return _format_summary(preamble, red_flags)
        if bus is None:
            # Detail views need event history; fall back to bare status.
            return (
                f"Worker is {preamble['status']}. "
                "EventBus unavailable — only basic status returned."
            )
        if focus == "activity":
            return _format_activity(bus, preamble, last_n)
        elif focus == "memory":
            return await _format_memory(runtime)
        elif focus == "tools":
            return _format_tools(bus, last_n)
        elif focus == "issues":
            return _format_issues(bus)
        elif focus == "progress":
            return await _format_progress(runtime, bus)
        elif focus == "full":
            result = _build_full_json(runtime, bus, preamble, last_n)
            # Also include goal progress in full dump
            try:
                progress = await runtime.get_goal_progress()
                if progress:
                    result["goal_progress"] = progress
            except Exception:
                pass  # Non-critical enrichment
            return json.dumps(result, default=str, ensure_ascii=False)
        else:
            return (
                f"Unknown focus '{focus}'. "
                "Valid options: activity, memory, tools, issues, progress, full."
            )
    except Exception as exc:
        logger.exception("get_worker_status error")
        return f"Error retrieving status: {exc}"
_status_tool = Tool(
name="get_worker_status",
description=(
"Get comprehensive worker status: state (idle/running/waiting_for_input), "
"execution details (elapsed time, current node, iteration), "
"recent tool calls, running tools, node transitions, retries, "
"stalls, constraint violations, goal progress, and token consumption. "
"One call gives the queen a complete picture."
"Check on the worker. Returns a brief prose summary by default. "
"Use 'focus' to drill into specifics:\n"
"- activity: current node, transitions, latest LLM output\n"
"- memory: worker's accumulated knowledge and state\n"
"- tools: running and recent tool calls\n"
"- issues: retries, stalls, constraint violations\n"
"- progress: goal criteria, token consumption\n"
"- full: everything as JSON"
),
parameters={
"type": "object",
"properties": {
"focus": {
"type": "string",
"enum": ["activity", "memory", "tools", "issues", "progress", "full"],
"description": (
"Aspect to inspect. Omit for a brief summary."
),
},
"last_n": {
"type": "integer",
"description": "Number of recent events per category (default 20)",
"description": (
"Recent events per category (default 20). "
"Only for activity, tools, full."
),
},
},
"required": [],