remove subgraph, persistent tabs, node action plan

This commit is contained in:
bryan
2026-02-24 12:42:07 -08:00
parent 8381c95617
commit c44b12cc8b
14 changed files with 305 additions and 556 deletions
+90 -9
View File
@@ -207,6 +207,8 @@ class EventLoopNode(NodeProtocol):
self._input_ready = asyncio.Event()
self._awaiting_input = False
self._shutdown = False
# Track which nodes already have an action plan emitted (skip on revisit)
self._action_plan_emitted: set[str] = set()
def validate_input(self, ctx: NodeContext) -> list[str]:
"""Validate hard requirements only.
@@ -372,6 +374,16 @@ class EventLoopNode(NodeProtocol):
# 4. Publish loop started
await self._publish_loop_started(stream_id, node_id, execution_id)
# 4b. Fire-and-forget action plan generation (once per node per lifetime)
if (
start_iteration == 0
and ctx.llm
and self._event_bus
and node_id not in self._action_plan_emitted
):
self._action_plan_emitted.add(node_id)
asyncio.create_task(self._generate_action_plan(ctx, stream_id, node_id, execution_id))
# 5. Stall / doom loop detection state (restored from cursor if resuming)
recent_responses: list[str] = _restored_recent_responses
recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
@@ -562,7 +574,9 @@ class EventLoopNode(NodeProtocol):
node_id,
iteration,
)
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
return NodeResult(
success=True,
@@ -713,7 +727,9 @@ class EventLoopNode(NodeProtocol):
if _cf_block:
if self._shutdown:
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
if ctx.runtime_logger:
@@ -763,7 +779,9 @@ class EventLoopNode(NodeProtocol):
got_input = await self._await_user_input(ctx)
logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
if not got_input:
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
if ctx.runtime_logger:
@@ -1031,7 +1049,9 @@ class EventLoopNode(NodeProtocol):
continue
# 7. Max iterations exhausted
await self._publish_loop_completed(stream_id, node_id, self._config.max_iterations, execution_id)
await self._publish_loop_completed(
stream_id, node_id, self._config.max_iterations, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
if ctx.runtime_logger:
ctx.runtime_logger.log_node_complete(
@@ -1199,7 +1219,11 @@ class EventLoopNode(NodeProtocol):
if isinstance(event, TextDeltaEvent):
accumulated_text = event.snapshot
await self._publish_text_delta(
stream_id, node_id, event.content, event.snapshot, ctx,
stream_id,
node_id,
event.content,
event.snapshot,
ctx,
execution_id,
)
@@ -1284,7 +1308,11 @@ class EventLoopNode(NodeProtocol):
executed_in_batch += 1
await self._publish_tool_started(
stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input,
stream_id,
node_id,
tc.tool_use_id,
tc.tool_name,
tc.tool_input,
execution_id,
)
logger.info(
@@ -2526,7 +2554,9 @@ class EventLoopNode(NodeProtocol):
# EventBus publishing helpers
# -------------------------------------------------------------------
async def _publish_loop_started(self, stream_id: str, node_id: str, execution_id: str = "") -> None:
async def _publish_loop_started(
self, stream_id: str, node_id: str, execution_id: str = ""
) -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_started(
stream_id=stream_id,
@@ -2535,7 +2565,56 @@ class EventLoopNode(NodeProtocol):
execution_id=execution_id,
)
async def _publish_iteration(self, stream_id: str, node_id: str, iteration: int, execution_id: str = "") -> None:
async def _generate_action_plan(
    self,
    ctx: NodeContext,
    stream_id: str,
    node_id: str,
    execution_id: str,
) -> None:
    """Ask the LLM for a short plan of attack and publish it on the event bus.

    Intended to run as a detached (fire-and-forget) task: any failure is
    logged at warning level and swallowed so the node's main loop is never
    affected.
    """
    try:
        raw_prompt = ctx.node_spec.system_prompt or ""
        # Cap the excerpt at 500 chars so the planning request stays cheap.
        summary = raw_prompt[:500] + ("..." if len(raw_prompt) > 500 else "")
        tools = [tool.name for tool in ctx.available_tools]
        outputs = ctx.node_spec.output_keys or []
        request = (
            f'You are about to work on a task as node "{node_id}".\n\n'
            f"System prompt:\n{summary}\n\n"
            f"Tools available: {tools}\n"
            f"Required outputs: {outputs}\n\n"
            f"Write a brief action plan (2-5 bullet points) describing "
            f"what you will do to complete this task. Be specific and concise.\n"
            f"Return ONLY the plan text, no preamble."
        )
        reply = await ctx.llm.acomplete(
            messages=[{"role": "user", "content": request}],
            max_tokens=300,
        )
        plan_text = reply.content.strip()
        # Guard clause: nothing useful to emit, or no bus to emit it on.
        if not plan_text or not self._event_bus:
            return
        await self._event_bus.emit_node_action_plan(
            stream_id=stream_id,
            node_id=node_id,
            plan=plan_text,
            execution_id=execution_id,
        )
    except Exception as e:
        logger.warning("Action plan generation failed for node '%s': %s", node_id, e)
async def _publish_iteration(
self, stream_id: str, node_id: str, iteration: int, execution_id: str = ""
) -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_iteration(
stream_id=stream_id,
@@ -2544,7 +2623,9 @@ class EventLoopNode(NodeProtocol):
execution_id=execution_id,
)
async def _publish_loop_completed(self, stream_id: str, node_id: str, iterations: int, execution_id: str = "") -> None:
async def _publish_loop_completed(
self, stream_id: str, node_id: str, iterations: int, execution_id: str = ""
) -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_completed(
stream_id=stream_id,
+10 -4
View File
@@ -771,7 +771,8 @@ class GraphExecutor:
# Emit node-started event (skip event_loop nodes — they emit their own)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_started(
stream_id=self._stream_id, node_id=current_node_id,
stream_id=self._stream_id,
node_id=current_node_id,
execution_id=self._execution_id,
)
@@ -782,7 +783,9 @@ class GraphExecutor:
# Emit node-completed event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_completed(
stream_id=self._stream_id, node_id=current_node_id, iterations=1,
stream_id=self._stream_id,
node_id=current_node_id,
iterations=1,
execution_id=self._execution_id,
)
@@ -1945,7 +1948,8 @@ class GraphExecutor:
# Emit node-started event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_started(
stream_id=self._stream_id, node_id=branch.node_id,
stream_id=self._stream_id,
node_id=branch.node_id,
execution_id=self._execution_id,
)
@@ -1970,7 +1974,9 @@ class GraphExecutor:
# Emit node-completed event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_completed(
stream_id=self._stream_id, node_id=branch.node_id, iterations=1,
stream_id=self._stream_id,
node_id=branch.node_id,
iterations=1,
execution_id=self._execution_id,
)
-7
View File
@@ -252,13 +252,6 @@ class NodeSpec(BaseModel):
),
)
# Visualization: extracted workflow steps for the frontend DAG renderer
subgraph_steps: list[dict[str, Any]] | None = Field(
default=None,
description="Extracted workflow steps for visualization. "
"Generated at agent load time from system_prompt.",
)
model_config = {"extra": "allow", "arbitrary_types_allowed": True}
+17 -13
View File
@@ -1330,14 +1330,16 @@ class AgentRuntime:
active = stream.active_execution_ids
if not active:
continue
result.append({
"graph_id": graph_id,
"stream_id": stream.stream_id,
"entry_point_id": ep_id,
"active_execution_ids": active,
"is_awaiting_input": stream.is_awaiting_input,
"waiting_nodes": stream.get_waiting_nodes(),
})
result.append(
{
"graph_id": graph_id,
"stream_id": stream.stream_id,
"entry_point_id": ep_id,
"active_execution_ids": active,
"is_awaiting_input": stream.is_awaiting_input,
"waiting_nodes": stream.get_waiting_nodes(),
}
)
return result
def get_waiting_nodes(self) -> list[dict[str, Any]]:
@@ -1350,11 +1352,13 @@ class AgentRuntime:
for graph_id, reg in self._graphs.items():
for _ep_id, stream in reg.streams.items():
for waiting in stream.get_waiting_nodes():
result.append({
"graph_id": graph_id,
"stream_id": stream.stream_id,
**waiting,
})
result.append(
{
"graph_id": graph_id,
"stream_id": stream.stream_id,
**waiting,
}
)
return result
# === PROPERTIES ===
+19
View File
@@ -83,6 +83,7 @@ class EventType(StrEnum):
NODE_LOOP_STARTED = "node_loop_started"
NODE_LOOP_ITERATION = "node_loop_iteration"
NODE_LOOP_COMPLETED = "node_loop_completed"
NODE_ACTION_PLAN = "node_action_plan"
# LLM streaming observability
LLM_TEXT_DELTA = "llm_text_delta"
@@ -533,6 +534,24 @@ class EventBus:
)
)
async def emit_node_action_plan(
    self,
    stream_id: str,
    node_id: str,
    plan: str,
    execution_id: str | None = None,
) -> None:
    """Publish a NODE_ACTION_PLAN event carrying the generated plan text."""
    # Build the event first, then hand it to the shared publish pipeline.
    event = AgentEvent(
        type=EventType.NODE_ACTION_PLAN,
        stream_id=stream_id,
        node_id=node_id,
        execution_id=execution_id,
        data={"plan": plan},
    )
    await self.publish(event)
# === LLM STREAMING PUBLISHERS ===
async def emit_llm_text_delta(
-78
View File
@@ -14,84 +14,6 @@ from typing import Any
logger = logging.getLogger(__name__)
async def _extract_subgraph_steps(nodes: list, llm: Any) -> None:
"""Extract workflow steps from system prompts for frontend visualization.
Called once during agent load. Iterates event_loop nodes with system prompts,
asks the LLM to decompose each prompt into a DAG of steps, and stores the
result on node.subgraph_steps. Non-critical failures are logged and skipped.
"""
candidates = [
n for n in nodes if n.node_type == "event_loop" and n.system_prompt and not n.subgraph_steps
]
if not candidates:
return
for node in candidates:
try:
prompt = (
f"Analyze this system prompt for an AI agent node "
f"and extract the workflow steps.\n\n"
f"The node has these tools available: {json.dumps(node.tools)}\n"
f"The node reads these inputs: {json.dumps(node.input_keys)}\n"
f"The node must produce these outputs: {json.dumps(node.output_keys)}\n\n"
f"System prompt:\n---\n{node.system_prompt}\n---\n\n"
f"Extract a JSON array of workflow steps. For each step:\n"
f'- "id": short snake_case identifier\n'
f'- "label": human-readable description (5-10 words)\n'
f'- "tool": the tool name this step uses, or null for reasoning/decision steps\n'
f'- "depends_on": list of step ids that must complete before this one starts\n'
f'- "type": "action" (does work), "decision" '
f'(branches/loops), "loop" (repeats), or '
f'"output" (sets output)\n\n'
f"IMPORTANT:\n"
f"- Look for parallelism: if multiple tools can run "
f"independently after the same step, "
f"give them the SAME depends_on — this creates fan-out\n"
f"- Look for convergence: if a step needs results from multiple prior steps, "
f"list ALL of them in depends_on — this creates fan-in\n"
f"- Look for loops: if the prompt says 'repeat', 'go back to', 'if more then...', "
f"model it as a decision step\n"
f"- Do NOT make a simple linear chain unless the "
f"prompt truly describes a strictly sequential "
f"process\n\n"
f"Return ONLY a JSON array of step objects. No explanation."
)
response = await llm.acomplete(
messages=[{"role": "user", "content": prompt}],
max_tokens=4096,
json_mode=True,
)
# Parse the JSON array from the response
text = response.content.strip()
# Handle responses wrapped in {"steps": [...]} or just [...]
parsed = json.loads(text)
if isinstance(parsed, dict) and "steps" in parsed:
steps = parsed["steps"]
elif isinstance(parsed, list):
steps = parsed
else:
logger.warning(f"Subgraph extraction for '{node.id}': unexpected format")
continue
# Basic validation
if not isinstance(steps, list) or not all(
isinstance(s, dict) and s.get("id") and s.get("label") and "depends_on" in s
for s in steps
):
logger.warning(f"Subgraph extraction for '{node.id}': invalid step structure")
continue
node.subgraph_steps = steps
logger.info(f"Extracted {len(steps)} subgraph steps for node '{node.id}'")
except Exception as e:
logger.warning(f"Subgraph extraction failed for node '{node.id}': {e}")
continue
@dataclass
class AgentSlot:
"""A loaded agent with its runtime resources."""
+1
View File
@@ -25,6 +25,7 @@ DEFAULT_EVENT_TYPES = [
EventType.NODE_LOOP_STARTED,
EventType.NODE_LOOP_ITERATION,
EventType.NODE_LOOP_COMPLETED,
EventType.NODE_ACTION_PLAN,
EventType.EDGE_TRAVERSED,
EventType.GOAL_PROGRESS,
EventType.QUEEN_INTERVENTION_REQUESTED,
-1
View File
@@ -40,7 +40,6 @@ def _node_to_dict(node) -> dict:
"client_facing": node.client_facing,
"success_criteria": node.success_criteria,
"system_prompt": node.system_prompt or "",
"subgraph_steps": node.subgraph_steps or [],
}
@@ -228,11 +228,13 @@ def register_queen_lifecycle_tools(
target_node_id = injectable[0]["node_id"]
ok = await stream.inject_input(target_node_id, content)
if ok:
return json.dumps({
"status": "delivered",
"node_id": target_node_id,
"content_preview": content[:100],
})
return json.dumps(
{
"status": "delivered",
"node_id": target_node_id,
"content_preview": content[:100],
}
)
return json.dumps(
{
+1 -11
View File
@@ -146,7 +146,6 @@ export interface NodeSpec {
client_facing: boolean;
success_criteria: string | null;
system_prompt: string;
subgraph_steps?: SubgraphStep[];
// Runtime enrichment (when session_id provided)
visit_count?: number;
has_failures?: boolean;
@@ -190,16 +189,6 @@ export interface NodeCriteria {
};
}
// --- Subgraph visualization types ---
export interface SubgraphStep {
id: string;
label: string;
tool: string | null;
depends_on: string[];
type: "action" | "decision" | "loop" | "output";
}
// --- Tool info types ---
export interface ToolInfo {
@@ -250,6 +239,7 @@ export type EventTypeName =
| "node_loop_started"
| "node_loop_iteration"
| "node_loop_completed"
| "node_action_plan"
| "llm_text_delta"
| "llm_reasoning_delta"
| "tool_call_started"
@@ -1,291 +0,0 @@
/**
* ExecutionSubGraph — renders a DAG of workflow steps extracted from a node's
* system prompt. Replaces the hardcoded SubGraph SVG for real (loaded) agents.
*
* Layout algorithm: longest-path layer assignment (same approach as AgentGraph).
* Rendering: shared SVG primitives from sgPrimitives.ts, matching SubGraph's
* visual style for consistency.
*/
import { useMemo } from "react";
import type { SubgraphStep } from "../api/types";
import type { NodeStatus } from "./AgentGraph";
import { sgPort, computeEdgePath, computeArrowhead } from "./sgPrimitives";
import type { PortSide, Rect } from "./sgPrimitives";
// Layout constants — smaller than AgentGraph (this is an intra-node detail view)
const NODE_W = 130;
const NODE_H = 34;
const TOOL_W = 118;
const TOOL_H = 28;
const GAP_X = 16;
const GAP_Y = 60;
const TOP_Y = 10;
const SVG_W = 440;
const MARGIN_X = 16;
const statusColors: Record<NodeStatus, string> = {
running: "hsl(45,95%,58%)",
looping: "hsl(38,90%,55%)",
complete: "hsl(43,70%,45%)",
pending: "hsl(220,15%,45%)",
error: "hsl(0,65%,55%)",
};
type StepType = SubgraphStep["type"];
interface LayoutNode {
step: SubgraphStep;
layer: number;
col: number;
pos: Rect;
}
interface LayoutEdge {
fromId: string;
toId: string;
fromPort: PortSide;
toPort: PortSide;
}
/**
 * Compute a layered DAG layout for the given steps.
 *
 * Layers come from longest-path assignment over `depends_on`; columns within a
 * layer are centered and ordered by the average column of each node's parents
 * to reduce edge crossings. Returns positioned nodes, port-annotated edges,
 * and the SVG height needed to fit all layers.
 *
 * NOTE(review): the single forward pass for layer assignment assumes each
 * step's dependencies appear earlier in `steps` (topological order); a child
 * listed before its parent would read a stale parent layer — confirm callers
 * provide ordered input.
 */
function computeLayout(steps: SubgraphStep[]): {
  nodes: LayoutNode[];
  edges: LayoutEdge[];
  svgH: number;
} {
  if (steps.length === 0) return { nodes: [], edges: [], svgH: 0 };
  // Map step id -> index for O(1) dependency resolution.
  const idxMap = new Map<string, number>();
  steps.forEach((s, i) => idxMap.set(s.id, i));
  // Build parent adjacency from depends_on; unknown dep ids are ignored.
  const parents = new Map<number, number[]>();
  steps.forEach((_, i) => parents.set(i, []));
  steps.forEach((s, i) => {
    for (const dep of s.depends_on) {
      const pi = idxMap.get(dep);
      if (pi !== undefined) {
        parents.get(i)!.push(pi);
      }
    }
  });
  // Longest-path layer assignment: a node sits one layer below its deepest parent.
  const layers = new Array(steps.length).fill(0);
  for (let i = 0; i < steps.length; i++) {
    const pars = parents.get(i) || [];
    if (pars.length > 0) {
      layers[i] = Math.max(...pars.map((p) => layers[p])) + 1;
    }
  }
  // Group by layer
  const layerGroups = new Map<number, number[]>();
  layers.forEach((l, i) => {
    const group = layerGroups.get(l) || [];
    group.push(i);
    layerGroups.set(l, group);
  });
  // Column assignment (centered, ordered by average parent column)
  let maxCols = 1;
  layerGroups.forEach((group) => {
    maxCols = Math.max(maxCols, group.length);
  });
  const cols = new Array(steps.length).fill(0);
  layerGroups.forEach((group) => {
    if (group.length === 1) {
      // Singleton layer: center it across the widest layer's span.
      cols[group[0]] = (maxCols - 1) / 2;
    } else {
      // Order siblings by the mean column of their parents to keep edges short.
      const sorted = [...group].sort((a, b) => {
        const aP = parents.get(a) || [];
        const bP = parents.get(b) || [];
        const aAvg = aP.length > 0 ? aP.reduce((s, p) => s + cols[p], 0) / aP.length : 0;
        const bAvg = bP.length > 0 ? bP.reduce((s, p) => s + cols[p], 0) / bP.length : 0;
        return aAvg - bAvg;
      });
      const offset = (maxCols - group.length) / 2;
      sorted.forEach((nodeIdx, j) => {
        cols[nodeIdx] = offset + j;
      });
    }
  });
  // Compute positions — columns are squeezed if the layer would overflow SVG_W.
  const usableW = SVG_W - MARGIN_X * 2;
  const colSpacing = maxCols > 1 ? Math.min(NODE_W + GAP_X, usableW / maxCols) : 0;
  const totalW = maxCols > 1 ? (maxCols - 1) * colSpacing + NODE_W : NODE_W;
  const firstColX = MARGIN_X + (usableW - totalW) / 2;
  const layoutNodes: LayoutNode[] = steps.map((step, i) => {
    // Tool-bearing action steps render in a smaller box, centered in the cell.
    const isTool = step.type === "action" && step.tool !== null;
    const w = isTool ? TOOL_W : NODE_W;
    const h = isTool ? TOOL_H : NODE_H;
    const x = firstColX + cols[i] * colSpacing + (NODE_W - w) / 2;
    const y = TOP_Y + layers[i] * (NODE_H + GAP_Y) + (NODE_H - h) / 2;
    return { step, layer: layers[i], col: cols[i], pos: { x, y, w, h } };
  });
  // Build edges from depends_on, choosing ports by relative layer/column.
  const layoutEdges: LayoutEdge[] = [];
  steps.forEach((step, i) => {
    for (const dep of step.depends_on) {
      const pi = idxMap.get(dep);
      if (pi === undefined) continue;
      const fromLayer = layers[pi];
      const toLayer = layers[i];
      const fromCol = cols[pi];
      const toCol = cols[i];
      let fromPort: PortSide = "bottom";
      let toPort: PortSide = "top";
      // Same layer = horizontal edge
      if (fromLayer === toLayer) {
        fromPort = fromCol < toCol ? "right" : "left";
        toPort = fromCol < toCol ? "left" : "right";
      }
      // Back edge (loop): route both ends out the left side.
      if (toLayer <= fromLayer) {
        fromPort = "left";
        toPort = "left";
      }
      layoutEdges.push({ fromId: dep, toId: step.id, fromPort, toPort });
    }
  });
  const maxLayer = Math.max(...layers);
  const svgH = TOP_Y + (maxLayer + 1) * (NODE_H + GAP_Y) + 10;
  return { nodes: layoutNodes, edges: layoutEdges, svgH };
}
interface ExecutionSubGraphProps {
steps: SubgraphStep[];
status: NodeStatus;
}
/**
 * Render a node's extracted workflow steps as a small SVG DAG.
 *
 * Layout is memoized on `steps`; `status` only influences accent colors used
 * on "output" steps. Falls back to a placeholder message when no steps exist.
 */
export default function ExecutionSubGraph({ steps, status }: ExecutionSubGraphProps) {
  const color = statusColors[status];
  const { nodes, edges, svgH } = useMemo(() => computeLayout(steps), [steps]);
  // id -> rect lookup so edges can resolve endpoint geometry in O(1).
  const posMap = useMemo(() => {
    const m: Record<string, Rect> = {};
    nodes.forEach((n) => { m[n.step.id] = n.pos; });
    return m;
  }, [nodes]);
  if (nodes.length === 0) {
    return (
      <div className="flex items-center justify-center py-8">
        <p className="text-xs text-muted-foreground/60 italic">No workflow steps available</p>
      </div>
    );
  }
  // Draw one edge: Bézier path from sgPrimitives plus an arrowhead polygon.
  const renderEdge = (edge: LayoutEdge, i: number) => {
    const fp = posMap[edge.fromId];
    const tp = posMap[edge.toId];
    // Skip edges whose endpoints were dropped (unknown step ids).
    if (!fp || !tp) return null;
    const [fx, fy] = sgPort(fp, edge.fromPort);
    const [tx, ty] = sgPort(tp, edge.toPort);
    // Left-to-left routing is how computeLayout marks loop back-edges.
    const isBackEdge = edge.fromPort === "left" && edge.toPort === "left";
    const stroke = isBackEdge ? "hsl(35,15%,25%)" : "hsl(35,10%,22%)";
    const d = computeEdgePath(edge.fromPort, edge.toPort, fx, fy, tx, ty);
    const arrowPoints = computeArrowhead(edge.toPort, tx, ty);
    return (
      <g key={`e-${i}`}>
        <path
          d={d}
          fill="none"
          stroke={stroke}
          strokeWidth={1.5}
          strokeDasharray={isBackEdge ? "4 3" : "none"}
        />
        <polygon points={arrowPoints} fill="hsl(35,10%,26%)" />
      </g>
    );
  };
  // Draw one step box: styling varies by step type and tool presence.
  const renderNode = (ln: LayoutNode) => {
    const { step, pos } = ln;
    const isDecision = step.type === "decision" || step.type === "loop";
    const isOutput = step.type === "output";
    const hasTool = step.tool !== null;
    // Visual styling matching SubGraph conventions
    const bgFill = hasTool
      ? "hsl(220,15%,10%)"
      : isOutput
        ? "hsl(35,15%,12%)"
        : "hsl(35,10%,11%)";
    const borderStroke = hasTool
      ? "hsl(220,25%,28%)"
      : isDecision
        ? "hsl(35,15%,26%)"
        : isOutput
          ? `${color}30`
          : "hsl(35,10%,19%)";
    const textFill = hasTool
      ? "hsl(220,25%,60%)"
      : "hsl(35,10%,42%)";
    const rx = hasTool ? 5 : isDecision ? 8 : 7;
    // Icon prefix: gear = tool, diamond = decision/loop, triangle = output, circle = plain.
    const icon = hasTool ? "\u2699" : isDecision ? "\u25C7" : isOutput ? "\u25B8" : "\u25CB";
    const iconColor = hasTool ? "hsl(220,25%,55%)" : isDecision ? "hsl(35,30%,30%)" : isOutput ? `${color}80` : "hsl(35,10%,28%)";
    return (
      <g key={step.id}>
        <rect
          x={pos.x} y={pos.y} width={pos.w} height={pos.h}
          rx={rx}
          fill={bgFill}
          stroke={borderStroke}
          strokeWidth={hasTool ? 1 : 1.2}
          strokeDasharray={isDecision ? "3 2" : "none"}
        />
        <text
          x={pos.x + 10} y={pos.y + pos.h / 2}
          fontSize={hasTool ? 8 : 9}
          dominantBaseline="middle"
          fill={iconColor}
        >
          {icon}
        </text>
        <text
          x={pos.x + (hasTool ? 20 : 23)} y={pos.y + pos.h / 2}
          fill={textFill}
          fontSize={hasTool ? 9.5 : 10.5}
          fontWeight={400}
          dominantBaseline="middle"
          style={{ fontFamily: "'Inter', system-ui, sans-serif" }}
        >
          {step.label}
        </text>
      </g>
    );
  };
  // Edges render first so node boxes paint over the connector lines.
  return (
    <div className="w-full">
      <svg
        width="100%" height={svgH}
        viewBox={`0 0 ${SVG_W} ${svgH}`}
        preserveAspectRatio="xMidYMid meet"
        className="select-none"
        style={{ display: "block" }}
      >
        {edges.map((e, i) => renderEdge(e, i))}
        {nodes.map((n) => renderNode(n))}
      </svg>
    </div>
  );
}
@@ -4,7 +4,7 @@ import type { GraphNode, NodeStatus } from "./AgentGraph";
import type { NodeSpec, ToolInfo, NodeCriteria } from "../api/types";
import { graphsApi } from "../api/graphs";
import { logsApi } from "../api/logs";
import ExecutionSubGraph from "./ExecutionSubGraph";
import MarkdownContent from "./MarkdownContent";
interface Tool {
name: string;
@@ -27,6 +27,7 @@ interface NodeDetailPanelProps {
graphId?: string;
sessionId?: string | null;
nodeLogs?: string[];
actionPlan?: string;
onClose: () => void;
}
@@ -212,7 +213,7 @@ const tabs: { id: Tab; label: string; Icon: React.FC<{ className?: string }> }[]
{ id: "subagents", label: "Subagents", Icon: ({ className }) => <Bot className={className} /> },
];
export default function NodeDetailPanel({ node, nodeSpec, agentId, graphId, sessionId, nodeLogs, onClose }: NodeDetailPanelProps) {
export default function NodeDetailPanel({ node, nodeSpec, agentId, graphId, sessionId, nodeLogs, actionPlan, onClose }: NodeDetailPanelProps) {
const [activeTab, setActiveTab] = useState<Tab>("overview");
const [realTools, setRealTools] = useState<ToolInfo[] | null>(null);
const [realCriteria, setRealCriteria] = useState<NodeCriteria | null>(null);
@@ -314,12 +315,14 @@ export default function NodeDetailPanel({ node, nodeSpec, agentId, graphId, sess
<div className="flex-1 overflow-auto px-4 py-4 flex flex-col gap-3">
{activeTab === "overview" && (
<>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider">Internal Steps</p>
{nodeSpec?.subgraph_steps?.length ? (
<ExecutionSubGraph steps={nodeSpec.subgraph_steps} status={node.status} />
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider">Action Plan</p>
{actionPlan ? (
<div className="rounded-lg border border-border/30 bg-background/60 px-3 py-2.5 text-[11px] leading-relaxed text-foreground/80">
<MarkdownContent content={actionPlan} />
</div>
) : (
<div className="flex items-center justify-center py-6">
<p className="text-[11px] text-muted-foreground/50 italic">No workflow steps extracted</p>
<p className="text-[11px] text-muted-foreground/50 italic">Action plan will appear when node starts running</p>
</div>
)}
{(() => {
@@ -1,84 +0,0 @@
/**
* Shared SVG geometry primitives for SubGraph and ExecutionSubGraph.
*
* Extracted from NodeDetailPanel's inline SubGraph renderer so both
* demo-mode (hardcoded sgDefs) and real-mode (ExecutionSubGraph) can
* share the same port, edge path, and arrowhead math.
*/
export type PortSide = "top" | "bottom" | "left" | "right";
export interface Rect {
x: number;
y: number;
w: number;
h: number;
}
/** Return the [x, y] connection point at the midpoint of a rect side. */
export function sgPort(pos: Rect, port: PortSide): [number, number] {
  switch (port) {
    case "top":
      return [pos.x + pos.w / 2, pos.y];
    case "bottom":
      return [pos.x + pos.w / 2, pos.y + pos.h];
    case "left":
      return [pos.x, pos.y + pos.h / 2];
    default:
      // "right"
      return [pos.x + pos.w, pos.y + pos.h / 2];
  }
}
/**
 * Build the SVG path `d` attribute for an edge between two ports.
 *
 * Each recognized (fromPort, toPort) pair gets a cubic Bézier tuned to that
 * geometry; near-vertical bottom→top edges collapse to a straight line, and
 * any unrecognized pair falls back to a horizontal-midpoint curve.
 */
export function computeEdgePath(
  fromPort: PortSide,
  toPort: PortSide,
  fx: number,
  fy: number,
  tx: number,
  ty: number,
): string {
  const route = `${fromPort}->${toPort}`;
  if (route === "right->left") {
    const midX = (fx + tx) / 2;
    return `M ${fx} ${fy} C ${midX} ${fy}, ${midX} ${ty}, ${tx} ${ty}`;
  }
  if (route === "bottom->top") {
    // Nearly aligned vertically: a straight segment looks cleaner.
    if (Math.abs(tx - fx) < 10) {
      return `M ${fx} ${fy} L ${tx} ${ty}`;
    }
    const cY = fy + (ty - fy) * 0.5;
    return `M ${fx} ${fy} C ${fx} ${cY}, ${tx} ${cY}, ${tx} ${ty}`;
  }
  if (route === "bottom->left") {
    return `M ${fx} ${fy} C ${fx} ${fy + 20}, ${tx - 20} ${ty}, ${tx} ${ty}`;
  }
  if (route === "bottom->right") {
    return `M ${fx} ${fy} C ${fx} ${fy + 20}, ${tx + 20} ${ty}, ${tx} ${ty}`;
  }
  if (route === "right->top") {
    return `M ${fx} ${fy} C ${fx + 20} ${fy}, ${tx} ${ty - 20}, ${tx} ${ty}`;
  }
  // Fallback for any other port combination.
  const cX = (fx + tx) / 2;
  return `M ${fx} ${fy} C ${cX} ${fy}, ${cX} ${ty}, ${tx} ${ty}`;
}
/**
 * Build the SVG polygon `points` attribute for an arrowhead whose tip sits
 * one pixel inside the target port. Arrow size is configurable (default 4.5).
 */
export function computeArrowhead(
  toPort: PortSide,
  tx: number,
  ty: number,
  arrowSize = 4.5,
): string {
  const half = arrowSize;       // half-width of the arrowhead base
  const depth = arrowSize * 1.4; // distance from tip to base
  switch (toPort) {
    case "top":
      return `${tx - half},${ty + depth} ${tx + half},${ty + depth} ${tx},${ty + 1}`;
    case "left":
      return `${tx + depth},${ty - half} ${tx + depth},${ty + half} ${tx + 1},${ty}`;
    case "right":
      return `${tx - depth},${ty - half} ${tx - depth},${ty + half} ${tx - 1},${ty}`;
    default:
      // "bottom"
      return `${tx - half},${ty - depth} ${tx + half},${ty - depth} ${tx},${ty - 1}`;
  }
}
+151 -47
View File
@@ -38,6 +38,35 @@ function createSession(agentType: string, label: string, existingCredentials?: C
};
}
// --- Tab persistence ---
const TAB_STORAGE_KEY = "hive:workspace-tabs";
interface PersistedTabState {
tabs: Array<{ id: string; agentType: string; label: string }>;
activeSessionByAgent: Record<string, string>;
activeWorker: string;
}
/** Read persisted tab state from localStorage; null when absent or invalid. */
function loadPersistedTabs(): PersistedTabState | null {
  try {
    const stored = localStorage.getItem(TAB_STORAGE_KEY);
    if (stored) {
      const candidate = JSON.parse(stored);
      // A payload without at least one tab is useless — treat it as absent.
      if (Array.isArray(candidate.tabs) && candidate.tabs.length > 0) {
        return candidate as PersistedTabState;
      }
    }
  } catch {
    // Corrupt JSON or storage access failure — fall through to null.
  }
  return null;
}
/** Persist tab state to localStorage, ignoring quota/availability errors. */
function savePersistedTabs(state: PersistedTabState): void {
  try {
    const serialized = JSON.stringify(state);
    localStorage.setItem(TAB_STORAGE_KEY, serialized);
  } catch {
    // localStorage full or unavailable — silently ignore
  }
}
// --- NewTabPopover ---
type PopoverStep = "root" | "new-agent-choice" | "clone-pick";
@@ -198,10 +227,27 @@ export default function Workspace() {
const initialAgent = rawAgent;
const initialPrompt = searchParams.get("prompt") || "";
// Sessions grouped by agent type — only create one for the initial agent
// Sessions grouped by agent type — restore from localStorage if available
const [sessionsByAgent, setSessionsByAgent] = useState<Record<string, Session[]>>(() => {
const persisted = loadPersistedTabs();
const initial: Record<string, Session[]> = {};
// Restore persisted tabs as skeleton sessions (messages/graph come from backend)
if (persisted) {
for (const tab of persisted.tabs) {
if (!initial[tab.agentType]) initial[tab.agentType] = [];
const session = createSession(tab.agentType, tab.label);
session.id = tab.id; // preserve ID so activeSessionByAgent refs stay valid
initial[tab.agentType].push(session);
}
}
// Check if the URL-requested agent already has a tab (deduplication)
if (initial[initialAgent]?.length) {
return initial;
}
// No existing tab for this agent — create one
if (initialAgent === "new-agent") {
const session = createSession("new-agent", "New Agent");
session.messages = [
@@ -224,17 +270,27 @@ export default function Workspace() {
},
);
}
initial["new-agent"] = [session];
initial["new-agent"] = [...(initial["new-agent"] || []), session];
} else {
// Real agent: start empty, backend will populate via intro_message + session history
initial[initialAgent] = [createSession(initialAgent, formatAgentDisplayName(initialAgent))];
initial[initialAgent] = [...(initial[initialAgent] || []),
createSession(initialAgent, formatAgentDisplayName(initialAgent))];
}
return initial;
});
// Active session ID per agent type
// Active session ID per agent type — restore from localStorage if available
const [activeSessionByAgent, setActiveSessionByAgent] = useState<Record<string, string>>(() => {
const persisted = loadPersistedTabs();
if (persisted) {
const restored = { ...persisted.activeSessionByAgent };
// Ensure the URL agent has a valid active session mapping
const urlSessions = sessionsByAgent[initialAgent];
if (urlSessions?.length && !restored[initialAgent]) {
restored[initialAgent] = urlSessions[0].id;
}
return restored;
}
const sessions = sessionsByAgent[initialAgent];
return sessions ? { [initialAgent]: sessions[0].id } : {};
});
@@ -273,6 +329,8 @@ export default function Workspace() {
const [currentExecutionId, setCurrentExecutionId] = useState<string | null>(null);
// Per-node live log lines accumulated from SSE events
const [nodeLogs, setNodeLogs] = useState<Record<string, string[]>>({});
// Per-node action plans generated at runtime via SSE
const [nodeActionPlans, setNodeActionPlans] = useState<Record<string, string>>({});
// Resolved display name for the loaded agent (e.g. "Competitive Intel Agent")
const [agentDisplayName, setAgentDisplayName] = useState<string | null>(null);
// Graph context for NodeDetailPanel
@@ -294,6 +352,19 @@ export default function Workspace() {
});
}, [activeWorker]);
// Persist tab metadata to localStorage on every relevant change
useEffect(() => {
const tabs: PersistedTabState["tabs"] = [];
for (const sessions of Object.values(sessionsByAgent)) {
for (const s of sessions) {
tabs.push({ id: s.id, agentType: s.agentType, label: s.label });
}
}
if (tabs.length > 0) {
savePersistedTabs({ tabs, activeSessionByAgent, activeWorker });
}
}, [sessionsByAgent, activeSessionByAgent, activeWorker]);
const handleRun = useCallback(async () => {
if (!backendAgentId || !backendReady) return;
try {
@@ -783,6 +854,15 @@ export default function Workspace() {
}
break;
case "node_action_plan":
if (!isQueen && event.node_id) {
const plan = (event.data?.plan as string) || "";
if (plan.trim()) {
setNodeActionPlans(prev => ({ ...prev, [event.node_id!]: plan }));
}
}
break;
default:
break;
}
@@ -805,6 +885,21 @@ export default function Workspace() {
? { nodes: activeSession.graphNodes, title: agentDisplayName || formatAgentDisplayName(activeWorker) }
: { nodes: [] as GraphNode[], title: "" };
// Build a flat list of all agent-type tabs for the tab bar
const agentTabs = Object.entries(sessionsByAgent)
.filter(([, sessions]) => sessions.length > 0)
.map(([agentType, sessions]) => {
const activeId = activeSessionByAgent[agentType] || sessions[0]?.id;
const session = sessions.find(s => s.id === activeId) || sessions[0];
return {
agentType,
sessionId: session.id,
label: session.label,
isActive: agentType === activeWorker,
hasRunning: session.graphNodes.some(n => n.status === "running" || n.status === "looping"),
};
});
// --- handleSend: real backend call or mock fallback (Phase 6) ---
const handleSend = useCallback((text: string, thread: string) => {
if (!activeSession) return;
@@ -893,15 +988,28 @@ export default function Workspace() {
}
}, [activeWorker, activeSession, backendAgentId, backendReady]);
const closeSession = useCallback((sessionId: string) => {
const sessions = sessionsByAgent[activeWorker] || [];
if (sessions.length <= 1) return; // Don't close last tab
const filtered = sessions.filter(s => s.id !== sessionId);
setSessionsByAgent(prev => ({ ...prev, [activeWorker]: filtered }));
if (activeSessionId === sessionId) {
setActiveSessionByAgent(prev => ({ ...prev, [activeWorker]: filtered[0].id }));
const closeAgentTab = useCallback((agentType: string) => {
const allTypes = Object.keys(sessionsByAgent).filter(k => (sessionsByAgent[k] || []).length > 0);
if (allTypes.length <= 1) return; // Don't close the last tab
setSessionsByAgent(prev => {
const next = { ...prev };
delete next[agentType];
return next;
});
setActiveSessionByAgent(prev => {
const next = { ...prev };
delete next[agentType];
return next;
});
if (activeWorker === agentType) {
const remaining = allTypes.filter(k => k !== agentType);
if (remaining.length > 0) {
setActiveWorker(remaining[0]);
}
}
}, [activeWorker, sessionsByAgent, activeSessionId]);
}, [sessionsByAgent, activeWorker]);
// Create a new session for any agent type (used by NewTabPopover)
const addAgentSession = useCallback((agentType: string, agentLabel?: string, cloned = false) => {
@@ -952,41 +1060,36 @@ export default function Workspace() {
</button>
<span className="text-border text-xs flex-shrink-0">|</span>
{/* Instance tabs */}
{/* Agent tabs — one per agent type */}
<div className="flex items-center gap-0.5 min-w-0 overflow-x-auto scrollbar-hide">
{currentSessions.map((session) => {
const sessionIsActive = session.graphNodes.some(n => n.status === "running" || n.status === "looping");
return (
<button
key={session.id}
onClick={() => {
setActiveSessionByAgent(prev => ({ ...prev, [activeWorker]: session.id }));
// Open the first active/running node detail, or clear it
const activeNode = session.graphNodes.find(n => n.status === "running" || n.status === "looping") || session.graphNodes[0] || null;
setSelectedNode(activeNode);
}}
className={`group flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium transition-colors whitespace-nowrap flex-shrink-0 ${
session.id === activeSessionId
? "bg-primary/15 text-primary"
: "text-muted-foreground hover:text-foreground hover:bg-muted/50"
}`}
>
{sessionIsActive && (
<span className="relative flex h-1.5 w-1.5 flex-shrink-0">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full bg-primary opacity-60" />
<span className="relative inline-flex rounded-full h-1.5 w-1.5 bg-primary" />
</span>
)}
<span>{session.label}</span>
{currentSessions.length > 1 && (
<X
className="w-3 h-3 opacity-0 group-hover:opacity-60 hover:!opacity-100 transition-opacity"
onClick={(e) => { e.stopPropagation(); closeSession(session.id); }}
/>
)}
</button>
);
})}
{agentTabs.map((tab) => (
<button
key={tab.agentType}
onClick={() => {
setActiveWorker(tab.agentType);
setActiveSessionByAgent(prev => ({ ...prev, [tab.agentType]: tab.sessionId }));
}}
className={`group flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium transition-colors whitespace-nowrap flex-shrink-0 ${
tab.isActive
? "bg-primary/15 text-primary"
: "text-muted-foreground hover:text-foreground hover:bg-muted/50"
}`}
>
{tab.hasRunning && (
<span className="relative flex h-1.5 w-1.5 flex-shrink-0">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full bg-primary opacity-60" />
<span className="relative inline-flex rounded-full h-1.5 w-1.5 bg-primary" />
</span>
)}
<span>{tab.label}</span>
{agentTabs.length > 1 && (
<X
className="w-3 h-3 opacity-0 group-hover:opacity-60 hover:!opacity-100 transition-opacity"
onClick={(e) => { e.stopPropagation(); closeAgentTab(tab.agentType); }}
/>
)}
</button>
))}
<button
ref={newTabBtnRef}
onClick={() => setNewTabOpen(o => !o)}
@@ -1073,6 +1176,7 @@ export default function Workspace() {
graphId={backendGraphId || undefined}
sessionId={null}
nodeLogs={nodeLogs[selectedNode.id] || []}
actionPlan={nodeActionPlans[selectedNode.id]}
onClose={() => setSelectedNode(null)}
/>
</div>