diff --git a/core/framework/graph/event_loop_node.py b/core/framework/graph/event_loop_node.py index 97ceda79..72b8a53e 100644 --- a/core/framework/graph/event_loop_node.py +++ b/core/framework/graph/event_loop_node.py @@ -533,12 +533,28 @@ class EventLoopNode(NodeProtocol): _restored_recent_responses = restored.recent_responses _restored_tool_fingerprints = restored.recent_tool_fingerprints - # Refresh the system prompt with full 3-layer composition. - # The stored prompt may be stale after code changes or when - # runtime-injected context (e.g. worker identity) has changed. - # On resume, we rebuild identity + narrative + focus so the LLM - # understands the session history, not just the node directive. - from framework.graph.prompt_composer import compose_system_prompt + # Refresh the system prompt with full composition including + # execution preamble and node-type preamble. The stored + # prompt may be stale after code changes or when runtime- + # injected context (e.g. worker identity) has changed. + from framework.graph.prompt_composer import ( + EXECUTION_SCOPE_PREAMBLE, + compose_system_prompt, + ) + + _exec_preamble = None + if ( + not ctx.is_subagent_mode + and ctx.node_spec.node_type in ("event_loop", "gcu") + and ctx.node_spec.output_keys + ): + _exec_preamble = EXECUTION_SCOPE_PREAMBLE + + _node_type_preamble = None + if ctx.node_spec.node_type == "gcu": + from framework.graph.gcu import GCU_BROWSER_SYSTEM_PROMPT + + _node_type_preamble = GCU_BROWSER_SYSTEM_PROMPT _current_prompt = compose_system_prompt( identity_prompt=ctx.identity_prompt or None, @@ -547,6 +563,8 @@ class EventLoopNode(NodeProtocol): accounts_prompt=ctx.accounts_prompt or None, skills_catalog_prompt=ctx.skills_catalog_prompt or None, protocols_prompt=ctx.protocols_prompt or None, + execution_preamble=_exec_preamble, + node_type_preamble=_node_type_preamble, ) if conversation.system_prompt != _current_prompt: conversation.update_system_prompt(_current_prompt) @@ -2486,6 +2504,27 @@ class EventLoopNode(NodeProtocol): results_by_id[tc.tool_use_id] = result elif tc.tool_name == "delegate_to_sub_agent": + # Guard: in continuous mode the LLM may see delegate + # calls from a previous node's conversation history and + # attempt to re-use the tool on a node that doesn't own + # it. Only accept if the tool was actually offered. + if not any(t.name == "delegate_to_sub_agent" for t in tools): + logger.warning( + "[%s] LLM called delegate_to_sub_agent but tool " + "was not offered to this node — rejecting", + node_id, + ) + result = ToolResult( + tool_use_id=tc.tool_use_id, + content=( + "ERROR: delegate_to_sub_agent is not available " + "on this node. This tool belongs to a different " + "node in the workflow." + ), + is_error=True, + ) + results_by_id[tc.tool_use_id] = result + continue # --- Framework-level subagent delegation --- # Queue for parallel execution in Phase 2 logger.info( @@ -5155,7 +5194,20 @@ class EventLoopNode(NodeProtocol): write_keys=[], # Read-only! ) - # 2b. Set up report callback (one-way channel to parent / event bus) + # 2b. Compute instance counter early so node_id is available for the + # report callback and the NodeContext. Each delegation to the same + # agent_id gets a unique suffix (instance 1 has no suffix for backward + # compat; instance 2+ appends ":N"). + self._subagent_instance_counter.setdefault(agent_id, 0) + self._subagent_instance_counter[agent_id] += 1 + _sa_instance = self._subagent_instance_counter[agent_id] + if _sa_instance > 1: + sa_node_id = f"{ctx.node_id}:subagent:{agent_id}:{_sa_instance}" + else: + sa_node_id = f"{ctx.node_id}:subagent:{agent_id}" + subagent_instance = str(_sa_instance) + + # 2c. Set up report callback (one-way channel to parent / event bus) subagent_reports: list[dict] = [] async def _report_callback( @@ -5168,7 +5220,7 @@ class EventLoopNode(NodeProtocol): if self._event_bus: await self._event_bus.emit_subagent_report( stream_id=ctx.node_id, - node_id=f"{ctx.node_id}:subagent:{agent_id}", + node_id=sa_node_id, subagent_id=agent_id, message=message, data=data, @@ -5258,7 +5310,7 @@ class EventLoopNode(NodeProtocol): max_iter = min(self._config.max_iterations, 10) subagent_ctx = NodeContext( runtime=ctx.runtime, - node_id=f"{ctx.node_id}:subagent:{agent_id}", + node_id=sa_node_id, node_spec=subagent_spec, memory=scoped_memory, input_data={"task": task, **parent_data}, @@ -5286,10 +5338,7 @@ class EventLoopNode(NodeProtocol): # Derive a conversation store for the subagent from the parent's store. # Each invocation gets a unique path so that repeated delegate calls # (e.g. one per profile) don't restore a stale completed conversation. - self._subagent_instance_counter.setdefault(agent_id, 0) - self._subagent_instance_counter[agent_id] += 1 - subagent_instance = str(self._subagent_instance_counter[agent_id]) - + # (Instance counter was computed earlier in step 2b.) subagent_conv_store = None if self._conversation_store is not None: from framework.storage.conversation_store import FileConversationStore diff --git a/core/framework/graph/prompt_composer.py b/core/framework/graph/prompt_composer.py index f9ba1287..29e26914 100644 --- a/core/framework/graph/prompt_composer.py +++ b/core/framework/graph/prompt_composer.py @@ -152,6 +152,8 @@ def compose_system_prompt( accounts_prompt: str | None = None, skills_catalog_prompt: str | None = None, protocols_prompt: str | None = None, + execution_preamble: str | None = None, + node_type_preamble: str | None = None, ) -> str: """Compose the multi-layer system prompt. @@ -162,6 +164,10 @@ def compose_system_prompt( accounts_prompt: Connected accounts block (sits between identity and narrative). skills_catalog_prompt: Available skills catalog XML (Agent Skills standard). protocols_prompt: Default skill operational protocols section. + execution_preamble: EXECUTION_SCOPE_PREAMBLE for worker nodes + (prepended before focus so the LLM knows its pipeline scope). + node_type_preamble: Node-type-specific preamble, e.g. GCU browser + best-practices prompt (prepended before focus). Returns: Composed system prompt with all layers present, plus current datetime. @@ -188,6 +194,15 @@ def compose_system_prompt( if narrative: parts.append(f"\n--- Context (what has happened so far) ---\n{narrative}") + # Execution scope preamble (worker nodes — tells the LLM it is one + # step in a multi-node pipeline and should not overreach) + if execution_preamble: + parts.append(f"\n{execution_preamble}") + + # Node-type preamble (e.g. GCU browser best-practices) + if node_type_preamble: + parts.append(f"\n{node_type_preamble}") + # Layer 3: Focus (current phase directive) if focus_prompt: parts.append(f"\n--- Current Focus ---\n{focus_prompt}") diff --git a/core/framework/server/session_manager.py b/core/framework/server/session_manager.py index 9b32f9f0..b6d7760f 100644 --- a/core/framework/server/session_manager.py +++ b/core/framework/server/session_manager.py @@ -1032,10 +1032,17 @@ class SessionManager: _consolidation_session_dir = queen_dir async def _on_compaction(_event) -> None: + # Only consolidate on queen compactions — worker and subagent + # compactions are frequent and don't warrant a memory update. + if getattr(_event, "stream_id", None) != "queen": + return from framework.agents.queen.queen_memory import consolidate_queen_memory - await consolidate_queen_memory( - session.id, _consolidation_session_dir, _consolidation_llm + asyncio.create_task( + consolidate_queen_memory( + session.id, _consolidation_session_dir, _consolidation_llm + ), + name=f"queen-memory-consolidation-{session.id}", ) from framework.runtime.event_bus import EventType as _ET diff --git a/core/frontend/src/components/ChatPanel.tsx b/core/frontend/src/components/ChatPanel.tsx index 0e02082d..59aead74 100644 --- a/core/frontend/src/components/ChatPanel.tsx +++ b/core/frontend/src/components/ChatPanel.tsx @@ -1,4 +1,4 @@ -import { memo, useState, useRef, useEffect } from "react"; +import { memo, useState, useRef, useEffect, useMemo } from "react"; import { Send, Square, Crown, Cpu, Check, Loader2 } from "lucide-react"; export interface ContextUsageEntry { @@ -10,6 +10,7 @@ export interface ContextUsageEntry { import MarkdownContent from "@/components/MarkdownContent"; import QuestionWidget from "@/components/QuestionWidget"; import MultiQuestionWidget from "@/components/MultiQuestionWidget"; +import ParallelSubagentBubble, { type SubagentGroup } from "@/components/ParallelSubagentBubble"; export interface ChatMessage { id: string; @@ -25,6 +26,10 @@ export interface ChatMessage { createdAt?: number; /** Queen phase active when this message was created */ phase?: "planning" | "building" | "staging" | "running"; + /** Backend node_id that produced this message — used for subagent grouping */ + nodeId?: string; + /** Backend execution_id for this message */ + executionId?: string; } interface ChatPanelProps { @@ -269,6 +274,84 @@ export default function ChatPanel({ messages, onSend, isWaiting, isWorkerWaiting return true; }); + // Group subagent messages into parallel bubbles. + // A subagent message has nodeId containing ":subagent:". + // The run only ends on hard boundaries (user messages, run_dividers) + // so interleaved queen/tool/system messages don't fragment the bubble. + type RenderItem = + | { kind: "message"; msg: ChatMessage } + | { kind: "parallel"; groupId: string; groups: SubagentGroup[] }; + + const renderItems = useMemo(() => { + const items: RenderItem[] = []; + let i = 0; + while (i < threadMessages.length) { + const msg = threadMessages[i]; + const isSubagent = msg.nodeId?.includes(":subagent:"); + if (!isSubagent) { + items.push({ kind: "message", msg }); + i++; + continue; + } + + // Start a subagent run. Collect all subagent messages, allowing + // non-subagent messages in between (they render as normal items + // before the bubble). Only break on hard boundaries. + const subagentMsgs: ChatMessage[] = []; + const interleaved: { idx: number; msg: ChatMessage }[] = []; + const firstId = msg.id; + + while (i < threadMessages.length) { + const m = threadMessages[i]; + const isSa = m.nodeId?.includes(":subagent:"); + + if (isSa) { + subagentMsgs.push(m); + i++; + continue; + } + + // Hard boundary — stop the run + if (m.type === "user" || m.type === "run_divider") break; + + // Worker message from a non-subagent node means the graph has + // moved on to the next stage. Close the bubble even if some + // subagents are still streaming in the background. + if (m.role === "worker" && m.nodeId && !m.nodeId.includes(":subagent:")) break; + + // Soft interruption (queen output, system, tool_status without + // nodeId) — render it normally but keep the subagent run going + interleaved.push({ idx: items.length + interleaved.length, msg: m }); + i++; + } + + // Emit interleaved messages first (before the bubble) + for (const { msg: im } of interleaved) { + items.push({ kind: "message", msg: im }); + } + + // Build the single parallel bubble from all collected subagent msgs + if (subagentMsgs.length > 0) { + const byNode = new Map(); + for (const m of subagentMsgs) { + const nid = m.nodeId!; + if (!byNode.has(nid)) byNode.set(nid, []); + byNode.get(nid)!.push(m); + } + const groups: SubagentGroup[] = []; + for (const [nodeId, msgs] of byNode) { + groups.push({ + nodeId, + messages: msgs, + contextUsage: contextUsage?.[nodeId], + }); + } + items.push({ kind: "parallel", groupId: `par-${firstId}`, groups }); + } + } + return items; + }, [threadMessages, contextUsage]); + // Mark current thread as read useEffect(() => { const count = messages.filter((m) => m.thread === activeThread).length; @@ -314,11 +397,17 @@ export default function ChatPanel({ messages, onSend, isWaiting, isWorkerWaiting {/* Messages */}
- {threadMessages.map((msg) => ( -
- -
- ))} + {renderItems.map((item) => + item.kind === "parallel" ? ( +
+ +
+ ) : ( +
+ +
+ ) + )} {/* Show typing indicator while waiting for first queen response (disabled + empty chat) */} {(isWaiting || (disabled && threadMessages.length === 0)) && ( diff --git a/core/frontend/src/components/ParallelSubagentBubble.tsx b/core/frontend/src/components/ParallelSubagentBubble.tsx new file mode 100644 index 00000000..96907d84 --- /dev/null +++ b/core/frontend/src/components/ParallelSubagentBubble.tsx @@ -0,0 +1,413 @@ +import { memo, useState, useRef, useEffect } from "react"; +import { ChevronDown, ChevronUp, Cpu } from "lucide-react"; +import type { ChatMessage, ContextUsageEntry } from "@/components/ChatPanel"; +import MarkdownContent from "@/components/MarkdownContent"; + +// --------------------------------------------------------------------------- +// Shared helpers +// --------------------------------------------------------------------------- + +const workerColor = "hsl(220,60%,55%)"; + +const SUBAGENT_COLORS = [ + "hsl(220,60%,55%)", + "hsl(260,50%,55%)", + "hsl(180,50%,45%)", + "hsl(30,70%,50%)", + "hsl(340,55%,50%)", + "hsl(150,45%,45%)", + "hsl(45,80%,50%)", + "hsl(290,45%,55%)", +]; + +function colorForIndex(i: number): string { + return SUBAGENT_COLORS[i % SUBAGENT_COLORS.length]; +} + +function subagentLabel(nodeId: string): string { + const parts = nodeId.split(":subagent:"); + const raw = parts.length >= 2 ? parts[1] : nodeId; + return raw + .replace(/:\d+$/, "") // strip instance suffix like ":3" + .replace(/[_-]/g, " ") + .replace(/\b\w/g, (c) => c.toUpperCase()) + .trim(); +} + +function last(arr: T[]): T | undefined { + return arr[arr.length - 1]; +} + +export interface SubagentGroup { + nodeId: string; + messages: ChatMessage[]; + contextUsage?: ContextUsageEntry; +} + +interface ParallelSubagentBubbleProps { + groups: SubagentGroup[]; + groupId: string; +} + +// --------------------------------------------------------------------------- +// Thermometer — vertical context gauge on right edge of each pane +// --------------------------------------------------------------------------- + +// --------------------------------------------------------------------------- +// Tool overlay — shown when a tool_status message is active (not all done) +// --------------------------------------------------------------------------- + +function ToolOverlay({ + toolName, + color, + visible, +}: { + toolName: string; + color: string; + visible: boolean; +}) { + return ( +
+
+
+ {toolName} +
+
+ {visible ? "..." : "\u2713"} +
+
+
+ ); +} + +// --------------------------------------------------------------------------- +// Single tmux pane +// --------------------------------------------------------------------------- + +function MuxPane({ + group, + index, + label, + isFocused, + isZoomed, + onClickTitle, +}: { + group: SubagentGroup; + index: number; + label: string; + isFocused: boolean; + isZoomed: boolean; + onClickTitle: () => void; +}) { + const bodyRef = useRef(null); + const stickRef = useRef(true); + const color = colorForIndex(index); + const pct = group.contextUsage?.usagePct ?? 0; + + const streamMsgs = group.messages.filter((m) => m.type !== "tool_status"); + const latestContent = last(streamMsgs)?.content ?? ""; + const msgCount = streamMsgs.length; + + // Detect active tool and finished state from latest tool_status + const latestTool = last( + group.messages.filter((m) => m.type === "tool_status") + ); + let activeToolName = ""; + let toolRunning = false; + let isFinished = false; + if (latestTool) { + try { + const parsed = JSON.parse(latestTool.content); + const tools: { name: string; done: boolean }[] = parsed.tools || []; + const allDone = parsed.allDone as boolean | undefined; + const running = tools.find((t) => !t.done); + if (running) { + activeToolName = running.name; + toolRunning = true; + } + // Finished when all tools are done and one of them is set_output + // or report_to_parent (terminal tool calls) + if (allDone && tools.length > 0) { + const hasTerminal = tools.some( + (t) => + t.done && + (t.name === "set_output" || t.name === "report_to_parent") + ); + if (hasTerminal) isFinished = true; + } + } catch { + /* ignore */ + } + } + + // Auto-scroll + useEffect(() => { + if (stickRef.current && bodyRef.current) { + bodyRef.current.scrollTop = bodyRef.current.scrollHeight; + } + }, [latestContent]); + + const handleScroll = () => { + const el = bodyRef.current; + if (!el) return; + stickRef.current = el.scrollHeight - el.scrollTop - el.clientHeight < 30; + }; + + return ( +
+ {/* Title bar */} +
+ {isFinished ? ( + + ) : ( +
+ )} + + {label} + + + + {msgCount} + +
+
= 80 ? "hsl(0,65%,55%)" : pct >= 50 ? "hsl(35,90%,55%)" : color, + }} + /> +
+ + {pct}% + +
+ + {/* Body */} +
+ {latestContent ? ( +
+ +
+ ) : ( + waiting... + )} + {/* Blinking cursor — hidden when finished */} + {!isFinished && ( + + )} +
+ + {/* Tool overlay */} + +
+ ); +} + +// --------------------------------------------------------------------------- +// Main component +// --------------------------------------------------------------------------- + +const ParallelSubagentBubble = memo( + function ParallelSubagentBubble({ groups }: ParallelSubagentBubbleProps) { + const [expanded, setExpanded] = useState(false); + const [zoomedIdx, setZoomedIdx] = useState(null); + + // Labels with instance numbers for duplicates + const labels: string[] = (() => { + const countByBase = new Map(); + const bases = groups.map((g) => subagentLabel(g.nodeId)); + for (const b of bases) + countByBase.set(b, (countByBase.get(b) ?? 0) + 1); + const idxByBase = new Map(); + return bases.map((b) => { + if ((countByBase.get(b) ?? 1) <= 1) return b; + const idx = (idxByBase.get(b) ?? 0) + 1; + idxByBase.set(b, idx); + return `${b} #${idx}`; + }); + })(); + + // Latest-active pane + const latestIdx = groups.reduce((best, g, i) => { + const filtered = g.messages.filter((m) => m.type !== "tool_status"); + const lm = last(filtered); + if (!lm) return best; + if (best < 0) return i; + const bm = last( + groups[best].messages.filter((m) => m.type !== "tool_status") + ); + if (!bm) return i; + return (lm.createdAt ?? 0) >= (bm.createdAt ?? 0) ? i : best; + }, -1); + + // Per-group finished detection (same logic as MuxPane) + const finishedFlags = groups.map((g) => { + const lt = last(g.messages.filter((m) => m.type === "tool_status")); + if (!lt) return false; + try { + const p = JSON.parse(lt.content); + const tools: { name: string; done: boolean }[] = p.tools || []; + if (!p.allDone || tools.length === 0) return false; + return tools.some( + (t) => t.done && (t.name === "set_output" || t.name === "report_to_parent") + ); + } catch { return false; } + }); + const activeCount = finishedFlags.filter((f) => !f).length; + + if (groups.length === 0) return null; + + // Grid sizing: 2 columns, auto rows capped at a fixed height + const rows = Math.ceil(groups.length / 2); + const gridHeight = expanded + ? Math.min(rows * 200, 480) + : Math.min(rows * 100, 240); + + return ( +
+ {/* Left icon */} +
+ +
+ +
+ {/* Header */} +
+ + {groups.length === 1 ? "Sub-agent" : "Parallel Agents"} + + + {activeCount > 0 ? `${activeCount} running` : `${groups.length} done`} + + +
+ + {/* Mux frame */} +
+ {/* Grid */} +
+ {groups.map((group, i) => ( + + setZoomedIdx(zoomedIdx === i ? null : i) + } + /> + ))} +
+
+
+
+ ); + }, + (prev, next) => + prev.groupId === next.groupId && + prev.groups.length === next.groups.length && + prev.groups.every( + (g, i) => + g.nodeId === next.groups[i].nodeId && + g.messages.length === next.groups[i].messages.length && + last(g.messages)?.content === last(next.groups[i].messages)?.content && + g.contextUsage?.usagePct === next.groups[i].contextUsage?.usagePct + ) +); + +export default ParallelSubagentBubble; + +// Injected as a global style (keyframes can't be inline) +if (typeof document !== "undefined") { + const id = "parallel-subagent-keyframes"; + if (!document.getElementById(id)) { + const style = document.createElement("style"); + style.id = id; + style.textContent = ` + @keyframes cursorBlink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } } + @keyframes thermoPulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.4; } } + `; + document.head.appendChild(style); + } +} diff --git a/core/frontend/src/lib/chat-helpers.ts b/core/frontend/src/lib/chat-helpers.ts index dbccbe7a..9a332153 100644 --- a/core/frontend/src/lib/chat-helpers.ts +++ b/core/frontend/src/lib/chat-helpers.ts @@ -72,6 +72,8 @@ export function sseEventToChatMessage( role: "worker", thread, createdAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, }; } @@ -110,6 +112,8 @@ export function sseEventToChatMessage( role: "worker", thread, createdAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, }; } diff --git a/core/frontend/src/pages/workspace.tsx b/core/frontend/src/pages/workspace.tsx index 19092558..2554f90a 100644 --- a/core/frontend/src/pages/workspace.tsx +++ b/core/frontend/src/pages/workspace.tsx @@ -2011,6 +2011,8 @@ export default function Workspace() { role, thread: agentType, createdAt: eventCreatedAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, }); return { ...prev, @@ -2082,6 +2084,8 @@ export default function Workspace() { role, thread: agentType, createdAt: eventCreatedAt, + nodeId: event.node_id || undefined, + executionId: event.execution_id || undefined, }); return { ...prev, diff --git a/tools/src/aden_tools/tools/google_sheets_tool/google_sheets_tool.py b/tools/src/aden_tools/tools/google_sheets_tool/google_sheets_tool.py index 4435b5db..aa09aea2 100644 --- a/tools/src/aden_tools/tools/google_sheets_tool/google_sheets_tool.py +++ b/tools/src/aden_tools/tools/google_sheets_tool/google_sheets_tool.py @@ -391,7 +391,7 @@ def register_tools( def google_sheets_update_values( spreadsheet_id: str, range_name: str, - values: list[list[Any]], + values: list[list[Any]] | str, value_input_option: str = "USER_ENTERED", # Tracking parameters (injected by framework, ignored by tool) workspace_id: str | None = None, @@ -405,16 +405,29 @@ def register_tools( Args: spreadsheet_id: The spreadsheet ID (from the URL) range_name: The A1 notation range (e.g., "Sheet1!A1:B10") - values: 2D array of values to write + values: 2D array of values to write. Accepts a list or a JSON string. value_input_option: How to interpret input (USER_ENTERED parses, RAW stores as-is) Returns: Dict with update result or error """ + # Credentials check first so missing-creds errors aren't masked client = _get_client() if isinstance(client, dict): return client + # Accept stringified JSON and deserialize + import json + + if isinstance(values, str): + try: + values = json.loads(values) + except (json.JSONDecodeError, ValueError): + return {"error": "values is not valid JSON"} + if not isinstance(values, list): + return { + "error": f"values must be a 2D list or JSON string, got {type(values).__name__}" + } try: return client.update_values(spreadsheet_id, range_name, values, value_input_option) except httpx.TimeoutException: @@ -426,7 +439,7 @@ def register_tools( def google_sheets_append_values( spreadsheet_id: str, range_name: str, - values: list[list[Any]], + values: list[list[Any]] | str, value_input_option: str = "USER_ENTERED", # Tracking parameters (injected by framework, ignored by tool) workspace_id: str | None = None, @@ -440,16 +453,29 @@ def register_tools( Args: spreadsheet_id: The spreadsheet ID (from the URL) range_name: The A1 notation range (e.g., "Sheet1!A1") - values: 2D array of values to append + values: 2D array of values to append. Accepts a list or a JSON string. value_input_option: How to interpret input (USER_ENTERED parses, RAW stores as-is) Returns: Dict with append result or error """ + # Credentials check first so missing-creds errors aren't masked client = _get_client() if isinstance(client, dict): return client + # Accept stringified JSON and deserialize + import json + + if isinstance(values, str): + try: + values = json.loads(values) + except (json.JSONDecodeError, ValueError): + return {"error": "values is not valid JSON"} + if not isinstance(values, list): + return { + "error": f"values must be a 2D list or JSON string, got {type(values).__name__}" + } try: return client.append_values(spreadsheet_id, range_name, values, value_input_option) except httpx.TimeoutException: