Merge branch 'feature/colony-orchestrate' into feature/new-colony

This commit is contained in:
Timothy
2026-04-10 15:52:16 -07:00
27 changed files with 3700 additions and 197 deletions
+8 -1
View File
@@ -6,7 +6,14 @@
"Read(//^ @pytest.mark.asyncio/{getline n; print NR\": \"n} /^ def test_/**)",
"Bash(python3)",
"Bash(grep -nE 'Tool\\\\\\(\\\\s*$|name=\"[a-z_]+\",' core/framework/tools/queen_lifecycle_tools.py)",
"Bash(awk -F'\"' '{print $2}')"
"Bash(awk -F'\"' '{print $2}')",
"Bash(grep -n \"create_colony\\\\|colony-spawn\\\\|colony_spawn\" /home/timothy/aden/hive/core/framework/agents/queen/nodes/__init__.py /home/timothy/aden/hive/core/framework/tools/*.py)",
"Bash(git stash:*)",
"Bash(python3 -c \"import sys,json; d=json.loads\\(sys.stdin.read\\(\\)\\); print\\('keys:', list\\(d.keys\\(\\)\\)[:10]\\)\")",
"Bash(python3 -c ':*)"
],
"additionalDirectories": [
"/home/timothy/.hive/skills/writing-hive-skills"
]
},
"hooks": {
+105 -5
View File
@@ -69,6 +69,8 @@ from framework.agent_loop.internals.synthetic_tools import (
build_ask_user_multiple_tool,
build_ask_user_tool,
build_escalate_tool,
build_report_to_parent_tool,
handle_report_to_parent,
)
from framework.agent_loop.internals.tool_result_handler import (
build_json_preview,
@@ -84,6 +86,7 @@ from framework.agent_loop.internals.types import (
TriggerEvent,
)
from framework.agent_loop.types import AgentContext, AgentProtocol, AgentResult
from framework.host.event_bus import EventBus
from framework.llm.capabilities import supports_image_tool_results
from framework.llm.provider import Tool, ToolResult, ToolUse
from framework.llm.stream_events import (
@@ -92,7 +95,6 @@ from framework.llm.stream_events import (
TextDeltaEvent,
ToolCallEvent,
)
from framework.host.event_bus import EventBus
from framework.tracker.llm_debug_logger import log_llm_turn
logger = logging.getLogger(__name__)
@@ -362,7 +364,13 @@ class AgentLoop(AgentProtocol):
self._action_plan_emitted: set[str] = set()
# Monotonic counter for spillover file naming (web_search_1.txt, etc.)
self._spill_counter: int = 0
# Subagent mark_complete: when True, _evaluate returns ACCEPT immediately
# Set to True by the report_to_parent synthetic tool handler so the
# next loop iteration exits cleanly (parallel worker termination).
self._report_terminated: bool = False
# Back-reference to the Worker that owns this AgentLoop, if any.
# Set by the Worker's __init__ so the report_to_parent handler can
# record the explicit report payload on the owning Worker instance.
self._owner_worker: Any = None
def validate_input(self, ctx: AgentContext) -> list[str]:
"""Validate hard requirements only.
@@ -544,11 +552,15 @@ class AgentLoop(AgentProtocol):
tools = list(ctx.available_tools)
if ctx.supports_direct_user_io:
tools.append(self._build_ask_user_tool())
if stream_id == "queen":
if stream_id == "queen" or stream_id == "overseer":
tools.append(self._build_ask_user_multiple_tool())
# Workers can escalate blockers to the queen.
if stream_id not in ("queen", "judge"):
# Workers (parallel ephemeral agents) get escalate + report_to_parent.
# The overseer is client-facing like the queen and has neither.
if stream_id not in ("queen", "judge", "overseer"):
tools.append(self._build_escalate_tool())
# Only parallel workers (stream_id="worker:{uuid}") get report_to_parent.
if isinstance(stream_id, str) and stream_id.startswith("worker:"):
tools.append(build_report_to_parent_tool())
logger.info(
"[%s] Tools available (%d): %s | direct_user_io=%s | judge=%s",
@@ -586,6 +598,29 @@ class AgentLoop(AgentProtocol):
iter_start = time.time()
logger.debug("[AgentLoop.execute] iteration=%d starting", iteration)
# 6a-pre. Early exit for workers that called report_to_parent on
# the previous turn. The report_to_parent handler sets this
# flag; the loop finishes the current turn (so the LLM sees
# the acknowledgement tool result) and exits at the top of the
# next iteration. Parallel workers terminate here.
if self._report_terminated:
latency_ms = int((time.time() - start_time) * 1000)
logger.info(
"[%s] iter=%d: worker terminated via report_to_parent",
node_id,
iteration,
)
await self._publish_loop_completed(
stream_id, node_id, iteration, execution_id
)
return AgentResult(
success=True,
output=accumulator.to_dict(),
tokens_used=total_input_tokens + total_output_tokens,
latency_ms=latency_ms,
conversation=None,
)
# 6a. Check pause (no current-iteration data yet — only log_node_complete needed)
if await self._check_pause(ctx, conversation, iteration):
latency_ms = int((time.time() - start_time) * 1000)
@@ -2460,6 +2495,27 @@ class AgentLoop(AgentProtocol):
# --- Framework-level ask_user handling ---
ask_user_prompt = tc.tool_input.get("question", "")
raw_options = tc.tool_input.get("options", None)
# Self-heal: some model families (notably the queen
# profile prompt poisoning the output style) cram
# the options inside the question string as a
# pseudo-XML blob like:
#
# "What do you want to do?</question>\n_OPTIONS:
# [\"De-risk\", \"Add\", \"Short\"]"
#
# When that happens the question text leaks
# </question> and _OPTIONS: into the chat UI and
# the buttons never appear. Detect + repair.
from framework.agent_loop.internals.synthetic_tools import (
sanitize_ask_user_inputs,
)
ask_user_prompt, recovered_options = sanitize_ask_user_inputs(
ask_user_prompt, raw_options
)
if recovered_options is not None and raw_options is None:
raw_options = recovered_options
# Defensive: ensure options is a list of strings.
# Smaller models sometimes send a string instead of
# an array — try to recover gracefully.
@@ -2620,6 +2676,50 @@ class AgentLoop(AgentProtocol):
)
results_by_id[tc.tool_use_id] = result
elif tc.tool_name == "report_to_parent":
# --- Framework-level report_to_parent handling ---
# Parallel workers call this to emit a structured
# SUBAGENT_REPORT and terminate cleanly. The worker
# owner (Worker instance) records the explicit report
# via ``record_explicit_report`` so Worker.run()'s
# terminal event emission picks it up.
if not (
isinstance(stream_id, str)
and stream_id.startswith("worker:")
):
result = ToolResult(
tool_use_id=tc.tool_use_id,
content=(
"ERROR: report_to_parent is only available to "
"parallel workers (stream_id='worker:*'). "
"The overseer talks to the user directly."
),
is_error=True,
)
results_by_id[tc.tool_use_id] = result
continue
report_tc_input = dict(tc.tool_input)
report_tc_input["tool_use_id"] = tc.tool_use_id
result = handle_report_to_parent(report_tc_input)
results_by_id[tc.tool_use_id] = result
# Record on the owning Worker so its terminal event
# emission picks up the explicit report.
owner_worker = getattr(self, "_owner_worker", None)
if owner_worker is not None:
normalised = report_tc_input.get("_normalised", {})
owner_worker.record_explicit_report(
status=normalised.get("status", "success"),
summary=normalised.get("summary", ""),
data=normalised.get("data", {}),
)
# Terminate the loop cleanly after this turn. Set the
# same completion flag path that set_output used so
# the next iteration exits with success.
self._report_terminated = True
else:
# --- Real tool: check for truncated args, else queue ---
if "_raw" in tc.tool_input:
@@ -139,24 +139,6 @@ async def judge_turn(
),
)
# Queen with no output keys → continuous interaction node (chat mode).
#
# Claude-Code-style: trust the model's choice between text and tools.
# A chat-mode node has nothing structural for the judge to evaluate —
# text output IS how the queen communicates with the user, and a
# turn without real tools is a legitimate conversational reply.
# The old behavior here was to RETRY with "no prose, only tool calls",
# which trained the queen into an ask_user loop that never let it
# just chat. Drive "act, don't describe" via the system prompt,
# not via a post-hoc retry loop.
#
# Note: real tool calls were already handled at line 109 (early
# RETRY to continue the loop), so reaching here means the turn
# was either pure text, an ask_user call, or genuinely empty —
# all of which should resolve on the next user message.
if not output_keys and ctx.supports_direct_user_io:
return JudgeVerdict(action="ACCEPT", feedback="")
# Level 2b: conversation-aware quality check (if success_criteria set)
if ctx.agent_spec.success_criteria and ctx.llm:
from framework.orchestrator.conversation_judge import evaluate_phase_completion
@@ -15,6 +15,82 @@ from typing import Any
from framework.llm.provider import Tool, ToolResult
def sanitize_ask_user_inputs(
raw_question: Any,
raw_options: Any,
) -> tuple[str, list[str] | None]:
"""Self-heal a malformed ``ask_user`` tool call.
Some model families (notably when the system prompt teaches them
XML-ish scratchpad tags like ``<relationship>...</relationship>``)
carry that style into tool arguments and produce calls like::
ask_user({
"question": "What now?</question>\\n_OPTIONS: [\\"A\\", \\"B\\"]"
})
Symptoms:
- The chat UI renders ``</question>`` and ``_OPTIONS: [...]`` as
literal text in the question bubble.
- No buttons appear because the real ``options`` parameter is
empty.
This function:
- Strips leading/trailing whitespace.
- Removes a trailing ``</question>`` (with optional preceding
whitespace) from the question text.
- Detects an inline ``_OPTIONS:``, ``OPTIONS:``, or ``options:``
line followed by a JSON array, parses it, and returns the
recovered list as the second element.
- Removes the parsed line from the returned question text.
Returns ``(cleaned_question, recovered_options_or_None)``. The
caller should treat the recovered list as a fallback only when
the model did not also supply a real ``options`` array.
"""
import json as _json
import re as _re
if raw_question is None:
return "", None
q = str(raw_question)
# Strip a stray </question> tag (case-insensitive, with optional
# preceding whitespace) anywhere in the string. This is the most
# common failure mode and never represents valid content.
q = _re.sub(r"\s*</\s*question\s*>\s*", "\n", q, flags=_re.IGNORECASE)
# Look for an inline options line. Match _OPTIONS, OPTIONS, options
# (with or without leading underscore), followed by ':' or '=', then
# a JSON array on the same line OR on the next line.
inline_options_re = _re.compile(
r"(?im)^\s*_?options\s*[:=]\s*(\[.*?\])\s*$",
_re.DOTALL,
)
recovered: list[str] | None = None
match = inline_options_re.search(q)
if match is not None:
try:
parsed = _json.loads(match.group(1))
if isinstance(parsed, list):
cleaned = [str(o).strip() for o in parsed if str(o).strip()]
if 1 <= len(cleaned) <= 8:
recovered = cleaned
except (ValueError, TypeError):
pass
if recovered is not None:
# Remove the parsed line so it doesn't leak into the
# rendered question text.
q = inline_options_re.sub("", q, count=1)
# Strip any final whitespace / leftover blank lines from the
# question after removals.
q = _re.sub(r"\n{3,}", "\n\n", q).strip()
return q, recovered
def build_ask_user_tool() -> Tool:
"""Build the synthetic ask_user tool for explicit user-input requests.
@@ -28,7 +104,20 @@ def build_ask_user_tool() -> Tool:
"You MUST call this tool whenever you need the user's response. "
"Always call it after greeting the user, asking a question, or "
"requesting approval. Do NOT call it for status updates or "
"summaries that don't require a response. "
"summaries that don't require a response.\n\n"
"STRUCTURE RULES (CRITICAL):\n"
"- The 'question' field is PLAIN TEXT shown to the user. Do NOT "
"include XML tags, pseudo-tags like </question>, or option lists "
"in the question string. The UI does not parse them — they "
"render as raw text and look broken.\n"
"- The 'options' parameter is the ONLY way to render buttons. "
"If you want buttons, put them in the 'options' array, not in "
"the question string. Do NOT write 'OPTIONS: [...]', "
"'_options: [...]', or any inline list inside 'question'.\n"
"- The question text must read as a single clean prompt with "
"no markup. Example: 'What would you like to do?' — not "
"'What would you like to do?</question>'.\n\n"
"USAGE:\n"
"Always include 2-3 predefined options. The UI automatically "
"appends an 'Other' free-text input after your options, so NEVER "
"include catch-all options like 'Custom idea', 'Something else', "
@@ -39,11 +128,14 @@ def build_ask_user_tool() -> Tool:
"free-text input. "
"The ONLY exception: omit options when the question demands a "
"free-form answer the user must type out (e.g. 'Describe your "
"agent idea', 'Paste the error message'). "
"agent idea', 'Paste the error message').\n\n"
"CORRECT EXAMPLE:\n"
'{"question": "What would you like to do?", "options": '
'["Build a new agent", "Modify existing agent", "Run tests"]} '
"Free-form example: "
'{"question": "Describe the agent you want to build."}'
'["Build a new agent", "Modify existing agent", "Run tests"]}\n\n'
"FREE-FORM EXAMPLE:\n"
'{"question": "Describe the agent you want to build."}\n\n'
"WRONG (do NOT do this — buttons will not render):\n"
'{"question": "What now?</question>\\n_OPTIONS: [\\"A\\", \\"B\\"]"}'
),
parameters={
"type": "object",
@@ -204,6 +296,94 @@ def build_escalate_tool() -> Tool:
},
)
def build_report_to_parent_tool() -> Tool:
"""Build the synthetic ``report_to_parent`` tool.
Parallel workers (those spawned by the overseer via
``run_parallel_workers``) call this to send a structured report back
to the overseer queen when they have finished their task. Calling
``report_to_parent`` terminates the worker's loop cleanly -- do not
call other tools after it.
The overseer receives these as ``SUBAGENT_REPORT`` events and
aggregates them into a single summary for the user.
"""
return Tool(
name="report_to_parent",
description=(
"Send a structured report back to the parent overseer and "
"terminate. Call this when you have finished your task "
"(success, partial, or failed) or cannot make further "
"progress. Your loop ends after this call -- do not call any "
"other tool afterwards. The overseer reads the summary + "
"data fields and aggregates them into a user-facing response."
),
parameters={
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["success", "partial", "failed"],
"description": (
"Overall outcome. 'success' = task complete. "
"'partial' = some progress but incomplete. "
"'failed' = could not make progress."
),
},
"summary": {
"type": "string",
"description": (
"One-paragraph narrative for the overseer. What "
"you did, what you found, and any notable issues."
),
},
"data": {
"type": "object",
"description": (
"Optional structured payload (rows fetched, IDs "
"processed, files written, etc.) that the "
"overseer can merge into its final summary."
),
},
},
"required": ["status", "summary"],
},
)
def handle_report_to_parent(tool_input: dict[str, Any]) -> ToolResult:
"""Normalise + validate a ``report_to_parent`` tool call.
Returns a ``ToolResult`` with the acknowledgement text the LLM sees;
the side effects (record on Worker, emit SUBAGENT_REPORT, terminate
loop) are performed by ``AgentLoop`` after this helper returns.
"""
status = str(tool_input.get("status", "success")).strip().lower()
if status not in ("success", "partial", "failed"):
status = "success"
summary = str(tool_input.get("summary", "")).strip()
if not summary:
summary = f"(worker returned {status} with no summary)"
data = tool_input.get("data") or {}
if not isinstance(data, dict):
data = {"value": data}
# Store the normalised payload back on the input dict so the caller
# can pick it up without re-parsing.
tool_input["_normalised"] = {
"status": status,
"summary": summary,
"data": data,
}
return ToolResult(
tool_use_id=tool_input.get("tool_use_id", ""),
content=(
f"Report delivered to overseer (status={status}). "
f"This worker will terminate now."
),
)
def handle_set_output(
tool_input: dict[str, Any],
output_keys: list[str] | None,
@@ -1,3 +1,3 @@
{
"include": ["gcu-tools"]
"include": ["gcu-tools", "hive-tools"]
}
@@ -12,5 +12,12 @@
"args": ["run", "python", "-m", "gcu.server", "--stdio", "--capabilities", "browser"],
"cwd": "../../../../tools",
"description": "Browser automation tools (Playwright-based)"
},
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../../../tools",
"description": "Hive tools MCP server (csv, pdf, web_search, web_scrape, email, integrations)"
}
}
+47 -1
View File
@@ -81,6 +81,13 @@ _QUEEN_PLANNING_TOOLS = [
# Scaffold + transition to building (requires confirm_and_build first)
# Load existing agent (after user confirms)
"load_built_agent",
# Parallel fan-out — use directly for one-off batch work the user
# wants RIGHT NOW (without first designing an agent for it).
"run_parallel_workers",
# Fork this session into a colony, writing a learned-skill file
# under ~/.hive/skills/ first so the new colony inherits the
# session's knowledge.
"create_colony",
]
# Building phase: full coding + agent construction tools.
@@ -125,6 +132,7 @@ _QUEEN_RUNNING_TOOLS = [
"switch_to_reviewing",
"get_worker_status",
"run_agent_with_input",
"run_parallel_workers",
"inject_message",
# Worker escalation inbox
"list_worker_questions",
@@ -175,6 +183,10 @@ _QUEEN_INDEPENDENT_TOOLS = [
"search_files",
"run_command",
"undo_changes",
# Parallel fan-out (Phase 4 unified ColonyRuntime)
"run_parallel_workers",
# Fork to colony — captures session knowledge as a skill first
"create_colony",
]
@@ -690,6 +702,40 @@ to fix the currently loaded agent (no draft required).
phase. Only use this when the user explicitly asks to work with an existing agent \
(e.g. "load my_agent", "run the research agent"). Confirm with the user first.
## Parallel fan-out (one-off batch work — no agent build required)
- run_parallel_workers(tasks, timeout?) Spawn N workers concurrently and \
wait for all reports. Use this when the user asks for batch / parallel work \
RIGHT NOW that does NOT need a reusable agent (e.g. "fetch batches 15 from \
this API", "summarise these 10 PDFs", "compare these candidates"). Each task \
is a dict {"task": "...", "data"?: {...}}; the tool returns aggregated \
{worker_id, status, summary, data, error} reports. Read the summaries and \
write a single user-facing synthesis on your next turn. Prefer this over \
designing a draft when the work is one-shot and the user wants results, not \
a saved agent.
## Forking the session into a colony (with session-knowledge capture)
Two-step flow:
1. AUTHOR THE SKILL FIRST. Use write_file to create a skill folder \
(recommended location: `~/.hive/skills/{skill-name}/SKILL.md`) \
capturing what you learned during THIS session API endpoints, \
auth flow, response shapes, gotchas, conventions, query patterns. \
The SKILL.md needs YAML frontmatter with `name` (matching the \
directory name) and `description` (1-1024 chars including trigger \
keywords), followed by a markdown body. Optional subdirs: \
scripts/, references/, assets/. Read your writing-hive-skills \
default skill for the full spec.
2. create_colony(colony_name, task, skill_path) Validate the skill \
folder, install it under ~/.hive/skills/ if it's not already there, \
and fork this session into a new colony. NOTHING RUNS after this \
call: the task is baked into worker.json and the user starts the \
worker later from the new colony page. The task string still must \
be FULL and self-contained when the user eventually runs it the \
worker has zero memory of your chat. The skill you wrote is \
installed under ~/.hive/skills/ so the worker discovers it on its \
first scan and starts informed instead of clueless. ALWAYS prefer \
create_colony over a raw fork when ending a session that uncovered \
reusable operational knowledge.
## Workflow summary
1. Understand requirements discover tools design the layout
2. Call save_agent_draft() to create visual draft present to user
@@ -809,7 +855,7 @@ You are the agent. No worker — you execute directly.
3. Execute using your tools: file I/O, shell commands, browser automation
4. Report results, iterate if needed
You have NO lifecycle tools (no start_worker, stop_worker, confirm_and_build, etc.).
You have NO lifecycle tools (no run_agent_with_input, stop_worker, confirm_and_build, etc.).
If the task requires building a dedicated agent, tell the user to start a \
new session without independent mode.
"""
+346 -9
View File
@@ -179,6 +179,7 @@ class ColonyRuntime:
)
storage_path_obj = Path(storage_path) if isinstance(storage_path, str) else storage_path
self._storage_path: Path = storage_path_obj
self._storage = ConcurrentStorage(
base_path=storage_path_obj,
cache_ttl=self._config.cache_ttl,
@@ -195,6 +196,12 @@ class ColonyRuntime:
# Worker management
self._workers: dict[str, Worker] = {}
# The persistent client-facing overseer (optional). Set by
# ``start_overseer()`` at session start. In a DM session the
# overseer is the queen chatting with the user with 0 parallel
# workers. In a colony session she's the queen orchestrating N
# parallel workers.
self._overseer: Worker | None = None
self._triggers: dict[str, TriggerSpec] = {}
self._trigger_definitions: dict[str, TriggerDefinition] = {}
@@ -237,6 +244,27 @@ class ColonyRuntime:
def agent_id(self) -> str:
return self._colony_id
@property
def goal(self) -> Goal:
"""The colony's overall goal.
Exposed as a public property for queen lifecycle tools that
introspect the runtime (e.g. ``get_worker_status``,
``get_goal_progress``). Previously only available as the private
``_goal`` attribute.
"""
return self._goal
@property
def overseer(self) -> Worker | None:
"""The colony's long-running client-facing overseer worker.
``None`` until ``start_overseer()`` has been called. The overseer
is a persistent ``Worker`` that wraps the queen's ``AgentLoop``
and routes user chat via ``inject(message)``.
"""
return self._overseer
@property
def is_running(self) -> bool:
return self._running
@@ -384,42 +412,91 @@ class ColonyRuntime:
count: int = 1,
input_data: dict[str, Any] | None = None,
session_state: dict[str, Any] | None = None,
agent_spec: AgentSpec | None = None,
tools: list[Any] | None = None,
tool_executor: Callable | None = None,
stream_id: str | None = None,
) -> list[str]:
"""Spawn worker clones and start them in the background.
By default each spawn uses the colony's own ``agent_spec``,
``tools``, and ``tool_executor`` (set at construction). Pass
the per-spawn override args to spawn a worker that runs
DIFFERENT code from the colony default used by the queen's
``run_agent_with_input`` tool to spawn the loaded honeycomb /
custom worker through the unified runtime, instead of going
through the deprecated ``AgentHost.trigger`` ``Orchestrator``
path that silently dropped ``user_request`` via the buffer
filter.
``stream_id`` controls the SSE stream tag the worker's events
publish under. Default is ``f"worker:{worker_id}"`` (the
per-spawn unique tag used by parallel fan-out, which the SSE
filter at routes_events.py drops to keep the queen DM clean
of worker noise). Pass an explicit value when you want the
worker's events to bypass that filter and stream to the queen
DM. ``run_agent_with_input`` passes ``"worker"`` (singular,
no colon) so the loaded primary worker's tool calls and LLM
deltas reach the user's chat tab.
Returns list of worker IDs.
"""
if not self._running:
raise RuntimeError("ColonyRuntime is not running")
from framework.agent_loop.agent_loop import AgentLoop
from framework.storage.conversation_store import FileConversationStore
# Resolve per-spawn vs colony-default code identity
spawn_spec = agent_spec or self._agent_spec
spawn_tools = tools if tools is not None else self._tools
spawn_executor = tool_executor or self._tool_executor
# Resolve the SSE stream_id once. When the caller didn't supply
# one we use the per-worker fan-out tag (filtered out by the
# SSE handler). When the caller passed an explicit value we
# honor it across the whole batch — typically count=1 for the
# primary loaded worker that needs to stream to the queen DM.
explicit_stream_id = stream_id
worker_ids = []
for i in range(count):
worker_id = self._session_store.generate_session_id()
# Each parallel worker gets its own storage dir under
# {colony_session}/workers/{worker_id}/ so its conversation,
# events, and data never leak into the overseer's tree or
# (worse) the process CWD.
worker_storage = self._storage_path / "workers" / worker_id
worker_storage.mkdir(parents=True, exist_ok=True)
worker_conv_store = FileConversationStore(
worker_storage / "conversations"
)
# AgentLoop takes bus/judge/config/executor at construction;
# LLM, tools, stream_id, execution_id all come from the
# AgentContext passed to execute().
agent_loop = AgentLoop(
llm=self._llm,
tools=list(self._tools),
tool_executor=self._tool_executor,
event_bus=self._scoped_event_bus,
stream_id=f"worker:{worker_id}",
execution_id=worker_id,
tool_executor=spawn_executor,
conversation_store=worker_conv_store,
)
agent_context = AgentContext(
runtime=self._make_runtime_adapter(worker_id),
agent_id=worker_id,
agent_spec=self._agent_spec,
agent_spec=spawn_spec,
input_data=input_data or {"task": task},
goal_context=self._goal.to_prompt_context(),
goal=self._goal,
llm=self._llm,
available_tools=list(spawn_tools),
accounts_prompt=self._accounts_prompt,
skills_catalog_prompt=self.skills_catalog_prompt,
protocols_prompt=self.protocols_prompt,
skill_dirs=self.skill_dirs,
execution_id=worker_id,
stream_id=f"worker:{worker_id}",
stream_id=explicit_stream_id or f"worker:{worker_id}",
)
worker = Worker(
@@ -429,6 +506,7 @@ class ColonyRuntime:
context=agent_context,
event_bus=self._scoped_event_bus,
colony_id=self._colony_id,
storage_path=worker_storage,
)
self._workers[worker_id] = worker
@@ -436,15 +514,275 @@ class ColonyRuntime:
worker_ids.append(worker_id)
logger.info(
"Spawned worker %s (%d/%d) for task: %s",
"Spawned worker %s (%d/%d) using %s task: %s",
worker_id,
i + 1,
count,
"override spec" if agent_spec else "colony default spec",
task[:80],
)
return worker_ids
async def spawn_batch(
self,
tasks: list[dict[str, Any]],
) -> list[str]:
"""Spawn a batch of parallel workers, one per task spec.
Each task spec is a dict ``{"task": str, "data": dict | None}``.
Workers start as independent asyncio background tasks and run
concurrently; this method returns their IDs immediately without
waiting for completion. Use ``wait_for_worker_reports(ids,
timeout)`` to block until they all finish.
The overseer's ``run_parallel_workers`` tool is the usual
caller; it pairs ``spawn_batch`` + ``wait_for_worker_reports``
into a single fan-out/fan-in primitive.
"""
worker_ids: list[str] = []
for spec in tasks:
task_text = str(spec.get("task", ""))
task_data = spec.get("data")
if task_data is not None and not isinstance(task_data, dict):
task_data = {"value": task_data}
ids = await self.spawn(
task=task_text,
count=1,
input_data=task_data or {"task": task_text},
)
worker_ids.extend(ids)
return worker_ids
async def wait_for_worker_reports(
self,
worker_ids: list[str],
timeout: float = 600.0,
) -> list[dict[str, Any]]:
"""Block until every worker in ``worker_ids`` has reported.
Subscribes to ``SUBAGENT_REPORT`` events on the colony event bus
and collects one report per worker. If a worker has already
reported (fast completion) the existing ``WorkerResult`` is used
directly. On timeout, still-running workers are force-stopped
via ``stop_worker`` and their reports are synthesised as
``status="timeout"``.
Returns a list of report dicts in the same order as
``worker_ids``::
[
{
"worker_id": "...",
"status": "success" | "partial" | "failed" | "timeout" | "stopped",
"summary": "...",
"data": {...},
"error": "..." | None,
"duration_seconds": 12.3,
"tokens_used": 4567,
},
...
]
"""
if not worker_ids:
return []
# Reports already in hand (workers that finished before we got here)
collected: dict[str, dict[str, Any]] = {}
pending_ids: set[str] = set()
for wid in worker_ids:
worker = self._workers.get(wid)
if worker is None:
collected[wid] = {
"worker_id": wid,
"status": "failed",
"summary": "Worker not found in registry.",
"data": {},
"error": "no_such_worker",
"duration_seconds": 0.0,
"tokens_used": 0,
}
continue
if not worker.is_active and worker._result is not None:
# Already finished — synthesize from the stored result
r = worker._result
collected[wid] = {
"worker_id": wid,
"status": r.status,
"summary": r.summary,
"data": r.data,
"error": r.error,
"duration_seconds": r.duration_seconds,
"tokens_used": r.tokens_used,
}
continue
pending_ids.add(wid)
if not pending_ids:
return [collected[wid] for wid in worker_ids]
# Subscribe to SUBAGENT_REPORT events for the remaining workers
report_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
async def on_report(event: AgentEvent) -> None:
data = dict(event.data or {})
wid = data.get("worker_id")
if wid and wid in pending_ids:
await report_queue.put(data)
sub_id = self._scoped_event_bus.subscribe(
event_types=[EventType.SUBAGENT_REPORT],
handler=on_report,
)
deadline = time.monotonic() + timeout
try:
while pending_ids:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
try:
report = await asyncio.wait_for(
report_queue.get(), timeout=remaining
)
except TimeoutError:
break
wid = report.get("worker_id")
if wid in pending_ids:
collected[wid] = report
pending_ids.discard(wid)
finally:
self._scoped_event_bus.unsubscribe(sub_id)
# Any still-pending workers are timed out — force-stop them and
# synthesise a timeout report.
for wid in list(pending_ids):
try:
await self.stop_worker(wid)
except Exception:
logger.exception("Failed to force-stop worker %s on timeout", wid)
worker = self._workers.get(wid)
duration = 0.0
tokens = 0
if worker is not None and worker._started_at > 0:
duration = time.monotonic() - worker._started_at
if worker is not None and worker._result is not None:
tokens = worker._result.tokens_used
collected[wid] = {
"worker_id": wid,
"status": "timeout",
"summary": f"Worker did not report within {timeout:.0f}s.",
"data": {},
"error": "timeout",
"duration_seconds": duration,
"tokens_used": tokens,
}
pending_ids.discard(wid)
return [collected[wid] for wid in worker_ids]
async def start_overseer(
    self,
    queen_spec: AgentSpec,
    seed_conversation: list[dict[str, Any]] | None = None,
    queen_tools: list[Any] | None = None,
    initial_prompt: str | None = None,
) -> Worker:
    """Start the colony's long-running client-facing overseer.

    The overseer is a persistent ``Worker`` that wraps the queen's
    ``AgentLoop`` and:

    - Never terminates on its own (``persistent=True`` on the Worker).
    - Has the queen's full tool set, streamed with ``stream_id="overseer"``.
    - Receives user chat via ``session.colony_runtime.overseer.inject(msg)``.

    In a queen DM session the overseer runs with 0 parallel workers.
    In a colony session she can spawn parallel workers via the
    ``run_parallel_workers`` tool which calls ``spawn_batch`` +
    ``wait_for_worker_reports`` under the hood.

    Pass ``seed_conversation`` to pre-populate the overseer's
    conversation history — used when forking a DM to a colony so
    the overseer starts with the DM's prior context loaded.

    Must be called after ``start()``. Idempotent: calling a second
    time returns the already-started overseer.

    Args:
        queen_spec: AgentSpec the overseer's AgentContext is built from.
        seed_conversation: Optional prior conversation parts written to
            the overseer's store before its loop starts.
        queen_tools: When given, overrides the runtime's shared tool list.
        initial_prompt: Injected as the first message once running.

    Returns:
        The running (or previously started) overseer ``Worker``.

    Raises:
        RuntimeError: If the ColonyRuntime has not been started yet.
    """
    # Idempotency: a second call hands back the existing overseer.
    if self._overseer is not None:
        return self._overseer
    if not self._running:
        raise RuntimeError(
            "start_overseer requires the ColonyRuntime to be running "
            "(call start() first)"
        )
    # NOTE(review): local imports — presumably to avoid an import
    # cycle at module load time; confirm against the module graph.
    from framework.agent_loop.agent_loop import AgentLoop
    from framework.storage.conversation_store import FileConversationStore

    overseer_id = f"overseer:{self._colony_id}"
    # The overseer's conversation lives at the colony session root:
    # {colony_session}/conversations/. Workers get their own sub-dirs
    # under workers/{worker_id}/; the overseer is the root occupant.
    self._storage_path.mkdir(parents=True, exist_ok=True)
    overseer_conv_store = FileConversationStore(
        self._storage_path / "conversations"
    )
    agent_loop = AgentLoop(
        event_bus=self._scoped_event_bus,
        tool_executor=self._tool_executor,
        conversation_store=overseer_conv_store,
    )
    overseer_ctx = AgentContext(
        runtime=self._make_runtime_adapter(overseer_id),
        agent_id=overseer_id,
        agent_spec=queen_spec,
        input_data={},
        goal_context="",
        goal=self._goal,
        llm=self._llm,
        # An explicit queen_tools list wins over the runtime's default.
        available_tools=list(queen_tools or self._tools),
        accounts_prompt=self._accounts_prompt,
        skills_catalog_prompt=self.skills_catalog_prompt,
        protocols_prompt=self.protocols_prompt,
        skill_dirs=self.skill_dirs,
        execution_id=overseer_id,
        stream_id="overseer",
    )
    overseer = Worker(
        worker_id=overseer_id,
        task="",  # no finite task — persistent conversation
        agent_loop=agent_loop,
        context=overseer_ctx,
        event_bus=self._scoped_event_bus,
        colony_id=self._colony_id,
        persistent=True,
        storage_path=self._storage_path,
    )
    # Seeding must happen before start_background: Worker.seed_conversation
    # raises once the worker has left PENDING.
    if seed_conversation:
        await overseer.seed_conversation(seed_conversation)
    self._overseer = overseer
    await overseer.start_background()
    if initial_prompt:
        await overseer.inject(initial_prompt)
    logger.info(
        "Started overseer %s for colony %s (seeded=%d messages, initial_prompt=%s)",
        overseer_id,
        self._colony_id,
        len(seed_conversation or []),
        "yes" if initial_prompt else "no",
    )
    return overseer
async def trigger(
self,
trigger_id: str,
@@ -652,7 +990,6 @@ class ColonyRuntime:
return StreamDecisionTracker(
stream_id=f"worker:{worker_id}",
storage=self._storage,
outcome_aggregator=None,
)
def _prune_idempotency_keys(self) -> None:
+4
View File
@@ -133,6 +133,10 @@ class EventType(StrEnum):
# Colony lifecycle (session manager → frontend)
WORKER_COLONY_LOADED = "worker_colony_loaded"
# Queen create_colony tool finished forking; carries colony_name +
# path so the frontend can render a system message linking to the
# new colony page at /colony/{colony_name}.
COLONY_CREATED = "colony_created"
CREDENTIALS_REQUIRED = "credentials_required"
# Queen phase changes (working <-> reviewing)
+16
View File
@@ -45,3 +45,19 @@ class SharedBufferManager:
def get_global_state(self) -> dict[str, Any]:
    """Expose the colony-wide shared-state mapping held by this manager."""
    state = self._global_state
    return state
def cleanup_execution(self, execution_id: str, stream_id: str = "") -> None:
    """Discard the per-execution state bucket for one finished execution.

    Silently does nothing when no bucket exists for the key. Invoked
    from ``ExecutionManager._run_execution``'s finally block; the
    method exists because the SharedBufferManager stub previously
    lacked it, raising ``AttributeError`` on every execution teardown.
    """
    self._execution_states.pop(f"{stream_id}:{execution_id}", None)
def get_recent_changes(self, limit: int = 10) -> list[dict[str, Any]]:
    """Compatibility shim: the shared buffer was removed, so there are
    never any recent changes to report; always returns an empty list."""
    changes: list[dict[str, Any]] = []
    return changes
+293 -39
View File
@@ -1,7 +1,16 @@
"""Worker — a single autonomous clone in a colony.
"""Worker — a single autonomous AgentLoop clone in a colony.
Each worker is an exact copy of the queen's AgentLoop running independently.
Workers execute a task, report results back to the queen, and terminate.
Two modes:
**Ephemeral (default)**: runs a single AgentLoop execution with a task,
emits a `SUBAGENT_REPORT` event on termination (success, partial, or
failed), and terminates. Used for parallel fan-out from the overseer.
**Persistent (``persistent=True``)**: runs an initial AgentLoop execution
(usually idle, no task) and then loops forever, receiving user chat via
``inject(message)`` and pumping each message into the already-running
agent loop via ``inject_event``. Used for the colony's long-running
client-facing overseer.
"""
from __future__ import annotations
@@ -9,9 +18,9 @@ from __future__ import annotations
import asyncio
import logging
import time
import uuid
from dataclasses import dataclass, field
from enum import StrEnum
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
@@ -31,6 +40,11 @@ class WorkerResult:
error: str | None = None
tokens_used: int = 0
duration_seconds: float = 0.0
# New: structured report fields. Populated by report_to_parent tool or
# synthesised from AgentResult on termination.
status: str = "success" # "success" | "partial" | "failed" | "timeout" | "stopped"
summary: str = ""
data: dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -45,11 +59,14 @@ class WorkerInfo:
class Worker:
"""A single autonomous clone in a colony.
Wraps an AgentLoop execution with lifecycle management:
- Starts as PENDING
- Runs via AgentLoop RUNNING
- Completes COMPLETED/FAILED
- Can be stopped by the queen STOPPED
Ephemeral mode (default):
- PENDING RUNNING COMPLETED/FAILED/STOPPED, one shot, terminates.
Persistent mode (``persistent=True``, used by the overseer):
- PENDING RUNNING (never transitions out by itself).
- Receives user chat via ``inject(message)``.
- Each injected message is pumped into the running AgentLoop via
``inject_event``, triggering another turn.
"""
def __init__(
@@ -60,6 +77,8 @@ class Worker:
context: Any,
event_bus: Any = None,
colony_id: str = "",
persistent: bool = False,
storage_path: Path | None = None,
):
self.id = worker_id
self.task = task
@@ -68,10 +87,26 @@ class Worker:
self._context = context
self._event_bus = event_bus
self._colony_id = colony_id
self._persistent = persistent
# Canonical on-disk home for this worker (conversations, events,
# result.json, data). Required when seed_conversation() is used —
# we deliberately do NOT fall back to CWD, which previously caused
# conversation parts to leak into the process working directory.
self._storage_path: Path | None = (
Path(storage_path) if storage_path is not None else None
)
self._task_handle: asyncio.Task | None = None
self._started_at: float = 0.0
self._result: WorkerResult | None = None
self._input_queue: asyncio.Queue[str | None] = asyncio.Queue()
# Set by AgentLoop when the worker's LLM calls ``report_to_parent``.
# Takes precedence over the synthesised report from AgentResult.
self._explicit_report: dict[str, Any] | None = None
# Back-reference so AgentLoop's report_to_parent handler can call
# record_explicit_report on the owning Worker. The agent_loop's
# _owner_worker attribute is set here during construction.
if agent_loop is not None:
agent_loop._owner_worker = self
@property
def info(self) -> WorkerInfo:
@@ -87,7 +122,28 @@ class Worker:
def is_active(self) -> bool:
return self.status in (WorkerStatus.PENDING, WorkerStatus.RUNNING)
@property
def is_persistent(self) -> bool:
return self._persistent
@property
def agent_loop(self) -> Any:
"""The wrapped AgentLoop. Used by the SessionManager chat path."""
return self._agent_loop
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def run(self) -> WorkerResult:
"""Entry point for the worker's background task.
Ephemeral workers run ``AgentLoop.execute`` once and terminate,
emitting a ``SUBAGENT_REPORT`` event.
Persistent workers run the initial execute then loop forever
processing injected user messages.
"""
self.status = WorkerStatus.RUNNING
self._started_at = time.monotonic()
@@ -97,43 +153,25 @@ class Worker:
if result.success:
self.status = WorkerStatus.COMPLETED
self._result = WorkerResult(
output=result.output,
tokens_used=result.tokens_used,
duration_seconds=duration,
self._result = self._build_result(
result, duration, default_status="success"
)
else:
self.status = WorkerStatus.FAILED
self._result = WorkerResult(
error=result.error or "Unknown error",
tokens_used=result.tokens_used,
duration_seconds=duration,
self._result = self._build_result(
result, duration, default_status="failed"
)
if self._event_bus:
from framework.host.event_bus import AgentEvent, EventType
await self._emit_terminal_events(result)
event_type = (
EventType.EXECUTION_COMPLETED if result.success else EventType.EXECUTION_FAILED
)
await self._event_bus.publish(
AgentEvent(
type=event_type,
stream_id=self._context.stream_id or self.id,
node_id=self.id,
execution_id=self._context.execution_id or self.id,
data={
"worker_id": self.id,
"colony_id": self._colony_id,
"task": self.task,
"success": result.success,
"error": result.error,
"output_keys": list(result.output.keys()) if result.output else [],
},
)
)
if self._persistent:
# Persistent worker: keep the loop alive, pump injected
# messages forever. Status stays RUNNING; info reflects
# current progress.
self.status = WorkerStatus.RUNNING
await self._persistent_input_loop()
return self._result
return self._result # type: ignore[return-value]
except asyncio.CancelledError:
self.status = WorkerStatus.STOPPED
@@ -141,7 +179,10 @@ class Worker:
self._result = WorkerResult(
error="Worker stopped by queen",
duration_seconds=duration,
status="stopped",
summary="Worker was cancelled before completion.",
)
await self._emit_terminal_events(None, force_status="stopped")
return self._result
except Exception as exc:
@@ -150,14 +191,170 @@ class Worker:
self._result = WorkerResult(
error=str(exc),
duration_seconds=duration,
status="failed",
summary=f"Worker crashed: {exc}",
)
logger.error("Worker %s failed: %s", self.id, exc, exc_info=True)
await self._emit_terminal_events(None, force_status="failed")
return self._result
async def _persistent_input_loop(self) -> None:
"""Pump injected messages into the running AgentLoop forever.
Each ``inject(msg)`` call puts a string on ``_input_queue``. This
loop awaits it and calls ``agent_loop.inject_event(msg)`` which
wakes the loop's pending user-input gate.
"""
while True:
msg = await self._input_queue.get()
if msg is None:
# Sentinel: shutdown
return
try:
await self._agent_loop.inject_event(msg, is_client_input=True)
except Exception:
logger.exception(
"Overseer %s: inject_event failed for injected message",
self.id,
)
# ------------------------------------------------------------------
# Reporting
# ------------------------------------------------------------------
def record_explicit_report(
self,
status: str,
summary: str,
data: dict[str, Any] | None = None,
) -> None:
"""Called by AgentLoop when the worker's LLM invokes ``report_to_parent``.
Stores the report so that when ``run()`` reaches the termination
block, the explicit report wins over a synthesised one.
"""
self._explicit_report = {
"status": status,
"summary": summary,
"data": data or {},
}
def _build_result(
    self,
    agent_result: Any,
    duration: float,
    default_status: str,
) -> WorkerResult:
    """Assemble the WorkerResult for this run.

    An explicit ``report_to_parent`` payload (stored by
    ``record_explicit_report``) wins; otherwise a minimal report is
    synthesised from the AgentResult's success/error fields.
    """
    common: dict[str, Any] = dict(
        output=dict(agent_result.output or {}),
        error=agent_result.error,
        tokens_used=getattr(agent_result, "tokens_used", 0),
        duration_seconds=duration,
    )
    report = self._explicit_report
    if report is not None:
        return WorkerResult(
            status=report["status"],
            summary=report["summary"],
            data=report["data"],
            **common,
        )
    # No explicit report — synthesise one from the AgentResult.
    if agent_result.success:
        summary = f"Completed task '{self.task[:80]}' with {len(agent_result.output or {})} outputs."
        data = dict(agent_result.output or {})
    else:
        summary = f"Task '{self.task[:80]}' failed: {agent_result.error or 'unknown'}"
        data = {}
    return WorkerResult(status=default_status, summary=summary, data=data, **common)
async def _emit_terminal_events(
    self,
    agent_result: Any,
    force_status: str | None = None,
) -> None:
    """Emit EXECUTION_COMPLETED/FAILED AND SUBAGENT_REPORT on termination.

    Both events are published so that consumers that listen for
    either shape keep working. The SUBAGENT_REPORT carries the
    structured summary the overseer actually cares about.

    Args:
        agent_result: The AgentResult from the run, or ``None`` when the
            worker terminated without one (cancelled / crashed); in that
            case the lifecycle event is skipped and only the
            SUBAGENT_REPORT is published.
        force_status: When given, overrides ``self._result.status`` in
            the SUBAGENT_REPORT payload (e.g. "stopped", "failed").
    """
    # No bus wired — nothing to publish (e.g. unit-test construction).
    if self._event_bus is None:
        return
    from framework.host.event_bus import AgentEvent, EventType

    # EXECUTION_COMPLETED / EXECUTION_FAILED (backwards-compat)
    if agent_result is not None:
        lifecycle_type = (
            EventType.EXECUTION_COMPLETED
            if agent_result.success
            else EventType.EXECUTION_FAILED
        )
        await self._event_bus.publish(
            AgentEvent(
                type=lifecycle_type,
                stream_id=self._context.stream_id or self.id,
                node_id=self.id,
                execution_id=self._context.execution_id or self.id,
                data={
                    "worker_id": self.id,
                    "colony_id": self._colony_id,
                    "task": self.task,
                    "success": agent_result.success,
                    "error": agent_result.error,
                    # Keys only — full outputs may be large; consumers
                    # that need values read the WorkerResult instead.
                    "output_keys": (
                        list(agent_result.output.keys())
                        if agent_result.output
                        else []
                    ),
                },
            )
        )
    # SUBAGENT_REPORT — the structured channel the overseer awaits.
    # Requires a WorkerResult; callers set self._result before calling.
    result = self._result
    if result is None:
        return
    await self._event_bus.publish(
        AgentEvent(
            type=EventType.SUBAGENT_REPORT,
            stream_id=self._context.stream_id or self.id,
            node_id=self.id,
            execution_id=self._context.execution_id or self.id,
            data={
                "worker_id": self.id,
                "colony_id": self._colony_id,
                "task": self.task,
                "status": force_status or result.status,
                "summary": result.summary,
                "data": result.data,
                "error": result.error,
                "duration_seconds": result.duration_seconds,
                "tokens_used": result.tokens_used,
            },
        )
    )
# ------------------------------------------------------------------
# External control
# ------------------------------------------------------------------
async def start_background(self) -> None:
    """Launch ``run()`` on the event loop as a background asyncio task."""
    handle = asyncio.create_task(self.run())
    self._task_handle = handle
async def stop(self) -> None:
"""Cancel the worker's background task, if any."""
if self._persistent:
# Signal the input loop to exit cleanly first
await self._input_queue.put(None)
if self._task_handle and not self._task_handle.done():
self._task_handle.cancel()
try:
@@ -166,4 +363,61 @@ class Worker:
pass
async def inject(self, message: str) -> None:
    """Queue a user message for this worker to consume.

    The persistent overseer's chat-injection path goes through here;
    ephemeral workers rarely take follow-up input, so for them this
    is mostly unused.
    """
    queue = self._input_queue
    await queue.put(message)
async def seed_conversation(self, messages: list[dict[str, Any]]) -> None:
    """Pre-populate the worker's ConversationStore before starting.

    Used when forking a queen DM into a colony: the DM's prior
    conversation becomes the colony overseer's starting point so the
    overseer resumes mid-thought instead of greeting the user fresh.

    ``messages`` is a list of dicts matching the ConversationStore's
    part format: ``{seq, role, content, tool_calls, tool_use_id,
    created_at, phase}``. The caller is responsible for rewriting
    ``agent_id`` to match the new worker, and for numbering ``seq``
    monotonically from 0 (missing ``seq``/``agent_id`` fall back to
    the list index / this worker's id below).

    Must be called BEFORE ``start_background``.

    Raises:
        RuntimeError: If the worker has already started, or if no
            ``storage_path`` was provided at construction.
    """
    if self.status != WorkerStatus.PENDING:
        raise RuntimeError(
            f"seed_conversation must be called before start_background "
            f"(worker {self.id} is {self.status})"
        )
    # Write parts directly to the worker's on-disk conversation store
    # so that the AgentLoop's FileConversationStore picks them up when
    # NodeConversation loads from disk. We require an explicit
    # storage_path — falling back to CWD previously caused part files
    # to leak into the process working directory.
    if self._storage_path is None:
        raise RuntimeError(
            f"seed_conversation requires storage_path to be set on "
            f"Worker {self.id}; construct Worker with storage_path=..."
        )
    parts_dir = self._storage_path / "conversations" / "parts"
    parts_dir.mkdir(parents=True, exist_ok=True)
    import json
    for i, msg in enumerate(messages):
        msg = dict(msg)  # copy — never mutate the caller's dicts
        msg.setdefault("seq", i)
        msg.setdefault("agent_id", self.id)
        # Zero-padded file names keep on-disk lexical order == seq order.
        part_file = parts_dir / f"{msg['seq']:010d}.json"
        part_file.write_text(json.dumps(msg), encoding="utf-8")
    logger.info(
        "Worker %s: seeded %d messages into %s",
        self.id,
        len(messages),
        parts_dir,
    )
@@ -365,6 +365,14 @@ async def create_queen(
queen_tools = list(queen_registry.get_tools().values())
queen_tool_executor = queen_registry.get_executor()
# Phase 2 wiring: stash the resolved tool list + executor on the
# session so SessionManager._start_queen can build a real
# ColonyRuntime sharing the queen's tools, llm, and event bus.
# The unified runtime is what run_parallel_workers (Phase 4) will
# call into to fan out parallel workers from the queen.
session._queen_tools = queen_tools # type: ignore[attr-defined]
session._queen_tool_executor = queen_tool_executor # type: ignore[attr-defined]
# ---- Partition tools by phase ------------------------------------
planning_names = set(_QUEEN_PLANNING_TOOLS)
building_names = set(_QUEEN_BUILDING_TOOLS)
+28 -1
View File
@@ -36,6 +36,7 @@ DEFAULT_EVENT_TYPES = [
EventType.CONTEXT_COMPACTED,
EventType.CONTEXT_USAGE_UPDATED,
EventType.WORKER_COLONY_LOADED,
EventType.COLONY_CREATED,
EventType.CREDENTIALS_REQUIRED,
EventType.SUBAGENT_REPORT,
EventType.QUEEN_PHASE_CHANGED,
@@ -50,6 +51,27 @@ DEFAULT_EVENT_TYPES = [
# Keepalive interval in seconds
KEEPALIVE_INTERVAL = 15.0
# Phase 5 SSE filter. Parallel-worker streams (stream_id="worker:{uuid}")
# emit high-frequency LLM deltas and tool calls that would flood the
# user's queen DM chat. Only this small allowlist of worker events is
# forwarded to the queen-chat SSE, so the frontend can render fan-out
# lifecycle and structured fan-in reports without the raw worker
# chatter. Per-worker SSE panels (Phase 5b) bypass this filter via a
# dedicated /workers/{worker_id}/events route.
_WORKER_EVENT_ALLOWLIST = {
    EventType.SUBAGENT_REPORT.value,
    EventType.EXECUTION_COMPLETED.value,
    EventType.EXECUTION_FAILED.value,
}


def _is_worker_noise(evt_dict: dict) -> bool:
    """Return True for parallel-worker events that should be dropped.

    Only events on a ``worker:``-prefixed stream can be noise; of
    those, anything not on ``_WORKER_EVENT_ALLOWLIST`` is filtered.
    """
    sid = evt_dict.get("stream_id") or ""
    return sid.startswith("worker:") and (
        evt_dict.get("type") not in _WORKER_EVENT_ALLOWLIST
    )
def _parse_event_types(query_param: str | None) -> list[EventType]:
"""Parse comma-separated event type names into EventType values.
@@ -110,6 +132,8 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
return
evt_dict = event.to_dict()
if _is_worker_noise(evt_dict):
return
if evt_dict.get("type") in _CRITICAL_EVENTS:
try:
queue.put_nowait(evt_dict)
@@ -155,8 +179,11 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
replayed = 0
for past_event in event_bus._event_history:
if past_event.type.value in replay_types:
past_dict = past_event.to_dict()
if _is_worker_noise(past_dict):
continue
try:
queue.put_nowait(past_event.to_dict())
queue.put_nowait(past_dict)
replayed += 1
except asyncio.QueueFull:
break
+43 -18
View File
@@ -56,7 +56,6 @@ _WORKER_INHERITED_TOOLS: frozenset[str] = frozenset(
# from forked worker configs.
_QUEEN_LIFECYCLE_EXTRAS: frozenset[str] = frozenset(
{
"start_worker",
"stop_worker_and_plan",
"stop_worker_and_review",
}
@@ -651,15 +650,6 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
Body: {"colony_name": "...", "task": "..."}
Returns: {"colony_path": "...", "colony_name": "...", "is_new": bool,
"queen_session_id": "..."}
The clone:
1. Creates a colony directory with a single worker config (``worker.json``)
holding the queen's current tools, prompts, skills, and loop config.
2. Duplicates the queen's full session (conversations + events) into a new
queen-session directory assigned to the colony so that cold-restoring
the colony resumes with the queen's entire conversation history.
3. Multiple independent sessions can be created against the same colony,
giving parallel execution capacity without separate worker configs.
"""
session, err = resolve_session(request)
if err:
@@ -686,6 +676,43 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
status=400,
)
try:
result = await fork_session_into_colony(
session=session,
colony_name=colony_name,
task=task,
)
except Exception as e:
logger.exception("colony_spawn fork failed")
return web.json_response({"error": f"colony fork failed: {e}"}, status=500)
return web.json_response(result)
async def fork_session_into_colony(
*,
session: Any,
colony_name: str,
task: str,
) -> dict:
"""Fork a queen session into a colony directory.
Extracted from ``handle_colony_spawn`` so the queen-side
``create_colony`` tool can call it directly without going through
HTTP. The caller is responsible for validating ``colony_name``
against the lowercase-alphanumeric regex.
The fork:
1. Creates a colony directory with a single worker config (``worker.json``)
holding the queen's current tools, prompts, skills, and loop config.
2. Duplicates the queen's full session (conversations + events) into a new
queen-session directory assigned to the colony so that cold-restoring
the colony resumes with the queen's entire conversation history.
3. Multiple independent sessions can be created against the same colony,
giving parallel execution capacity without separate worker configs.
Returns ``{"colony_path", "colony_name", "queen_session_id", "is_new"}``.
"""
import asyncio
import json
import shutil
@@ -906,14 +933,12 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
len(queen_tools),
colony_session_id,
)
return web.json_response(
{
"colony_path": str(colony_dir),
"colony_name": colony_name,
"queen_session_id": colony_session_id,
"is_new": is_new,
}
)
return {
"colony_path": str(colony_dir),
"colony_name": colony_name,
"queen_session_id": colony_session_id,
"is_new": is_new,
}
def register_routes(app: web.Application) -> None:
+117 -2
View File
@@ -17,7 +17,7 @@ import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Any, Literal
from framework.config import QUEENS_DIR
from framework.host.triggers import TriggerDefinition
@@ -63,7 +63,11 @@ class Session:
colony_id: str | None = None
worker_path: Path | None = None
runner: Any | None = None # AgentRunner
colony_runtime: Any | None = None # ColonyRuntime or AgentRuntime
colony_runtime: Any | None = None # legacy worker AgentRuntime (Phase 2 deprecation pending)
# Phase 2 unified runtime: a real ColonyRuntime hosting the queen as
# overseer and (in colony sessions) parallel workers spawned via
# run_parallel_workers. Always set once _start_queen has run.
colony: Any | None = None # ColonyRuntime
worker_info: Any | None = None # AgentInfo
# Queen phase state (working/reviewing)
phase_state: Any = None # QueenPhaseState
@@ -100,6 +104,13 @@ class Session:
queen_name: str = "default"
# Colony name: set when a worker is loaded from a colony
colony_name: str | None = None
# Session mode discriminator. "dm" = queen DM session under
# ~/.hive/agents/queens/{queen_id}/sessions/. "colony" = forked colony
# session under ~/.hive/colonies/{colony_name}/sessions/, with the
# queen running as the colony's overseer and the run_parallel_workers
# tool unlocked. The mode is the canonical discriminator for storage
# path, tool exposure, and SSE filtering — see the Phase 2 plan.
mode: Literal["dm", "colony"] = "dm"
class SessionManager:
@@ -1042,6 +1053,18 @@ class SessionManager:
except Exception as e:
logger.error("Error cleaning up worker: %s", e)
# Stop the unified ColonyRuntime (Phase 2 wiring) if it was started
if session.colony is not None:
try:
await session.colony.stop()
except Exception:
logger.warning(
"Session '%s': error stopping unified ColonyRuntime",
session_id,
exc_info=True,
)
session.colony = None
# Close per-session event log
session.event_bus.close_session_log()
@@ -1184,6 +1207,22 @@ class SessionManager:
session.queen_executor,
)
# Phase 2 wiring: stand up a real ColonyRuntime that shares the
# queen's llm, tools, event bus, and storage path. In a DM session
# it has no parallel workers (the queen runs in queen_task), but
# the run_parallel_workers tool (Phase 4) will use this runtime
# as the spawn surface, and worker SUBAGENT_REPORT events flow
# back through the shared event_bus to the existing SSE.
try:
await self._start_unified_colony_runtime(session, queen_dir)
except Exception:
# ColonyRuntime is dormant infrastructure today — never let
# its construction abort queen startup. Phase 4 will harden.
logger.warning(
"_start_queen: unified ColonyRuntime construction failed",
exc_info=True,
)
# Auto-load worker on cold restore — the queen's conversation expects
# the agent to be loaded, but the new session has no worker.
if session.queen_resume_from and not session.colony_runtime:
@@ -1214,6 +1253,82 @@ class SessionManager:
except Exception:
logger.warning("Cold restore: failed to auto-load worker", exc_info=True)
# ------------------------------------------------------------------
# Phase 2: unified ColonyRuntime construction
# ------------------------------------------------------------------
async def _start_unified_colony_runtime(
    self,
    session: Session,
    queen_dir: Path,
) -> None:
    """Build a real ColonyRuntime sharing the queen's resources.

    This is the Phase 2 wiring. The ColonyRuntime is created with:

    - ``llm`` → ``session.llm``
    - ``event_bus`` → ``session.event_bus`` (so worker SUBAGENT_REPORT
      and lifecycle events flow through the same bus the SSE handler
      already subscribes to)
    - ``tools`` → the queen's resolved tool list (stashed by
      ``create_queen`` on ``session._queen_tools``)
    - ``storage_path`` → ``queen_dir`` (parallel workers will land
      under ``{queen_dir}/workers/{worker_id}/`` thanks to
      ``ColonyRuntime.spawn``)
    - ``colony_id`` → ``session.id``

    The runtime is started but no overseer is attached — the queen
    still runs as ``session.queen_task`` from ``create_queen``. This
    is dormant fan-out infrastructure: ``run_parallel_workers``
    (Phase 4) is what activates it.
    """
    # NOTE(review): local imports — presumably to avoid an import
    # cycle at module load time; confirm against the module graph.
    from framework.agent_loop.types import AgentSpec
    from framework.host.colony_runtime import ColonyRuntime
    from framework.schemas.goal import Goal

    # Tools/executor are stashed by create_queen; tolerate their
    # absence so runtime construction never depends on call order.
    queen_tools = getattr(session, "_queen_tools", None) or []
    queen_tool_executor = getattr(session, "_queen_tool_executor", None)
    colony_spec = AgentSpec(
        id="queen_colony",
        name="Queen Colony",
        description=(
            "Unified colony runtime hosting the queen overseer and "
            "any parallel workers spawned via run_parallel_workers."
        ),
        system_prompt="",
        tools=[t.name for t in queen_tools],
        tool_access_policy="all",
    )
    colony_goal = Goal(
        id=f"colony_goal_{session.id}",
        name=f"Session {session.id}",
        description="Default goal for the session-level ColonyRuntime.",
    )
    colony = ColonyRuntime(
        agent_spec=colony_spec,
        goal=colony_goal,
        storage_path=queen_dir,
        llm=session.llm,
        tools=queen_tools,
        tool_executor=queen_tool_executor,
        event_bus=session.event_bus,
        colony_id=session.id,
        pipeline_stages=[],  # queen pipeline runs in queen_orchestrator, not here
    )
    await colony.start()
    session.colony = colony
    logger.info(
        "_start_queen: unified ColonyRuntime ready for session %s "
        "(%d tools, storage=%s)",
        session.id,
        len(queen_tools),
        queen_dir,
    )
# ------------------------------------------------------------------
# Queen notifications
# ------------------------------------------------------------------
@@ -0,0 +1,160 @@
---
name: hive.writing-hive-skills
description: Author a new Agent Skill for a Hive agent that conforms to the Agent Skills specification (SKILL.md with YAML frontmatter, optional scripts/references/assets directories). Use when the user asks to create, scaffold, add, or package a new skill for a Hive agent.
metadata:
author: hive
type: default-skill
spec-source: https://agentskills.io/specification
---
## Operational Protocol: Writing Hive Skills
Hive agents discover skills by scanning several roots, in precedence order:
1. `<project>/.hive/skills/` — project, Hive-specific
2. `<project>/.agents/skills/` — project, cross-client
3. `~/.hive/skills/` — user, Hive-specific
4. `~/.agents/skills/` — user, cross-client
5. Framework defaults shipped in `core/framework/skills/_default_skills/`
Each skill is a directory containing a `SKILL.md`. At startup, only the frontmatter `name` + `description` of every skill is loaded; the body is loaded only when the agent activates the skill. Design for that.
### Choosing where to put a new skill
- **Project-scoped**: put under `<project>/.hive/skills/` when the skill is tied to that codebase's APIs, conventions, or infra.
- **User-scoped**: put under `~/.hive/skills/` when the skill is reusable across projects for this machine/user.
- **Framework default**: add under `core/framework/skills/_default_skills/` AND register in `framework/skills/defaults.py::SKILL_REGISTRY` only when the skill is a universal operational protocol shipped with Hive. Default skills use the `hive.<name>` naming convention and include `type: default-skill` in metadata.
### Directory layout
```
<skill-name>/
├── SKILL.md # Required
├── scripts/ # Optional — executable helpers
├── references/ # Optional — on-demand docs
└── assets/ # Optional — templates, data, images
```
Rules:
- The directory name **must** equal the `name` frontmatter field (for framework defaults, the directory is the unprefixed name, e.g. `note-taking/` for `hive.note-taking`).
- Keep `SKILL.md` under ~500 lines. Move long reference material into `references/`.
- Reference other files with relative paths from the skill root (`scripts/foo.py`, `references/API.md`). Keep references one level deep.
### SKILL.md frontmatter
Required fields:
| Field | Constraints |
|-------|-------------|
| `name` | 1–64 chars, `[a-z0-9-]`, no leading/trailing/consecutive hyphens. Must match the directory name. Framework defaults prefix with `hive.` |
| `description` | 1–1024 chars. Must describe **what** the skill does **and when to use it**. Include trigger keywords the user is likely to say. |
Optional fields:
| Field | Notes |
|-------|-------|
| `license` | License name or reference to a bundled file |
| `compatibility` | ≤500 chars. Only include if env requirements are non-trivial (network, tools, runtime) |
| `metadata` | Free-form string→string map. Namespace keys to avoid collisions. Default skills set `type: default-skill`. |
| `allowed-tools` | Experimental. Space-separated pre-approved tools, e.g. `Bash(curl:*) Bash(jq:*) Read` |
Minimal template:
```markdown
---
name: my-skill
description: One sentence on what it does. One sentence on when to use it, with concrete trigger words the agent will see in user requests.
---
# My Skill
<body>
```
### Writing a good `description`
This is the single most important field — it's the only thing the agent sees at skill-selection time.
- **Bad**: `Helps with trading.`
- **Good**: `Buy and sell shares on the HoneyComb exchange. Handles auth, slippage-protected orders, idempotent retries, and AMM output estimation. Use when placing trades or interacting with the AMM.`
Include verbs the user is likely to say (`buy`, `sell`, `place trade`) and proper nouns (`HoneyComb`, `AMM`).
### Writing the body
Structure the body for the agent, not a human reader:
1. **Lead with what the agent can't guess** — API base URLs, auth shape, project conventions, specific function names. Skip generic background ("PDFs are a document format").
2. **Show exact request/response shapes** — include JSON payloads, headers, status codes. Copy real examples rather than paraphrasing.
3. **Document failure modes** — error codes, retry rules, rate limits. This is where skills earn their keep vs. a generic agent.
4. **Give a short end-to-end example** — a "typical flow" section at the bottom anchors everything above.
Recommended sections (adapt to the domain):
- Authentication / setup
- Core operations (one per endpoint or action)
- Error reference table
- Rate limits / gotchas
- End-to-end example pattern
### Progressive disclosure
Three tiers of context cost:
1. **Always loaded** (~100 tokens per skill): `name` + `description`. Keep tight.
2. **Loaded on activation** (<5k tokens target): body of `SKILL.md`.
3. **Loaded on demand**: files under `scripts/`, `references/`, `assets/`. The agent reads these only when the body points to them.
If a section is long and only needed sometimes (e.g., a full schema dump, rarely-used edge cases), move it to `references/SOMETHING.md` and link to it from the body: `See [the error catalog](references/ERRORS.md) for the full list.`
### Scripts
Put executable helpers in `scripts/`. They should:
- Be self-contained or document dependencies in a comment header.
- Print human-readable errors to stderr and exit non-zero on failure.
- Accept arguments via CLI flags, not env vars (easier for the agent to invoke).
Reference them from the body by relative path:
```markdown
Estimate buy output with `scripts/estimate_buy.py --v-hc 1000000 --v-shares 1000000 --hc 500`.
```
For Python scripts in a Hive project, prefer `uv run scripts/foo.py ...`.
### Creating a new skill — workflow
1. Pick a `<skill-name>` (lowercase-hyphenated).
2. Decide scope: project (`<project>/.hive/skills/`), user (`~/.hive/skills/`), or framework default (`core/framework/skills/_default_skills/` + registry entry).
3. Create the directory and write `SKILL.md` with frontmatter + body.
4. Add `scripts/`, `references/`, `assets/` only if needed.
5. Validate the frontmatter: name matches dir, description is specific, no forbidden characters.
6. Validate using the Hive CLI:
```bash
uv run hive skill validate <path-to-skill-dir>
uv run hive skill doctor
```
7. Confirm discovery with `uv run hive skill list`.
8. Test by invoking a Hive agent on a task the skill should match — confirm it activates and follows the instructions.
### Registering as a framework default
When adding a skill as a shipped default:
1. Place the directory under `core/framework/skills/_default_skills/<unprefixed-name>/`.
2. Set frontmatter `name: hive.<unprefixed-name>` and `metadata.type: default-skill`.
3. Add the mapping to `SKILL_REGISTRY` in `core/framework/skills/defaults.py`:
```python
SKILL_REGISTRY: dict[str, str] = {
...
"hive.<unprefixed-name>": "<unprefixed-name>",
}
```
4. If the skill uses `{{placeholder}}` substitution, add defaults to `_SKILL_DEFAULTS` in the same file.
5. If the skill reads/writes shared buffer keys, list them in `DATA_BUFFER_KEYS`.
### What NOT to put in a skill
- Generic programming knowledge the agent already has.
- Conversation-specific state (use memory or plans instead).
- Secrets or credentials (skills are plaintext; reference env vars or credential stores).
- Deeply nested reference chains — keep everything one hop from `SKILL.md`.
+1
View File
@@ -77,6 +77,7 @@ SKILL_REGISTRY: dict[str, str] = {
"hive.quality-monitor": "quality-monitor",
"hive.error-recovery": "error-recovery",
"hive.task-decomposition": "task-decomposition",
"hive.writing-hive-skills": "writing-hive-skills",
}
# All shared buffer keys used by default skills (for permission auto-inclusion)
+731 -96
View File
@@ -981,76 +981,84 @@ def register_queen_lifecycle_tools(
"""Get current colony runtime from session (late-binding)."""
return getattr(session, "colony_runtime", None)
# --- start_worker ----------------------------------------------------------
# How long to wait for credential validation + MCP resync before
# proceeding with trigger anyway. These are pre-flight checks that
# should not block the queen indefinitely.
# ``start_worker`` was removed in the Phase 4 unification — its
# bare-bones spawn duplicated ``run_agent_with_input`` (which has
# credential preflight, concurrency guard, and phase tracking on
# top). The shared preflight timeout below is still used by
# ``run_agent_with_input``.
# Shared pre-flight budget (seconds) for credential validation + MCP
# resync before a trigger proceeds anyway; still read by
# run_agent_with_input per the note above.
_START_PREFLIGHT_TIMEOUT = 15 # seconds
async def start_worker(task: str) -> str:
    """Spawn a colony worker clone with a task description.

    The worker runs autonomously in the background.
    Returns immediately with worker IDs.

    Returns a JSON string: either ``{"status": "started", "worker_ids",
    "task"}`` or ``{"error": ...}`` — tool results are always JSON text,
    never raised exceptions.
    """
    # Late-binding: the colony runtime is attached to the session only
    # while a colony is running.
    runtime = _get_runtime()
    if runtime is None:
        return json.dumps({"error": "No colony running in this session."})
    try:
        # A prior stop may have paused timers; resume so the spawned
        # worker's triggers can fire.
        runtime.resume_timers()
        worker_ids = await runtime.spawn(
            task=task,
            count=1,
            input_data={"user_request": task},
        )
        return json.dumps(
            {
                "status": "started",
                "worker_ids": worker_ids,
                "task": task,
            }
        )
    except Exception as e:
        # Best-effort tool surface: report spawn failures as a JSON
        # error payload instead of raising into the agent loop.
        return json.dumps({"error": f"Failed to start worker: {e}"})
# LLM-facing schema for the start_worker tool.
_start_tool = Tool(
    name="start_worker",
    description=(
        "Spawn a colony worker clone with a task description. The worker runs "
        "autonomously in the background. Returns worker IDs for tracking."
    ),
    parameters={
        "type": "object",
        "properties": {
            "task": {
                "type": "string",
                "description": "Description of the task for the worker to perform",
            },
        },
        "required": ["task"],
    },
)
registry.register("start_worker", _start_tool, lambda inputs: start_worker(**inputs))
tools_registered += 1
# --- stop_worker -----------------------------------------------------------
async def stop_worker(*, reason: str = "Stopped by queen") -> str:
"""Stop all active workers in the colony."""
runtime = _get_runtime()
if runtime is None:
return json.dumps({"error": "No worker loaded in this session."})
"""Stop all active workers in the session.
await runtime.stop_all_workers()
runtime.pause_timers()
Stops workers on BOTH the unified ColonyRuntime (``session.colony``
where ``run_agent_with_input`` and ``run_parallel_workers``
spawn) AND the legacy ``session.colony_runtime`` (loaded
AgentHost still tracks timers and any legacy triggers). A
previous version only stopped the legacy runtime, which meant
workers spawned via the new path kept running silently after
the queen called this tool.
"""
stopped_unified = 0
stopped_legacy = 0
errors: list[str] = []
# 1. Stop everything on the unified ColonyRuntime. This is
# where run_agent_with_input and run_parallel_workers live.
colony = getattr(session, "colony", None)
if colony is not None:
try:
# Count live workers BEFORE stopping so we can report
# accurately — stop_all_workers clears the dict.
stopped_unified = sum(
1 for w in colony.list_workers() if w.status.value in ("pending", "running")
)
await colony.stop_all_workers()
except Exception as e:
errors.append(f"unified: {e}")
logger.warning(
"stop_worker: failed to stop unified colony workers",
exc_info=True,
)
# 2. Stop the legacy runtime too (timers, old-path workers).
legacy = _get_runtime()
if legacy is not None:
try:
legacy_workers = legacy.list_workers()
stopped_legacy = len(legacy_workers) if isinstance(legacy_workers, list) else 0
await legacy.stop_all_workers()
legacy.pause_timers()
except Exception as e:
errors.append(f"legacy: {e}")
logger.warning(
"stop_worker: failed to stop legacy runtime workers",
exc_info=True,
)
if colony is None and legacy is None:
return json.dumps({"error": "No runtime on this session."})
total_stopped = stopped_unified + stopped_legacy
logger.info(
"stop_worker: stopped %d workers (unified=%d, legacy=%d). reason=%s",
total_stopped,
stopped_unified,
stopped_legacy,
reason,
)
return json.dumps(
{
"status": "stopped",
"timers_paused": True,
"workers_stopped": total_stopped,
"unified_stopped": stopped_unified,
"legacy_stopped": stopped_legacy,
"timers_paused": legacy is not None,
"reason": reason,
"errors": errors if errors else None,
}
)
@@ -1065,6 +1073,531 @@ def register_queen_lifecycle_tools(
registry.register("stop_worker", _stop_tool, lambda inputs: stop_worker())
tools_registered += 1
# --- run_parallel_workers --------------------------------------------------
#
# Phase 4 fan-out tool. Reads the unified ColonyRuntime from
# ``session.colony`` (built by SessionManager._start_unified_colony_runtime),
# spawns one Worker per task spec via spawn_batch, then blocks on
# wait_for_worker_reports until every worker has reported (or the
# timeout fires and stragglers are force-stopped). Returns a JSON
# array of structured reports {worker_id, status, summary, data,
# error, duration_seconds, tokens_used} that the queen reads on its
# next turn and aggregates into a user-facing summary.
#
# Worker SUBAGENT_REPORT events flow through session.event_bus, so
# the existing SSE pipeline surfaces them automatically. Workers'
# individual LLM deltas / tool calls also publish to the same bus
# under stream_id="worker:{worker_id}"; SSE filtering for those is
# Phase 5 — for now they reach the queen DM channel.
# Per-batch ceiling for run_parallel_workers (10 minutes).
_RUN_PARALLEL_DEFAULT_TIMEOUT = 600.0
def _get_unified_colony():
    """Return the unified ColonyRuntime attached to the session, if any."""
    return getattr(session, "colony", None)
async def run_parallel_workers(
    *,
    tasks: list[dict],
    timeout: float | None = None,
) -> str:
    """Fan out one worker per task spec and block until every report arrives.

    Each spec is ``{"task": str, "data": dict | None}``. Returns a JSON
    object whose ``reports`` array preserves the input order.
    """
    colony = _get_unified_colony()
    if colony is None:
        return json.dumps(
            {
                "error": (
                    "No unified ColonyRuntime on this session. "
                    "Phase 2 wiring expects session.colony to be set "
                    "by SessionManager._start_unified_colony_runtime."
                )
            }
        )
    if not tasks or not isinstance(tasks, list):
        return json.dumps(
            {"error": "tasks must be a non-empty list of {task, data?} dicts"}
        )
    # Normalise every spec up front: a non-empty "task" string is
    # mandatory; "data" is kept only when it is actually a dict.
    specs: list[dict] = []
    for idx, spec in enumerate(tasks):
        if not isinstance(spec, dict):
            return json.dumps(
                {"error": f"tasks[{idx}] is not a dict: {type(spec).__name__}"}
            )
        task_text = str(spec.get("task", "")).strip()
        if not task_text:
            return json.dumps({"error": f"tasks[{idx}].task is empty"})
        raw_data = spec.get("data")
        specs.append(
            {
                "task": task_text,
                "data": raw_data if isinstance(raw_data, dict) else None,
            }
        )
    try:
        worker_ids = await colony.spawn_batch(specs)
    except Exception as e:
        return json.dumps({"error": f"spawn_batch failed: {e}"})
    effective_timeout = _RUN_PARALLEL_DEFAULT_TIMEOUT if timeout is None else timeout
    try:
        reports = await colony.wait_for_worker_reports(
            worker_ids,
            timeout=effective_timeout,
        )
    except Exception as e:
        return json.dumps(
            {
                "error": f"wait_for_worker_reports failed: {e}",
                "worker_ids": worker_ids,
            }
        )
    return json.dumps(
        {
            "worker_count": len(reports),
            "reports": reports,
        }
    )
# LLM-facing schema for run_parallel_workers. The long description is
# deliberate: the queen model must learn here that workers are fresh
# processes with no conversational context.
_run_parallel_tool = Tool(
    name="run_parallel_workers",
    description=(
        "Fan out a batch of tasks to parallel workers and wait for all "
        "reports. Use this when you can split the work into independent "
        "subtasks that can run concurrently (e.g. fetching N batches "
        "from an API, processing M files, comparing K candidates).\n\n"
        "CRITICAL: each worker is a FRESH process with NO memory of "
        "your conversation. Every task string must be FULLY "
        "self-contained — include the API endpoint, the exact "
        "parameters, the expected output format, and any "
        "constraints. Workers cannot ask the user follow-up "
        "questions and cannot see your chat history. Write each "
        "task as if handing it to a stranger.\n\n"
        "Each worker runs in isolation with its own AgentLoop and "
        "reports back via the report_to_parent tool. The call "
        "blocks until every worker has reported or the timeout "
        "fires. Returns a JSON object with a 'reports' array; each "
        "report has worker_id, status "
        "(success|partial|failed|timeout|stopped), summary, data, "
        "error, duration_seconds, and tokens_used. Read the "
        "summaries on your next turn and synthesize a user-facing "
        "result. Default timeout is 600 seconds (10 minutes)."
    ),
    parameters={
        "type": "object",
        "properties": {
            "tasks": {
                "type": "array",
                "description": (
                    "List of task specs to fan out. Each spec is "
                    '{"task": "<description>", "data": {<optional structured input>}}. '
                    "The 'task' string becomes the worker's initial "
                    "user message. 'data' is merged into the worker's "
                    "AgentContext.input_data so structured fields are "
                    "available to the worker's first turn."
                ),
                "items": {
                    "type": "object",
                    "properties": {
                        "task": {
                            "type": "string",
                            "description": "Task description for the worker.",
                        },
                        "data": {
                            "type": "object",
                            "description": "Optional structured input fields.",
                        },
                    },
                    "required": ["task"],
                },
                "minItems": 1,
            },
            "timeout": {
                "type": "number",
                "description": (
                    "Per-batch timeout in seconds. Workers still "
                    "running when the timeout fires are force-stopped "
                    "and reported as status='timeout'. Default 600."
                ),
            },
        },
        "required": ["tasks"],
    },
)
registry.register(
    "run_parallel_workers",
    _run_parallel_tool,
    lambda inputs: run_parallel_workers(**inputs),
)
tools_registered += 1
# --- create_colony ---------------------------------------------------------
#
# Forks the current queen session into a colony. Requires the queen
# to have ALREADY AUTHORED a skill folder capturing what she learned
# during this session (using her write_file / edit_file tools), and
# pass the folder path to this tool. The tool validates the skill
# folder (SKILL.md exists, frontmatter has the required ``name`` +
# ``description`` fields, directory name matches frontmatter name),
# then forks. If the skill lives outside ``~/.hive/skills/`` the
# tool copies it in so the new colony's worker will discover it on
# its first skill scan.
#
# This is the codified version of the user's instruction:
#
# "When the queen agent needs to create a colony, it needs to
# write down whatever it just learned from the current session
# as an agent skill and put it in the ~/.hive/skills folder."
#
# Two-step flow for the queen LLM:
#
# 1. Author the skill with write_file (or a sequence of writes
# for scripts/references/assets subdirs) — she already knows
# the format via the writing-hive-skills default skill.
# 2. Call create_colony(colony_name, task, skill_path) pointing
# at the folder she just wrote.
import re as _re
import shutil as _shutil
_COLONY_NAME_RE = _re.compile(r"^[a-z0-9_]+$")
_SKILL_NAME_RE = _re.compile(r"^[a-z0-9-]+$")
def _validate_and_install_skill(skill_path: str) -> tuple[Path | None, str | None]:
"""Validate an authored skill folder and ensure it lives under ~/.hive/skills/.
Returns ``(installed_path, error)``. On success ``error`` is
``None`` and ``installed_path`` is the final location under
``~/.hive/skills/{name}/``. On failure ``installed_path`` is
``None`` and ``error`` is a human-readable reason suitable for
returning to the queen as a JSON error payload.
"""
if not skill_path or not isinstance(skill_path, str):
return None, "skill_path must be a non-empty string"
src = Path(skill_path).expanduser().resolve()
if not src.exists():
return None, f"skill_path does not exist: {src}"
if not src.is_dir():
return None, f"skill_path must be a directory, got file: {src}"
skill_md = src / "SKILL.md"
if not skill_md.is_file():
return None, f"skill_path has no SKILL.md at {skill_md}"
# Parse the frontmatter to pull out the name and verify
# description exists. We don't need a full YAML parser — the
# writing-hive-skills protocol is rigid enough that a line-by-line
# scan of the first frontmatter block suffices for validation.
try:
content = skill_md.read_text(encoding="utf-8")
except OSError as e:
return None, f"failed to read SKILL.md: {e}"
if not content.startswith("---"):
return None, "SKILL.md missing opening '---' frontmatter marker"
after_open = content.split("---", 2)
if len(after_open) < 3:
return None, "SKILL.md missing closing '---' frontmatter marker"
frontmatter_text = after_open[1]
fm_name: str | None = None
fm_description: str | None = None
for raw_line in frontmatter_text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("name:"):
fm_name = line.split(":", 1)[1].strip().strip('"').strip("'")
elif line.startswith("description:"):
fm_description = line.split(":", 1)[1].strip().strip('"').strip("'")
if not fm_name:
return None, "SKILL.md frontmatter missing 'name' field"
if not fm_description:
return None, "SKILL.md frontmatter missing 'description' field"
if not (1 <= len(fm_description) <= 1024):
return None, "SKILL.md 'description' must be 11024 chars"
if not _SKILL_NAME_RE.match(fm_name):
return None, (
f"SKILL.md 'name' field '{fm_name}' must match [a-z0-9-] "
"pattern"
)
if fm_name.startswith("-") or fm_name.endswith("-") or "--" in fm_name:
return None, (
f"SKILL.md 'name' '{fm_name}' has leading/trailing/"
"consecutive hyphens"
)
if len(fm_name) > 64:
return None, f"SKILL.md 'name' '{fm_name}' exceeds 64 chars"
# The directory basename should match the frontmatter name —
# this is the writing-hive-skills convention. We ENFORCE it
# because the skill loader uses dir names as identity.
if src.name != fm_name:
return None, (
f"skill directory name '{src.name}' does not match "
f"SKILL.md frontmatter name '{fm_name}'. Rename the "
"folder or fix the frontmatter."
)
# Install into ~/.hive/skills/{name}/ if not already there.
target_root = Path.home() / ".hive" / "skills"
target = target_root / fm_name
try:
target_root.mkdir(parents=True, exist_ok=True)
except OSError as e:
return None, f"failed to create skills root: {e}"
try:
if src.resolve() == target.resolve():
# Already in the right place — nothing to do.
return target, None
except OSError:
pass
try:
if target.exists():
# Overwrite existing — the queen is explicitly creating
# a new colony for this version, so her authored skill
# wins over any prior version. copytree with
# dirs_exist_ok handles subdirs (scripts/, references/,
# assets/) but does NOT delete files removed in the
# new version. For a clean overwrite we rmtree first.
_shutil.rmtree(target)
_shutil.copytree(src, target)
except OSError as e:
return None, f"failed to install skill into {target}: {e}"
return target, None
async def create_colony(
    *,
    colony_name: str,
    task: str,
    skill_path: str,
) -> str:
    """Create a colony after installing a pre-authored skill folder.

    File-system only: copies the queen session into a new colony
    directory and writes ``worker.json`` with the task baked in.
    NOTHING RUNS after fork. The user navigates to the colony when
    they're ready to start the worker — at that point the worker
    reads the task from ``worker.json`` and the skill from
    ``~/.hive/skills/`` and starts informed.

    Returns a JSON string: ``{"status": "created", ...}`` on success
    or ``{"error": ..., "hint": ...?}`` on any failure.
    """
    if session is None:
        return json.dumps({"error": "No session bound to this tool registry."})
    cn = (colony_name or "").strip()
    if not _COLONY_NAME_RE.match(cn):
        return json.dumps(
            {
                "error": (
                    "colony_name must be lowercase alphanumeric "
                    "with underscores (e.g. 'honeycomb_research')."
                )
            }
        )
    # Step 1: validate the authored skill folder and copy it under
    # ~/.hive/skills/ so the colony's worker discovers it on first run.
    installed_skill, skill_err = _validate_and_install_skill(skill_path)
    if skill_err is not None:
        return json.dumps(
            {
                "error": skill_err,
                "hint": (
                    "Author the skill folder first using write_file "
                    "(and edit_file for follow-ups). The folder must "
                    "contain a SKILL.md with YAML frontmatter "
                    "{name, description} — see your "
                    "writing-hive-skills default skill for the "
                    "format. Then call create_colony again with "
                    "skill_path pointing at that folder."
                ),
            }
        )
    # FIX: the format string was "%s%s" (source and destination fused
    # with no separator), which produced an unreadable log line.
    logger.info(
        "create_colony: installed skill from %s to %s",
        skill_path,
        installed_skill,
    )
    # Step 2: fork the queen session into the colony directory. The
    # fork copies conversations + writes worker.json + metadata.json.
    # NO worker runs after this call. The new colony's worker inherits
    # ~/.hive/skills/ on first run (whenever the user actually starts
    # it), so the freshly installed skill is discoverable then.
    try:
        # Local import: avoids a circular dependency with the server
        # routes module at tool-registration time.
        from framework.server.routes_execution import fork_session_into_colony
    except Exception as e:
        return json.dumps(
            {
                "error": f"fork_session_into_colony import failed: {e}",
                "skill_installed": str(installed_skill),
            }
        )
    try:
        fork_result = await fork_session_into_colony(
            session=session,
            colony_name=cn,
            task=(task or "").strip(),
        )
    except Exception as e:
        logger.exception("create_colony: fork failed after installing skill")
        return json.dumps(
            {
                "error": f"colony fork failed: {e}",
                "skill_installed": str(installed_skill),
                "hint": (
                    "The skill was installed but the fork failed. "
                    "You can retry create_colony — re-installing "
                    "the skill is idempotent."
                ),
            }
        )
    # Emit COLONY_CREATED so the frontend can render a system
    # message in the queen DM with a link to the new colony.
    # Without this the queen's text response is the only signal
    # the user gets, and there's no clickable navigation.
    bus = getattr(session, "event_bus", None)
    if bus is not None:
        try:
            await bus.publish(
                AgentEvent(
                    type=EventType.COLONY_CREATED,
                    stream_id="queen",
                    data={
                        "colony_name": fork_result.get("colony_name", cn),
                        "colony_path": fork_result.get("colony_path"),
                        "queen_session_id": fork_result.get("queen_session_id"),
                        "is_new": fork_result.get("is_new", True),
                        "skill_installed": str(installed_skill),
                        "skill_name": installed_skill.name if installed_skill else None,
                        "task": (task or "").strip(),
                    },
                )
            )
        except Exception:
            # Event delivery is best-effort: the fork already succeeded,
            # so a publish failure must not fail the tool call.
            logger.warning(
                "create_colony: failed to publish COLONY_CREATED event",
                exc_info=True,
            )
    return json.dumps(
        {
            "status": "created",
            "colony_name": fork_result.get("colony_name", cn),
            "colony_path": fork_result.get("colony_path"),
            "queen_session_id": fork_result.get("queen_session_id"),
            "is_new": fork_result.get("is_new", True),
            "skill_installed": str(installed_skill),
            "skill_name": installed_skill.name if installed_skill else None,
        }
    )
# LLM-facing schema for create_colony. The description teaches the
# two-step author-skill-then-fork flow.
_create_colony_tool = Tool(
    name="create_colony",
    description=(
        "Fork this session into a colony — but FIRST author a "
        "Hive Skill folder capturing what you learned during this "
        "conversation, and pass its path to this tool. The tool "
        "validates the skill folder (SKILL.md present, frontmatter "
        "name+description valid, directory name matches frontmatter "
        "name), installs it under ~/.hive/skills/{name}/ if it's "
        "not already there, and then forks the session.\n\n"
        "NOTHING RUNS AFTER FORK. This tool is file-system only: "
        "it copies the queen session into a new colony directory "
        "and writes worker.json with the task baked in. No worker "
        "is started. The user navigates to the new colony when "
        "they're ready to begin actual work — at that point the "
        "worker reads the task from worker.json and the skill you "
        "wrote here, and starts informed instead of clueless.\n\n"
        "TWO-STEP FLOW:\n\n"
        "  1. Use write_file (plus edit_file / list_directory as "
        "     needed) to create a skill folder. The folder must "
        "     contain a SKILL.md with YAML frontmatter {name, "
        "     description} and a markdown body. Optional subdirs: "
        "     scripts/, references/, assets/. See your "
        "     writing-hive-skills default skill for the spec. We "
        "     recommend authoring it directly at "
        "     ~/.hive/skills/{skill-name}/SKILL.md so no copy is "
        "     needed.\n"
        "  2. Call create_colony(colony_name, task, skill_path) "
        "     pointing at the folder you just wrote.\n\n"
        "WHY THIS EXISTS: a fresh worker has zero memory of your "
        "chat with the user. If you spent the session figuring out "
        "an API auth flow, pagination, data shapes, and gotchas — "
        "that knowledge must live in a skill, not in your private "
        "context, or the worker will repeat your discovery work "
        "from scratch.\n\n"
        "WHAT TO PUT IN THE SKILL BODY: the operational protocol "
        "the next worker needs to do this work. Include API "
        "endpoints with example requests, the exact auth flow, "
        "response shapes you observed, gotchas you hit (rate "
        "limits, pagination quirks, edge cases), conventions you "
        "settled on, and pre-baked queries/commands. Write it as "
        "if onboarding a new engineer who has never seen this "
        # FIX: the range separator was lost in the original text,
        # which read "3002000 chars".
        "system. Realistic target: 300-2000 chars of body."
    ),
    parameters={
        "type": "object",
        "properties": {
            "colony_name": {
                "type": "string",
                "description": (
                    "Lowercase alphanumeric+underscore name for "
                    "the new colony (e.g. 'honeycomb_research')."
                ),
            },
            "task": {
                "type": "string",
                "description": (
                    "FULL self-contained task description, baked "
                    "into worker.json for the colony's first run. "
                    "Nothing executes when create_colony returns — "
                    "the task is stored, not run. The user starts "
                    "the worker later from the new colony page. At "
                    "that point the worker has zero memory of your "
                    "chat, so this task string must contain "
                    "everything: every requirement, constraint, "
                    "and detail. Write it as if handing the work "
                    "to a stranger who has never seen the user's "
                    "request."
                ),
            },
            "skill_path": {
                "type": "string",
                "description": (
                    "Path to a pre-authored skill folder containing "
                    "SKILL.md. May be absolute or ~-expanded. The "
                    "directory basename MUST match the SKILL.md "
                    "frontmatter 'name' field. If the path is "
                    "outside ~/.hive/skills/ the folder is copied "
                    "in. Example: '~/.hive/skills/honeycomb-api-"
                    "protocol'."
                ),
            },
        },
        "required": ["colony_name", "task", "skill_path"],
    },
)
registry.register(
    "create_colony",
    _create_colony_tool,
    lambda inputs: create_colony(**inputs),
)
tools_registered += 1
# --- switch_to_reviewing ----------------------------------------------------
async def switch_to_reviewing_tool() -> str:
@@ -3241,33 +3774,62 @@ def register_queen_lifecycle_tools(
async def run_agent_with_input(task: str) -> str:
"""Run the loaded worker agent with the given task input.
Performs preflight checks (credentials, MCP resync), triggers the
worker's default entry point, and switches to running phase.
Phase 4 unified path: spawns the loaded worker through
``session.colony.spawn(...)`` (a real ColonyRuntime) instead of
the deprecated ``AgentHost.trigger`` ``Orchestrator`` flow.
The new path passes ``input_data={"user_request": task}``
straight into ``AgentLoop._build_initial_message`` which
renders ALL keys to the worker's first user message — no
buffer filter, no dropped task string, no orchestrator
graph-execution machinery.
We still read the legacy ``session.colony_runtime`` (the
AgentHost loaded by ``load_built_agent``) to pull the worker's
tool list, tool executor, and entry-node system prompt those
are the loaded honeycomb / custom worker's actual identity and
we want the spawned worker to BE that, not the queen's generic
colony spec.
"""
runtime = _get_runtime()
if runtime is None:
legacy = _get_runtime() # the loaded AgentHost from load_built_agent
if legacy is None:
return json.dumps({"error": "No worker loaded in this session."})
# Guard: refuse to start while an execution is already running.
# Calling again would cancel the active one via the
# "Restarted with new execution" path in ExecutionStream.execute(),
# which is almost never what the queen intends.
for colony_id in runtime.list_workers():
reg = runtime.get_worker_registration(colony_id)
if reg is None:
continue
for _ep_id, stream in reg.streams.items():
if stream.active_execution_ids:
return json.dumps(
{
"error": "Worker is already running.",
"active_execution_ids": list(stream.active_execution_ids),
"hint": "Wait for the worker to finish (WORKER_TERMINAL event) or call stop_agent() before starting a new run.",
}
colony = getattr(session, "colony", None)
if colony is None:
return json.dumps(
{
"error": (
"Session has no unified ColonyRuntime — "
"_start_unified_colony_runtime did not run. "
"Cannot spawn worker."
)
}
)
# Diagnostic: log the exact task arg the queen passed so we can
# spot generic / context-free task strings before they reach
# the worker. The worker has no chat context, so a vague task
# is the #1 cause of useless worker runs.
logger.info(
"run_agent_with_input: queen passing task to worker (len=%d): %r",
len(task),
task[:500] if isinstance(task, str) else task,
)
if isinstance(task, str) and len(task) < 60:
logger.warning(
"run_agent_with_input: SHORT TASK STRING (%d chars). "
"The worker has zero context from the queen's chat — "
"tasks shorter than ~60 chars usually fail because "
"they lack the specific instructions the worker needs. "
"Task: %r",
len(task),
task,
)
try:
# Pre-flight: validate credentials and resync MCP servers.
# Still uses the legacy AgentHost handles because that's
# where credentials live; the actual run is via colony.
loop = asyncio.get_running_loop()
async def _preflight():
@@ -3276,7 +3838,7 @@ def register_queen_lifecycle_tools(
await loop.run_in_executor(
None,
lambda: validate_credentials(
runtime.graph.nodes,
legacy.graph.nodes,
interactive=False,
skip=False,
),
@@ -3307,21 +3869,69 @@ def register_queen_lifecycle_tools(
except CredentialError:
raise # handled below
# Resume timers in case they were paused by a previous stop
runtime.resume_timers()
# Build a per-spawn AgentSpec that mirrors the loaded
# worker's entry-node identity. This is what makes the
# spawned ColonyRuntime worker run the loaded honeycomb /
# custom worker's code instead of the queen's generic
# colony default.
from framework.agent_loop.types import AgentSpec
# Get session state from any prior execution for memory continuity
session_state = runtime._get_primary_session_state("default") or {}
graph = getattr(legacy, "graph", None)
entry_node = None
if graph is not None and hasattr(graph, "get_node"):
try:
entry_node = graph.get_node(graph.entry_node)
except Exception:
entry_node = None
if session_id:
session_state["resume_session_id"] = session_id
worker_system_prompt = (
getattr(entry_node, "system_prompt", None)
if entry_node is not None
else None
) or ""
exec_id = await runtime.trigger(
entry_point_id="default",
input_data={"user_request": task},
session_state=session_state,
worker_tool_names = (
list(getattr(entry_node, "tools", []) or [])
if entry_node is not None
else []
)
spawn_spec = AgentSpec(
id=f"loaded_worker:{getattr(graph, 'id', 'unknown')}",
name=getattr(graph, "id", "loaded_worker"),
description=(
"Loaded worker agent spawned via run_agent_with_input "
"through the unified ColonyRuntime path."
),
system_prompt=worker_system_prompt,
tools=worker_tool_names,
tool_access_policy="all",
)
# Pull the live tool objects + executor straight from the
# loaded AgentHost so the spawned worker uses its actual
# MCP-loaded tools (browser, hubspot, honeycomb, etc.).
spawn_tools = list(getattr(legacy, "_tools", []) or [])
spawn_tool_executor = getattr(legacy, "_tool_executor", None)
worker_ids = await colony.spawn(
task=task,
count=1,
input_data={"user_request": task},
agent_spec=spawn_spec,
tools=spawn_tools,
tool_executor=spawn_tool_executor,
# Use the legacy single-worker stream tag so events flow
# through the SSE filter into the queen DM chat. The
# default "worker:{uuid}" tag is reserved for parallel
# fan-out via run_parallel_workers and is filtered out
# of the queen DM by routes_events.py to keep the chat
# clean. The loaded primary worker is the user's
# main visible workstream and must NOT be filtered.
stream_id="worker",
)
new_worker_id = worker_ids[0] if worker_ids else ""
# Switch to running phase
if phase_state is not None:
await phase_state.switch_to_running()
@@ -3331,8 +3941,10 @@ def register_queen_lifecycle_tools(
{
"status": "started",
"phase": "running",
"execution_id": exec_id,
"worker_id": new_worker_id,
"task": task,
"tool_count": len(spawn_tools),
"system_prompt_chars": len(worker_system_prompt),
}
)
except CredentialError as e:
@@ -3350,21 +3962,44 @@ def register_queen_lifecycle_tools(
)
return json.dumps(error_payload)
except Exception as e:
logger.exception("run_agent_with_input: spawn failed")
return json.dumps({"error": f"Failed to start worker: {e}"})
_run_input_tool = Tool(
name="run_agent_with_input",
description=(
"Run the loaded worker agent with the given task. Validates credentials, "
"triggers the worker's default entry point, and switches to running phase. "
"Use this after loading an agent (staging phase) to start execution."
"Run the loaded worker agent with the given task.\n\n"
"CRITICAL: the worker is a FRESH process. It has NO memory of "
"your conversation with the user, NO knowledge of what was "
"discussed, and NO access to your context. It only sees the "
"single 'task' string you pass here. If the user asked you "
"to fetch '125 tickers and build a market report with "
"gainers, losers, and category breakdowns', that ENTIRE "
"specification must be in the task arg verbatim — not "
"'continue our work', not 'do what we discussed', not "
"'finish the analysis'. Bad task: 'Continue the work from "
"the queen's current session'. Good task: 'Fetch all 125 "
"tickers from the honeycomb API (paginate past the default "
"50 page limit), then build a full market report including: "
"(1) top 10 gainers by % change, (2) top 10 losers, (3) top "
"10 by volume, (4) breakdown by category, (5) any unusual "
"patterns. Return as a structured summary.' Validates "
"credentials and switches to running phase. Use this after "
"loading an agent (staging phase) to start execution."
),
parameters={
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "The task or input for the worker agent to execute",
"description": (
"FULL self-contained task specification for the "
"worker. Must include every requirement, "
"constraint, and detail the worker needs — the "
"worker has zero context from your conversation. "
"Write it as if you're handing the task to a "
"stranger who has never seen the user's request."
),
},
},
"required": ["task"],
+1
View File
@@ -310,6 +310,7 @@ export type EventTypeName =
| "custom"
| "escalation_requested"
| "worker_colony_loaded"
| "colony_created"
| "credentials_required"
| "queen_phase_changed"
| "subagent_report"
+39 -1
View File
@@ -1,4 +1,5 @@
import { memo, useState, useRef, useEffect, useMemo } from "react";
import { Link } from "react-router-dom";
import {
Send,
Square,
@@ -40,7 +41,8 @@ export interface ChatMessage {
| "user"
| "tool_status"
| "worker_input_request"
| "run_divider";
| "run_divider"
| "colony_link";
role?: "queen" | "worker";
/** Which worker thread this message belongs to (worker agent name) */
thread?: string;
@@ -238,6 +240,42 @@ const MessageBubble = memo(
);
}
if (msg.type === "colony_link") {
// Rendered when the queen calls create_colony() and the backend
// emits a COLONY_CREATED event. Gives the user a clickable card
// that navigates to the new colony page.
let parsed: {
colony_name?: string;
is_new?: boolean;
skill_name?: string;
href?: string;
} = {};
try {
parsed = JSON.parse(msg.content);
} catch {
// ignore — fall through to a plain text render
}
const colonyName = parsed.colony_name || "";
const href = parsed.href || (colonyName ? `/colony/${colonyName}` : "");
const skillLabel = parsed.skill_name
? ` · skill: ${parsed.skill_name}`
: "";
const isNewLabel = parsed.is_new === false ? " (updated)" : " (new)";
return (
<div className="flex justify-center py-2">
<Link
to={href}
className="inline-flex items-center gap-2 text-xs font-medium text-primary bg-primary/10 hover:bg-primary/20 px-4 py-2 rounded-full border border-primary/20 transition-colors"
>
<span>🏛</span>
<span>
Colony <strong>{colonyName}</strong>{isNewLabel} ready{skillLabel} open
</span>
</Link>
</div>
);
}
if (msg.type === "tool_status") {
return <ToolActivityRow content={msg.content} />;
}
+28
View File
@@ -416,6 +416,34 @@ export default function QueenDM() {
break;
}
case "colony_created": {
// Queen called create_colony() — surface a clickable system
// message linking to /colony/{colony_name} so the user can
// navigate to the new colony immediately.
const colonyName = (event.data?.colony_name as string) || "";
const isNew = (event.data?.is_new as boolean) ?? true;
const skillName = (event.data?.skill_name as string) || "";
if (!colonyName) break;
const msg: ChatMessage = {
id: makeId(),
agent: "System",
agentColor: "",
content: JSON.stringify({
kind: "colony_created",
colony_name: colonyName,
is_new: isNew,
skill_name: skillName,
href: `/colony/${colonyName}`,
}),
timestamp: "",
type: "colony_link",
thread: "queen-dm",
createdAt: Date.now(),
};
setMessages((prev) => [...prev, msg]);
break;
}
case "tool_call_started": {
const toolName = (event.data?.tool_name as string) || "unknown";
const toolUseId = (event.data?.tool_use_id as string) || "";
+92
View File
@@ -0,0 +1,92 @@
"""Tests for ``sanitize_ask_user_inputs``.
Some model families return malformed ``ask_user`` calls that pack the
options inside the ``question`` string as pseudo-XML / inline blob.
The sanitizer self-heals those calls so the buttons still render.
"""
from __future__ import annotations
from framework.agent_loop.internals.synthetic_tools import (
sanitize_ask_user_inputs,
)
def test_clean_question_passes_through_unchanged() -> None:
    """A well-formed question comes back verbatim with no recovered options."""
    question, recovered = sanitize_ask_user_inputs("What's next?", None)
    assert (question, recovered) == ("What's next?", None)
def test_strips_trailing_close_question_tag() -> None:
    """A stray ``</question>`` suffix is stripped from the question."""
    question, recovered = sanitize_ask_user_inputs("What now?</question>", None)
    assert (question, recovered) == ("What now?", None)
def test_strips_close_question_tag_case_insensitive_with_whitespace() -> None:
    """Tag removal ignores case and surrounding whitespace."""
    question, recovered = sanitize_ask_user_inputs("What now? </QUESTION> ", None)
    assert (question, recovered) == ("What now?", None)
def test_recovers_inline_uppercase_options() -> None:
    """An ``_OPTIONS:`` JSON blob packed into the question is parsed out."""
    malformed = (
        "What do you want to do from here?</question>\n"
        '_OPTIONS: ["De-risk — trim PRLG", "Add to a position", "Open a short"]'
    )
    question, recovered = sanitize_ask_user_inputs(malformed, None)
    assert question == "What do you want to do from here?"
    assert recovered == ["De-risk — trim PRLG", "Add to a position", "Open a short"]
def test_recovers_inline_lowercase_options() -> None:
    """A lowercase ``options:`` blob is also recognised."""
    question, recovered = sanitize_ask_user_inputs('Pick one\noptions: ["A", "B", "C"]', None)
    assert question == "Pick one"
    assert recovered == ["A", "B", "C"]
def test_recovers_inline_underscore_options() -> None:
    """A lowercase ``_options:`` blob is also recognised."""
    question, recovered = sanitize_ask_user_inputs('Pick one\n_options: ["A", "B"]', None)
    assert question == "Pick one"
    assert recovered == ["A", "B"]
def test_recovered_options_dropped_when_not_a_list() -> None:
    """A blob whose value is not a JSON list yields no recovered options."""
    malformed = 'Pick one\noptions: "not-a-list"'
    question, recovered = sanitize_ask_user_inputs(malformed, None)
    # Either the inline blob was cleaned from the question, or it was left
    # alone entirely — but no options may be synthesised from it.
    assert "options" not in question.lower() or "not-a-list" in question
    assert recovered is None
def test_recovered_options_dropped_when_too_many() -> None:
    """An implausibly long option list (10 entries here) is discarded."""
    malformed = 'Pick\noptions: ["a","b","c","d","e","f","g","h","i","j"]'
    _, recovered = sanitize_ask_user_inputs(malformed, None)
    assert recovered is None
def test_does_not_overwrite_real_options() -> None:
    """Caller-supplied options are never clobbered by the sanitizer.

    The second return value carries only options *recovered* from the
    question text; the caller decides whether to prefer them over the
    options it already has.
    """
    provided = ["X", "Y"]
    question, recovered = sanitize_ask_user_inputs("Plain question?", provided)
    assert question == "Plain question?"
    # Nothing was embedded in the question, so nothing is recovered.
    assert recovered is None
def test_none_question_returns_empty() -> None:
    """``None`` question degrades to an empty string, no options."""
    question, recovered = sanitize_ask_user_inputs(None, None)
    assert (question, recovered) == ("", None)
def test_collapses_excess_blank_lines_after_removal() -> None:
    """Blank-line runs left behind by blob removal are collapsed away."""
    question, recovered = sanitize_ask_user_inputs('What?\n\n\n\noptions: ["a", "b"]', None)
    assert question == "What?"
    assert recovered == ["a", "b"]
+512
View File
@@ -0,0 +1,512 @@
"""Phase 1 tests: ColonyRuntime overseer + parallel worker fan-out.
These tests exercise the new overseer primitive, the
``report_to_parent`` tool, ``spawn_batch``, and
``wait_for_worker_reports`` all additive to ColonyRuntime. They use
a ``MockStreamingLLM`` that yields pre-programmed stream events and
a real on-disk ``tmp_path``. No HTTP layer, no real LLM.
"""
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import pytest
from framework.agent_loop.types import AgentSpec
from framework.host.colony_runtime import ColonyRuntime
from framework.host.event_bus import AgentEvent, EventBus, EventType
from framework.host.worker import Worker, WorkerStatus
from framework.llm.provider import LLMProvider, LLMResponse, Tool, ToolResult, ToolUse
from framework.llm.stream_events import (
FinishEvent,
TextDeltaEvent,
ToolCallEvent,
)
from framework.schemas.goal import Goal
# ---------------------------------------------------------------------------
# Mock LLM
# ---------------------------------------------------------------------------
class MockStreamingLLM(LLMProvider):
    """Mock provider that replays canned stream events.

    Two routing modes are supported:

    - ``scenarios`` (list): consumed sequentially, one scenario per
      ``stream()`` call.  Suitable for single-worker tests where the call
      order is deterministic.
    - ``by_task`` (dict): picks the scenario whose key substring appears
      in the first user message.  Suitable for parallel-worker tests where
      several workers share this one provider object and would otherwise
      race on sequential consumption.
    """

    def __init__(
        self,
        scenarios: list[list] | None = None,
        by_task: dict[str, list] | None = None,
    ):
        self.scenarios = scenarios or []
        self.by_task = by_task or {}
        self._call_index = 0  # next index into `scenarios` (clamped at the end)
        self.stream_calls: list[dict] = []  # recorded args of every stream() call

    @staticmethod
    def _first_user_text(messages: list[dict[str, Any]]) -> str:
        """Return the text of the first user message (handles block lists)."""
        for message in messages:
            if message.get("role") != "user":
                continue
            content = message.get("content", "")
            if isinstance(content, list):
                for part in content:
                    if isinstance(part, dict) and part.get("type") == "text":
                        content = part.get("text", "")
                        break
            return str(content)
        return ""

    async def stream(
        self,
        messages: list[dict[str, Any]],
        system: str = "",
        tools: list[Tool] | None = None,
        max_tokens: int = 4096,
    ) -> AsyncIterator:
        self.stream_calls.append(
            {"messages": messages, "system": system, "tools": tools}
        )
        if self.by_task:
            # Route on the task text embedded in the first user message so
            # the choice is stable regardless of worker scheduling order.
            first_user = self._first_user_text(messages)
            for task_key, events in self.by_task.items():
                if task_key in first_user:
                    for event in events:
                        yield event
                    return
            return
        if not self.scenarios:
            return
        scenario = self.scenarios[min(self._call_index, len(self.scenarios) - 1)]
        self._call_index += 1
        for event in scenario:
            yield event

    def complete(self, messages, system="", **kwargs) -> LLMResponse:
        return LLMResponse(content="", model="mock", stop_reason="stop")
def _text_scenario(text: str) -> list:
    """Scenario: emit *text* as one delta event, then a clean finish."""
    delta = TextDeltaEvent(content=text, snapshot=text)
    finish = FinishEvent(
        stop_reason="stop", input_tokens=10, output_tokens=5, model="mock"
    )
    return [delta, finish]
def _report_scenario(status: str, summary: str, data: dict | None = None) -> list:
    """Scenario: the worker invokes ``report_to_parent`` once, then finishes."""
    report_call = ToolCallEvent(
        tool_use_id="report_1",
        tool_name="report_to_parent",
        tool_input={"status": status, "summary": summary, "data": data or {}},
    )
    finish = FinishEvent(
        stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"
    )
    return [report_call, finish]
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def agent_spec() -> AgentSpec:
    """Minimal AgentSpec shared by every colony-runtime test in this file."""
    return AgentSpec(
        id="test_colony_agent",
        name="Test Colony Agent",
        description="Agent spec used for colony runtime tests.",
        system_prompt="You are a test agent.",
        agent_type="event_loop",
        output_keys=[],
        tool_access_policy="all",
    )
@pytest.fixture
def goal() -> Goal:
    """Trivial Goal; tests only assert identity and the id field."""
    return Goal(
        id="test-goal",
        name="Test goal",
        description="A test goal for the colony runtime.",
    )
@pytest.fixture
def event_bus() -> EventBus:
    """Fresh EventBus per test so subscriptions never leak between tests."""
    return EventBus()
def _stub_tool_executor(tool_use: ToolUse) -> ToolResult:
    """No-op tool executor: every tool call succeeds with content ``"ok"``."""
    return ToolResult(tool_use_id=tool_use.tool_use_id, content="ok", is_error=False)
async def _make_colony(
    tmp_path: Path,
    llm: LLMProvider,
    agent_spec: AgentSpec,
    goal: Goal,
    event_bus: EventBus,
) -> ColonyRuntime:
    """Construct and start a ColonyRuntime rooted in a fresh storage dir.

    Pipeline stages are disabled (empty list) so tests exercise only the
    worker/overseer machinery.
    """
    storage_dir = tmp_path / "colony_storage"
    storage_dir.mkdir()
    runtime = ColonyRuntime(
        agent_spec=agent_spec,
        goal=goal,
        storage_path=storage_dir,
        llm=llm,
        tools=[],
        tool_executor=_stub_tool_executor,
        event_bus=event_bus,
        colony_id="test_colony",
        pipeline_stages=[],  # skip pipeline initialisation in tests
    )
    await runtime.start()
    return runtime
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestColonyRuntimeGoalProperty:
    """The goal given at construction is exposed as a public property."""

    @pytest.mark.asyncio
    async def test_goal_is_public_property(self, tmp_path, agent_spec, goal, event_bus):
        """``colony.goal`` is the very Goal object passed to the constructor."""
        mock_llm = MockStreamingLLM(scenarios=[_text_scenario("ok")])
        colony = await _make_colony(tmp_path, mock_llm, agent_spec, goal, event_bus)
        try:
            assert colony.goal is goal
            assert colony.goal.id == "test-goal"
        finally:
            await colony.stop()
class TestStartOverseer:
    """``start_overseer`` creates a single persistent overseer worker."""

    @pytest.mark.asyncio
    async def test_start_overseer_creates_persistent_worker(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """Overseer must be a persistent Worker tagged stream_id='overseer'."""
        llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            overseer = await colony.start_overseer(queen_spec=agent_spec)
            assert colony.overseer is overseer
            assert overseer.is_persistent is True
            # NOTE(review): reaches into the private ``_context`` for the
            # stream id — acceptable for a white-box test, but brittle.
            assert overseer._context.stream_id == "overseer"
            assert overseer.id == f"overseer:{colony.colony_id}"
            # Give the background task a moment to start
            await asyncio.sleep(0.05)
            assert overseer.is_active
        finally:
            await colony.stop()

    @pytest.mark.asyncio
    async def test_start_overseer_idempotent(self, tmp_path, agent_spec, goal, event_bus):
        """A second start_overseer call returns the same worker, not a new one."""
        llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            first = await colony.start_overseer(queen_spec=agent_spec)
            second = await colony.start_overseer(queen_spec=agent_spec)
            assert first is second
        finally:
            await colony.stop()
class TestReportToParent:
    """Worker-side ``report_to_parent`` produces SUBAGENT_REPORT events.

    Covers both the happy path (a worker reporting success) and the crash
    path (the runtime must synthesise a failed report so the overseer never
    hangs waiting on a dead worker).
    """

    @pytest.mark.asyncio
    async def test_worker_report_emits_subagent_report_event(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """A worker calling report_to_parent emits SUBAGENT_REPORT with structured data."""
        llm = MockStreamingLLM(
            scenarios=[
                _report_scenario(
                    status="success",
                    summary="Fetched 5 rows from the API.",
                    data={"rows": 5, "table": "honeycomb"},
                ),
                # Worker terminates after the report; no follow-up turn needed
            ]
        )
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        reports: list[AgentEvent] = []
        lifecycle: list[AgentEvent] = []

        async def on_report(event: AgentEvent) -> None:
            reports.append(event)

        async def on_lifecycle(event: AgentEvent) -> None:
            lifecycle.append(event)

        event_bus.subscribe(event_types=[EventType.SUBAGENT_REPORT], handler=on_report)
        event_bus.subscribe(
            event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
            handler=on_lifecycle,
        )
        try:
            worker_ids = await colony.spawn(
                task="Fetch 5 rows from honeycomb", count=1
            )
            assert len(worker_ids) == 1
            worker = colony.get_worker(worker_ids[0])
            assert worker is not None
            # Wait for the worker's background task to finish.  Use
            # get_running_loop() — get_event_loop() inside a coroutine is
            # deprecated since Python 3.10/3.12.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + 5.0
            while worker.is_active and loop.time() < deadline:
                await asyncio.sleep(0.05)
            assert not worker.is_active, "Worker did not finish within timeout"
            # SUBAGENT_REPORT arrived with the structured payload intact
            assert len(reports) == 1
            ev = reports[0]
            assert ev.data["worker_id"] == worker_ids[0]
            assert ev.data["status"] == "success"
            assert ev.data["summary"] == "Fetched 5 rows from the API."
            assert ev.data["data"] == {"rows": 5, "table": "honeycomb"}
            assert ev.data["task"] == "Fetch 5 rows from honeycomb"
            # Lifecycle event also fired (EXECUTION_COMPLETED)
            assert len(lifecycle) == 1
            assert lifecycle[0].type == EventType.EXECUTION_COMPLETED
        finally:
            await colony.stop()

    @pytest.mark.asyncio
    async def test_worker_crash_emits_synthesised_failed_report(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """Worker whose AgentLoop raises must still emit SUBAGENT_REPORT.

        The overseer would otherwise hang waiting for a report from a
        crashed worker. Worker.run()'s except handler is responsible for
        emitting a synthesised failed report.
        """

        class CrashingLLM(LLMProvider):
            def __init__(self) -> None:
                super().__init__()
                # Per-instance list (a class-level mutable default would be
                # shared across instances).
                self.stream_calls: list[dict] = []

            async def stream(self, messages, system="", tools=None, max_tokens=4096):
                self.stream_calls.append({"messages": messages})
                raise RuntimeError("boom — simulated LLM crash")
                yield  # pragma: no cover — make this an async generator

            def complete(self, messages, system="", **kwargs):
                return LLMResponse(content="", model="mock", stop_reason="stop")

        llm = CrashingLLM()
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        reports: list[AgentEvent] = []

        async def on_report(event: AgentEvent) -> None:
            reports.append(event)

        event_bus.subscribe(event_types=[EventType.SUBAGENT_REPORT], handler=on_report)
        try:
            ids = await colony.spawn(task="crashing task", count=1)
            worker = colony.get_worker(ids[0])
            loop = asyncio.get_running_loop()
            deadline = loop.time() + 5.0
            while worker.is_active and loop.time() < deadline:
                await asyncio.sleep(0.05)
            assert not worker.is_active
            # The synthesised report marks this worker as failed
            assert len(reports) >= 1
            r = reports[0]
            assert r.data["worker_id"] == ids[0]
            assert r.data["status"] == "failed"
        finally:
            await colony.stop()
class TestSpawnBatchAndWaitForReports:
    """``spawn_batch`` fans out workers; ``wait_for_worker_reports`` gathers them."""

    @pytest.mark.asyncio
    async def test_spawn_batch_returns_one_id_per_task(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """Each task spec yields exactly one unique, retrievable worker id."""
        llm = MockStreamingLLM(
            by_task={
                "Fetch batch 1": _report_scenario("success", "batch 1 done"),
                "Fetch batch 2": _report_scenario("success", "batch 2 done"),
                "Fetch batch 3": _report_scenario("success", "batch 3 done"),
            }
        )
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            ids = await colony.spawn_batch(
                tasks=[
                    {"task": "Fetch batch 1"},
                    {"task": "Fetch batch 2"},
                    {"task": "Fetch batch 3"},
                ]
            )
            assert len(ids) == 3
            assert len(set(ids)) == 3  # unique IDs
            for wid in ids:
                assert colony.get_worker(wid) is not None
        finally:
            await colony.stop()

    @pytest.mark.asyncio
    async def test_wait_for_worker_reports_collects_all(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """Fan out 3 workers, wait for reports, verify structured list."""
        llm = MockStreamingLLM(
            by_task={
                "batch 1": _report_scenario(
                    "success", "w1 done", {"batch": 1, "rows": 10}
                ),
                "batch 2": _report_scenario(
                    "success", "w2 done", {"batch": 2, "rows": 15}
                ),
                "batch 3": _report_scenario(
                    "failed", "w3 broke", {"batch": 3, "error_code": 503}
                ),
            }
        )
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            ids = await colony.spawn_batch(
                tasks=[
                    {"task": "batch 1"},
                    {"task": "batch 2"},
                    {"task": "batch 3"},
                ]
            )
            reports = await colony.wait_for_worker_reports(ids, timeout=10.0)
            assert len(reports) == 3
            # Index by worker id: each structured payload round-trips intact,
            # including the failed worker's error data.
            by_id = {r["worker_id"]: r for r in reports}
            assert by_id[ids[0]]["status"] == "success"
            assert by_id[ids[0]]["summary"] == "w1 done"
            assert by_id[ids[0]]["data"] == {"batch": 1, "rows": 10}
            assert by_id[ids[1]]["status"] == "success"
            assert by_id[ids[1]]["data"] == {"batch": 2, "rows": 15}
            assert by_id[ids[2]]["status"] == "failed"
            assert by_id[ids[2]]["data"] == {"batch": 3, "error_code": 503}
        finally:
            await colony.stop()

    @pytest.mark.asyncio
    async def test_wait_for_worker_reports_returns_in_input_order(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """Reports must be returned in the same order as the input worker_ids."""
        llm = MockStreamingLLM(
            by_task={
                "task-A": _report_scenario("success", "A"),
                "task-B": _report_scenario("success", "B"),
                "task-C": _report_scenario("success", "C"),
            }
        )
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            ids = await colony.spawn_batch(
                tasks=[{"task": "task-A"}, {"task": "task-B"}, {"task": "task-C"}]
            )
            reports = await colony.wait_for_worker_reports(ids, timeout=10.0)
            assert [r["worker_id"] for r in reports] == ids
            assert [r["summary"] for r in reports] == ["A", "B", "C"]
        finally:
            await colony.stop()

    @pytest.mark.asyncio
    async def test_wait_for_worker_reports_missing_id(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """Unknown worker_id is reported as failed, not crash."""
        llm = MockStreamingLLM(scenarios=[_text_scenario("noop")])
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            reports = await colony.wait_for_worker_reports(
                ["nonexistent_worker"], timeout=1.0
            )
            assert len(reports) == 1
            assert reports[0]["worker_id"] == "nonexistent_worker"
            assert reports[0]["status"] == "failed"
            assert reports[0]["error"] == "no_such_worker"
        finally:
            await colony.stop()
class TestSeedConversation:
    """``seed_conversation`` pre-populates the overseer's message history."""

    @pytest.mark.asyncio
    async def test_seed_conversation_writes_parts_to_storage(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """seed_conversation must write message parts to disk so the
        AgentLoop's NodeConversation picks them up when the overseer
        initialises."""
        llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            seed = [
                {"seq": 0, "role": "user", "content": "What's the plan?"},
                {"seq": 1, "role": "assistant", "content": "Let's fetch data."},
                {"seq": 2, "role": "user", "content": "Do it in parallel."},
            ]
            await colony.start_overseer(
                queen_spec=agent_spec,
                seed_conversation=seed,
            )
            overseer = colony.overseer
            assert overseer is not None
            # Find the storage path used by the overseer's context.
            # It's the colony storage dir + worker storage path inside.
            # The runtime_adapter passed to the context has the storage.
            # Easier check: find parts/ files under the colony storage.
            # The seed_conversation writer uses ctx.storage_path or _storage_path.
            # In our test we didn't configure that, so it falls back to Path(".").
            # Just verify the seed_conversation call didn't raise and the
            # overseer started successfully.
            # NOTE(review): this only smoke-tests the call; the on-disk
            # parts are never asserted — consider strengthening once the
            # storage path is configurable from tests.
            assert overseer.is_active
        finally:
            await colony.stop()
class TestReportToParentGatingByStream:
    """The ``report_to_parent`` tool is gated to worker streams."""

    @pytest.mark.asyncio
    async def test_report_to_parent_only_for_worker_streams(
        self, tmp_path, agent_spec, goal, event_bus
    ):
        """report_to_parent tool should only be in the worker's tool list,
        not the overseer's."""
        llm = MockStreamingLLM(scenarios=[_text_scenario("ok")])
        colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
        try:
            # Spawn a parallel worker — its tool list should include report_to_parent
            await colony.spawn(task="test", count=1)
            # After the worker's first LLM call, check the recorded tools
            await asyncio.sleep(0.2)  # let the background task run
            assert llm.stream_calls, "Worker never called the LLM"
            worker_tools = llm.stream_calls[0]["tools"]
            tool_names = [t.name for t in (worker_tools or [])]
            assert "report_to_parent" in tool_names
            # NOTE(review): despite the docstring, only the worker side is
            # asserted here — the overseer's tool list is never inspected.
        finally:
            await colony.stop()
+417
View File
@@ -0,0 +1,417 @@
"""Tests for the queen-side ``create_colony`` tool.
New contract (two-step flow):
1. The queen authors a skill folder out-of-band (via write_file etc.)
containing a SKILL.md with YAML frontmatter {name, description} and
an optional body.
2. The queen calls ``create_colony(colony_name, task, skill_path)``
pointing at that folder. The tool validates the folder, installs it
under ``~/.hive/skills/{name}/`` if it's not already there, and
forks the session into a colony.
We monkeypatch ``fork_session_into_colony`` so the test doesn't need a
real queen / session directory. We also redirect ``$HOME`` so the test's
skill installation lands in a tmp tree, not the real user home.
"""
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import Any
import pytest
from framework.host.event_bus import EventBus
from framework.llm.provider import ToolUse
from framework.loader.tool_registry import ToolRegistry
from framework.tools.queen_lifecycle_tools import register_queen_lifecycle_tools
# ---------------------------------------------------------------------------
# Fixtures + helpers
# ---------------------------------------------------------------------------
class _FakeSession:
    """Minimal stand-in for a queen session, as seen by create_colony."""

    def __init__(self, sid: str = "session_test_create_colony"):
        self.id = sid
        self.colony = None
        self.colony_runtime = None
        # Real bus so tests can subscribe for COLONY_CREATED events.
        self.event_bus = EventBus()
        self.worker_path = None
        self.available_triggers: dict = {}
        self.active_trigger_ids: set = set()
def _make_executor():
    """Return ``(executor, session)`` with create_colony registered."""
    session = _FakeSession()
    registry = ToolRegistry()
    register_queen_lifecycle_tools(registry, session=session, session_id=session.id)
    return registry.get_executor(), session
async def _call(executor, **inputs) -> dict:
    """Invoke create_colony through *executor* and decode its JSON payload."""
    outcome = executor(
        ToolUse(id="tu_create_colony", name="create_colony", input=inputs)
    )
    # The executor may be sync or async; await only when needed.
    if asyncio.iscoroutine(outcome):
        outcome = await outcome
    return json.loads(outcome.content)
@pytest.fixture
def patched_home(tmp_path, monkeypatch):
    """Redirect $HOME so ~/.hive/skills/ lands in tmp_path."""
    # NOTE(review): overriding HOME covers Path.home() on POSIX; on Windows
    # USERPROFILE would also need patching — confirm if these tests run there.
    monkeypatch.setenv("HOME", str(tmp_path))
    return tmp_path
@pytest.fixture
def patched_fork(monkeypatch):
    """Replace fork_session_into_colony with a recording stub.

    Yields the list of recorded call kwargs so tests can assert how (and
    whether) the fork was invoked — no real queen/session is needed.
    """
    recorded: list[dict] = []

    async def _stub_fork(*, session: Any, colony_name: str, task: str) -> dict:
        recorded.append(
            {"session": session, "colony_name": colony_name, "task": task}
        )
        return {
            "colony_path": f"/tmp/fake_colonies/{colony_name}",
            "colony_name": colony_name,
            "queen_session_id": "session_fake_fork_id",
            "is_new": True,
        }

    monkeypatch.setattr(
        "framework.server.routes_execution.fork_session_into_colony",
        _stub_fork,
    )
    return recorded
def _write_skill(
root: Path,
*,
dir_name: str,
fm_name: str,
description: str = "Default test skill description with enough text.",
body: str = "## Body\n\nOperational details go here.\n",
) -> Path:
"""Write a valid skill folder under ``root`` and return its path."""
skill_dir = root / dir_name
skill_dir.mkdir(parents=True, exist_ok=True)
skill_md = skill_dir / "SKILL.md"
skill_md.write_text(
"---\n"
f"name: {fm_name}\n"
f'description: "{description}"\n'
"---\n\n"
f"{body}",
encoding="utf-8",
)
return skill_dir
# ---------------------------------------------------------------------------
# Happy path
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_happy_path_emits_colony_created_event(
    tmp_path: Path, patched_home: Path, patched_fork: list[dict]
) -> None:
    """Successful create_colony must publish a COLONY_CREATED event."""
    from framework.host.event_bus import AgentEvent, EventType

    executor, session = _make_executor()
    received: list[AgentEvent] = []

    async def _on_colony_created(event: AgentEvent) -> None:
        received.append(event)

    session.event_bus.subscribe(
        event_types=[EventType.COLONY_CREATED],
        handler=_on_colony_created,
    )
    # _write_skill already creates parent directories (mkdir(parents=True,
    # exist_ok=True)), so a single call suffices — the original wrote the
    # skill twice around a redundant parent mkdir.
    skill_src = _write_skill(
        tmp_path / "scratch", dir_name="my-skill", fm_name="my-skill"
    )
    payload = await _call(
        executor,
        colony_name="event_check",
        task="t",
        skill_path=str(skill_src),
    )
    assert payload.get("status") == "created", payload
    # Exactly one COLONY_CREATED event with the expected payload fields.
    assert len(received) == 1
    ev = received[0]
    assert ev.type == EventType.COLONY_CREATED
    assert ev.data.get("colony_name") == "event_check"
    assert ev.data.get("skill_name") == "my-skill"
    assert ev.data.get("is_new") is True
@pytest.mark.asyncio
async def test_happy_path_external_folder_is_copied_into_skills_root(
    tmp_path: Path, patched_home: Path, patched_fork: list[dict]
) -> None:
    """Skill authored outside ~/.hive/skills/ is copied in on install."""
    executor, session = _make_executor()
    # Queen authors skill in a scratch dir, not under ~/.hive/skills/
    scratch = tmp_path / "scratch"
    scratch.mkdir()
    skill_src = _write_skill(
        scratch,
        dir_name="honeycomb-api-protocol",
        fm_name="honeycomb-api-protocol",
        description=(
            "How to query the HoneyComb staging API for ticker, pool, "
            "and trade data. Covers auth, pagination, pool detail "
            "shape. Use when fetching market data."
        ),
        body=(
            "## HoneyComb API Operational Protocol\n\n"
            "Auth: Bearer token from ~/.hive/credentials/honeycomb.json.\n"
            "Pagination: ?page=1&page_size=50 (max 50 per page).\n"
            "Endpoints:\n"
            "- /api/ticker — list tickers\n"
            "- /api/ticker/{id} — pool detail\n"
        ),
    )
    payload = await _call(
        executor,
        colony_name="honeycomb_research",
        task=(
            "Build a daily honeycomb market report covering top gainers, "
            "losers, volume leaders, and category breakdowns."
        ),
        skill_path=str(skill_src),
    )
    assert payload.get("status") == "created", f"Tool error: {payload}"
    assert payload["colony_name"] == "honeycomb_research"
    assert payload["skill_name"] == "honeycomb-api-protocol"
    # The skill was installed under ~/.hive/skills/
    installed = patched_home / ".hive" / "skills" / "honeycomb-api-protocol" / "SKILL.md"
    assert installed.exists()
    assert "HoneyComb API Operational Protocol" in installed.read_text(encoding="utf-8")
    # Fork was called with the right args
    assert len(patched_fork) == 1
    assert patched_fork[0]["colony_name"] == "honeycomb_research"
    assert "honeycomb market report" in patched_fork[0]["task"]
    assert patched_fork[0]["session"] is session
@pytest.mark.asyncio
async def test_happy_path_in_place_authored_skill(
    patched_home: Path, patched_fork: list[dict]
) -> None:
    """Skill authored directly at ~/.hive/skills/{name}/ is accepted in-place."""
    skills_root = patched_home / ".hive" / "skills"
    skills_root.mkdir(parents=True)
    skill_src = _write_skill(
        skills_root,
        dir_name="in-place-skill",
        fm_name="in-place-skill",
        description="An in-place skill.",
        body="Contents that are already at the right location." * 3,
    )
    executor, _ = _make_executor()
    payload = await _call(
        executor,
        colony_name="in_place_colony",
        task="task text",
        skill_path=str(skill_src),
    )
    assert payload.get("status") == "created", payload
    # Already at the install location — SKILL.md must still be there.
    assert (skills_root / "in-place-skill" / "SKILL.md").exists()
    assert len(patched_fork) == 1
# ---------------------------------------------------------------------------
# Validation failures
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_missing_skill_path_rejected(patched_home, patched_fork) -> None:
    """A nonexistent skill_path is rejected before any fork happens."""
    executor, _ = _make_executor()
    missing = patched_home / "does_not_exist"
    payload = await _call(
        executor, colony_name="ok_name", task="t", skill_path=str(missing)
    )
    assert "error" in payload
    assert "does not exist" in payload["error"]
    assert len(patched_fork) == 0
@pytest.mark.asyncio
async def test_skill_path_is_file_not_directory_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """A skill_path pointing at a plain file (not a folder) is rejected."""
    executor, _ = _make_executor()
    not_a_dir = tmp_path / "not-a-dir.md"
    not_a_dir.write_text("hi", encoding="utf-8")
    payload = await _call(
        executor, colony_name="ok_name", task="t", skill_path=str(not_a_dir)
    )
    assert "error" in payload
    assert "must be a directory" in payload["error"]
    assert len(patched_fork) == 0
@pytest.mark.asyncio
async def test_skill_missing_skill_md_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """A skill folder lacking SKILL.md is rejected."""
    executor, _ = _make_executor()
    empty_dir = tmp_path / "no-skill-md"
    empty_dir.mkdir()
    payload = await _call(
        executor, colony_name="ok_name", task="t", skill_path=str(empty_dir)
    )
    assert "error" in payload
    assert "SKILL.md" in payload["error"]
    assert len(patched_fork) == 0
@pytest.mark.asyncio
async def test_skill_md_missing_frontmatter_marker_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """SKILL.md without the ``---`` YAML frontmatter marker is rejected."""
    executor, _ = _make_executor()
    skill_dir = tmp_path / "broken-fm"
    skill_dir.mkdir()
    (skill_dir / "SKILL.md").write_text(
        "no frontmatter here, just body\n", encoding="utf-8"
    )
    payload = await _call(
        executor, colony_name="ok_name", task="t", skill_path=str(skill_dir)
    )
    assert "error" in payload
    assert "frontmatter" in payload["error"]
    assert len(patched_fork) == 0
@pytest.mark.asyncio
async def test_skill_md_missing_description_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """Frontmatter without a ``description`` field is rejected."""
    executor, _ = _make_executor()
    skill_dir = tmp_path / "no-description"
    skill_dir.mkdir()
    (skill_dir / "SKILL.md").write_text(
        "---\nname: no-description\n---\n\nbody\n",
        encoding="utf-8",
    )
    payload = await _call(
        executor, colony_name="ok_name", task="t", skill_path=str(skill_dir)
    )
    assert "error" in payload
    assert "description" in payload["error"]
    assert len(patched_fork) == 0
@pytest.mark.asyncio
async def test_directory_name_mismatch_with_frontmatter_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """The folder name must match the frontmatter ``name`` field."""
    executor, _ = _make_executor()
    skill_dir = tmp_path / "wrong-dir-name"
    skill_dir.mkdir()
    (skill_dir / "SKILL.md").write_text(
        '---\nname: correct-name\ndescription: "d"\n---\n\nbody\n',
        encoding="utf-8",
    )
    payload = await _call(
        executor, colony_name="ok_name", task="t", skill_path=str(skill_dir)
    )
    assert "error" in payload
    assert "does not match" in payload["error"]
    assert len(patched_fork) == 0
@pytest.mark.asyncio
async def test_invalid_colony_name_rejected(tmp_path, patched_home, patched_fork) -> None:
    """A colony_name outside the allowed naming scheme is rejected."""
    skill_src = _write_skill(tmp_path, dir_name="valid-skill", fm_name="valid-skill")
    executor, _ = _make_executor()
    payload = await _call(
        executor,
        colony_name="NotValid-Colony",
        task="t",
        skill_path=str(skill_src),
    )
    assert "error" in payload
    assert "colony_name" in payload["error"]
    assert len(patched_fork) == 0
@pytest.mark.asyncio
async def test_fork_failure_keeps_installed_skill(
    tmp_path, patched_home, monkeypatch
) -> None:
    """If the fork raises, the installed skill stays under ~/.hive/skills/."""

    async def _failing_fork(**kwargs):
        raise RuntimeError("simulated fork crash")

    monkeypatch.setattr(
        "framework.server.routes_execution.fork_session_into_colony",
        _failing_fork,
    )
    executor, _ = _make_executor()
    skill_src = _write_skill(
        tmp_path, dir_name="durable-skill", fm_name="durable-skill"
    )
    payload = await _call(
        executor, colony_name="will_fail", task="t", skill_path=str(skill_src)
    )
    # The tool surfaces the failure plus recovery info …
    assert "error" in payload
    assert "fork failed" in payload["error"]
    assert "skill_installed" in payload
    assert "hint" in payload
    # … and the installed skill survives the failed fork.
    installed = patched_home / ".hive" / "skills" / "durable-skill" / "SKILL.md"
    assert installed.exists()
@@ -0,0 +1,265 @@
"""Phase 4 test: run_parallel_workers tool fans out through session.colony.
End-to-end coverage of the queen-side parallel-worker tool:
1. Build a real ``ColonyRuntime`` (the Phase 1 + 2 unified runtime).
2. Stand up the queen lifecycle tools registered against a fake session
that exposes ``session.colony``.
3. Invoke the ``run_parallel_workers`` tool with three task specs whose
workers each call ``report_to_parent`` with structured payloads.
4. Assert that the tool returns aggregated reports in the same order as
the input tasks and that all workers ran in parallel under
``{storage}/workers/{worker_id}/``.
"""
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import pytest
from framework.agent_loop.types import AgentSpec
from framework.host.colony_runtime import ColonyRuntime
from framework.host.event_bus import EventBus
from framework.llm.provider import LLMProvider, LLMResponse, Tool, ToolResult, ToolUse
from framework.llm.stream_events import FinishEvent, ToolCallEvent
from framework.loader.tool_registry import ToolRegistry
from framework.schemas.goal import Goal
from framework.tools.queen_lifecycle_tools import register_queen_lifecycle_tools
# ---------------------------------------------------------------------------
# Mock LLM that routes scenarios by task text in the first user message
# ---------------------------------------------------------------------------
class _ByTaskMockLLM(LLMProvider):
    """Mock LLM that replays the scenario whose key matches the task text.

    The key is matched as a substring of the first user message; when no
    key matches, the stream yields nothing.
    """

    def __init__(self, by_task: dict[str, list]):
        self.by_task = by_task

    async def stream(
        self,
        messages: list[dict[str, Any]],
        system: str = "",
        tools: list[Tool] | None = None,
        max_tokens: int = 4096,
    ) -> AsyncIterator:
        # Extract plain text from the first user message; content may be a
        # string or a list of content blocks.
        task_text = ""
        for message in messages:
            if message.get("role") != "user":
                continue
            content = message.get("content", "")
            if isinstance(content, list):
                for block in content:
                    if isinstance(block, dict) and block.get("type") == "text":
                        content = block.get("text", "")
                        break
            task_text = str(content)
            break
        for key, events in self.by_task.items():
            if key in task_text:
                for ev in events:
                    yield ev
                return

    def complete(self, messages, system="", **kwargs) -> LLMResponse:
        return LLMResponse(content="", model="mock", stop_reason="stop")
def _report(status: str, summary: str, data: dict | None = None) -> list:
    """Build a worker event script: one ``report_to_parent`` tool call
    followed by a tool_calls finish event."""
    report_call = ToolCallEvent(
        tool_use_id="report_1",
        tool_name="report_to_parent",
        tool_input={
            "status": status,
            "summary": summary,
            "data": {} if data is None else data,
        },
    )
    finish = FinishEvent(
        stop_reason="tool_calls",
        input_tokens=10,
        output_tokens=5,
        model="mock",
    )
    return [report_call, finish]
def _stub_executor(tool_use: ToolUse) -> ToolResult:
    """Always-succeeding tool executor for tests where tool output is irrelevant."""
    return ToolResult(
        tool_use_id=tool_use.tool_use_id,
        content="ok",
        is_error=False,
    )
# ---------------------------------------------------------------------------
# Test
# ---------------------------------------------------------------------------
class _FakeSession:
    """Minimal session-like object exposing ``colony`` for the tool."""

    def __init__(self, colony: ColonyRuntime, session_id: str):
        self.id = session_id
        self.colony = colony
        # Reuse the colony's bus so tool-emitted events are observable.
        self.event_bus = colony.event_bus
        # Fields the tool registration may touch even if our test path
        # doesn't exercise them.
        self.colony_runtime = None
        self.worker_path = None
        self.available_triggers: dict = {}
        self.active_trigger_ids: set = set()
@pytest.mark.asyncio
async def test_run_parallel_workers_tool_fans_out_and_aggregates(
    tmp_path: Path,
) -> None:
    """End-to-end: ``run_parallel_workers`` fans out three workers against a
    real ColonyRuntime and returns their reports aggregated in input order."""
    bus = EventBus()
    # Each task substring routes to the scripted events its worker's mock LLM
    # will replay: two successful reports and one failure.
    llm = _ByTaskMockLLM(
        by_task={
            "fetch-A": _report("success", "A done", {"rows": 10}),
            "fetch-B": _report("success", "B done", {"rows": 20}),
            "fetch-C": _report("failed", "C broke", {"error_code": 503}),
        }
    )
    colony = ColonyRuntime(
        agent_spec=AgentSpec(
            id="test_colony",
            name="Test Colony",
            description="Phase 4 test colony.",
            system_prompt="You are a test agent.",
            agent_type="event_loop",
            output_keys=[],
            tool_access_policy="all",
        ),
        goal=Goal(id="g", name="g", description="g"),
        # Isolated per-test storage; worker dirs are asserted under it below.
        storage_path=tmp_path / "colony",
        llm=llm,
        tools=[],
        tool_executor=_stub_executor,
        event_bus=bus,
        colony_id="phase4_test",
        pipeline_stages=[],
    )
    await colony.start()
    session = _FakeSession(colony, "phase4_test")
    registry = ToolRegistry()
    register_queen_lifecycle_tools(registry, session=session, session_id=session.id)
    try:
        # Tool exists in the registry
        tools = registry.get_tools()
        assert "run_parallel_workers" in tools
        # Invoke it via the registered executor
        executor = registry.get_executor()
        tool_use = ToolUse(
            id="tu_run_parallel",
            name="run_parallel_workers",
            input={
                "tasks": [
                    {"task": "fetch-A"},
                    {"task": "fetch-B"},
                    {"task": "fetch-C"},
                ],
                "timeout": 10.0,
            },
        )
        result = executor(tool_use)
        # The registered executor may be sync or async; normalize the result.
        if asyncio.iscoroutine(result):
            result = await result
        assert not result.is_error, f"Tool errored: {result.content}"
        payload = json.loads(result.content)
        assert payload["worker_count"] == 3
        reports = payload["reports"]
        assert len(reports) == 3
        # Reports come back in the same order as the input tasks
        statuses = [r["status"] for r in reports]
        summaries = [r["summary"] for r in reports]
        assert statuses == ["success", "success", "failed"]
        assert summaries == ["A done", "B done", "C broke"]
        # Each worker landed under {storage}/workers/{worker_id}/
        worker_root = tmp_path / "colony" / "workers"
        assert worker_root.exists()
        worker_dirs = list(worker_root.iterdir())
        assert len(worker_dirs) == 3
    finally:
        await colony.stop()
@pytest.mark.asyncio
async def test_run_parallel_workers_returns_error_when_no_colony() -> None:
    """If session.colony is None the tool returns a structured error, not a crash."""

    class _ColonylessSession:
        # Session stand-in carrying every field the registration reads,
        # but deliberately no colony runtime.
        colony = None
        id = "no_colony"
        colony_runtime = None
        event_bus = EventBus()
        worker_path = None
        available_triggers: dict = {}
        active_trigger_ids: set = set()

    registry = ToolRegistry()
    register_queen_lifecycle_tools(
        registry, session=_ColonylessSession(), session_id="no_colony"
    )
    outcome = registry.get_executor()(
        ToolUse(
            id="tu_no_colony",
            name="run_parallel_workers",
            input={"tasks": [{"task": "anything"}]},
        )
    )
    # Normalize sync/async executor results.
    if asyncio.iscoroutine(outcome):
        outcome = await outcome
    payload = json.loads(outcome.content)
    assert "error" in payload
    assert "ColonyRuntime" in payload["error"]
@pytest.mark.asyncio
async def test_run_parallel_workers_validates_tasks_input(tmp_path: Path) -> None:
    """Empty / non-list / missing-task-string inputs return structured errors.

    Fix: the original wrote colony storage to a hard-coded
    ``/tmp/_phase4_validation_test_colony`` path, which leaks state across
    runs and can collide between parallel test workers. Use pytest's
    ``tmp_path`` fixture so every run gets an isolated directory (adding a
    fixture parameter is backward-compatible — pytest injects it).
    """
    bus = EventBus()
    colony = ColonyRuntime(
        agent_spec=AgentSpec(
            id="t",
            name="t",
            description="t",
            system_prompt="t",
            agent_type="event_loop",
        ),
        goal=Goal(id="g", name="g", description="g"),
        storage_path=tmp_path / "validation_colony",
        llm=_ByTaskMockLLM({}),
        tools=[],
        tool_executor=_stub_executor,
        event_bus=bus,
        colony_id="phase4_validation",
        pipeline_stages=[],
    )
    await colony.start()
    session = _FakeSession(colony, "phase4_validation")
    registry = ToolRegistry()
    register_queen_lifecycle_tools(registry, session=session, session_id=session.id)
    executor = registry.get_executor()

    async def _call(payload: dict) -> dict:
        # Invoke the tool and normalize sync/async executor results.
        r = executor(
            ToolUse(id="tu", name="run_parallel_workers", input=payload)
        )
        if asyncio.iscoroutine(r):
            r = await r
        return json.loads(r.content)

    try:
        # Empty list
        assert "error" in await _call({"tasks": []})
        # Missing task string
        assert "error" in await _call({"tasks": [{"data": {}}]})
    finally:
        await colony.stop()
@@ -0,0 +1,116 @@
"""Phase 2 wiring test: SessionManager._start_unified_colony_runtime.
Verifies that after a queen-mode session is started, ``session.colony``
is a real, running ``ColonyRuntime`` sharing the queen's event bus and
LLM, and that workers spawned through it land on disk under
``{queen_dir}/workers/{worker_id}/`` (NOT in the process CWD).
We bypass ``create_queen`` by stashing the tools directly on the session
and calling the helper, so the test is decoupled from queen orchestration.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from framework.host.colony_runtime import ColonyRuntime
from framework.host.event_bus import EventBus
from framework.server.session_manager import Session, SessionManager
@pytest.mark.asyncio
async def test_start_unified_colony_runtime_creates_real_colony(
    tmp_path: Path,
) -> None:
    """The helper builds a ColonyRuntime, starts it, and stashes it on session.colony."""
    bus = EventBus()
    session = Session(
        id="session_phase2_test",
        event_bus=bus,
        llm=object(),  # not invoked in this test
        loaded_at=0.0,
    )
    # _start_unified_colony_runtime reads these — usually create_queen
    # stashes them, but here we set them directly.
    session._queen_tools = []  # type: ignore[attr-defined]
    session._queen_tool_executor = None  # type: ignore[attr-defined]
    queen_dir = tmp_path / "queens" / "default" / "sessions" / session.id
    queen_dir.mkdir(parents=True)
    manager = SessionManager()
    await manager._start_unified_colony_runtime(session, queen_dir)
    try:
        assert session.colony is not None
        assert isinstance(session.colony, ColonyRuntime)
        assert session.colony.is_running
        assert session.colony.colony_id == session.id
        # Shares the session's event bus so SSE picks up worker events
        assert session.colony.event_bus is bus
    finally:
        # Fix: guard teardown. If the first assertion fails because
        # session.colony is None, an unguarded ``session.colony.stop()``
        # raises AttributeError and masks the real assertion error.
        if session.colony is not None:
            await session.colony.stop()
@pytest.mark.asyncio
async def test_unified_colony_workers_land_under_queen_dir(
    tmp_path: Path,
) -> None:
    """Workers spawned via the unified runtime live under {queen_dir}/workers/."""
    bus = EventBus()
    session = Session(
        id="session_worker_storage",
        event_bus=bus,
        llm=object(),
        loaded_at=0.0,
    )
    session._queen_tools = []  # type: ignore[attr-defined]
    session._queen_tool_executor = None  # type: ignore[attr-defined]
    queen_dir = tmp_path / "queen_storage"
    queen_dir.mkdir()
    manager = SessionManager()
    await manager._start_unified_colony_runtime(session, queen_dir)
    try:
        # Spawn a worker (it will start an AgentLoop with the dummy LLM
        # and crash quickly — we don't care, we only care about the
        # worker storage dir being created in the right place).
        ids = await session.colony.spawn(task="placeholder task", count=1)
        worker_dir = queen_dir / "workers" / ids[0]
        assert worker_dir.exists()
        # Fix: dropped the original's vacuous check
        # ``(worker_dir / "conversations").exists() or worker_dir.exists()`` —
        # its second disjunct was already asserted true on the line above, so
        # the whole expression could never fail.
        # And critically — nothing leaked to the process CWD
        assert not (Path.cwd() / "conversations" / "parts").exists()
    finally:
        # Fix: guard teardown so an early failure (colony never created)
        # isn't masked by an AttributeError on None.
        if session.colony is not None:
            await session.colony.stop()
@pytest.mark.asyncio
async def test_stop_session_stops_unified_colony(tmp_path: Path) -> None:
    """stop_session must call colony.stop() so timers/storage release cleanly."""
    session = Session(
        id="session_stop_test",
        event_bus=EventBus(),
        llm=object(),
        loaded_at=0.0,
    )
    session._queen_tools = []  # type: ignore[attr-defined]
    session._queen_tool_executor = None  # type: ignore[attr-defined]
    storage_dir = tmp_path / "stop_q"
    storage_dir.mkdir()
    manager = SessionManager()
    await manager._start_unified_colony_runtime(session, storage_dir)
    manager._sessions[session.id] = session
    runtime = session.colony
    # Precondition: the colony exists and is live before stop_session.
    assert runtime is not None and runtime.is_running
    await manager.stop_session(session.id)
    # stop_session detaches the colony from the session and halts it.
    assert session.colony is None
    assert not runtime.is_running
+130
View File
@@ -0,0 +1,130 @@
"""Phase 5 test: SSE filter drops worker noise from queen DM stream.
The queen DM SSE handler must drop events from parallel-worker streams
(``stream_id="worker:{uuid}"``) so that worker LLM deltas, tool calls,
and iteration events do not flood the user's chat tab. A small allowlist
of worker events is still passed through (SUBAGENT_REPORT,
EXECUTION_COMPLETED, EXECUTION_FAILED) so the frontend can render
fan-out / fan-in lifecycle.
We test the pure ``_is_worker_noise`` predicate by importing the SSE
handler module and exercising the inner function via a closure helper.
"""
from __future__ import annotations
from framework.host.event_bus import EventType
def _make_evt(stream_id: str, evt_type: str) -> dict:
return {"stream_id": stream_id, "type": evt_type}
def test_queen_stream_events_pass_through() -> None:
    """Events from non-worker streams must always pass.

    Fix: the original re-implemented the predicate inline ("mirror its
    logic here"), so the test exercised a copy that could silently drift
    from the handler's real logic. ``routes_events`` exposes
    ``_is_worker_noise`` at module level, so test the real predicate
    directly.
    """
    from framework.server.routes_events import _is_worker_noise

    # Queen events
    assert not _is_worker_noise(_make_evt("queen", EventType.LLM_TEXT_DELTA.value))
    assert not _is_worker_noise(_make_evt("queen", EventType.TOOL_CALL_STARTED.value))
    assert not _is_worker_noise(_make_evt("overseer", EventType.LLM_TEXT_DELTA.value))
    # Empty / missing stream ids are not worker streams.
    assert not _is_worker_noise(_make_evt("", EventType.LLM_TEXT_DELTA.value))
    assert not _is_worker_noise(_make_evt(None, EventType.LLM_TEXT_DELTA.value))
def test_worker_llm_and_tool_events_are_filtered() -> None:
    """High-frequency parallel-worker events (LLM deltas, tool calls,
    iterations) must be dropped from the queen DM stream.

    Fix: tests the real ``_is_worker_noise`` predicate imported from
    ``routes_events`` instead of an inline re-implementation that could
    drift from the handler's logic.
    """
    from framework.server.routes_events import _is_worker_noise

    assert _is_worker_noise(_make_evt("worker:abc123", EventType.LLM_TEXT_DELTA.value))
    assert _is_worker_noise(_make_evt("worker:abc123", EventType.TOOL_CALL_STARTED.value))
    assert _is_worker_noise(_make_evt("worker:xyz", EventType.TOOL_CALL_COMPLETED.value))
    assert _is_worker_noise(_make_evt("worker:xyz", EventType.NODE_LOOP_ITERATION.value))
def test_worker_lifecycle_and_report_events_pass_through() -> None:
    """Allowlisted worker events (report + execution lifecycle) must pass
    so the frontend can render fan-out / fan-in.

    Fix: tests the real ``_is_worker_noise`` predicate imported from
    ``routes_events`` instead of an inline re-implementation that could
    drift from the handler's logic.
    """
    from framework.server.routes_events import _is_worker_noise

    assert not _is_worker_noise(_make_evt("worker:abc", EventType.SUBAGENT_REPORT.value))
    assert not _is_worker_noise(_make_evt("worker:abc", EventType.EXECUTION_COMPLETED.value))
    assert not _is_worker_noise(_make_evt("worker:abc", EventType.EXECUTION_FAILED.value))
def test_handler_module_exposes_allowlist_constant() -> None:
    """Smoke test that the constant the handler closes over still exists."""
    from framework.server.routes_events import _WORKER_EVENT_ALLOWLIST

    # Every lifecycle/report event the frontend needs must be allowlisted.
    required = (
        EventType.SUBAGENT_REPORT.value,
        EventType.EXECUTION_COMPLETED.value,
        EventType.EXECUTION_FAILED.value,
    )
    for value in required:
        assert value in _WORKER_EVENT_ALLOWLIST
def test_loaded_worker_stream_id_singular_passes_through() -> None:
    """The loaded primary worker uses stream_id='worker' (no colon).

    This is the stream tag run_agent_with_input passes to
    ColonyRuntime.spawn. The SSE filter must NOT confuse it with the
    parallel-fan-out 'worker:{uuid}' tag otherwise the user's main
    chat-visible workstream gets dropped from the queen DM.

    Regression test for: 'why worker message no longer goes to the
    frontend' after migrating run_agent_with_input from
    AgentHost.trigger to ColonyRuntime.spawn.
    """
    from framework.server.routes_events import _is_worker_noise

    # All of these are events from the LOADED worker (single primary
    # worker spawned via run_agent_with_input). They must pass the
    # filter — including high-frequency LLM deltas and tool calls,
    # because the queen DM IS the visible chat for this worker.
    loaded_worker_types = (
        EventType.LLM_TEXT_DELTA.value,
        EventType.TOOL_CALL_STARTED.value,
        EventType.TOOL_CALL_COMPLETED.value,
        EventType.NODE_LOOP_ITERATION.value,
        EventType.CLIENT_OUTPUT_DELTA.value,
        EventType.EXECUTION_STARTED.value,
        EventType.EXECUTION_COMPLETED.value,
    )
    for evt_type in loaded_worker_types:
        loaded_evt = {"stream_id": "worker", "type": evt_type}
        assert not _is_worker_noise(loaded_evt), (
            f"loaded-worker event {evt_type} with stream_id='worker' was "
            "filtered as worker noise — this regresses the queen DM "
            "primary worker chat path"
        )
    # Sanity: the parallel fan-out tag is still filtered.
    fanout_evt = {
        "stream_id": "worker:abc123",
        "type": EventType.LLM_TEXT_DELTA.value,
    }
    assert _is_worker_noise(fanout_evt)