Merge branch 'feature/new-colony' into feature/colony-creation

This commit is contained in:
Timothy
2026-04-14 10:19:24 -07:00
4 changed files with 132 additions and 18 deletions
+49 -8
View File
@@ -3052,6 +3052,33 @@ class AgentLoop(AgentProtocol):
str, tuple[ToolResult | BaseException, str, float] | BaseException
] = {}
async def _cancel_turn_with_stubs(
_pending: list[ToolCallEvent] = pending_real, # noqa: B006,B008
) -> None:
"""Populate [Tool call cancelled by user] stubs for
every pending tool so the conversation doesn't end
up with dangling tool_use blocks, then raise
TurnCancelled so the queen event loop continues
cleanly. Shared between the parallel and serial
phases because either can observe CancelledError.
"""
for _tc in _pending:
await conversation.add_tool_result(
tool_use_id=_tc.tool_use_id,
content="[Tool call cancelled by user]",
is_error=True,
)
await self._publish_tool_completed(
stream_id,
node_id,
_tc.tool_use_id,
_tc.tool_name,
"[Tool call cancelled by user]",
is_error=True,
execution_id=execution_id,
)
raise TurnCancelled() from None
# Phase 2b: resolve the concurrency-safe batch. Prefer
# any early task already started during streaming (Gap
# 1) so we don't accidentally execute the same tool
@@ -3073,11 +3100,22 @@ class AgentLoop(AgentProtocol):
finally:
self._tool_task = None
# gather(return_exceptions=True) captures CancelledError
# as a return value instead of propagating it. Re-raise
# so stop_worker actually stops the execution.
# as a return value instead of propagating it.
# Distinguish cancel_current_turn() (cancels only
# _tool_task) from stop_worker (cancels the parent
# execution task). When the parent itself is
# cancelled, cancelling() > 0 — propagate so the
# executor can save state. Otherwise convert to
# TurnCancelled so the queen event loop continues,
# writing cancellation stubs for every pending tool
# first so the conversation has no dangling
# tool_use blocks.
for entry in parallel_timed:
if isinstance(entry, asyncio.CancelledError):
raise entry
task = asyncio.current_task()
if task and task.cancelling() > 0:
raise entry
await _cancel_turn_with_stubs()
for tc, entry in zip(parallel_batch, parallel_timed, strict=True):
timed_results_by_id[tc.tool_use_id] = entry
@@ -3087,6 +3125,8 @@ class AgentLoop(AgentProtocol):
# drop. A ToolResult with is_error=True is a normal return
# (e.g. "file not found") and does NOT trip the cascade -
# the model should see subsequent errors too.
# CancelledError is handled separately via the shared
# user-cancel helper above.
_serial_cascade_broken = False
for tc in serial_batch:
if _serial_cascade_broken:
@@ -3113,12 +3153,13 @@ class AgentLoop(AgentProtocol):
timed_results_by_id[tc.tool_use_id] = entry
raw_check = entry[0] if isinstance(entry, tuple) else entry
if isinstance(raw_check, BaseException) and not isinstance(
raw_check, asyncio.CancelledError
):
if isinstance(raw_check, asyncio.CancelledError):
task = asyncio.current_task()
if task and task.cancelling() > 0:
raise raw_check
await _cancel_turn_with_stubs()
elif isinstance(raw_check, BaseException):
_serial_cascade_broken = True
elif isinstance(raw_check, asyncio.CancelledError):
raise raw_check
# Phase 2d: reassemble results in original call order so
# the rest of the loop sees no difference from the
+10 -1
View File
@@ -56,6 +56,8 @@ export interface ChatMessage {
nodeId?: string;
/** Backend execution_id for this message */
executionId?: string;
/** True when the message was sent while the queen was still processing */
queued?: boolean;
}
interface ChatPanelProps {
@@ -289,7 +291,9 @@ const MessageBubble = memo(
if (isUser) {
return (
<div className="flex justify-end">
<div className="max-w-[75%] bg-primary text-primary-foreground text-sm leading-relaxed rounded-2xl rounded-br-md px-4 py-3">
<div
className={`max-w-[75%] bg-primary text-primary-foreground text-sm leading-relaxed rounded-2xl rounded-br-md px-4 py-3${msg.queued ? " animate-pulse opacity-80" : ""}`}
>
{msg.images && msg.images.length > 0 && (
<div className="flex flex-wrap gap-2 mb-2">
{msg.images.map((img, i) => (
@@ -305,6 +309,11 @@ const MessageBubble = memo(
{msg.content && (
<p className="whitespace-pre-wrap break-words">{msg.content}</p>
)}
{msg.queued && (
<span className="block text-[10px] opacity-60 mt-1 text-right">
queued
</span>
)}
</div>
</div>
);
+13 -1
View File
@@ -271,7 +271,19 @@ export default function ColonyChat() {
return prev.map((m, i) => (i === lastIdx ? { ...m, id: chatMsg.id } : m));
}
}
return [...prev, chatMsg];
// Insert in sorted position by createdAt so tool pills and queen
// messages interleave correctly when multiple arrive out of order.
const ts = chatMsg.createdAt ?? Date.now();
let insertIdx = prev.length - 1;
while (insertIdx >= 0 && (prev[insertIdx].createdAt ?? 0) > ts) {
insertIdx--;
}
if (insertIdx === -1 || insertIdx === prev.length - 1) {
return [...prev, chatMsg];
}
const next = [...prev];
next.splice(insertIdx + 1, 0, chatMsg);
return next;
});
},
[],
+60 -8
View File
@@ -95,7 +95,14 @@ export default function QueenDM() {
if (restored.length > 0 && !cancelled()) {
restored.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
setMessages(restored);
setIsTyping(false);
// Only clear typing if the history contains a completed execution;
// during bootstrap the queen is still processing.
const hasCompleted = events.some(
(e: AgentEvent) => e.type === "execution_completed",
);
if (hasCompleted) {
setIsTyping(false);
}
}
} catch {
// No history
@@ -383,6 +390,11 @@ export default function QueenDM() {
setIsTyping(true);
setQueenReady(true);
setActiveToolCalls({});
// Clear queued flag on all user messages now that the queen is processing
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;
return prev.map((m) => (m.queued ? { ...m, queued: undefined } : m));
});
break;
case "execution_completed":
@@ -544,6 +556,9 @@ export default function QueenDM() {
const toolUseId = (event.data?.tool_use_id as string) || "";
const sid = event.stream_id;
const execId = event.execution_id || "exec";
const eventCreatedAt = event.timestamp
? new Date(event.timestamp).getTime()
: Date.now();
setActiveToolCalls((prev) => {
const newActive = {
@@ -565,16 +580,29 @@ export default function QueenDM() {
type: "tool_status",
role: "queen",
thread: "queen-dm",
createdAt: Date.now(),
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
};
setMessages((prevMsgs) => {
const idx = prevMsgs.findIndex((m) => m.id === msgId);
if (idx >= 0) {
return prevMsgs.map((m, i) => (i === idx ? toolMsg : m));
return prevMsgs.map((m, i) =>
i === idx ? { ...toolMsg, createdAt: m.createdAt ?? toolMsg.createdAt } : m,
);
}
return [...prevMsgs, toolMsg];
// Insert in sorted position by createdAt
const ts = toolMsg.createdAt ?? Date.now();
let insertIdx = prevMsgs.length - 1;
while (insertIdx >= 0 && (prevMsgs[insertIdx].createdAt ?? 0) > ts) {
insertIdx--;
}
if (insertIdx === -1 || insertIdx === prevMsgs.length - 1) {
return [...prevMsgs, toolMsg];
}
const next = [...prevMsgs];
next.splice(insertIdx + 1, 0, toolMsg);
return next;
});
return newActive;
});
@@ -585,6 +613,9 @@ export default function QueenDM() {
const toolUseId = (event.data?.tool_use_id as string) || "";
const sid = event.stream_id;
const execId = event.execution_id || "exec";
const eventCreatedAt = event.timestamp
? new Date(event.timestamp).getTime()
: Date.now();
setActiveToolCalls((prev) => {
const updated = { ...prev };
@@ -606,16 +637,29 @@ export default function QueenDM() {
type: "tool_status",
role: "queen",
thread: "queen-dm",
createdAt: Date.now(),
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
};
setMessages((prevMsgs) => {
const idx = prevMsgs.findIndex((m) => m.id === msgId);
if (idx >= 0) {
return prevMsgs.map((m, i) => (i === idx ? toolMsg : m));
return prevMsgs.map((m, i) =>
i === idx ? { ...toolMsg, createdAt: m.createdAt ?? toolMsg.createdAt } : m,
);
}
return [...prevMsgs, toolMsg];
// Insert in sorted position by createdAt
const ts = toolMsg.createdAt ?? Date.now();
let insertIdx = prevMsgs.length - 1;
while (insertIdx >= 0 && (prevMsgs[insertIdx].createdAt ?? 0) > ts) {
insertIdx--;
}
if (insertIdx === -1 || insertIdx === prevMsgs.length - 1) {
return [...prevMsgs, toolMsg];
}
const next = [...prevMsgs];
next.splice(insertIdx + 1, 0, toolMsg);
return next;
});
return updated;
});
@@ -645,6 +689,7 @@ export default function QueenDM() {
setPendingOptions(null);
}
const isQueenBusy = isTyping;
const userMsg: ChatMessage = {
id: makeId(),
agent: "You",
@@ -655,6 +700,7 @@ export default function QueenDM() {
thread: "queen-dm",
createdAt: Date.now(),
images,
queued: isQueenBusy || undefined,
};
setMessages((prev) => [...prev, userMsg]);
setIsTyping(true);
@@ -666,7 +712,7 @@ export default function QueenDM() {
});
}
},
[sessionId, awaitingInput],
[sessionId, awaitingInput, isTyping],
);
const handleQuestionAnswer = useCallback(
@@ -699,6 +745,12 @@ export default function QueenDM() {
await executionApi.cancelQueen(sessionId);
setIsTyping(false);
setIsStreaming(false);
setActiveToolCalls({});
// Clear queued flags since the queen is now idle
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;
return prev.map((m) => (m.queued ? { ...m, queued: undefined } : m));
});
} catch {
// ignore
}