diff --git a/core/frontend/src/lib/chat-helpers.test.ts b/core/frontend/src/lib/chat-helpers.test.ts index b2c5a8f7..1d59abda 100644 --- a/core/frontend/src/lib/chat-helpers.test.ts +++ b/core/frontend/src/lib/chat-helpers.test.ts @@ -3,6 +3,7 @@ import { extractLastPhase, sseEventToChatMessage, formatAgentDisplayName, + replayEventsToMessages, } from "./chat-helpers"; import type { AgentEvent } from "@/api/types"; @@ -418,6 +419,104 @@ describe("sseEventToChatMessage", () => { }); }); +// --------------------------------------------------------------------------- +// replayEventsToMessages +// --------------------------------------------------------------------------- + +describe("replayEventsToMessages", () => { + it("merges queen inner turns from the same iteration into one restored bubble", () => { + const events = [ + makeEvent({ + type: "client_output_delta", + stream_id: "queen", + node_id: "queen", + execution_id: "session-1", + timestamp: "2026-04-20T12:45:25.234Z", + data: { + snapshot: "I will create the ERD.", + iteration: 0, + inner_turn: 0, + }, + }), + makeEvent({ + type: "tool_call_started", + stream_id: "queen", + node_id: "queen", + execution_id: "session-1", + timestamp: "2026-04-20T12:45:25.238Z", + data: { + tool_name: "write_file", + tool_use_id: "tool-1", + }, + }), + makeEvent({ + type: "tool_call_completed", + stream_id: "queen", + node_id: "queen", + execution_id: "session-1", + timestamp: "2026-04-20T12:45:25.250Z", + data: { + tool_name: "write_file", + tool_use_id: "tool-1", + result: "ok", + }, + }), + makeEvent({ + type: "client_output_delta", + stream_id: "queen", + node_id: "queen", + execution_id: "session-1", + timestamp: "2026-04-20T12:46:07.911Z", + data: { + snapshot: "Saved to `database_erd.md`.", + iteration: 0, + inner_turn: 2, + }, + }), + ]; + + const restored = replayEventsToMessages(events, "queen-dm", "Alexandra"); + const queenMessages = restored.filter( + (m) => m.role === "queen" && !m.type, + ); + + expect(queenMessages).toHaveLength(1); + expect(queenMessages[0].id).toBe("queen-stream-session-1-0"); + expect(queenMessages[0].content).toBe( + "I will create the ERD.\nSaved to `database_erd.md`.", + ); + expect(queenMessages[0].createdAt).toBe( + new Date("2026-04-20T12:45:25.234Z").getTime(), + ); + }); + + it("keeps worker inner turns as distinct restored bubbles", () => { + const events = [ + makeEvent({ + type: "llm_text_delta", + stream_id: "worker", + node_id: "research", + execution_id: "session-1", + data: { snapshot: "First pass", iteration: 0, inner_turn: 0 }, + }), + makeEvent({ + type: "llm_text_delta", + stream_id: "worker", + node_id: "research", + execution_id: "session-1", + data: { snapshot: "After tool", iteration: 0, inner_turn: 1 }, + }), + ]; + + const restored = replayEventsToMessages(events, "agent", "Research Agent"); + + expect(restored.map((m) => m.id)).toEqual([ + "stream-session-1-0-research", + "stream-session-1-0-t1-research", + ]); + }); +}); + // --------------------------------------------------------------------------- // formatAgentDisplayName // --------------------------------------------------------------------------- diff --git a/core/frontend/src/lib/chat-helpers.ts b/core/frontend/src/lib/chat-helpers.ts index fae0fbae..88d472df 100644 --- a/core/frontend/src/lib/chat-helpers.ts +++ b/core/frontend/src/lib/chat-helpers.ts @@ -262,10 +262,16 @@ export interface ReplayState { { name: string; done: boolean; streamId: string } >; toolUseToPill: Record; + queenIterText: Record>; } export function newReplayState(): ReplayState { - return { turnCounters: {}, activeToolCalls: {}, toolUseToPill: {} }; + return { + turnCounters: {}, + activeToolCalls: {}, + toolUseToPill: {}, + queenIterText: {}, + }; } /** @@ -305,14 +311,14 @@ export function replayEvent( const out: ChatMessage[] = []; - // Update state machine BEFORE the generic converter runs so the - // regular message emitted for this event sees the post-update - // counter (matches live handler ordering at colony-chat.tsx:525). + // Update state machine BEFORE the generic converter runs so regular + // messages and synthesized tool pills use the same turn counters in + // both live SSE handling and cold replay. switch (event.type) { case "execution_started": state.turnCounters[turnKey] = currentTurn + 1; - // New execution for a worker resets its active tools, mirroring - // the live handler's setAgentState at colony-chat.tsx:566. + // New execution for a worker resets its active tools so stale + // tool pills from a previous run cannot bleed into the next run. if (!isQueen) { const keepActive: typeof state.activeToolCalls = {}; for (const [k, v] of Object.entries(state.activeToolCalls)) { @@ -396,7 +402,27 @@ export function replayEvent( state.turnCounters[turnKey] ?? 0, ); if (msg) { - if (isQueen) msg.role = "queen"; + if (isQueen) { + msg.role = "queen"; + if ( + event.execution_id && + (event.type === "client_output_delta" || event.type === "llm_text_delta") + ) { + const iter = (event.data?.iteration as number | undefined) ?? 0; + const inner = (event.data?.inner_turn as number | undefined) ?? 0; + const iterKey = `${event.execution_id}:${iter}`; + if (!state.queenIterText[iterKey]) { + state.queenIterText[iterKey] = {}; + } + state.queenIterText[iterKey][inner] = msg.content; + const parts = state.queenIterText[iterKey]; + const sorted = Object.keys(parts) + .map(Number) + .sort((a, b) => a - b); + msg.content = sorted.map((k) => parts[k]).join("\n"); + msg.id = `queen-stream-${event.execution_id}-${iter}`; + } + } out.push(msg); } @@ -420,8 +446,8 @@ export function replayEventsToMessages( thread: string, agentDisplayName: string | undefined, queenDisplayName?: string, + state: ReplayState = newReplayState(), ): ChatMessage[] { - const state = newReplayState(); // Upsert by id — later emissions for the same pill replace earlier ones. const byId = new Map(); @@ -446,7 +472,11 @@ export function replayEventsToMessages( continue; } for (const m of replayEvent(state, evt, thread, agentDisplayName, queenDisplayName)) { - byId.set(m.id, m); + const previous = byId.get(m.id); + byId.set( + m.id, + previous ? { ...m, createdAt: previous.createdAt ?? m.createdAt } : m, + ); } } diff --git a/core/frontend/src/pages/colony-chat.tsx b/core/frontend/src/pages/colony-chat.tsx index 41a63611..7a55bbcf 100644 --- a/core/frontend/src/pages/colony-chat.tsx +++ b/core/frontend/src/pages/colony-chat.tsx @@ -12,9 +12,11 @@ import { sessionsApi } from "@/api/sessions"; import { useMultiSSE } from "@/hooks/use-sse"; import type { LiveSession, AgentEvent } from "@/api/types"; import { - sseEventToChatMessage, formatAgentDisplayName, + newReplayState, + replayEvent, replayEventsToMessages, + type ReplayState, } from "@/lib/chat-helpers"; import { resolveInitialColonyPhase, @@ -47,6 +49,7 @@ function truncate(s: string, max: number): string { type SessionRestoreResult = { messages: ChatMessage[]; + replayState: ReplayState; restoredPhase: "independent" | "incubating" | "working" | "reviewing" | null; truncated: boolean; droppedCount: number; @@ -82,7 +85,14 @@ async function restoreSessionMessages( } } - const messages = replayEventsToMessages(events, thread, agentDisplayName, queenDisplayName); + const replayState = newReplayState(); + const messages = replayEventsToMessages( + events, + thread, + agentDisplayName, + queenDisplayName, + replayState, + ); // Stamp the latest phase on every queen message so the UI's // phase-badge rendering matches what the live path would have // displayed at the time of the refresh. @@ -111,6 +121,7 @@ async function restoreSessionMessages( } return { messages, + replayState, restoredPhase: runningPhase ?? null, truncated, droppedCount, @@ -119,7 +130,13 @@ async function restoreSessionMessages( } catch { // Event log not available } - return { messages: [], restoredPhase: null, truncated: false, droppedCount: 0 }; + return { + messages: [], + replayState: newReplayState(), + restoredPhase: null, + truncated: false, + droppedCount: 0, + }; } // ── Agent backend state ────────────────────────────────────────────────────── @@ -156,7 +173,6 @@ interface AgentState { queenIsTyping: boolean; workerIsTyping: boolean; llmSnapshots: Record; - activeToolCalls: Record; pendingQuestion: string | null; pendingOptions: string[] | null; pendingQuestions: { id: string; prompt: string; options?: string[] }[] | null; @@ -190,7 +206,6 @@ function defaultAgentState(): AgentState { queenIsTyping: false, workerIsTyping: false, llmSnapshots: {}, - activeToolCalls: {}, pendingQuestion: null, pendingOptions: null, pendingQuestions: null, @@ -305,18 +320,11 @@ export default function ColonyChat() { const agentStateRef = useRef(agentState); agentStateRef.current = agentState; - const turnCounterRef = useRef>({}); + const replayStateRef = useRef(newReplayState()); // Timestamp of the latest restored message — SSE events older than this // are duplicates from the ring-buffer replay and should be skipped. const restoreCutoffRef = useRef(0); - // Maps tool_use_id → the pill message ID and tool name that was created for it. - // Survives turn counter resets so deferred completions (e.g. ask_user) can - // find and update the correct pill even after the counter changes. - const toolUseToPillRef = useRef< - Record - >({}); const queenPhaseRef = useRef("independent"); - const queenIterTextRef = useRef>>({}); const suppressIntroRef = useRef(false); const loadingRef = useRef(false); @@ -491,6 +499,7 @@ export default function ColonyChat() { const session = liveSession!; const displayName = formatAgentDisplayName(session.colony_name || agentPath); let restoredMessages: ChatMessage[] = []; + let restoredReplayState: ReplayState | null = null; const reusePrefetchedRestore = shouldUsePrefetchedColonyRestore( coldRestoreId, session.session_id, @@ -507,10 +516,12 @@ export default function ColonyChat() { if (restored.messages.length > 0) { restoredMessages = restored.messages; } + restoredReplayState = restored.replayState; restoredPhase = restored.restoredPhase; } else if (prefetchedRestore) { if (reusePrefetchedRestore) { restoredMessages = prefetchedRestore.messages; + restoredReplayState = prefetchedRestore.replayState; restoredPhase = prefetchedRestore.restoredPhase; } else { // The backend corrected the resume target to the colony's forked @@ -523,10 +534,15 @@ export default function ColonyChat() { queenInfo.name, ); restoredMessages = restored.messages; + restoredReplayState = restored.replayState; restoredPhase = restored.restoredPhase; } } + if (restoredReplayState) { + replayStateRef.current = restoredReplayState; + } + if (restoredMessages.length > 0) { restoredMessages.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0)); setMessages(restoredMessages); @@ -580,10 +596,8 @@ export default function ColonyChat() { setMessages([]); setGraphNodes([]); setAgentState(defaultAgentState()); - turnCounterRef.current = {}; - toolUseToPillRef.current = {}; + replayStateRef.current = newReplayState(); queenPhaseRef.current = "independent"; - queenIterTextRef.current = {}; suppressIntroRef.current = false; restoreCutoffRef.current = 0; loadingRef.current = false; @@ -600,11 +614,7 @@ export default function ColonyChat() { const suppressQueenMessages = isQueen && suppressIntroRef.current; const state = agentStateRef.current; const agentDisplayName = state.displayName; - const displayName = isQueen ? queenInfo.name : agentDisplayName || undefined; - const role = isQueen ? ("queen" as const) : ("worker" as const); const ts = fmtLogTs(event.timestamp); - const turnKey = streamId; - const currentTurn = turnCounterRef.current[turnKey] ?? 0; const eventCreatedAt = event.timestamp ? new Date(event.timestamp).getTime() : Date.now(); @@ -619,11 +629,17 @@ export default function ColonyChat() { } const shouldMarkQueenReady = isQueen && !state.queenReady; + const emittedMessages = replayEvent( + replayStateRef.current, + event, + agentPath, + agentDisplayName || undefined, + queenInfo.name, + ); switch (event.type) { case "execution_started": if (isQueen) { - turnCounterRef.current[turnKey] = currentTurn + 1; updateState({ isTyping: true, queenIsTyping: true, @@ -645,7 +661,6 @@ export default function ColonyChat() { createdAt: eventCreatedAt, }); } - turnCounterRef.current[turnKey] = currentTurn + 1; updateState({ isTyping: true, isStreaming: false, @@ -655,7 +670,6 @@ export default function ColonyChat() { nodeLogs: {}, subagentReports: [], llmSnapshots: {}, - activeToolCalls: {}, pendingQuestion: null, pendingOptions: null, pendingQuestions: null, @@ -692,37 +706,15 @@ export default function ColonyChat() { case "client_input_received": case "client_input_requested": case "llm_text_delta": { - const chatMsg = sseEventToChatMessage(event, agentPath, displayName, currentTurn); - if (chatMsg && !suppressQueenMessages) { - // Merge queen inner_turns within same iteration - if ( - isQueen && - (event.type === "client_output_delta" || event.type === "llm_text_delta") && - event.execution_id - ) { - const iter = event.data?.iteration ?? 0; - const inner = (event.data?.inner_turn as number) ?? 0; - const iterKey = `${event.execution_id}:${iter}`; - if (!queenIterTextRef.current[iterKey]) { - queenIterTextRef.current[iterKey] = {}; + if (!suppressQueenMessages) { + for (const msg of emittedMessages) { + if (isQueen) { + msg.phase = queenPhaseRef.current as ChatMessage["phase"]; } - const snapshot = - (event.data?.snapshot as string) || (event.data?.content as string) || ""; - queenIterTextRef.current[iterKey][inner] = snapshot; - const parts = queenIterTextRef.current[iterKey]; - const sorted = Object.keys(parts) - .map(Number) - .sort((a, b) => a - b); - chatMsg.content = sorted.map((k) => parts[k]).join("\n"); - chatMsg.id = `queen-stream-${event.execution_id}-${iter}`; + upsertMessage(msg, { + reconcileOptimisticUser: event.type === "client_input_received", + }); } - if (isQueen) { - chatMsg.role = role; - chatMsg.phase = queenPhaseRef.current as ChatMessage["phase"]; - } - upsertMessage(chatMsg, { - reconcileOptimisticUser: event.type === "client_input_received", - }); } if (event.type === "llm_text_delta" || event.type === "client_output_delta") { @@ -806,8 +798,7 @@ export default function ColonyChat() { } case "node_loop_started": - turnCounterRef.current[turnKey] = currentTurn + 1; - updateState({ isTyping: true, activeToolCalls: {} }); + updateState({ isTyping: true }); if (!isQueen && event.node_id) { const existing = graphNodes.find((n) => n.id === event.node_id); const isRevisit = existing?.status === "complete"; @@ -819,11 +810,9 @@ export default function ColonyChat() { break; case "node_loop_iteration": - turnCounterRef.current[turnKey] = currentTurn + 1; if (isQueen) { updateState({ isStreaming: false, - activeToolCalls: {}, awaitingInput: false, pendingQuestion: null, pendingOptions: null, @@ -834,7 +823,6 @@ export default function ColonyChat() { updateState({ isStreaming: false, workerIsTyping: true, - activeToolCalls: {}, awaitingInput: false, pendingQuestion: null, pendingOptions: null, @@ -903,41 +891,13 @@ export default function ColonyChat() { ); } - const toolName = (event.data?.tool_name as string) || "unknown"; - const toolUseId = (event.data?.tool_use_id as string) || ""; - - const sid = event.stream_id; - // Track which pill message this tool belongs to so deferred - // completions (ask_user) can find it after the turn counter changes. - toolUseToPillRef.current[toolUseId] = { - msgId: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`, - name: toolName, - }; - setAgentState((prev) => { - const newActive = { - ...prev.activeToolCalls, - [toolUseId]: { name: toolName, done: false, streamId: sid }, - }; - const tools = Object.values(newActive) - .filter((t) => t.streamId === sid) - .map((t) => ({ name: t.name, done: t.done })); - const allDone = tools.length > 0 && tools.every((t) => t.done); - upsertMessage({ - id: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`, - agent: agentDisplayName || event.node_id || "Agent", - agentColor: "", - content: JSON.stringify({ tools, allDone }), - timestamp: "", - type: "tool_status", - role, - thread: agentPath, - createdAt: eventCreatedAt, - nodeId: event.node_id || undefined, - executionId: event.execution_id || undefined, - streamId: sid || undefined, - }); - return { ...prev, isStreaming: false, activeToolCalls: newActive }; - }); + for (const msg of emittedMessages) { + if (msg.role === "queen") { + msg.phase = queenPhaseRef.current as ChatMessage["phase"]; + } + upsertMessage(msg); + } + updateState({ isStreaming: false }); } break; } @@ -945,7 +905,6 @@ export default function ColonyChat() { case "tool_call_completed": { if (event.node_id) { const toolName = (event.data?.tool_name as string) || "unknown"; - const toolUseId = (event.data?.tool_use_id as string) || ""; const isError = event.data?.is_error as boolean | undefined; const result = event.data?.result as string | undefined; if (isError) { @@ -958,74 +917,12 @@ export default function ColonyChat() { appendNodeLog(event.node_id, `${ts} INFO ${toolName} done${resultStr}`); } - // Look up the original pill message this tool belongs to. - // For deferred completions (ask_user), the turn counter and - // activeToolCalls have already been reset, so we rely on the - // ref recorded during tool_call_started. - const tracked = toolUseToPillRef.current[toolUseId]; - delete toolUseToPillRef.current[toolUseId]; - - const sid = event.stream_id; - - // Mark done in activeToolCalls if still present (normal case) - setAgentState((prev) => { - if (!prev.activeToolCalls[toolUseId]) return prev; - return { - ...prev, - activeToolCalls: { - ...prev.activeToolCalls, - [toolUseId]: { - ...prev.activeToolCalls[toolUseId], - done: true, - }, - }, - }; - }); - - // Determine the correct pill message ID - const pillMsgId = - tracked?.msgId ?? - `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`; - const trackedName = tracked?.name; - - // Update the pill message content directly - setMessages((prevMsgs) => { - const idx = prevMsgs.findIndex((m) => m.id === pillMsgId); - if (idx < 0) return prevMsgs; - - try { - const parsed = JSON.parse(prevMsgs[idx].content); - const tools: { name: string; done: boolean }[] = - parsed.tools || []; - - if (trackedName) { - let marked = false; - for (let i = 0; i < tools.length; i++) { - if ( - tools[i].name === trackedName && - !tools[i].done && - !marked - ) { - tools[i] = { ...tools[i], done: true }; - marked = true; - } - } - } - - const allDone = - tools.length > 0 && tools.every((t) => t.done); - return prevMsgs.map((m, i) => - i === idx - ? { - ...m, - content: JSON.stringify({ tools, allDone }), - } - : m, - ); - } catch { - return prevMsgs; + for (const msg of emittedMessages) { + if (msg.role === "queen") { + msg.phase = queenPhaseRef.current as ChatMessage["phase"]; } - }); + upsertMessage(msg); + } } break; } diff --git a/core/frontend/src/pages/queen-dm.tsx b/core/frontend/src/pages/queen-dm.tsx index 9bfa149d..f4fe5499 100644 --- a/core/frontend/src/pages/queen-dm.tsx +++ b/core/frontend/src/pages/queen-dm.tsx @@ -12,7 +12,8 @@ import { queensApi } from "@/api/queens"; import { useMultiSSE } from "@/hooks/use-sse"; import type { AgentEvent, HistorySession } from "@/api/types"; import { - sseEventToChatMessage, + newReplayState, + replayEvent, replayEventsToMessages, } from "@/lib/chat-helpers"; import { useColony } from "@/context/ColonyContext"; @@ -46,9 +47,6 @@ export default function QueenDM() { >(null); const [awaitingInput, setAwaitingInput] = useState(false); const [tokenUsage, setTokenUsage] = useState({ input: 0, output: 0 }); - const [, setActiveToolCalls] = useState< - Record - >({}); const [historySessions, setHistorySessions] = useState([]); const [historyLoading, setHistoryLoading] = useState(false); const [switchingSessionId, setSwitchingSessionId] = useState( @@ -70,14 +68,7 @@ export default function QueenDM() { ); const [compactingAndForking, setCompactingAndForking] = useState(false); - const turnCounterRef = useRef(0); - // Maps tool_use_id → the pill message ID and tool name that was created for it. - // Survives turn counter resets so deferred completions (e.g. ask_user) can - // find and update the correct pill even after llm_turn_complete bumps the counter. - const toolUseToPillRef = useRef< - Record - >({}); - const queenIterTextRef = useRef>>({}); + const replayStateRef = useRef(newReplayState()); const [queenPhase, setQueenPhase] = useState< "independent" | "incubating" | "working" | "reviewing" >("independent"); @@ -92,18 +83,55 @@ export default function QueenDM() { setPendingOptions(null); setPendingQuestions(null); setAwaitingInput(false); - setActiveToolCalls({}); setQueenPhase("independent"); setTokenUsage({ input: 0, output: 0 }); setInitialDraft(null); setColonySpawned(false); setSpawnedColonyName(null); setCompactingAndForking(false); - turnCounterRef.current = 0; - toolUseToPillRef.current = {}; - queenIterTextRef.current = {}; + replayStateRef.current = newReplayState(); }, []); + const upsertMessage = useCallback( + (chatMsg: ChatMessage, options?: { reconcileOptimisticUser?: boolean }) => { + setMessages((prev) => { + const idx = prev.findIndex((m) => m.id === chatMsg.id); + if (idx >= 0) { + return prev.map((m, i) => + i === idx ? { ...chatMsg, createdAt: m.createdAt ?? chatMsg.createdAt } : m, + ); + } + if (options?.reconcileOptimisticUser && chatMsg.type === "user") { + const incomingTs = chatMsg.createdAt ?? Date.now(); + const matchIdx = prev.findIndex( + (m) => + m.type === "user" && + m.content === chatMsg.content && + Math.abs(incomingTs - (m.createdAt ?? incomingTs)) <= 15000, + ); + if (matchIdx !== -1) { + return prev.map((m, i) => + i === matchIdx ? { ...m, id: chatMsg.id, queued: undefined } : m, + ); + } + } + + const ts = chatMsg.createdAt ?? Date.now(); + let insertIdx = prev.length - 1; + while (insertIdx >= 0 && (prev[insertIdx].createdAt ?? 0) > ts) { + insertIdx--; + } + if (insertIdx === -1 || insertIdx === prev.length - 1) { + return [...prev, chatMsg]; + } + const next = [...prev]; + next.splice(insertIdx + 1, 0, chatMsg); + return next; + }); + }, + [], + ); + const restoreMessages = useCallback( async (sid: string, cancelled: () => boolean) => { try { @@ -114,7 +142,15 @@ export default function QueenDM() { // Use the stateful replay so tool_status pills are synthesized // the same way the live SSE handler does — without this the // refreshed queen DM shows zero tool activity. - const restored = replayEventsToMessages(events, "queen-dm", queenName); + const replayState = newReplayState(); + const restored = replayEventsToMessages( + events, + "queen-dm", + queenName, + undefined, + replayState, + ); + replayStateRef.current = replayState; // Show a banner if the server truncated older events. const droppedCount = Math.max(0, total - returned); @@ -492,14 +528,17 @@ export default function QueenDM() { (_agentType: string, event: AgentEvent) => { const isQueen = event.stream_id === "queen"; if (!isQueen) return; + const emittedMessages = replayEvent( + replayStateRef.current, + event, + "queen-dm", + queenName, + ); switch (event.type) { case "execution_started": - turnCounterRef.current++; setIsTyping(true); setQueenReady(true); - setActiveToolCalls({}); - toolUseToPillRef.current = {}; // Clear queued flag on all user messages now that the queen is processing setMessages((prev) => { if (!prev.some((m) => m.queued)) return prev; @@ -513,8 +552,6 @@ export default function QueenDM() { break; case "llm_turn_complete": - turnCounterRef.current++; - setActiveToolCalls({}); if (event.data) { const inp = (event.data.input_tokens as number) || 0; const out = (event.data.output_tokens as number) || 0; @@ -524,48 +561,7 @@ export default function QueenDM() { case "client_output_delta": case "llm_text_delta": { - const chatMsg = sseEventToChatMessage( - event, - "queen-dm", - queenName, - turnCounterRef.current, - ); - if (chatMsg) { - if (event.execution_id) { - const iter = event.data?.iteration ?? 0; - const inner = (event.data?.inner_turn as number) ?? 0; - const iterKey = `${event.execution_id}:${iter}`; - if (!queenIterTextRef.current[iterKey]) { - queenIterTextRef.current[iterKey] = {}; - } - const snapshot = - (event.data?.snapshot as string) || - (event.data?.content as string) || - ""; - queenIterTextRef.current[iterKey][inner] = snapshot; - const parts = queenIterTextRef.current[iterKey]; - const sorted = Object.keys(parts) - .map(Number) - .sort((a, b) => a - b); - chatMsg.content = sorted.map((k) => parts[k]).join("\n"); - chatMsg.id = `queen-stream-${event.execution_id}-${iter}`; - } - chatMsg.role = "queen"; - - setMessages((prev) => { - const idx = prev.findIndex((m) => m.id === chatMsg.id); - if (idx >= 0) { - // Preserve the original createdAt so the displayed timestamp - // doesn't tick forward as new deltas stream in. - return prev.map((m, i) => - i === idx - ? { ...chatMsg, createdAt: m.createdAt ?? chatMsg.createdAt } - : m, - ); - } - return [...prev, chatMsg]; - }); - } + for (const msg of emittedMessages) upsertMessage(msg); setIsStreaming(true); break; } @@ -594,34 +590,8 @@ export default function QueenDM() { } case "client_input_received": { - const chatMsg = sseEventToChatMessage( - event, - "queen-dm", - queenName, - turnCounterRef.current, - ); - if (chatMsg) { - setMessages((prev) => { - // Reconcile optimistic user message. A matching echo from - // the backend means the queen has now received this - // message, so drop the "queued" indicator (it was set when - // the user sent while the queen was still busy). - if (chatMsg.type === "user" && prev.length > 0) { - const idx = prev.findIndex( - (m) => - m.type === "user" && - m.content === chatMsg.content && - Math.abs((chatMsg.createdAt ?? 0) - (m.createdAt ?? 0)) <= - 15000, - ); - if (idx !== -1) { - return prev.map((m, i) => - i === idx ? { ...m, id: chatMsg.id, queued: undefined } : m, - ); - } - } - return [...prev, chatMsg]; - }); + for (const msg of emittedMessages) { + upsertMessage(msg, { reconcileOptimisticUser: true }); } break; } @@ -676,133 +646,12 @@ export default function QueenDM() { } case "tool_call_started": { - const toolName = (event.data?.tool_name as string) || "unknown"; - const toolUseId = (event.data?.tool_use_id as string) || ""; - const sid = event.stream_id; - const execId = event.execution_id || "exec"; - const eventCreatedAt = event.timestamp - ? new Date(event.timestamp).getTime() - : Date.now(); - - // Track which pill message this tool belongs to so deferred - // completions (ask_user) can find it after the turn counter changes. - const msgId = `tool-pill-${sid}-${execId}-${turnCounterRef.current}`; - toolUseToPillRef.current[toolUseId] = { msgId, name: toolName }; - - setActiveToolCalls((prev) => { - const newActive = { - ...prev, - [toolUseId]: { name: toolName, done: false }, - }; - const tools = Object.entries(newActive).map(([, t]) => ({ - name: t.name, - done: t.done, - })); - const allDone = tools.length > 0 && tools.every((t) => t.done); - const toolMsg: ChatMessage = { - id: msgId, - agent: queenName, - agentColor: "", - content: JSON.stringify({ tools, allDone }), - timestamp: "", - type: "tool_status", - role: "queen", - thread: "queen-dm", - createdAt: eventCreatedAt, - nodeId: event.node_id || undefined, - executionId: event.execution_id || undefined, - streamId: sid || undefined, - }; - setMessages((prevMsgs) => { - const idx = prevMsgs.findIndex((m) => m.id === msgId); - if (idx >= 0) { - return prevMsgs.map((m, i) => - i === idx ? { ...toolMsg, createdAt: m.createdAt ?? toolMsg.createdAt } : m, - ); - } - // Insert in sorted position by createdAt - const ts = toolMsg.createdAt ?? Date.now(); - let insertIdx = prevMsgs.length - 1; - while (insertIdx >= 0 && (prevMsgs[insertIdx].createdAt ?? 0) > ts) { - insertIdx--; - } - if (insertIdx === -1 || insertIdx === prevMsgs.length - 1) { - return [...prevMsgs, toolMsg]; - } - const next = [...prevMsgs]; - next.splice(insertIdx + 1, 0, toolMsg); - return next; - }); - return newActive; - }); + for (const msg of emittedMessages) upsertMessage(msg); break; } case "tool_call_completed": { - const toolUseId = (event.data?.tool_use_id as string) || ""; - - // Look up the original pill message this tool belongs to. - // For deferred completions (ask_user), the turn counter and - // activeToolCalls have already been reset by llm_turn_complete, - // so we rely on the ref recorded during tool_call_started. - const tracked = toolUseToPillRef.current[toolUseId]; - delete toolUseToPillRef.current[toolUseId]; - - // Mark done in activeToolCalls if still present (normal case) - setActiveToolCalls((prev) => { - if (!prev[toolUseId]) return prev; - return { - ...prev, - [toolUseId]: { ...prev[toolUseId], done: true }, - }; - }); - - // Determine the correct pill message ID - const sid = event.stream_id; - const execId = event.execution_id || "exec"; - const pillMsgId = - tracked?.msgId ?? - `tool-pill-${sid}-${execId}-${turnCounterRef.current}`; - const toolName = tracked?.name; - - // Update the pill message content directly - setMessages((prevMsgs) => { - const idx = prevMsgs.findIndex((m) => m.id === pillMsgId); - if (idx < 0) return prevMsgs; - - try { - const parsed = JSON.parse(prevMsgs[idx].content); - const tools: { name: string; done: boolean }[] = - parsed.tools || []; - - if (toolName) { - let marked = false; - for (let i = 0; i < tools.length; i++) { - if ( - tools[i].name === toolName && - !tools[i].done && - !marked - ) { - tools[i] = { ...tools[i], done: true }; - marked = true; - } - } - } - - const allDone = - tools.length > 0 && tools.every((t) => t.done); - return prevMsgs.map((m, i) => - i === idx - ? { - ...m, - content: JSON.stringify({ tools, allDone }), - } - : m, - ); - } catch { - return prevMsgs; - } - }); + for (const msg of emittedMessages) upsertMessage(msg); break; } @@ -810,7 +659,7 @@ export default function QueenDM() { break; } }, - [queenName, refresh], + [queenName, refresh, upsertMessage], ); const sseSessions = useMemo((): Record => { @@ -885,8 +734,7 @@ export default function QueenDM() { await executionApi.cancelQueen(sessionId); setIsTyping(false); setIsStreaming(false); - setActiveToolCalls({}); - toolUseToPillRef.current = {}; + replayStateRef.current = newReplayState(); // Clear queued flags since the queen is now idle setMessages((prev) => { if (!prev.some((m) => m.queued)) return prev;