feat: colony creation with skill

This commit is contained in:
Timothy
2026-04-10 15:09:27 -07:00
parent 1335a15341
commit c23c274ac7
10 changed files with 1130 additions and 23 deletions
+8 -1
View File
@@ -6,7 +6,14 @@
"Read(//^ @pytest.mark.asyncio/{getline n; print NR\": \"n} /^ def test_/**)",
"Bash(python3)",
"Bash(grep -nE 'Tool\\\\\\(\\\\s*$|name=\"[a-z_]+\",' core/framework/tools/queen_lifecycle_tools.py)",
"Bash(awk -F'\"' '{print $2}')"
"Bash(awk -F'\"' '{print $2}')",
"Bash(grep -n \"create_colony\\\\|colony-spawn\\\\|colony_spawn\" /home/timothy/aden/hive/core/framework/agents/queen/nodes/__init__.py /home/timothy/aden/hive/core/framework/tools/*.py)",
"Bash(git stash:*)",
"Bash(python3 -c \"import sys,json; d=json.loads\\(sys.stdin.read\\(\\)\\); print\\('keys:', list\\(d.keys\\(\\)\\)[:10]\\)\")",
"Bash(python3 -c ':*)"
],
"additionalDirectories": [
"/home/timothy/.hive/skills/writing-hive-skills"
]
},
"hooks": {
+21
View File
@@ -2483,6 +2483,27 @@ class AgentLoop(AgentProtocol):
# --- Framework-level ask_user handling ---
ask_user_prompt = tc.tool_input.get("question", "")
raw_options = tc.tool_input.get("options", None)
# Self-heal: some model families (notably the queen
# profile prompt poisoning the output style) cram
# the options inside the question string as a
# pseudo-XML blob like:
#
# "What do you want to do?</question>\n_OPTIONS:
# [\"De-risk\", \"Add\", \"Short\"]"
#
# When that happens the question text leaks
# </question> and _OPTIONS: into the chat UI and
# the buttons never appear. Detect + repair.
from framework.agent_loop.internals.synthetic_tools import (
sanitize_ask_user_inputs,
)
ask_user_prompt, recovered_options = sanitize_ask_user_inputs(
ask_user_prompt, raw_options
)
if recovered_options is not None and raw_options is None:
raw_options = recovered_options
# Defensive: ensure options is a list of strings.
# Smaller models sometimes send a string instead of
# an array — try to recover gracefully.
@@ -15,6 +15,82 @@ from typing import Any
from framework.llm.provider import Tool, ToolResult
def sanitize_ask_user_inputs(
raw_question: Any,
raw_options: Any,
) -> tuple[str, list[str] | None]:
"""Self-heal a malformed ``ask_user`` tool call.
Some model families (notably when the system prompt teaches them
XML-ish scratchpad tags like ``<relationship>...</relationship>``)
carry that style into tool arguments and produce calls like::
ask_user({
"question": "What now?</question>\\n_OPTIONS: [\\"A\\", \\"B\\"]"
})
Symptoms:
- The chat UI renders ``</question>`` and ``_OPTIONS: [...]`` as
literal text in the question bubble.
- No buttons appear because the real ``options`` parameter is
empty.
This function:
- Strips leading/trailing whitespace.
- Removes a trailing ``</question>`` (with optional preceding
whitespace) from the question text.
- Detects an inline ``_OPTIONS:``, ``OPTIONS:``, or ``options:``
line followed by a JSON array, parses it, and returns the
recovered list as the second element.
- Removes the parsed line from the returned question text.
Returns ``(cleaned_question, recovered_options_or_None)``. The
caller should treat the recovered list as a fallback only when
the model did not also supply a real ``options`` array.
"""
import json as _json
import re as _re
if raw_question is None:
return "", None
q = str(raw_question)
# Strip a stray </question> tag (case-insensitive, with optional
# preceding whitespace) anywhere in the string. This is the most
# common failure mode and never represents valid content.
q = _re.sub(r"\s*</\s*question\s*>\s*", "\n", q, flags=_re.IGNORECASE)
# Look for an inline options line. Match _OPTIONS, OPTIONS, options
# (with or without leading underscore), followed by ':' or '=', then
# a JSON array on the same line OR on the next line.
inline_options_re = _re.compile(
r"(?im)^\s*_?options\s*[:=]\s*(\[.*?\])\s*$",
_re.DOTALL,
)
recovered: list[str] | None = None
match = inline_options_re.search(q)
if match is not None:
try:
parsed = _json.loads(match.group(1))
if isinstance(parsed, list):
cleaned = [str(o).strip() for o in parsed if str(o).strip()]
if 1 <= len(cleaned) <= 8:
recovered = cleaned
except (ValueError, TypeError):
pass
if recovered is not None:
# Remove the parsed line so it doesn't leak into the
# rendered question text.
q = inline_options_re.sub("", q, count=1)
# Strip any final whitespace / leftover blank lines from the
# question after removals.
q = _re.sub(r"\n{3,}", "\n\n", q).strip()
return q, recovered
def build_ask_user_tool() -> Tool:
"""Build the synthetic ask_user tool for explicit user-input requests.
@@ -28,7 +104,20 @@ def build_ask_user_tool() -> Tool:
"You MUST call this tool whenever you need the user's response. "
"Always call it after greeting the user, asking a question, or "
"requesting approval. Do NOT call it for status updates or "
"summaries that don't require a response. "
"summaries that don't require a response.\n\n"
"STRUCTURE RULES (CRITICAL):\n"
"- The 'question' field is PLAIN TEXT shown to the user. Do NOT "
"include XML tags, pseudo-tags like </question>, or option lists "
"in the question string. The UI does not parse them — they "
"render as raw text and look broken.\n"
"- The 'options' parameter is the ONLY way to render buttons. "
"If you want buttons, put them in the 'options' array, not in "
"the question string. Do NOT write 'OPTIONS: [...]', "
"'_options: [...]', or any inline list inside 'question'.\n"
"- The question text must read as a single clean prompt with "
"no markup. Example: 'What would you like to do?' — not "
"'What would you like to do?</question>'.\n\n"
"USAGE:\n"
"Always include 2-3 predefined options. The UI automatically "
"appends an 'Other' free-text input after your options, so NEVER "
"include catch-all options like 'Custom idea', 'Something else', "
@@ -39,11 +128,14 @@ def build_ask_user_tool() -> Tool:
"free-text input. "
"The ONLY exception: omit options when the question demands a "
"free-form answer the user must type out (e.g. 'Describe your "
"agent idea', 'Paste the error message'). "
"agent idea', 'Paste the error message').\n\n"
"CORRECT EXAMPLE:\n"
'{"question": "What would you like to do?", "options": '
'["Build a new agent", "Modify existing agent", "Run tests"]} '
"Free-form example: "
'{"question": "Describe the agent you want to build."}'
'["Build a new agent", "Modify existing agent", "Run tests"]}\n\n'
"FREE-FORM EXAMPLE:\n"
'{"question": "Describe the agent you want to build."}\n\n'
"WRONG (do NOT do this — buttons will not render):\n"
'{"question": "What now?</question>\\n_OPTIONS: [\\"A\\", \\"B\\"]"}'
),
parameters={
"type": "object",
@@ -84,6 +84,10 @@ _QUEEN_PLANNING_TOOLS = [
# Parallel fan-out — use directly for one-off batch work the user
# wants RIGHT NOW (without first designing an agent for it).
"run_parallel_workers",
# Fork this session into a colony, writing a learned-skill file
# under ~/.hive/skills/ first so the new colony inherits the
# session's knowledge.
"create_colony",
]
# Building phase: full coding + agent construction tools.
@@ -181,6 +185,8 @@ _QUEEN_INDEPENDENT_TOOLS = [
"undo_changes",
# Parallel fan-out (Phase 4 unified ColonyRuntime)
"run_parallel_workers",
# Fork to colony — captures session knowledge as a skill first
"create_colony",
]
@@ -685,6 +691,26 @@ write a single user-facing synthesis on your next turn. Prefer this over \
designing a draft when the work is one-shot and the user wants results, not \
a saved agent.
## Forking the session into a colony (with session-knowledge capture)
Two-step flow:
1. AUTHOR THE SKILL FIRST. Use write_file to create a skill folder \
(recommended location: `~/.hive/skills/{skill-name}/SKILL.md`) \
capturing what you learned during THIS session — API endpoints, \
auth flow, response shapes, gotchas, conventions, query patterns. \
The SKILL.md needs YAML frontmatter with `name` (matching the \
directory name) and `description` (1-1024 chars including trigger \
keywords), followed by a markdown body. Optional subdirs: \
scripts/, references/, assets/. Read your writing-hive-skills \
default skill for the full spec.
2. create_colony(colony_name, task, skill_path) — Validate the skill \
folder, install it under ~/.hive/skills/ if it's not already there, \
and fork this session into a new colony. The new colony's worker \
(which inherits ~/.hive/skills/) discovers the skill on its first \
scan, so it's born already knowing what you learned instead of \
re-doing your discovery work from scratch. ALWAYS prefer \
create_colony over a raw fork when ending a session that uncovered \
reusable operational knowledge.
## Workflow summary
1. Understand requirements → discover tools → design the layout
2. Call save_agent_draft() to create visual draft → present to user
+43 -17
View File
@@ -650,15 +650,6 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
Body: {"colony_name": "...", "task": "..."}
Returns: {"colony_path": "...", "colony_name": "...", "is_new": bool,
"queen_session_id": "..."}
The clone:
1. Creates a colony directory with a single worker config (``worker.json``)
holding the queen's current tools, prompts, skills, and loop config.
2. Duplicates the queen's full session (conversations + events) into a new
queen-session directory assigned to the colony so that cold-restoring
the colony resumes with the queen's entire conversation history.
3. Multiple independent sessions can be created against the same colony,
giving parallel execution capacity without separate worker configs.
"""
session, err = resolve_session(request)
if err:
@@ -685,6 +676,43 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
status=400,
)
try:
result = await fork_session_into_colony(
session=session,
colony_name=colony_name,
task=task,
)
except Exception as e:
logger.exception("colony_spawn fork failed")
return web.json_response({"error": f"colony fork failed: {e}"}, status=500)
return web.json_response(result)
async def fork_session_into_colony(
*,
session: Any,
colony_name: str,
task: str,
) -> dict:
"""Fork a queen session into a colony directory.
Extracted from ``handle_colony_spawn`` so the queen-side
``create_colony`` tool can call it directly without going through
HTTP. The caller is responsible for validating ``colony_name``
against the lowercase-alphanumeric regex.
The fork:
1. Creates a colony directory with a single worker config (``worker.json``)
holding the queen's current tools, prompts, skills, and loop config.
2. Duplicates the queen's full session (conversations + events) into a new
queen-session directory assigned to the colony so that cold-restoring
the colony resumes with the queen's entire conversation history.
3. Multiple independent sessions can be created against the same colony,
giving parallel execution capacity without separate worker configs.
Returns ``{"colony_path", "colony_name", "queen_session_id", "is_new"}``.
"""
import asyncio
import json
import shutil
@@ -905,14 +933,12 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
len(queen_tools),
colony_session_id,
)
return web.json_response(
{
"colony_path": str(colony_dir),
"colony_name": colony_name,
"queen_session_id": colony_session_id,
"is_new": is_new,
}
)
return {
"colony_path": str(colony_dir),
"colony_name": colony_name,
"queen_session_id": colony_session_id,
"is_new": is_new,
}
def register_routes(app: web.Application) -> None:
@@ -0,0 +1,160 @@
---
name: hive.writing-hive-skills
description: Author a new Agent Skill for a Hive agent that conforms to the Agent Skills specification (SKILL.md with YAML frontmatter, optional scripts/references/assets directories). Use when the user asks to create, scaffold, add, or package a new skill for a Hive agent.
metadata:
author: hive
type: default-skill
spec-source: https://agentskills.io/specification
---
## Operational Protocol: Writing Hive Skills
Hive agents discover skills by scanning several roots, in precedence order:
1. `<project>/.hive/skills/` — project, Hive-specific
2. `<project>/.agents/skills/` — project, cross-client
3. `~/.hive/skills/` — user, Hive-specific
4. `~/.agents/skills/` — user, cross-client
5. Framework defaults shipped in `core/framework/skills/_default_skills/`
Each skill is a directory containing a `SKILL.md`. At startup, only the frontmatter `name` + `description` of every skill is loaded; the body is loaded only when the agent activates the skill. Design for that.
### Choosing where to put a new skill
- **Project-scoped**: put under `<project>/.hive/skills/` when the skill is tied to that codebase's APIs, conventions, or infra.
- **User-scoped**: put under `~/.hive/skills/` when the skill is reusable across projects for this machine/user.
- **Framework default**: add under `core/framework/skills/_default_skills/` AND register in `framework/skills/defaults.py::SKILL_REGISTRY` only when the skill is a universal operational protocol shipped with Hive. Default skills use the `hive.<name>` naming convention and include `type: default-skill` in metadata.
### Directory layout
```
<skill-name>/
├── SKILL.md # Required
├── scripts/ # Optional — executable helpers
├── references/ # Optional — on-demand docs
└── assets/ # Optional — templates, data, images
```
Rules:
- The directory name **must** equal the `name` frontmatter field (for framework defaults, the directory is the unprefixed name, e.g. `note-taking/` for `hive.note-taking`).
- Keep `SKILL.md` under ~500 lines. Move long reference material into `references/`.
- Reference other files with relative paths from the skill root (`scripts/foo.py`, `references/API.md`). Keep references one level deep.
### SKILL.md frontmatter
Required fields:
| Field | Constraints |
|-------|-------------|
| `name` | 1–64 chars, `[a-z0-9-]`, no leading/trailing/consecutive hyphens. Must match the directory name. Framework defaults prefix with `hive.` |
| `description` | 1–1024 chars. Must describe **what** the skill does **and when to use it**. Include trigger keywords the user is likely to say. |
Optional fields:
| Field | Notes |
|-------|-------|
| `license` | License name or reference to a bundled file |
| `compatibility` | ≤500 chars. Only include if env requirements are non-trivial (network, tools, runtime) |
| `metadata` | Free-form string→string map. Namespace keys to avoid collisions. Default skills set `type: default-skill`. |
| `allowed-tools` | Experimental. Space-separated pre-approved tools, e.g. `Bash(curl:*) Bash(jq:*) Read` |
Minimal template:
```markdown
---
name: my-skill
description: One sentence on what it does. One sentence on when to use it, with concrete trigger words the agent will see in user requests.
---
# My Skill
<body>
```
### Writing a good `description`
This is the single most important field — it's the only thing the agent sees at skill-selection time.
- **Bad**: `Helps with trading.`
- **Good**: `Buy and sell shares on the HoneyComb exchange. Handles auth, slippage-protected orders, idempotent retries, and AMM output estimation. Use when placing trades or interacting with the AMM.`
Include verbs the user is likely to say (`buy`, `sell`, `place trade`) and proper nouns (`HoneyComb`, `AMM`).
### Writing the body
Structure the body for the agent, not a human reader:
1. **Lead with what the agent can't guess** — API base URLs, auth shape, project conventions, specific function names. Skip generic background ("PDFs are a document format").
2. **Show exact request/response shapes** — include JSON payloads, headers, status codes. Copy real examples rather than paraphrasing.
3. **Document failure modes** — error codes, retry rules, rate limits. This is where skills earn their keep vs. a generic agent.
4. **Give a short end-to-end example** — a "typical flow" section at the bottom anchors everything above.
Recommended sections (adapt to the domain):
- Authentication / setup
- Core operations (one per endpoint or action)
- Error reference table
- Rate limits / gotchas
- End-to-end example pattern
### Progressive disclosure
Three tiers of context cost:
1. **Always loaded** (~100 tokens per skill): `name` + `description`. Keep tight.
2. **Loaded on activation** (<5k tokens target): body of `SKILL.md`.
3. **Loaded on demand**: files under `scripts/`, `references/`, `assets/`. The agent reads these only when the body points to them.
If a section is long and only needed sometimes (e.g., a full schema dump, rarely-used edge cases), move it to `references/SOMETHING.md` and link to it from the body: `See [the error catalog](references/ERRORS.md) for the full list.`
### Scripts
Put executable helpers in `scripts/`. They should:
- Be self-contained or document dependencies in a comment header.
- Print human-readable errors to stderr and exit non-zero on failure.
- Accept arguments via CLI flags, not env vars (easier for the agent to invoke).
Reference them from the body by relative path:
```markdown
Estimate buy output with `scripts/estimate_buy.py --v-hc 1000000 --v-shares 1000000 --hc 500`.
```
For Python scripts in a Hive project, prefer `uv run scripts/foo.py ...`.
### Creating a new skill — workflow
1. Pick a `<skill-name>` (lowercase-hyphenated).
2. Decide scope: project (`<project>/.hive/skills/`), user (`~/.hive/skills/`), or framework default (`core/framework/skills/_default_skills/` + registry entry).
3. Create the directory and write `SKILL.md` with frontmatter + body.
4. Add `scripts/`, `references/`, `assets/` only if needed.
5. Validate the frontmatter: name matches dir, description is specific, no forbidden characters.
6. Validate using the Hive CLI:
```bash
uv run hive skill validate <path-to-skill-dir>
uv run hive skill doctor
```
7. Confirm discovery with `uv run hive skill list`.
8. Test by invoking a Hive agent on a task the skill should match — confirm it activates and follows the instructions.
### Registering as a framework default
When adding a skill as a shipped default:
1. Place the directory under `core/framework/skills/_default_skills/<unprefixed-name>/`.
2. Set frontmatter `name: hive.<unprefixed-name>` and `metadata.type: default-skill`.
3. Add the mapping to `SKILL_REGISTRY` in `core/framework/skills/defaults.py`:
```python
SKILL_REGISTRY: dict[str, str] = {
...
"hive.<unprefixed-name>": "<unprefixed-name>",
}
```
4. If the skill uses `{{placeholder}}` substitution, add defaults to `_SKILL_DEFAULTS` in the same file.
5. If the skill reads/writes shared buffer keys, list them in `DATA_BUFFER_KEYS`.
### What NOT to put in a skill
- Generic programming knowledge the agent already has.
- Conversation-specific state (use memory or plans instead).
- Secrets or credentials (skills are plaintext; reference env vars or credential stores).
- Deeply nested reference chains — keep everything one hop from `SKILL.md`.
+1
View File
@@ -77,6 +77,7 @@ SKILL_REGISTRY: dict[str, str] = {
"hive.quality-monitor": "quality-monitor",
"hive.error-recovery": "error-recovery",
"hive.task-decomposition": "task-decomposition",
"hive.writing-hive-skills": "writing-hive-skills",
}
# All shared buffer keys used by default skills (for permission auto-inclusion)
@@ -1181,6 +1181,314 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- create_colony ---------------------------------------------------------
#
# Forks the current queen session into a colony. Requires the queen
# to have ALREADY AUTHORED a skill folder capturing what she learned
# during this session (using her write_file / edit_file tools), and
# pass the folder path to this tool. The tool validates the skill
# folder (SKILL.md exists, frontmatter has the required ``name`` +
# ``description`` fields, directory name matches frontmatter name),
# then forks. If the skill lives outside ``~/.hive/skills/`` the
# tool copies it in so the new colony's worker will discover it on
# its first skill scan.
#
# This is the codified version of the user's instruction:
#
# "When the queen agent needs to create a colony, it needs to
# write down whatever it just learned from the current session
# as an agent skill and put it in the ~/.hive/skills folder."
#
# Two-step flow for the queen LLM:
#
# 1. Author the skill with write_file (or a sequence of writes
# for scripts/references/assets subdirs) — she already knows
# the format via the writing-hive-skills default skill.
# 2. Call create_colony(colony_name, task, skill_path) pointing
# at the folder she just wrote.
import re as _re
import shutil as _shutil
_COLONY_NAME_RE = _re.compile(r"^[a-z0-9_]+$")
_SKILL_NAME_RE = _re.compile(r"^[a-z0-9-]+$")
def _validate_and_install_skill(skill_path: str) -> tuple[Path | None, str | None]:
"""Validate an authored skill folder and ensure it lives under ~/.hive/skills/.
Returns ``(installed_path, error)``. On success ``error`` is
``None`` and ``installed_path`` is the final location under
``~/.hive/skills/{name}/``. On failure ``installed_path`` is
``None`` and ``error`` is a human-readable reason suitable for
returning to the queen as a JSON error payload.
"""
if not skill_path or not isinstance(skill_path, str):
return None, "skill_path must be a non-empty string"
src = Path(skill_path).expanduser().resolve()
if not src.exists():
return None, f"skill_path does not exist: {src}"
if not src.is_dir():
return None, f"skill_path must be a directory, got file: {src}"
skill_md = src / "SKILL.md"
if not skill_md.is_file():
return None, f"skill_path has no SKILL.md at {skill_md}"
# Parse the frontmatter to pull out the name and verify
# description exists. We don't need a full YAML parser — the
# writing-hive-skills protocol is rigid enough that a line-by-line
# scan of the first frontmatter block suffices for validation.
try:
content = skill_md.read_text(encoding="utf-8")
except OSError as e:
return None, f"failed to read SKILL.md: {e}"
if not content.startswith("---"):
return None, "SKILL.md missing opening '---' frontmatter marker"
after_open = content.split("---", 2)
if len(after_open) < 3:
return None, "SKILL.md missing closing '---' frontmatter marker"
frontmatter_text = after_open[1]
fm_name: str | None = None
fm_description: str | None = None
for raw_line in frontmatter_text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("name:"):
fm_name = line.split(":", 1)[1].strip().strip('"').strip("'")
elif line.startswith("description:"):
fm_description = line.split(":", 1)[1].strip().strip('"').strip("'")
if not fm_name:
return None, "SKILL.md frontmatter missing 'name' field"
if not fm_description:
return None, "SKILL.md frontmatter missing 'description' field"
if not (1 <= len(fm_description) <= 1024):
return None, "SKILL.md 'description' must be 11024 chars"
if not _SKILL_NAME_RE.match(fm_name):
return None, (
f"SKILL.md 'name' field '{fm_name}' must match [a-z0-9-] "
"pattern"
)
if fm_name.startswith("-") or fm_name.endswith("-") or "--" in fm_name:
return None, (
f"SKILL.md 'name' '{fm_name}' has leading/trailing/"
"consecutive hyphens"
)
if len(fm_name) > 64:
return None, f"SKILL.md 'name' '{fm_name}' exceeds 64 chars"
# The directory basename should match the frontmatter name —
# this is the writing-hive-skills convention. We ENFORCE it
# because the skill loader uses dir names as identity.
if src.name != fm_name:
return None, (
f"skill directory name '{src.name}' does not match "
f"SKILL.md frontmatter name '{fm_name}'. Rename the "
"folder or fix the frontmatter."
)
# Install into ~/.hive/skills/{name}/ if not already there.
target_root = Path.home() / ".hive" / "skills"
target = target_root / fm_name
try:
target_root.mkdir(parents=True, exist_ok=True)
except OSError as e:
return None, f"failed to create skills root: {e}"
try:
if src.resolve() == target.resolve():
# Already in the right place — nothing to do.
return target, None
except OSError:
pass
try:
if target.exists():
# Overwrite existing — the queen is explicitly creating
# a new colony for this version, so her authored skill
# wins over any prior version. copytree with
# dirs_exist_ok handles subdirs (scripts/, references/,
# assets/) but does NOT delete files removed in the
# new version. For a clean overwrite we rmtree first.
_shutil.rmtree(target)
_shutil.copytree(src, target)
except OSError as e:
return None, f"failed to install skill into {target}: {e}"
return target, None
async def create_colony(
    *,
    colony_name: str,
    task: str,
    skill_path: str,
) -> str:
    """Create a colony after installing a pre-authored skill folder.

    Validates ``colony_name``, validates and installs the skill folder
    at ``skill_path`` under ``~/.hive/skills/``, then forks the bound
    queen session into the new colony. Always returns a JSON string:
    either an error payload (with a remediation ``hint`` where useful)
    or the fork result enriched with the installed-skill location.
    """
    if session is None:
        return json.dumps({"error": "No session bound to this tool registry."})
    cn = (colony_name or "").strip()
    if not _COLONY_NAME_RE.match(cn):
        return json.dumps(
            {
                "error": (
                    "colony_name must be lowercase alphanumeric "
                    "with underscores (e.g. 'honeycomb_research')."
                )
            }
        )
    installed_skill, skill_err = _validate_and_install_skill(skill_path)
    if skill_err is not None:
        # Installation failed — tell the queen exactly how to recover.
        return json.dumps(
            {
                "error": skill_err,
                "hint": (
                    "Author the skill folder first using write_file "
                    "(and edit_file for follow-ups). The folder must "
                    "contain a SKILL.md with YAML frontmatter "
                    "{name, description} — see your "
                    "writing-hive-skills default skill for the "
                    "format. Then call create_colony again with "
                    "skill_path pointing at that folder."
                ),
            }
        )
    # BUG FIX: the format string was "%s%s", which glued the source
    # path and the installed path together with no separator in the
    # log line. Use an explicit arrow between them.
    logger.info(
        "create_colony: installed skill from %s -> %s",
        skill_path,
        installed_skill,
    )
    # Fork the queen session into the colony. The fork inherits
    # session.queen_ctx.skill_dirs which already includes
    # ~/.hive/skills/, so the freshly installed skill is
    # discovered on the worker's first scan.
    try:
        from framework.server.routes_execution import fork_session_into_colony
    except Exception as e:
        return json.dumps(
            {
                "error": f"fork_session_into_colony import failed: {e}",
                "skill_installed": str(installed_skill),
            }
        )
    try:
        fork_result = await fork_session_into_colony(
            session=session,
            colony_name=cn,
            task=(task or "").strip(),
        )
    except Exception as e:
        logger.exception("create_colony: fork failed after installing skill")
        return json.dumps(
            {
                "error": f"colony fork failed: {e}",
                "skill_installed": str(installed_skill),
                "hint": (
                    "The skill was installed but the fork failed. "
                    "You can retry create_colony — re-installing "
                    "the skill is idempotent."
                ),
            }
        )
    return json.dumps(
        {
            "status": "created",
            "colony_name": fork_result.get("colony_name", cn),
            "colony_path": fork_result.get("colony_path"),
            "queen_session_id": fork_result.get("queen_session_id"),
            "is_new": fork_result.get("is_new", True),
            "skill_installed": str(installed_skill),
            "skill_name": installed_skill.name if installed_skill else None,
        }
    )
# Tool schema for create_colony. Description fix: the body-length target
# previously read "3002000 chars" (a garbled en-dash range); it now
# spells out "300-2000 chars" so the model gets a sane target.
_create_colony_tool = Tool(
    name="create_colony",
    description=(
        "Fork this session into a colony — but FIRST author a "
        "Hive Skill folder capturing what you learned during this "
        "conversation, and pass its path to this tool. The tool "
        "validates the skill folder (SKILL.md present, frontmatter "
        "name+description valid, directory name matches frontmatter "
        "name), installs it under ~/.hive/skills/{name}/ if it's "
        "not already there, and then forks the session.\n\n"
        "TWO-STEP FLOW:\n\n"
        " 1. Use write_file (plus edit_file / list_directory as "
        " needed) to create a skill folder. The folder must "
        " contain a SKILL.md with YAML frontmatter {name, "
        " description} and a markdown body. Optional subdirs: "
        " scripts/, references/, assets/. See your "
        " writing-hive-skills default skill for the spec. We "
        " recommend authoring it directly at "
        " ~/.hive/skills/{skill-name}/SKILL.md so no copy is "
        " needed.\n"
        " 2. Call create_colony(colony_name, task, skill_path) "
        " pointing at the folder you just wrote.\n\n"
        "WHY THIS EXISTS: a fresh worker has zero memory of your "
        "chat with the user. If you spent the session figuring out "
        "an API auth flow, pagination, data shapes, and gotchas — "
        "that knowledge must live in a skill, not in your private "
        "context, or the worker will repeat your discovery work "
        "from scratch.\n\n"
        "WHAT TO PUT IN THE SKILL BODY: the operational protocol "
        "the next worker needs to do this work. Include API "
        "endpoints with example requests, the exact auth flow, "
        "response shapes you observed, gotchas you hit (rate "
        "limits, pagination quirks, edge cases), conventions you "
        "settled on, and pre-baked queries/commands. Write it as "
        "if onboarding a new engineer who has never seen this "
        "system. Realistic target: 300-2000 chars of body."
    ),
    parameters={
        "type": "object",
        "properties": {
            "colony_name": {
                "type": "string",
                "description": (
                    "Lowercase alphanumeric+underscore name for "
                    "the new colony (e.g. 'honeycomb_research')."
                ),
            },
            "task": {
                "type": "string",
                "description": (
                    "FULL self-contained task description for the "
                    "first worker run in the new colony. Worker "
                    "has zero context — include everything."
                ),
            },
            "skill_path": {
                "type": "string",
                "description": (
                    "Path to a pre-authored skill folder containing "
                    "SKILL.md. May be absolute or ~-expanded. The "
                    "directory basename MUST match the SKILL.md "
                    "frontmatter 'name' field. If the path is "
                    "outside ~/.hive/skills/ the folder is copied "
                    "in. Example: '~/.hive/skills/honeycomb-api-"
                    "protocol'."
                ),
            },
        },
        "required": ["colony_name", "task", "skill_path"],
    },
)
registry.register(
    "create_colony",
    _create_colony_tool,
    lambda inputs: create_colony(**inputs),
)
tools_registered += 1
# --- switch_to_reviewing ----------------------------------------------------
async def switch_to_reviewing_tool() -> str:
+92
View File
@@ -0,0 +1,92 @@
"""Tests for ``sanitize_ask_user_inputs``.
Some model families return malformed ``ask_user`` calls that pack the
options inside the ``question`` string as pseudo-XML / inline blob.
The sanitizer self-heals those calls so the buttons still render.
"""
from __future__ import annotations
from framework.agent_loop.internals.synthetic_tools import (
sanitize_ask_user_inputs,
)
def test_clean_question_passes_through_unchanged() -> None:
    """A well-formed question survives sanitization untouched."""
    question, recovered = sanitize_ask_user_inputs("What's next?", None)
    assert question == "What's next?"
    assert recovered is None
def test_strips_trailing_close_question_tag() -> None:
    """A trailing </question> pseudo-tag is removed from the question."""
    question, recovered = sanitize_ask_user_inputs("What now?</question>", None)
    assert question == "What now?"
    assert recovered is None
def test_strips_close_question_tag_case_insensitive_with_whitespace() -> None:
    """Tag stripping ignores case and surrounding whitespace."""
    question, recovered = sanitize_ask_user_inputs("What now? </QUESTION> ", None)
    assert question == "What now?"
    assert recovered is None
def test_recovers_inline_uppercase_options() -> None:
    """An _OPTIONS: [...] line after </question> yields recovered buttons."""
    malformed = (
        "What do you want to do from here?</question>\n"
        '_OPTIONS: ["De-risk — trim PRLG", "Add to a position", "Open a short"]'
    )
    question, recovered = sanitize_ask_user_inputs(malformed, None)
    assert question == "What do you want to do from here?"
    assert recovered == ["De-risk — trim PRLG", "Add to a position", "Open a short"]
def test_recovers_inline_lowercase_options() -> None:
    """A lowercase options: [...] line is also recognized and recovered."""
    malformed = 'Pick one\noptions: ["A", "B", "C"]'
    question, recovered = sanitize_ask_user_inputs(malformed, None)
    assert question == "Pick one"
    assert recovered == ["A", "B", "C"]
def test_recovers_inline_underscore_options() -> None:
    """A lowercase _options: [...] variant is recognized as well."""
    malformed = 'Pick one\n_options: ["A", "B"]'
    question, recovered = sanitize_ask_user_inputs(malformed, None)
    assert question == "Pick one"
    assert recovered == ["A", "B"]
def test_recovered_options_dropped_when_not_a_list() -> None:
    """A non-array options blob produces no recovered options."""
    malformed = 'Pick one\noptions: "not-a-list"'
    question, recovered = sanitize_ask_user_inputs(malformed, None)
    # The malformed inline blob is removed but no options are recovered.
    assert "options" not in question.lower() or "not-a-list" in question
    assert recovered is None
def test_recovered_options_dropped_when_too_many() -> None:
    """More than eight inline options are treated as noise, not buttons."""
    malformed = 'Pick\noptions: ["a","b","c","d","e","f","g","h","i","j"]'
    _question, recovered = sanitize_ask_user_inputs(malformed, None)
    assert recovered is None
def test_does_not_overwrite_real_options() -> None:
    """Sanitizer is for the question field; real options pass through untouched."""
    real_options = ["X", "Y"]
    question, recovered = sanitize_ask_user_inputs("Plain question?", real_options)
    # The function returns the recovered options as the second value;
    # real_options are passed in as input only — the caller decides
    # which to use. Here we verify the question is clean.
    assert question == "Plain question?"
    assert recovered is None  # nothing recovered from the question text
def test_none_question_returns_empty() -> None:
    """A missing (None) question normalizes to the empty string."""
    question, options = sanitize_ask_user_inputs(None, None)
    assert (question, options) == ("", None)
def test_collapses_excess_blank_lines_after_removal() -> None:
    """Stripping the options blob doesn't leave a trail of blank lines behind."""
    question, options = sanitize_ask_user_inputs('What?\n\n\n\noptions: ["a", "b"]', None)
    assert question == "What?"
    assert options == ["a", "b"]
+374
View File
@@ -0,0 +1,374 @@
"""Tests for the queen-side ``create_colony`` tool.
New contract (two-step flow):
1. The queen authors a skill folder out-of-band (via write_file etc.)
containing a SKILL.md with YAML frontmatter {name, description} and
an optional body.
2. The queen calls ``create_colony(colony_name, task, skill_path)``
pointing at that folder. The tool validates the folder, installs it
under ``~/.hive/skills/{name}/`` if it's not already there, and
forks the session into a colony.
We monkeypatch ``fork_session_into_colony`` so the test doesn't need a
real queen / session directory. We also redirect ``$HOME`` so the test's
skill installation lands in a tmp tree, not the real user home.
"""
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import Any
import pytest
from framework.host.event_bus import EventBus
from framework.llm.provider import ToolUse
from framework.loader.tool_registry import ToolRegistry
from framework.tools.queen_lifecycle_tools import register_queen_lifecycle_tools
# ---------------------------------------------------------------------------
# Fixtures + helpers
# ---------------------------------------------------------------------------
class _FakeSession:
    """Minimal queen-session stand-in carrying only the attributes the tool reads."""

    def __init__(self, sid: str = "session_test_create_colony"):
        # Identity + event plumbing.
        self.id = sid
        self.event_bus = EventBus()
        # Colony state: nothing forked yet.
        self.colony = None
        self.colony_runtime = None
        self.worker_path = None
        # Trigger bookkeeping starts empty.
        self.available_triggers: dict = {}
        self.active_trigger_ids: set = set()
def _make_executor():
    """Register the queen lifecycle tools on a fresh registry; return (executor, session)."""
    session = _FakeSession()
    registry = ToolRegistry()
    register_queen_lifecycle_tools(registry, session=session, session_id=session.id)
    return registry.get_executor(), session
async def _call(executor, **inputs) -> dict:
    """Invoke ``create_colony`` through the executor and decode its JSON payload."""
    outcome = executor(
        ToolUse(id="tu_create_colony", name="create_colony", input=inputs)
    )
    # Executors may be sync or async; await only when a coroutine came back.
    if asyncio.iscoroutine(outcome):
        outcome = await outcome
    return json.loads(outcome.content)
@pytest.fixture
def patched_home(tmp_path, monkeypatch):
    """Point $HOME at the test's tmp tree so ~/.hive/skills/ is sandboxed."""
    fake_home = tmp_path
    monkeypatch.setenv("HOME", str(fake_home))
    return fake_home
@pytest.fixture
def patched_fork(monkeypatch):
    """Replace fork_session_into_colony with a recording stub (no real queen)."""
    recorded: list[dict] = []

    async def _fake_fork(*, session: Any, colony_name: str, task: str) -> dict:
        # Capture the forwarded arguments so tests can assert on them.
        recorded.append(
            {"session": session, "colony_name": colony_name, "task": task}
        )
        return {
            "colony_name": colony_name,
            "colony_path": f"/tmp/fake_colonies/{colony_name}",
            "is_new": True,
            "queen_session_id": "session_fake_fork_id",
        }

    monkeypatch.setattr(
        "framework.server.routes_execution.fork_session_into_colony",
        _fake_fork,
    )
    return recorded
def _write_skill(
root: Path,
*,
dir_name: str,
fm_name: str,
description: str = "Default test skill description with enough text.",
body: str = "## Body\n\nOperational details go here.\n",
) -> Path:
"""Write a valid skill folder under ``root`` and return its path."""
skill_dir = root / dir_name
skill_dir.mkdir(parents=True, exist_ok=True)
skill_md = skill_dir / "SKILL.md"
skill_md.write_text(
"---\n"
f"name: {fm_name}\n"
f'description: "{description}"\n'
"---\n\n"
f"{body}",
encoding="utf-8",
)
return skill_dir
# ---------------------------------------------------------------------------
# Happy path
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_happy_path_external_folder_is_copied_into_skills_root(
    tmp_path: Path, patched_home: Path, patched_fork: list[dict]
) -> None:
    """Skill authored outside ~/.hive/skills/ is copied in on install."""
    executor, session = _make_executor()

    # Author the skill in a scratch directory that is NOT under the
    # (redirected) home, forcing the install step to copy it across.
    scratch = tmp_path / "scratch"
    scratch.mkdir()
    authored = _write_skill(
        scratch,
        dir_name="honeycomb-api-protocol",
        fm_name="honeycomb-api-protocol",
        description=(
            "How to query the HoneyComb staging API for ticker, pool, "
            "and trade data. Covers auth, pagination, pool detail "
            "shape. Use when fetching market data."
        ),
        body=(
            "## HoneyComb API Operational Protocol\n\n"
            "Auth: Bearer token from ~/.hive/credentials/honeycomb.json.\n"
            "Pagination: ?page=1&page_size=50 (max 50 per page).\n"
            "Endpoints:\n"
            "- /api/ticker — list tickers\n"
            "- /api/ticker/{id} — pool detail\n"
        ),
    )

    payload = await _call(
        executor,
        colony_name="honeycomb_research",
        task=(
            "Build a daily honeycomb market report covering top gainers, "
            "losers, volume leaders, and category breakdowns."
        ),
        skill_path=str(authored),
    )

    assert payload.get("status") == "created", f"Tool error: {payload}"
    assert payload["colony_name"] == "honeycomb_research"
    assert payload["skill_name"] == "honeycomb-api-protocol"

    # Install step: the skill folder now lives under ~/.hive/skills/.
    copied = (
        patched_home / ".hive" / "skills" / "honeycomb-api-protocol" / "SKILL.md"
    )
    assert copied.exists()
    assert "HoneyComb API Operational Protocol" in copied.read_text(encoding="utf-8")

    # Fork step: exactly one fork, with the arguments forwarded intact.
    assert len(patched_fork) == 1
    fork_call = patched_fork[0]
    assert fork_call["colony_name"] == "honeycomb_research"
    assert "honeycomb market report" in fork_call["task"]
    assert fork_call["session"] is session
@pytest.mark.asyncio
async def test_happy_path_in_place_authored_skill(
    patched_home: Path, patched_fork: list[dict]
) -> None:
    """Skill authored directly at ~/.hive/skills/{name}/ is accepted in-place."""
    executor, _ = _make_executor()

    # Author the skill straight at its install location.
    skills_root = patched_home / ".hive" / "skills"
    skills_root.mkdir(parents=True)
    authored = _write_skill(
        skills_root,
        dir_name="in-place-skill",
        fm_name="in-place-skill",
        description="An in-place skill.",
        body="Contents that are already at the right location." * 3,
    )

    payload = await _call(
        executor,
        colony_name="in_place_colony",
        task="task text",
        skill_path=str(authored),
    )

    assert payload.get("status") == "created", payload
    # No copy needed: the folder already sits where it gets installed.
    assert (skills_root / "in-place-skill" / "SKILL.md").exists()
    assert len(patched_fork) == 1
# ---------------------------------------------------------------------------
# Validation failures
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_missing_skill_path_rejected(patched_home, patched_fork) -> None:
    """A nonexistent skill_path is rejected before any fork happens."""
    executor, _ = _make_executor()
    payload = await _call(
        executor,
        colony_name="ok_name",
        task="t",
        skill_path=str(patched_home / "does_not_exist"),
    )
    assert "error" in payload
    assert "does not exist" in payload["error"]
    assert patched_fork == []
@pytest.mark.asyncio
async def test_skill_path_is_file_not_directory_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """Pointing skill_path at a plain file (not a folder) is rejected."""
    executor, _ = _make_executor()
    not_a_dir = tmp_path / "not-a-dir.md"
    not_a_dir.write_text("hi", encoding="utf-8")
    payload = await _call(
        executor,
        colony_name="ok_name",
        task="t",
        skill_path=str(not_a_dir),
    )
    assert "error" in payload
    assert "must be a directory" in payload["error"]
    assert patched_fork == []
@pytest.mark.asyncio
async def test_skill_missing_skill_md_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """A skill directory lacking a SKILL.md file is rejected."""
    executor, _ = _make_executor()
    empty_skill = tmp_path / "no-skill-md"
    empty_skill.mkdir()
    payload = await _call(
        executor,
        colony_name="ok_name",
        task="t",
        skill_path=str(empty_skill),
    )
    assert "error" in payload
    assert "SKILL.md" in payload["error"]
    assert patched_fork == []
@pytest.mark.asyncio
async def test_skill_md_missing_frontmatter_marker_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """A SKILL.md without a YAML frontmatter block is rejected."""
    executor, _ = _make_executor()
    skill_dir = tmp_path / "broken-fm"
    skill_dir.mkdir()
    (skill_dir / "SKILL.md").write_text(
        "no frontmatter here, just body\n", encoding="utf-8"
    )
    payload = await _call(
        executor,
        colony_name="ok_name",
        task="t",
        skill_path=str(skill_dir),
    )
    assert "error" in payload
    assert "frontmatter" in payload["error"]
    assert patched_fork == []
@pytest.mark.asyncio
async def test_skill_md_missing_description_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """Frontmatter without a description field is rejected."""
    executor, _ = _make_executor()
    skill_dir = tmp_path / "no-description"
    skill_dir.mkdir()
    (skill_dir / "SKILL.md").write_text(
        "---\nname: no-description\n---\n\nbody\n",
        encoding="utf-8",
    )
    payload = await _call(
        executor,
        colony_name="ok_name",
        task="t",
        skill_path=str(skill_dir),
    )
    assert "error" in payload
    assert "description" in payload["error"]
    assert patched_fork == []
@pytest.mark.asyncio
async def test_directory_name_mismatch_with_frontmatter_rejected(
    tmp_path, patched_home, patched_fork
) -> None:
    """The folder name must agree with the frontmatter ``name`` field."""
    executor, _ = _make_executor()
    skill_dir = tmp_path / "wrong-dir-name"
    skill_dir.mkdir()
    # Frontmatter says "correct-name" but the directory is "wrong-dir-name".
    (skill_dir / "SKILL.md").write_text(
        '---\nname: correct-name\ndescription: "d"\n---\n\nbody\n',
        encoding="utf-8",
    )
    payload = await _call(
        executor,
        colony_name="ok_name",
        task="t",
        skill_path=str(skill_dir),
    )
    assert "error" in payload
    assert "does not match" in payload["error"]
    assert patched_fork == []
@pytest.mark.asyncio
async def test_invalid_colony_name_rejected(tmp_path, patched_home, patched_fork) -> None:
    """A colony_name that fails validation is rejected; no fork occurs."""
    executor, _ = _make_executor()
    valid_skill = _write_skill(
        tmp_path, dir_name="valid-skill", fm_name="valid-skill"
    )
    payload = await _call(
        executor,
        colony_name="NotValid-Colony",
        task="t",
        skill_path=str(valid_skill),
    )
    assert "error" in payload
    assert "colony_name" in payload["error"]
    assert patched_fork == []
@pytest.mark.asyncio
async def test_fork_failure_keeps_installed_skill(
    tmp_path, patched_home, monkeypatch
) -> None:
    """If the fork raises, the installed skill stays under ~/.hive/skills/."""

    async def _exploding_fork(**kwargs):
        raise RuntimeError("simulated fork crash")

    monkeypatch.setattr(
        "framework.server.routes_execution.fork_session_into_colony",
        _exploding_fork,
    )
    executor, _ = _make_executor()
    authored = _write_skill(
        tmp_path, dir_name="durable-skill", fm_name="durable-skill"
    )

    payload = await _call(
        executor,
        colony_name="will_fail",
        task="t",
        skill_path=str(authored),
    )

    # The tool reports the failure but does not roll back the skill install.
    assert "error" in payload
    assert "fork failed" in payload["error"]
    assert "skill_installed" in payload
    assert "hint" in payload
    assert (
        patched_home / ".hive" / "skills" / "durable-skill" / "SKILL.md"
    ).exists()