Merge remote-tracking branch 'origin/feat/queue-message' into feat/colony-merge-candidate

# Conflicts:
#	core/frontend/src/components/ChatPanel.tsx
#	core/frontend/src/pages/colony-chat.tsx
#	core/frontend/src/pages/queen-dm.tsx
This commit is contained in:
Richard Tang
2026-04-20 19:11:58 -07:00
16 changed files with 1086 additions and 97 deletions
+4
View File
@@ -1959,6 +1959,10 @@ class LiteLLMProvider(LLMProvider):
if self._codex_backend:
kwargs.pop("max_tokens", None)
kwargs.pop("stream_options", None)
# Pass store directly to OpenAI in case litellm drops it as unknown
if "extra_body" not in kwargs:
kwargs["extra_body"] = {}
kwargs["extra_body"]["store"] = False
request_summary = _summarize_request_for_log(kwargs)
logger.debug(
+81 -16
View File
@@ -95,6 +95,12 @@ interface ChatPanelProps {
supportsImages?: boolean;
/** Called when user clicks the stop button to cancel the queen's current turn */
onCancel?: () => void;
/** Called when the user steers a queued message into the current turn —
* the message is sent to the backend immediately so it influences the
* agent after the next tool call completes. */
onSteer?: (messageId: string) => void;
/** Called when the user cancels a still-queued (not-yet-sent) message. */
onCancelQueued?: (messageId: string) => void;
/** Pending questions from ask_user. A single-entry list renders
* QuestionWidget; 2+ entries render MultiQuestionWidget; a single
* entry with no options falls through to the normal text input so
@@ -587,6 +593,8 @@ const MessageBubble = memo(
queenProfileId,
queenAvatarUrl,
onColonyLinkClick,
onSteer,
onCancelQueued,
}: {
msg: ChatMessage;
queenPhase?: QueenPhase;
@@ -594,6 +602,8 @@ const MessageBubble = memo(
queenProfileId?: string | null;
queenAvatarUrl?: string | null;
onColonyLinkClick?: (colonyName: string) => void;
onSteer?: (messageId: string) => void;
onCancelQueued?: (messageId: string) => void;
}) {
const isUser = msg.type === "user";
const isQueen = msg.role === "queen";
@@ -752,9 +762,9 @@ const MessageBubble = memo(
if (isUser) {
return (
<div className="flex justify-end">
<div className="flex flex-col items-end gap-1">
<div
className={`max-w-[75%] bg-primary text-primary-foreground text-sm leading-relaxed rounded-2xl rounded-br-md px-4 py-3${msg.queued ? " animate-pulse opacity-80" : ""}`}
className={`max-w-[75%] bg-primary text-primary-foreground text-sm leading-relaxed rounded-2xl rounded-br-md px-4 py-3${msg.queued ? " ring-1 ring-amber-500/50" : ""}`}
>
{msg.images && msg.images.length > 0 && (
<div className="flex flex-wrap gap-2 mb-2">
@@ -773,11 +783,42 @@ const MessageBubble = memo(
)}
{(msg.queued || msg.createdAt) && (
<div className="flex justify-end items-center gap-1.5 mt-1 text-[10px] opacity-60">
{msg.queued && <span>queued</span>}
{msg.queued && (
<span className="inline-flex items-center gap-1">
<span className="w-1 h-1 rounded-full bg-amber-400 animate-pulse" />
queued
</span>
)}
{msg.createdAt && <span>{formatMessageTime(msg.createdAt)}</span>}
</div>
)}
</div>
{msg.queued && (onSteer || onCancelQueued) && (
<div className="flex items-center gap-1.5">
{onSteer && (
<button
type="button"
onClick={() => onSteer(msg.id)}
className="inline-flex items-center gap-1 text-[11px] font-medium px-2 py-0.5 rounded-full bg-amber-500/15 text-amber-600 hover:bg-amber-500/25 border border-amber-500/30 transition-colors"
title="Send now — influence the current turn after the next tool call"
>
<Zap className="w-3 h-3" />
Steer
</button>
)}
{onCancelQueued && (
<button
type="button"
onClick={() => onCancelQueued(msg.id)}
className="inline-flex items-center gap-1 text-[11px] font-medium px-2 py-0.5 rounded-full bg-muted/60 text-muted-foreground hover:bg-muted border border-border transition-colors"
title="Remove this queued message"
>
<X className="w-3 h-3" />
Cancel
</button>
)}
</div>
)}
</div>
);
}
@@ -857,8 +898,11 @@ const MessageBubble = memo(
prev.msg.id === next.msg.id &&
prev.msg.content === next.msg.content &&
prev.msg.phase === next.msg.phase &&
prev.msg.queued === next.msg.queued &&
prev.queenPhase === next.queenPhase &&
prev.showQueenPhaseBadge === next.showQueenPhaseBadge,
prev.showQueenPhaseBadge === next.showQueenPhaseBadge &&
prev.onSteer === next.onSteer &&
prev.onCancelQueued === next.onCancelQueued,
);
export default function ChatPanel({
@@ -870,6 +914,8 @@ export default function ChatPanel({
activeThread,
disabled,
onCancel,
onSteer,
onCancelQueued,
pendingQuestions,
onQuestionSubmit,
onQuestionDismiss,
@@ -1345,6 +1391,8 @@ export default function ChatPanel({
queenProfileId={queenProfileId}
queenAvatarUrl={queenAvatarUrl}
onColonyLinkClick={onColonyLinkClick}
onSteer={onSteer}
onCancelQueued={onCancelQueued}
/>
</div>
);
@@ -1586,30 +1634,47 @@ export default function ChatPanel({
}
}}
placeholder={
disabled ? "Connecting to agent..." : "Message Queen Bee..."
disabled
? "Connecting to agent..."
: isBusy
? "Queue a message — or click Steer to inject now..."
: "Message Queen Bee..."
}
disabled={disabled}
className="flex-1 bg-transparent text-sm text-foreground outline-none placeholder:text-muted-foreground disabled:opacity-50 disabled:cursor-not-allowed resize-none overflow-y-auto"
/>
{isBusy && onCancel ? (
{isBusy && onCancel && (
<button
type="button"
onClick={onCancel}
title="Stop the queen's current turn"
className="p-2 rounded-lg bg-amber-500/15 text-amber-400 border border-amber-500/40 hover:bg-amber-500/25 transition-colors"
>
<Square className="w-4 h-4" />
</button>
) : (
<button
type="submit"
disabled={
(!input.trim() && pendingImages.length === 0) || disabled
}
className="p-2 rounded-lg bg-primary text-primary-foreground disabled:opacity-30 hover:opacity-90 transition-opacity"
>
<Send className="w-4 h-4" />
</button>
)}
<button
type="submit"
disabled={
(!input.trim() && pendingImages.length === 0) || disabled
}
title={
isBusy
? "Queue message — sent after the current turn, or click Steer on the bubble to send now"
: "Send"
}
className={`p-2 rounded-lg disabled:opacity-30 hover:opacity-90 transition-opacity ${
isBusy
? "bg-amber-500/20 text-amber-600 border border-amber-500/40"
: "bg-primary text-primary-foreground"
}`}
>
{isBusy ? (
<Zap className="w-4 h-4" />
) : (
<Send className="w-4 h-4" />
)}
</button>
</div>
</form>
)}
@@ -0,0 +1,99 @@
import { useCallback, useRef } from "react";
import type { Dispatch, SetStateAction } from "react";
import type { ChatMessage, ImageContent } from "@/components/ChatPanel";
interface QueuedPayload {
text: string;
images?: ImageContent[];
}
interface UsePendingQueueArgs {
/** Sends a message to the backend. Must handle its own errors. */
sendToBackend: (text: string, images?: ImageContent[]) => void;
/** Setter for the chat message list — used to flip/strip the `queued` flag. */
setMessages: Dispatch<SetStateAction<ChatMessage[]>>;
/** Fires once per flush, before any message is sent. Typically sets
* isTyping/queenIsTyping so the UI reflects that the queen is busy again. */
onFlushStart?: () => void;
}
/**
 * Client-side queue for user messages typed while the queen is mid-turn.
 *
 * - `enqueue` stores a message locally, keyed by its optimistic UI id.
 * - `steer` pulls one message out and sends it immediately — the backend
 *   injects it at the next iteration boundary.
 * - `cancelQueued` drops a still-queued message entirely (no backend call).
 * - `flushNext` pops and sends exactly one queued message. Wire it to the
 *   per-turn boundary event your page receives (`llm_turn_complete` in the
 *   colony view; a queen DM treats `execution_completed` as its boundary —
 *   see the call sites). Do NOT call it on pause / cancel / fail.
 * - `clear` empties the queue without sending anything.
 *
 * `flushNextRef` exposes the latest `flushNext` for capture-once SSE handlers.
 */
export function usePendingQueue({
  sendToBackend,
  setMessages,
  onFlushStart,
}: UsePendingQueueArgs) {
  // Insertion-ordered map: optimistic message id -> payload awaiting send.
  const pendingRef = useRef<Map<string, QueuedPayload>>(new Map());

  const enqueue = useCallback(
    (messageId: string, payload: QueuedPayload) => {
      pendingRef.current.set(messageId, payload);
    },
    [],
  );

  const steer = useCallback(
    (messageId: string) => {
      const payload = pendingRef.current.get(messageId);
      if (payload === undefined) return;
      pendingRef.current.delete(messageId);
      // Flip the bubble out of its "queued" state, then send right away.
      setMessages((prev) =>
        prev.map((m) => (m.id === messageId ? { ...m, queued: false } : m)),
      );
      sendToBackend(payload.text, payload.images);
    },
    [sendToBackend, setMessages],
  );

  const cancelQueued = useCallback(
    (messageId: string) => {
      // Map.delete returns false when the id was never queued — nothing to do.
      if (!pendingRef.current.delete(messageId)) return;
      setMessages((prev) => prev.filter((m) => m.id !== messageId));
    },
    [setMessages],
  );

  // Drop every queued payload without sending. Call on route-level resets
  // (queen switch, colony switch) — the hook outlives those transitions,
  // so without this, stale queue entries would flush into the new session.
  const clear = useCallback(() => {
    pendingRef.current.clear();
  }, []);

  // Pop and send the oldest queued message (Map iteration is insertion
  // order in JS). One-at-a-time semantics: used both for the Stop-button
  // path (cancel current turn, send next) and the natural turn-boundary
  // auto-flush described in the hook docs above.
  const flushNext = useCallback(() => {
    for (const [oldestId, payload] of pendingRef.current) {
      pendingRef.current.delete(oldestId);
      setMessages((prev) =>
        prev.map((m) => (m.id === oldestId ? { ...m, queued: false } : m)),
      );
      onFlushStart?.();
      sendToBackend(payload.text, payload.images);
      return; // exactly one message per flush
    }
  }, [sendToBackend, setMessages, onFlushStart]);

  // Ref to the latest flushNext so SSE handlers captured with narrow deps
  // can still invoke the up-to-date closure.
  const flushNextRef = useRef(flushNext);
  flushNextRef.current = flushNext;

  return { enqueue, steer, cancelQueued, flushNext, flushNextRef, clear };
}
+3
View File
@@ -156,6 +156,9 @@ export function sseEventToChatMessage(
type: "user",
thread,
createdAt,
// Carrying execution_id here lets the optimistic-message reconciler
// distinguish server-echoed user bubbles from still-unflushed ones.
executionId: event.execution_id || undefined,
streamId: event.stream_id || undefined,
};
}
+120 -41
View File
@@ -10,6 +10,7 @@ import CredentialsModal, {
import { executionApi } from "@/api/execution";
import { sessionsApi } from "@/api/sessions";
import { useMultiSSE } from "@/hooks/use-sse";
import { usePendingQueue } from "@/hooks/use-pending-queue";
import type { LiveSession, AgentEvent } from "@/api/types";
import {
formatAgentDisplayName,
@@ -321,6 +322,10 @@ export default function ColonyChat() {
// are duplicates from the ring-buffer replay and should be skipped.
const restoreCutoffRef = useRef<number>(0);
const queenPhaseRef = useRef<string>("independent");
// Flipped true by the auto-flush path; consumed by the next empty-prompt
// client_input_requested so we don't flicker the typing bubble off while
// the queen is about to resume on the flushed input.
const queenAboutToResumeRef = useRef(false);
const suppressIntroRef = useRef(false);
const loadingRef = useRef(false);
@@ -340,21 +345,17 @@ export default function ColonyChat() {
);
}
if (options?.reconcileOptimisticUser && chatMsg.type === "user" && prev.length > 0) {
// Match by content + timestamp across the whole list (not just
// the last slot) so a queued user message still reconciles
// even when the queen's previous reply slotted in between.
// Also drops the "queued" indicator since the backend has
// now confirmed receipt.
const incomingTs = chatMsg.createdAt ?? Date.now();
const matchIdx = prev.findIndex(
(m) =>
m.type === "user" &&
m.content === chatMsg.content &&
Math.abs(incomingTs - (m.createdAt ?? incomingTs)) <= 15000,
// Optimistic user bubbles have no executionId; server echoes do.
// Match the oldest unreconciled optimistic with the same content —
// that's the FIFO-correct pick for both auto-flush and Steer.
const idx = prev.findIndex(
(m) => m.type === "user" && !m.executionId && m.content === chatMsg.content,
);
if (matchIdx !== -1) {
if (idx !== -1) {
return prev.map((m, i) =>
i === matchIdx ? { ...m, id: chatMsg.id, queued: undefined } : m,
i === idx
? { ...m, id: chatMsg.id, executionId: chatMsg.executionId }
: m,
);
}
}
@@ -692,6 +693,17 @@ export default function ColonyChat() {
}
break;
case "llm_turn_complete":
// Flush one queued message per queen LLM-turn boundary. Workers'
// LLM turns don't drain the queen queue. execution_completed
// fires only at session shutdown (the queen's loop parks in
// _await_user_input between turns), so this is the real "turn
// ended" signal. Mid-tool-call boundaries count too.
if (isQueen) {
flushNextPendingRef.current();
}
break;
case "execution_paused":
case "execution_failed":
case "client_output_delta":
@@ -732,14 +744,22 @@ export default function ColonyChat() {
? (rawQuestions as { id: string; prompt: string; options?: string[] }[])
: null;
if (isQueen) {
updateState({
awaitingInput: true,
isTyping: false,
isStreaming: false,
queenIsTyping: false,
pendingQuestions: questions,
pendingQuestionSource: "queen",
});
// An empty-prompt client_input_requested means the queen parked
// in auto-wait. If we just auto-flushed a queued message, our
// inject will unblock her in a moment — skip flipping isTyping
// off so the thinking bubble doesn't flicker.
if (queenAboutToResumeRef.current && !questions) {
queenAboutToResumeRef.current = false;
} else {
updateState({
awaitingInput: true,
isTyping: false,
isStreaming: false,
queenIsTyping: false,
pendingQuestions: questions,
pendingQuestionSource: "queen",
});
}
}
}
@@ -1122,27 +1142,86 @@ export default function ColonyChat() {
// ── Action handlers ────────────────────────────────────────────────────
// Core backend send — bypasses queue logic. Used both for the normal path
// (agent idle) and for Steer / auto-flush paths.
const sendToBackend = useCallback(
(text: string, images?: ImageContent[]) => {
if (!agentState.sessionId || !agentState.ready) return;
executionApi.chat(agentState.sessionId, text, images).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
upsertMessage({
id: makeId(),
agent: "System",
agentColor: "",
content: `Failed to send message: ${errMsg}`,
timestamp: "",
type: "system",
thread: agentPath,
createdAt: Date.now(),
});
updateState({ isTyping: false, isStreaming: false, queenIsTyping: false });
});
},
[agentPath, agentState.sessionId, agentState.ready, updateState, upsertMessage],
);
const {
enqueue: enqueuePending,
steer: handleSteer,
cancelQueued: handleCancelQueued,
flushNext: flushNextPending,
flushNextRef: flushNextPendingRef,
clear: clearPendingQueue,
} = usePendingQueue({
sendToBackend,
setMessages,
onFlushStart: useCallback(() => {
updateState({ isTyping: true, queenIsTyping: true });
queenAboutToResumeRef.current = true;
}, [updateState]),
});
// Reset the queue whenever we navigate to a different colony (or to
// new-chat). The hook outlives the route change, so without this, a
// message queued in colony A would auto-flush into colony B's next
// execution_completed.
useEffect(() => {
clearPendingQueue();
}, [agentPath, isNewChat, clearPendingQueue]);
const handleCancelQueen = useCallback(async () => {
if (!agentState.sessionId) return;
try {
await executionApi.cancelQueen(agentState.sessionId);
updateState({ isTyping: false, isStreaming: false, queenIsTyping: false });
// After cancelling the current turn, immediately send the oldest
// queued message (if any). The remaining queued messages stay put
// so the user can review them or Steer/Cancel individually.
flushNextPending();
} catch {
// fire-and-forget
}
}, [agentState.sessionId, updateState]);
}, [agentState.sessionId, updateState, flushNextPending]);
const handleSend = useCallback(
(text: string, _thread: string, images?: ImageContent[]) => {
if (agentState.pendingQuestionSource === "queen") {
const answeringQuestion = agentState.pendingQuestionSource === "queen";
if (answeringQuestion) {
updateState({
pendingQuestions: null,
pendingQuestionSource: null,
});
}
// Queue when the queen is mid-turn — unless the user is answering an
// ask_user prompt, in which case we send immediately so the loop can
// resume. Queued messages are held locally (not sent to the backend)
// until the user clicks Steer or the queen goes idle.
const shouldQueue = !answeringQuestion && (agentState.queenIsTyping ?? false);
const msgId = makeId();
const userMsg: ChatMessage = {
id: makeId(),
id: msgId,
agent: "You",
agentColor: "",
content: text,
@@ -1151,29 +1230,27 @@ export default function ColonyChat() {
thread: agentPath,
createdAt: Date.now(),
images,
queued: shouldQueue,
};
setMessages((prev) => [...prev, userMsg]);
suppressIntroRef.current = false;
updateState({ isTyping: true, queenIsTyping: true });
if (agentState.sessionId && agentState.ready) {
executionApi.chat(agentState.sessionId, text, images).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
upsertMessage({
id: makeId(),
agent: "System",
agentColor: "",
content: `Failed to send message: ${errMsg}`,
timestamp: "",
type: "system",
thread: agentPath,
createdAt: Date.now(),
});
updateState({ isTyping: false, isStreaming: false, queenIsTyping: false });
});
if (shouldQueue) {
enqueuePending(msgId, { text, images });
return;
}
updateState({ isTyping: true, queenIsTyping: true });
sendToBackend(text, images);
},
[agentPath, agentState.sessionId, agentState.ready, agentState.pendingQuestionSource, updateState, upsertMessage],
[
agentPath,
agentState.queenIsTyping,
agentState.pendingQuestionSource,
updateState,
sendToBackend,
enqueuePending,
],
);
const handleQueenQuestionAnswer = useCallback(
@@ -1318,6 +1395,8 @@ export default function ColonyChat() {
messages={messages}
onSend={handleSend}
onCancel={handleCancelQueen}
onSteer={handleSteer}
onCancelQueued={handleCancelQueued}
activeThread={agentPath}
isWaiting={(agentState.queenIsTyping && !agentState.isStreaming) ?? false}
isWorkerWaiting={(agentState.workerIsTyping && !agentState.isStreaming) ?? false}
+87 -23
View File
@@ -10,6 +10,7 @@ import { executionApi } from "@/api/execution";
import { sessionsApi } from "@/api/sessions";
import { queensApi } from "@/api/queens";
import { useMultiSSE } from "@/hooks/use-sse";
import { usePendingQueue } from "@/hooks/use-pending-queue";
import type { AgentEvent, HistorySession } from "@/api/types";
import {
newReplayState,
@@ -74,6 +75,10 @@ export default function QueenDM() {
const [compactingAndForking, setCompactingAndForking] = useState(false);
const replayStateRef = useRef(newReplayState());
// Flipped true by the auto-flush path; consumed by the next empty-prompt
// client_input_requested so we don't flicker the typing bubble off while
// the queen is about to resume on the flushed input.
const queenAboutToResumeRef = useRef(false);
const [queenPhase, setQueenPhase] = useState<
"independent" | "incubating" | "working" | "reviewing"
>("independent");
@@ -499,11 +504,11 @@ export default function QueenDM() {
case "execution_started":
setIsTyping(true);
setQueenReady(true);
// Clear queued flag on all user messages now that the queen is processing
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;
return prev.map((m) => (m.queued ? { ...m, queued: undefined } : m));
});
// Do NOT clear `queued` on user messages here. The pending queue
// hook owns that flag — it's cleared on steer / cancel / flush.
// If the user has queued messages that haven't been flushed yet,
// the queen starting a new turn (e.g. from a steer or from the
// flush itself) shouldn't hide the still-queued ones.
break;
case "execution_completed":
@@ -517,6 +522,13 @@ export default function QueenDM() {
const out = (event.data.output_tokens as number) || 0;
setTokenUsage((prev) => ({ input: prev.input + inp, output: prev.output + out }));
}
// Flush one queued message per LLM turn boundary. This is the
// real "turn ended" signal in a queen DM — execution_completed
// only fires at session shutdown because the event loop parks in
// _await_user_input between turns. Mid-tool-call boundaries
// count too: sending now lets the queen pick up the message on
// her next drain, same as clicking Steer.
flushNextPendingRef.current();
break;
case "client_output_delta":
@@ -535,6 +547,14 @@ export default function QueenDM() {
options?: string[];
}[])
: null;
// An empty-prompt client_input_requested means the queen parked
// in auto-wait. If we just auto-flushed a queued message, our
// inject will unblock her in a moment — skip flipping isTyping
// off so the thinking bubble doesn't flicker.
if (queenAboutToResumeRef.current && !questions) {
queenAboutToResumeRef.current = false;
break;
}
setAwaitingInput(true);
setIsTyping(false);
setIsStreaming(false);
@@ -622,17 +642,60 @@ export default function QueenDM() {
useMultiSSE({ sessions: sseSessions, onEvent: handleSSEEvent });
// Send handler
// Core backend send — used both for immediate sends and for Steer /
// auto-flush paths out of the pending queue.
const sendToBackend = useCallback(
(text: string, images?: ImageContent[]) => {
if (!sessionId) return;
executionApi.chat(sessionId, text, images).catch(() => {
setIsTyping(false);
setIsStreaming(false);
});
},
[sessionId],
);
const {
enqueue: enqueuePending,
steer: handleSteer,
cancelQueued: handleCancelQueued,
flushNext: flushNextPending,
flushNextRef: flushNextPendingRef,
clear: clearPendingQueue,
} = usePendingQueue({
sendToBackend,
setMessages,
onFlushStart: useCallback(() => {
setIsTyping(true);
queenAboutToResumeRef.current = true;
}, []),
});
// Reset the queue whenever we navigate to a different queen. The hook
// outlives the route change (same component instance), so without this,
// a message queued for Queen A would auto-flush into Queen B's session
// on B's next execution_completed.
useEffect(() => {
clearPendingQueue();
}, [queenId, clearPendingQueue]);
// Send handler. Queues when the queen is mid-turn (unless the user is
// answering an ask_user prompt, which must send immediately to unblock
// the loop). Queued messages are held locally until Steer, Cancel, or
// the next `execution_completed` auto-flush.
const handleSend = useCallback(
(text: string, _thread: string, images?: ImageContent[]) => {
if (awaitingInput) {
const answeringQuestion = awaitingInput;
if (answeringQuestion) {
setAwaitingInput(false);
setPendingQuestions(null);
}
const isQueenBusy = isTyping;
const shouldQueue = !answeringQuestion && isTyping;
const msgId = makeId();
const userMsg: ChatMessage = {
id: makeId(),
id: msgId,
agent: "You",
agentColor: "",
content: text,
@@ -641,19 +704,19 @@ export default function QueenDM() {
thread: "queen-dm",
createdAt: Date.now(),
images,
queued: isQueenBusy || undefined,
queued: shouldQueue,
};
setMessages((prev) => [...prev, userMsg]);
setIsTyping(true);
if (sessionId) {
executionApi.chat(sessionId, text, images).catch(() => {
setIsTyping(false);
setIsStreaming(false);
});
if (shouldQueue) {
enqueuePending(msgId, { text, images });
return;
}
setIsTyping(true);
sendToBackend(text, images);
},
[sessionId, awaitingInput, isTyping],
[awaitingInput, isTyping, sendToBackend, enqueuePending],
);
const handleColonySpawn = useCallback(() => {
@@ -735,15 +798,14 @@ export default function QueenDM() {
setIsTyping(false);
setIsStreaming(false);
replayStateRef.current = newReplayState();
// Clear queued flags since the queen is now idle
setMessages((prev) => {
if (!prev.some((m) => m.queued)) return prev;
return prev.map((m) => (m.queued ? { ...m, queued: undefined } : m));
});
// After cancelling the current turn, immediately send the oldest
// queued message (if any). The remaining queued messages stay put
// so the user can review them or Steer/Cancel individually.
flushNextPending();
} catch {
// ignore
}
}, [sessionId]);
}, [sessionId, flushNextPending]);
return (
<div className="flex flex-col h-full">
@@ -766,6 +828,8 @@ export default function QueenDM() {
messages={messages}
onSend={handleSend}
onCancel={handleCancelQueen}
onSteer={handleSteer}
onCancelQueued={handleCancelQueued}
activeThread="queen-dm"
isWaiting={isTyping && !isStreaming}
isBusy={isTyping}
+13 -17
View File
@@ -67,14 +67,11 @@ def test_cerebras_catalog_tracks_public_models_endpoint():
assert cerebras_default == "gpt-oss-120b"
assert [model["id"] for model in cerebras_models] == [
"gpt-oss-120b",
"llama3.1-8b",
"zai-glm-4.7",
"qwen-3-235b-a22b-instruct-2507",
]
assert cerebras_models[0]["max_tokens"] == 40960
assert cerebras_models[0]["max_context_tokens"] == 131072
assert cerebras_models[1]["max_tokens"] == 8192
assert cerebras_models[1]["max_context_tokens"] == 32768
assert all(model["max_tokens"] == 40960 for model in cerebras_models)
assert all(model["max_context_tokens"] == 131072 for model in cerebras_models)
def test_minimax_catalog_tracks_current_non_legacy_text_models():
@@ -86,8 +83,8 @@ def test_minimax_catalog_tracks_current_non_legacy_text_models():
"MiniMax-M2.7",
"MiniMax-M2.5",
]
assert all(model["max_context_tokens"] == 204800 for model in minimax_models)
assert all(model["max_tokens"] == 32768 for model in minimax_models)
assert all(model["max_context_tokens"] == 180000 for model in minimax_models)
assert all(model["max_tokens"] == 40960 for model in minimax_models)
def test_mistral_catalog_tracks_current_curated_models():
@@ -149,26 +146,25 @@ def test_openrouter_catalog_tracks_current_frontier_set():
"anthropic/claude-sonnet-4.6",
"anthropic/claude-opus-4.6",
"google/gemini-3.1-pro-preview-customtools",
"deepseek/deepseek-v3.2",
"qwen/qwen3.6-plus",
"z-ai/glm-5v-turbo",
"x-ai/grok-4.20",
"z-ai/glm-5.1",
"minimax/minimax-m2.7",
"xiaomi/mimo-v2-pro",
"stepfun/step-3.5-flash",
]
assert openrouter_models[0]["max_tokens"] == 128000
assert openrouter_models[0]["max_context_tokens"] == 922000
assert openrouter_models[1]["max_context_tokens"] == 936000
assert openrouter_models[0]["max_context_tokens"] == 872000
assert openrouter_models[1]["max_context_tokens"] == 872000
assert openrouter_models[2]["max_context_tokens"] == 872000
assert openrouter_models[3]["max_context_tokens"] == 1048576
assert openrouter_models[4]["max_context_tokens"] == 163840
assert openrouter_models[3]["max_context_tokens"] == 872000
assert openrouter_models[4]["max_context_tokens"] == 240000
def test_find_model_any_provider_returns_provider_and_model():
provider_id, model = model_catalog.find_model_any_provider("google/gemini-3.1-pro-preview-customtools")
assert provider_id == "openrouter"
assert model["max_context_tokens"] == 1048576
assert model["max_context_tokens"] == 872000
def test_get_preset_returns_subscription_specific_limits():
@@ -187,8 +183,8 @@ def test_minimax_preset_uses_current_default_model():
assert preset is not None
assert preset["model"] == "MiniMax-M2.7"
assert preset["max_tokens"] == 32768
assert preset["max_context_tokens"] == 204800
assert preset["max_tokens"] == 40960
assert preset["max_context_tokens"] == 180800
def test_load_model_catalog_rejects_duplicate_model_ids(tmp_path, monkeypatch):
@@ -110,6 +110,7 @@ from .pipedrive import PIPEDRIVE_CREDENTIALS
from .plaid import PLAID_CREDENTIALS
from .postgres import POSTGRES_CREDENTIALS
from .powerbi import POWERBI_CREDENTIALS
from .prometheus import PROMETHEUS_CREDENTIALS
from .pushover import PUSHOVER_CREDENTIALS
from .quickbooks import QUICKBOOKS_CREDENTIALS
from .razorpay import RAZORPAY_CREDENTIALS
@@ -197,6 +198,7 @@ CREDENTIAL_SPECS = {
**PLAID_CREDENTIALS,
**POSTGRES_CREDENTIALS,
**POWERBI_CREDENTIALS,
**PROMETHEUS_CREDENTIALS,
**PUSHOVER_CREDENTIALS,
**QUICKBOOKS_CREDENTIALS,
**RAZORPAY_CREDENTIALS,
@@ -293,6 +295,7 @@ __all__ = [
"PLAID_CREDENTIALS",
"POSTGRES_CREDENTIALS",
"POWERBI_CREDENTIALS",
"PROMETHEUS_CREDENTIALS",
"PUSHOVER_CREDENTIALS",
"QUICKBOOKS_CREDENTIALS",
"RAZORPAY_CREDENTIALS",
@@ -1045,6 +1045,47 @@ class LushaHealthChecker:
)
class PrometheusHealthChecker:
    """Health checker for Prometheus (no authentication)."""

    # Seconds before the readiness probe is abandoned.
    TIMEOUT = 5.0

    def check(self, base_url: str) -> HealthCheckResult:
        """
        Validate Prometheus by hitting /-/ready endpoint.
        """
        # Strip trailing slashes so we never produce "//-/ready".
        url = f"{base_url.rstrip('/')}/-/ready"
        try:
            with httpx.Client(timeout=self.TIMEOUT) as client:
                response = client.get(url)
        except httpx.TimeoutException:
            return HealthCheckResult(
                valid=False,
                message="Prometheus health check timed out",
                details={"error": "timeout"},
            )
        except httpx.RequestError as e:
            return HealthCheckResult(
                valid=False,
                message=f"Failed to connect to Prometheus: {e}",
                details={"error": str(e)},
            )
        # Any non-200 readiness response is reported as unhealthy.
        if response.status_code != 200:
            return HealthCheckResult(
                valid=False,
                message=f"Prometheus returned status {response.status_code}",
                details={"status_code": response.status_code},
            )
        return HealthCheckResult(
            valid=True,
            message="Prometheus is healthy",
        )
# --- New checkers using BaseHttpHealthChecker ---
@@ -1342,6 +1383,7 @@ HEALTH_CHECKERS: dict[str, CredentialHealthChecker] = {
"notion_token": NotionHealthChecker(),
"pinecone": PineconeHealthChecker(),
"pipedrive": PipedriveHealthChecker(),
"prometheus": PrometheusHealthChecker(),
"resend": ResendHealthChecker(),
"serpapi": SerpApiHealthChecker(),
"slack": SlackHealthChecker(),
@@ -0,0 +1,41 @@
from __future__ import annotations
from .base import CredentialSpec
# Credential spec for the Prometheus query tools. The "credential" here is
# just the server's base URL (env: PROMETHEUS_BASE_URL); the instruction
# text below also mentions optional token / basic-auth env vars, which are
# presumably read by the tool itself — confirm against prometheus_tool.
PROMETHEUS_CREDENTIALS = {
    "prometheus": CredentialSpec(
        env_var="PROMETHEUS_BASE_URL",
        # Tools associated with this credential.
        tools=[
            "prometheus_query",
            "prometheus_query_range",
        ],
        required=True,
        # NOTE(review): startup_required=False — looks like this is checked
        # lazily rather than at boot; verify against CredentialSpec's docs.
        startup_required=False,
        help_url="https://prometheus.io/docs/prometheus/latest/querying/api/",
        description="Base URL of Prometheus server",
        # No hosted (Aden) flow and no direct API-key flow for this spec.
        aden_supported=False,
        direct_api_key_supported=False,
        api_key_instructions="""To configure Prometheus access:
1. Set your Prometheus base URL:
export PROMETHEUS_BASE_URL=http://localhost:9090
Optional authentication:
2. For Bearer Token:
export PROMETHEUS_TOKEN=your-token
3. For Basic Auth:
export PROMETHEUS_USERNAME=admin
export PROMETHEUS_PASSWORD=secret
Notes:
- PROMETHEUS_BASE_URL is required
- Authentication is optional (most local setups dont need it)
""",
        # Readiness probe endpoint — matches PrometheusHealthChecker's URL.
        health_check_endpoint="/-/ready",
        health_check_method="GET",
        credential_id="prometheus",
        credential_key="base_url",
    ),
}
+2
View File
@@ -105,6 +105,7 @@ from .plaid_tool import register_tools as register_plaid
from .port_scanner import register_tools as register_port_scanner
from .postgres_tool import register_tools as register_postgres
from .powerbi_tool import register_tools as register_powerbi
from .prometheus_tool import register_tools as register_prometheus
from .pushover_tool import register_tools as register_pushover
from .quickbooks_tool import register_tools as register_quickbooks
from .razorpay_tool import register_tools as register_razorpay
@@ -310,6 +311,7 @@ def _register_unverified(
register_pipedrive(mcp, credentials=credentials)
register_plaid(mcp, credentials=credentials)
register_powerbi(mcp, credentials=credentials)
register_prometheus(mcp, credentials=credentials)
register_pushover(mcp, credentials=credentials)
register_quickbooks(mcp, credentials=credentials)
register_reddit(mcp, credentials=credentials)
@@ -0,0 +1,158 @@
# Prometheus Metrics Query Tool
Provides PromQL-based querying tools for agents to fetch real-time and historical metrics from a Prometheus server.
## Authentication
Authentication is **optional**, as most Prometheus servers are deployed within private infrastructure. If no credentials are set, requests are sent without auth headers (suitable for open/internal Prometheus instances).
When credentials are present, **Bearer token takes priority** over Basic Auth:
| Priority | Mode | Environment Variables |
| -------- | ------------ | --------------------------------------------- |
| 1 | Bearer Token | `PROMETHEUS_TOKEN` |
| 2 | Basic Auth | `PROMETHEUS_USERNAME` + `PROMETHEUS_PASSWORD` |
| 3 | None | _(no variables set)_ |
### Setup
```bash
# Required
export PROMETHEUS_BASE_URL="http://your-prometheus-host:9090"
# Optional — Bearer token (takes priority if set)
export PROMETHEUS_TOKEN="your_token_here"
# Optional — Basic auth
export PROMETHEUS_USERNAME="admin"
export PROMETHEUS_PASSWORD="secret"
```
> _Note: Base URL can also be configured via the Aden Credential Store under the `prometheus` key._
---
## Tools
| Tool | Description |
| ------------------------ | --------------------------------------------------------- |
| `prometheus_query` | Run an instant PromQL query — returns current value(s) |
| `prometheus_query_range` | Run a PromQL query over a time range with step resolution |
---
## Tool Reference
### `prometheus_query`
Executes a PromQL expression against `/api/v1/query`. Returns the latest value for matching time series.
**Parameters:**
| Name | Type | Required | Default | Description |
| --------- | ---- | -------- | ------- | ----------------------------------------------------------------- |
| `query` | str | ✅ | — | PromQL expression (max 1000 chars) |
| `timeout` | int  | ❌       | `5`     | Request timeout in seconds (1–30; out-of-range values reset to 5) |
**Returns:**
```json
{
"success": true,
"query": "up",
"result": [
{
"metric": {
"__name__": "up",
"job": "prometheus",
"instance": "localhost:9090"
},
"value": [1700000000.0, "1"]
}
],
"raw": {
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"__name__": "up",
"job": "prometheus",
"instance": "localhost:9090"
},
"value": [1700000000.0, "1"]
}
]
}
}
}
```
---
### `prometheus_query_range`
Executes a PromQL expression against `/api/v1/query_range`. Returns a matrix of values over time — useful for graphing, trends, and historical analysis.
**Parameters:**
| Name | Type | Required | Default | Description |
| --------- | ---- | -------- | ------- | ---------------------------------------- |
| `query` | str | ✅ | — | PromQL expression (max 1000 chars) |
| `start` | str | ✅ | — | Start time — Unix timestamp or RFC3339 |
| `end` | str | ✅ | — | End time — Unix timestamp or RFC3339 |
| `step` | str | ❌ | `"60s"` | Resolution step (e.g. `15s`, `5m`, `1h`) |
| `timeout` | int  | ❌       | `5`     | Request timeout in seconds (1–30)        |
**Returns:**
```json
{
"success": true,
"query": "rate(http_requests_total[5m])",
"start": "2024-01-01T00:00:00Z",
"end": "2024-01-01T01:00:00Z",
"step": "60s",
"result": [
{
"metric": { "job": "api-server" },
"values": [
[1704067200, "3.14"],
[1704067260, "3.22"]
]
}
]
}
```
---
## Error Handling
All tools return structured error dicts on failure.
```json
{ "error": "Request to Prometheus timed out" }
{ "error": "Failed to connect to Prometheus", "help": "Check if Prometheus is running and base_url is correct" }
{ "error": "Prometheus returned status 400", "details": "..." }
{ "error": "Query must be between 1 and 1000 characters" }
{ "error": "start and end time are required" }
{
"error": "Missing required credential: <description>",
"help": "<setup instructions>",
"success": false
}
```
---
## Limits & Safeguards
| Guard | Value |
| ----------------- | ------------------------------------------------------------------- |
| Base URL priority | Credential store (`prometheus`) → fallback to `PROMETHEUS_BASE_URL` |
| Timeout handling | Out-of-range values reset to `5s` |
| Query limit       | Must be 1–1000 characters                                           |
| URL normalization | Trailing `/` removed using `.rstrip('/')` |
| Timeout range     | 1–30 seconds (values outside this range reset to the 5s default)    |
@@ -0,0 +1,5 @@
from __future__ import annotations
from .prometheus_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,260 @@
"""
Prometheus Tool - Query metrics from a Prometheus server using PromQL.
Required:
- PROMETHEUS_BASE_URL
Optional Authentication:
- PROMETHEUS_TOKEN (Bearer token)
- PROMETHEUS_USERNAME and PROMETHEUS_PASSWORD (Basic Auth)
API Reference: https://prometheus.io/docs/prometheus/latest/querying/api/
"""
from __future__ import annotations
import os
import httpx
from fastmcp import FastMCP
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.store_adapter import CredentialStoreAdapter
# Default HTTP request timeout in seconds; also the value that
# out-of-range timeouts (outside 1-30) are reset to.
DEFAULT_TIMEOUT = 5
def _get_prometheus_base_url(
credentials: CredentialStoreAdapter | None,
) -> str | None:
"""
Return Prometheus base URL.
Priority:
1. Credential store
2. Environment variable fallback
Parameters:
credentials: Credential store to query
Returns:
Base URL string or None
"""
base_url: str | None = None
if credentials:
base_url = credentials.get("prometheus")
if not base_url:
base_url = os.getenv("PROMETHEUS_BASE_URL")
return base_url
def _missing_prometheus_credential_response() -> dict:
    """
    Build the standard error payload for an unconfigured Prometheus URL.

    Returns:
        Dict with ``error`` (spec description), ``help`` (setup
        instructions from the credential spec), and ``success=False``.
    """
    prometheus_spec = CREDENTIAL_SPECS["prometheus"]
    return {
        "error": f"Missing required credential: {prometheus_spec.description}",
        "help": prometheus_spec.api_key_instructions,
        "success": False,
    }
def _get_auth() -> tuple[dict[str, str], httpx.BasicAuth | None]:
headers: dict[str, str] = {}
auth = None
# Bearer token
token = os.getenv("PROMETHEUS_TOKEN")
if token:
headers["Authorization"] = f"Bearer {token}"
return headers, None
# Basic auth
username = os.getenv("PROMETHEUS_USERNAME")
password = os.getenv("PROMETHEUS_PASSWORD")
if username and password:
auth = httpx.BasicAuth(username, password)
return headers, auth
def register_tools(
    mcp: FastMCP,
    credentials: CredentialStoreAdapter | None = None,
) -> None:
    """Register Prometheus tools with MCP.

    Exposes two tools:

    * ``prometheus_query`` -- instant PromQL query (``/api/v1/query``)
    * ``prometheus_query_range`` -- range query (``/api/v1/query_range``)

    Args:
        mcp: FastMCP server instance to attach the tools to.
        credentials: Optional credential store consulted for the base URL
            before falling back to the PROMETHEUS_BASE_URL env var.
    """

    def _request(
        endpoint: str,
        params: dict,
        timeout: int,
        failure_error: str,
    ) -> tuple[dict | None, dict | None]:
        """Shared HTTP path for both tools.

        Performs one GET against ``<base_url>/<endpoint>`` and returns
        ``(data, None)`` on success, or ``(None, error_dict)`` on any
        failure. ``failure_error`` is the message used when Prometheus
        responds 200 but reports a non-success status in its payload.
        """
        # Out-of-range timeouts silently fall back to the default
        # instead of failing the call (documented behavior).
        if timeout < 1 or timeout > 30:
            timeout = DEFAULT_TIMEOUT
        base_url = _get_prometheus_base_url(credentials)
        if not base_url:
            return None, _missing_prometheus_credential_response()
        url = f"{base_url.rstrip('/')}/{endpoint}"
        headers, auth = _get_auth()
        try:
            response = httpx.get(
                url,
                params=params,
                headers=headers,
                auth=auth,
                timeout=timeout,
            )
            if response.status_code != 200:
                return None, {
                    "error": f"Prometheus returned status {response.status_code}",
                    "details": response.text,
                }
            data = response.json()
            if data.get("status") != "success":
                return None, {
                    "error": failure_error,
                    "details": data,
                }
            return data, None
        except httpx.TimeoutException:
            return None, {"error": "Request to Prometheus timed out"}
        except httpx.ConnectError:
            return None, {
                "error": "Failed to connect to Prometheus",
                "help": "Check if Prometheus is running and base_url is correct",
            }
        except Exception as e:
            return None, {"error": f"Unexpected error: {str(e)}"}

    @mcp.tool()
    def prometheus_query(
        query: str,
        timeout: int = DEFAULT_TIMEOUT,
    ) -> dict:
        """
        Query Prometheus using PromQL.
        Use this tool to fetch real-time metrics from a Prometheus server.
        Args:
            query: PromQL query string (e.g., 'up', 'sum(rate(http_requests_total[1m]))')
            timeout: Request timeout in seconds (1-30)
        Returns:
            Dict containing query results or error
        """
        # limit query length
        if not query or len(query) > 1000:
            return {"error": "Query must be between 1 and 1000 characters"}
        data, error = _request(
            "api/v1/query",
            {"query": query},
            timeout,
            "Prometheus query failed",
        )
        if error is not None:
            return error
        return {
            "success": True,
            "query": query,
            "result": data.get("data", {}).get("result", []),
            "raw": data,
        }

    @mcp.tool()
    def prometheus_query_range(
        query: str,
        start: str,
        end: str,
        step: str = "60s",
        timeout: int = DEFAULT_TIMEOUT,
    ) -> dict:
        """
        Query Prometheus over a time range using PromQL.
        Use this tool to fetch historical metrics and time series data
        from a Prometheus server. Suitable for trend analysis, graphing,
        and monitoring over a defined time window.
        Args:
            query: PromQL query string (e.g., 'rate(http_requests_total[5m])')
            start: Start time (Unix timestamp or RFC3339 format, e.g., "2024-01-01T00:00:00Z")
            end: End time (Unix timestamp or RFC3339 format, e.g., "2024-01-01T00:00:00Z")
            step: Query resolution step (e.g., '15s', '5m', '1h')
            timeout: Request timeout in seconds (1-30)
        Returns:
            Dict containing time-series results or error
        """
        if not query or len(query) > 1000:
            return {"error": "Query must be between 1 and 1000 characters"}
        if not start or not end:
            return {"error": "start and end time are required"}
        data, error = _request(
            "api/v1/query_range",
            {"query": query, "start": start, "end": end, "step": step},
            timeout,
            "Prometheus range query failed",
        )
        if error is not None:
            return error
        return {
            "success": True,
            "query": query,
            "start": start,
            "end": end,
            "step": step,
            "result": data.get("data", {}).get("result", []),
            "raw": data,
        }
+1
View File
@@ -88,6 +88,7 @@ class TestHealthCheckerRegistry:
"notion_token",
"pinecone",
"pipedrive",
"prometheus",
"resend",
"serpapi",
"slack",
+167
View File
@@ -0,0 +1,167 @@
from __future__ import annotations
import httpx
import pytest
from fastmcp import FastMCP
from aden_tools.tools.prometheus_tool import register_tools
@pytest.fixture
def mcp() -> FastMCP:
    # Fresh FastMCP server with the Prometheus tools registered and no
    # credential store, so tools resolve the base URL from the
    # PROMETHEUS_BASE_URL environment variable.
    server = FastMCP("test")
    register_tools(server)
    return server
def test_prometheus_query_validation(mcp: FastMCP) -> None:
    """An empty query string is rejected before any HTTP request is made."""
    query_tool = mcp._tool_manager._tools["prometheus_query"].fn
    assert "error" in query_tool(query="")
def test_prometheus_query_success(mcp: FastMCP, monkeypatch: pytest.MonkeyPatch) -> None:
    """A 200 response with status=success yields a success payload."""
    monkeypatch.setenv("PROMETHEUS_BASE_URL", "http://fake-prometheus:9090")
    query_tool = mcp._tool_manager._tools["prometheus_query"].fn

    class FakeResponse:
        status_code = 200

        def json(self):
            return {
                "status": "success",
                "data": {"result": [{"metric": {}, "value": [123, "1"]}]},
            }

    monkeypatch.setattr(
        "aden_tools.tools.prometheus_tool.prometheus_tool.httpx.get",
        lambda *args, **kwargs: FakeResponse(),
    )
    result = query_tool(query="up")
    assert result["success"] is True
    assert "result" in result
def test_prometheus_query_range_success(mcp: FastMCP, monkeypatch: pytest.MonkeyPatch) -> None:
    """A successful range query over a mocked matrix returns success."""
    monkeypatch.setenv("PROMETHEUS_BASE_URL", "http://fake-prometheus:9090")
    range_tool = mcp._tool_manager._tools["prometheus_query_range"].fn

    class FakeResponse:
        status_code = 200

        def json(self):
            return {
                "status": "success",
                "data": {"result": [{"values": [[123, "1"]]}]},
            }

    def fake_get(*args, **kwargs):
        return FakeResponse()

    monkeypatch.setattr(
        "aden_tools.tools.prometheus_tool.prometheus_tool.httpx.get",
        fake_get,
    )
    result = range_tool(
        query="up",
        start="2026-01-01T00:00:00Z",
        end="2026-01-01T01:00:00Z",
    )
    assert result["success"] is True
def test_prometheus_non_200(mcp: FastMCP, monkeypatch: pytest.MonkeyPatch) -> None:
    """A non-200 HTTP status from Prometheus is reported as an error."""
    monkeypatch.setenv("PROMETHEUS_BASE_URL", "http://fake-prometheus:9090")
    query_tool = mcp._tool_manager._tools["prometheus_query"].fn

    class ErrorResponse:
        status_code = 500
        text = "Internal error"

        def json(self):
            return {}

    monkeypatch.setattr(
        "aden_tools.tools.prometheus_tool.prometheus_tool.httpx.get",
        lambda *args, **kwargs: ErrorResponse(),
    )
    result = query_tool(query="up")
    assert "error" in result
def test_timeout(mcp: FastMCP, monkeypatch: pytest.MonkeyPatch) -> None:
    """An httpx.TimeoutException surfaces as a 'timed out' error dict."""
    monkeypatch.setenv("PROMETHEUS_BASE_URL", "http://fake-prometheus:9090")
    query_tool = mcp._tool_manager._tools["prometheus_query"].fn

    def raise_timeout(*args, **kwargs):
        raise httpx.TimeoutException("Request timed out")

    monkeypatch.setattr(
        "aden_tools.tools.prometheus_tool.prometheus_tool.httpx.get",
        raise_timeout,
    )
    result = query_tool(query="up")
    assert "error" in result
    assert "timed out" in result["error"].lower()
def test_prometheus_connection_error(mcp: FastMCP, monkeypatch: pytest.MonkeyPatch) -> None:
    """httpx.ConnectError is mapped to the connection-specific error dict.

    The previous version raised a bare ``Exception``, which only
    exercised the generic fallback branch rather than the
    ``httpx.ConnectError`` handler this test is named for.
    """
    monkeypatch.setenv("PROMETHEUS_BASE_URL", "http://fake-prometheus:9090")
    tool_fn = mcp._tool_manager._tools["prometheus_query"].fn

    def mock_get(*args, **kwargs):
        # Raise the exception type the tool's ConnectError branch handles.
        raise httpx.ConnectError("Connection failed")

    monkeypatch.setattr("aden_tools.tools.prometheus_tool.prometheus_tool.httpx.get", mock_get)
    result = tool_fn(query="up")
    assert result["error"] == "Failed to connect to Prometheus"
    assert "help" in result
def test_missing_base_url(mcp: FastMCP, monkeypatch: pytest.MonkeyPatch) -> None:
    """Without any base URL configured the missing-credential payload is returned."""
    monkeypatch.delenv("PROMETHEUS_BASE_URL", raising=False)
    query_tool = mcp._tool_manager._tools["prometheus_query"].fn
    result = query_tool(query="up")
    assert result["success"] is False
    assert "Missing required credential" in result["error"]
def test_base_url_credentials_priority_over_env(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The credential store's base URL wins over PROMETHEUS_BASE_URL."""
    monkeypatch.setenv("PROMETHEUS_BASE_URL", "http://fake-prometheus:9090")

    class FakeCredentialStore:
        def get(self, key: str):
            return "http://cred-prometheus:9090"

    server = FastMCP("test-cred-override")
    register_tools(server, credentials=FakeCredentialStore())
    requested_urls: list[str] = []

    def fake_get(url, *args, **kwargs):
        requested_urls.append(url)

        class Resp:
            status_code = 200

            def json(self):
                return {"status": "success", "data": {"result": []}}

        return Resp()

    monkeypatch.setattr("aden_tools.tools.prometheus_tool.prometheus_tool.httpx.get", fake_get)
    query_tool = server._tool_manager._tools["prometheus_query"].fn
    result = query_tool(query="up")
    assert result["success"] is True
    assert result["query"] == "up"
    assert "cred-prometheus:9090" in requested_urls[0]