Merge branch 'fix/worker-tab-groups' into feature/clean-context

Timothy
2026-04-17 12:35:09 -07:00
7 changed files with 448 additions and 9 deletions
+18
@@ -145,6 +145,24 @@ class Worker:
self.status = WorkerStatus.RUNNING
self._started_at = time.monotonic()
# Scope browser profile (and any other CONTEXT_PARAMS) to this
# worker. asyncio.create_task() copies the parent's contextvars,
# so without this override every spawned worker inherits the
# queen's `profile=<queen_session_id>` and its browser_* tool
# calls end up driving the queen's Chrome tab group. Setting
# it here (inside the new Task's context) shadows the parent
# value without affecting the queen's ongoing calls.
try:
from framework.loader.tool_registry import ToolRegistry
ToolRegistry.set_execution_context(profile=self.id)
except Exception:
logger.debug(
"Worker %s: failed to scope browser profile",
self.id,
exc_info=True,
)
try:
result = await self._agent_loop.execute(self._context)
duration = time.monotonic() - self._started_at
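For reference, a minimal runnable sketch of the contextvar behavior the comment above relies on (hypothetical ``profile_var``; the real override goes through ``ToolRegistry.set_execution_context``):
import asyncio
import contextvars
# Hypothetical stand-in for the registry's profile context variable.
profile_var: contextvars.ContextVar[str] = contextvars.ContextVar("profile", default="")
async def worker(worker_id: str) -> None:
    # Each Task starts with a *copy* of the parent's context, so this
    # set() shadows the queen's value only inside this Task.
    profile_var.set(worker_id)
    await asyncio.sleep(0)
    assert profile_var.get() == worker_id
async def main() -> None:
    profile_var.set("queen-session")
    await asyncio.gather(worker("w1"), worker("w2"))
    # The queen's value is unaffected by the workers' overrides.
    assert profile_var.get() == "queen-session"
asyncio.run(main())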
+27 -7
@@ -581,8 +581,18 @@ class ToolRegistry:
tool_cap: int | None = None,
log_collisions: bool = False,
) -> tuple[bool, int, str | None]:
"""Register a single MCP server with one retry for transient failures."""
"""Register a single MCP server with one retry for transient failures.
When ``preserve_existing_tools=True`` and the server's tools are
already present from a prior registration, ``register_mcp_server``
returns ``count=0`` because every tool was shadowed. That's a
no-op success, not a failure; don't retry or warn in that case.
Otherwise a duplicate-init path (e.g. a worker spawn re-loading
the MCP servers the queen already registered) spams shadow
warnings, sleeps 2s, and retries for no reason.
"""
name = server_config.get("name", "unknown")
already_loaded = bool(self._mcp_server_tools.get(name))
last_error: str | None = None
for attempt in range(2):
@@ -595,6 +605,10 @@ class ToolRegistry:
)
if count > 0:
return True, count, None
if already_loaded and preserve_existing_tools:
# All tools shadowed by the prior registration of
# the same server — nothing to do, server is usable.
return True, 0, None
last_error = "registered 0 tools"
except Exception as exc:
last_error = str(exc)
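Caller-side sketch of that contract (hypothetical ``register_server`` wrapper and server name; illustrative only):
# First registration loads the server's tools normally.
ok, count, err = await register_server(registry, {"name": "browser"})
assert ok and count > 0
# A worker spawn re-loading the same server: every tool is shadowed by
# the earlier registration, so this is a no-op success: count=0, no
# shadow warnings, and no 2s retry sleep.
ok, count, err = await register_server(registry, {"name": "browser"})
assert (ok, count, err) == (True, 0, None)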
@@ -762,12 +776,18 @@ class ToolRegistry:
if preserve_existing_tools and mcp_tool.name in self._tools:
if log_collisions:
origin_server = self._find_mcp_origin_server_for_tool(mcp_tool.name) or "<existing>"
# Don't warn when a server is being re-registered
# by itself — that's a redundant-init case (e.g.
# the same tool_registry seeing the same server
# twice via pooled reconnect), not a real
# cross-server shadow worth flagging.
if origin_server != server_name:
logger.warning(
"MCP tool '%s' from '%s' shadowed by '%s' (loaded first)",
mcp_tool.name,
server_name,
origin_server,
)
# Skip registration; do not update MCP tool bookkeeping for this server.
continue
+18 -2
@@ -119,6 +119,22 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
event_bus = session.event_bus
event_types = _parse_event_types(request.query.get("types"))
# Worker-noise filter is phase-aware. In DM mode (queen phase
# "independent") the queen's chat should stay clean — workers
# are invisible. In colony mode (phase "working"/"reviewing")
# the user IS supervising the workers and wants to see the
# tool-call/text-delta chatter as it happens. Sample the phase
# once at SSE connect; if the queen later transitions, the
# frontend reconnects.
def _should_filter_worker_noise() -> bool:
phase_state = getattr(session, "phase_state", None)
if phase_state is None:
return True # unknown phase → be conservative, filter noise
phase = getattr(phase_state, "phase", "independent")
return phase == "independent"
filter_worker_noise = _should_filter_worker_noise()
# Per-client buffer queue
queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
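A small sketch of the connect-time snapshot semantics described above (hypothetical ``PhaseState``/``Session`` stand-ins):
class PhaseState:
    phase = "independent"
class Session:
    phase_state = PhaseState()
session = Session()
# Sampled once when the SSE client connects.
filter_worker_noise = getattr(session.phase_state, "phase", "independent") == "independent"
session.phase_state.phase = "working"  # queen transitions later
assert filter_worker_noise is True     # this stream keeps its snapshot;
                                       # the frontend reconnects to pick up the new phase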
@@ -145,7 +161,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
return
evt_dict = event.to_dict()
if filter_worker_noise and _is_worker_noise(evt_dict):
return
if evt_dict.get("type") in _CRITICAL_EVENTS:
try:
@@ -202,7 +218,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
for past_event in event_bus._event_history:
if past_event.type.value in replay_types:
past_dict = past_event.to_dict()
if filter_worker_noise and _is_worker_noise(past_dict):
continue
try:
queue.put_nowait(past_dict)
+161
@@ -236,6 +236,157 @@ async def handle_node_tools(request: web.Request) -> web.Response:
return web.json_response({"tools": tools_out})
# ---------------------------------------------------------------------------
# Live worker control — list / stop a specific worker / stop all
# ---------------------------------------------------------------------------
def _active_colony(session):
"""Return the session's unified ColonyRuntime (``session.colony``) if present.
All spawned workers (queen-overseer + run_parallel_workers fan-outs)
are hosted here. ``session.colony_runtime`` is a different concept
(loaded agent graph) and doesn't hold the live worker registry we
need to enumerate / stop.
"""
return getattr(session, "colony", None)
async def handle_list_live_workers(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/workers — list live workers.
Returns an array of ``{worker_id, task, status, duration_seconds,
is_active, ...}`` objects. Active workers come first. The queen overseer
(persistent worker) is included because the frontend should know it
exists, but the stop action on it is a session-level kill; the UI
should treat it differently (it is not offered here).
"""
session, err = resolve_session(request)
if err:
return err
colony = _active_colony(session)
if colony is None:
return web.json_response({"workers": []})
now = time.monotonic()
payload = []
try:
workers = list(colony._workers.values()) # type: ignore[attr-defined]
except Exception:
workers = []
for w in workers:
started_at = getattr(w, "_started_at", 0.0) or 0.0
duration = (now - started_at) if started_at else 0.0
result = getattr(w, "_result", None)
payload.append(
{
"worker_id": w.id,
"task": (w.task or "")[:400],
"status": str(getattr(w, "status", "unknown")),
"is_active": bool(getattr(w, "is_active", False)),
"duration_seconds": round(duration, 1),
"explicit_report": getattr(w, "_explicit_report", None),
"result_status": (result.status if result else None),
"result_summary": (result.summary if result else None),
}
)
# Active workers first, then terminated, longest-running first within each group.
payload.sort(key=lambda r: (not r["is_active"], -(r["duration_seconds"] or 0)))
return web.json_response({"workers": payload})
async def handle_stop_live_worker(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/workers/{worker_id}/stop — force-stop one worker.
Calls ``colony.stop_worker(worker_id)`` which cancels the worker's
background task. The worker's terminal SUBAGENT_REPORT still fires
(preserving any _explicit_report) so the queen sees a `[WORKER_REPORT]`
with ``status="stopped"``.
"""
session, err = resolve_session(request)
if err:
return err
worker_id = request.match_info.get("worker_id", "")
if not worker_id:
return web.json_response({"error": "worker_id required"}, status=400)
colony = _active_colony(session)
if colony is None:
return web.json_response({"error": "No active colony on this session"}, status=503)
worker = colony._workers.get(worker_id) # type: ignore[attr-defined]
if worker is None:
return web.json_response({"error": f"Worker '{worker_id}' not found"}, status=404)
if not worker.is_active:
return web.json_response(
{
"stopped": False,
"reason": "Worker already terminated",
"worker_id": worker_id,
"status": str(worker.status),
}
)
try:
await colony.stop_worker(worker_id)
except Exception as exc:
logger.exception("stop_worker failed for %s", worker_id)
return web.json_response(
{"stopped": False, "error": str(exc), "worker_id": worker_id},
status=500,
)
return web.json_response({"stopped": True, "worker_id": worker_id})
async def handle_stop_all_live_workers(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/workers/stop-all — force-stop every active worker.
The persistent overseer (if any) is skipped; it is the queen itself
and stopping it would end the session. Only ephemeral fan-out workers
are targeted.
"""
session, err = resolve_session(request)
if err:
return err
colony = _active_colony(session)
if colony is None:
return web.json_response({"stopped": [], "error": "No active colony on this session"})
stopped: list[str] = []
errors: list[dict] = []
try:
workers = list(colony._workers.values()) # type: ignore[attr-defined]
except Exception:
workers = []
for w in workers:
if not w.is_active:
continue
if getattr(w, "_persistent", False):
# The overseer — don't kill the queen.
continue
try:
await colony.stop_worker(w.id)
stopped.append(w.id)
except Exception as exc:
logger.warning("stop-all: failed to stop %s: %s", w.id, exc)
errors.append({"worker_id": w.id, "error": str(exc)})
return web.json_response(
{
"stopped": stopped,
"stopped_count": len(stopped),
"errors": errors if errors else None,
}
)
def register_routes(app: web.Application) -> None:
"""Register worker inspection routes."""
app.router.add_get("/api/sessions/{session_id}/colonies/{colony_id}/nodes", handle_list_nodes)
@@ -248,3 +399,13 @@ def register_routes(app: web.Application) -> None:
"/api/sessions/{session_id}/colonies/{colony_id}/nodes/{node_id}/tools",
handle_node_tools,
)
# Live worker control
app.router.add_get("/api/sessions/{session_id}/workers", handle_list_live_workers)
app.router.add_post(
"/api/sessions/{session_id}/workers/stop-all",
handle_stop_all_live_workers,
)
app.router.add_post(
"/api/sessions/{session_id}/workers/{worker_id}/stop",
handle_stop_live_worker,
)
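Example client usage of these routes (assumes the API server is reachable at ``http://localhost:8080`` and ``sess-123`` is a live session id; illustrative only):
import asyncio
import aiohttp
async def stop_stragglers(session_id: str) -> None:
    base = f"http://localhost:8080/api/sessions/{session_id}"
    async with aiohttp.ClientSession() as http:
        # List live workers, then stop any fan-out worker that has been
        # running for over ten minutes.
        async with http.get(f"{base}/workers") as resp:
            workers = (await resp.json())["workers"]
        for w in workers:
            if w["is_active"] and w["duration_seconds"] > 600:
                await http.post(f"{base}/workers/{w['worker_id']}/stop")
asyncio.run(stop_stragglers("sess-123"))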
+38
@@ -1,6 +1,31 @@
import { api } from "./client";
import type { GraphTopology, NodeDetail, NodeCriteria, ToolInfo } from "./types";
export interface LiveWorker {
worker_id: string;
task: string;
status: string;
is_active: boolean;
duration_seconds: number;
explicit_report: Record<string, unknown> | null;
result_status: string | null;
result_summary: string | null;
}
export interface StopWorkerResult {
stopped: boolean;
worker_id?: string;
reason?: string;
status?: string;
error?: string;
}
export interface StopAllWorkersResult {
stopped: string[];
stopped_count: number;
errors?: { worker_id: string; error: string }[] | null;
}
export const workersApi = {
nodes: (sessionId: string, colonyId: string, workerSessionId?: string) =>
api.get<GraphTopology>(
@@ -26,4 +51,17 @@ export const workersApi = {
api.get<{ tools: ToolInfo[] }>(
`/sessions/${sessionId}/colonies/${colonyId}/nodes/${nodeId}/tools`,
),
// Live fan-out control
listLive: (sessionId: string) =>
api.get<{ workers: LiveWorker[] }>(`/sessions/${sessionId}/workers`),
stopLive: (sessionId: string, workerId: string) =>
api.post<StopWorkerResult>(
`/sessions/${sessionId}/workers/${workerId}/stop`,
{},
),
stopAllLive: (sessionId: string) =>
api.post<StopAllWorkersResult>(`/sessions/${sessionId}/workers/stop-all`, {}),
};
@@ -0,0 +1,175 @@
import { useEffect, useState, useCallback } from "react";
import { Loader2, Square, XCircle, CheckCircle2, OctagonX } from "lucide-react";
import { workersApi, type LiveWorker } from "@/api/workers";
interface WorkersPanelProps {
sessionId: string | null;
// Refresh interval in milliseconds; 0 disables polling.
pollMs?: number;
}
function statusClassName(w: LiveWorker): string {
if (w.is_active) return "text-blue-600";
const s = (w.result_status || w.status || "").toLowerCase();
if (s.includes("success")) return "text-emerald-600";
if (s.includes("fail") || s.includes("error")) return "text-destructive";
if (s.includes("stop") || s.includes("timeout")) return "text-amber-600";
return "text-muted-foreground";
}
function StatusIcon({ worker }: { worker: LiveWorker }) {
const cls = `w-3.5 h-3.5 ${statusClassName(worker)}`;
if (worker.is_active) return <Loader2 className={`${cls} animate-spin`} />;
const s = (worker.result_status || worker.status || "").toLowerCase();
if (s.includes("success")) return <CheckCircle2 className={cls} />;
if (s.includes("fail") || s.includes("error")) return <XCircle className={cls} />;
if (s.includes("stop") || s.includes("timeout")) return <OctagonX className={cls} />;
return <CheckCircle2 className={cls} />;
}
function formatDuration(seconds: number): string {
if (seconds < 60) return `${Math.round(seconds)}s`;
const m = Math.floor(seconds / 60);
const s = Math.round(seconds % 60);
return `${m}m ${String(s).padStart(2, "0")}s`;
}
export default function WorkersPanel({ sessionId, pollMs = 2000 }: WorkersPanelProps) {
const [workers, setWorkers] = useState<LiveWorker[]>([]);
const [loading, setLoading] = useState(false);
const [stoppingId, setStoppingId] = useState<string | null>(null);
const [stoppingAll, setStoppingAll] = useState(false);
const fetchWorkers = useCallback(async () => {
if (!sessionId) return;
try {
const res = await workersApi.listLive(sessionId);
setWorkers(res.workers || []);
} catch {
// Backend down or 404 — clear rather than crash.
setWorkers([]);
}
}, [sessionId]);
useEffect(() => {
if (!sessionId) return;
setLoading(true);
fetchWorkers().finally(() => setLoading(false));
if (pollMs <= 0) return;
const id = setInterval(fetchWorkers, pollMs);
return () => clearInterval(id);
}, [sessionId, pollMs, fetchWorkers]);
const stopOne = useCallback(
async (workerId: string) => {
if (!sessionId) return;
setStoppingId(workerId);
try {
await workersApi.stopLive(sessionId, workerId);
} catch {
// Non-fatal — the next poll will reflect the true state.
} finally {
setStoppingId(null);
fetchWorkers();
}
},
[sessionId, fetchWorkers],
);
const stopAll = useCallback(async () => {
if (!sessionId) return;
setStoppingAll(true);
try {
await workersApi.stopAllLive(sessionId);
} catch {
// ignore
} finally {
setStoppingAll(false);
fetchWorkers();
}
}, [sessionId, fetchWorkers]);
const activeCount = workers.filter((w) => w.is_active).length;
return (
<div className="h-full flex flex-col border-l border-border bg-card/30">
<div className="px-3 py-2 border-b border-border flex items-center justify-between">
<div className="flex items-center gap-2">
<span className="text-xs font-semibold uppercase tracking-wide text-muted-foreground">
Workers
</span>
{activeCount > 0 && (
<span className="text-[10px] px-1.5 py-0.5 rounded bg-blue-500/15 text-blue-600 font-medium">
{activeCount} active
</span>
)}
</div>
{activeCount > 0 && (
<button
onClick={stopAll}
disabled={stoppingAll}
className="text-[10px] px-2 py-0.5 rounded border border-destructive/40 text-destructive hover:bg-destructive/10 disabled:opacity-50 transition-colors"
title="Stop all active workers"
>
{stoppingAll ? "Stopping…" : "Stop all"}
</button>
)}
</div>
<div className="flex-1 overflow-y-auto p-2 space-y-1.5">
{loading && workers.length === 0 && (
<div className="text-xs text-muted-foreground p-2">Loading…</div>
)}
{!loading && workers.length === 0 && (
<div className="text-xs text-muted-foreground p-2">
No workers have been spawned in this session yet.
</div>
)}
{workers.map((w) => (
<div
key={w.worker_id}
className="rounded border border-border/60 bg-background/70 p-2 text-xs"
>
<div className="flex items-start gap-2">
<StatusIcon worker={w} />
<div className="flex-1 min-w-0">
<div className="flex items-center justify-between gap-2">
<span
className="font-mono text-[10px] text-muted-foreground truncate"
title={w.worker_id}
>
{w.worker_id.slice(0, 24)}
</span>
<span className={`text-[10px] ${statusClassName(w)}`}>
{w.is_active ? w.status : (w.result_status || w.status)}
</span>
</div>
<div className="mt-1 text-[11px] text-foreground/90 line-clamp-2">
{w.task || "(no task)"}
</div>
<div className="mt-1 flex items-center justify-between gap-2 text-[10px] text-muted-foreground">
<span>{formatDuration(w.duration_seconds)}</span>
{w.is_active && (
<button
onClick={() => stopOne(w.worker_id)}
disabled={stoppingId === w.worker_id}
className="inline-flex items-center gap-1 px-1.5 py-0.5 rounded border border-destructive/40 text-destructive hover:bg-destructive/10 disabled:opacity-50 transition-colors"
title="Stop this worker"
>
<Square className="w-2.5 h-2.5" />
{stoppingId === w.worker_id ? "…" : "Stop"}
</button>
)}
</div>
{w.result_summary && !w.is_active && (
<div className="mt-1 text-[10px] text-muted-foreground line-clamp-2 italic">
{w.result_summary}
</div>
)}
</div>
</div>
</div>
))}
</div>
</div>
);
}
+11
@@ -4,6 +4,7 @@ import { Loader2, WifiOff, KeyRound, FolderOpen, X, Users } from "lucide-react";
import type { GraphNode, NodeStatus } from "@/components/graph-types";
import TriggersPanel from "@/components/TriggersPanel";
import TriggerDetailPanel from "@/components/TriggerDetailPanel";
import WorkersPanel from "@/components/WorkersPanel";
import ChatPanel, { type ChatMessage, type ImageContent } from "@/components/ChatPanel";
import CredentialsModal, {
type Credential,
@@ -1340,6 +1341,16 @@ export default function ColonyChat() {
/>
</div>
{/* Workers sidebar — live list of active + recently-finished workers
with per-worker stop controls. Shown whenever the queen is in
working or reviewing phase (i.e., there's a meaningful worker
population to manage). */}
{(agentState.queenPhase === "working" || agentState.queenPhase === "reviewing") && (
<div className="w-[260px] flex-shrink-0">
<WorkersPanel sessionId={agentState.sessionId} />
</div>
)}
{/* Triggers sidebar — only rendered when the colony actually has triggers */}
{triggers.length > 0 && (
<div className="w-[260px] flex-shrink-0">