diff --git a/core/framework/host/worker.py b/core/framework/host/worker.py
index 5c9d72d0..5b977053 100644
--- a/core/framework/host/worker.py
+++ b/core/framework/host/worker.py
@@ -145,6 +145,24 @@ class Worker:
         self.status = WorkerStatus.RUNNING
         self._started_at = time.monotonic()
 
+        # Scope browser profile (and any other CONTEXT_PARAMS) to this
+        # worker. asyncio.create_task() copies the parent's contextvars,
+        # so without this override every spawned worker inherits the
+        # queen's `profile=` and its browser_* tool
+        # calls end up driving the queen's Chrome tab group. Setting
+        # it here (inside the new Task's context) shadows the parent
+        # value without affecting the queen's ongoing calls.
+        try:
+            from framework.loader.tool_registry import ToolRegistry
+
+            ToolRegistry.set_execution_context(profile=self.id)
+        except Exception:
+            logger.debug(
+                "Worker %s: failed to scope browser profile",
+                self.id,
+                exc_info=True,
+            )
+
         try:
             result = await self._agent_loop.execute(self._context)
             duration = time.monotonic() - self._started_at
diff --git a/core/framework/loader/tool_registry.py b/core/framework/loader/tool_registry.py
index a2264366..e80dffa3 100644
--- a/core/framework/loader/tool_registry.py
+++ b/core/framework/loader/tool_registry.py
@@ -581,8 +581,18 @@ class ToolRegistry:
         tool_cap: int | None = None,
         log_collisions: bool = False,
     ) -> tuple[bool, int, str | None]:
-        """Register a single MCP server with one retry for transient failures."""
+        """Register a single MCP server with one retry for transient failures.
+
+        When ``preserve_existing_tools=True`` and the server's tools are
+        already present from a prior registration, ``register_mcp_server``
+        returns ``count=0`` because every tool was shadowed. That's a
+        no-op success, not a failure — don't retry / warn in that case.
+        Otherwise a duplicate-init path (e.g. a worker spawn re-loading
+        the MCP servers the queen already registered) spams shadow
+        warnings, sleeps 2s, and retries for no reason.
+        """
         name = server_config.get("name", "unknown")
+        already_loaded = bool(self._mcp_server_tools.get(name))
         last_error: str | None = None
 
         for attempt in range(2):
@@ -595,6 +605,10 @@ class ToolRegistry:
                 )
                 if count > 0:
                     return True, count, None
+                if already_loaded and preserve_existing_tools:
+                    # All tools shadowed by the prior registration of
+                    # the same server — nothing to do, server is usable.
+                    return True, 0, None
                 last_error = "registered 0 tools"
             except Exception as exc:
                 last_error = str(exc)
@@ -762,12 +776,18 @@ class ToolRegistry:
             if preserve_existing_tools and mcp_tool.name in self._tools:
                 if log_collisions:
                     origin_server = self._find_mcp_origin_server_for_tool(mcp_tool.name) or ""
-                    logger.warning(
-                        "MCP tool '%s' from '%s' shadowed by '%s' (loaded first)",
-                        mcp_tool.name,
-                        server_name,
-                        origin_server,
-                    )
+                    # Don't warn when a server is being re-registered
+                    # by itself — that's a redundant-init case (e.g.
+                    # the same tool_registry seeing the same server
+                    # twice via pooled reconnect), not a real
+                    # cross-server shadow worth flagging.
+                    if origin_server != server_name:
+                        logger.warning(
+                            "MCP tool '%s' from '%s' shadowed by '%s' (loaded first)",
+                            mcp_tool.name,
+                            server_name,
+                            origin_server,
+                        )
                 # Skip registration; do not update MCP tool bookkeeping for this server.
                 continue
 
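
# Illustrative sketch, not part of the patch: the contextvar behaviour the
# Worker change above relies on. asyncio.create_task() snapshots the parent's
# context when the task is created, and a set() made inside the task only
# affects that task's copy, so each worker can shadow the queen's value
# without touching it. The variable name `profile_var` is hypothetical; the
# real code routes the override through ToolRegistry.set_execution_context().
import asyncio
import contextvars

profile_var: contextvars.ContextVar[str] = contextvars.ContextVar("profile", default="queen")


async def worker(worker_id: str) -> None:
    # Without this set(), the worker would keep seeing the queen's value.
    profile_var.set(worker_id)
    await asyncio.sleep(0)
    assert profile_var.get() == worker_id


async def main() -> None:
    await asyncio.gather(*(asyncio.create_task(worker(f"worker-{i}")) for i in range(3)))
    # The overrides made inside the spawned tasks never leak back here.
    assert profile_var.get() == "queen"


asyncio.run(main())
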
diff --git a/core/framework/server/routes_events.py b/core/framework/server/routes_events.py
index 02eb51db..30e68dc9 100644
--- a/core/framework/server/routes_events.py
+++ b/core/framework/server/routes_events.py
@@ -119,6 +119,22 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
     event_bus = session.event_bus
     event_types = _parse_event_types(request.query.get("types"))
 
+    # Worker-noise filter is phase-aware. In DM mode (queen phase
+    # "independent") the queen's chat should stay clean — workers
+    # are invisible. In colony mode (phase "working"/"reviewing")
+    # the user IS supervising the workers and wants to see the
+    # tool-call/text-delta chatter as it happens. Sample the phase
+    # once at SSE connect; if the queen later transitions the
+    # frontend reconnects.
+    def _should_filter_worker_noise() -> bool:
+        phase_state = getattr(session, "phase_state", None)
+        if phase_state is None:
+            return True  # unknown phase → be conservative, filter noise
+        phase = getattr(phase_state, "phase", "independent")
+        return phase == "independent"
+
+    filter_worker_noise = _should_filter_worker_noise()
+
     # Per-client buffer queue
     queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
 
@@ -145,7 +161,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
                 return
 
            evt_dict = event.to_dict()
-           if _is_worker_noise(evt_dict):
+           if filter_worker_noise and _is_worker_noise(evt_dict):
                return
            if evt_dict.get("type") in _CRITICAL_EVENTS:
                try:
@@ -202,7 +218,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
         for past_event in event_bus._event_history:
             if past_event.type.value in replay_types:
                 past_dict = past_event.to_dict()
-                if _is_worker_noise(past_dict):
+                if filter_worker_noise and _is_worker_noise(past_dict):
                     continue
                 try:
                     queue.put_nowait(past_dict)
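
# Illustrative sketch, not part of the patch: a minimal client for the SSE
# stream served by handle_events(). The route path, host, and port are
# assumptions (the route registration is not shown in this diff); the point is
# the observable effect of the phase-aware filter above: a client connected
# while the queen is in the "working"/"reviewing" phase now receives worker
# tool-call / text-delta events instead of having them dropped as noise.
import asyncio
import aiohttp


async def tail_events(base_url: str, session_id: str) -> None:
    url = f"{base_url}/api/sessions/{session_id}/events"  # assumed path
    async with aiohttp.ClientSession() as http:
        async with http.get(url) as resp:
            async for raw in resp.content:  # server-sent events, line by line
                line = raw.decode("utf-8", "replace").strip()
                if line.startswith("data:"):
                    print(line[5:].strip())


# asyncio.run(tail_events("http://localhost:8080", "<session id>"))
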
+ """ + session, err = resolve_session(request) + if err: + return err + + colony = _active_colony(session) + if colony is None: + return web.json_response({"workers": []}) + + now = time.monotonic() + payload = [] + try: + workers = list(colony._workers.values()) # type: ignore[attr-defined] + except Exception: + workers = [] + + for w in workers: + started_at = getattr(w, "_started_at", 0.0) or 0.0 + duration = (now - started_at) if started_at else 0.0 + result = getattr(w, "_result", None) + payload.append( + { + "worker_id": w.id, + "task": (w.task or "")[:400], + "status": str(getattr(w, "status", "unknown")), + "is_active": bool(getattr(w, "is_active", False)), + "duration_seconds": round(duration, 1), + "explicit_report": getattr(w, "_explicit_report", None), + "result_status": (result.status if result else None), + "result_summary": (result.summary if result else None), + } + ) + + # Active workers first, then terminated, newest-started first within group. + payload.sort(key=lambda r: (not r["is_active"], -(r["duration_seconds"] or 0))) + return web.json_response({"workers": payload}) + + +async def handle_stop_live_worker(request: web.Request) -> web.Response: + """POST /api/sessions/{session_id}/workers/{worker_id}/stop — force-stop one worker. + + Calls ``colony.stop_worker(worker_id)`` which cancels the worker's + background task. The worker's terminal SUBAGENT_REPORT still fires + (preserving any _explicit_report) so the queen sees a `[WORKER_REPORT]` + with ``status="stopped"``. + """ + session, err = resolve_session(request) + if err: + return err + + worker_id = request.match_info.get("worker_id", "") + if not worker_id: + return web.json_response({"error": "worker_id required"}, status=400) + + colony = _active_colony(session) + if colony is None: + return web.json_response({"error": "No active colony on this session"}, status=503) + + worker = colony._workers.get(worker_id) # type: ignore[attr-defined] + if worker is None: + return web.json_response({"error": f"Worker '{worker_id}' not found"}, status=404) + if not worker.is_active: + return web.json_response( + { + "stopped": False, + "reason": "Worker already terminated", + "worker_id": worker_id, + "status": str(worker.status), + } + ) + + try: + await colony.stop_worker(worker_id) + except Exception as exc: + logger.exception("stop_worker failed for %s", worker_id) + return web.json_response( + {"stopped": False, "error": str(exc), "worker_id": worker_id}, + status=500, + ) + + return web.json_response({"stopped": True, "worker_id": worker_id}) + + +async def handle_stop_all_live_workers(request: web.Request) -> web.Response: + """POST /api/sessions/{session_id}/workers/stop-all — force-stop every active worker. + + The persistent overseer (if any) is skipped — it is the queen itself + and stopping it would end the session. Only ephemeral fan-out workers + are targeted. + """ + session, err = resolve_session(request) + if err: + return err + + colony = _active_colony(session) + if colony is None: + return web.json_response({"stopped": [], "error": "No active colony on this session"}) + + stopped: list[str] = [] + errors: list[dict] = [] + try: + workers = list(colony._workers.values()) # type: ignore[attr-defined] + except Exception: + workers = [] + + for w in workers: + if not w.is_active: + continue + if getattr(w, "_persistent", False): + # The overseer — don't kill the queen. 
diff --git a/core/frontend/src/api/workers.ts b/core/frontend/src/api/workers.ts
index 7e520c19..7d02bbb0 100644
--- a/core/frontend/src/api/workers.ts
+++ b/core/frontend/src/api/workers.ts
@@ -1,6 +1,31 @@
 import { api } from "./client";
 import type { GraphTopology, NodeDetail, NodeCriteria, ToolInfo } from "./types";
 
+export interface LiveWorker {
+  worker_id: string;
+  task: string;
+  status: string;
+  is_active: boolean;
+  duration_seconds: number;
+  explicit_report: Record | null;
+  result_status: string | null;
+  result_summary: string | null;
+}
+
+export interface StopWorkerResult {
+  stopped: boolean;
+  worker_id?: string;
+  reason?: string;
+  status?: string;
+  error?: string;
+}
+
+export interface StopAllWorkersResult {
+  stopped: string[];
+  stopped_count: number;
+  errors?: { worker_id: string; error: string }[] | null;
+}
+
 export const workersApi = {
   nodes: (sessionId: string, colonyId: string, workerSessionId?: string) =>
     api.get(
@@ -26,4 +51,17 @@ export const workersApi = {
     api.get<{ tools: ToolInfo[] }>(
       `/sessions/${sessionId}/colonies/${colonyId}/nodes/${nodeId}/tools`,
     ),
+
+  // Live fan-out control
+  listLive: (sessionId: string) =>
+    api.get<{ workers: LiveWorker[] }>(`/sessions/${sessionId}/workers`),
+
+  stopLive: (sessionId: string, workerId: string) =>
+    api.post(
+      `/sessions/${sessionId}/workers/${workerId}/stop`,
+      {},
+    ),
+
+  stopAllLive: (sessionId: string) =>
+    api.post(`/sessions/${sessionId}/workers/stop-all`, {}),
 };
diff --git a/core/frontend/src/components/WorkersPanel.tsx b/core/frontend/src/components/WorkersPanel.tsx
new file mode 100644
index 00000000..a8a9e232
--- /dev/null
+++ b/core/frontend/src/components/WorkersPanel.tsx
@@ -0,0 +1,175 @@
+import { useEffect, useState, useCallback } from "react";
+import { Loader2, Square, XCircle, CheckCircle2, OctagonX } from "lucide-react";
+import { workersApi, type LiveWorker } from "@/api/workers";
+
+interface WorkersPanelProps {
+  sessionId: string | null;
+  // Refresh the panel every this many ms. 0 disables polling.
+  pollMs?: number;
+}
+
+function statusClassName(w: LiveWorker): string {
+  if (w.is_active) return "text-blue-600";
+  const s = (w.result_status || w.status || "").toLowerCase();
+  if (s.includes("success")) return "text-emerald-600";
+  if (s.includes("fail") || s.includes("error")) return "text-destructive";
+  if (s.includes("stop") || s.includes("timeout")) return "text-amber-600";
+  return "text-muted-foreground";
+}
+
+function StatusIcon({ worker }: { worker: LiveWorker }) {
+  const cls = `w-3.5 h-3.5 ${statusClassName(worker)}`;
+  if (worker.is_active) return ;
+  const s = (worker.result_status || worker.status || "").toLowerCase();
+  if (s.includes("success")) return ;
+  if (s.includes("fail") || s.includes("error")) return ;
+  if (s.includes("stop") || s.includes("timeout")) return ;
+  return ;
+}
+
+function formatDuration(seconds: number): string {
+  if (seconds < 60) return `${Math.round(seconds)}s`;
+  const m = Math.floor(seconds / 60);
+  const s = Math.round(seconds % 60);
+  return `${m}m ${String(s).padStart(2, "0")}s`;
+}
+
+export default function WorkersPanel({ sessionId, pollMs = 2000 }: WorkersPanelProps) {
+  const [workers, setWorkers] = useState([]);
+  const [loading, setLoading] = useState(false);
+  const [stoppingId, setStoppingId] = useState(null);
+  const [stoppingAll, setStoppingAll] = useState(false);
+
+  const fetchWorkers = useCallback(async () => {
+    if (!sessionId) return;
+    try {
+      const res = await workersApi.listLive(sessionId);
+      setWorkers(res.workers || []);
+    } catch {
+      // Backend down or 404 — clear rather than crash.
+      setWorkers([]);
+    }
+  }, [sessionId]);
+
+  useEffect(() => {
+    if (!sessionId) return;
+    setLoading(true);
+    fetchWorkers().finally(() => setLoading(false));
+    if (pollMs <= 0) return;
+    const id = setInterval(fetchWorkers, pollMs);
+    return () => clearInterval(id);
+  }, [sessionId, pollMs, fetchWorkers]);
+
+  const stopOne = useCallback(
+    async (workerId: string) => {
+      if (!sessionId) return;
+      setStoppingId(workerId);
+      try {
+        await workersApi.stopLive(sessionId, workerId);
+      } catch {
+        // Non-fatal — the next poll will reflect the true state.
+      } finally {
+        setStoppingId(null);
+        fetchWorkers();
+      }
+    },
+    [sessionId, fetchWorkers],
+  );
+
+  const stopAll = useCallback(async () => {
+    if (!sessionId) return;
+    setStoppingAll(true);
+    try {
+      await workersApi.stopAllLive(sessionId);
+    } catch {
+      // ignore
+    } finally {
+      setStoppingAll(false);
+      fetchWorkers();
+    }
+  }, [sessionId, fetchWorkers]);
+
+  const activeCount = workers.filter((w) => w.is_active).length;
+
+  return (
+
+
+
+          Workers
+
+        {activeCount > 0 && (
+
+            {activeCount} active
+
+        )}
+
+        {activeCount > 0 && (
+
+        )}
+
+
+        {loading && workers.length === 0 && (
+          Loading…
+        )}
+        {!loading && workers.length === 0 && (
+
+            No workers have been spawned in this session yet.
+
+        )}
+        {workers.map((w) => (
+
+
+
+
+
+                {w.worker_id.slice(0, 24)}
+
+
+                {w.is_active ? w.status : (w.result_status || w.status)}
+
+
+
+              {w.task || "(no task)"}
+
+
+              {formatDuration(w.duration_seconds)}
+              {w.is_active && (
+
+              )}
+
+            {w.result_summary && !w.is_active && (
+
+                {w.result_summary}
+
+            )}
+
+
+
+        ))}
+
+  );
+}
diff --git a/core/frontend/src/pages/colony-chat.tsx b/core/frontend/src/pages/colony-chat.tsx
index 5c869bcc..47dcda5a 100644
--- a/core/frontend/src/pages/colony-chat.tsx
+++ b/core/frontend/src/pages/colony-chat.tsx
@@ -4,6 +4,7 @@ import { Loader2, WifiOff, KeyRound, FolderOpen, X, Users } from "lucide-react";
 import type { GraphNode, NodeStatus } from "@/components/graph-types";
 import TriggersPanel from "@/components/TriggersPanel";
 import TriggerDetailPanel from "@/components/TriggerDetailPanel";
+import WorkersPanel from "@/components/WorkersPanel";
 import ChatPanel, { type ChatMessage, type ImageContent } from "@/components/ChatPanel";
 import CredentialsModal, {
   type Credential,
@@ -1340,6 +1341,16 @@ export default function ColonyChat() {
         />
 
+        {/* Workers sidebar — live list of active + recently-finished workers
+            with per-worker stop controls. Shown whenever the queen is in
+            working or reviewing phase (i.e., there's a meaningful worker
+            population to manage). */}
+        {(agentState.queenPhase === "working" || agentState.queenPhase === "reviewing") && (
+
+
+
+        )}
+
         {/* Triggers sidebar — only rendered when the colony actually has triggers */}
         {triggers.length > 0 && (