diff --git a/core/framework/agents/queen/incubating_evaluator.py b/core/framework/agents/queen/incubating_evaluator.py new file mode 100644 index 00000000..071ee079 --- /dev/null +++ b/core/framework/agents/queen/incubating_evaluator.py @@ -0,0 +1,247 @@ +"""One-shot LLM gate that decides if a queen DM is ready to fork a colony. + +The queen's ``start_incubating_colony`` tool calls :func:`evaluate` with +the queen's recent conversation, a proposed ``colony_name``, and a +one-paragraph ``intended_purpose``. The evaluator returns a structured +verdict: + + { + "ready": bool, + "reasons": [str], + "missing_prerequisites": [str], + } + +On ``ready=False`` the queen receives the verdict as her tool result and +self-corrects (asks the user, refines scope, drops the idea). On +``ready=True`` the tool flips the queen's phase to ``incubating``. + +Failure mode is **fail-closed**: any LLM error or unparseable response +returns ``ready=False`` with reason ``"evaluation_failed"`` so the queen +cannot accidentally proceed past a broken gate. +""" + +from __future__ import annotations + +import json +import logging +import re +from typing import Any + +from framework.agent_loop.conversation import Message + +logger = logging.getLogger(__name__) + + +_INCUBATING_EVALUATOR_SYSTEM_PROMPT = """\ +You gate whether a queen agent should commit to forking a persistent +"colony" (a headless worker spec written to disk). Forking is +expensive: it ends the user's chat with this queen and the worker runs +unattended afterward, so the spec must be settled before you approve. + +Read the conversation excerpt and the queen's proposed colony_name + +intended_purpose, then decide. + +APPROVE (ready=true) only when ALL of the following hold: + 1. The user has explicitly asked for work that needs to outlive this + chat — recurring (cron / interval), monitoring + alert, scheduled + batch, or "fire-and-forget background job". A one-shot question + that the queen can answer in chat does NOT qualify. + 2. The scope of the work is concrete enough to write down — what + inputs, what outputs, what success looks like. Vague ("help me + with my workflow") does NOT qualify. + 3. The technical approach is at least sketched — what data sources, + APIs, or tools the worker will use. The queen does not have to + have written the SKILL.md yet, but she must have the operational + ingredients available. + 4. There are no open clarifying questions on the table that the user + hasn't answered. If the queen recently asked the user something + and is still waiting, do NOT approve. + +REJECT (ready=false) on any of: + - Conversation is too short / too generic to support a settled spec. + - User is still describing what they want. + - User has expressed doubts, change-of-direction, or "let me think". + - Work is one-shot and could be done in chat instead. + - Open question awaiting user reply. + +Reply with a JSON object exactly matching this shape: + + { + "ready": true | false, + "reasons": ["short phrase", ...], // at least one entry + "missing_prerequisites": ["short phrase", ...] // empty when ready + } + +``reasons`` explains the verdict in 1-3 short phrases. +``missing_prerequisites`` lists what's missing in queen-actionable +form ("user hasn't confirmed schedule", "no API auth flow discussed"). +Empty list when ``ready=true``. + +Output JSON only. Do not wrap in markdown. Do not add prose. +""" + + +# Bound the formatted excerpt so the eval call stays cheap and fits well +# under the LLM's context window even for long DM sessions. +_MAX_MESSAGES = 30 +_MAX_TOOL_CONTENT_CHARS = 400 +_MAX_USER_CONTENT_CHARS = 2_000 +_MAX_ASSISTANT_CONTENT_CHARS = 2_000 + + +def format_conversation_excerpt(messages: list[Message]) -> str: + """Format the tail of a queen conversation for the evaluator prompt. + + Keeps the most recent ``_MAX_MESSAGES`` messages. Tool results are + truncated hard since they're rarely load-bearing for the readiness + decision; user/assistant text is truncated more generously to + preserve the actual conversation signal. + """ + if not messages: + return "(no messages)" + + tail = messages[-_MAX_MESSAGES:] + parts: list[str] = [] + for msg in tail: + role = msg.role.upper() + content = (msg.content or "").strip() + if msg.role == "tool": + if len(content) > _MAX_TOOL_CONTENT_CHARS: + content = content[:_MAX_TOOL_CONTENT_CHARS] + "..." + elif msg.role == "assistant": + # Surface tool-call intent for empty assistant turns so the + # evaluator sees what the queen has been doing. + if not content and msg.tool_calls: + names = [ + tc.get("function", {}).get("name", "?") + for tc in msg.tool_calls + ] + content = f"(called: {', '.join(names)})" + if len(content) > _MAX_ASSISTANT_CONTENT_CHARS: + content = content[:_MAX_ASSISTANT_CONTENT_CHARS] + "..." + else: # user + if len(content) > _MAX_USER_CONTENT_CHARS: + content = content[:_MAX_USER_CONTENT_CHARS] + "..." + if content: + parts.append(f"[{role}]: {content}") + + return "\n\n".join(parts) if parts else "(no messages)" + + +def _build_user_message( + conversation_excerpt: str, + colony_name: str, + intended_purpose: str, +) -> str: + return ( + f"## Proposed colony name\n{colony_name}\n\n" + f"## Queen's intended_purpose\n{intended_purpose.strip()}\n\n" + f"## Recent conversation (oldest → newest)\n{conversation_excerpt}\n\n" + "Decide: should this queen be approved to enter INCUBATING phase?" + ) + + +def _parse_verdict(raw: str) -> dict[str, Any] | None: + """Parse the evaluator's JSON. Returns None if parsing fails.""" + if not raw: + return None + raw = raw.strip() + try: + return json.loads(raw) + except json.JSONDecodeError: + # Some models wrap JSON in markdown fences or add preamble. + # Pull the first { ... } block out as a best-effort fallback — + # mirrors the same recovery pattern used in recall_selector.py. + match = re.search(r"\{.*\}", raw, re.DOTALL) + if match: + try: + return json.loads(match.group()) + except json.JSONDecodeError: + return None + return None + + +def _normalize_verdict(parsed: dict[str, Any]) -> dict[str, Any]: + """Coerce a parsed verdict into the shape the tool returns to the queen.""" + ready = bool(parsed.get("ready")) + reasons = parsed.get("reasons") or [] + if isinstance(reasons, str): + reasons = [reasons] + reasons = [str(r).strip() for r in reasons if str(r).strip()] + missing = parsed.get("missing_prerequisites") or [] + if isinstance(missing, str): + missing = [missing] + missing = [str(m).strip() for m in missing if str(m).strip()] + + if ready: + # When approved we don't surface missing prerequisites — the + # incubating role prompt opens that floor itself. + missing = [] + elif not reasons: + # Always give the queen at least one reason to reflect on. + reasons = ["evaluator returned no reasons"] + + return { + "ready": ready, + "reasons": reasons, + "missing_prerequisites": missing, + } + + +async def evaluate( + llm: Any, + messages: list[Message], + colony_name: str, + intended_purpose: str, +) -> dict[str, Any]: + """Run the incubating evaluator against the queen's conversation. + + Args: + llm: An LLM provider exposing ``acomplete(messages, system, ...)``. + Pass the queen's own ``ctx.llm`` so the eval uses the same + model the user is talking to. + messages: The queen's conversation messages, oldest first. The + evaluator slices its own tail; pass the full list. + colony_name: Validated colony slug. + intended_purpose: Queen's one-paragraph brief. + + Returns: + ``{"ready": bool, "reasons": [str], "missing_prerequisites": [str]}``. + Fail-closed on any error. + """ + excerpt = format_conversation_excerpt(messages) + user_msg = _build_user_message(excerpt, colony_name, intended_purpose) + + try: + response = await llm.acomplete( + messages=[{"role": "user", "content": user_msg}], + system=_INCUBATING_EVALUATOR_SYSTEM_PROMPT, + max_tokens=1024, + response_format={"type": "json_object"}, + ) + except Exception as exc: # noqa: BLE001 - fail-closed on any LLM failure + logger.warning("incubating_evaluator: LLM call failed (%s)", exc) + return { + "ready": False, + "reasons": ["evaluation_failed"], + "missing_prerequisites": [ + "evaluator LLM call failed; retry once the queen can reach the model again" + ], + } + + raw = (getattr(response, "content", "") or "").strip() + parsed = _parse_verdict(raw) + if parsed is None: + logger.warning( + "incubating_evaluator: could not parse JSON verdict (raw=%.200s)", + raw, + ) + return { + "ready": False, + "reasons": ["evaluation_failed"], + "missing_prerequisites": [ + "evaluator returned malformed JSON; retry" + ], + } + + return _normalize_verdict(parsed) diff --git a/core/framework/agents/queen/nodes/__init__.py b/core/framework/agents/queen/nodes/__init__.py index 2fa1fc46..0f4ed31b 100644 --- a/core/framework/agents/queen/nodes/__init__.py +++ b/core/framework/agents/queen/nodes/__init__.py @@ -47,9 +47,30 @@ _QUEEN_INDEPENDENT_TOOLS = [ "undo_changes", # NOTE (2026-04-16): ``run_parallel_workers`` is not in the DM phase. # Pure DM is for conversation with the user; fan out parallel work via - # ``create_colony`` (forks into a persistent colony with its own page - # and phase machine). + # ``start_incubating_colony`` (which gates the colony fork behind a + # readiness eval before exposing create_colony in INCUBATING phase). + "start_incubating_colony", +] + +# Incubating phase: queen has been approved by the incubating_evaluator to +# fork into a colony. Tool surface is intentionally small — the queen's job +# in this phase is to nail the operational spec (concurrency, schedule, +# result tracking, credentials) and write a tight task + SKILL.md, not to +# keep doing work. Read-only file tools are kept so she can confirm details +# (e.g. inspect an existing skill) before committing. +_QUEEN_INCUBATING_TOOLS = [ + "read_file", + "list_directory", + "search_files", + "run_command", + # Trigger management: phase-branched in queen_lifecycle_tools.py to + # buffer drafts onto phase_state.pending_triggers, then drained into + # the colony's triggers.json by create_colony post-fork. + "set_trigger", + "remove_trigger", + "list_triggers", "create_colony", + "cancel_incubation", ] # Working phase: colony workers are running. Queen monitors, replies @@ -129,15 +150,70 @@ If the user opens with a greeting or chat, reply in plain prose in \ character first — check recall memory for name and past topics and weave \ them in. If you need a structured choice or approval gate, always use \ ask_user or ask_user_multiple; otherwise ask in plain prose. \ +\ +When the user clearly wants persistent / recurring / headless work that \ +needs to outlive THIS chat (e.g. "every morning", "monitor X and alert \ +me", "set up a job that…"), call ``start_incubating_colony`` with a \ +proposed colony_name and a one-paragraph intended_purpose. A side \ +evaluator reads the conversation and decides if the spec is settled. If \ +it returns ``not_ready`` you keep talking with the user — sort out \ +whatever the evaluator said is missing, then retry. If it returns \ +``incubating`` your phase flips and a new prompt takes over. Do not \ +try to write SKILL.md, fork directories, or otherwise build the colony \ +yourself in this phase.\ +""" + +_queen_role_incubating = """\ +You are in INCUBATING mode. The incubating evaluator has approved you to \ +fork colony ``{colony_name}`` and you are now drafting the spec. Your \ +ONLY job in this phase: produce a self-contained ``task`` description \ +and ``SKILL.md`` body that lets a fresh worker, who has zero memory of \ +this chat, do the work unattended. Do not start doing the work yourself \ +— the coding toolkit is gone on purpose so you can focus. + +Before you call ``create_colony``, sort out the operational details that \ +conversation tends to skip. The "Approved → operational checklist" block \ +in your tools doc lists the kinds of things to think about (concurrency, \ +schedule, result-tracking, failure handling, credentials). Treat that \ +list as prompts for YOUR judgement — only ask the user about the items \ +that actually matter for THIS colony and that the conversation hasn't \ +already settled. Use ``ask_user`` / ``ask_user_multiple`` for the gaps; \ +plain prose for everything else. + +If you realise mid-incubation that the spec isn't ready (user changed \ +their mind, you're missing more than a couple of details, the work \ +turned out to be one-shot after all), call ``cancel_incubation`` — \ +no harm, you go back to INDEPENDENT and can retry later. """ _queen_role_working = """\ -You are in WORKING mode. Your colony has workers executing right now. \ -Your job: monitor progress, answer worker escalations through \ -reply_to_worker, and fan out more tasks with run_parallel_workers if \ -the user asks. Keep the user informed when they ask; do NOT poll the \ -workers just to have something to say. If the user greets you \ -mid-run, reply in prose and wait for their next message. +You are in WORKING mode. The colony's spec was settled during \ +INCUBATING; workers are executing that spec now. Your role here is \ +operational presence, not direction — think on-call engineer for a \ +running deployment, not architect of a new one. + +What you DO in this phase: +- Be available for worker escalations (reply_to_worker on items in \ + list_worker_questions). +- Surface progress when the user asks for it (get_worker_status), or \ + when something concrete is worth flagging (a notable failure, a \ + worker stuck on a question that needs them). +- Intervene when a worker is clearly off course (inject_message) or \ + needs to stop (stop_worker). +- Make SPEC-COMPATIBLE adjustments when the user asks — fan out MORE \ + of the same work (run_parallel_workers), or tweak the schedule \ + (set_trigger / remove_trigger / list_triggers). These are tweaks to \ + the spec the user already approved, not redesigns. + +What you DO NOT do in this phase: +- Redesign the colony. If the user asks for something fundamentally \ + new (different scope, different skill, different problem), say so \ + plainly: "this colony is for X — for that we'd need a fresh chat \ + with me, where I can incubate a new colony." A new colony is born \ + in INDEPENDENT via start_incubating_colony, and you cannot reach \ + that from inside a colony. +- Drive the conversation. Do not poll workers just to have something \ + to say. If the user greets you mid-run, reply in prose and wait. """ _queen_role_reviewing = """\ @@ -167,42 +243,140 @@ search_files, run_command, undo_changes browser_tabs, browser_close, browser_evaluate, etc.). - MUST Follow the browser-automation skill protocol before using browser tools. -## Persistent colony +## Hand off to a colony +- start_incubating_colony(colony_name, intended_purpose) — Use this when \ + the user wants persistent / recurring / headless work that needs to \ + outlive THIS chat. It does NOT fork on its own; it spawns a one-shot \ + evaluator that reads this conversation and decides whether the spec \ + is settled enough to proceed. On approval your phase flips to \ + INCUBATING and a new tool surface (including create_colony itself) \ + unlocks. On rejection you stay here and keep the conversation going \ + to fill the gaps the evaluator named. +- ``intended_purpose`` is a one-paragraph brief: what the colony will \ + do, on what cadence, why it must outlive this chat. Don't write a \ + SKILL.md here — that comes in INCUBATING. +""" + +_queen_tools_incubating = """ +# Tools (INCUBATING mode) + +You've been approved to fork. The full coding toolkit is gone on \ +purpose — your job in this phase is to nail the spec, not keep doing \ +work. Available: + +## Read-only inspection (coder-tools MCP) +- read_file, list_directory, search_files, run_command — for confirming \ +details before you commit (e.g. peek at an existing skill in \ +~/.hive/skills/, sanity-check an API URL). + +## Schedule the colony (drafted now, written on commit) +- set_trigger(trigger_id, trigger_type, trigger_config, task) — Stage a \ + schedule for the colony. Validated immediately (cron syntax / interval \ + shape) but NOT activated yet — buffered onto the incubation context \ + and written to ``triggers.json`` by create_colony so it auto-starts on \ + first colony load. Repeated calls with the same trigger_id replace \ + the prior draft. +- list_triggers() — Inspect the buffered drafts. +- remove_trigger(trigger_id) — Drop a draft from the buffer. +- ``trigger_type`` is "timer" (with ``trigger_config={"cron": "..."}`` \ + or ``{"interval_minutes": N}``) or "webhook" (with \ + ``trigger_config={"path": "/hooks/..."}``). +- ``task`` describes what the worker should do when the trigger fires; \ + this is REQUIRED — a trigger with no task can't fire usefully. It's \ + separate from create_colony's ``task`` argument because the trigger \ + task is one-shot per fire, while create_colony's ``task`` is the \ + worker's overall purpose. + +## Approved → operational checklist (use your judgement, ask only what's missing) +The conversation that got you here probably did NOT cover all of: +- Concurrency: how many tasks should run in parallel? Single-fire? +- Schedule: cron expression, interval (every N minutes), webhook, \ + manual-only? +- Result tracking: what should the worker write into ``progress.db`` so \ + the user can review later? Per-task status, summary, raw payload? +- Failure handling: retry, alert, mark-failed-and-continue? +- Credentials and MCP servers: what does the worker need that you \ + haven't discussed (API keys, OAuth, browser profile)? +- Skills the worker needs beyond the one you'll write inline. + +These are PROMPTS for your judgement, not a required checklist. Cover \ +the items that actually matter for THIS colony, and only the ones the \ +user hasn't already implied. Use ``ask_user`` / ``ask_user_multiple`` \ +for gaps that need a real answer; skip the rest. + +## Commit - create_colony(colony_name, task, skill_name, skill_description, \ - skill_body, skill_files?, tasks?) — Fork this session into a \ - persistent colony for headless / recurring / background work. The colony \ - has its own chat surface and runs `run_parallel_workers` from there. -- **Atomic call — pass the skill INLINE.** Do NOT write SKILL.md with \ - `write_file` beforehand. Provide `skill_name`, `skill_description`, \ - and `skill_body` as arguments and the tool will materialize \ - `~/.hive/skills/{skill_name}/` for you, then fork. Use optional \ - `skill_files` (array of `{path, content}`) for supporting scripts \ - or references. Reusing an existing `skill_name` simply replaces that \ - skill with your latest content. -- The `task` must be FULL and self-contained because the future worker \ - run cannot rely on this live chat turn for missing context. -- The `skill_body` must be FULL and self-contained too — capture the \ - operational protocol (endpoints, auth, gotchas, pre-baked queries) so \ - the worker doesn't have to rediscover what you already know. -- Nothing runs immediately after the call. The user launches the \ - worker later from the new colony page. + skill_body, skill_files?, tasks?, concurrency_hint?) — Fork this \ + session into the colony. **Atomic call — pass the skill INLINE.** Do \ + NOT write SKILL.md with write_file beforehand; this tool materialises \ + the folder for you and then forks. Reusing an existing skill_name \ + within the colony replaces that skill with your latest content. +- The ``task`` must be FULL and self-contained — the worker has zero \ + memory of THIS chat at run time. +- The ``skill_body`` must be FULL and self-contained — capture the \ + operational protocol (endpoints, auth, gotchas, pre-baked queries) \ + so the worker doesn't have to rediscover what you already know. +- ``concurrency_hint`` (optional integer ≥ 1) — advisory cap on how \ + many worker processes typically run in parallel for this colony \ + (e.g. 1 for "send digest", 5 for a fan-out). Baked into worker.json \ + for the future colony queen to consult; not enforced. +- Any triggers you staged with ``set_trigger`` during this incubation \ + are written to ``triggers.json`` as part of the commit and \ + auto-start on first colony load. +- After this returns, the chat is over: the session locks immediately \ + and the user gets a "compact and start a new session with you" \ + button. So make your call to create_colony the last thing you do — \ + one closing message to the user is fine, but expect the next user \ + input to land in a fresh forked session, not this one. + +## Bail +- cancel_incubation() — Call when the spec isn't ready after all (user \ + changed their mind, you discovered the work is actually one-shot, \ + more than a couple of details still need to be worked out). Returns \ + you to INDEPENDENT with the full toolkit; no fork happens. """ _queen_tools_working = """ # Tools (WORKING mode) -Workers are running in your colony. You have: -- Read-only: read_file, list_directory, search_files, run_command -- get_worker_status(focus?) — Poll latest progress / issues -- inject_message(content) — Send guidance to a running worker -- list_worker_questions() / reply_to_worker(request_id, reply) — Answer escalations -- stop_worker() — Stop a worker early -- run_parallel_workers(tasks, timeout?) — Fan out MORE parallel tasks on \ -top of what's already running (each task string must be fully self-contained) -- set_trigger / remove_trigger / list_triggers — Timer management +The colony's spec was committed during INCUBATING. Your tools here are \ +operational, not editorial. -When every worker has reported (success or failure), the phase auto-moves \ -to REVIEWING. You do not need to call a transition tool yourself. +## Stay informed (only when asked, or when something matters) +- get_worker_status(focus?) — Pull progress / issues for the user. +- list_worker_questions() — Check the escalation inbox. + +## Respond +- reply_to_worker(request_id, reply) — Answer a worker escalation. +- inject_message(content) — Course-correct a running worker (e.g. it's \ + heading the wrong way and the user wants it redirected). + +## Intervene +- stop_worker() — Kill switch for a runaway or no-longer-needed worker. + +## Spec-compatible adjustments +- run_parallel_workers(tasks, timeout?) — Fan out MORE of the same \ + work. Use when the user wants additional units of an already-defined \ + job, NOT for new scope. Each task string must be fully self-contained. +- set_trigger / remove_trigger / list_triggers — Tweak the schedule \ + the user already approved during incubation. Adding a follow-up \ + trigger for the same colony is fine; redesigning the colony's \ + purpose is not. + +## Read-only inspection +- read_file, list_directory, search_files, run_command + +When every worker has reported (success or failure), the phase \ +auto-moves to REVIEWING. You do not need to call a transition tool \ +yourself. + +## What does NOT belong here +A request like "actually let's also do X" with X being a new scope, \ +new skill, or different problem is a NEW COLONY, not an extension of \ +this one. Tell the user plainly: "this colony is for the work we \ +already started — for that we'd need a fresh chat with me, where I \ +can incubate a new colony." You cannot create a new colony from \ +inside a colony. """ _queen_tools_reviewing = """ @@ -291,9 +465,9 @@ queen_node = NodeSpec( id="queen", name="Queen", description=( - "User's primary interactive interface. Operates in DM (independent) " - "or colony mode (working / reviewing) depending on whether workers " - "have been spawned." + "User's primary interactive interface. Operates in DM (independent), " + "colony-spec drafting (incubating), or colony mode (working / " + "reviewing) depending on whether workers have been spawned." ), node_type="event_loop", max_node_visits=0, @@ -301,7 +475,14 @@ queen_node = NodeSpec( output_keys=[], # Queen should never have this nullable_output_keys=[], # Queen should never have this skip_judge=True, # Queen is a conversational agent; suppress tool-use pressure feedback - tools=sorted(set(_QUEEN_INDEPENDENT_TOOLS + _QUEEN_WORKING_TOOLS + _QUEEN_REVIEWING_TOOLS)), + tools=sorted( + set( + _QUEEN_INDEPENDENT_TOOLS + + _QUEEN_INCUBATING_TOOLS + + _QUEEN_WORKING_TOOLS + + _QUEEN_REVIEWING_TOOLS + ) + ), system_prompt=( _queen_character_core + _queen_role_independent @@ -312,20 +493,30 @@ queen_node = NodeSpec( ), ) -ALL_QUEEN_TOOLS = sorted(set(_QUEEN_INDEPENDENT_TOOLS + _QUEEN_WORKING_TOOLS + _QUEEN_REVIEWING_TOOLS)) +ALL_QUEEN_TOOLS = sorted( + set( + _QUEEN_INDEPENDENT_TOOLS + + _QUEEN_INCUBATING_TOOLS + + _QUEEN_WORKING_TOOLS + + _QUEEN_REVIEWING_TOOLS + ) +) __all__ = [ "queen_node", "ALL_QUEEN_TOOLS", "_QUEEN_INDEPENDENT_TOOLS", + "_QUEEN_INCUBATING_TOOLS", "_QUEEN_WORKING_TOOLS", "_QUEEN_REVIEWING_TOOLS", # Character + phase-specific prompt segments (used by queen_orchestrator for dynamic prompts) "_queen_character_core", "_queen_role_independent", + "_queen_role_incubating", "_queen_role_working", "_queen_role_reviewing", "_queen_tools_independent", + "_queen_tools_incubating", "_queen_tools_working", "_queen_tools_reviewing", "_queen_behavior_always", diff --git a/core/framework/server/queen_orchestrator.py b/core/framework/server/queen_orchestrator.py index 68606e46..5936251f 100644 --- a/core/framework/server/queen_orchestrator.py +++ b/core/framework/server/queen_orchestrator.py @@ -266,16 +266,19 @@ async def create_queen( queen_loop_config as _base_loop_config, ) from framework.agents.queen.nodes import ( + _QUEEN_INCUBATING_TOOLS, _QUEEN_INDEPENDENT_TOOLS, _QUEEN_REVIEWING_TOOLS, _QUEEN_WORKING_TOOLS, _queen_behavior_always, _queen_behavior_independent, _queen_character_core, + _queen_role_incubating, _queen_role_independent, _queen_role_reviewing, _queen_role_working, _queen_style, + _queen_tools_incubating, _queen_tools_independent, _queen_tools_reviewing, _queen_tools_working, @@ -378,6 +381,7 @@ async def create_queen( # ---- Partition tools by phase ------------------------------------ independent_names = set(_QUEEN_INDEPENDENT_TOOLS) + incubating_names = set(_QUEEN_INCUBATING_TOOLS) working_names = set(_QUEEN_WORKING_TOOLS) reviewing_names = set(_QUEEN_REVIEWING_TOOLS) @@ -386,16 +390,24 @@ async def create_queen( phase_state.working_tools = [t for t in queen_tools if t.name in working_names] phase_state.reviewing_tools = [t for t in queen_tools if t.name in reviewing_names] + # Incubating tool surface is intentionally minimal (read-only inspection + # + create_colony + cancel_incubation) — no MCP tools spliced in, so the + # queen stays focused on drafting the spec. + phase_state.incubating_tools = [t for t in queen_tools if t.name in incubating_names] # Independent phase gets core tools + all MCP tools not claimed by any # other phase (coder-tools file I/O, gcu-tools browser, etc.). - all_phase_names = independent_names | working_names | reviewing_names + all_phase_names = independent_names | incubating_names | working_names | reviewing_names mcp_tools = [t for t in queen_tools if t.name not in all_phase_names] phase_state.independent_tools = [t for t in queen_tools if t.name in independent_names] + mcp_tools logger.info( "Queen: independent tools: %s", sorted(t.name for t in phase_state.independent_tools), ) + logger.info( + "Queen: incubating tools: %s", + sorted(t.name for t in phase_state.incubating_tools), + ) # ---- Global + queen-scoped memory ---------------------------------- global_dir, queen_mem_dir = initialize_memory_scopes(session, phase_state) @@ -428,6 +440,16 @@ async def create_queen( ), _has_vision, ) + phase_state.prompt_incubating = finalize_queen_prompt( + ( + _queen_character_core + + _queen_role_incubating + + _queen_style + + _queen_tools_incubating + + _queen_behavior_always + ), + _has_vision, + ) phase_state.prompt_working = finalize_queen_prompt( (_queen_character_core + _queen_role_working + _queen_style + _queen_tools_working + _queen_behavior_always), _has_vision, diff --git a/core/framework/server/routes_execution.py b/core/framework/server/routes_execution.py index 9fc492b7..f14ae7a4 100644 --- a/core/framework/server/routes_execution.py +++ b/core/framework/server/routes_execution.py @@ -682,35 +682,32 @@ async def handle_cancel_queen(request: web.Request) -> web.Response: return web.json_response({"cancelled": True}) -async def handle_mark_colony_spawned(request: web.Request) -> web.Response: - """POST /api/sessions/{session_id}/mark-colony-spawned -- lock the queen DM. +def persist_colony_spawn_lock(session: Any, colony_name: str) -> None: + """Persist the colony-spawned lock on a queen session. - Called by the frontend the first time the user clicks the - COLONY_CREATED system message. Persists ``colony_spawned: true`` and - ``spawned_colony_name`` into the queen session's ``meta.json`` so the - lock survives server restart, and caches the same on the live Session - object so subsequent /chat calls in this process can be rejected - immediately without disk I/O. + Writes ``colony_spawned: true`` + ``spawned_colony_name`` + a timestamp + into the queen session's ``meta.json`` and mirrors the same fields onto + the live ``Session`` object so subsequent ``/chat`` calls in this + process are rejected immediately without disk I/O. - Body: ``{"colony_name": "..."}`` + Shared by the HTTP route ``handle_mark_colony_spawned`` (frontend + click on the colony-link card) and the in-process ``create_colony`` + tool path (when the queen forks while in ``incubating`` phase). + + Raises ``OSError`` if the meta.json write fails. Callers should catch + and respond/log appropriately. """ from datetime import datetime as _dt - session, err = resolve_session(request) - if err: - return err - - body = await request.json() if request.can_read_body else {} - colony_name = (body.get("colony_name") or "").strip() - if not colony_name: - return web.json_response({"error": "colony_name is required"}, status=400) - queen_dir = getattr(session, "queen_dir", None) if queen_dir is None: - return web.json_response( - {"error": "queen session directory is not set on this session"}, - status=503, - ) + # Tool-side callers may invoke before the queen dir is available. + # Still mirror onto the session so the in-process /chat guard + # works; the meta.json write is just deferred until the next + # session start writes the file (rare path). + session.colony_spawned = True + session.spawned_colony_name = colony_name + return meta_path = queen_dir / "meta.json" meta: dict = {} @@ -724,16 +721,39 @@ async def handle_mark_colony_spawned(request: web.Request) -> web.Response: meta["spawned_colony_name"] = colony_name meta["spawned_colony_at"] = _dt.now(UTC).isoformat() - try: - meta_path.parent.mkdir(parents=True, exist_ok=True) - meta_path.write_text(json.dumps(meta), encoding="utf-8") - except OSError as exc: - logger.exception("mark_colony_spawned: failed to persist meta.json") - return web.json_response({"error": f"failed to persist: {exc}"}, status=500) + meta_path.parent.mkdir(parents=True, exist_ok=True) + meta_path.write_text(json.dumps(meta), encoding="utf-8") session.colony_spawned = True session.spawned_colony_name = colony_name + +async def handle_mark_colony_spawned(request: web.Request) -> web.Response: + """POST /api/sessions/{session_id}/mark-colony-spawned -- lock the queen DM. + + Called by the frontend the first time the user clicks the + COLONY_CREATED system message. Thin wrapper around + :func:`persist_colony_spawn_lock` — the heavy lifting (meta.json + merge + Session cache) lives in the helper so the in-process + ``create_colony`` path can reuse it without re-issuing an HTTP call. + + Body: ``{"colony_name": "..."}`` + """ + session, err = resolve_session(request) + if err: + return err + + body = await request.json() if request.can_read_body else {} + colony_name = (body.get("colony_name") or "").strip() + if not colony_name: + return web.json_response({"error": "colony_name is required"}, status=400) + + try: + persist_colony_spawn_lock(session, colony_name) + except OSError as exc: + logger.exception("mark_colony_spawned: failed to persist meta.json") + return web.json_response({"error": f"failed to persist: {exc}"}, status=500) + return web.json_response( { "session_id": session.id, @@ -991,6 +1011,7 @@ async def fork_session_into_colony( colony_name: str, task: str, tasks: list[dict] | None = None, + concurrency_hint: int | None = None, ) -> dict: """Fork a queen session into a colony directory. @@ -1228,6 +1249,13 @@ async def fork_session_into_colony( "spawned_from": session.id, "spawned_at": datetime.now(UTC).isoformat(), } + # Concurrency advisory baked in at incubation time. Not enforced — the + # progress.db queue is atomic regardless — but the colony queen reads + # this when planning fan-outs (run_parallel_workers, trigger-fired + # batches) so behavior matches what the user agreed to during + # incubation. + if isinstance(concurrency_hint, int) and concurrency_hint > 0: + worker_meta["concurrency_hint"] = concurrency_hint worker_config_path.write_text(json.dumps(worker_meta, indent=2, ensure_ascii=False), encoding="utf-8") # ── 3. Duplicate queen session into colony ─────────────────── diff --git a/core/framework/tools/queen_lifecycle_tools.py b/core/framework/tools/queen_lifecycle_tools.py index ebf399e4..885efc97 100644 --- a/core/framework/tools/queen_lifecycle_tools.py +++ b/core/framework/tools/queen_lifecycle_tools.py @@ -61,6 +61,33 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Open-the-floor message returned by ``start_incubating_colony`` on +# approval. The same kinds of prompts (concurrency, schedule, result +# tracking, failure handling, credentials) live ongoingly inside +# ``_queen_tools_incubating`` so the queen sees them every turn — this +# constant is the single-shot version that lands as the tool result. +# Phrasing intentionally invites the queen's judgement; do NOT turn this +# into a hard checklist. +_INCUBATING_APPROVAL_GUIDANCE = ( + "Approved to incubate colony '{colony_name}' for: {intended_purpose}\n\n" + "Your phase has flipped to INCUBATING. Before you call create_colony, " + "the worker will need operational details that are easy to lose in a " + "planning conversation. Take a moment to figure out what's still " + "ambiguous for THIS colony — for example: how many tasks should run " + "in parallel, what schedule fits (cron, interval, manual-only), what " + "should the worker write into progress.db so the user can review " + "results later, how to handle partial failures, what credentials or " + "MCP servers the worker needs that you haven't discussed. You don't " + "need to cover every example — only the items that actually matter " + "for this colony, and only the ones the user hasn't already implied. " + "Use ask_user / ask_user_multiple to fill the real gaps. " + "If, while sorting these out, you decide the spec isn't ready, call " + "cancel_incubation and we go back to INDEPENDENT." +) + + + + def _render_credentials_block(provider: Any) -> str: """Call a credentials_prompt_provider safely and return its output. @@ -94,8 +121,10 @@ class WorkerSessionAdapter: class QueenPhaseState: """Mutable state container for queen operating phase. - Three phases: independent, working, reviewing. + Four phases: independent, incubating, working, reviewing. INDEPENDENT: queen acts as a standalone agent with MCP tools, no colony workers. + INCUBATING: queen has been approved by the incubating_evaluator to fork + a colony — focused tool surface for drafting the spec. WORKING: colony workers are running autonomously. REVIEWING: workers have completed, queen reviews results. @@ -103,8 +132,9 @@ class QueenPhaseState: that trigger phase transitions. """ - phase: str = "independent" # "independent", "working", or "reviewing" + phase: str = "independent" # "independent", "incubating", "working", or "reviewing" independent_tools: list = field(default_factory=list) # list[Tool] + incubating_tools: list = field(default_factory=list) # list[Tool] working_tools: list = field(default_factory=list) # list[Tool] reviewing_tools: list = field(default_factory=list) # list[Tool] inject_notification: Any = None # async (str) -> None @@ -115,9 +145,27 @@ class QueenPhaseState: # Phase-specific prompts (set by queen_orchestrator after construction) prompt_independent: str = "" + prompt_incubating: str = "" prompt_working: str = "" prompt_reviewing: str = "" + # Last-set incubation context (colony_name + intended_purpose), populated + # by start_incubating_colony when the evaluator approves. Read by + # get_current_prompt() to interpolate the colony name into the + # incubating role prompt so the queen sees the same name across turns + # without having to remember it from the tool result. + incubating_colony_name: str | None = None + incubating_intended_purpose: str | None = None + # Trigger configs the queen drafted during incubation. set_trigger / + # remove_trigger / list_triggers buffer here when phase=="incubating" + # because the colony directory + worker runtime don't exist yet — + # there's nothing live to attach a timer/webhook to. ``create_colony`` + # drains this list into ``{colony_dir}/triggers.json`` after the fork + # so the triggers auto-start the next time the colony is loaded. + # Cleared on switch_to_independent so failed/cancelled incubations + # don't leak into a future incubation attempt. + pending_triggers: list = field(default_factory=list) # list[dict] + # Default skill operational protocols — appended to every phase prompt protocols_prompt: str = "" # Community skills catalog (XML) — appended after protocols @@ -172,6 +220,8 @@ class QueenPhaseState: return list(self.working_tools) if self.phase == "reviewing": return list(self.reviewing_tools) + if self.phase == "incubating": + return list(self.incubating_tools) # Default / "independent" — DM mode with full MCP tools. return list(self.independent_tools) @@ -181,6 +231,15 @@ class QueenPhaseState: base = self.prompt_working elif self.phase == "reviewing": base = self.prompt_reviewing + elif self.phase == "incubating": + # Interpolate the active incubation context so the queen sees the + # same colony_name on every turn, not just the first tool result. + base = self.prompt_incubating + if self.incubating_colony_name: + base = base.replace( + "{colony_name}", + self.incubating_colony_name, + ) else: base = self.prompt_independent @@ -250,6 +309,14 @@ class QueenPhaseState: if self.phase == "independent": return self.phase = "independent" + # Clear stale incubation context so a future incubation starts fresh. + # This includes the trigger buffer — a cancelled or successful + # incubation should not leak its drafted triggers into the next + # attempt (successful create_colony already drained them; cancel + # never committed them). + self.incubating_colony_name = None + self.incubating_intended_purpose = None + self.pending_triggers = [] tool_names = [t.name for t in self.independent_tools] logger.info("Queen phase → independent (source=%s, tools: %s)", source, tool_names) await self._emit_phase_event() @@ -260,6 +327,48 @@ class QueenPhaseState: "Available tools: " + ", ".join(tool_names) + "." ) + async def switch_to_incubating( + self, + *, + colony_name: str, + intended_purpose: str, + source: str = "tool", + ) -> None: + """Switch to incubating phase — queen drafts the colony spec. + + Caller must already have validated colony_name. Stores the active + incubation context on self so get_current_prompt() can interpolate + it on every turn (the queen otherwise loses the colony_name after + the first tool result rolls past in the conversation history). + + Args: + colony_name: Validated colony slug (lowercase alphanumeric + _). + intended_purpose: One-paragraph brief from the queen. + source: "tool", "frontend", or "auto". + """ + if self.phase == "incubating": + # Allow re-statement of context even when already incubating — + # the queen may have refined her intended_purpose mid-flight. + self.incubating_colony_name = colony_name + self.incubating_intended_purpose = intended_purpose + return + self.phase = "incubating" + self.incubating_colony_name = colony_name + self.incubating_intended_purpose = intended_purpose + tool_names = [t.name for t in self.incubating_tools] + logger.info( + "Queen phase → incubating (source=%s, colony=%s, tools: %s)", + source, + colony_name, + tool_names, + ) + await self._emit_phase_event() + if self.inject_notification and source != "tool": + await self.inject_notification( + "[PHASE CHANGE] Switched to INCUBATING phase for colony " + f"'{colony_name}'. Available tools: " + ", ".join(tool_names) + "." + ) + def build_worker_profile(runtime: Any, agent_path: Path | str | None = None) -> str: """Build a worker capability profile from the runtime's spec and goal.""" @@ -1334,6 +1443,7 @@ def register_queen_lifecycle_tools( skill_body: str, skill_files: list[dict] | None = None, tasks: list[dict] | None = None, + concurrency_hint: int | None = None, ) -> str: """Create a colony and materialize its skill folder in one atomic call. @@ -1427,6 +1537,7 @@ def register_queen_lifecycle_tools( colony_name=cn, task=(task or "").strip(), tasks=tasks if isinstance(tasks, list) else None, + concurrency_hint=concurrency_hint if isinstance(concurrency_hint, int) and concurrency_hint > 0 else None, ) except Exception as e: logger.exception("create_colony: fork failed after installing skill") @@ -1471,6 +1582,61 @@ def register_queen_lifecycle_tools( exc_info=True, ) + # When the queen forked from INCUBATING phase, the chat is over by + # design: the colony spec is committed and there's nothing left to + # discuss in this DM. Auto-lock the session immediately (same + # mechanism the user-click path uses) and switch the queen back to + # INDEPENDENT so her closing message renders normally. The + # colony_spawned check on /chat will reject the user's NEXT message + # with the "compact and start a new session" UX. + phase_state = getattr(session, "phase_state", None) + if phase_state is not None and phase_state.phase == "incubating": + # Drain buffered triggers into the colony's triggers.json so they + # auto-start when the colony is first loaded (session_manager + # reads triggers.json on colony load and starts each timer / + # webhook). Do this BEFORE switch_to_independent clears the + # buffer. + if phase_state.pending_triggers: + triggers_path = colony_dir / "triggers.json" + try: + triggers_path.write_text( + json.dumps(phase_state.pending_triggers, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + logger.info( + "create_colony: wrote %d buffered trigger(s) to %s", + len(phase_state.pending_triggers), + triggers_path, + ) + except OSError: + logger.warning( + "create_colony: failed to write triggers.json from buffer", + exc_info=True, + ) + try: + from framework.server.routes_execution import ( + persist_colony_spawn_lock, + ) + + persist_colony_spawn_lock(session, fork_result.get("colony_name", cn)) + except OSError: + logger.warning( + "create_colony: failed to persist colony-spawned lock", + exc_info=True, + ) + except Exception: + logger.warning( + "create_colony: persist_colony_spawn_lock raised", + exc_info=True, + ) + try: + await phase_state.switch_to_independent(source="tool") + except Exception: + logger.warning( + "create_colony: failed to switch phase back to independent", + exc_info=True, + ) + return json.dumps( { "status": "created", @@ -1674,6 +1840,20 @@ def register_queen_lifecycle_tools( "required": ["goal"], }, }, + "concurrency_hint": { + "type": "integer", + "description": ( + "Optional advisory cap: how many worker processes " + "should typically run in parallel for this colony " + "(e.g. 1 for a single-fire 'send digest' job, 5 " + "for a fan-out that processes records). Baked " + "into worker.json as ``concurrency_hint`` for the " + "future colony queen to consult when planning " + "fan-outs. Not enforced — the queue itself is " + "atomic, this is just guidance. Omit if unsure." + ), + "minimum": 1, + }, }, "required": [ "colony_name", @@ -1691,6 +1871,258 @@ def register_queen_lifecycle_tools( ) tools_registered += 1 + # --- start_incubating_colony ------------------------------------------------- + + async def start_incubating_colony( + *, + colony_name: str, + intended_purpose: str, + ) -> str: + """Gate the queen behind a one-shot readiness evaluator. + + Reads the queen's recent conversation off disk and asks + :func:`incubating_evaluator.evaluate` whether the spec is + settled enough to fork. On approval, flips the queen's phase + to ``incubating`` so a focused tool surface (``create_colony``, + ``cancel_incubation``, read-only file tools) takes over. On + rejection, returns the verdict for the queen to self-correct + on her next turn — the rejection is queen-only by design (no + SSE event, no user-facing message). + """ + if session is None: + return json.dumps({"error": "No session bound to this tool registry."}) + + cn = (colony_name or "").strip() + if not _COLONY_NAME_RE.match(cn): + return json.dumps( + { + "error": ( + "colony_name must be lowercase alphanumeric with " + "underscores (e.g. 'morning_hn_digest')." + ) + } + ) + + purpose = (intended_purpose or "").strip() + if not purpose: + return json.dumps( + { + "error": ( + "intended_purpose is required — describe in one " + "paragraph what the colony will do, on what " + "cadence, and why it must outlive this chat." + ) + } + ) + + phase_state = getattr(session, "phase_state", None) + if phase_state is None: + return json.dumps({"error": "phase_state is not initialised on this session."}) + + # Block re-entry from working/reviewing — those phases mean a colony + # is already running, the queen should NOT be drafting another spec + # on top. Independent → incubating is the only legal entry path. + if phase_state.phase not in ("independent", "incubating"): + return json.dumps( + { + "error": ( + f"start_incubating_colony is not available in phase " + f"'{phase_state.phase}' — finish or stop the current " + "colony's workers first." + ) + } + ) + + # Read the queen's conversation parts straight from disk. Same + # pattern as handle_compact_and_fork — avoids needing access to + # the live NodeConversation, which is local to the agent loop. + from framework.agent_loop.conversation import Message + from framework.agents.queen import incubating_evaluator + from framework.storage.conversation_store import FileConversationStore + + queen_dir = getattr(session, "queen_dir", None) + messages: list = [] + if queen_dir is not None and (queen_dir / "conversations").exists(): + try: + store = FileConversationStore(queen_dir / "conversations") + raw_parts = await store.read_parts() + for part in raw_parts: + try: + messages.append(Message.from_storage_dict(part)) + except (KeyError, TypeError): + # Skip malformed parts; the evaluator can still work + # off whatever messages it gets. + continue + except Exception: + logger.warning( + "start_incubating_colony: failed to read queen conversation", + exc_info=True, + ) + + llm = getattr(session, "llm", None) + if llm is None: + return json.dumps( + { + "error": ( + "session has no LLM — cannot run readiness " + "evaluator. Retry once the session has fully " + "initialised." + ) + } + ) + + verdict = await incubating_evaluator.evaluate( + llm=llm, + messages=messages, + colony_name=cn, + intended_purpose=purpose, + ) + + if not verdict.get("ready"): + # Queen-only silent rejection — no SSE, no user message. + # The queen reads the reasons in her tool result and decides + # what to do next (ask the user, refine scope, drop the idea). + return json.dumps( + { + "status": "not_ready", + "colony_name": cn, + "reasons": verdict.get("reasons", []), + "missing_prerequisites": verdict.get( + "missing_prerequisites", [] + ), + } + ) + + # Approved — flip phase. switch_to_incubating publishes + # QUEEN_PHASE_CHANGED so the frontend badge updates and stores + # the colony_name + purpose for the role prompt to interpolate. + await phase_state.switch_to_incubating( + colony_name=cn, + intended_purpose=purpose, + source="tool", + ) + + return json.dumps( + { + "status": "incubating", + "colony_name": cn, + "intended_purpose": purpose, + "guidance": _INCUBATING_APPROVAL_GUIDANCE.format( + colony_name=cn, + intended_purpose=purpose, + ), + } + ) + + _start_incubating_colony_tool = Tool( + name="start_incubating_colony", + description=( + "Ask to fork this session into a persistent colony for " + "HEADLESS / RECURRING / BACKGROUND work that needs to " + "outlive this chat. This tool does NOT fork on its own — " + "it spawns a one-shot evaluator that reads the recent " + "conversation and decides whether the spec is settled " + "enough to proceed.\n\n" + "On APPROVAL, your phase flips to INCUBATING and a focused " + "tool surface unlocks (create_colony, cancel_incubation, " + "read-only file tools). The full coding toolkit goes away " + "on purpose so you can concentrate on writing a tight task " + "+ SKILL.md.\n\n" + "On REJECTION, you stay in INDEPENDENT and the verdict's " + "``missing_prerequisites`` lists what's still ambiguous in " + "queen-actionable form. Resolve those with the user (ask " + "in plain prose or via ask_user) and call this tool again " + "when the spec is settled. The rejection is queen-only — " + "the user does NOT see it, so frame your follow-up " + "naturally without referencing 'the evaluator'.\n\n" + "DO NOT call this for one-shot work that the user wants " + "results for right now in this chat — do that work yourself " + "with your independent toolkit instead." + ), + parameters={ + "type": "object", + "properties": { + "colony_name": { + "type": "string", + "description": ( + "Lowercase alphanumeric+underscore name for the " + "proposed colony (e.g. 'morning_hn_digest', " + "'inbox_monitor')." + ), + }, + "intended_purpose": { + "type": "string", + "description": ( + "One-paragraph brief: what the colony will do, " + "on what cadence, why it must outlive this " + "chat. Do NOT write the SKILL.md here — that " + "happens in INCUBATING phase after approval." + ), + }, + }, + "required": ["colony_name", "intended_purpose"], + }, + ) + registry.register( + "start_incubating_colony", + _start_incubating_colony_tool, + lambda inputs: start_incubating_colony(**inputs), + ) + tools_registered += 1 + + # --- cancel_incubation ------------------------------------------------------- + + async def cancel_incubation() -> str: + """Bail out of incubating mode and return to independent. + + Use when the spec turns out to not be ready after all (user + changed their mind, the work is one-shot, more than a couple of + details still need to be worked out). Harmless no-op if not + currently in incubating. + """ + if session is None: + return json.dumps({"error": "No session bound to this tool registry."}) + + phase_state = getattr(session, "phase_state", None) + if phase_state is None: + return json.dumps({"error": "phase_state is not initialised on this session."}) + + if phase_state.phase != "incubating": + return json.dumps( + { + "status": "noop", + "reason": f"phase is '{phase_state.phase}', not 'incubating'", + } + ) + + previous_colony = phase_state.incubating_colony_name + await phase_state.switch_to_independent(source="tool") + return json.dumps( + { + "status": "cancelled", + "previous_colony_name": previous_colony, + } + ) + + _cancel_incubation_tool = Tool( + name="cancel_incubation", + description=( + "Bail out of INCUBATING phase back to INDEPENDENT. Use " + "when the spec turns out to not be ready after all — the " + "user changed their mind, the work is actually one-shot, " + "or more than a couple of operational details still need " + "to be sorted out. No fork happens; the full coding " + "toolkit comes back. Harmless no-op outside INCUBATING." + ), + parameters={"type": "object", "properties": {}, "required": []}, + ) + registry.register( + "cancel_incubation", + _cancel_incubation_tool, + lambda inputs: cancel_incubation(**inputs), + ) + tools_registered += 1 + # --- enqueue_task ------------------------------------------------------------ async def enqueue_task_tool( @@ -3885,7 +4317,103 @@ def register_queen_lifecycle_tools( trigger_config: dict | None = None, task: str | None = None, ) -> str: - """Activate a trigger so it fires periodically into the queen.""" + """Activate a trigger so it fires periodically into the queen. + + Phase-branched: in INCUBATING the trigger spec is buffered onto + ``phase_state.pending_triggers`` because the colony directory + + worker runtime don't exist yet. ``create_colony`` drains the + buffer into ``{colony_dir}/triggers.json`` post-fork so the + triggers auto-start on first colony load. In WORKING / REVIEWING + the live behaviour (start timer/webhook + persist) runs as before. + """ + # ── Incubating branch: buffer instead of activate ──────────── + phase_state = getattr(session, "phase_state", None) + if phase_state is not None and phase_state.phase == "incubating": + t_type = trigger_type + t_config = trigger_config or {} + if not t_type: + return json.dumps( + { + "error": ( + "trigger_type is required during incubation — " + "no existing trigger to inherit from yet." + ) + } + ) + if not task: + return json.dumps( + { + "error": ( + f"Trigger '{trigger_id}' needs a task — " + "describe what the worker should do when this " + "trigger fires." + ) + } + ) + # Mirror the live-path validation so a bad cron / interval is + # caught at draft time, not on first colony load. + if t_type == "timer": + cron_expr = t_config.get("cron") + interval = t_config.get("interval_minutes") + if cron_expr: + try: + from croniter import croniter + + if not croniter.is_valid(cron_expr): + return json.dumps({"error": f"Invalid cron expression: {cron_expr}"}) + except ImportError: + return json.dumps( + {"error": "croniter package not installed — cannot validate cron expression."} + ) + elif interval: + if not isinstance(interval, (int, float)) or interval <= 0: + return json.dumps({"error": f"interval_minutes must be > 0, got {interval}"}) + else: + return json.dumps( + {"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."} + ) + elif t_type == "webhook": + path = (t_config.get("path") or "").strip() + if not path or not path.startswith("/"): + return json.dumps( + { + "error": ( + "Webhook trigger requires 'path' starting " + "with '/' in trigger_config (e.g. '/hooks/github')." + ) + } + ) + else: + return json.dumps({"error": f"Unsupported trigger type: {t_type}"}) + + # Replace any prior buffered entry with the same id so the + # queen can iteratively refine without duplicates. + phase_state.pending_triggers = [ + t for t in phase_state.pending_triggers if t.get("id") != trigger_id + ] + phase_state.pending_triggers.append( + { + "id": trigger_id, + "trigger_type": t_type, + "trigger_config": t_config, + "task": task, + "name": trigger_id, + } + ) + return json.dumps( + { + "status": "buffered", + "trigger_id": trigger_id, + "trigger_type": t_type, + "trigger_config": t_config, + "note": ( + "Buffered for incubation. Will be written to " + "triggers.json when create_colony commits the spec, " + "and auto-start on first colony load." + ), + } + ) + if trigger_id in getattr(session, "active_trigger_ids", set()): return json.dumps({"error": f"Trigger '{trigger_id}' is already active."}) @@ -4094,7 +4622,29 @@ def register_queen_lifecycle_tools( # --- remove_trigger -------------------------------------------------------- async def remove_trigger(trigger_id: str) -> str: - """Deactivate an active trigger.""" + """Deactivate an active trigger. + + Phase-branched: in INCUBATING, removes the buffered draft instead + of touching live runtime state. + """ + # ── Incubating branch: drop from buffer ────────────────────── + phase_state = getattr(session, "phase_state", None) + if phase_state is not None and phase_state.phase == "incubating": + before = len(phase_state.pending_triggers) + phase_state.pending_triggers = [ + t for t in phase_state.pending_triggers if t.get("id") != trigger_id + ] + if len(phase_state.pending_triggers) == before: + return json.dumps( + { + "error": ( + f"Trigger '{trigger_id}' was not in the " + "incubation buffer." + ) + } + ) + return json.dumps({"status": "removed_from_buffer", "trigger_id": trigger_id}) + if trigger_id not in getattr(session, "active_trigger_ids", set()): return json.dumps({"error": f"Trigger '{trigger_id}' is not active."}) @@ -4160,7 +4710,26 @@ def register_queen_lifecycle_tools( # --- list_triggers --------------------------------------------------------- async def list_triggers() -> str: - """List all available triggers and their status.""" + """List all available triggers and their status. + + Phase-branched: in INCUBATING, returns the buffered drafts that + will be written to triggers.json when create_colony commits. + """ + # ── Incubating branch: return buffered drafts ──────────────── + phase_state = getattr(session, "phase_state", None) + if phase_state is not None and phase_state.phase == "incubating": + return json.dumps( + { + "triggers": list(phase_state.pending_triggers), + "buffered": True, + "note": ( + "These are draft triggers staged for the colony " + "being incubated. They are written to triggers.json " + "when create_colony commits the spec." + ), + } + ) + available = getattr(session, "available_triggers", {}) triggers = [] for tdef in available.values(): diff --git a/core/frontend/src/api/types.ts b/core/frontend/src/api/types.ts index c2a19715..8ee0d2a4 100644 --- a/core/frontend/src/api/types.ts +++ b/core/frontend/src/api/types.ts @@ -12,8 +12,10 @@ export interface LiveSession { loaded_at: number; uptime_seconds: number; intro_message?: string; - /** Queen operating phase — "independent" (DM), "working" (workers running), or "reviewing" (workers done) */ - queen_phase?: "independent" | "working" | "reviewing"; + /** Queen operating phase — "independent" (DM), "incubating" (drafting a + * colony spec after the readiness eval approved), "working" (workers + * running), or "reviewing" (workers done) */ + queen_phase?: "independent" | "incubating" | "working" | "reviewing"; /** Whether the queen's LLM supports image content in messages */ queen_supports_images?: boolean; /** Selected queen identity ID (e.g. "queen_technology") */ diff --git a/core/frontend/src/components/ChatPanel.tsx b/core/frontend/src/components/ChatPanel.tsx index 1f2ce2aa..347c5cab 100644 --- a/core/frontend/src/components/ChatPanel.tsx +++ b/core/frontend/src/components/ChatPanel.tsx @@ -58,7 +58,7 @@ export interface ChatMessage { /** Epoch ms when this message was first created — used for ordering queen/worker interleaving */ createdAt?: number; /** Queen phase active when this message was created */ - phase?: "independent" | "working" | "reviewing"; + phase?: "independent" | "incubating" | "working" | "reviewing"; /** Images attached to a user message */ images?: ImageContent[]; /** Backend node_id that produced this message — used for subagent grouping */ @@ -105,7 +105,7 @@ interface ChatPanelProps { /** Called when user dismisses the pending question without answering */ onQuestionDismiss?: () => void; /** Queen operating phase — shown as a tag on queen messages */ - queenPhase?: "independent" | "working" | "reviewing"; + queenPhase?: "independent" | "incubating" | "working" | "reviewing"; /** When false, queen messages omit the phase badge */ showQueenPhaseBadge?: boolean; /** Context window usage for queen and workers */ @@ -347,7 +347,7 @@ function InlineAskUserBubble({ thread: string, images?: ImageContent[], ) => void; - queenPhase?: "independent" | "working" | "reviewing"; + queenPhase?: "independent" | "incubating" | "working" | "reviewing"; showQueenPhaseBadge?: boolean; queenProfileId?: string | null; }) { @@ -443,23 +443,31 @@ function InlineAskUserBubble({ > {msg.agent} - {(!isQueen || showQueenPhaseBadge) && ( - - {isQueen - ? (msg.phase ?? queenPhase) === "working" - ? "working" - : (msg.phase ?? queenPhase) === "reviewing" - ? "reviewing" + {(!isQueen || showQueenPhaseBadge) && (() => { + const effectivePhase = msg.phase ?? queenPhase; + const isIncubating = isQueen && effectivePhase === "incubating"; + const badgeClass = isQueen + ? isIncubating + // Honey-amber tint distinguishes incubating from the + // primary-tinted independent/working/reviewing badges. + ? "bg-amber-500/15 text-amber-500" + : "bg-primary/15 text-primary" + : "bg-muted text-muted-foreground"; + const label = isQueen + ? effectivePhase === "working" + ? "working" + : effectivePhase === "reviewing" + ? "reviewing" + : effectivePhase === "incubating" + ? "incubating" : "independent" - : "Worker"} - - )} + : "Worker"; + return ( + + {label} + + ); + })()} {payload.kind === "single" ? ( ; @@ -456,7 +456,7 @@ export default function ColonyChat() { } } - let restoredPhase: "independent" | "working" | "reviewing" | null = null; + let restoredPhase: "independent" | "incubating" | "working" | "reviewing" | null = null; if (!liveSession) { if (coldRestoreId) { diff --git a/core/frontend/src/pages/queen-dm.tsx b/core/frontend/src/pages/queen-dm.tsx index a3f85b27..8cb00a45 100644 --- a/core/frontend/src/pages/queen-dm.tsx +++ b/core/frontend/src/pages/queen-dm.tsx @@ -78,7 +78,7 @@ export default function QueenDM() { >({}); const queenIterTextRef = useRef>>({}); const [queenPhase, setQueenPhase] = useState< - "independent" | "working" | "reviewing" + "independent" | "incubating" | "working" | "reviewing" >("independent"); const resetViewState = useCallback(() => { @@ -317,10 +317,14 @@ export default function QueenDM() { }; }, [queenId, sessionId]); - // Hydrate the colony-spawned lock from the session detail whenever the - // session ID changes. The /sessions/{id} response carries colony_spawned - // (live) and the cold-info path returns the same field after a server - // restart, so the same fetch covers both states. + // Hydrate the colony-spawned lock + queen phase from the session detail + // whenever the session ID changes. /sessions/{id} carries both flags + // (and the cold-info path returns colony_spawned after a server restart), + // so this single fetch covers live, page-reload, and post-restart states. + // Without seeding queen_phase here the badge starts at the useState + // default ("independent") and only updates when a fresh + // QUEEN_PHASE_CHANGED SSE event fires — a reload mid-incubation would + // briefly mis-render. useEffect(() => { if (!sessionId) return; let cancelled = false; @@ -328,17 +332,25 @@ export default function QueenDM() { .get(sessionId) .then((data) => { if (cancelled) return; - const locked = Boolean( - (data as { colony_spawned?: boolean }).colony_spawned, - ); - const name = - (data as { spawned_colony_name?: string | null }) - .spawned_colony_name ?? null; - setColonySpawned(locked); - setSpawnedColonyName(name); + const detail = data as { + colony_spawned?: boolean; + spawned_colony_name?: string | null; + queen_phase?: "independent" | "incubating" | "working" | "reviewing"; + }; + setColonySpawned(Boolean(detail.colony_spawned)); + setSpawnedColonyName(detail.spawned_colony_name ?? null); + if ( + detail.queen_phase === "independent" || + detail.queen_phase === "incubating" || + detail.queen_phase === "working" || + detail.queen_phase === "reviewing" + ) { + setQueenPhase(detail.queen_phase); + } }) .catch(() => { - // Non-fatal — lock simply won't activate until the user navigates back. + // Non-fatal — lock + phase simply won't activate until a fresh + // SSE event arrives. }); return () => { cancelled = true; @@ -620,6 +632,7 @@ export default function QueenDM() { const rawPhase = event.data?.phase as string; if ( rawPhase === "independent" || + rawPhase === "incubating" || rawPhase === "working" || rawPhase === "reviewing" ) { @@ -912,7 +925,11 @@ export default function QueenDM() { isBusy={isTyping} disabled={loading || !queenReady} queenPhase={queenPhase} - showQueenPhaseBadge={false} + // The DM is normally in `independent` phase, so the per-message + // badge would just be noise. Surface it once the phase moves + // (e.g. INCUBATING after start_incubating_colony approves) so + // the user immediately sees the queen is in a different mode. + showQueenPhaseBadge={queenPhase !== "independent"} pendingQuestion={awaitingInput ? pendingQuestion : null} pendingOptions={awaitingInput ? pendingOptions : null} pendingQuestions={awaitingInput ? pendingQuestions : null}