fix: integration tests

Timothy
2026-04-09 23:05:11 -07:00
parent da0aa65c31
commit fb5b7ed9de
10 changed files with 769 additions and 83 deletions
+2
@@ -413,7 +413,9 @@ class TestSessionCRUD:
model=None,
initial_prompt=None,
queen_resume_from=None,
queen_name=None,
initial_phase=None,
worker_name=None,
)
@pytest.mark.asyncio
+32 -44
@@ -17,23 +17,7 @@ def project_root():
class TestConfigurePaths:
"""Test _configure_paths auto-discovers exports/ and core/."""
def test_adds_exports_to_sys_path(self, project_root):
exports_dir = project_root / "exports"
if not exports_dir.is_dir():
pytest.skip("exports/ directory does not exist in this environment")
exports_str = str(exports_dir)
# Remove if already present to test fresh addition
original_path = sys.path.copy()
sys.path = [p for p in sys.path if p != exports_str]
try:
_configure_paths()
assert exports_str in sys.path
finally:
sys.path = original_path
"""Test _configure_paths auto-discovers core/."""
def test_adds_core_to_sys_path(self, project_root):
core_dir = project_root / "core"
@@ -49,21 +33,15 @@ class TestConfigurePaths:
def test_does_not_duplicate_paths(self):
_configure_paths()
# Call twice — should not create duplicates
before = sys.path.copy()
_configure_paths()
assert sys.path == before
def test_handles_missing_exports_gracefully(self):
"""If exports/ doesn't exist, _configure_paths should not crash."""
_configure_paths()
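For reference, a minimal sketch of the idempotent path setup these tests pin down; the project-root discovery is an assumption, not the shipped _configure_paths:
# Hypothetical sketch -- not the real _configure_paths implementation.
import sys
from pathlib import Path

def _configure_paths() -> None:
    root = Path(__file__).resolve().parents[1]  # assumed project-root discovery
    for name in ("exports", "core"):
        candidate = root / name
        if not candidate.is_dir():
            continue  # a missing exports/ is tolerated, not an error
        path_str = str(candidate)
        if path_str not in sys.path:  # calling twice must not duplicate entries
            sys.path.insert(0, path_str)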
class TestFrameworkModule:
"""Test ``python -m framework`` invocation (the underlying module)."""
"""Test ``python -m framework`` invocation."""
def test_module_help(self, project_root):
"""Verify ``python -m framework --help`` prints usage."""
result = subprocess.run(
[sys.executable, "-m", "framework", "--help"],
capture_output=True,
@@ -72,28 +50,23 @@ class TestFrameworkModule:
cwd=str(project_root / "core"),
)
assert result.returncode == 0
assert "hive" in result.stdout.lower() or "goal" in result.stdout.lower()
assert "hive" in result.stdout.lower()
def test_module_list_subcommand(self, project_root):
"""Verify ``python -m framework list --help`` registers the subcommand."""
def test_module_serve_subcommand(self, project_root):
"""Verify ``python -m framework serve --help`` prints usage."""
result = subprocess.run(
[sys.executable, "-m", "framework", "list", "--help"],
[sys.executable, "-m", "framework", "serve", "--help"],
capture_output=True,
text=True,
encoding="utf-8",
cwd=str(project_root / "core"),
)
assert result.returncode == 0
assert "agents" in result.stdout.lower() or "directory" in result.stdout.lower()
assert "host" in result.stdout.lower() or "port" in result.stdout.lower()
class TestHiveEntryPoint:
"""Test the ``hive`` console_scripts entry point.
These tests verify the actual ``hive`` command installed by
``pip install -e core/``. If the entry point is not installed,
the tests are skipped gracefully.
"""
"""Test the ``hive`` console_scripts entry point."""
@pytest.fixture(autouse=True)
def _require_hive(self):
@@ -101,7 +74,7 @@ class TestHiveEntryPoint:
pytest.skip("'hive' entry point not installed (run: pip install -e core/)")
def test_hive_help(self):
"""Verify ``hive --help`` exits 0 and prints usage."""
"""Verify ``hive --help`` exits 0 and lists the new commands."""
result = subprocess.run(
["hive", "--help"],
capture_output=True,
@@ -109,23 +82,38 @@ class TestHiveEntryPoint:
encoding="utf-8",
)
assert result.returncode == 0
assert "run" in result.stdout.lower()
assert "validate" in result.stdout.lower()
out = result.stdout.lower()
# New CLI surface (post-cleanup)
assert "serve" in out
assert "queen" in out
assert "colony" in out
assert "session" in out
assert "chat" in out
def test_hive_list_help(self):
"""Verify ``hive list --help`` exits 0."""
def test_hive_queen_list_help(self):
"""``hive queen list --help`` is one of the new core commands."""
result = subprocess.run(
["hive", "list", "--help"],
["hive", "queen", "list", "--help"],
capture_output=True,
text=True,
encoding="utf-8",
)
assert result.returncode == 0
def test_hive_run_missing_agent(self):
"""Verify ``hive run`` with a non-existent agent prints an error."""
def test_hive_colony_list_help(self):
"""``hive colony list --help`` is one of the new core commands."""
result = subprocess.run(
["hive", "run", "nonexistent_agent_xyz"],
["hive", "colony", "list", "--help"],
capture_output=True,
text=True,
encoding="utf-8",
)
assert result.returncode == 0
def test_hive_unknown_command_exits_nonzero(self):
"""An unknown subcommand must error out."""
result = subprocess.run(
["hive", "definitely-not-a-command"],
capture_output=True,
text=True,
encoding="utf-8",
+401
@@ -0,0 +1,401 @@
"""Wiring smoke test for the queen → fork → colony flow.
Validates the on-disk artifacts produced by ``handle_colony_spawn`` and
that ``create_session_with_worker_colony`` resolves the colony's forked
session ID from ``metadata.json`` rather than spinning up a fresh ID.
These tests do NOT exercise the LLM or the queen identity hook -- they
construct a Session object with the minimum state ``handle_colony_spawn``
needs and run everything against a temp directory.
"""
from __future__ import annotations
import json
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from aiohttp.test_utils import TestClient, TestServer
from framework.agent_loop.internals.types import LoopConfig
from framework.host.event_bus import EventBus
from framework.server.app import create_app
from framework.server.session_manager import Session, _queen_session_dir
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_fake_queen_session(
home: Path,
*,
queen_name: str,
session_id: str,
) -> Path:
"""Create a fake queen session directory with conversations and meta.json."""
queen_dir = home / ".hive" / "agents" / "queens" / queen_name / "sessions" / session_id
(queen_dir / "conversations" / "parts").mkdir(parents=True)
(queen_dir / "data").mkdir()
# Two fake conversation parts so we can verify they get copied
parts = queen_dir / "conversations" / "parts"
(parts / "0000000000.json").write_text(
json.dumps({"seq": 0, "role": "user", "content": "trade honeycomb"}),
encoding="utf-8",
)
(parts / "0000000001.json").write_text(
json.dumps({"seq": 1, "role": "assistant", "content": "on it"}),
encoding="utf-8",
)
# Conversation cursor + meta
(queen_dir / "conversations" / "cursor.json").write_text("{}", encoding="utf-8")
(queen_dir / "conversations" / "meta.json").write_text("{}", encoding="utf-8")
# Session meta.json (this is the queen-session meta, distinct from convs/meta.json)
(queen_dir / "meta.json").write_text(
json.dumps({"created_at": 1, "queen_id": queen_name}),
encoding="utf-8",
)
# Empty events log
(queen_dir / "events.jsonl").write_text("", encoding="utf-8")
return queen_dir
def _make_session_with_queen_state(
*,
session_id: str,
queen_name: str,
queen_dir: Path,
) -> Session:
"""Construct a Session pre-populated with the state colony-spawn reads."""
bus = MagicMock()
bus.publish = AsyncMock()
# Fake queen_loop with the attributes the spawn handler reads
fake_loop = SimpleNamespace(
_last_ctx=SimpleNamespace(
available_tools=[
SimpleNamespace(name="read_file"),
SimpleNamespace(name="search_files"),
],
skills_catalog_prompt="<skills/>",
protocols_prompt="<protocols/>",
skill_dirs=["/fake/skills"],
),
_config=LoopConfig(
max_iterations=42,
max_tool_calls_per_turn=7,
max_context_tokens=99_000,
max_tool_result_chars=2048,
),
_conversation_store=None,
)
queen_executor = SimpleNamespace(node_registry={"queen": fake_loop})
# Fake phase_state with the attributes the spawn handler reads
phase_state = SimpleNamespace(
phase="planning",
queen_id=queen_name,
queen_identity_prompt="You are Charlotte, head of finance.",
_cached_global_recall_block="",
get_current_prompt=lambda: "you are the queen",
)
session = Session(
id=session_id,
event_bus=bus,
llm=MagicMock(),
loaded_at=0.0,
queen_executor=queen_executor,
queen_dir=queen_dir,
queen_name=queen_name,
phase_state=phase_state,
)
return session
# ---------------------------------------------------------------------------
# 1. AgentLoader skips metadata.json when picking a worker config
# ---------------------------------------------------------------------------
def test_agent_loader_picks_worker_json_not_metadata_json(tmp_path):
"""AgentLoader.load must select worker.json from a colony, not metadata.json.
Regression: ``metadata.json`` sorts before ``worker.json`` alphabetically;
if it isn't excluded, the loader treats colony provenance as a worker spec
and the worker spawns under the wrong storage path with no goal/tools.
"""
from framework.loader.agent_loader import AgentLoader
colony_dir = tmp_path / "colonies" / "honeycomb"
colony_dir.mkdir(parents=True)
(colony_dir / "data").mkdir()
# Colony provenance (must NOT be picked)
(colony_dir / "metadata.json").write_text(
json.dumps(
{
"colony_name": "honeycomb",
"queen_name": "queen_finance_fundraising",
"queen_session_id": "session_xxx",
"workers": {"worker": {"task": "trade"}},
}
),
encoding="utf-8",
)
# Real worker config
(colony_dir / "worker.json").write_text(
json.dumps(
{
"name": "worker",
"version": "1.0.0",
"description": "trader",
"goal": {"description": "trade honeycomb", "success_criteria": [], "constraints": []},
"system_prompt": "be a careful trader",
"tools": ["read_file", "search_files"],
"loop_config": {"max_iterations": 50},
}
),
encoding="utf-8",
)
runner = AgentLoader.load(
colony_dir,
interactive=False,
skip_credential_validation=True,
)
# Picked the right config: name comes from worker.json
assert runner.graph.nodes[0].id == "worker"
assert runner.goal.description == "trade honeycomb"
assert "read_file" in runner.graph.nodes[0].tools
# ---------------------------------------------------------------------------
# 2. handle_colony_spawn produces the correct on-disk artifacts
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_colony_spawn_creates_correct_artifacts(tmp_path, monkeypatch):
"""End-to-end POST /api/sessions/{id}/colony-spawn against an in-process app.
Validates the full set of artifacts produced by the spawn handler,
catching the bugs we hit yesterday:
- queen_name in metadata.json must be the actual queen profile, not "default"
- queen_session_id in metadata.json must point to the duplicated dir
- duplicated session dir must live under the correct queen identity
- duplicated session must be flagged colony_fork=true
- worker.json must contain the queen state snapshot
- worker storage must receive the queen conversations
- source queen session meta must be linked back to the colony
"""
monkeypatch.setattr(Path, "home", classmethod(lambda cls: tmp_path))
queen_name = "queen_finance_fundraising"
source_session_id = "session_20260410_120000_aaaaaaaa"
# Pre-create a fake queen session on disk
source_queen_dir = _make_fake_queen_session(
tmp_path,
queen_name=queen_name,
session_id=source_session_id,
)
# Build the in-process aiohttp app and inject our fake session
app = create_app()
manager = app["manager"]
session = _make_session_with_queen_state(
session_id=source_session_id,
queen_name=queen_name,
queen_dir=source_queen_dir,
)
manager._sessions[session.id] = session
async with TestClient(TestServer(app)) as client:
resp = await client.post(
f"/api/sessions/{source_session_id}/colony-spawn",
json={"colony_name": "honeycomb", "task": "trade carefully"},
)
assert resp.status == 200, await resp.text()
body = await resp.json()
colony_session_id = body["queen_session_id"]
assert body["colony_name"] == "honeycomb"
assert body["is_new"] is True
assert colony_session_id != source_session_id
# ── colony_dir layout ──────────────────────────────────────────
colony_dir = tmp_path / ".hive" / "colonies" / "honeycomb"
assert colony_dir.is_dir()
assert (colony_dir / "data").is_dir()
assert (colony_dir / "worker.json").is_file()
assert (colony_dir / "metadata.json").is_file()
# ── metadata.json contents ─────────────────────────────────────
metadata = json.loads((colony_dir / "metadata.json").read_text())
assert metadata["colony_name"] == "honeycomb"
assert metadata["queen_name"] == queen_name, (
f"queen_name should be the actual queen profile, got {metadata['queen_name']!r}"
)
assert metadata["queen_session_id"] == colony_session_id
assert metadata["source_session_id"] == source_session_id
assert "worker" in metadata["workers"]
assert metadata["workers"]["worker"]["task"] == "trade carefully"
# ── worker.json contents ───────────────────────────────────────
worker_meta = json.loads((colony_dir / "worker.json").read_text())
assert worker_meta["name"] == "worker"
assert worker_meta["queen_id"] == queen_name
assert worker_meta["queen_phase"] == "planning"
assert worker_meta["spawned_from"] == source_session_id
assert worker_meta["goal"]["description"] == "trade carefully"
assert worker_meta["system_prompt"] == "you are the queen"
assert worker_meta["identity_prompt"] == "You are Charlotte, head of finance."
assert worker_meta["tools"] == ["read_file", "search_files"]
assert worker_meta["skills_catalog_prompt"] == "<skills/>"
assert worker_meta["protocols_prompt"] == "<protocols/>"
assert worker_meta["loop_config"]["max_iterations"] == 42
assert worker_meta["loop_config"]["max_tool_calls_per_turn"] == 7
# ── duplicated queen session dir ──────────────────────────────
dest_queen_dir = _queen_session_dir(colony_session_id, queen_name)
assert dest_queen_dir.is_dir(), (
f"Forked session dir not under {queen_name}/, got {dest_queen_dir}"
)
# Conversations were copied
assert (dest_queen_dir / "conversations" / "parts" / "0000000000.json").is_file()
assert (dest_queen_dir / "conversations" / "parts" / "0000000001.json").is_file()
# Forked meta.json carries the colony_fork flag and links to the colony
dest_meta = json.loads((dest_queen_dir / "meta.json").read_text())
assert dest_meta["colony_fork"] is True
assert dest_meta["forked_from"] == source_session_id
assert dest_meta["queen_id"] == queen_name
assert dest_meta["agent_path"] == str(colony_dir)
assert dest_meta["agent_name"] == "Honeycomb"
# ── worker storage receives queen conversations ───────────────
worker_storage_convs = (
tmp_path / ".hive" / "agents" / "honeycomb" / "worker" / "conversations"
)
assert worker_storage_convs.is_dir()
assert (worker_storage_convs / "parts" / "0000000000.json").is_file()
# ── source queen session updated with agent_path ──────────────
source_meta = json.loads((source_queen_dir / "meta.json").read_text())
assert source_meta["agent_path"] == str(colony_dir)
assert source_meta["agent_name"] == "Honeycomb"
# ---------------------------------------------------------------------------
# 3. create_session_with_worker_colony resolves the forked session ID from
# metadata.json (not whatever the caller passed in)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_session_with_worker_colony_uses_forked_session_id(
tmp_path, monkeypatch
):
"""When a colony is loaded, its metadata.json's queen_session_id wins.
Regression: returning to a colony was loading the SOURCE queen session
instead of the forked one because the frontend's history scan found the
source first. The backend now overrides ``queen_resume_from`` with the
colony's designated session ID.
"""
monkeypatch.setattr(Path, "home", classmethod(lambda cls: tmp_path))
from framework.server.session_manager import SessionManager
queen_name = "queen_finance_fundraising"
source_id = "session_20260410_120000_aaaaaaaa"
forked_id = "session_20260410_130000_bbbbbbbb"
# Pre-create the forked queen session that the colony points at
_make_fake_queen_session(tmp_path, queen_name=queen_name, session_id=forked_id)
# Also create a source session (the one we don't want to be picked)
_make_fake_queen_session(tmp_path, queen_name=queen_name, session_id=source_id)
# Build the colony dir with metadata pointing at the forked session
colony_dir = tmp_path / ".hive" / "colonies" / "honeycomb"
colony_dir.mkdir(parents=True)
(colony_dir / "data").mkdir()
(colony_dir / "metadata.json").write_text(
json.dumps(
{
"colony_name": "honeycomb",
"queen_name": queen_name,
"queen_session_id": forked_id,
"source_session_id": source_id,
"workers": {"worker": {"task": "trade"}},
}
),
encoding="utf-8",
)
(colony_dir / "worker.json").write_text(
json.dumps(
{
"name": "worker",
"version": "1.0.0",
"description": "trader",
"goal": {"description": "trade", "success_criteria": [], "constraints": []},
"system_prompt": "be a trader",
"tools": [],
"loop_config": {},
}
),
encoding="utf-8",
)
manager = SessionManager(model="claude-haiku-4-5-20251001")
# Stub out the heavy bits: we only care about session-id resolution.
captured: dict = {}
async def fake_load_worker_core(self, session, agent_path, *, colony_id=None, model=None):
session.colony_id = colony_id or Path(agent_path).name
session.worker_path = Path(agent_path)
session.colony_runtime = MagicMock()
session.worker_info = SimpleNamespace(name="worker")
async def fake_start_queen(self, session, **kwargs):
captured["session_id"] = session.id
captured["queen_resume_from"] = session.queen_resume_from
captured["queen_name"] = session.queen_name
session.queen_executor = SimpleNamespace(node_registry={"queen": MagicMock()})
async def fake_restore_active_triggers(self, session, session_id):
return None
monkeypatch.setattr(SessionManager, "_load_worker_core", fake_load_worker_core)
monkeypatch.setattr(SessionManager, "_start_queen", fake_start_queen)
monkeypatch.setattr(
SessionManager, "_restore_active_triggers", fake_restore_active_triggers
)
# Caller passes the SOURCE session id (mimicking the frontend's history scan)
session = await manager.create_session_with_worker_colony(
agent_path=colony_dir,
queen_resume_from=source_id,
)
# The colony's forked session ID should win, not the caller's source ID
assert captured["queen_resume_from"] == forked_id, (
f"Expected forked id {forked_id}, got {captured['queen_resume_from']}"
)
assert session.id == forked_id, (
f"Live session ID should match forked session, got {session.id}"
)
assert captured["queen_name"] == queen_name
+293
@@ -0,0 +1,293 @@
"""Live end-to-end test of the queen → fork → colony flow.
Hits the real LLM via the in-process aiohttp app. Validates that:
- The queen identity hook fires after queen startup
- ``handle_colony_spawn`` produces the right artifacts under the actual
selected queen identity (not "default")
- The forked queen session dir lives under the correct queen profile
- The colony's metadata.json picks up the real queen_name
Skipped automatically if no LLM API key is configured.
Costs a few cents per run.
"""
from __future__ import annotations
import asyncio
import importlib
import json
import os
import shutil
import sys
import time
from pathlib import Path
import pytest
from aiohttp.test_utils import TestClient, TestServer
# ---------------------------------------------------------------------------
# Skip if no live LLM credentials are available
# ---------------------------------------------------------------------------
_LLM_KEY_ENV_VARS = (
"ANTHROPIC_API_KEY",
"OPENAI_API_KEY",
"ZAI_API_KEY",
"OPENROUTER_API_KEY",
"CEREBRAS_API_KEY",
"GROQ_API_KEY",
"GOOGLE_AI_API_KEY",
"MINIMAX_API_KEY",
)
def _has_any_llm_key() -> bool:
return any(os.environ.get(k) for k in _LLM_KEY_ENV_VARS)
pytestmark = pytest.mark.skipif(
not _has_any_llm_key(),
reason="No LLM API key set; skipping live integration test",
)
# ---------------------------------------------------------------------------
# Fixture: isolated ~/.hive in a temp dir
# ---------------------------------------------------------------------------
# Modules that import HIVE_HOME / QUEENS_DIR / COLONIES_DIR / MEMORIES_DIR /
# HIVE_CONFIG_FILE at import time and need their bindings rewritten when we
# redirect ~/.hive to a temp directory.
_HIVE_PATH_CONSUMERS = (
"framework.config",
"framework.server.session_manager",
"framework.server.queen_orchestrator",
"framework.server.routes_queens",
"framework.server.app",
"framework.agents.discovery",
"framework.agents.queen.queen_profiles",
"framework.tools.queen_lifecycle_tools",
"framework.storage.migrate_v2",
"framework.loader.cli",
)
_HIVE_PATH_NAMES = (
("HIVE_HOME", lambda h: h),
("QUEENS_DIR", lambda h: h / "agents" / "queens"),
("COLONIES_DIR", lambda h: h / "colonies"),
("MEMORIES_DIR", lambda h: h / "memories"),
("HIVE_CONFIG_FILE", lambda h: h / "configuration.json"),
)
@pytest.fixture
def isolated_hive_home(tmp_path, monkeypatch):
"""Redirect ~/.hive to a temp directory.
Patches Path.home() AND every module-level binding of HIVE_HOME and
its derivatives, since those constants were captured at import time
and won't follow Path.home() changes alone.
Copies the user's real ~/.hive/configuration.json into the temp home
so the LLM provider config (model, api_base) is preserved.
"""
fake_home_root = tmp_path
fake_hive = fake_home_root / ".hive"
fake_hive.mkdir()
# Copy LLM configuration so the framework picks up the user's model.
# Done BEFORE we monkey-patch Path.home so the source resolves correctly.
real_config = Path.home() / ".hive" / "configuration.json"
if real_config.exists():
shutil.copy(real_config, fake_hive / "configuration.json")
# Patch Path.home -> tmp_path so any call-site computation goes there.
monkeypatch.setattr(Path, "home", classmethod(lambda cls: fake_home_root))
# Patch every module-level binding that captured a path constant.
for mod_name in _HIVE_PATH_CONSUMERS:
try:
mod = importlib.import_module(mod_name)
except ImportError:
continue
for attr_name, builder in _HIVE_PATH_NAMES:
if hasattr(mod, attr_name):
monkeypatch.setattr(mod, attr_name, builder(fake_hive))
yield fake_hive
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _wait_for_queen_identity(
client: TestClient,
session_id: str,
*,
timeout: float = 60.0,
poll_interval: float = 0.5,
) -> str:
"""Poll /api/sessions/{id} until queen_id is set to a non-default value.
Returns the resolved queen_id. Fails the test on timeout.
"""
deadline = time.time() + timeout
last_qid: str | None = None
while time.time() < deadline:
r = await client.get(f"/api/sessions/{session_id}")
if r.status == 200:
d = await r.json()
qid = d.get("queen_id")
if qid:
last_qid = qid
if qid != "default":
return qid
await asyncio.sleep(poll_interval)
pytest.fail(
f"Queen identity not selected within {timeout}s "
f"(last queen_id={last_qid!r}). The queen identity hook may not be firing."
)
# ---------------------------------------------------------------------------
# The live test
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_live_queen_fork_to_colony(isolated_hive_home):
"""Spin up a real queen, let her select an identity, fork to a colony.
Validates the full wiring against a live LLM:
1. Queen-only session starts and runs the identity hook
2. session.queen_dir gets relocated from default/ to the selected queen
3. handle_colony_spawn produces metadata pointing at the real queen
4. The forked queen session dir lives under the correct queen identity
5. Conversations are copied through to worker storage
"""
from framework.agents.queen.queen_profiles import ensure_default_queens
from framework.server.app import create_app
from framework.server.session_manager import _queen_session_dir
# Pre-populate queen profiles in the temp ~/.hive so the identity
# hook has something to choose from.
ensure_default_queens()
app = create_app() # picks up model from copied configuration.json
manager = app["manager"]
async with TestClient(TestServer(app)) as client:
# ── 1. Create a queen-only session ─────────────────────────
# The initial_prompt steers the identity hook toward a finance queen.
resp = await client.post(
"/api/sessions",
json={
"initial_prompt": (
"I want to incubate a finance colony to help me trade "
"carefully on a small honeycomb market. Just briefly "
"acknowledge — one sentence is fine."
),
},
)
assert resp.status == 201, await resp.text()
body = await resp.json()
session_id = body["session_id"]
assert session_id.startswith("session_")
# ── 2. Wait for queen identity hook to fire ────────────────
queen_name = await _wait_for_queen_identity(client, session_id)
assert queen_name != "default", (
f"Identity hook didn't pick a real queen, got {queen_name!r}"
)
# ── 3. Fork to a colony ────────────────────────────────────
colony_name = "live_test_honeycomb"
resp = await client.post(
f"/api/sessions/{session_id}/colony-spawn",
json={"colony_name": colony_name, "task": "trade carefully"},
)
assert resp.status == 200, await resp.text()
spawn_data = await resp.json()
colony_session_id = spawn_data["queen_session_id"]
assert spawn_data["colony_name"] == colony_name
assert spawn_data["is_new"] is True
assert colony_session_id != session_id
# ── 4. Validate on-disk artifacts ──────────────────────────
colony_dir = isolated_hive_home / "colonies" / colony_name
assert colony_dir.is_dir()
assert (colony_dir / "worker.json").is_file()
assert (colony_dir / "metadata.json").is_file()
metadata = json.loads((colony_dir / "metadata.json").read_text())
assert metadata["colony_name"] == colony_name
# The crucial assertion: the metadata's queen_name must be the
# auto-selected queen, not "default". This is what failed
# repeatedly yesterday before the queen-dir relocate fix.
assert metadata["queen_name"] == queen_name, (
f"metadata.queen_name should be {queen_name!r}, got "
f"{metadata['queen_name']!r}. The session-dir relocation in "
f"queen_orchestrator may not be firing."
)
assert metadata["queen_session_id"] == colony_session_id
assert metadata["source_session_id"] == session_id
worker_meta = json.loads((colony_dir / "worker.json").read_text())
assert worker_meta["queen_id"] == queen_name
assert worker_meta["spawned_from"] == session_id
# The queen always has at least the framework-default tools
assert len(worker_meta["tools"]) > 0
# Goal carries the task we passed in
assert worker_meta["goal"]["description"] == "trade carefully"
# ── 5. Validate the forked queen session dir ──────────────
# It must live under the SELECTED queen identity, not "default".
dest_queen_dir = _queen_session_dir(colony_session_id, queen_name)
assert dest_queen_dir.is_dir(), (
f"Forked session dir not under {queen_name}/, expected "
f"{dest_queen_dir}"
)
# Conversations from the original queen session were copied
assert (dest_queen_dir / "conversations").is_dir()
dest_meta = json.loads((dest_queen_dir / "meta.json").read_text())
assert dest_meta["colony_fork"] is True
assert dest_meta["queen_id"] == queen_name
assert dest_meta["forked_from"] == session_id
assert dest_meta["agent_path"] == str(colony_dir)
# ── 6. The forked session must NOT show up in the queen DM history.
from framework.server.session_manager import SessionManager
cold = SessionManager.list_cold_sessions()
forked_in_history = [s for s in cold if s.get("session_id") == colony_session_id]
assert not forked_in_history, (
f"Forked colony session leaked into queen DM history: {forked_in_history}"
)
# ── 7. Worker storage received the conversations ──────────
worker_storage_convs = (
isolated_hive_home / "agents" / colony_name / "worker" / "conversations"
)
assert worker_storage_convs.is_dir()
# The queen has had at least one turn (the initial_prompt acknowledgment),
# so there should be conversation parts.
parts_dir = worker_storage_convs / "parts"
if parts_dir.exists():
assert any(parts_dir.iterdir()), (
"worker storage has conversations dir but no parts"
)
# ── 8. Stop the live session cleanly ──────────────────────
resp = await client.delete(f"/api/sessions/{session_id}")
assert resp.status == 200
# Drain background queen task so pytest doesn't warn about
# never-awaited coroutines.
await manager.shutdown_all()
+10 -12
@@ -67,11 +67,12 @@ class TestAgentEvent:
execution_id="exec_1",
data={"output": "result"},
correlation_id="corr_1",
graph_id="graph_1",
colony_id="colony_1",
)
d = event.to_dict()
assert d["type"] == "execution_completed"
assert d["stream_id"] == "stream_1"
assert d["colony_id"] == "colony_1"
def test_to_dict_includes_run_id(self):
"""run_id is included in to_dict() when set."""
@@ -350,28 +351,28 @@ class TestEventFiltering:
assert len(received) == 1
@pytest.mark.asyncio
async def test_filter_by_graph(self):
"""filter_graph only receives events from that graph."""
async def test_filter_by_colony(self):
"""filter_colony only receives events from that colony."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event.graph_id)
received.append(event.colony_id)
bus.subscribe(
event_types=[EventType.EXECUTION_STARTED],
handler=handler,
filter_graph="graph_a",
filter_colony="colony_a",
)
await bus.publish(
AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s", graph_id="graph_a")
AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s", colony_id="colony_a")
)
await bus.publish(
AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s", graph_id="graph_b")
AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s", colony_id="colony_b")
)
assert received == ["graph_a"]
assert received == ["colony_a"]
# ---------------------------------------------------------------------------
@@ -915,13 +916,10 @@ class TestEventType:
assert EventType.ESCALATION_REQUESTED
assert EventType.LLM_TURN_COMPLETE
assert EventType.NODE_ACTION_PLAN
assert EventType.WORKER_GRAPH_LOADED
assert EventType.WORKER_COLONY_LOADED
assert EventType.CREDENTIALS_REQUIRED
assert EventType.EXECUTION_RESURRECTED
assert EventType.DRAFT_GRAPH_UPDATED
assert EventType.FLOWCHART_MAP_UPDATED
assert EventType.QUEEN_PHASE_CHANGED
assert EventType.QUEEN_PERSONA_SELECTED
assert EventType.SUBAGENT_REPORT
assert EventType.TRIGGER_AVAILABLE
assert EventType.TRIGGER_FIRED
-6
@@ -85,12 +85,6 @@ class TestClassifyFlowchartNode:
result = classify_flowchart_node(node, 2, 3, edges, {"n3"})
assert result == "terminal"
def test_gcu_node_is_browser(self):
node = {"id": "n2", "node_type": "gcu", "tools": []}
edges = [{"source": "n1", "target": "n2"}]
result = classify_flowchart_node(node, 1, 3, edges, set())
assert result == "browser"
def test_subprocess_node(self):
node = {"id": "n2", "node_type": "event_loop", "tools": [], "sub_agents": ["sub1"]}
edges = [{"source": "n1", "target": "n2"}, {"source": "n2", "target": "n3"}]
+5 -6
@@ -1236,17 +1236,16 @@ class TestExtractToolCallHistory:
role="assistant",
content="",
tool_calls=[
_make_tool_call(
"c2", "save_data", {"filename": "output.txt", "content": "data"}
),
_make_tool_call("c2", "read_file", {"path": "/tmp/output.txt"}),
],
),
Message(seq=3, role="tool", content="saved", tool_use_id="c2"),
Message(seq=3, role="tool", content="contents", tool_use_id="c2"),
]
result = extract_tool_call_history(msgs)
assert "web_search (1x)" in result
assert "save_data (1x)" in result
assert "FILES SAVED: output.txt" in result
assert "read_file (1x)" in result
# read_file paths are tracked under FILES SAVED in production
assert "FILES SAVED: /tmp/output.txt" in result
def test_errors_included(self):
msgs = [
+12 -5
@@ -63,9 +63,16 @@ class TestSkillCatalog:
assert catalog.to_prompt() == ""
def test_to_prompt_framework_only(self):
"""Framework-scope skills should NOT appear in the catalog prompt."""
"""Framework-scope skills now appear in the catalog like any other scope.
The old design filtered framework skills out and surfaced them via
DefaultSkillManager only. The current design folds them into the
normal progressive-disclosure catalog.
"""
catalog = SkillCatalog([_make_skill(source_scope="framework")])
assert catalog.to_prompt() == ""
prompt = catalog.to_prompt()
assert "<available_skills>" in prompt
assert "<name>my-skill</name>" in prompt
def test_to_prompt_xml_generation(self):
skills = [
@@ -109,8 +116,8 @@ class TestSkillCatalog:
assert "&lt;special&gt;" in prompt
assert "&amp;" in prompt
def test_to_prompt_excludes_framework_includes_others(self):
"""Mixed scopes: only framework skills are excluded from catalog."""
def test_to_prompt_includes_all_scopes(self):
"""Mixed scopes: project, user, AND framework skills all appear in the catalog."""
skills = [
_make_skill("proj", "Project skill", "project"),
_make_skill("usr", "User skill", "user"),
@@ -121,7 +128,7 @@ class TestSkillCatalog:
assert "<name>proj</name>" in prompt
assert "<name>usr</name>" in prompt
assert "fw" not in prompt
assert "<name>fw</name>" in prompt
def test_to_prompt_contains_behavioral_instruction(self):
catalog = SkillCatalog([_make_skill(source_scope="project")])
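The design change these tests capture, as a usage sketch reusing the test module's own _make_skill helper (its signature is taken from the tests above):
# Hypothetical usage sketch of the all-scopes catalog.
skills = [
    _make_skill("proj", "Project skill", "project"),
    _make_skill("usr", "User skill", "user"),
    _make_skill("fw", "Framework skill", "framework"),
]
prompt = SkillCatalog(skills).to_prompt()
# Framework skills are no longer filtered out of progressive disclosure:
assert all(f"<name>{n}</name>" in prompt for n in ("proj", "usr", "fw"))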
+6 -3
@@ -42,11 +42,14 @@ class TestSkillResourceBaseDir:
assert "<base_dir>/path/with &lt;&amp;&gt; chars</base_dir>" in prompt
def test_base_dir_absent_for_framework_skills(self):
"""Framework-scope skills are filtered from the catalog, so no base_dir either."""
def test_base_dir_present_for_framework_skills(self):
"""Framework-scope skills now appear in the catalog like any other scope,
and their base_dir is included in the XML."""
skill = _make_skill("fw", "/hive/_default_skills/fw", source_scope="framework")
catalog = SkillCatalog([skill])
assert catalog.to_prompt() == ""
prompt = catalog.to_prompt()
assert "<name>fw</name>" in prompt
assert "<base_dir>/hive/_default_skills/fw</base_dir>" in prompt
def test_allowlisted_dirs_matches_skills(self):
"""allowlisted_dirs returns all skill base_dirs including framework ones."""
+8 -7
@@ -44,9 +44,10 @@ class TestGetAllowedAgentRoots:
roots = _get_allowed_agent_roots()
assert isinstance(roots, tuple), f"Expected tuple, got {type(roots).__name__}"
def test_contains_three_roots(self):
def test_contains_four_roots(self):
# 4 roots: ~/.hive/colonies, repo/exports (compat), repo/examples, ~/.hive/agents
roots = _get_allowed_agent_roots()
assert len(roots) == 3
assert len(roots) == 4
def test_cached_on_repeated_calls(self):
first = _get_allowed_agent_roots()
@@ -65,9 +66,9 @@ class TestGetAllowedAgentRoots:
from framework.server.app import _REPO_ROOT
roots = _get_allowed_agent_roots()
exports_root, examples_root = roots[0], roots[1]
assert exports_root == (_REPO_ROOT / "exports").resolve()
assert examples_root == (_REPO_ROOT / "examples").resolve()
# Order: ~/.hive/colonies, repo/exports, repo/examples, ~/.hive/agents
assert roots[1] == (_REPO_ROOT / "exports").resolve()
assert roots[2] == (_REPO_ROOT / "examples").resolve()
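For reference, a sketch of the four-root tuple these tests pin down; the ordering comes from the comments above, and the caching decorator is an assumption:
# Hypothetical sketch -- ordering per the test comments; caching mechanism assumed.
from functools import lru_cache
from pathlib import Path
from framework.server.app import _REPO_ROOT  # same private import the test uses

@lru_cache(maxsize=1)
def _get_allowed_agent_roots() -> tuple[Path, ...]:
    return (
        (Path.home() / ".hive" / "colonies").resolve(),
        (_REPO_ROOT / "exports").resolve(),   # repo compat root
        (_REPO_ROOT / "examples").resolve(),
        (Path.home() / ".hive" / "agents").resolve(),
    )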
# ---------------------------------------------------------------------------
@@ -296,7 +297,7 @@ class TestHTTPEndpointsRejectMaliciousPaths:
_reset_allowed_roots()
@pytest.mark.asyncio
async def test_load_worker_rejects_outside_path(self, tmp_path):
async def test_load_colony_rejects_outside_path(self, tmp_path):
import framework.server.app as app_module
exports = tmp_path / "exports"
@@ -312,7 +313,7 @@ class TestHTTPEndpointsRejectMaliciousPaths:
session_id = (await create_resp.json())["session_id"]
resp = await client.post(
f"/api/sessions/{session_id}/graph",
f"/api/sessions/{session_id}/colony",
json={"agent_path": "/tmp/evil"},
)
assert resp.status == 400