feat: fix reply message bubble and improve code reuse

This commit is contained in:
Richard Tang
2026-04-20 13:07:26 -07:00
parent 4ffddc53e6
commit c1d7b0ee69
4 changed files with 261 additions and 387 deletions
@@ -3,6 +3,7 @@ import {
extractLastPhase,
sseEventToChatMessage,
formatAgentDisplayName,
replayEventsToMessages,
} from "./chat-helpers";
import type { AgentEvent } from "@/api/types";
@@ -418,6 +419,104 @@ describe("sseEventToChatMessage", () => {
});
});
// ---------------------------------------------------------------------------
// replayEventsToMessages
// ---------------------------------------------------------------------------
describe("replayEventsToMessages", () => {
it("merges queen inner turns from the same iteration into one restored bubble", () => {
const events = [
makeEvent({
type: "client_output_delta",
stream_id: "queen",
node_id: "queen",
execution_id: "session-1",
timestamp: "2026-04-20T12:45:25.234Z",
data: {
snapshot: "I will create the ERD.",
iteration: 0,
inner_turn: 0,
},
}),
makeEvent({
type: "tool_call_started",
stream_id: "queen",
node_id: "queen",
execution_id: "session-1",
timestamp: "2026-04-20T12:45:25.238Z",
data: {
tool_name: "write_file",
tool_use_id: "tool-1",
},
}),
makeEvent({
type: "tool_call_completed",
stream_id: "queen",
node_id: "queen",
execution_id: "session-1",
timestamp: "2026-04-20T12:45:25.250Z",
data: {
tool_name: "write_file",
tool_use_id: "tool-1",
result: "ok",
},
}),
makeEvent({
type: "client_output_delta",
stream_id: "queen",
node_id: "queen",
execution_id: "session-1",
timestamp: "2026-04-20T12:46:07.911Z",
data: {
snapshot: "Saved to `database_erd.md`.",
iteration: 0,
inner_turn: 2,
},
}),
];
const restored = replayEventsToMessages(events, "queen-dm", "Alexandra");
const queenMessages = restored.filter(
(m) => m.role === "queen" && !m.type,
);
expect(queenMessages).toHaveLength(1);
expect(queenMessages[0].id).toBe("queen-stream-session-1-0");
expect(queenMessages[0].content).toBe(
"I will create the ERD.\nSaved to `database_erd.md`.",
);
expect(queenMessages[0].createdAt).toBe(
new Date("2026-04-20T12:45:25.234Z").getTime(),
);
});
it("keeps worker inner turns as distinct restored bubbles", () => {
const events = [
makeEvent({
type: "llm_text_delta",
stream_id: "worker",
node_id: "research",
execution_id: "session-1",
data: { snapshot: "First pass", iteration: 0, inner_turn: 0 },
}),
makeEvent({
type: "llm_text_delta",
stream_id: "worker",
node_id: "research",
execution_id: "session-1",
data: { snapshot: "After tool", iteration: 0, inner_turn: 1 },
}),
];
const restored = replayEventsToMessages(events, "agent", "Research Agent");
expect(restored.map((m) => m.id)).toEqual([
"stream-session-1-0-research",
"stream-session-1-0-t1-research",
]);
});
});
// ---------------------------------------------------------------------------
// formatAgentDisplayName
// ---------------------------------------------------------------------------
+39 -9
View File
@@ -262,10 +262,16 @@ export interface ReplayState {
{ name: string; done: boolean; streamId: string }
>;
toolUseToPill: Record<string, { msgId: string; name: string }>;
queenIterText: Record<string, Record<number, string>>;
}
export function newReplayState(): ReplayState {
return { turnCounters: {}, activeToolCalls: {}, toolUseToPill: {} };
return {
turnCounters: {},
activeToolCalls: {},
toolUseToPill: {},
queenIterText: {},
};
}
/**
@@ -305,14 +311,14 @@ export function replayEvent(
const out: ChatMessage[] = [];
// Update state machine BEFORE the generic converter runs so the
// regular message emitted for this event sees the post-update
// counter (matches live handler ordering at colony-chat.tsx:525).
// Update state machine BEFORE the generic converter runs so regular
// messages and synthesized tool pills use the same turn counters in
// both live SSE handling and cold replay.
switch (event.type) {
case "execution_started":
state.turnCounters[turnKey] = currentTurn + 1;
// New execution for a worker resets its active tools, mirroring
// the live handler's setAgentState at colony-chat.tsx:566.
// New execution for a worker resets its active tools so stale
// tool pills from a previous run cannot bleed into the next run.
if (!isQueen) {
const keepActive: typeof state.activeToolCalls = {};
for (const [k, v] of Object.entries(state.activeToolCalls)) {
@@ -396,7 +402,27 @@ export function replayEvent(
state.turnCounters[turnKey] ?? 0,
);
if (msg) {
if (isQueen) msg.role = "queen";
if (isQueen) {
msg.role = "queen";
if (
event.execution_id &&
(event.type === "client_output_delta" || event.type === "llm_text_delta")
) {
const iter = (event.data?.iteration as number | undefined) ?? 0;
const inner = (event.data?.inner_turn as number | undefined) ?? 0;
const iterKey = `${event.execution_id}:${iter}`;
if (!state.queenIterText[iterKey]) {
state.queenIterText[iterKey] = {};
}
state.queenIterText[iterKey][inner] = msg.content;
const parts = state.queenIterText[iterKey];
const sorted = Object.keys(parts)
.map(Number)
.sort((a, b) => a - b);
msg.content = sorted.map((k) => parts[k]).join("\n");
msg.id = `queen-stream-${event.execution_id}-${iter}`;
}
}
out.push(msg);
}
@@ -420,8 +446,8 @@ export function replayEventsToMessages(
thread: string,
agentDisplayName: string | undefined,
queenDisplayName?: string,
state: ReplayState = newReplayState(),
): ChatMessage[] {
const state = newReplayState();
// Upsert by id — later emissions for the same pill replace earlier ones.
const byId = new Map<string, ChatMessage>();
@@ -446,7 +472,11 @@ export function replayEventsToMessages(
continue;
}
for (const m of replayEvent(state, evt, thread, agentDisplayName, queenDisplayName)) {
byId.set(m.id, m);
const previous = byId.get(m.id);
byId.set(
m.id,
previous ? { ...m, createdAt: previous.createdAt ?? m.createdAt } : m,
);
}
}
+57 -160
View File
@@ -12,9 +12,11 @@ import { sessionsApi } from "@/api/sessions";
import { useMultiSSE } from "@/hooks/use-sse";
import type { LiveSession, AgentEvent } from "@/api/types";
import {
sseEventToChatMessage,
formatAgentDisplayName,
newReplayState,
replayEvent,
replayEventsToMessages,
type ReplayState,
} from "@/lib/chat-helpers";
import {
resolveInitialColonyPhase,
@@ -47,6 +49,7 @@ function truncate(s: string, max: number): string {
type SessionRestoreResult = {
messages: ChatMessage[];
replayState: ReplayState;
restoredPhase: "independent" | "incubating" | "working" | "reviewing" | null;
truncated: boolean;
droppedCount: number;
@@ -82,7 +85,14 @@ async function restoreSessionMessages(
}
}
const messages = replayEventsToMessages(events, thread, agentDisplayName, queenDisplayName);
const replayState = newReplayState();
const messages = replayEventsToMessages(
events,
thread,
agentDisplayName,
queenDisplayName,
replayState,
);
// Stamp the latest phase on every queen message so the UI's
// phase-badge rendering matches what the live path would have
// displayed at the time of the refresh.
@@ -111,6 +121,7 @@ async function restoreSessionMessages(
}
return {
messages,
replayState,
restoredPhase: runningPhase ?? null,
truncated,
droppedCount,
@@ -119,7 +130,13 @@ async function restoreSessionMessages(
} catch {
// Event log not available
}
return { messages: [], restoredPhase: null, truncated: false, droppedCount: 0 };
return {
messages: [],
replayState: newReplayState(),
restoredPhase: null,
truncated: false,
droppedCount: 0,
};
}
// ── Agent backend state ──────────────────────────────────────────────────────
@@ -156,7 +173,6 @@ interface AgentState {
queenIsTyping: boolean;
workerIsTyping: boolean;
llmSnapshots: Record<string, string>;
activeToolCalls: Record<string, { name: string; done: boolean; streamId: string }>;
pendingQuestion: string | null;
pendingOptions: string[] | null;
pendingQuestions: { id: string; prompt: string; options?: string[] }[] | null;
@@ -190,7 +206,6 @@ function defaultAgentState(): AgentState {
queenIsTyping: false,
workerIsTyping: false,
llmSnapshots: {},
activeToolCalls: {},
pendingQuestion: null,
pendingOptions: null,
pendingQuestions: null,
@@ -305,18 +320,11 @@ export default function ColonyChat() {
const agentStateRef = useRef(agentState);
agentStateRef.current = agentState;
const turnCounterRef = useRef<Record<string, number>>({});
const replayStateRef = useRef(newReplayState());
// Timestamp of the latest restored message — SSE events older than this
// are duplicates from the ring-buffer replay and should be skipped.
const restoreCutoffRef = useRef<number>(0);
// Maps tool_use_id → the pill message ID and tool name that was created for it.
// Survives turn counter resets so deferred completions (e.g. ask_user) can
// find and update the correct pill even after the counter changes.
const toolUseToPillRef = useRef<
Record<string, { msgId: string; name: string }>
>({});
const queenPhaseRef = useRef<string>("independent");
const queenIterTextRef = useRef<Record<string, Record<number, string>>>({});
const suppressIntroRef = useRef(false);
const loadingRef = useRef(false);
@@ -491,6 +499,7 @@ export default function ColonyChat() {
const session = liveSession!;
const displayName = formatAgentDisplayName(session.colony_name || agentPath);
let restoredMessages: ChatMessage[] = [];
let restoredReplayState: ReplayState | null = null;
const reusePrefetchedRestore = shouldUsePrefetchedColonyRestore(
coldRestoreId,
session.session_id,
@@ -507,10 +516,12 @@ export default function ColonyChat() {
if (restored.messages.length > 0) {
restoredMessages = restored.messages;
}
restoredReplayState = restored.replayState;
restoredPhase = restored.restoredPhase;
} else if (prefetchedRestore) {
if (reusePrefetchedRestore) {
restoredMessages = prefetchedRestore.messages;
restoredReplayState = prefetchedRestore.replayState;
restoredPhase = prefetchedRestore.restoredPhase;
} else {
// The backend corrected the resume target to the colony's forked
@@ -523,10 +534,15 @@ export default function ColonyChat() {
queenInfo.name,
);
restoredMessages = restored.messages;
restoredReplayState = restored.replayState;
restoredPhase = restored.restoredPhase;
}
}
if (restoredReplayState) {
replayStateRef.current = restoredReplayState;
}
if (restoredMessages.length > 0) {
restoredMessages.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
setMessages(restoredMessages);
@@ -580,10 +596,8 @@ export default function ColonyChat() {
setMessages([]);
setGraphNodes([]);
setAgentState(defaultAgentState());
turnCounterRef.current = {};
toolUseToPillRef.current = {};
replayStateRef.current = newReplayState();
queenPhaseRef.current = "independent";
queenIterTextRef.current = {};
suppressIntroRef.current = false;
restoreCutoffRef.current = 0;
loadingRef.current = false;
@@ -600,11 +614,7 @@ export default function ColonyChat() {
const suppressQueenMessages = isQueen && suppressIntroRef.current;
const state = agentStateRef.current;
const agentDisplayName = state.displayName;
const displayName = isQueen ? queenInfo.name : agentDisplayName || undefined;
const role = isQueen ? ("queen" as const) : ("worker" as const);
const ts = fmtLogTs(event.timestamp);
const turnKey = streamId;
const currentTurn = turnCounterRef.current[turnKey] ?? 0;
const eventCreatedAt = event.timestamp
? new Date(event.timestamp).getTime()
: Date.now();
@@ -619,11 +629,17 @@ export default function ColonyChat() {
}
const shouldMarkQueenReady = isQueen && !state.queenReady;
const emittedMessages = replayEvent(
replayStateRef.current,
event,
agentPath,
agentDisplayName || undefined,
queenInfo.name,
);
switch (event.type) {
case "execution_started":
if (isQueen) {
turnCounterRef.current[turnKey] = currentTurn + 1;
updateState({
isTyping: true,
queenIsTyping: true,
@@ -645,7 +661,6 @@ export default function ColonyChat() {
createdAt: eventCreatedAt,
});
}
turnCounterRef.current[turnKey] = currentTurn + 1;
updateState({
isTyping: true,
isStreaming: false,
@@ -655,7 +670,6 @@ export default function ColonyChat() {
nodeLogs: {},
subagentReports: [],
llmSnapshots: {},
activeToolCalls: {},
pendingQuestion: null,
pendingOptions: null,
pendingQuestions: null,
@@ -692,37 +706,15 @@ export default function ColonyChat() {
case "client_input_received":
case "client_input_requested":
case "llm_text_delta": {
const chatMsg = sseEventToChatMessage(event, agentPath, displayName, currentTurn);
if (chatMsg && !suppressQueenMessages) {
// Merge queen inner_turns within same iteration
if (
isQueen &&
(event.type === "client_output_delta" || event.type === "llm_text_delta") &&
event.execution_id
) {
const iter = event.data?.iteration ?? 0;
const inner = (event.data?.inner_turn as number) ?? 0;
const iterKey = `${event.execution_id}:${iter}`;
if (!queenIterTextRef.current[iterKey]) {
queenIterTextRef.current[iterKey] = {};
if (!suppressQueenMessages) {
for (const msg of emittedMessages) {
if (isQueen) {
msg.phase = queenPhaseRef.current as ChatMessage["phase"];
}
const snapshot =
(event.data?.snapshot as string) || (event.data?.content as string) || "";
queenIterTextRef.current[iterKey][inner] = snapshot;
const parts = queenIterTextRef.current[iterKey];
const sorted = Object.keys(parts)
.map(Number)
.sort((a, b) => a - b);
chatMsg.content = sorted.map((k) => parts[k]).join("\n");
chatMsg.id = `queen-stream-${event.execution_id}-${iter}`;
upsertMessage(msg, {
reconcileOptimisticUser: event.type === "client_input_received",
});
}
if (isQueen) {
chatMsg.role = role;
chatMsg.phase = queenPhaseRef.current as ChatMessage["phase"];
}
upsertMessage(chatMsg, {
reconcileOptimisticUser: event.type === "client_input_received",
});
}
if (event.type === "llm_text_delta" || event.type === "client_output_delta") {
@@ -806,8 +798,7 @@ export default function ColonyChat() {
}
case "node_loop_started":
turnCounterRef.current[turnKey] = currentTurn + 1;
updateState({ isTyping: true, activeToolCalls: {} });
updateState({ isTyping: true });
if (!isQueen && event.node_id) {
const existing = graphNodes.find((n) => n.id === event.node_id);
const isRevisit = existing?.status === "complete";
@@ -819,11 +810,9 @@ export default function ColonyChat() {
break;
case "node_loop_iteration":
turnCounterRef.current[turnKey] = currentTurn + 1;
if (isQueen) {
updateState({
isStreaming: false,
activeToolCalls: {},
awaitingInput: false,
pendingQuestion: null,
pendingOptions: null,
@@ -834,7 +823,6 @@ export default function ColonyChat() {
updateState({
isStreaming: false,
workerIsTyping: true,
activeToolCalls: {},
awaitingInput: false,
pendingQuestion: null,
pendingOptions: null,
@@ -903,41 +891,13 @@ export default function ColonyChat() {
);
}
const toolName = (event.data?.tool_name as string) || "unknown";
const toolUseId = (event.data?.tool_use_id as string) || "";
const sid = event.stream_id;
// Track which pill message this tool belongs to so deferred
// completions (ask_user) can find it after the turn counter changes.
toolUseToPillRef.current[toolUseId] = {
msgId: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`,
name: toolName,
};
setAgentState((prev) => {
const newActive = {
...prev.activeToolCalls,
[toolUseId]: { name: toolName, done: false, streamId: sid },
};
const tools = Object.values(newActive)
.filter((t) => t.streamId === sid)
.map((t) => ({ name: t.name, done: t.done }));
const allDone = tools.length > 0 && tools.every((t) => t.done);
upsertMessage({
id: `tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: JSON.stringify({ tools, allDone }),
timestamp: "",
type: "tool_status",
role,
thread: agentPath,
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
streamId: sid || undefined,
});
return { ...prev, isStreaming: false, activeToolCalls: newActive };
});
for (const msg of emittedMessages) {
if (msg.role === "queen") {
msg.phase = queenPhaseRef.current as ChatMessage["phase"];
}
upsertMessage(msg);
}
updateState({ isStreaming: false });
}
break;
}
@@ -945,7 +905,6 @@ export default function ColonyChat() {
case "tool_call_completed": {
if (event.node_id) {
const toolName = (event.data?.tool_name as string) || "unknown";
const toolUseId = (event.data?.tool_use_id as string) || "";
const isError = event.data?.is_error as boolean | undefined;
const result = event.data?.result as string | undefined;
if (isError) {
@@ -958,74 +917,12 @@ export default function ColonyChat() {
appendNodeLog(event.node_id, `${ts} INFO ${toolName} done${resultStr}`);
}
// Look up the original pill message this tool belongs to.
// For deferred completions (ask_user), the turn counter and
// activeToolCalls have already been reset, so we rely on the
// ref recorded during tool_call_started.
const tracked = toolUseToPillRef.current[toolUseId];
delete toolUseToPillRef.current[toolUseId];
const sid = event.stream_id;
// Mark done in activeToolCalls if still present (normal case)
setAgentState((prev) => {
if (!prev.activeToolCalls[toolUseId]) return prev;
return {
...prev,
activeToolCalls: {
...prev.activeToolCalls,
[toolUseId]: {
...prev.activeToolCalls[toolUseId],
done: true,
},
},
};
});
// Determine the correct pill message ID
const pillMsgId =
tracked?.msgId ??
`tool-pill-${sid}-${event.execution_id || "exec"}-${currentTurn}`;
const trackedName = tracked?.name;
// Update the pill message content directly
setMessages((prevMsgs) => {
const idx = prevMsgs.findIndex((m) => m.id === pillMsgId);
if (idx < 0) return prevMsgs;
try {
const parsed = JSON.parse(prevMsgs[idx].content);
const tools: { name: string; done: boolean }[] =
parsed.tools || [];
if (trackedName) {
let marked = false;
for (let i = 0; i < tools.length; i++) {
if (
tools[i].name === trackedName &&
!tools[i].done &&
!marked
) {
tools[i] = { ...tools[i], done: true };
marked = true;
}
}
}
const allDone =
tools.length > 0 && tools.every((t) => t.done);
return prevMsgs.map((m, i) =>
i === idx
? {
...m,
content: JSON.stringify({ tools, allDone }),
}
: m,
);
} catch {
return prevMsgs;
for (const msg of emittedMessages) {
if (msg.role === "queen") {
msg.phase = queenPhaseRef.current as ChatMessage["phase"];
}
});
upsertMessage(msg);
}
}
break;
}
+66 -218
View File
@@ -12,7 +12,8 @@ import { queensApi } from "@/api/queens";
import { useMultiSSE } from "@/hooks/use-sse";
import type { AgentEvent, HistorySession } from "@/api/types";
import {
sseEventToChatMessage,
newReplayState,
replayEvent,
replayEventsToMessages,
} from "@/lib/chat-helpers";
import { useColony } from "@/context/ColonyContext";
@@ -46,9 +47,6 @@ export default function QueenDM() {
>(null);
const [awaitingInput, setAwaitingInput] = useState(false);
const [tokenUsage, setTokenUsage] = useState({ input: 0, output: 0 });
const [, setActiveToolCalls] = useState<
Record<string, { name: string; done: boolean }>
>({});
const [historySessions, setHistorySessions] = useState<HistorySession[]>([]);
const [historyLoading, setHistoryLoading] = useState(false);
const [switchingSessionId, setSwitchingSessionId] = useState<string | null>(
@@ -70,14 +68,7 @@ export default function QueenDM() {
);
const [compactingAndForking, setCompactingAndForking] = useState(false);
const turnCounterRef = useRef(0);
// Maps tool_use_id → the pill message ID and tool name that was created for it.
// Survives turn counter resets so deferred completions (e.g. ask_user) can
// find and update the correct pill even after llm_turn_complete bumps the counter.
const toolUseToPillRef = useRef<
Record<string, { msgId: string; name: string }>
>({});
const queenIterTextRef = useRef<Record<string, Record<number, string>>>({});
const replayStateRef = useRef(newReplayState());
const [queenPhase, setQueenPhase] = useState<
"independent" | "incubating" | "working" | "reviewing"
>("independent");
@@ -92,18 +83,55 @@ export default function QueenDM() {
setPendingOptions(null);
setPendingQuestions(null);
setAwaitingInput(false);
setActiveToolCalls({});
setQueenPhase("independent");
setTokenUsage({ input: 0, output: 0 });
setInitialDraft(null);
setColonySpawned(false);
setSpawnedColonyName(null);
setCompactingAndForking(false);
turnCounterRef.current = 0;
toolUseToPillRef.current = {};
queenIterTextRef.current = {};
replayStateRef.current = newReplayState();
}, []);
const upsertMessage = useCallback(
(chatMsg: ChatMessage, options?: { reconcileOptimisticUser?: boolean }) => {
setMessages((prev) => {
const idx = prev.findIndex((m) => m.id === chatMsg.id);
if (idx >= 0) {
return prev.map((m, i) =>
i === idx ? { ...chatMsg, createdAt: m.createdAt ?? chatMsg.createdAt } : m,
);
}
if (options?.reconcileOptimisticUser && chatMsg.type === "user") {
const incomingTs = chatMsg.createdAt ?? Date.now();
const matchIdx = prev.findIndex(
(m) =>
m.type === "user" &&
m.content === chatMsg.content &&
Math.abs(incomingTs - (m.createdAt ?? incomingTs)) <= 15000,
);
if (matchIdx !== -1) {
return prev.map((m, i) =>
i === matchIdx ? { ...m, id: chatMsg.id, queued: undefined } : m,
);
}
}
const ts = chatMsg.createdAt ?? Date.now();
let insertIdx = prev.length - 1;
while (insertIdx >= 0 && (prev[insertIdx].createdAt ?? 0) > ts) {
insertIdx--;
}
if (insertIdx === -1 || insertIdx === prev.length - 1) {
return [...prev, chatMsg];
}
const next = [...prev];
next.splice(insertIdx + 1, 0, chatMsg);
return next;
});
},
[],
);
const restoreMessages = useCallback(
async (sid: string, cancelled: () => boolean) => {
try {
@@ -114,7 +142,15 @@ export default function QueenDM() {
// Use the stateful replay so tool_status pills are synthesized
// the same way the live SSE handler does — without this the
// refreshed queen DM shows zero tool activity.
const restored = replayEventsToMessages(events, "queen-dm", queenName);
const replayState = newReplayState();
const restored = replayEventsToMessages(
events,
"queen-dm",
queenName,
undefined,
replayState,
);
replayStateRef.current = replayState;
// Show a banner if the server truncated older events.
const droppedCount = Math.max(0, total - returned);
@@ -492,14 +528,17 @@ export default function QueenDM() {
(_agentType: string, event: AgentEvent) => {
const isQueen = event.stream_id === "queen";
if (!isQueen) return;
const emittedMessages = replayEvent(
replayStateRef.current,
event,
"queen-dm",
queenName,
);
switch (event.type) {
case "execution_started":
turnCounterRef.current++;
setIsTyping(true);
setQueenReady(true);
setActiveToolCalls({});
toolUseToPillRef.current = {};
// Clear queued flag on all user messages now that the queen is processing
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;
@@ -513,8 +552,6 @@ export default function QueenDM() {
break;
case "llm_turn_complete":
turnCounterRef.current++;
setActiveToolCalls({});
if (event.data) {
const inp = (event.data.input_tokens as number) || 0;
const out = (event.data.output_tokens as number) || 0;
@@ -524,48 +561,7 @@ export default function QueenDM() {
case "client_output_delta":
case "llm_text_delta": {
const chatMsg = sseEventToChatMessage(
event,
"queen-dm",
queenName,
turnCounterRef.current,
);
if (chatMsg) {
if (event.execution_id) {
const iter = event.data?.iteration ?? 0;
const inner = (event.data?.inner_turn as number) ?? 0;
const iterKey = `${event.execution_id}:${iter}`;
if (!queenIterTextRef.current[iterKey]) {
queenIterTextRef.current[iterKey] = {};
}
const snapshot =
(event.data?.snapshot as string) ||
(event.data?.content as string) ||
"";
queenIterTextRef.current[iterKey][inner] = snapshot;
const parts = queenIterTextRef.current[iterKey];
const sorted = Object.keys(parts)
.map(Number)
.sort((a, b) => a - b);
chatMsg.content = sorted.map((k) => parts[k]).join("\n");
chatMsg.id = `queen-stream-${event.execution_id}-${iter}`;
}
chatMsg.role = "queen";
setMessages((prev) => {
const idx = prev.findIndex((m) => m.id === chatMsg.id);
if (idx >= 0) {
// Preserve the original createdAt so the displayed timestamp
// doesn't tick forward as new deltas stream in.
return prev.map((m, i) =>
i === idx
? { ...chatMsg, createdAt: m.createdAt ?? chatMsg.createdAt }
: m,
);
}
return [...prev, chatMsg];
});
}
for (const msg of emittedMessages) upsertMessage(msg);
setIsStreaming(true);
break;
}
@@ -594,34 +590,8 @@ export default function QueenDM() {
}
case "client_input_received": {
const chatMsg = sseEventToChatMessage(
event,
"queen-dm",
queenName,
turnCounterRef.current,
);
if (chatMsg) {
setMessages((prev) => {
// Reconcile optimistic user message. A matching echo from
// the backend means the queen has now received this
// message, so drop the "queued" indicator (it was set when
// the user sent while the queen was still busy).
if (chatMsg.type === "user" && prev.length > 0) {
const idx = prev.findIndex(
(m) =>
m.type === "user" &&
m.content === chatMsg.content &&
Math.abs((chatMsg.createdAt ?? 0) - (m.createdAt ?? 0)) <=
15000,
);
if (idx !== -1) {
return prev.map((m, i) =>
i === idx ? { ...m, id: chatMsg.id, queued: undefined } : m,
);
}
}
return [...prev, chatMsg];
});
for (const msg of emittedMessages) {
upsertMessage(msg, { reconcileOptimisticUser: true });
}
break;
}
@@ -676,133 +646,12 @@ export default function QueenDM() {
}
case "tool_call_started": {
const toolName = (event.data?.tool_name as string) || "unknown";
const toolUseId = (event.data?.tool_use_id as string) || "";
const sid = event.stream_id;
const execId = event.execution_id || "exec";
const eventCreatedAt = event.timestamp
? new Date(event.timestamp).getTime()
: Date.now();
// Track which pill message this tool belongs to so deferred
// completions (ask_user) can find it after the turn counter changes.
const msgId = `tool-pill-${sid}-${execId}-${turnCounterRef.current}`;
toolUseToPillRef.current[toolUseId] = { msgId, name: toolName };
setActiveToolCalls((prev) => {
const newActive = {
...prev,
[toolUseId]: { name: toolName, done: false },
};
const tools = Object.entries(newActive).map(([, t]) => ({
name: t.name,
done: t.done,
}));
const allDone = tools.length > 0 && tools.every((t) => t.done);
const toolMsg: ChatMessage = {
id: msgId,
agent: queenName,
agentColor: "",
content: JSON.stringify({ tools, allDone }),
timestamp: "",
type: "tool_status",
role: "queen",
thread: "queen-dm",
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
streamId: sid || undefined,
};
setMessages((prevMsgs) => {
const idx = prevMsgs.findIndex((m) => m.id === msgId);
if (idx >= 0) {
return prevMsgs.map((m, i) =>
i === idx ? { ...toolMsg, createdAt: m.createdAt ?? toolMsg.createdAt } : m,
);
}
// Insert in sorted position by createdAt
const ts = toolMsg.createdAt ?? Date.now();
let insertIdx = prevMsgs.length - 1;
while (insertIdx >= 0 && (prevMsgs[insertIdx].createdAt ?? 0) > ts) {
insertIdx--;
}
if (insertIdx === -1 || insertIdx === prevMsgs.length - 1) {
return [...prevMsgs, toolMsg];
}
const next = [...prevMsgs];
next.splice(insertIdx + 1, 0, toolMsg);
return next;
});
return newActive;
});
for (const msg of emittedMessages) upsertMessage(msg);
break;
}
case "tool_call_completed": {
const toolUseId = (event.data?.tool_use_id as string) || "";
// Look up the original pill message this tool belongs to.
// For deferred completions (ask_user), the turn counter and
// activeToolCalls have already been reset by llm_turn_complete,
// so we rely on the ref recorded during tool_call_started.
const tracked = toolUseToPillRef.current[toolUseId];
delete toolUseToPillRef.current[toolUseId];
// Mark done in activeToolCalls if still present (normal case)
setActiveToolCalls((prev) => {
if (!prev[toolUseId]) return prev;
return {
...prev,
[toolUseId]: { ...prev[toolUseId], done: true },
};
});
// Determine the correct pill message ID
const sid = event.stream_id;
const execId = event.execution_id || "exec";
const pillMsgId =
tracked?.msgId ??
`tool-pill-${sid}-${execId}-${turnCounterRef.current}`;
const toolName = tracked?.name;
// Update the pill message content directly
setMessages((prevMsgs) => {
const idx = prevMsgs.findIndex((m) => m.id === pillMsgId);
if (idx < 0) return prevMsgs;
try {
const parsed = JSON.parse(prevMsgs[idx].content);
const tools: { name: string; done: boolean }[] =
parsed.tools || [];
if (toolName) {
let marked = false;
for (let i = 0; i < tools.length; i++) {
if (
tools[i].name === toolName &&
!tools[i].done &&
!marked
) {
tools[i] = { ...tools[i], done: true };
marked = true;
}
}
}
const allDone =
tools.length > 0 && tools.every((t) => t.done);
return prevMsgs.map((m, i) =>
i === idx
? {
...m,
content: JSON.stringify({ tools, allDone }),
}
: m,
);
} catch {
return prevMsgs;
}
});
for (const msg of emittedMessages) upsertMessage(msg);
break;
}
@@ -810,7 +659,7 @@ export default function QueenDM() {
break;
}
},
[queenName, refresh],
[queenName, refresh, upsertMessage],
);
const sseSessions = useMemo((): Record<string, string> => {
@@ -885,8 +734,7 @@ export default function QueenDM() {
await executionApi.cancelQueen(sessionId);
setIsTyping(false);
setIsStreaming(false);
setActiveToolCalls({});
toolUseToPillRef.current = {};
replayStateRef.current = newReplayState();
// Clear queued flags since the queen is now idle
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;