feat: queue messages during queen turns in colony/queen chats

This commit is contained in:
bryan
2026-04-20 12:45:38 -07:00
parent dcbfd4ab01
commit 16ea9b52d3
2 changed files with 186 additions and 64 deletions
+100 -33
View File
@@ -10,6 +10,7 @@ import CredentialsModal, {
import { executionApi } from "@/api/execution";
import { sessionsApi } from "@/api/sessions";
import { useMultiSSE } from "@/hooks/use-sse";
import { usePendingQueue } from "@/hooks/use-pending-queue";
import type { LiveSession, AgentEvent } from "@/api/types";
import {
sseEventToChatMessage,
@@ -325,21 +326,17 @@ export default function ColonyChat() {
);
}
if (options?.reconcileOptimisticUser && chatMsg.type === "user" && prev.length > 0) {
// Match by content + timestamp across the whole list (not just
// the last slot) so a queued user message still reconciles
// even when the queen's previous reply slotted in between.
// Also drops the "queued" indicator since the backend has
// now confirmed receipt.
const incomingTs = chatMsg.createdAt ?? Date.now();
const matchIdx = prev.findIndex(
(m) =>
m.type === "user" &&
m.content === chatMsg.content &&
Math.abs(incomingTs - (m.createdAt ?? incomingTs)) <= 15000,
// Optimistic user bubbles have no executionId; server echoes do.
// Match the oldest unreconciled optimistic with the same content —
// that's the FIFO-correct pick for both auto-flush and Steer.
const idx = prev.findIndex(
(m) => m.type === "user" && !m.executionId && m.content === chatMsg.content,
);
if (matchIdx !== -1) {
if (idx !== -1) {
return prev.map((m, i) =>
i === matchIdx ? { ...m, id: chatMsg.id, queued: undefined } : m,
i === idx
? { ...m, id: chatMsg.id, executionId: chatMsg.executionId }
: m,
);
}
}
@@ -658,6 +655,17 @@ export default function ColonyChat() {
}
break;
case "llm_turn_complete":
// Flush one queued message per queen LLM-turn boundary. Workers'
// LLM turns don't drain the queen queue. execution_completed
// fires only at session shutdown (the queen's loop parks in
// _await_user_input between turns), so this is the real "turn
// ended" signal. Mid-tool-call boundaries count too.
if (isQueen) {
flushNextPendingRef.current();
}
break;
case "execution_paused":
case "execution_failed":
case "client_output_delta":
@@ -1173,19 +1181,71 @@ export default function ColonyChat() {
// ── Action handlers ────────────────────────────────────────────────────
// Core backend send — bypasses queue logic. Used both for the normal path
// (agent idle) and for Steer / auto-flush paths.
const sendToBackend = useCallback(
(text: string, images?: ImageContent[]) => {
if (!agentState.sessionId || !agentState.ready) return;
executionApi.chat(agentState.sessionId, text, images).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
upsertMessage({
id: makeId(),
agent: "System",
agentColor: "",
content: `Failed to send message: ${errMsg}`,
timestamp: "",
type: "system",
thread: agentPath,
createdAt: Date.now(),
});
updateState({ isTyping: false, isStreaming: false, queenIsTyping: false });
});
},
[agentPath, agentState.sessionId, agentState.ready, updateState, upsertMessage],
);
const {
enqueue: enqueuePending,
steer: handleSteer,
cancelQueued: handleCancelQueued,
flushNext: flushNextPending,
flushNextRef: flushNextPendingRef,
clear: clearPendingQueue,
} = usePendingQueue({
sendToBackend,
setMessages,
onFlushStart: useCallback(
() => updateState({ isTyping: true, queenIsTyping: true }),
[updateState],
),
});
// Reset the queue whenever we navigate to a different colony (or to
// new-chat). The hook outlives the route change, so without this, a
// message queued in colony A would auto-flush into colony B's next
// llm_turn_complete (execution_completed only fires at shutdown).
useEffect(() => {
clearPendingQueue();
}, [agentPath, isNewChat, clearPendingQueue]);
const handleCancelQueen = useCallback(async () => {
if (!agentState.sessionId) return;
try {
await executionApi.cancelQueen(agentState.sessionId);
updateState({ isTyping: false, isStreaming: false, queenIsTyping: false });
// After cancelling the current turn, immediately send the oldest
// queued message (if any). The remaining queued messages stay put
// so the user can review them or Steer/Cancel individually.
flushNextPending();
} catch {
// fire-and-forget
}
}, [agentState.sessionId, updateState]);
}, [agentState.sessionId, updateState, flushNextPending]);
const handleSend = useCallback(
(text: string, _thread: string, images?: ImageContent[]) => {
if (agentState.pendingQuestionSource === "queen") {
const answeringQuestion = agentState.pendingQuestionSource === "queen";
if (answeringQuestion) {
updateState({
pendingQuestion: null,
pendingOptions: null,
@@ -1194,8 +1254,15 @@ export default function ColonyChat() {
});
}
// Queue when the queen is mid-turn — unless the user is answering an
// ask_user prompt, in which case we send immediately so the loop can
// resume. Queued messages are held locally (not sent to the backend)
// until the user clicks Steer or the queen goes idle.
const shouldQueue = !answeringQuestion && (agentState.queenIsTyping ?? false);
const msgId = makeId();
const userMsg: ChatMessage = {
id: makeId(),
id: msgId,
agent: "You",
agentColor: "",
content: text,
@@ -1204,29 +1271,27 @@ export default function ColonyChat() {
thread: agentPath,
createdAt: Date.now(),
images,
queued: shouldQueue,
};
setMessages((prev) => [...prev, userMsg]);
suppressIntroRef.current = false;
updateState({ isTyping: true, queenIsTyping: true });
if (agentState.sessionId && agentState.ready) {
executionApi.chat(agentState.sessionId, text, images).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
upsertMessage({
id: makeId(),
agent: "System",
agentColor: "",
content: `Failed to send message: ${errMsg}`,
timestamp: "",
type: "system",
thread: agentPath,
createdAt: Date.now(),
});
updateState({ isTyping: false, isStreaming: false, queenIsTyping: false });
});
if (shouldQueue) {
enqueuePending(msgId, { text, images });
return;
}
updateState({ isTyping: true, queenIsTyping: true });
sendToBackend(text, images);
},
[agentPath, agentState.sessionId, agentState.ready, agentState.pendingQuestionSource, updateState, upsertMessage],
[
agentPath,
agentState.queenIsTyping,
agentState.pendingQuestionSource,
updateState,
sendToBackend,
enqueuePending,
],
);
const handleQueenQuestionAnswer = useCallback(
@@ -1384,6 +1449,8 @@ export default function ColonyChat() {
messages={messages}
onSend={handleSend}
onCancel={handleCancelQueen}
onSteer={handleSteer}
onCancelQueued={handleCancelQueued}
activeThread={agentPath}
isWaiting={(agentState.queenIsTyping && !agentState.isStreaming) ?? false}
isWorkerWaiting={(agentState.workerIsTyping && !agentState.isStreaming) ?? false}
+86 -31
View File
@@ -10,6 +10,7 @@ import { executionApi } from "@/api/execution";
import { sessionsApi } from "@/api/sessions";
import { queensApi } from "@/api/queens";
import { useMultiSSE } from "@/hooks/use-sse";
import { usePendingQueue } from "@/hooks/use-pending-queue";
import type { AgentEvent, HistorySession } from "@/api/types";
import {
sseEventToChatMessage,
@@ -424,11 +425,11 @@ export default function QueenDM() {
setQueenReady(true);
setActiveToolCalls({});
toolUseToPillRef.current = {};
// Clear queued flag on all user messages now that the queen is processing
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;
return prev.map((m) => (m.queued ? { ...m, queued: undefined } : m));
});
// Do NOT clear `queued` on user messages here. The pending queue
// hook owns that flag — it's cleared on steer / cancel / flush.
// If the user has queued messages that haven't been flushed yet,
// the queen starting a new turn (e.g. from a steer or from the
// flush itself) shouldn't hide the still-queued ones.
break;
case "execution_completed":
@@ -439,6 +440,13 @@ export default function QueenDM() {
case "llm_turn_complete":
turnCounterRef.current++;
setActiveToolCalls({});
// Flush one queued message per LLM turn boundary. This is the
// real "turn ended" signal in a queen DM — execution_completed
// only fires at session shutdown because the event loop parks in
// _await_user_input between turns. Mid-tool-call boundaries
// count too: sending now lets the queen pick up the message on
// her next drain, same as clicking Steer.
flushNextPendingRef.current();
break;
case "client_output_delta":
@@ -521,21 +529,27 @@ export default function QueenDM() {
);
if (chatMsg) {
setMessages((prev) => {
// Reconcile optimistic user message. A matching echo from
// the backend means the queen has now received this
// message, so drop the "queued" indicator (it was set when
// the user sent while the queen was still busy).
// Reconcile an optimistic user bubble with the server echo.
// Optimistics have no executionId; server echoes do. Match the
// oldest unreconciled optimistic with the same content — that's
// the FIFO-correct pick for both auto-flush and Steer.
if (chatMsg.type === "user" && prev.length > 0) {
const idx = prev.findIndex(
(m) =>
m.type === "user" &&
m.content === chatMsg.content &&
Math.abs((chatMsg.createdAt ?? 0) - (m.createdAt ?? 0)) <=
15000,
!m.executionId &&
m.content === chatMsg.content,
);
if (idx !== -1) {
return prev.map((m, i) =>
i === idx ? { ...m, id: chatMsg.id, queued: undefined } : m,
i === idx
? {
...m,
id: chatMsg.id,
executionId: chatMsg.executionId,
queued: undefined,
}
: m,
);
}
}
@@ -738,18 +752,58 @@ export default function QueenDM() {
useMultiSSE({ sessions: sseSessions, onEvent: handleSSEEvent });
// Send handler
// Core backend send — used both for immediate sends and for Steer /
// auto-flush paths out of the pending queue.
const sendToBackend = useCallback(
(text: string, images?: ImageContent[]) => {
if (!sessionId) return;
executionApi.chat(sessionId, text, images).catch(() => {
setIsTyping(false);
setIsStreaming(false);
});
},
[sessionId],
);
const {
enqueue: enqueuePending,
steer: handleSteer,
cancelQueued: handleCancelQueued,
flushNext: flushNextPending,
flushNextRef: flushNextPendingRef,
clear: clearPendingQueue,
} = usePendingQueue({
sendToBackend,
setMessages,
onFlushStart: useCallback(() => setIsTyping(true), []),
});
// Reset the queue whenever we navigate to a different queen. The hook
// outlives the route change (same component instance), so without this,
// a message queued for Queen A would auto-flush into Queen B's session
// on B's next llm_turn_complete (execution_completed only fires at shutdown).
useEffect(() => {
clearPendingQueue();
}, [queenId, clearPendingQueue]);
// Send handler. Queues when the queen is mid-turn (unless the user is
// answering an ask_user prompt, which must send immediately to unblock
// the loop). Queued messages are held locally until Steer, Cancel, or
// the next `llm_turn_complete` auto-flush.
const handleSend = useCallback(
(text: string, _thread: string, images?: ImageContent[]) => {
if (awaitingInput) {
const answeringQuestion = awaitingInput;
if (answeringQuestion) {
setAwaitingInput(false);
setPendingQuestion(null);
setPendingOptions(null);
}
const isQueenBusy = isTyping;
const shouldQueue = !answeringQuestion && isTyping;
const msgId = makeId();
const userMsg: ChatMessage = {
id: makeId(),
id: msgId,
agent: "You",
agentColor: "",
content: text,
@@ -758,19 +812,19 @@ export default function QueenDM() {
thread: "queen-dm",
createdAt: Date.now(),
images,
queued: isQueenBusy || undefined,
queued: shouldQueue,
};
setMessages((prev) => [...prev, userMsg]);
setIsTyping(true);
if (sessionId) {
executionApi.chat(sessionId, text, images).catch(() => {
setIsTyping(false);
setIsStreaming(false);
});
if (shouldQueue) {
enqueuePending(msgId, { text, images });
return;
}
setIsTyping(true);
sendToBackend(text, images);
},
[sessionId, awaitingInput, isTyping],
[awaitingInput, isTyping, sendToBackend, enqueuePending],
);
const handleQuestionAnswer = useCallback(
@@ -805,15 +859,14 @@ export default function QueenDM() {
setIsStreaming(false);
setActiveToolCalls({});
toolUseToPillRef.current = {};
// Clear queued flags since the queen is now idle
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;
return prev.map((m) => (m.queued ? { ...m, queued: undefined } : m));
});
// After cancelling the current turn, immediately send the oldest
// queued message (if any). The remaining queued messages stay put
// so the user can review them or Steer/Cancel individually.
flushNextPending();
} catch {
// ignore
}
}, [sessionId]);
}, [sessionId, flushNextPending]);
return (
<div className="flex flex-col h-full">
@@ -836,6 +889,8 @@ export default function QueenDM() {
messages={messages}
onSend={handleSend}
onCancel={handleCancelQueen}
onSteer={handleSteer}
onCancelQueued={handleCancelQueued}
activeThread="queen-dm"
isWaiting={isTyping && !isStreaming}
isBusy={isTyping}