feat: remove old lifecyle tools

This commit is contained in:
Richard Tang
2026-05-01 14:07:34 -07:00
parent 6f2f037c9c
commit b826e70d8c
2 changed files with 15 additions and 901 deletions
@@ -131,10 +131,8 @@ _TOOL_CATEGORIES: dict[str, list[str]] = {
# browser_basic for richer site-by-site research; this category is the # browser_basic for richer site-by-site research; this category is the
# lightweight always-available fallback. # lightweight always-available fallback.
"research": [ "research": [
"search_papers",
"download_paper",
"search_wikipedia",
"web_scrape", "web_scrape",
"pdf_read"
], ],
# Security — defensive scanning and reconnaissance. Engineering-only # Security — defensive scanning and reconnaissance. Engineering-only
# surface; the rest of the queens shouldn't see port scanners. # surface; the rest of the queens shouldn't see port scanners.
@@ -244,7 +242,6 @@ QUEEN_DEFAULT_CATEGORIES: dict[str, list[str]] = {
"spreadsheet_advanced", "spreadsheet_advanced",
"browser_basic", "browser_basic",
"browser_interaction", "browser_interaction",
"research",
"time_context", "time_context",
], ],
} }
+14 -897
View File
@@ -46,11 +46,6 @@ from typing import TYPE_CHECKING, Any
from framework.credentials.models import CredentialError from framework.credentials.models import CredentialError
from framework.host.event_bus import AgentEvent, EventType from framework.host.event_bus import AgentEvent, EventType
from framework.loader.preload_validation import credential_errors_to_json from framework.loader.preload_validation import credential_errors_to_json
from framework.tools.flowchart_utils import (
FLOWCHART_TYPES,
classify_flowchart_node,
save_flowchart_file,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from framework.host.agent_host import AgentHost from framework.host.agent_host import AgentHost
@@ -501,9 +496,6 @@ def build_worker_profile(runtime: Any, agent_path: Path | str | None = None) ->
return "\n".join(lines) return "\n".join(lines)
# FLOWCHART_TYPES is imported from framework.tools.flowchart_utils
def _read_agent_triggers_json(agent_path: Path) -> list[dict]: def _read_agent_triggers_json(agent_path: Path) -> list[dict]:
"""Read triggers.json from the agent's export directory.""" """Read triggers.json from the agent's export directory."""
triggers_path = agent_path / "triggers.json" triggers_path = agent_path / "triggers.json"
@@ -2565,12 +2557,10 @@ def register_queen_lifecycle_tools(
"required": ["colony_name", "goal"], "required": ["colony_name", "goal"],
}, },
) )
registry.register( # NOTE: ``enqueue_task`` is intentionally NOT registered. The Tool object
"enqueue_task", # and helper above are kept for potential reuse by other code paths or
_enqueue_task_tool, # a future per-persona registration gate; today no queen receives it.
lambda inputs: enqueue_task_tool(**inputs), _ = _enqueue_task_tool
)
tools_registered += 1
# --- switch_to_reviewing ---------------------------------------------------- # --- switch_to_reviewing ----------------------------------------------------
@@ -2609,12 +2599,11 @@ def register_queen_lifecycle_tools(
), ),
parameters={"type": "object", "properties": {}}, parameters={"type": "object", "properties": {}},
) )
registry.register( # NOTE: ``switch_to_reviewing`` is intentionally NOT registered as a tool
"switch_to_reviewing", # for the queen. The phase transition is still callable on QueenPhaseState
_switch_editing_tool, # (and used by routes_execution + queen_orchestrator); the LLM-facing
lambda inputs: switch_to_reviewing_tool(), # tool wrapper is dormant.
) _ = _switch_editing_tool
tools_registered += 1
# --- stop_worker_and_review -------------------------------------------------- # --- stop_worker_and_review --------------------------------------------------
@@ -2655,883 +2644,11 @@ def register_queen_lifecycle_tools(
), ),
parameters={"type": "object", "properties": {}}, parameters={"type": "object", "properties": {}},
) )
registry.register("stop_worker_and_review", _stop_edit_tool, lambda inputs: stop_worker_and_review()) # NOTE: ``stop_worker_and_review`` is intentionally NOT registered. The
tools_registered += 1 # phase transition method ``phase_state.switch_to_building`` it would
# invoke is no longer defined since the planning/building pipeline was
# --- stop_worker_and_plan (Running/Staging → Planning) --------------------- # removed; the wrapper is left here as a stub for future reintroduction.
_ = _stop_edit_tool
async def stop_worker_and_plan() -> str:
"""Stop the loaded graph and switch to planning phase for diagnosis."""
stop_result = await stop_worker()
result, can_transition = _stop_result_allows_phase_transition(stop_result)
# Switch to planning phase
if phase_state is not None and can_transition:
await phase_state.switch_to_planning(source="tool")
_update_meta_json(session_manager, manager_session_id, {"phase": "planning"})
if can_transition:
result["phase"] = "planning"
result["message"] = (
"Graph stopped. You are now in planning phase. "
"Diagnose the issue using read-only tools (checkpoints, logs, sessions), "
"discuss a fix plan with the user, then call "
"initialize_and_build_agent() to implement the fix."
)
else:
result["message"] = (
"Stop requested, but the worker is still shutting down. Phase will not change until shutdown completes."
)
return json.dumps(result)
_stop_plan_tool = Tool(
name="stop_worker_and_plan",
description=(
"Stop the graph and switch to planning phase for diagnosis. "
"Use this when you need to investigate an issue before fixing it. "
"After diagnosis, call initialize_and_build_agent() to switch to building."
),
parameters={"type": "object", "properties": {}},
)
registry.register("stop_worker_and_plan", _stop_plan_tool, lambda inputs: stop_worker_and_plan())
tools_registered += 1
# --- replan_agent (Building → Planning) -----------------------------------
async def replan_agent() -> str:
"""Switch from building back to planning phase.
Only use when the user explicitly asks to re-plan."""
if phase_state is not None:
if phase_state.phase != "building":
return json.dumps({"error": f"Cannot replan: currently in {phase_state.phase} phase."})
# Carry forward the current draft: restore original (pre-dissolution)
# draft so the queen can edit it in planning, rather than starting
# from scratch.
if phase_state.original_draft_graph is not None:
phase_state.draft_graph = phase_state.original_draft_graph
phase_state.original_draft_graph = None
phase_state.flowchart_map = None
phase_state.build_confirmed = False
await phase_state.switch_to_planning(source="tool")
# Re-emit draft so frontend shows the flowchart in planning mode
bus = phase_state.event_bus
if bus is not None and phase_state.draft_graph is not None:
try:
await bus.publish(
AgentEvent(
type=EventType.CUSTOM,
stream_id="queen",
data={"event": "draft_updated", **phase_state.draft_graph},
)
)
except Exception:
logger.warning("Failed to re-emit draft during replan", exc_info=True)
has_draft = phase_state is not None and phase_state.draft_graph is not None
return json.dumps(
{
"status": "replanning",
"phase": "planning",
"has_previous_draft": has_draft,
"message": (
"Switched to PLANNING phase. Coding tools removed. "
+ (
"The previous draft flowchart has been restored (with "
"decision and sub-agent nodes intact). Call save_agent_draft() "
"to update the design, then confirm_and_build() when ready."
if has_draft
else "Discuss the new design with the user."
)
),
}
)
_replan_tool = Tool(
name="replan_agent",
description=(
"Switch from building back to planning phase. "
"Use when the user wants to change integrations, swap tools, "
"rethink the flow, or discuss design changes before building them."
),
parameters={"type": "object", "properties": {}},
)
registry.register("replan_agent", _replan_tool, lambda inputs: replan_agent())
tools_registered += 1
# --- save_agent_draft (Planning phase — declarative preview) ----------------
# so the frontend can render the graph during planning (before any code).
def _dissolve_planning_nodes(
draft: dict,
) -> tuple[dict, dict[str, list[str]]]:
"""Convert planning-only nodes into runtime-compatible structures.
Two kinds of planning-only nodes are dissolved:
**Decision nodes** (flowchart diamonds):
1. Merging the decision clause into the predecessor node's success_criteria.
2. Rewiring the decision's yes/no outgoing edges as on_success/on_failure
edges from the predecessor.
3. Removing the decision node from the graph.
**Sub-agent / browser nodes** (node_type == "gcu" or flowchart_type == "browser"):
1. Adding the sub-agent node's ID to the predecessor's sub_agents list.
2. Removing the sub-agent node and its connecting edge.
3. Sub-agent nodes must not have outgoing edges (they are leaf delegates).
Returns (converted_draft, flowchart_map) where flowchart_map maps
runtime node IDs list of original draft node IDs they absorbed.
"""
import copy as _copy
nodes: list[dict] = _copy.deepcopy(draft.get("nodes", []))
edges: list[dict] = _copy.deepcopy(draft.get("edges", []))
# Index helpers
node_by_id: dict[str, dict] = {n["id"]: n for n in nodes}
def _incoming(nid: str) -> list[dict]:
return [e for e in edges if e["target"] == nid]
def _outgoing(nid: str) -> list[dict]:
return [e for e in edges if e["source"] == nid]
# Identify decision nodes
decision_ids = [n["id"] for n in nodes if n.get("flowchart_type") == "decision"]
# Track which draft nodes each runtime node absorbed
absorbed: dict[str, list[str]] = {} # runtime_id → [draft_ids...]
# Process decisions in node-list order (topological for linear graphs)
for d_id in decision_ids:
d_node = node_by_id.get(d_id)
if d_node is None:
continue # already removed by a prior dissolution
in_edges = _incoming(d_id)
out_edges = _outgoing(d_id)
# Classify outgoing edges into yes/no branches
yes_edge: dict | None = None
no_edge: dict | None = None
for oe in out_edges:
lbl = (oe.get("label") or "").lower().strip()
cond = (oe.get("condition") or "").lower().strip()
if lbl in ("yes", "true", "pass") or cond == "on_success":
yes_edge = oe
elif lbl in ("no", "false", "fail") or cond == "on_failure":
no_edge = oe
# Fallback: if exactly 2 outgoing and couldn't classify, assign by order
if len(out_edges) == 2 and (yes_edge is None or no_edge is None):
if yes_edge is None and no_edge is None:
yes_edge, no_edge = out_edges[0], out_edges[1]
elif yes_edge is None:
yes_edge = [e for e in out_edges if e is not no_edge][0]
else:
no_edge = [e for e in out_edges if e is not yes_edge][0]
# Decision clause: prefer decision_clause, fall back to description/name
clause = (d_node.get("decision_clause") or d_node.get("description") or d_node.get("name") or d_id).strip()
predecessors = [node_by_id[e["source"]] for e in in_edges if e["source"] in node_by_id]
if not predecessors:
# Decision at start: convert to regular process node
d_node["flowchart_type"] = "process"
fc_meta = FLOWCHART_TYPES["process"]
d_node["flowchart_shape"] = fc_meta["shape"]
d_node["flowchart_color"] = fc_meta["color"]
if not d_node.get("success_criteria"):
d_node["success_criteria"] = clause
# Rewire outgoing edges to on_success/on_failure
if yes_edge:
yes_edge["condition"] = "on_success"
if no_edge:
no_edge["condition"] = "on_failure"
absorbed[d_id] = absorbed.get(d_id, [d_id])
continue
# Dissolve: merge into each predecessor
for pred in predecessors:
pid = pred["id"]
# Merge decision clause into predecessor's success_criteria
existing = (pred.get("success_criteria") or "").strip()
if existing:
pred["success_criteria"] = f"{existing}; then evaluate: {clause}"
else:
pred["success_criteria"] = clause
# Remove the edge from predecessor → decision
edges[:] = [e for e in edges if not (e["source"] == pid and e["target"] == d_id)]
# Wire predecessor → yes/no targets
edge_counter = len(edges)
if yes_edge:
edges.append(
{
"id": f"edge-dissolved-{edge_counter}",
"source": pid,
"target": yes_edge["target"],
"condition": "on_success",
"description": yes_edge.get("description", ""),
"label": yes_edge.get("label", "Yes"),
}
)
edge_counter += 1
if no_edge:
edges.append(
{
"id": f"edge-dissolved-{edge_counter}",
"source": pid,
"target": no_edge["target"],
"condition": "on_failure",
"description": no_edge.get("description", ""),
"label": no_edge.get("label", "No"),
}
)
# Record absorption
prev_absorbed = absorbed.get(pid, [pid])
if d_id not in prev_absorbed:
prev_absorbed.append(d_id)
absorbed[pid] = prev_absorbed
# Remove decision node and all its edges
edges[:] = [e for e in edges if e["source"] != d_id and e["target"] != d_id]
nodes[:] = [n for n in nodes if n["id"] != d_id]
del node_by_id[d_id]
# Build complete flowchart_map (identity for non-absorbed nodes)
flowchart_map: dict[str, list[str]] = {}
for n in nodes:
nid = n["id"]
flowchart_map[nid] = absorbed.get(nid, [nid])
# Rebuild terminal_nodes (decision targets may have changed).
sources = {e["source"] for e in edges}
all_ids = {n["id"] for n in nodes}
terminal_ids = all_ids - sources
if not terminal_ids and nodes:
terminal_ids = {nodes[-1]["id"]}
converted = dict(draft)
converted["nodes"] = nodes
converted["edges"] = edges
converted["terminal_nodes"] = sorted(terminal_ids)
converted["entry_node"] = nodes[0]["id"] if nodes else ""
return converted, flowchart_map
async def save_agent_draft(
*,
agent_name: str,
goal: str,
nodes: list[dict],
edges: list[dict] | None = None,
description: str = "",
success_criteria: list[str] | None = None,
constraints: list[str] | None = None,
terminal_nodes: list[str] | None = None,
) -> str:
"""Save a declarative draft of the agent graph during planning.
This creates a lightweight, visual-only graph for the user to review.
No executable code is generated. Nodes need only an id, name, and
description. Tools, input/output keys, and system prompts are optional
metadata hints they will be fully specified during the building phase.
Each node is classified into a classical flowchart component type
(start, terminal, process, decision, io, subprocess, browser, manual)
with a unique color. The queen can override auto-detection by setting
flowchart_type explicitly on a node.
"""
# ── Gate: require at least 2 rounds of user questions ─────────
if phase_state is not None and phase_state.phase == "planning" and phase_state.planning_ask_rounds < 2:
return json.dumps(
{
"error": (
"You haven't asked enough questions yet. You have only "
f"asked {phase_state.planning_ask_rounds} round(s) of "
"questions — at least 2 are required before saving a "
"draft. Think deeper and ask more practical questions "
"to fully understand the user's requirements before "
"designing the agent graph."
)
}
)
# ── Gate: require at least 5 nodes for a meaningful graph ─────
if len(nodes) < 5:
return json.dumps(
{
"error": (
f"Draft only has {len(nodes)} node(s) — at least 5 are "
"required for a meaningful agent graph. Think deeper and "
"ask more practical questions to fully understand the "
"user's requirements, then design a more thorough graph."
)
}
)
# Loose validation: each node needs at minimum an id
validated_nodes = []
for i, n in enumerate(nodes):
if not isinstance(n, dict):
return json.dumps({"error": f"Node {i} must be a dict, got {type(n).__name__}"})
node_id = n.get("id", "").strip()
if not node_id:
return json.dumps({"error": f"Node {i} is missing 'id'"})
validated_nodes.append(
{
"id": node_id,
"name": n.get("name", node_id.replace("-", " ").replace("_", " ").title()),
"description": n.get("description", ""),
"node_type": n.get("node_type", "event_loop"),
# Optional business-logic hints (not validated yet)
"tools": n.get("tools", []),
"input_keys": n.get("input_keys", []),
"output_keys": n.get("output_keys", []),
"success_criteria": n.get("success_criteria", ""),
# Decision nodes: the yes/no question to evaluate
"decision_clause": n.get("decision_clause", ""),
# Explicit flowchart override (preserved for classification)
"flowchart_type": n.get("flowchart_type", ""),
}
)
# Check for duplicate node IDs
seen_ids: set[str] = set()
for n in validated_nodes:
if n["id"] in seen_ids:
return json.dumps({"error": f"Duplicate node id '{n['id']}'"})
seen_ids.add(n["id"])
validated_edges = []
if edges:
node_ids = {n["id"] for n in validated_nodes}
for i, e in enumerate(edges):
if not isinstance(e, dict):
return json.dumps({"error": f"Edge {i} must be a dict"})
src = e.get("source", "")
tgt = e.get("target", "")
if src and src not in node_ids:
return json.dumps({"error": f"Edge {i} source '{src}' references unknown node"})
if tgt and tgt not in node_ids:
return json.dumps({"error": f"Edge {i} target '{tgt}' references unknown node"})
validated_edges.append(
{
"id": e.get("id", f"edge-{i}"),
"source": src,
"target": tgt,
"condition": e.get("condition", "on_success"),
"description": e.get("description", ""),
"label": e.get("label", ""),
}
)
topology_corrections: list[str] = []
# ── Validate graph connectivity ─────────────────────────────
# Every node must be reachable from the entry node. Disconnected
# subgraphs indicate a broken design — remove unreachable nodes
# and report them so the queen can fix the draft.
if validated_nodes:
entry_id = validated_nodes[0]["id"]
# Build undirected adjacency from edges
_adj: dict[str, set[str]] = {n["id"]: set() for n in validated_nodes}
for e in validated_edges:
s, t = e["source"], e["target"]
if s in _adj and t in _adj:
_adj[s].add(t)
_adj[t].add(s)
# BFS from entry
visited: set[str] = set()
queue = [entry_id]
while queue:
cur = queue.pop()
if cur in visited:
continue
visited.add(cur)
for nb in _adj.get(cur, ()):
if nb not in visited:
queue.append(nb)
unreachable = {n["id"] for n in validated_nodes} - visited
if unreachable:
for uid in sorted(unreachable):
logger.warning(
"Node '%s' is unreachable from entry node '%s' — removing it from the draft.",
uid,
entry_id,
)
topology_corrections.append(
f"Node '{uid}' is disconnected from the graph "
f"(unreachable from entry node '{entry_id}') — "
f"removed. Connect it to the flow or assign it "
f"as a sub-agent of an existing node."
)
validated_edges[:] = [
e for e in validated_edges if e["source"] not in unreachable and e["target"] not in unreachable
]
validated_nodes[:] = [n for n in validated_nodes if n["id"] not in unreachable]
# Determine terminal nodes: explicit list, or nodes with no outgoing edges.
# Sub-agent nodes are leaf helpers, not endpoints — exclude them.
sa_ids: set[str] = set()
for n in validated_nodes:
for sa_id in n.get("sub_agents") or []:
sa_ids.add(sa_id)
terminal_ids: set[str] = set(terminal_nodes or []) - sa_ids
if not terminal_ids:
sources = {e["source"] for e in validated_edges}
all_ids = {n["id"] for n in validated_nodes}
terminal_ids = all_ids - sources - sa_ids
# If all nodes have outgoing edges (loop graph), mark the last as terminal
if not terminal_ids and validated_nodes:
terminal_ids = {validated_nodes[-1]["id"]}
# Classify each node into a flowchart component type with color
total = len(validated_nodes)
for i, node in enumerate(validated_nodes):
fc_type = classify_flowchart_node(
node,
i,
total,
validated_edges,
terminal_ids,
)
fc_meta = FLOWCHART_TYPES[fc_type]
node["flowchart_type"] = fc_type
node["flowchart_shape"] = fc_meta["shape"]
node["flowchart_color"] = fc_meta["color"]
draft = {
"agent_name": agent_name.strip(),
"goal": goal.strip(),
"description": description.strip(),
"success_criteria": success_criteria or [],
"constraints": constraints or [],
"nodes": validated_nodes,
"edges": validated_edges,
"entry_node": validated_nodes[0]["id"] if validated_nodes else "",
"terminal_nodes": sorted(terminal_ids),
# Color legend for the frontend
"flowchart_legend": {
fc_type: {"shape": meta["shape"], "color": meta["color"]} for fc_type, meta in FLOWCHART_TYPES.items()
},
}
bus = getattr(session, "event_bus", None)
is_building = phase_state is not None and phase_state.phase == "building"
if phase_state is not None:
if is_building:
# During building: re-draft updates the flowchart in place.
# Dissolve planning-only nodes immediately (no confirm gate).
import copy as _copy
phase_state.original_draft_graph = _copy.deepcopy(draft)
converted, fmap = _dissolve_planning_nodes(draft)
phase_state.draft_graph = converted
phase_state.flowchart_map = fmap
# Do NOT reset build_confirmed — we're already building.
# Persist to agent folder
save_path = getattr(session, "worker_path", None)
if save_path is None:
# Worker not loaded yet — resolve from draft name
draft_name = draft.get("agent_name", "")
if draft_name:
from framework.config import COLONIES_DIR
candidate = COLONIES_DIR / draft_name
if candidate.is_dir():
save_path = candidate
save_flowchart_file(
save_path,
phase_state.original_draft_graph,
fmap,
)
else:
# During planning: store raw draft, await user confirmation.
phase_state.draft_graph = draft
phase_state.build_confirmed = False
# Emit events so the frontend can render
if bus is not None:
if is_building:
await bus.publish(
AgentEvent(
type=EventType.CUSTOM,
stream_id="queen",
data={
"event": "draft_updated",
**(phase_state.draft_graph if phase_state else draft),
},
)
)
await bus.publish(
AgentEvent(
type=EventType.CUSTOM,
stream_id="queen",
data={
"event": "flowchart_updated",
"map": phase_state.flowchart_map if phase_state else None,
"original_draft": phase_state.original_draft_graph if phase_state else draft,
},
)
)
else:
await bus.publish(
AgentEvent(
type=EventType.CUSTOM,
stream_id="queen",
data={"event": "draft_updated", **draft},
)
)
dissolution_info = {}
if is_building and phase_state is not None and phase_state.original_draft_graph:
orig_count = len(phase_state.original_draft_graph.get("nodes", []))
conv_count = len(phase_state.draft_graph.get("nodes", []))
dissolution_info = {
"planning_nodes_dissolved": orig_count - conv_count,
"flowchart_map": phase_state.flowchart_map,
}
correction_warning = ""
if topology_corrections:
correction_warning = (
" WARNING — your draft had topology errors that were "
"auto-corrected: "
+ "; ".join(topology_corrections)
+ " Review the corrected flowchart and do NOT repeat "
"this pattern. GCU nodes are ALWAYS leaf sub-agents."
)
if is_building:
msg = (
"Draft flowchart updated during building. "
"Planning-only nodes dissolved automatically. "
"The user can see the updated flowchart. "
"Continue building — no re-confirmation needed." + correction_warning
)
else:
msg = (
"Draft graph saved and sent to the visualizer. "
"The user can now see the color-coded flowchart. "
"Present this design to the user and get their approval. "
"When the user confirms, call confirm_and_build() to proceed." + correction_warning
)
result: dict = {
"status": "draft_saved",
"agent_name": draft["agent_name"],
"node_count": len(validated_nodes),
"edge_count": len(validated_edges),
"node_types": {n["id"]: n["flowchart_type"] for n in validated_nodes},
**dissolution_info,
"message": msg,
}
if topology_corrections:
result["topology_corrections"] = topology_corrections
return json.dumps(result)
_draft_tool = Tool(
name="save_agent_draft",
description=(
"Save a declarative draft of the agent graph as a color-coded flowchart. "
"Usable in PLANNING (creates draft for user review) and BUILDING "
"(updates the flowchart in place — planning-only nodes are dissolved "
"automatically without re-confirmation). "
"Each node is auto-classified into a classical flowchart type "
"(start, terminal, process, decision, io, subprocess, browser, manual) "
"with unique colors. No code is generated. "
"Planning-only types (decision, browser/GCU) are dissolved at confirm/build time: "
"decision nodes merge into predecessor's success_criteria with yes/no edges; "
"browser/GCU nodes merge into predecessor's sub_agents list as leaf delegates."
),
parameters={
"type": "object",
"properties": {
"agent_name": {
"type": "string",
"description": "Snake_case name for the agent (e.g. 'research_agent')",
},
"goal": {
"type": "string",
"description": "High-level goal description for the agent",
},
"description": {
"type": "string",
"description": "Brief description of what the agent does",
},
"nodes": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string", "description": "Kebab-case node identifier"},
"name": {"type": "string", "description": "Human-readable name"},
"description": {
"type": "string",
"description": "What this node does (business logic)",
},
"node_type": {
"type": "string",
"enum": ["event_loop", "gcu"],
"description": "Node type (default: event_loop)",
},
"flowchart_type": {
"type": "string",
"enum": [
"start",
"terminal",
"process",
"decision",
"io",
"document",
"database",
"subprocess",
"browser",
],
"description": (
"Flowchart symbol type. Auto-detected if omitted. "
"start (sage green stadium), terminal (dusty red stadium), "
"process (blue-gray rect), decision (amber diamond), "
"io (purple parallelogram), document (steel blue wavy rect), "
"database (teal cylinder), subprocess (cyan subroutine), "
"browser (deep blue hexagon — for GCU/browser "
"sub-agents; must be a leaf node)"
),
},
"tools": {
"type": "array",
"items": {"type": "string"},
"description": "Planned tools (hints, not validated yet)",
},
"input_keys": {
"type": "array",
"items": {"type": "string"},
"description": "Expected input buffer keys (hints)",
},
"output_keys": {
"type": "array",
"items": {"type": "string"},
"description": "Expected output buffer keys (hints)",
},
"success_criteria": {
"type": "string",
"description": "What success looks like for this node",
},
"sub_agents": {
"type": "array",
"items": {"type": "string"},
"description": (
"IDs of GCU/browser sub-agent nodes managed by this node. "
"At build time, sub-agent nodes are dissolved into this list. "
"Set this on the PARENT node — e.g. the orchestrator that "
"delegates to GCU leaves. Visual delegation edges are "
"synthesized automatically."
),
},
"decision_clause": {
"type": "string",
"description": (
"For decision nodes only: the yes/no question to "
"evaluate (e.g. 'Is amount > $100?'). Used during "
"dissolution to set the predecessor's success_criteria."
),
},
},
"required": ["id"],
},
"description": "List of nodes with at minimum an id",
},
"edges": {
"type": "array",
"items": {
"type": "object",
"properties": {
"source": {"type": "string"},
"target": {"type": "string"},
"condition": {
"type": "string",
"enum": [
"always",
"on_success",
"on_failure",
"conditional",
"llm_decide",
],
},
"description": {"type": "string"},
"label": {
"type": "string",
"description": ("Short edge label shown on the flowchart (e.g. 'Yes', 'No', 'Retry')"),
},
},
"required": ["source", "target"],
},
"description": "Connections between nodes",
},
"terminal_nodes": {
"type": "array",
"items": {"type": "string"},
"description": ("Node IDs that are terminal (end) nodes. Auto-detected from edges if omitted."),
},
"success_criteria": {
"type": "array",
"items": {"type": "string"},
"description": "Agent-level success criteria",
},
"constraints": {
"type": "array",
"items": {"type": "string"},
"description": "Agent-level constraints",
},
},
"required": ["agent_name", "goal", "nodes"],
},
)
registry.register(
"save_agent_draft",
_draft_tool,
lambda inputs: save_agent_draft(**inputs),
)
tools_registered += 1
# --- confirm_and_build (Planning → Building gate) -------------------------
# Explicit user confirmation is required before transitioning from planning
# to building. This tool records that confirmation and proceeds.
async def confirm_and_build(*, agent_name: str | None = None) -> str:
"""Confirm the draft, create agent directory, and transition to building.
This tool should ONLY be called after the user has explicitly approved
the draft graph design via ask_user. It creates the agent directory and
transitions to BUILDING phase. The queen then writes agent.json directly.
"""
if phase_state is None:
return json.dumps({"error": "Phase state not available."})
if phase_state.phase != "planning":
return json.dumps({"error": f"Cannot confirm_and_build: currently in {phase_state.phase} phase."})
if phase_state.draft_graph is None:
return json.dumps(
{
"error": (
"No draft graph saved. Call save_agent_draft() first to create "
"a draft, present it to the user, and get their approval."
)
}
)
phase_state.build_confirmed = True
# Preserve original draft for flowchart display during runtime,
# then dissolve planning-only nodes (decision + browser/GCU) into
# runtime-compatible structures.
import copy as _copy
original_nodes = phase_state.draft_graph.get("nodes", [])
# Compute dissolution first, then assign all three atomically so that
# a failure in _dissolve_planning_nodes doesn't leave partial state.
original_copy = _copy.deepcopy(phase_state.draft_graph)
converted, fmap = _dissolve_planning_nodes(phase_state.draft_graph)
phase_state.original_draft_graph = original_copy
phase_state.draft_graph = converted
phase_state.flowchart_map = fmap
# Create agent folder early so flowchart and agent_path are available
# throughout the entire BUILDING phase.
_agent_name = agent_name or phase_state.draft_graph.get("agent_name", "").strip()
if _agent_name:
from framework.config import COLONIES_DIR
_agent_folder = COLONIES_DIR / _agent_name
_agent_folder.mkdir(parents=True, exist_ok=True)
save_flowchart_file(_agent_folder, original_copy, fmap)
phase_state.agent_path = str(_agent_folder)
_update_meta_json(
session_manager,
manager_session_id,
{
"agent_path": str(_agent_folder),
"agent_name": _agent_name.replace("_", " ").title(),
},
)
dissolved_count = len(original_nodes) - len(converted.get("nodes", []))
decision_count = sum(1 for n in original_nodes if n.get("flowchart_type") == "decision")
subagent_count = sum(
1 for n in original_nodes if n.get("flowchart_type") == "browser" or n.get("node_type") == "gcu"
)
dissolution_parts = []
if decision_count:
dissolution_parts.append(f"{decision_count} decision node(s) dissolved into predecessor criteria")
if subagent_count:
dissolution_parts.append(f"{subagent_count} sub-agent node(s) dissolved into predecessor sub_agents")
# Transition to BUILDING phase
await phase_state.switch_to_building(source="tool")
_update_meta_json(session_manager, manager_session_id, {"phase": "building"})
phase_state.build_confirmed = False
# No injection here -- the return message tells the queen what to do.
# Injecting would queue a BUILDING message that drains AFTER the queen
# may have already moved to STAGING via load_built_agent.
return json.dumps(
{
"status": "confirmed",
"phase": "building",
"agent_name": _agent_name,
"agent_path": str(_agent_folder),
"planning_nodes_dissolved": dissolved_count,
"flowchart_map": fmap,
"message": (
"Design confirmed and directory created. "
+ ("; ".join(dissolution_parts) + ". " if dissolution_parts else "")
+ f"Now write the complete agent config to {_agent_folder}/agent.json "
"using write_file(). Include all system prompts, tools, edges, and goal."
),
}
)
_confirm_tool = Tool(
name="confirm_and_build",
description=(
"Confirm the draft graph design, create agent directory, and transition to building phase. "
"ONLY call this after the user has explicitly approved the design via ask_user. "
"After confirmation, write the complete agent.json using write_file()."
),
parameters={
"type": "object",
"properties": {
"agent_name": {
"type": "string",
"description": "Snake_case name for the agent (e.g. 'linkedin_outreach'). "
"If omitted, uses the name from save_agent_draft().",
},
},
},
)
registry.register(
"confirm_and_build",
_confirm_tool,
lambda inputs: confirm_and_build(
agent_name=inputs.get("agent_name"),
),
)
tools_registered += 1
# --- stop_worker (Running → Staging) -------------------------------------- # --- stop_worker (Running → Staging) --------------------------------------