Files
hive/core/tests/test_colony_runtime_overseer.py
T
Hundao de8d6f0946 fix(tests): unblock main CI (#7141)
Two unrelated test failures were keeping main red:

- test_capabilities.py: fixtures referenced deprecated model identifiers
  no longer in model_catalog.json. After the catalog refactor unknown
  models default to vision-capable, so 12 "expect False" assertions
  flipped to True. Replace fixtures with current catalog entries that
  carry an explicit supports_vision flag.

- test_colony_runtime_overseer.py: a 200ms hard sleep racing the
  background worker was flaky on Windows CI. Poll for llm.stream_calls
  with a 5s deadline instead.
2026-04-26 21:34:21 +08:00

501 lines
20 KiB
Python

"""Phase 1 tests: ColonyRuntime overseer + parallel worker fan-out.
These tests exercise the new overseer primitive, the
``report_to_parent`` tool, ``spawn_batch``, and
``wait_for_worker_reports`` — all additive to ColonyRuntime. They use
a ``MockStreamingLLM`` that yields pre-programmed stream events and
a real on-disk ``tmp_path``. No HTTP layer, no real LLM.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import pytest
from framework.agent_loop.types import AgentSpec
from framework.host.colony_runtime import ColonyRuntime
from framework.host.event_bus import AgentEvent, EventBus, EventType
from framework.llm.provider import LLMProvider, LLMResponse, Tool, ToolResult, ToolUse
from framework.llm.stream_events import (
FinishEvent,
TextDeltaEvent,
ToolCallEvent,
)
from framework.schemas.goal import Goal
# ---------------------------------------------------------------------------
# Mock LLM
# ---------------------------------------------------------------------------
class MockStreamingLLM(LLMProvider):
"""Yields pre-programmed stream events.
Two modes:
- ``scenarios`` (list): consumed in order, one per stream() call. Used
for single-worker tests where call order is deterministic.
- ``by_task`` (dict): keyed by task text found in the first user
message. Used for parallel worker tests where multiple workers
share this one LLM object and would otherwise race on scenario
consumption. Each worker gets the scenario matching its task.
"""
model: str = "mock"
def __init__(
self,
scenarios: list[list] | None = None,
by_task: dict[str, list] | None = None,
):
self.scenarios = scenarios or []
self.by_task = by_task or {}
self._call_index = 0
self._used_tasks: set[str] = set()
self.stream_calls: list[dict] = []
async def stream(
self,
messages: list[dict[str, Any]],
system: str = "",
tools: list[Tool] | None = None,
max_tokens: int = 4096,
**kwargs,
) -> AsyncIterator:
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
if self.by_task:
# Find the scenario whose task key appears in the first user
# message. Stable across parallel workers.
first_user = ""
for m in messages:
if m.get("role") == "user":
content = m.get("content", "")
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
content = block.get("text", "")
break
first_user = str(content)
break
for task_key, events in self.by_task.items():
if task_key in first_user:
if task_key in self._used_tasks:
# Already played this scenario; emit a stop so the
# agent loop terminates instead of replaying forever.
yield TextDeltaEvent(content="Done.", snapshot="Done.")
yield FinishEvent(stop_reason="stop", input_tokens=1, output_tokens=1, model="mock")
return
self._used_tasks.add(task_key)
for event in events:
yield event
return
return
if not self.scenarios or self._call_index >= len(self.scenarios):
# No more scenarios; emit a stop so the agent loop terminates.
yield TextDeltaEvent(content="Done.", snapshot="Done.")
yield FinishEvent(stop_reason="stop", input_tokens=1, output_tokens=1, model="mock")
return
events = self.scenarios[self._call_index]
self._call_index += 1
for event in events:
yield event
def complete(self, messages, system="", **kwargs) -> LLMResponse:
return LLMResponse(content="", model="mock", stop_reason="stop")
def _text_scenario(text: str) -> list:
return [
TextDeltaEvent(content=text, snapshot=text),
FinishEvent(stop_reason="stop", input_tokens=10, output_tokens=5, model="mock"),
]
def _report_scenario(status: str, summary: str, data: dict | None = None) -> list:
"""Worker calls ``report_to_parent`` and then finishes."""
return [
ToolCallEvent(
tool_use_id="report_1",
tool_name="report_to_parent",
tool_input={
"status": status,
"summary": summary,
"data": data or {},
},
),
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
]
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def agent_spec() -> AgentSpec:
return AgentSpec(
id="test_colony_agent",
name="Test Colony Agent",
description="Agent spec used for colony runtime tests.",
system_prompt="You are a test agent.",
agent_type="event_loop",
output_keys=[],
tool_access_policy="all",
)
@pytest.fixture
def goal() -> Goal:
return Goal(
id="test-goal",
name="Test goal",
description="A test goal for the colony runtime.",
)
@pytest.fixture
def event_bus() -> EventBus:
return EventBus()
def _stub_tool_executor(tool_use: ToolUse) -> ToolResult:
return ToolResult(tool_use_id=tool_use.tool_use_id, content="ok", is_error=False)
async def _make_colony(
tmp_path: Path,
llm: LLMProvider,
agent_spec: AgentSpec,
goal: Goal,
event_bus: EventBus,
) -> ColonyRuntime:
storage = tmp_path / "colony_storage"
storage.mkdir()
colony = ColonyRuntime(
agent_spec=agent_spec,
goal=goal,
storage_path=storage,
llm=llm,
tools=[],
tool_executor=_stub_tool_executor,
event_bus=event_bus,
colony_id="test_colony",
pipeline_stages=[], # skip pipeline initialisation in tests
)
await colony.start()
return colony
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestColonyRuntimeGoalProperty:
@pytest.mark.asyncio
async def test_goal_is_public_property(self, tmp_path, agent_spec, goal, event_bus):
llm = MockStreamingLLM(scenarios=[_text_scenario("ok")])
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
assert colony.goal is goal
assert colony.goal.id == "test-goal"
finally:
await colony.stop()
class TestStartOverseer:
@pytest.mark.asyncio
async def test_start_overseer_creates_persistent_worker(self, tmp_path, agent_spec, goal, event_bus):
"""Overseer must be a persistent Worker tagged stream_id='overseer'."""
llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
overseer = await colony.start_overseer(queen_spec=agent_spec)
assert colony.overseer is overseer
assert overseer.is_persistent is True
assert overseer._context.stream_id == "overseer"
assert overseer.id == f"overseer:{colony.colony_id}"
# Give the background task a moment to start
await asyncio.sleep(0.05)
assert overseer.is_active
finally:
await colony.stop()
@pytest.mark.asyncio
async def test_start_overseer_idempotent(self, tmp_path, agent_spec, goal, event_bus):
llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
first = await colony.start_overseer(queen_spec=agent_spec)
second = await colony.start_overseer(queen_spec=agent_spec)
assert first is second
finally:
await colony.stop()
class TestReportToParent:
@pytest.mark.asyncio
async def test_worker_report_emits_subagent_report_event(self, tmp_path, agent_spec, goal, event_bus):
"""A worker calling report_to_parent emits SUBAGENT_REPORT with structured data."""
llm = MockStreamingLLM(
scenarios=[
_report_scenario(
status="success",
summary="Fetched 5 rows from the API.",
data={"rows": 5, "table": "honeycomb"},
),
]
)
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
reports: list[AgentEvent] = []
lifecycle: list[AgentEvent] = []
async def on_report(event: AgentEvent) -> None:
reports.append(event)
async def on_lifecycle(event: AgentEvent) -> None:
lifecycle.append(event)
event_bus.subscribe(event_types=[EventType.SUBAGENT_REPORT], handler=on_report)
event_bus.subscribe(
event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
handler=on_lifecycle,
)
try:
worker_ids = await colony.spawn(task="Fetch 5 rows from honeycomb", count=1)
assert len(worker_ids) == 1
worker = colony.get_worker(worker_ids[0])
assert worker is not None
# Wait for the worker to finish AND for the SUBAGENT_REPORT event
# to propagate. On Windows the event loop scheduling differs from
# POSIX, so a worker can be marked inactive a few ticks before the
# subscriber callback runs. Waiting on both avoids that race.
deadline = asyncio.get_event_loop().time() + 5.0
while (worker.is_active or len(reports) == 0) and asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(0.05)
assert not worker.is_active, "Worker did not finish within timeout"
# SUBAGENT_REPORT arrived
assert len(reports) == 1
ev = reports[0]
assert ev.data["worker_id"] == worker_ids[0]
assert ev.data["status"] == "success"
assert ev.data["summary"] == "Fetched 5 rows from the API."
assert ev.data["data"] == {"rows": 5, "table": "honeycomb"}
assert ev.data["task"] == "Fetch 5 rows from honeycomb"
# Lifecycle event also fired (EXECUTION_COMPLETED)
assert len(lifecycle) == 1
assert lifecycle[0].type == EventType.EXECUTION_COMPLETED
finally:
await colony.stop()
@pytest.mark.asyncio
async def test_worker_crash_emits_synthesised_failed_report(self, tmp_path, agent_spec, goal, event_bus):
"""Worker whose AgentLoop raises must still emit SUBAGENT_REPORT.
The overseer would otherwise hang waiting for a report from a
crashed worker. Worker.run()'s except handler is responsible for
emitting a synthesised failed report.
"""
class CrashingLLM(LLMProvider):
model: str = "mock"
stream_calls: list[dict] = []
async def stream(self, messages, system="", tools=None, max_tokens=4096, **kwargs):
self.stream_calls.append({"messages": messages})
raise RuntimeError("boom — simulated LLM crash")
yield # pragma: no cover — make this an async generator
def complete(self, messages, system="", **kwargs):
return LLMResponse(content="", model="mock", stop_reason="stop")
llm = CrashingLLM()
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
reports: list[AgentEvent] = []
async def on_report(event: AgentEvent) -> None:
reports.append(event)
event_bus.subscribe(event_types=[EventType.SUBAGENT_REPORT], handler=on_report)
try:
ids = await colony.spawn(task="crashing task", count=1)
worker = colony.get_worker(ids[0])
deadline = asyncio.get_event_loop().time() + 5.0
while worker.is_active and asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(0.05)
assert not worker.is_active
assert len(reports) >= 1
r = reports[0]
assert r.data["worker_id"] == ids[0]
assert r.data["status"] == "failed"
finally:
await colony.stop()
class TestSpawnBatchAndWaitForReports:
@pytest.mark.asyncio
async def test_spawn_batch_returns_one_id_per_task(self, tmp_path, agent_spec, goal, event_bus):
llm = MockStreamingLLM(
by_task={
"Fetch batch 1": _report_scenario("success", "batch 1 done"),
"Fetch batch 2": _report_scenario("success", "batch 2 done"),
"Fetch batch 3": _report_scenario("success", "batch 3 done"),
}
)
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
ids = await colony.spawn_batch(
tasks=[
{"task": "Fetch batch 1"},
{"task": "Fetch batch 2"},
{"task": "Fetch batch 3"},
]
)
assert len(ids) == 3
assert len(set(ids)) == 3 # unique IDs
for wid in ids:
assert colony.get_worker(wid) is not None
finally:
await colony.stop()
@pytest.mark.asyncio
async def test_wait_for_worker_reports_collects_all(self, tmp_path, agent_spec, goal, event_bus):
"""Fan out 3 workers, wait for reports, verify structured list."""
llm = MockStreamingLLM(
by_task={
"batch 1": _report_scenario("success", "w1 done", {"batch": 1, "rows": 10}),
"batch 2": _report_scenario("success", "w2 done", {"batch": 2, "rows": 15}),
"batch 3": _report_scenario("failed", "w3 broke", {"batch": 3, "error_code": 503}),
}
)
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
ids = await colony.spawn_batch(
tasks=[
{"task": "batch 1"},
{"task": "batch 2"},
{"task": "batch 3"},
]
)
reports = await colony.wait_for_worker_reports(ids, timeout=10.0)
assert len(reports) == 3
by_id = {r["worker_id"]: r for r in reports}
assert by_id[ids[0]]["status"] == "success"
assert by_id[ids[0]]["summary"] == "w1 done"
assert by_id[ids[0]]["data"] == {"batch": 1, "rows": 10}
assert by_id[ids[1]]["status"] == "success"
assert by_id[ids[1]]["data"] == {"batch": 2, "rows": 15}
assert by_id[ids[2]]["status"] == "failed"
assert by_id[ids[2]]["data"] == {"batch": 3, "error_code": 503}
finally:
await colony.stop()
@pytest.mark.asyncio
async def test_wait_for_worker_reports_returns_in_input_order(self, tmp_path, agent_spec, goal, event_bus):
"""Reports must be returned in the same order as the input worker_ids."""
llm = MockStreamingLLM(
by_task={
"task-A": _report_scenario("success", "A"),
"task-B": _report_scenario("success", "B"),
"task-C": _report_scenario("success", "C"),
}
)
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
ids = await colony.spawn_batch(tasks=[{"task": "task-A"}, {"task": "task-B"}, {"task": "task-C"}])
reports = await colony.wait_for_worker_reports(ids, timeout=10.0)
assert [r["worker_id"] for r in reports] == ids
assert [r["summary"] for r in reports] == ["A", "B", "C"]
finally:
await colony.stop()
@pytest.mark.asyncio
async def test_wait_for_worker_reports_missing_id(self, tmp_path, agent_spec, goal, event_bus):
"""Unknown worker_id is reported as failed, not crash."""
llm = MockStreamingLLM(scenarios=[_text_scenario("noop")])
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
reports = await colony.wait_for_worker_reports(["nonexistent_worker"], timeout=1.0)
assert len(reports) == 1
assert reports[0]["worker_id"] == "nonexistent_worker"
assert reports[0]["status"] == "failed"
assert reports[0]["error"] == "no_such_worker"
finally:
await colony.stop()
class TestSeedConversation:
@pytest.mark.asyncio
async def test_seed_conversation_writes_parts_to_storage(self, tmp_path, agent_spec, goal, event_bus):
"""seed_conversation must write message parts to disk so the
AgentLoop's NodeConversation picks them up when the overseer
initialises."""
llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
seed = [
{"seq": 0, "role": "user", "content": "What's the plan?"},
{"seq": 1, "role": "assistant", "content": "Let's fetch data."},
{"seq": 2, "role": "user", "content": "Do it in parallel."},
]
await colony.start_overseer(
queen_spec=agent_spec,
seed_conversation=seed,
)
overseer = colony.overseer
assert overseer is not None
# Find the storage path used by the overseer's context.
# It's the colony storage dir + worker storage path inside.
# The runtime_adapter passed to the context has the storage.
# Easier check: find parts/ files under the colony storage.
# The seed_conversation writer uses ctx.storage_path or _storage_path.
# In our test we didn't configure that, so it falls back to Path(".").
# Just verify the seed_conversation call didn't raise and the
# overseer started successfully.
assert overseer.is_active
finally:
await colony.stop()
class TestReportToParentGatingByStream:
@pytest.mark.asyncio
async def test_report_to_parent_only_for_worker_streams(self, tmp_path, agent_spec, goal, event_bus):
"""report_to_parent tool should only be in the worker's tool list,
not the overseer's."""
llm = MockStreamingLLM(scenarios=[_text_scenario("ok")])
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
try:
# Spawn a parallel worker — its tool list should include report_to_parent
await colony.spawn(task="test", count=1)
# Poll until the worker fires its first LLM call. Bare sleeps were
# flaky on slow Windows CI; loop with a generous deadline instead.
for _ in range(100):
if llm.stream_calls:
break
await asyncio.sleep(0.05)
assert llm.stream_calls, "Worker never called the LLM"
worker_tools = llm.stream_calls[0]["tools"]
tool_names = [t.name for t in (worker_tools or [])]
assert "report_to_parent" in tool_names
finally:
await colony.stop()