From 4aa5fd7a90cb4b1035666c302e9af22f9c00dcc0 Mon Sep 17 00:00:00 2001 From: Timothy Date: Thu, 16 Apr 2026 11:26:32 -0700 Subject: [PATCH] refactor: align worker display --- core/frontend/src/lib/chat-helpers.ts | 180 ++++++++++++++++++++++++ core/frontend/src/pages/colony-chat.tsx | 41 ++++-- core/frontend/src/pages/queen-dm.tsx | 19 +-- 3 files changed, 217 insertions(+), 23 deletions(-) diff --git a/core/frontend/src/lib/chat-helpers.ts b/core/frontend/src/lib/chat-helpers.ts index 4dcc1a3e..5a81a9e9 100644 --- a/core/frontend/src/lib/chat-helpers.ts +++ b/core/frontend/src/lib/chat-helpers.ts @@ -149,6 +149,186 @@ export function sseEventToChatMessage( } } +// --------------------------------------------------------------------------- +// Stateful event replay — produces tool_status pills + regular messages +// --------------------------------------------------------------------------- + +/** + * State maintained while replaying an event stream. Tracks per-stream turn + * counters, the set of active tool calls (so tool_status pill content + * reflects "tool A done, tool B running" correctly), and a tool_use_id → + * pill_msg_id map so deferred `tool_call_completed` events can find the + * pill they belong to after the turn counter moves on. + */ +export interface ReplayState { + turnCounters: Record; + activeToolCalls: Record< + string, + { name: string; done: boolean; streamId: string } + >; + toolUseToPill: Record; +} + +export function newReplayState(): ReplayState { + return { turnCounters: {}, activeToolCalls: {}, toolUseToPill: {} }; +} + +/** + * Process a single event and emit zero or more ChatMessage upserts. + * + * Why this exists: `sseEventToChatMessage` is stateless — one event in, at + * most one message out. But the chat's tool_status pill is a SYNTHESIZED + * message: each tool_call_started adds to an accumulating pill, and each + * tool_call_completed flips one of its tools from running to done. Live + * SSE handlers in colony-chat and queen-dm already do this synthesis + * against React refs. Cold-restore from events.jsonl used to skip + * tool_call_* events entirely, so refreshed sessions looked completely + * different from live ones — no tool activity visible, just prose. + * + * This function centralizes the synthesis so cold-restore and live paths + * can use the exact same state machine. The caller treats the returned + * messages as upserts (by id) — a later event in the same replay may + * emit the same pill id with updated content, which should REPLACE the + * earlier row in the caller's message list. + */ +export function replayEvent( + state: ReplayState, + event: AgentEvent, + thread: string, + agentDisplayName: string | undefined, +): ChatMessage[] { + const streamId = event.stream_id; + const isQueen = streamId === "queen"; + const role: "queen" | "worker" = isQueen ? "queen" : "worker"; + const turnKey = streamId; + const currentTurn = state.turnCounters[turnKey] ?? 0; + const eventCreatedAt = event.timestamp + ? new Date(event.timestamp).getTime() + : Date.now(); + + const out: ChatMessage[] = []; + + // Update state machine BEFORE the generic converter runs so the + // regular message emitted for this event sees the post-update + // counter (matches live handler ordering at colony-chat.tsx:525). + switch (event.type) { + case "execution_started": + state.turnCounters[turnKey] = currentTurn + 1; + // New execution for a worker resets its active tools, mirroring + // the live handler's setAgentState at colony-chat.tsx:566. + if (!isQueen) { + const keepActive: typeof state.activeToolCalls = {}; + for (const [k, v] of Object.entries(state.activeToolCalls)) { + if (v.streamId !== streamId) keepActive[k] = v; + } + state.activeToolCalls = keepActive; + } + break; + case "llm_turn_complete": + state.turnCounters[turnKey] = currentTurn + 1; + break; + case "tool_call_started": { + if (!event.node_id) break; + const toolName = (event.data?.tool_name as string) || "unknown"; + const toolUseId = (event.data?.tool_use_id as string) || ""; + state.activeToolCalls[toolUseId] = { + name: toolName, + done: false, + streamId, + }; + const pillId = `tool-pill-${streamId}-${event.execution_id || "exec"}-${currentTurn}`; + if (toolUseId) { + state.toolUseToPill[toolUseId] = { msgId: pillId, name: toolName }; + } + const tools = Object.values(state.activeToolCalls) + .filter((t) => t.streamId === streamId) + .map((t) => ({ name: t.name, done: t.done })); + const allDone = tools.length > 0 && tools.every((t) => t.done); + out.push({ + id: pillId, + agent: agentDisplayName || event.node_id || "Agent", + agentColor: "", + content: JSON.stringify({ tools, allDone }), + timestamp: "", + type: "tool_status", + role, + thread, + createdAt: eventCreatedAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, + }); + break; + } + case "tool_call_completed": { + if (!event.node_id) break; + const toolUseId = (event.data?.tool_use_id as string) || ""; + const tracked = state.toolUseToPill[toolUseId]; + if (toolUseId) delete state.toolUseToPill[toolUseId]; + if (toolUseId && state.activeToolCalls[toolUseId]) { + state.activeToolCalls[toolUseId].done = true; + } + if (!tracked) break; + const tools = Object.values(state.activeToolCalls) + .filter((t) => t.streamId === streamId) + .map((t) => ({ name: t.name, done: t.done })); + const allDone = tools.length > 0 && tools.every((t) => t.done); + // Re-emit the SAME pill id with updated content. Caller upserts + // by id, so this replaces the row from tool_call_started. + out.push({ + id: tracked.msgId, + agent: agentDisplayName || event.node_id || "Agent", + agentColor: "", + content: JSON.stringify({ tools, allDone }), + timestamp: "", + type: "tool_status", + role, + thread, + createdAt: eventCreatedAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, + }); + break; + } + } + + // Regular stateless conversion (prose, user input, system notes). + const msg = sseEventToChatMessage( + event, + thread, + agentDisplayName, + state.turnCounters[turnKey] ?? 0, + ); + if (msg) { + if (isQueen) msg.role = "queen"; + out.push(msg); + } + + return out; +} + +/** + * Replay an entire event array and return a deduplicated, chronologically + * sorted ChatMessage list. Used by cold-restore paths so refreshed + * sessions match the live stream exactly. + */ +export function replayEventsToMessages( + events: AgentEvent[], + thread: string, + agentDisplayName: string | undefined, +): ChatMessage[] { + const state = newReplayState(); + // Upsert by id — later emissions for the same pill replace earlier ones. + const byId = new Map(); + for (const evt of events) { + for (const m of replayEvent(state, evt, thread, agentDisplayName)) { + byId.set(m.id, m); + } + } + return Array.from(byId.values()).sort( + (a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0), + ); +} + type QueenPhase = "planning" | "building" | "staging" | "running" | "independent"; const VALID_PHASES = new Set(["planning", "building", "staging", "running", "independent"]); diff --git a/core/frontend/src/pages/colony-chat.tsx b/core/frontend/src/pages/colony-chat.tsx index 25207b97..17bb0b24 100644 --- a/core/frontend/src/pages/colony-chat.tsx +++ b/core/frontend/src/pages/colony-chat.tsx @@ -13,7 +13,11 @@ import { executionApi } from "@/api/execution"; import { sessionsApi } from "@/api/sessions"; import { useMultiSSE } from "@/hooks/use-sse"; import type { LiveSession, AgentEvent } from "@/api/types"; -import { sseEventToChatMessage, formatAgentDisplayName } from "@/lib/chat-helpers"; +import { + sseEventToChatMessage, + formatAgentDisplayName, + replayEventsToMessages, +} from "@/lib/chat-helpers"; import { cronToLabel } from "@/lib/graphUtils"; import { ApiError } from "@/api/client"; import { useColony } from "@/context/ColonyContext"; @@ -54,29 +58,38 @@ async function restoreSessionMessages( const { events, truncated, total, returned } = await sessionsApi.eventsHistory(sessionId); if (events.length > 0) { - const messages: ChatMessage[] = []; + // Walk events twice: + // 1. Extract the trailing queen phase (unchanged logic). + // 2. Run the full state-machine replay so tool_status pills + // are synthesized just like the live SSE handler does. + // Without (2), refreshed sessions showed zero tool activity + // because tool_call_started/completed events are ignored by + // the stateless converter. let runningPhase: ChatMessage["phase"] = undefined; for (const evt of events) { const p = evt.type === "queen_phase_changed" ? (evt.data?.phase as string) : evt.type === "node_loop_iteration" - ? (evt.data?.phase as string | undefined) - : undefined; + ? (evt.data?.phase as string | undefined) + : undefined; if (p && ["planning", "building", "staging", "running"].includes(p)) { runningPhase = p as ChatMessage["phase"]; } - const msg = sseEventToChatMessage(evt, thread, agentDisplayName); - if (!msg) continue; - if (evt.stream_id === "queen") { - msg.role = "queen"; - msg.phase = runningPhase; - } - messages.push(msg); } - // Prepend a run_divider banner so the user knows how many older - // events were dropped. Keeps the chronology readable without - // injecting a load-more button (we can add pagination later). + + const messages = replayEventsToMessages(events, thread, agentDisplayName); + // Stamp the latest phase on every queen message so the UI's + // phase-badge rendering matches what the live path would have + // displayed at the time of the refresh. + if (runningPhase) { + for (const m of messages) { + if (m.role === "queen") m.phase = runningPhase; + } + } + + // Prepend a run_divider banner when the server truncated older + // events so the user knows how many are hidden. const droppedCount = Math.max(0, total - returned); if (truncated && droppedCount > 0) { const firstTs = events[0]?.timestamp; diff --git a/core/frontend/src/pages/queen-dm.tsx b/core/frontend/src/pages/queen-dm.tsx index 3c27eac8..25fc5c98 100644 --- a/core/frontend/src/pages/queen-dm.tsx +++ b/core/frontend/src/pages/queen-dm.tsx @@ -11,7 +11,10 @@ import { sessionsApi } from "@/api/sessions"; import { queensApi } from "@/api/queens"; import { useMultiSSE } from "@/hooks/use-sse"; import type { AgentEvent, HistorySession } from "@/api/types"; -import { sseEventToChatMessage } from "@/lib/chat-helpers"; +import { + sseEventToChatMessage, + replayEventsToMessages, +} from "@/lib/chat-helpers"; import { useColony } from "@/context/ColonyContext"; import { useHeaderActions } from "@/context/HeaderActionsContext"; import { getQueenForAgent, slugToColonyId } from "@/lib/colony-registry"; @@ -93,13 +96,12 @@ export default function QueenDM() { const { events, truncated, total, returned } = await sessionsApi.eventsHistory(sid); if (cancelled()) return; - const restored: ChatMessage[] = []; - for (const evt of events) { - const msg = sseEventToChatMessage(evt, "queen-dm", queenName); - if (!msg) continue; - if (evt.stream_id === "queen") msg.role = "queen"; - restored.push(msg); - } + + // Use the stateful replay so tool_status pills are synthesized + // the same way the live SSE handler does — without this the + // refreshed queen DM shows zero tool activity. + const restored = replayEventsToMessages(events, "queen-dm", queenName); + // Show a banner if the server truncated older events. const droppedCount = Math.max(0, total - returned); if (truncated && droppedCount > 0) { @@ -119,7 +121,6 @@ export default function QueenDM() { }); } if (restored.length > 0 && !cancelled()) { - restored.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0)); setMessages(restored); // Only clear typing if the history contains a completed execution; // during bootstrap the queen is still processing.