feat: incubating phase

This commit is contained in:
Richard Tang
2026-04-19 18:07:09 -07:00
parent 0d11a946a5
commit 1229b4ad4d
10 changed files with 1208 additions and 117 deletions
@@ -0,0 +1,247 @@
"""One-shot LLM gate that decides if a queen DM is ready to fork a colony.
The queen's ``start_incubating_colony`` tool calls :func:`evaluate` with
the queen's recent conversation, a proposed ``colony_name``, and a
one-paragraph ``intended_purpose``. The evaluator returns a structured
verdict:
{
"ready": bool,
"reasons": [str],
"missing_prerequisites": [str],
}
On ``ready=False`` the queen receives the verdict as her tool result and
self-corrects (asks the user, refines scope, drops the idea). On
``ready=True`` the tool flips the queen's phase to ``incubating``.
Failure mode is **fail-closed**: any LLM error or unparseable response
returns ``ready=False`` with reason ``"evaluation_failed"`` so the queen
cannot accidentally proceed past a broken gate.
"""
from __future__ import annotations
import json
import logging
import re
from typing import Any
from framework.agent_loop.conversation import Message
logger = logging.getLogger(__name__)
_INCUBATING_EVALUATOR_SYSTEM_PROMPT = """\
You gate whether a queen agent should commit to forking a persistent
"colony" (a headless worker spec written to disk). Forking is
expensive: it ends the user's chat with this queen and the worker runs
unattended afterward, so the spec must be settled before you approve.
Read the conversation excerpt and the queen's proposed colony_name +
intended_purpose, then decide.
APPROVE (ready=true) only when ALL of the following hold:
1. The user has explicitly asked for work that needs to outlive this
chat recurring (cron / interval), monitoring + alert, scheduled
batch, or "fire-and-forget background job". A one-shot question
that the queen can answer in chat does NOT qualify.
2. The scope of the work is concrete enough to write down what
inputs, what outputs, what success looks like. Vague ("help me
with my workflow") does NOT qualify.
3. The technical approach is at least sketched what data sources,
APIs, or tools the worker will use. The queen does not have to
have written the SKILL.md yet, but she must have the operational
ingredients available.
4. There are no open clarifying questions on the table that the user
hasn't answered. If the queen recently asked the user something
and is still waiting, do NOT approve.
REJECT (ready=false) on any of:
- Conversation is too short / too generic to support a settled spec.
- User is still describing what they want.
- User has expressed doubts, change-of-direction, or "let me think".
- Work is one-shot and could be done in chat instead.
- Open question awaiting user reply.
Reply with a JSON object exactly matching this shape:
{
"ready": true | false,
"reasons": ["short phrase", ...], // at least one entry
"missing_prerequisites": ["short phrase", ...] // empty when ready
}
``reasons`` explains the verdict in 1-3 short phrases.
``missing_prerequisites`` lists what's missing in queen-actionable
form ("user hasn't confirmed schedule", "no API auth flow discussed").
Empty list when ``ready=true``.
Output JSON only. Do not wrap in markdown. Do not add prose.
"""
# Bound the formatted excerpt so the eval call stays cheap and fits well
# under the LLM's context window even for long DM sessions.
_MAX_MESSAGES = 30
_MAX_TOOL_CONTENT_CHARS = 400
_MAX_USER_CONTENT_CHARS = 2_000
_MAX_ASSISTANT_CONTENT_CHARS = 2_000
def format_conversation_excerpt(messages: list[Message]) -> str:
"""Format the tail of a queen conversation for the evaluator prompt.
Keeps the most recent ``_MAX_MESSAGES`` messages. Tool results are
truncated hard since they're rarely load-bearing for the readiness
decision; user/assistant text is truncated more generously to
preserve the actual conversation signal.
"""
if not messages:
return "(no messages)"
tail = messages[-_MAX_MESSAGES:]
parts: list[str] = []
for msg in tail:
role = msg.role.upper()
content = (msg.content or "").strip()
if msg.role == "tool":
if len(content) > _MAX_TOOL_CONTENT_CHARS:
content = content[:_MAX_TOOL_CONTENT_CHARS] + "..."
elif msg.role == "assistant":
# Surface tool-call intent for empty assistant turns so the
# evaluator sees what the queen has been doing.
if not content and msg.tool_calls:
names = [
tc.get("function", {}).get("name", "?")
for tc in msg.tool_calls
]
content = f"(called: {', '.join(names)})"
if len(content) > _MAX_ASSISTANT_CONTENT_CHARS:
content = content[:_MAX_ASSISTANT_CONTENT_CHARS] + "..."
else: # user
if len(content) > _MAX_USER_CONTENT_CHARS:
content = content[:_MAX_USER_CONTENT_CHARS] + "..."
if content:
parts.append(f"[{role}]: {content}")
return "\n\n".join(parts) if parts else "(no messages)"
def _build_user_message(
conversation_excerpt: str,
colony_name: str,
intended_purpose: str,
) -> str:
return (
f"## Proposed colony name\n{colony_name}\n\n"
f"## Queen's intended_purpose\n{intended_purpose.strip()}\n\n"
f"## Recent conversation (oldest → newest)\n{conversation_excerpt}\n\n"
"Decide: should this queen be approved to enter INCUBATING phase?"
)
def _parse_verdict(raw: str) -> dict[str, Any] | None:
"""Parse the evaluator's JSON. Returns None if parsing fails."""
if not raw:
return None
raw = raw.strip()
try:
return json.loads(raw)
except json.JSONDecodeError:
# Some models wrap JSON in markdown fences or add preamble.
# Pull the first { ... } block out as a best-effort fallback —
# mirrors the same recovery pattern used in recall_selector.py.
match = re.search(r"\{.*\}", raw, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
return None
return None
def _normalize_verdict(parsed: dict[str, Any]) -> dict[str, Any]:
"""Coerce a parsed verdict into the shape the tool returns to the queen."""
ready = bool(parsed.get("ready"))
reasons = parsed.get("reasons") or []
if isinstance(reasons, str):
reasons = [reasons]
reasons = [str(r).strip() for r in reasons if str(r).strip()]
missing = parsed.get("missing_prerequisites") or []
if isinstance(missing, str):
missing = [missing]
missing = [str(m).strip() for m in missing if str(m).strip()]
if ready:
# When approved we don't surface missing prerequisites — the
# incubating role prompt opens that floor itself.
missing = []
elif not reasons:
# Always give the queen at least one reason to reflect on.
reasons = ["evaluator returned no reasons"]
return {
"ready": ready,
"reasons": reasons,
"missing_prerequisites": missing,
}
async def evaluate(
llm: Any,
messages: list[Message],
colony_name: str,
intended_purpose: str,
) -> dict[str, Any]:
"""Run the incubating evaluator against the queen's conversation.
Args:
llm: An LLM provider exposing ``acomplete(messages, system, ...)``.
Pass the queen's own ``ctx.llm`` so the eval uses the same
model the user is talking to.
messages: The queen's conversation messages, oldest first. The
evaluator slices its own tail; pass the full list.
colony_name: Validated colony slug.
intended_purpose: Queen's one-paragraph brief.
Returns:
``{"ready": bool, "reasons": [str], "missing_prerequisites": [str]}``.
Fail-closed on any error.
"""
excerpt = format_conversation_excerpt(messages)
user_msg = _build_user_message(excerpt, colony_name, intended_purpose)
try:
response = await llm.acomplete(
messages=[{"role": "user", "content": user_msg}],
system=_INCUBATING_EVALUATOR_SYSTEM_PROMPT,
max_tokens=1024,
response_format={"type": "json_object"},
)
except Exception as exc: # noqa: BLE001 - fail-closed on any LLM failure
logger.warning("incubating_evaluator: LLM call failed (%s)", exc)
return {
"ready": False,
"reasons": ["evaluation_failed"],
"missing_prerequisites": [
"evaluator LLM call failed; retry once the queen can reach the model again"
],
}
raw = (getattr(response, "content", "") or "").strip()
parsed = _parse_verdict(raw)
if parsed is None:
logger.warning(
"incubating_evaluator: could not parse JSON verdict (raw=%.200s)",
raw,
)
return {
"ready": False,
"reasons": ["evaluation_failed"],
"missing_prerequisites": [
"evaluator returned malformed JSON; retry"
],
}
return _normalize_verdict(parsed)
+233 -42
View File
@@ -47,9 +47,30 @@ _QUEEN_INDEPENDENT_TOOLS = [
"undo_changes",
# NOTE (2026-04-16): ``run_parallel_workers`` is not in the DM phase.
# Pure DM is for conversation with the user; fan out parallel work via
# ``create_colony`` (forks into a persistent colony with its own page
# and phase machine).
# ``start_incubating_colony`` (which gates the colony fork behind a
# readiness eval before exposing create_colony in INCUBATING phase).
"start_incubating_colony",
]
# Incubating phase: queen has been approved by the incubating_evaluator to
# fork into a colony. Tool surface is intentionally small — the queen's job
# in this phase is to nail the operational spec (concurrency, schedule,
# result tracking, credentials) and write a tight task + SKILL.md, not to
# keep doing work. Read-only file tools are kept so she can confirm details
# (e.g. inspect an existing skill) before committing.
_QUEEN_INCUBATING_TOOLS = [
"read_file",
"list_directory",
"search_files",
"run_command",
# Trigger management: phase-branched in queen_lifecycle_tools.py to
# buffer drafts onto phase_state.pending_triggers, then drained into
# the colony's triggers.json by create_colony post-fork.
"set_trigger",
"remove_trigger",
"list_triggers",
"create_colony",
"cancel_incubation",
]
# Working phase: colony workers are running. Queen monitors, replies
@@ -129,15 +150,70 @@ If the user opens with a greeting or chat, reply in plain prose in \
character first check recall memory for name and past topics and weave \
them in. If you need a structured choice or approval gate, always use \
ask_user or ask_user_multiple; otherwise ask in plain prose. \
\
When the user clearly wants persistent / recurring / headless work that \
needs to outlive THIS chat (e.g. "every morning", "monitor X and alert \
me", "set up a job that"), call ``start_incubating_colony`` with a \
proposed colony_name and a one-paragraph intended_purpose. A side \
evaluator reads the conversation and decides if the spec is settled. If \
it returns ``not_ready`` you keep talking with the user sort out \
whatever the evaluator said is missing, then retry. If it returns \
``incubating`` your phase flips and a new prompt takes over. Do not \
try to write SKILL.md, fork directories, or otherwise build the colony \
yourself in this phase.\
"""
_queen_role_incubating = """\
You are in INCUBATING mode. The incubating evaluator has approved you to \
fork colony ``{colony_name}`` and you are now drafting the spec. Your \
ONLY job in this phase: produce a self-contained ``task`` description \
and ``SKILL.md`` body that lets a fresh worker, who has zero memory of \
this chat, do the work unattended. Do not start doing the work yourself \
the coding toolkit is gone on purpose so you can focus.
Before you call ``create_colony``, sort out the operational details that \
conversation tends to skip. The "Approved → operational checklist" block \
in your tools doc lists the kinds of things to think about (concurrency, \
schedule, result-tracking, failure handling, credentials). Treat that \
list as prompts for YOUR judgement only ask the user about the items \
that actually matter for THIS colony and that the conversation hasn't \
already settled. Use ``ask_user`` / ``ask_user_multiple`` for the gaps; \
plain prose for everything else.
If you realise mid-incubation that the spec isn't ready (user changed \
their mind, you're missing more than a couple of details, the work \
turned out to be one-shot after all), call ``cancel_incubation`` \
no harm, you go back to INDEPENDENT and can retry later.
"""
_queen_role_working = """\
You are in WORKING mode. Your colony has workers executing right now. \
Your job: monitor progress, answer worker escalations through \
reply_to_worker, and fan out more tasks with run_parallel_workers if \
the user asks. Keep the user informed when they ask; do NOT poll the \
workers just to have something to say. If the user greets you \
mid-run, reply in prose and wait for their next message.
You are in WORKING mode. The colony's spec was settled during \
INCUBATING; workers are executing that spec now. Your role here is \
operational presence, not direction think on-call engineer for a \
running deployment, not architect of a new one.
What you DO in this phase:
- Be available for worker escalations (reply_to_worker on items in \
list_worker_questions).
- Surface progress when the user asks for it (get_worker_status), or \
when something concrete is worth flagging (a notable failure, a \
worker stuck on a question that needs them).
- Intervene when a worker is clearly off course (inject_message) or \
needs to stop (stop_worker).
- Make SPEC-COMPATIBLE adjustments when the user asks fan out MORE \
of the same work (run_parallel_workers), or tweak the schedule \
(set_trigger / remove_trigger / list_triggers). These are tweaks to \
the spec the user already approved, not redesigns.
What you DO NOT do in this phase:
- Redesign the colony. If the user asks for something fundamentally \
new (different scope, different skill, different problem), say so \
plainly: "this colony is for X — for that we'd need a fresh chat \
with me, where I can incubate a new colony." A new colony is born \
in INDEPENDENT via start_incubating_colony, and you cannot reach \
that from inside a colony.
- Drive the conversation. Do not poll workers just to have something \
to say. If the user greets you mid-run, reply in prose and wait.
"""
_queen_role_reviewing = """\
@@ -167,42 +243,140 @@ search_files, run_command, undo_changes
browser_tabs, browser_close, browser_evaluate, etc.).
- MUST Follow the browser-automation skill protocol before using browser tools.
## Persistent colony
## Hand off to a colony
- start_incubating_colony(colony_name, intended_purpose) Use this when \
the user wants persistent / recurring / headless work that needs to \
outlive THIS chat. It does NOT fork on its own; it spawns a one-shot \
evaluator that reads this conversation and decides whether the spec \
is settled enough to proceed. On approval your phase flips to \
INCUBATING and a new tool surface (including create_colony itself) \
unlocks. On rejection you stay here and keep the conversation going \
to fill the gaps the evaluator named.
- ``intended_purpose`` is a one-paragraph brief: what the colony will \
do, on what cadence, why it must outlive this chat. Don't write a \
SKILL.md here that comes in INCUBATING.
"""
_queen_tools_incubating = """
# Tools (INCUBATING mode)
You've been approved to fork. The full coding toolkit is gone on \
purpose your job in this phase is to nail the spec, not keep doing \
work. Available:
## Read-only inspection (coder-tools MCP)
- read_file, list_directory, search_files, run_command for confirming \
details before you commit (e.g. peek at an existing skill in \
~/.hive/skills/, sanity-check an API URL).
## Schedule the colony (drafted now, written on commit)
- set_trigger(trigger_id, trigger_type, trigger_config, task) Stage a \
schedule for the colony. Validated immediately (cron syntax / interval \
shape) but NOT activated yet buffered onto the incubation context \
and written to ``triggers.json`` by create_colony so it auto-starts on \
first colony load. Repeated calls with the same trigger_id replace \
the prior draft.
- list_triggers() Inspect the buffered drafts.
- remove_trigger(trigger_id) Drop a draft from the buffer.
- ``trigger_type`` is "timer" (with ``trigger_config={"cron": "..."}`` \
or ``{"interval_minutes": N}``) or "webhook" (with \
``trigger_config={"path": "/hooks/..."}``).
- ``task`` describes what the worker should do when the trigger fires; \
this is REQUIRED a trigger with no task can't fire usefully. It's \
separate from create_colony's ``task`` argument because the trigger \
task is one-shot per fire, while create_colony's ``task`` is the \
worker's overall purpose.
## Approved → operational checklist (use your judgement, ask only what's missing)
The conversation that got you here probably did NOT cover all of:
- Concurrency: how many tasks should run in parallel? Single-fire?
- Schedule: cron expression, interval (every N minutes), webhook, \
manual-only?
- Result tracking: what should the worker write into ``progress.db`` so \
the user can review later? Per-task status, summary, raw payload?
- Failure handling: retry, alert, mark-failed-and-continue?
- Credentials and MCP servers: what does the worker need that you \
haven't discussed (API keys, OAuth, browser profile)?
- Skills the worker needs beyond the one you'll write inline.
These are PROMPTS for your judgement, not a required checklist. Cover \
the items that actually matter for THIS colony, and only the ones the \
user hasn't already implied. Use ``ask_user`` / ``ask_user_multiple`` \
for gaps that need a real answer; skip the rest.
## Commit
- create_colony(colony_name, task, skill_name, skill_description, \
skill_body, skill_files?, tasks?) Fork this session into a \
persistent colony for headless / recurring / background work. The colony \
has its own chat surface and runs `run_parallel_workers` from there.
- **Atomic call pass the skill INLINE.** Do NOT write SKILL.md with \
`write_file` beforehand. Provide `skill_name`, `skill_description`, \
and `skill_body` as arguments and the tool will materialize \
`~/.hive/skills/{skill_name}/` for you, then fork. Use optional \
`skill_files` (array of `{path, content}`) for supporting scripts \
or references. Reusing an existing `skill_name` simply replaces that \
skill with your latest content.
- The `task` must be FULL and self-contained because the future worker \
run cannot rely on this live chat turn for missing context.
- The `skill_body` must be FULL and self-contained too capture the \
operational protocol (endpoints, auth, gotchas, pre-baked queries) so \
the worker doesn't have to rediscover what you already know.
- Nothing runs immediately after the call. The user launches the \
worker later from the new colony page.
skill_body, skill_files?, tasks?, concurrency_hint?) Fork this \
session into the colony. **Atomic call pass the skill INLINE.** Do \
NOT write SKILL.md with write_file beforehand; this tool materialises \
the folder for you and then forks. Reusing an existing skill_name \
within the colony replaces that skill with your latest content.
- The ``task`` must be FULL and self-contained the worker has zero \
memory of THIS chat at run time.
- The ``skill_body`` must be FULL and self-contained capture the \
operational protocol (endpoints, auth, gotchas, pre-baked queries) \
so the worker doesn't have to rediscover what you already know.
- ``concurrency_hint`` (optional integer 1) advisory cap on how \
many worker processes typically run in parallel for this colony \
(e.g. 1 for "send digest", 5 for a fan-out). Baked into worker.json \
for the future colony queen to consult; not enforced.
- Any triggers you staged with ``set_trigger`` during this incubation \
are written to ``triggers.json`` as part of the commit and \
auto-start on first colony load.
- After this returns, the chat is over: the session locks immediately \
and the user gets a "compact and start a new session with you" \
button. So make your call to create_colony the last thing you do \
one closing message to the user is fine, but expect the next user \
input to land in a fresh forked session, not this one.
## Bail
- cancel_incubation() Call when the spec isn't ready after all (user \
changed their mind, you discovered the work is actually one-shot, \
more than a couple of details still need to be worked out). Returns \
you to INDEPENDENT with the full toolkit; no fork happens.
"""
_queen_tools_working = """
# Tools (WORKING mode)
Workers are running in your colony. You have:
- Read-only: read_file, list_directory, search_files, run_command
- get_worker_status(focus?) Poll latest progress / issues
- inject_message(content) Send guidance to a running worker
- list_worker_questions() / reply_to_worker(request_id, reply) Answer escalations
- stop_worker() Stop a worker early
- run_parallel_workers(tasks, timeout?) Fan out MORE parallel tasks on \
top of what's already running (each task string must be fully self-contained)
- set_trigger / remove_trigger / list_triggers Timer management
The colony's spec was committed during INCUBATING. Your tools here are \
operational, not editorial.
When every worker has reported (success or failure), the phase auto-moves \
to REVIEWING. You do not need to call a transition tool yourself.
## Stay informed (only when asked, or when something matters)
- get_worker_status(focus?) Pull progress / issues for the user.
- list_worker_questions() Check the escalation inbox.
## Respond
- reply_to_worker(request_id, reply) Answer a worker escalation.
- inject_message(content) Course-correct a running worker (e.g. it's \
heading the wrong way and the user wants it redirected).
## Intervene
- stop_worker() Kill switch for a runaway or no-longer-needed worker.
## Spec-compatible adjustments
- run_parallel_workers(tasks, timeout?) Fan out MORE of the same \
work. Use when the user wants additional units of an already-defined \
job, NOT for new scope. Each task string must be fully self-contained.
- set_trigger / remove_trigger / list_triggers Tweak the schedule \
the user already approved during incubation. Adding a follow-up \
trigger for the same colony is fine; redesigning the colony's \
purpose is not.
## Read-only inspection
- read_file, list_directory, search_files, run_command
When every worker has reported (success or failure), the phase \
auto-moves to REVIEWING. You do not need to call a transition tool \
yourself.
## What does NOT belong here
A request like "actually let's also do X" with X being a new scope, \
new skill, or different problem is a NEW COLONY, not an extension of \
this one. Tell the user plainly: "this colony is for the work we \
already started for that we'd need a fresh chat with me, where I \
can incubate a new colony." You cannot create a new colony from \
inside a colony.
"""
_queen_tools_reviewing = """
@@ -291,9 +465,9 @@ queen_node = NodeSpec(
id="queen",
name="Queen",
description=(
"User's primary interactive interface. Operates in DM (independent) "
"or colony mode (working / reviewing) depending on whether workers "
"have been spawned."
"User's primary interactive interface. Operates in DM (independent), "
"colony-spec drafting (incubating), or colony mode (working / "
"reviewing) depending on whether workers have been spawned."
),
node_type="event_loop",
max_node_visits=0,
@@ -301,7 +475,14 @@ queen_node = NodeSpec(
output_keys=[], # Queen should never have this
nullable_output_keys=[], # Queen should never have this
skip_judge=True, # Queen is a conversational agent; suppress tool-use pressure feedback
tools=sorted(set(_QUEEN_INDEPENDENT_TOOLS + _QUEEN_WORKING_TOOLS + _QUEEN_REVIEWING_TOOLS)),
tools=sorted(
set(
_QUEEN_INDEPENDENT_TOOLS
+ _QUEEN_INCUBATING_TOOLS
+ _QUEEN_WORKING_TOOLS
+ _QUEEN_REVIEWING_TOOLS
)
),
system_prompt=(
_queen_character_core
+ _queen_role_independent
@@ -312,20 +493,30 @@ queen_node = NodeSpec(
),
)
ALL_QUEEN_TOOLS = sorted(set(_QUEEN_INDEPENDENT_TOOLS + _QUEEN_WORKING_TOOLS + _QUEEN_REVIEWING_TOOLS))
ALL_QUEEN_TOOLS = sorted(
set(
_QUEEN_INDEPENDENT_TOOLS
+ _QUEEN_INCUBATING_TOOLS
+ _QUEEN_WORKING_TOOLS
+ _QUEEN_REVIEWING_TOOLS
)
)
__all__ = [
"queen_node",
"ALL_QUEEN_TOOLS",
"_QUEEN_INDEPENDENT_TOOLS",
"_QUEEN_INCUBATING_TOOLS",
"_QUEEN_WORKING_TOOLS",
"_QUEEN_REVIEWING_TOOLS",
# Character + phase-specific prompt segments (used by queen_orchestrator for dynamic prompts)
"_queen_character_core",
"_queen_role_independent",
"_queen_role_incubating",
"_queen_role_working",
"_queen_role_reviewing",
"_queen_tools_independent",
"_queen_tools_incubating",
"_queen_tools_working",
"_queen_tools_reviewing",
"_queen_behavior_always",
+23 -1
View File
@@ -266,16 +266,19 @@ async def create_queen(
queen_loop_config as _base_loop_config,
)
from framework.agents.queen.nodes import (
_QUEEN_INCUBATING_TOOLS,
_QUEEN_INDEPENDENT_TOOLS,
_QUEEN_REVIEWING_TOOLS,
_QUEEN_WORKING_TOOLS,
_queen_behavior_always,
_queen_behavior_independent,
_queen_character_core,
_queen_role_incubating,
_queen_role_independent,
_queen_role_reviewing,
_queen_role_working,
_queen_style,
_queen_tools_incubating,
_queen_tools_independent,
_queen_tools_reviewing,
_queen_tools_working,
@@ -378,6 +381,7 @@ async def create_queen(
# ---- Partition tools by phase ------------------------------------
independent_names = set(_QUEEN_INDEPENDENT_TOOLS)
incubating_names = set(_QUEEN_INCUBATING_TOOLS)
working_names = set(_QUEEN_WORKING_TOOLS)
reviewing_names = set(_QUEEN_REVIEWING_TOOLS)
@@ -386,16 +390,24 @@ async def create_queen(
phase_state.working_tools = [t for t in queen_tools if t.name in working_names]
phase_state.reviewing_tools = [t for t in queen_tools if t.name in reviewing_names]
# Incubating tool surface is intentionally minimal (read-only inspection
# + create_colony + cancel_incubation) — no MCP tools spliced in, so the
# queen stays focused on drafting the spec.
phase_state.incubating_tools = [t for t in queen_tools if t.name in incubating_names]
# Independent phase gets core tools + all MCP tools not claimed by any
# other phase (coder-tools file I/O, gcu-tools browser, etc.).
all_phase_names = independent_names | working_names | reviewing_names
all_phase_names = independent_names | incubating_names | working_names | reviewing_names
mcp_tools = [t for t in queen_tools if t.name not in all_phase_names]
phase_state.independent_tools = [t for t in queen_tools if t.name in independent_names] + mcp_tools
logger.info(
"Queen: independent tools: %s",
sorted(t.name for t in phase_state.independent_tools),
)
logger.info(
"Queen: incubating tools: %s",
sorted(t.name for t in phase_state.incubating_tools),
)
# ---- Global + queen-scoped memory ----------------------------------
global_dir, queen_mem_dir = initialize_memory_scopes(session, phase_state)
@@ -428,6 +440,16 @@ async def create_queen(
),
_has_vision,
)
phase_state.prompt_incubating = finalize_queen_prompt(
(
_queen_character_core
+ _queen_role_incubating
+ _queen_style
+ _queen_tools_incubating
+ _queen_behavior_always
),
_has_vision,
)
phase_state.prompt_working = finalize_queen_prompt(
(_queen_character_core + _queen_role_working + _queen_style + _queen_tools_working + _queen_behavior_always),
_has_vision,
+56 -28
View File
@@ -682,35 +682,32 @@ async def handle_cancel_queen(request: web.Request) -> web.Response:
return web.json_response({"cancelled": True})
async def handle_mark_colony_spawned(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/mark-colony-spawned -- lock the queen DM.
def persist_colony_spawn_lock(session: Any, colony_name: str) -> None:
"""Persist the colony-spawned lock on a queen session.
Called by the frontend the first time the user clicks the
COLONY_CREATED system message. Persists ``colony_spawned: true`` and
``spawned_colony_name`` into the queen session's ``meta.json`` so the
lock survives server restart, and caches the same on the live Session
object so subsequent /chat calls in this process can be rejected
immediately without disk I/O.
Writes ``colony_spawned: true`` + ``spawned_colony_name`` + a timestamp
into the queen session's ``meta.json`` and mirrors the same fields onto
the live ``Session`` object so subsequent ``/chat`` calls in this
process are rejected immediately without disk I/O.
Body: ``{"colony_name": "..."}``
Shared by the HTTP route ``handle_mark_colony_spawned`` (frontend
click on the colony-link card) and the in-process ``create_colony``
tool path (when the queen forks while in ``incubating`` phase).
Raises ``OSError`` if the meta.json write fails. Callers should catch
and respond/log appropriately.
"""
from datetime import datetime as _dt
session, err = resolve_session(request)
if err:
return err
body = await request.json() if request.can_read_body else {}
colony_name = (body.get("colony_name") or "").strip()
if not colony_name:
return web.json_response({"error": "colony_name is required"}, status=400)
queen_dir = getattr(session, "queen_dir", None)
if queen_dir is None:
return web.json_response(
{"error": "queen session directory is not set on this session"},
status=503,
)
# Tool-side callers may invoke before the queen dir is available.
# Still mirror onto the session so the in-process /chat guard
# works; the meta.json write is just deferred until the next
# session start writes the file (rare path).
session.colony_spawned = True
session.spawned_colony_name = colony_name
return
meta_path = queen_dir / "meta.json"
meta: dict = {}
@@ -724,16 +721,39 @@ async def handle_mark_colony_spawned(request: web.Request) -> web.Response:
meta["spawned_colony_name"] = colony_name
meta["spawned_colony_at"] = _dt.now(UTC).isoformat()
try:
meta_path.parent.mkdir(parents=True, exist_ok=True)
meta_path.write_text(json.dumps(meta), encoding="utf-8")
except OSError as exc:
logger.exception("mark_colony_spawned: failed to persist meta.json")
return web.json_response({"error": f"failed to persist: {exc}"}, status=500)
meta_path.parent.mkdir(parents=True, exist_ok=True)
meta_path.write_text(json.dumps(meta), encoding="utf-8")
session.colony_spawned = True
session.spawned_colony_name = colony_name
async def handle_mark_colony_spawned(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/mark-colony-spawned -- lock the queen DM.
Called by the frontend the first time the user clicks the
COLONY_CREATED system message. Thin wrapper around
:func:`persist_colony_spawn_lock` the heavy lifting (meta.json
merge + Session cache) lives in the helper so the in-process
``create_colony`` path can reuse it without re-issuing an HTTP call.
Body: ``{"colony_name": "..."}``
"""
session, err = resolve_session(request)
if err:
return err
body = await request.json() if request.can_read_body else {}
colony_name = (body.get("colony_name") or "").strip()
if not colony_name:
return web.json_response({"error": "colony_name is required"}, status=400)
try:
persist_colony_spawn_lock(session, colony_name)
except OSError as exc:
logger.exception("mark_colony_spawned: failed to persist meta.json")
return web.json_response({"error": f"failed to persist: {exc}"}, status=500)
return web.json_response(
{
"session_id": session.id,
@@ -991,6 +1011,7 @@ async def fork_session_into_colony(
colony_name: str,
task: str,
tasks: list[dict] | None = None,
concurrency_hint: int | None = None,
) -> dict:
"""Fork a queen session into a colony directory.
@@ -1228,6 +1249,13 @@ async def fork_session_into_colony(
"spawned_from": session.id,
"spawned_at": datetime.now(UTC).isoformat(),
}
# Concurrency advisory baked in at incubation time. Not enforced — the
# progress.db queue is atomic regardless — but the colony queen reads
# this when planning fan-outs (run_parallel_workers, trigger-fired
# batches) so behavior matches what the user agreed to during
# incubation.
if isinstance(concurrency_hint, int) and concurrency_hint > 0:
worker_meta["concurrency_hint"] = concurrency_hint
worker_config_path.write_text(json.dumps(worker_meta, indent=2, ensure_ascii=False), encoding="utf-8")
# ── 3. Duplicate queen session into colony ───────────────────
+574 -5
View File
@@ -61,6 +61,33 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Open-the-floor message returned by ``start_incubating_colony`` on
# approval. The same kinds of prompts (concurrency, schedule, result
# tracking, failure handling, credentials) live ongoingly inside
# ``_queen_tools_incubating`` so the queen sees them every turn — this
# constant is the single-shot version that lands as the tool result.
# Phrasing intentionally invites the queen's judgement; do NOT turn this
# into a hard checklist.
_INCUBATING_APPROVAL_GUIDANCE = (
"Approved to incubate colony '{colony_name}' for: {intended_purpose}\n\n"
"Your phase has flipped to INCUBATING. Before you call create_colony, "
"the worker will need operational details that are easy to lose in a "
"planning conversation. Take a moment to figure out what's still "
"ambiguous for THIS colony — for example: how many tasks should run "
"in parallel, what schedule fits (cron, interval, manual-only), what "
"should the worker write into progress.db so the user can review "
"results later, how to handle partial failures, what credentials or "
"MCP servers the worker needs that you haven't discussed. You don't "
"need to cover every example — only the items that actually matter "
"for this colony, and only the ones the user hasn't already implied. "
"Use ask_user / ask_user_multiple to fill the real gaps. "
"If, while sorting these out, you decide the spec isn't ready, call "
"cancel_incubation and we go back to INDEPENDENT."
)
def _render_credentials_block(provider: Any) -> str:
"""Call a credentials_prompt_provider safely and return its output.
@@ -94,8 +121,10 @@ class WorkerSessionAdapter:
class QueenPhaseState:
"""Mutable state container for queen operating phase.
Three phases: independent, working, reviewing.
Four phases: independent, incubating, working, reviewing.
INDEPENDENT: queen acts as a standalone agent with MCP tools, no colony workers.
INCUBATING: queen has been approved by the incubating_evaluator to fork
a colony focused tool surface for drafting the spec.
WORKING: colony workers are running autonomously.
REVIEWING: workers have completed, queen reviews results.
@@ -103,8 +132,9 @@ class QueenPhaseState:
that trigger phase transitions.
"""
phase: str = "independent" # "independent", "working", or "reviewing"
phase: str = "independent" # "independent", "incubating", "working", or "reviewing"
independent_tools: list = field(default_factory=list) # list[Tool]
incubating_tools: list = field(default_factory=list) # list[Tool]
working_tools: list = field(default_factory=list) # list[Tool]
reviewing_tools: list = field(default_factory=list) # list[Tool]
inject_notification: Any = None # async (str) -> None
@@ -115,9 +145,27 @@ class QueenPhaseState:
# Phase-specific prompts (set by queen_orchestrator after construction)
prompt_independent: str = ""
prompt_incubating: str = ""
prompt_working: str = ""
prompt_reviewing: str = ""
# Last-set incubation context (colony_name + intended_purpose), populated
# by start_incubating_colony when the evaluator approves. Read by
# get_current_prompt() to interpolate the colony name into the
# incubating role prompt so the queen sees the same name across turns
# without having to remember it from the tool result.
incubating_colony_name: str | None = None
incubating_intended_purpose: str | None = None
# Trigger configs the queen drafted during incubation. set_trigger /
# remove_trigger / list_triggers buffer here when phase=="incubating"
# because the colony directory + worker runtime don't exist yet —
# there's nothing live to attach a timer/webhook to. ``create_colony``
# drains this list into ``{colony_dir}/triggers.json`` after the fork
# so the triggers auto-start the next time the colony is loaded.
# Cleared on switch_to_independent so failed/cancelled incubations
# don't leak into a future incubation attempt.
pending_triggers: list = field(default_factory=list) # list[dict]
# Default skill operational protocols — appended to every phase prompt
protocols_prompt: str = ""
# Community skills catalog (XML) — appended after protocols
@@ -172,6 +220,8 @@ class QueenPhaseState:
return list(self.working_tools)
if self.phase == "reviewing":
return list(self.reviewing_tools)
if self.phase == "incubating":
return list(self.incubating_tools)
# Default / "independent" — DM mode with full MCP tools.
return list(self.independent_tools)
@@ -181,6 +231,15 @@ class QueenPhaseState:
base = self.prompt_working
elif self.phase == "reviewing":
base = self.prompt_reviewing
elif self.phase == "incubating":
# Interpolate the active incubation context so the queen sees the
# same colony_name on every turn, not just the first tool result.
base = self.prompt_incubating
if self.incubating_colony_name:
base = base.replace(
"{colony_name}",
self.incubating_colony_name,
)
else:
base = self.prompt_independent
@@ -250,6 +309,14 @@ class QueenPhaseState:
if self.phase == "independent":
return
self.phase = "independent"
# Clear stale incubation context so a future incubation starts fresh.
# This includes the trigger buffer — a cancelled or successful
# incubation should not leak its drafted triggers into the next
# attempt (successful create_colony already drained them; cancel
# never committed them).
self.incubating_colony_name = None
self.incubating_intended_purpose = None
self.pending_triggers = []
tool_names = [t.name for t in self.independent_tools]
logger.info("Queen phase → independent (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
@@ -260,6 +327,48 @@ class QueenPhaseState:
"Available tools: " + ", ".join(tool_names) + "."
)
async def switch_to_incubating(
self,
*,
colony_name: str,
intended_purpose: str,
source: str = "tool",
) -> None:
"""Switch to incubating phase — queen drafts the colony spec.
Caller must already have validated colony_name. Stores the active
incubation context on self so get_current_prompt() can interpolate
it on every turn (the queen otherwise loses the colony_name after
the first tool result rolls past in the conversation history).
Args:
colony_name: Validated colony slug (lowercase alphanumeric + _).
intended_purpose: One-paragraph brief from the queen.
source: "tool", "frontend", or "auto".
"""
if self.phase == "incubating":
# Allow re-statement of context even when already incubating —
# the queen may have refined her intended_purpose mid-flight.
self.incubating_colony_name = colony_name
self.incubating_intended_purpose = intended_purpose
return
self.phase = "incubating"
self.incubating_colony_name = colony_name
self.incubating_intended_purpose = intended_purpose
tool_names = [t.name for t in self.incubating_tools]
logger.info(
"Queen phase → incubating (source=%s, colony=%s, tools: %s)",
source,
colony_name,
tool_names,
)
await self._emit_phase_event()
if self.inject_notification and source != "tool":
await self.inject_notification(
"[PHASE CHANGE] Switched to INCUBATING phase for colony "
f"'{colony_name}'. Available tools: " + ", ".join(tool_names) + "."
)
def build_worker_profile(runtime: Any, agent_path: Path | str | None = None) -> str:
"""Build a worker capability profile from the runtime's spec and goal."""
@@ -1334,6 +1443,7 @@ def register_queen_lifecycle_tools(
skill_body: str,
skill_files: list[dict] | None = None,
tasks: list[dict] | None = None,
concurrency_hint: int | None = None,
) -> str:
"""Create a colony and materialize its skill folder in one atomic call.
@@ -1427,6 +1537,7 @@ def register_queen_lifecycle_tools(
colony_name=cn,
task=(task or "").strip(),
tasks=tasks if isinstance(tasks, list) else None,
concurrency_hint=concurrency_hint if isinstance(concurrency_hint, int) and concurrency_hint > 0 else None,
)
except Exception as e:
logger.exception("create_colony: fork failed after installing skill")
@@ -1471,6 +1582,61 @@ def register_queen_lifecycle_tools(
exc_info=True,
)
# When the queen forked from INCUBATING phase, the chat is over by
# design: the colony spec is committed and there's nothing left to
# discuss in this DM. Auto-lock the session immediately (same
# mechanism the user-click path uses) and switch the queen back to
# INDEPENDENT so her closing message renders normally. The
# colony_spawned check on /chat will reject the user's NEXT message
# with the "compact and start a new session" UX.
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
# Drain buffered triggers into the colony's triggers.json so they
# auto-start when the colony is first loaded (session_manager
# reads triggers.json on colony load and starts each timer /
# webhook). Do this BEFORE switch_to_independent clears the
# buffer.
if phase_state.pending_triggers:
triggers_path = colony_dir / "triggers.json"
try:
triggers_path.write_text(
json.dumps(phase_state.pending_triggers, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
logger.info(
"create_colony: wrote %d buffered trigger(s) to %s",
len(phase_state.pending_triggers),
triggers_path,
)
except OSError:
logger.warning(
"create_colony: failed to write triggers.json from buffer",
exc_info=True,
)
try:
from framework.server.routes_execution import (
persist_colony_spawn_lock,
)
persist_colony_spawn_lock(session, fork_result.get("colony_name", cn))
except OSError:
logger.warning(
"create_colony: failed to persist colony-spawned lock",
exc_info=True,
)
except Exception:
logger.warning(
"create_colony: persist_colony_spawn_lock raised",
exc_info=True,
)
try:
await phase_state.switch_to_independent(source="tool")
except Exception:
logger.warning(
"create_colony: failed to switch phase back to independent",
exc_info=True,
)
return json.dumps(
{
"status": "created",
@@ -1674,6 +1840,20 @@ def register_queen_lifecycle_tools(
"required": ["goal"],
},
},
"concurrency_hint": {
"type": "integer",
"description": (
"Optional advisory cap: how many worker processes "
"should typically run in parallel for this colony "
"(e.g. 1 for a single-fire 'send digest' job, 5 "
"for a fan-out that processes records). Baked "
"into worker.json as ``concurrency_hint`` for the "
"future colony queen to consult when planning "
"fan-outs. Not enforced — the queue itself is "
"atomic, this is just guidance. Omit if unsure."
),
"minimum": 1,
},
},
"required": [
"colony_name",
@@ -1691,6 +1871,258 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- start_incubating_colony -------------------------------------------------
async def start_incubating_colony(
*,
colony_name: str,
intended_purpose: str,
) -> str:
"""Gate the queen behind a one-shot readiness evaluator.
Reads the queen's recent conversation off disk and asks
:func:`incubating_evaluator.evaluate` whether the spec is
settled enough to fork. On approval, flips the queen's phase
to ``incubating`` so a focused tool surface (``create_colony``,
``cancel_incubation``, read-only file tools) takes over. On
rejection, returns the verdict for the queen to self-correct
on her next turn the rejection is queen-only by design (no
SSE event, no user-facing message).
"""
if session is None:
return json.dumps({"error": "No session bound to this tool registry."})
cn = (colony_name or "").strip()
if not _COLONY_NAME_RE.match(cn):
return json.dumps(
{
"error": (
"colony_name must be lowercase alphanumeric with "
"underscores (e.g. 'morning_hn_digest')."
)
}
)
purpose = (intended_purpose or "").strip()
if not purpose:
return json.dumps(
{
"error": (
"intended_purpose is required — describe in one "
"paragraph what the colony will do, on what "
"cadence, and why it must outlive this chat."
)
}
)
phase_state = getattr(session, "phase_state", None)
if phase_state is None:
return json.dumps({"error": "phase_state is not initialised on this session."})
# Block re-entry from working/reviewing — those phases mean a colony
# is already running, the queen should NOT be drafting another spec
# on top. Independent → incubating is the only legal entry path.
if phase_state.phase not in ("independent", "incubating"):
return json.dumps(
{
"error": (
f"start_incubating_colony is not available in phase "
f"'{phase_state.phase}' — finish or stop the current "
"colony's workers first."
)
}
)
# Read the queen's conversation parts straight from disk. Same
# pattern as handle_compact_and_fork — avoids needing access to
# the live NodeConversation, which is local to the agent loop.
from framework.agent_loop.conversation import Message
from framework.agents.queen import incubating_evaluator
from framework.storage.conversation_store import FileConversationStore
queen_dir = getattr(session, "queen_dir", None)
messages: list = []
if queen_dir is not None and (queen_dir / "conversations").exists():
try:
store = FileConversationStore(queen_dir / "conversations")
raw_parts = await store.read_parts()
for part in raw_parts:
try:
messages.append(Message.from_storage_dict(part))
except (KeyError, TypeError):
# Skip malformed parts; the evaluator can still work
# off whatever messages it gets.
continue
except Exception:
logger.warning(
"start_incubating_colony: failed to read queen conversation",
exc_info=True,
)
llm = getattr(session, "llm", None)
if llm is None:
return json.dumps(
{
"error": (
"session has no LLM — cannot run readiness "
"evaluator. Retry once the session has fully "
"initialised."
)
}
)
verdict = await incubating_evaluator.evaluate(
llm=llm,
messages=messages,
colony_name=cn,
intended_purpose=purpose,
)
if not verdict.get("ready"):
# Queen-only silent rejection — no SSE, no user message.
# The queen reads the reasons in her tool result and decides
# what to do next (ask the user, refine scope, drop the idea).
return json.dumps(
{
"status": "not_ready",
"colony_name": cn,
"reasons": verdict.get("reasons", []),
"missing_prerequisites": verdict.get(
"missing_prerequisites", []
),
}
)
# Approved — flip phase. switch_to_incubating publishes
# QUEEN_PHASE_CHANGED so the frontend badge updates and stores
# the colony_name + purpose for the role prompt to interpolate.
await phase_state.switch_to_incubating(
colony_name=cn,
intended_purpose=purpose,
source="tool",
)
return json.dumps(
{
"status": "incubating",
"colony_name": cn,
"intended_purpose": purpose,
"guidance": _INCUBATING_APPROVAL_GUIDANCE.format(
colony_name=cn,
intended_purpose=purpose,
),
}
)
_start_incubating_colony_tool = Tool(
name="start_incubating_colony",
description=(
"Ask to fork this session into a persistent colony for "
"HEADLESS / RECURRING / BACKGROUND work that needs to "
"outlive this chat. This tool does NOT fork on its own — "
"it spawns a one-shot evaluator that reads the recent "
"conversation and decides whether the spec is settled "
"enough to proceed.\n\n"
"On APPROVAL, your phase flips to INCUBATING and a focused "
"tool surface unlocks (create_colony, cancel_incubation, "
"read-only file tools). The full coding toolkit goes away "
"on purpose so you can concentrate on writing a tight task "
"+ SKILL.md.\n\n"
"On REJECTION, you stay in INDEPENDENT and the verdict's "
"``missing_prerequisites`` lists what's still ambiguous in "
"queen-actionable form. Resolve those with the user (ask "
"in plain prose or via ask_user) and call this tool again "
"when the spec is settled. The rejection is queen-only — "
"the user does NOT see it, so frame your follow-up "
"naturally without referencing 'the evaluator'.\n\n"
"DO NOT call this for one-shot work that the user wants "
"results for right now in this chat — do that work yourself "
"with your independent toolkit instead."
),
parameters={
"type": "object",
"properties": {
"colony_name": {
"type": "string",
"description": (
"Lowercase alphanumeric+underscore name for the "
"proposed colony (e.g. 'morning_hn_digest', "
"'inbox_monitor')."
),
},
"intended_purpose": {
"type": "string",
"description": (
"One-paragraph brief: what the colony will do, "
"on what cadence, why it must outlive this "
"chat. Do NOT write the SKILL.md here — that "
"happens in INCUBATING phase after approval."
),
},
},
"required": ["colony_name", "intended_purpose"],
},
)
registry.register(
"start_incubating_colony",
_start_incubating_colony_tool,
lambda inputs: start_incubating_colony(**inputs),
)
tools_registered += 1
# --- cancel_incubation -------------------------------------------------------
async def cancel_incubation() -> str:
"""Bail out of incubating mode and return to independent.
Use when the spec turns out to not be ready after all (user
changed their mind, the work is one-shot, more than a couple of
details still need to be worked out). Harmless no-op if not
currently in incubating.
"""
if session is None:
return json.dumps({"error": "No session bound to this tool registry."})
phase_state = getattr(session, "phase_state", None)
if phase_state is None:
return json.dumps({"error": "phase_state is not initialised on this session."})
if phase_state.phase != "incubating":
return json.dumps(
{
"status": "noop",
"reason": f"phase is '{phase_state.phase}', not 'incubating'",
}
)
previous_colony = phase_state.incubating_colony_name
await phase_state.switch_to_independent(source="tool")
return json.dumps(
{
"status": "cancelled",
"previous_colony_name": previous_colony,
}
)
_cancel_incubation_tool = Tool(
name="cancel_incubation",
description=(
"Bail out of INCUBATING phase back to INDEPENDENT. Use "
"when the spec turns out to not be ready after all — the "
"user changed their mind, the work is actually one-shot, "
"or more than a couple of operational details still need "
"to be sorted out. No fork happens; the full coding "
"toolkit comes back. Harmless no-op outside INCUBATING."
),
parameters={"type": "object", "properties": {}, "required": []},
)
registry.register(
"cancel_incubation",
_cancel_incubation_tool,
lambda inputs: cancel_incubation(**inputs),
)
tools_registered += 1
# --- enqueue_task ------------------------------------------------------------
async def enqueue_task_tool(
@@ -3885,7 +4317,103 @@ def register_queen_lifecycle_tools(
trigger_config: dict | None = None,
task: str | None = None,
) -> str:
"""Activate a trigger so it fires periodically into the queen."""
"""Activate a trigger so it fires periodically into the queen.
Phase-branched: in INCUBATING the trigger spec is buffered onto
``phase_state.pending_triggers`` because the colony directory +
worker runtime don't exist yet. ``create_colony`` drains the
buffer into ``{colony_dir}/triggers.json`` post-fork so the
triggers auto-start on first colony load. In WORKING / REVIEWING
the live behaviour (start timer/webhook + persist) runs as before.
"""
# ── Incubating branch: buffer instead of activate ────────────
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
t_type = trigger_type
t_config = trigger_config or {}
if not t_type:
return json.dumps(
{
"error": (
"trigger_type is required during incubation — "
"no existing trigger to inherit from yet."
)
}
)
if not task:
return json.dumps(
{
"error": (
f"Trigger '{trigger_id}' needs a task — "
"describe what the worker should do when this "
"trigger fires."
)
}
)
# Mirror the live-path validation so a bad cron / interval is
# caught at draft time, not on first colony load.
if t_type == "timer":
cron_expr = t_config.get("cron")
interval = t_config.get("interval_minutes")
if cron_expr:
try:
from croniter import croniter
if not croniter.is_valid(cron_expr):
return json.dumps({"error": f"Invalid cron expression: {cron_expr}"})
except ImportError:
return json.dumps(
{"error": "croniter package not installed — cannot validate cron expression."}
)
elif interval:
if not isinstance(interval, (int, float)) or interval <= 0:
return json.dumps({"error": f"interval_minutes must be > 0, got {interval}"})
else:
return json.dumps(
{"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."}
)
elif t_type == "webhook":
path = (t_config.get("path") or "").strip()
if not path or not path.startswith("/"):
return json.dumps(
{
"error": (
"Webhook trigger requires 'path' starting "
"with '/' in trigger_config (e.g. '/hooks/github')."
)
}
)
else:
return json.dumps({"error": f"Unsupported trigger type: {t_type}"})
# Replace any prior buffered entry with the same id so the
# queen can iteratively refine without duplicates.
phase_state.pending_triggers = [
t for t in phase_state.pending_triggers if t.get("id") != trigger_id
]
phase_state.pending_triggers.append(
{
"id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
"task": task,
"name": trigger_id,
}
)
return json.dumps(
{
"status": "buffered",
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
"note": (
"Buffered for incubation. Will be written to "
"triggers.json when create_colony commits the spec, "
"and auto-start on first colony load."
),
}
)
if trigger_id in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is already active."})
@@ -4094,7 +4622,29 @@ def register_queen_lifecycle_tools(
# --- remove_trigger --------------------------------------------------------
async def remove_trigger(trigger_id: str) -> str:
"""Deactivate an active trigger."""
"""Deactivate an active trigger.
Phase-branched: in INCUBATING, removes the buffered draft instead
of touching live runtime state.
"""
# ── Incubating branch: drop from buffer ──────────────────────
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
before = len(phase_state.pending_triggers)
phase_state.pending_triggers = [
t for t in phase_state.pending_triggers if t.get("id") != trigger_id
]
if len(phase_state.pending_triggers) == before:
return json.dumps(
{
"error": (
f"Trigger '{trigger_id}' was not in the "
"incubation buffer."
)
}
)
return json.dumps({"status": "removed_from_buffer", "trigger_id": trigger_id})
if trigger_id not in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is not active."})
@@ -4160,7 +4710,26 @@ def register_queen_lifecycle_tools(
# --- list_triggers ---------------------------------------------------------
async def list_triggers() -> str:
"""List all available triggers and their status."""
"""List all available triggers and their status.
Phase-branched: in INCUBATING, returns the buffered drafts that
will be written to triggers.json when create_colony commits.
"""
# ── Incubating branch: return buffered drafts ────────────────
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
return json.dumps(
{
"triggers": list(phase_state.pending_triggers),
"buffered": True,
"note": (
"These are draft triggers staged for the colony "
"being incubated. They are written to triggers.json "
"when create_colony commits the spec."
),
}
)
available = getattr(session, "available_triggers", {})
triggers = []
for tdef in available.values():
+4 -2
View File
@@ -12,8 +12,10 @@ export interface LiveSession {
loaded_at: number;
uptime_seconds: number;
intro_message?: string;
/** Queen operating phase — "independent" (DM), "working" (workers running), or "reviewing" (workers done) */
queen_phase?: "independent" | "working" | "reviewing";
/** Queen operating phase — "independent" (DM), "incubating" (drafting a
* colony spec after the readiness eval approved), "working" (workers
* running), or "reviewing" (workers done) */
queen_phase?: "independent" | "incubating" | "working" | "reviewing";
/** Whether the queen's LLM supports image content in messages */
queen_supports_images?: boolean;
/** Selected queen identity ID (e.g. "queen_technology") */
+28 -20
View File
@@ -58,7 +58,7 @@ export interface ChatMessage {
/** Epoch ms when this message was first created — used for ordering queen/worker interleaving */
createdAt?: number;
/** Queen phase active when this message was created */
phase?: "independent" | "working" | "reviewing";
phase?: "independent" | "incubating" | "working" | "reviewing";
/** Images attached to a user message */
images?: ImageContent[];
/** Backend node_id that produced this message — used for subagent grouping */
@@ -105,7 +105,7 @@ interface ChatPanelProps {
/** Called when user dismisses the pending question without answering */
onQuestionDismiss?: () => void;
/** Queen operating phase — shown as a tag on queen messages */
queenPhase?: "independent" | "working" | "reviewing";
queenPhase?: "independent" | "incubating" | "working" | "reviewing";
/** When false, queen messages omit the phase badge */
showQueenPhaseBadge?: boolean;
/** Context window usage for queen and workers */
@@ -347,7 +347,7 @@ function InlineAskUserBubble({
thread: string,
images?: ImageContent[],
) => void;
queenPhase?: "independent" | "working" | "reviewing";
queenPhase?: "independent" | "incubating" | "working" | "reviewing";
showQueenPhaseBadge?: boolean;
queenProfileId?: string | null;
}) {
@@ -443,23 +443,31 @@ function InlineAskUserBubble({
>
{msg.agent}
</span>
{(!isQueen || showQueenPhaseBadge) && (
<span
className={`text-[10px] font-medium px-1.5 py-0.5 rounded-md ${
isQueen
? "bg-primary/15 text-primary"
: "bg-muted text-muted-foreground"
}`}
>
{isQueen
? (msg.phase ?? queenPhase) === "working"
? "working"
: (msg.phase ?? queenPhase) === "reviewing"
? "reviewing"
{(!isQueen || showQueenPhaseBadge) && (() => {
const effectivePhase = msg.phase ?? queenPhase;
const isIncubating = isQueen && effectivePhase === "incubating";
const badgeClass = isQueen
? isIncubating
// Honey-amber tint distinguishes incubating from the
// primary-tinted independent/working/reviewing badges.
? "bg-amber-500/15 text-amber-500"
: "bg-primary/15 text-primary"
: "bg-muted text-muted-foreground";
const label = isQueen
? effectivePhase === "working"
? "working"
: effectivePhase === "reviewing"
? "reviewing"
: effectivePhase === "incubating"
? "incubating"
: "independent"
: "Worker"}
</span>
)}
: "Worker";
return (
<span className={`text-[10px] font-medium px-1.5 py-0.5 rounded-md ${badgeClass}`}>
{label}
</span>
);
})()}
</div>
{payload.kind === "single" ? (
<QuestionWidget
@@ -501,7 +509,7 @@ const MessageBubble = memo(
onColonyLinkClick,
}: {
msg: ChatMessage;
queenPhase?: "independent" | "working" | "reviewing";
queenPhase?: "independent" | "incubating" | "working" | "reviewing";
showQueenPhaseBadge?: boolean;
queenProfileId?: string | null;
queenAvatarUrl?: string | null;
@@ -1,4 +1,11 @@
export type ColonyRestorePhase = "independent" | "working" | "reviewing";
// "incubating" is queen-DM-only; the colony page never enters it (the queen
// auto-switches back to independent before the lock fires). Keep it in the
// union so the type lines up with LiveSession.queen_phase.
export type ColonyRestorePhase =
| "independent"
| "incubating"
| "working"
| "reviewing";
export function shouldUsePrefetchedColonyRestore(
prefetchedSessionId: string | undefined,
+3 -3
View File
@@ -47,7 +47,7 @@ function truncate(s: string, max: number): string {
type SessionRestoreResult = {
messages: ChatMessage[];
restoredPhase: "independent" | "working" | "reviewing" | null;
restoredPhase: "independent" | "incubating" | "working" | "reviewing" | null;
truncated: boolean;
droppedCount: number;
};
@@ -139,7 +139,7 @@ interface AgentState {
displayName: string | null;
awaitingInput: boolean;
workerInputMessageId: string | null;
queenPhase: "independent" | "working" | "reviewing";
queenPhase: "independent" | "incubating" | "working" | "reviewing";
agentPath: string | null;
currentRunId: string | null;
nodeLogs: Record<string, string[]>;
@@ -456,7 +456,7 @@ export default function ColonyChat() {
}
}
let restoredPhase: "independent" | "working" | "reviewing" | null = null;
let restoredPhase: "independent" | "incubating" | "working" | "reviewing" | null = null;
if (!liveSession) {
if (coldRestoreId) {
+32 -15
View File
@@ -78,7 +78,7 @@ export default function QueenDM() {
>({});
const queenIterTextRef = useRef<Record<string, Record<number, string>>>({});
const [queenPhase, setQueenPhase] = useState<
"independent" | "working" | "reviewing"
"independent" | "incubating" | "working" | "reviewing"
>("independent");
const resetViewState = useCallback(() => {
@@ -317,10 +317,14 @@ export default function QueenDM() {
};
}, [queenId, sessionId]);
// Hydrate the colony-spawned lock from the session detail whenever the
// session ID changes. The /sessions/{id} response carries colony_spawned
// (live) and the cold-info path returns the same field after a server
// restart, so the same fetch covers both states.
// Hydrate the colony-spawned lock + queen phase from the session detail
// whenever the session ID changes. /sessions/{id} carries both flags
// (and the cold-info path returns colony_spawned after a server restart),
// so this single fetch covers live, page-reload, and post-restart states.
// Without seeding queen_phase here the badge starts at the useState
// default ("independent") and only updates when a fresh
// QUEEN_PHASE_CHANGED SSE event fires — a reload mid-incubation would
// briefly mis-render.
useEffect(() => {
if (!sessionId) return;
let cancelled = false;
@@ -328,17 +332,25 @@ export default function QueenDM() {
.get(sessionId)
.then((data) => {
if (cancelled) return;
const locked = Boolean(
(data as { colony_spawned?: boolean }).colony_spawned,
);
const name =
(data as { spawned_colony_name?: string | null })
.spawned_colony_name ?? null;
setColonySpawned(locked);
setSpawnedColonyName(name);
const detail = data as {
colony_spawned?: boolean;
spawned_colony_name?: string | null;
queen_phase?: "independent" | "incubating" | "working" | "reviewing";
};
setColonySpawned(Boolean(detail.colony_spawned));
setSpawnedColonyName(detail.spawned_colony_name ?? null);
if (
detail.queen_phase === "independent" ||
detail.queen_phase === "incubating" ||
detail.queen_phase === "working" ||
detail.queen_phase === "reviewing"
) {
setQueenPhase(detail.queen_phase);
}
})
.catch(() => {
// Non-fatal — lock simply won't activate until the user navigates back.
// Non-fatal — lock + phase simply won't activate until a fresh
// SSE event arrives.
});
return () => {
cancelled = true;
@@ -620,6 +632,7 @@ export default function QueenDM() {
const rawPhase = event.data?.phase as string;
if (
rawPhase === "independent" ||
rawPhase === "incubating" ||
rawPhase === "working" ||
rawPhase === "reviewing"
) {
@@ -912,7 +925,11 @@ export default function QueenDM() {
isBusy={isTyping}
disabled={loading || !queenReady}
queenPhase={queenPhase}
showQueenPhaseBadge={false}
// The DM is normally in `independent` phase, so the per-message
// badge would just be noise. Surface it once the phase moves
// (e.g. INCUBATING after start_incubating_colony approves) so
// the user immediately sees the queen is in a different mode.
showQueenPhaseBadge={queenPhase !== "independent"}
pendingQuestion={awaitingInput ? pendingQuestion : null}
pendingOptions={awaitingInput ? pendingOptions : null}
pendingQuestions={awaitingInput ? pendingQuestions : null}