From b826e70d8c0fb4f6105b67992b779f3ff5825a57 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Fri, 1 May 2026 14:07:34 -0700 Subject: [PATCH] feat: remove old lifecyle tools --- .../agents/queen/queen_tools_defaults.py | 5 +- core/framework/tools/queen_lifecycle_tools.py | 911 +----------------- 2 files changed, 15 insertions(+), 901 deletions(-) diff --git a/core/framework/agents/queen/queen_tools_defaults.py b/core/framework/agents/queen/queen_tools_defaults.py index 6af86ea9..6d2b032b 100644 --- a/core/framework/agents/queen/queen_tools_defaults.py +++ b/core/framework/agents/queen/queen_tools_defaults.py @@ -131,10 +131,8 @@ _TOOL_CATEGORIES: dict[str, list[str]] = { # browser_basic for richer site-by-site research; this category is the # lightweight always-available fallback. "research": [ - "search_papers", - "download_paper", - "search_wikipedia", "web_scrape", + "pdf_read" ], # Security — defensive scanning and reconnaissance. Engineering-only # surface; the rest of the queens shouldn't see port scanners. @@ -244,7 +242,6 @@ QUEEN_DEFAULT_CATEGORIES: dict[str, list[str]] = { "spreadsheet_advanced", "browser_basic", "browser_interaction", - "research", "time_context", ], } diff --git a/core/framework/tools/queen_lifecycle_tools.py b/core/framework/tools/queen_lifecycle_tools.py index fd237aa3..c1b4d061 100644 --- a/core/framework/tools/queen_lifecycle_tools.py +++ b/core/framework/tools/queen_lifecycle_tools.py @@ -46,11 +46,6 @@ from typing import TYPE_CHECKING, Any from framework.credentials.models import CredentialError from framework.host.event_bus import AgentEvent, EventType from framework.loader.preload_validation import credential_errors_to_json -from framework.tools.flowchart_utils import ( - FLOWCHART_TYPES, - classify_flowchart_node, - save_flowchart_file, -) if TYPE_CHECKING: from framework.host.agent_host import AgentHost @@ -501,9 +496,6 @@ def build_worker_profile(runtime: Any, agent_path: Path | str | None = None) -> return "\n".join(lines) -# FLOWCHART_TYPES is imported from framework.tools.flowchart_utils - - def _read_agent_triggers_json(agent_path: Path) -> list[dict]: """Read triggers.json from the agent's export directory.""" triggers_path = agent_path / "triggers.json" @@ -2565,12 +2557,10 @@ def register_queen_lifecycle_tools( "required": ["colony_name", "goal"], }, ) - registry.register( - "enqueue_task", - _enqueue_task_tool, - lambda inputs: enqueue_task_tool(**inputs), - ) - tools_registered += 1 + # NOTE: ``enqueue_task`` is intentionally NOT registered. The Tool object + # and helper above are kept for potential reuse by other code paths or + # a future per-persona registration gate; today no queen receives it. + _ = _enqueue_task_tool # --- switch_to_reviewing ---------------------------------------------------- @@ -2609,12 +2599,11 @@ def register_queen_lifecycle_tools( ), parameters={"type": "object", "properties": {}}, ) - registry.register( - "switch_to_reviewing", - _switch_editing_tool, - lambda inputs: switch_to_reviewing_tool(), - ) - tools_registered += 1 + # NOTE: ``switch_to_reviewing`` is intentionally NOT registered as a tool + # for the queen. The phase transition is still callable on QueenPhaseState + # (and used by routes_execution + queen_orchestrator); the LLM-facing + # tool wrapper is dormant. + _ = _switch_editing_tool # --- stop_worker_and_review -------------------------------------------------- @@ -2655,883 +2644,11 @@ def register_queen_lifecycle_tools( ), parameters={"type": "object", "properties": {}}, ) - registry.register("stop_worker_and_review", _stop_edit_tool, lambda inputs: stop_worker_and_review()) - tools_registered += 1 - - # --- stop_worker_and_plan (Running/Staging → Planning) --------------------- - - async def stop_worker_and_plan() -> str: - """Stop the loaded graph and switch to planning phase for diagnosis.""" - stop_result = await stop_worker() - result, can_transition = _stop_result_allows_phase_transition(stop_result) - - # Switch to planning phase - if phase_state is not None and can_transition: - await phase_state.switch_to_planning(source="tool") - _update_meta_json(session_manager, manager_session_id, {"phase": "planning"}) - - if can_transition: - result["phase"] = "planning" - result["message"] = ( - "Graph stopped. You are now in planning phase. " - "Diagnose the issue using read-only tools (checkpoints, logs, sessions), " - "discuss a fix plan with the user, then call " - "initialize_and_build_agent() to implement the fix." - ) - else: - result["message"] = ( - "Stop requested, but the worker is still shutting down. Phase will not change until shutdown completes." - ) - return json.dumps(result) - - _stop_plan_tool = Tool( - name="stop_worker_and_plan", - description=( - "Stop the graph and switch to planning phase for diagnosis. " - "Use this when you need to investigate an issue before fixing it. " - "After diagnosis, call initialize_and_build_agent() to switch to building." - ), - parameters={"type": "object", "properties": {}}, - ) - registry.register("stop_worker_and_plan", _stop_plan_tool, lambda inputs: stop_worker_and_plan()) - tools_registered += 1 - - # --- replan_agent (Building → Planning) ----------------------------------- - - async def replan_agent() -> str: - """Switch from building back to planning phase. - Only use when the user explicitly asks to re-plan.""" - if phase_state is not None: - if phase_state.phase != "building": - return json.dumps({"error": f"Cannot replan: currently in {phase_state.phase} phase."}) - - # Carry forward the current draft: restore original (pre-dissolution) - # draft so the queen can edit it in planning, rather than starting - # from scratch. - if phase_state.original_draft_graph is not None: - phase_state.draft_graph = phase_state.original_draft_graph - phase_state.original_draft_graph = None - phase_state.flowchart_map = None - phase_state.build_confirmed = False - - await phase_state.switch_to_planning(source="tool") - - # Re-emit draft so frontend shows the flowchart in planning mode - bus = phase_state.event_bus - if bus is not None and phase_state.draft_graph is not None: - try: - await bus.publish( - AgentEvent( - type=EventType.CUSTOM, - stream_id="queen", - data={"event": "draft_updated", **phase_state.draft_graph}, - ) - ) - except Exception: - logger.warning("Failed to re-emit draft during replan", exc_info=True) - - has_draft = phase_state is not None and phase_state.draft_graph is not None - return json.dumps( - { - "status": "replanning", - "phase": "planning", - "has_previous_draft": has_draft, - "message": ( - "Switched to PLANNING phase. Coding tools removed. " - + ( - "The previous draft flowchart has been restored (with " - "decision and sub-agent nodes intact). Call save_agent_draft() " - "to update the design, then confirm_and_build() when ready." - if has_draft - else "Discuss the new design with the user." - ) - ), - } - ) - - _replan_tool = Tool( - name="replan_agent", - description=( - "Switch from building back to planning phase. " - "Use when the user wants to change integrations, swap tools, " - "rethink the flow, or discuss design changes before building them." - ), - parameters={"type": "object", "properties": {}}, - ) - registry.register("replan_agent", _replan_tool, lambda inputs: replan_agent()) - tools_registered += 1 - - # --- save_agent_draft (Planning phase — declarative preview) ---------------- - # so the frontend can render the graph during planning (before any code). - - def _dissolve_planning_nodes( - draft: dict, - ) -> tuple[dict, dict[str, list[str]]]: - """Convert planning-only nodes into runtime-compatible structures. - - Two kinds of planning-only nodes are dissolved: - - **Decision nodes** (flowchart diamonds): - 1. Merging the decision clause into the predecessor node's success_criteria. - 2. Rewiring the decision's yes/no outgoing edges as on_success/on_failure - edges from the predecessor. - 3. Removing the decision node from the graph. - - **Sub-agent / browser nodes** (node_type == "gcu" or flowchart_type == "browser"): - 1. Adding the sub-agent node's ID to the predecessor's sub_agents list. - 2. Removing the sub-agent node and its connecting edge. - 3. Sub-agent nodes must not have outgoing edges (they are leaf delegates). - - Returns (converted_draft, flowchart_map) where flowchart_map maps - runtime node IDs → list of original draft node IDs they absorbed. - """ - import copy as _copy - - nodes: list[dict] = _copy.deepcopy(draft.get("nodes", [])) - edges: list[dict] = _copy.deepcopy(draft.get("edges", [])) - - # Index helpers - node_by_id: dict[str, dict] = {n["id"]: n for n in nodes} - - def _incoming(nid: str) -> list[dict]: - return [e for e in edges if e["target"] == nid] - - def _outgoing(nid: str) -> list[dict]: - return [e for e in edges if e["source"] == nid] - - # Identify decision nodes - decision_ids = [n["id"] for n in nodes if n.get("flowchart_type") == "decision"] - - # Track which draft nodes each runtime node absorbed - absorbed: dict[str, list[str]] = {} # runtime_id → [draft_ids...] - - # Process decisions in node-list order (topological for linear graphs) - for d_id in decision_ids: - d_node = node_by_id.get(d_id) - if d_node is None: - continue # already removed by a prior dissolution - - in_edges = _incoming(d_id) - out_edges = _outgoing(d_id) - - # Classify outgoing edges into yes/no branches - yes_edge: dict | None = None - no_edge: dict | None = None - - for oe in out_edges: - lbl = (oe.get("label") or "").lower().strip() - cond = (oe.get("condition") or "").lower().strip() - - if lbl in ("yes", "true", "pass") or cond == "on_success": - yes_edge = oe - elif lbl in ("no", "false", "fail") or cond == "on_failure": - no_edge = oe - - # Fallback: if exactly 2 outgoing and couldn't classify, assign by order - if len(out_edges) == 2 and (yes_edge is None or no_edge is None): - if yes_edge is None and no_edge is None: - yes_edge, no_edge = out_edges[0], out_edges[1] - elif yes_edge is None: - yes_edge = [e for e in out_edges if e is not no_edge][0] - else: - no_edge = [e for e in out_edges if e is not yes_edge][0] - - # Decision clause: prefer decision_clause, fall back to description/name - clause = (d_node.get("decision_clause") or d_node.get("description") or d_node.get("name") or d_id).strip() - - predecessors = [node_by_id[e["source"]] for e in in_edges if e["source"] in node_by_id] - - if not predecessors: - # Decision at start: convert to regular process node - d_node["flowchart_type"] = "process" - fc_meta = FLOWCHART_TYPES["process"] - d_node["flowchart_shape"] = fc_meta["shape"] - d_node["flowchart_color"] = fc_meta["color"] - if not d_node.get("success_criteria"): - d_node["success_criteria"] = clause - # Rewire outgoing edges to on_success/on_failure - if yes_edge: - yes_edge["condition"] = "on_success" - if no_edge: - no_edge["condition"] = "on_failure" - absorbed[d_id] = absorbed.get(d_id, [d_id]) - continue - - # Dissolve: merge into each predecessor - for pred in predecessors: - pid = pred["id"] - - # Merge decision clause into predecessor's success_criteria - existing = (pred.get("success_criteria") or "").strip() - if existing: - pred["success_criteria"] = f"{existing}; then evaluate: {clause}" - else: - pred["success_criteria"] = clause - - # Remove the edge from predecessor → decision - edges[:] = [e for e in edges if not (e["source"] == pid and e["target"] == d_id)] - - # Wire predecessor → yes/no targets - edge_counter = len(edges) - if yes_edge: - edges.append( - { - "id": f"edge-dissolved-{edge_counter}", - "source": pid, - "target": yes_edge["target"], - "condition": "on_success", - "description": yes_edge.get("description", ""), - "label": yes_edge.get("label", "Yes"), - } - ) - edge_counter += 1 - if no_edge: - edges.append( - { - "id": f"edge-dissolved-{edge_counter}", - "source": pid, - "target": no_edge["target"], - "condition": "on_failure", - "description": no_edge.get("description", ""), - "label": no_edge.get("label", "No"), - } - ) - - # Record absorption - prev_absorbed = absorbed.get(pid, [pid]) - if d_id not in prev_absorbed: - prev_absorbed.append(d_id) - absorbed[pid] = prev_absorbed - - # Remove decision node and all its edges - edges[:] = [e for e in edges if e["source"] != d_id and e["target"] != d_id] - nodes[:] = [n for n in nodes if n["id"] != d_id] - del node_by_id[d_id] - - # Build complete flowchart_map (identity for non-absorbed nodes) - flowchart_map: dict[str, list[str]] = {} - for n in nodes: - nid = n["id"] - flowchart_map[nid] = absorbed.get(nid, [nid]) - - # Rebuild terminal_nodes (decision targets may have changed). - sources = {e["source"] for e in edges} - all_ids = {n["id"] for n in nodes} - terminal_ids = all_ids - sources - if not terminal_ids and nodes: - terminal_ids = {nodes[-1]["id"]} - - converted = dict(draft) - converted["nodes"] = nodes - converted["edges"] = edges - converted["terminal_nodes"] = sorted(terminal_ids) - converted["entry_node"] = nodes[0]["id"] if nodes else "" - - return converted, flowchart_map - - async def save_agent_draft( - *, - agent_name: str, - goal: str, - nodes: list[dict], - edges: list[dict] | None = None, - description: str = "", - success_criteria: list[str] | None = None, - constraints: list[str] | None = None, - terminal_nodes: list[str] | None = None, - ) -> str: - """Save a declarative draft of the agent graph during planning. - - This creates a lightweight, visual-only graph for the user to review. - No executable code is generated. Nodes need only an id, name, and - description. Tools, input/output keys, and system prompts are optional - metadata hints — they will be fully specified during the building phase. - - Each node is classified into a classical flowchart component type - (start, terminal, process, decision, io, subprocess, browser, manual) - with a unique color. The queen can override auto-detection by setting - flowchart_type explicitly on a node. - """ - # ── Gate: require at least 2 rounds of user questions ───────── - if phase_state is not None and phase_state.phase == "planning" and phase_state.planning_ask_rounds < 2: - return json.dumps( - { - "error": ( - "You haven't asked enough questions yet. You have only " - f"asked {phase_state.planning_ask_rounds} round(s) of " - "questions — at least 2 are required before saving a " - "draft. Think deeper and ask more practical questions " - "to fully understand the user's requirements before " - "designing the agent graph." - ) - } - ) - - # ── Gate: require at least 5 nodes for a meaningful graph ───── - if len(nodes) < 5: - return json.dumps( - { - "error": ( - f"Draft only has {len(nodes)} node(s) — at least 5 are " - "required for a meaningful agent graph. Think deeper and " - "ask more practical questions to fully understand the " - "user's requirements, then design a more thorough graph." - ) - } - ) - - # Loose validation: each node needs at minimum an id - validated_nodes = [] - for i, n in enumerate(nodes): - if not isinstance(n, dict): - return json.dumps({"error": f"Node {i} must be a dict, got {type(n).__name__}"}) - node_id = n.get("id", "").strip() - if not node_id: - return json.dumps({"error": f"Node {i} is missing 'id'"}) - validated_nodes.append( - { - "id": node_id, - "name": n.get("name", node_id.replace("-", " ").replace("_", " ").title()), - "description": n.get("description", ""), - "node_type": n.get("node_type", "event_loop"), - # Optional business-logic hints (not validated yet) - "tools": n.get("tools", []), - "input_keys": n.get("input_keys", []), - "output_keys": n.get("output_keys", []), - "success_criteria": n.get("success_criteria", ""), - # Decision nodes: the yes/no question to evaluate - "decision_clause": n.get("decision_clause", ""), - # Explicit flowchart override (preserved for classification) - "flowchart_type": n.get("flowchart_type", ""), - } - ) - - # Check for duplicate node IDs - seen_ids: set[str] = set() - for n in validated_nodes: - if n["id"] in seen_ids: - return json.dumps({"error": f"Duplicate node id '{n['id']}'"}) - seen_ids.add(n["id"]) - - validated_edges = [] - if edges: - node_ids = {n["id"] for n in validated_nodes} - for i, e in enumerate(edges): - if not isinstance(e, dict): - return json.dumps({"error": f"Edge {i} must be a dict"}) - src = e.get("source", "") - tgt = e.get("target", "") - if src and src not in node_ids: - return json.dumps({"error": f"Edge {i} source '{src}' references unknown node"}) - if tgt and tgt not in node_ids: - return json.dumps({"error": f"Edge {i} target '{tgt}' references unknown node"}) - validated_edges.append( - { - "id": e.get("id", f"edge-{i}"), - "source": src, - "target": tgt, - "condition": e.get("condition", "on_success"), - "description": e.get("description", ""), - "label": e.get("label", ""), - } - ) - - topology_corrections: list[str] = [] - - # ── Validate graph connectivity ───────────────────────────── - # Every node must be reachable from the entry node. Disconnected - # subgraphs indicate a broken design — remove unreachable nodes - # and report them so the queen can fix the draft. - if validated_nodes: - entry_id = validated_nodes[0]["id"] - # Build undirected adjacency from edges - _adj: dict[str, set[str]] = {n["id"]: set() for n in validated_nodes} - for e in validated_edges: - s, t = e["source"], e["target"] - if s in _adj and t in _adj: - _adj[s].add(t) - _adj[t].add(s) - # BFS from entry - visited: set[str] = set() - queue = [entry_id] - while queue: - cur = queue.pop() - if cur in visited: - continue - visited.add(cur) - for nb in _adj.get(cur, ()): - if nb not in visited: - queue.append(nb) - unreachable = {n["id"] for n in validated_nodes} - visited - if unreachable: - for uid in sorted(unreachable): - logger.warning( - "Node '%s' is unreachable from entry node '%s' — removing it from the draft.", - uid, - entry_id, - ) - topology_corrections.append( - f"Node '{uid}' is disconnected from the graph " - f"(unreachable from entry node '{entry_id}') — " - f"removed. Connect it to the flow or assign it " - f"as a sub-agent of an existing node." - ) - validated_edges[:] = [ - e for e in validated_edges if e["source"] not in unreachable and e["target"] not in unreachable - ] - validated_nodes[:] = [n for n in validated_nodes if n["id"] not in unreachable] - - # Determine terminal nodes: explicit list, or nodes with no outgoing edges. - # Sub-agent nodes are leaf helpers, not endpoints — exclude them. - sa_ids: set[str] = set() - for n in validated_nodes: - for sa_id in n.get("sub_agents") or []: - sa_ids.add(sa_id) - terminal_ids: set[str] = set(terminal_nodes or []) - sa_ids - if not terminal_ids: - sources = {e["source"] for e in validated_edges} - all_ids = {n["id"] for n in validated_nodes} - terminal_ids = all_ids - sources - sa_ids - # If all nodes have outgoing edges (loop graph), mark the last as terminal - if not terminal_ids and validated_nodes: - terminal_ids = {validated_nodes[-1]["id"]} - - # Classify each node into a flowchart component type with color - total = len(validated_nodes) - for i, node in enumerate(validated_nodes): - fc_type = classify_flowchart_node( - node, - i, - total, - validated_edges, - terminal_ids, - ) - fc_meta = FLOWCHART_TYPES[fc_type] - node["flowchart_type"] = fc_type - node["flowchart_shape"] = fc_meta["shape"] - node["flowchart_color"] = fc_meta["color"] - - draft = { - "agent_name": agent_name.strip(), - "goal": goal.strip(), - "description": description.strip(), - "success_criteria": success_criteria or [], - "constraints": constraints or [], - "nodes": validated_nodes, - "edges": validated_edges, - "entry_node": validated_nodes[0]["id"] if validated_nodes else "", - "terminal_nodes": sorted(terminal_ids), - # Color legend for the frontend - "flowchart_legend": { - fc_type: {"shape": meta["shape"], "color": meta["color"]} for fc_type, meta in FLOWCHART_TYPES.items() - }, - } - - bus = getattr(session, "event_bus", None) - is_building = phase_state is not None and phase_state.phase == "building" - - if phase_state is not None: - if is_building: - # During building: re-draft updates the flowchart in place. - # Dissolve planning-only nodes immediately (no confirm gate). - import copy as _copy - - phase_state.original_draft_graph = _copy.deepcopy(draft) - converted, fmap = _dissolve_planning_nodes(draft) - phase_state.draft_graph = converted - phase_state.flowchart_map = fmap - # Do NOT reset build_confirmed — we're already building. - # Persist to agent folder - save_path = getattr(session, "worker_path", None) - if save_path is None: - # Worker not loaded yet — resolve from draft name - draft_name = draft.get("agent_name", "") - if draft_name: - from framework.config import COLONIES_DIR - - candidate = COLONIES_DIR / draft_name - if candidate.is_dir(): - save_path = candidate - save_flowchart_file( - save_path, - phase_state.original_draft_graph, - fmap, - ) - else: - # During planning: store raw draft, await user confirmation. - phase_state.draft_graph = draft - phase_state.build_confirmed = False - - # Emit events so the frontend can render - if bus is not None: - if is_building: - await bus.publish( - AgentEvent( - type=EventType.CUSTOM, - stream_id="queen", - data={ - "event": "draft_updated", - **(phase_state.draft_graph if phase_state else draft), - }, - ) - ) - await bus.publish( - AgentEvent( - type=EventType.CUSTOM, - stream_id="queen", - data={ - "event": "flowchart_updated", - "map": phase_state.flowchart_map if phase_state else None, - "original_draft": phase_state.original_draft_graph if phase_state else draft, - }, - ) - ) - else: - await bus.publish( - AgentEvent( - type=EventType.CUSTOM, - stream_id="queen", - data={"event": "draft_updated", **draft}, - ) - ) - - dissolution_info = {} - if is_building and phase_state is not None and phase_state.original_draft_graph: - orig_count = len(phase_state.original_draft_graph.get("nodes", [])) - conv_count = len(phase_state.draft_graph.get("nodes", [])) - dissolution_info = { - "planning_nodes_dissolved": orig_count - conv_count, - "flowchart_map": phase_state.flowchart_map, - } - - correction_warning = "" - if topology_corrections: - correction_warning = ( - " WARNING — your draft had topology errors that were " - "auto-corrected: " - + "; ".join(topology_corrections) - + " Review the corrected flowchart and do NOT repeat " - "this pattern. GCU nodes are ALWAYS leaf sub-agents." - ) - - if is_building: - msg = ( - "Draft flowchart updated during building. " - "Planning-only nodes dissolved automatically. " - "The user can see the updated flowchart. " - "Continue building — no re-confirmation needed." + correction_warning - ) - else: - msg = ( - "Draft graph saved and sent to the visualizer. " - "The user can now see the color-coded flowchart. " - "Present this design to the user and get their approval. " - "When the user confirms, call confirm_and_build() to proceed." + correction_warning - ) - - result: dict = { - "status": "draft_saved", - "agent_name": draft["agent_name"], - "node_count": len(validated_nodes), - "edge_count": len(validated_edges), - "node_types": {n["id"]: n["flowchart_type"] for n in validated_nodes}, - **dissolution_info, - "message": msg, - } - if topology_corrections: - result["topology_corrections"] = topology_corrections - return json.dumps(result) - - _draft_tool = Tool( - name="save_agent_draft", - description=( - "Save a declarative draft of the agent graph as a color-coded flowchart. " - "Usable in PLANNING (creates draft for user review) and BUILDING " - "(updates the flowchart in place — planning-only nodes are dissolved " - "automatically without re-confirmation). " - "Each node is auto-classified into a classical flowchart type " - "(start, terminal, process, decision, io, subprocess, browser, manual) " - "with unique colors. No code is generated. " - "Planning-only types (decision, browser/GCU) are dissolved at confirm/build time: " - "decision nodes merge into predecessor's success_criteria with yes/no edges; " - "browser/GCU nodes merge into predecessor's sub_agents list as leaf delegates." - ), - parameters={ - "type": "object", - "properties": { - "agent_name": { - "type": "string", - "description": "Snake_case name for the agent (e.g. 'research_agent')", - }, - "goal": { - "type": "string", - "description": "High-level goal description for the agent", - }, - "description": { - "type": "string", - "description": "Brief description of what the agent does", - }, - "nodes": { - "type": "array", - "items": { - "type": "object", - "properties": { - "id": {"type": "string", "description": "Kebab-case node identifier"}, - "name": {"type": "string", "description": "Human-readable name"}, - "description": { - "type": "string", - "description": "What this node does (business logic)", - }, - "node_type": { - "type": "string", - "enum": ["event_loop", "gcu"], - "description": "Node type (default: event_loop)", - }, - "flowchart_type": { - "type": "string", - "enum": [ - "start", - "terminal", - "process", - "decision", - "io", - "document", - "database", - "subprocess", - "browser", - ], - "description": ( - "Flowchart symbol type. Auto-detected if omitted. " - "start (sage green stadium), terminal (dusty red stadium), " - "process (blue-gray rect), decision (amber diamond), " - "io (purple parallelogram), document (steel blue wavy rect), " - "database (teal cylinder), subprocess (cyan subroutine), " - "browser (deep blue hexagon — for GCU/browser " - "sub-agents; must be a leaf node)" - ), - }, - "tools": { - "type": "array", - "items": {"type": "string"}, - "description": "Planned tools (hints, not validated yet)", - }, - "input_keys": { - "type": "array", - "items": {"type": "string"}, - "description": "Expected input buffer keys (hints)", - }, - "output_keys": { - "type": "array", - "items": {"type": "string"}, - "description": "Expected output buffer keys (hints)", - }, - "success_criteria": { - "type": "string", - "description": "What success looks like for this node", - }, - "sub_agents": { - "type": "array", - "items": {"type": "string"}, - "description": ( - "IDs of GCU/browser sub-agent nodes managed by this node. " - "At build time, sub-agent nodes are dissolved into this list. " - "Set this on the PARENT node — e.g. the orchestrator that " - "delegates to GCU leaves. Visual delegation edges are " - "synthesized automatically." - ), - }, - "decision_clause": { - "type": "string", - "description": ( - "For decision nodes only: the yes/no question to " - "evaluate (e.g. 'Is amount > $100?'). Used during " - "dissolution to set the predecessor's success_criteria." - ), - }, - }, - "required": ["id"], - }, - "description": "List of nodes with at minimum an id", - }, - "edges": { - "type": "array", - "items": { - "type": "object", - "properties": { - "source": {"type": "string"}, - "target": {"type": "string"}, - "condition": { - "type": "string", - "enum": [ - "always", - "on_success", - "on_failure", - "conditional", - "llm_decide", - ], - }, - "description": {"type": "string"}, - "label": { - "type": "string", - "description": ("Short edge label shown on the flowchart (e.g. 'Yes', 'No', 'Retry')"), - }, - }, - "required": ["source", "target"], - }, - "description": "Connections between nodes", - }, - "terminal_nodes": { - "type": "array", - "items": {"type": "string"}, - "description": ("Node IDs that are terminal (end) nodes. Auto-detected from edges if omitted."), - }, - "success_criteria": { - "type": "array", - "items": {"type": "string"}, - "description": "Agent-level success criteria", - }, - "constraints": { - "type": "array", - "items": {"type": "string"}, - "description": "Agent-level constraints", - }, - }, - "required": ["agent_name", "goal", "nodes"], - }, - ) - registry.register( - "save_agent_draft", - _draft_tool, - lambda inputs: save_agent_draft(**inputs), - ) - tools_registered += 1 - - # --- confirm_and_build (Planning → Building gate) ------------------------- - # Explicit user confirmation is required before transitioning from planning - # to building. This tool records that confirmation and proceeds. - - async def confirm_and_build(*, agent_name: str | None = None) -> str: - """Confirm the draft, create agent directory, and transition to building. - - This tool should ONLY be called after the user has explicitly approved - the draft graph design via ask_user. It creates the agent directory and - transitions to BUILDING phase. The queen then writes agent.json directly. - """ - if phase_state is None: - return json.dumps({"error": "Phase state not available."}) - - if phase_state.phase != "planning": - return json.dumps({"error": f"Cannot confirm_and_build: currently in {phase_state.phase} phase."}) - - if phase_state.draft_graph is None: - return json.dumps( - { - "error": ( - "No draft graph saved. Call save_agent_draft() first to create " - "a draft, present it to the user, and get their approval." - ) - } - ) - - phase_state.build_confirmed = True - - # Preserve original draft for flowchart display during runtime, - # then dissolve planning-only nodes (decision + browser/GCU) into - # runtime-compatible structures. - import copy as _copy - - original_nodes = phase_state.draft_graph.get("nodes", []) - # Compute dissolution first, then assign all three atomically so that - # a failure in _dissolve_planning_nodes doesn't leave partial state. - original_copy = _copy.deepcopy(phase_state.draft_graph) - converted, fmap = _dissolve_planning_nodes(phase_state.draft_graph) - phase_state.original_draft_graph = original_copy - phase_state.draft_graph = converted - phase_state.flowchart_map = fmap - - # Create agent folder early so flowchart and agent_path are available - # throughout the entire BUILDING phase. - _agent_name = agent_name or phase_state.draft_graph.get("agent_name", "").strip() - if _agent_name: - from framework.config import COLONIES_DIR - - _agent_folder = COLONIES_DIR / _agent_name - _agent_folder.mkdir(parents=True, exist_ok=True) - save_flowchart_file(_agent_folder, original_copy, fmap) - phase_state.agent_path = str(_agent_folder) - _update_meta_json( - session_manager, - manager_session_id, - { - "agent_path": str(_agent_folder), - "agent_name": _agent_name.replace("_", " ").title(), - }, - ) - - dissolved_count = len(original_nodes) - len(converted.get("nodes", [])) - decision_count = sum(1 for n in original_nodes if n.get("flowchart_type") == "decision") - subagent_count = sum( - 1 for n in original_nodes if n.get("flowchart_type") == "browser" or n.get("node_type") == "gcu" - ) - - dissolution_parts = [] - if decision_count: - dissolution_parts.append(f"{decision_count} decision node(s) dissolved into predecessor criteria") - if subagent_count: - dissolution_parts.append(f"{subagent_count} sub-agent node(s) dissolved into predecessor sub_agents") - - # Transition to BUILDING phase - await phase_state.switch_to_building(source="tool") - _update_meta_json(session_manager, manager_session_id, {"phase": "building"}) - phase_state.build_confirmed = False - - # No injection here -- the return message tells the queen what to do. - # Injecting would queue a BUILDING message that drains AFTER the queen - # may have already moved to STAGING via load_built_agent. - - return json.dumps( - { - "status": "confirmed", - "phase": "building", - "agent_name": _agent_name, - "agent_path": str(_agent_folder), - "planning_nodes_dissolved": dissolved_count, - "flowchart_map": fmap, - "message": ( - "Design confirmed and directory created. " - + ("; ".join(dissolution_parts) + ". " if dissolution_parts else "") - + f"Now write the complete agent config to {_agent_folder}/agent.json " - "using write_file(). Include all system prompts, tools, edges, and goal." - ), - } - ) - - _confirm_tool = Tool( - name="confirm_and_build", - description=( - "Confirm the draft graph design, create agent directory, and transition to building phase. " - "ONLY call this after the user has explicitly approved the design via ask_user. " - "After confirmation, write the complete agent.json using write_file()." - ), - parameters={ - "type": "object", - "properties": { - "agent_name": { - "type": "string", - "description": "Snake_case name for the agent (e.g. 'linkedin_outreach'). " - "If omitted, uses the name from save_agent_draft().", - }, - }, - }, - ) - registry.register( - "confirm_and_build", - _confirm_tool, - lambda inputs: confirm_and_build( - agent_name=inputs.get("agent_name"), - ), - ) - tools_registered += 1 + # NOTE: ``stop_worker_and_review`` is intentionally NOT registered. The + # phase transition method ``phase_state.switch_to_building`` it would + # invoke is no longer defined since the planning/building pipeline was + # removed; the wrapper is left here as a stub for future reintroduction. + _ = _stop_edit_tool # --- stop_worker (Running → Staging) --------------------------------------