90aadf247a
* fix(ci): apply ruff format to browser tool files Refs #7083 * fix(ci): unbreak test_refs (img regression) and test_model_catalog test_refs: - Add `img` back to CONTENT_ROLES so named images get refs again. The recent `cc6ec97a feat: multiple modes browser snapshot tool` refactor renamed NAMED_CONTENT_ROLES → CONTENT_ROLES and accidentally dropped `img`, breaking `test_named_content_roles_get_refs`. - Drop the `navigation` assertion from `test_skips_structural_roles`. That same refactor intentionally added landmark roles (navigation, main, listitem) to CONTENT_ROLES so AI agents can ref them by name, and the test was not updated to reflect that. test_model_catalog: - Add 5 openrouter models that were added to model_catalog.json by #7081 (UI/UX improvements) but not reflected in the test. Refs #7083 * fix(ci): wait for event propagation in subagent report test on Windows `test_worker_report_emits_subagent_report_event` waited only for `worker.is_active` to flip to False, then immediately asserted on the collected events. On Windows the event loop scheduling differs enough that the SUBAGENT_REPORT subscriber callback can run a few ticks after the worker is marked inactive, so the assertion fires against an empty list. Wait for both conditions. Refs #7083
496 lines
19 KiB
Python
496 lines
19 KiB
Python
"""Phase 1 tests: ColonyRuntime overseer + parallel worker fan-out.
|
|
|
|
These tests exercise the new overseer primitive, the
|
|
``report_to_parent`` tool, ``spawn_batch``, and
|
|
``wait_for_worker_reports`` — all additive to ColonyRuntime. They use
|
|
a ``MockStreamingLLM`` that yields pre-programmed stream events and
|
|
a real on-disk ``tmp_path``. No HTTP layer, no real LLM.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from framework.agent_loop.types import AgentSpec
|
|
from framework.host.colony_runtime import ColonyRuntime
|
|
from framework.host.event_bus import AgentEvent, EventBus, EventType
|
|
from framework.llm.provider import LLMProvider, LLMResponse, Tool, ToolResult, ToolUse
|
|
from framework.llm.stream_events import (
|
|
FinishEvent,
|
|
TextDeltaEvent,
|
|
ToolCallEvent,
|
|
)
|
|
from framework.schemas.goal import Goal
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mock LLM
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class MockStreamingLLM(LLMProvider):
|
|
"""Yields pre-programmed stream events.
|
|
|
|
Two modes:
|
|
- ``scenarios`` (list): consumed in order, one per stream() call. Used
|
|
for single-worker tests where call order is deterministic.
|
|
- ``by_task`` (dict): keyed by task text found in the first user
|
|
message. Used for parallel worker tests where multiple workers
|
|
share this one LLM object and would otherwise race on scenario
|
|
consumption. Each worker gets the scenario matching its task.
|
|
"""
|
|
|
|
model: str = "mock"
|
|
|
|
def __init__(
|
|
self,
|
|
scenarios: list[list] | None = None,
|
|
by_task: dict[str, list] | None = None,
|
|
):
|
|
self.scenarios = scenarios or []
|
|
self.by_task = by_task or {}
|
|
self._call_index = 0
|
|
self._used_tasks: set[str] = set()
|
|
self.stream_calls: list[dict] = []
|
|
|
|
async def stream(
|
|
self,
|
|
messages: list[dict[str, Any]],
|
|
system: str = "",
|
|
tools: list[Tool] | None = None,
|
|
max_tokens: int = 4096,
|
|
) -> AsyncIterator:
|
|
self.stream_calls.append({"messages": messages, "system": system, "tools": tools})
|
|
|
|
if self.by_task:
|
|
# Find the scenario whose task key appears in the first user
|
|
# message. Stable across parallel workers.
|
|
first_user = ""
|
|
for m in messages:
|
|
if m.get("role") == "user":
|
|
content = m.get("content", "")
|
|
if isinstance(content, list):
|
|
for block in content:
|
|
if isinstance(block, dict) and block.get("type") == "text":
|
|
content = block.get("text", "")
|
|
break
|
|
first_user = str(content)
|
|
break
|
|
for task_key, events in self.by_task.items():
|
|
if task_key in first_user:
|
|
if task_key in self._used_tasks:
|
|
# Already played this scenario; emit a stop so the
|
|
# agent loop terminates instead of replaying forever.
|
|
yield TextDeltaEvent(content="Done.", snapshot="Done.")
|
|
yield FinishEvent(stop_reason="stop", input_tokens=1, output_tokens=1, model="mock")
|
|
return
|
|
self._used_tasks.add(task_key)
|
|
for event in events:
|
|
yield event
|
|
return
|
|
return
|
|
|
|
if not self.scenarios or self._call_index >= len(self.scenarios):
|
|
# No more scenarios; emit a stop so the agent loop terminates.
|
|
yield TextDeltaEvent(content="Done.", snapshot="Done.")
|
|
yield FinishEvent(stop_reason="stop", input_tokens=1, output_tokens=1, model="mock")
|
|
return
|
|
events = self.scenarios[self._call_index]
|
|
self._call_index += 1
|
|
for event in events:
|
|
yield event
|
|
|
|
def complete(self, messages, system="", **kwargs) -> LLMResponse:
|
|
return LLMResponse(content="", model="mock", stop_reason="stop")
|
|
|
|
|
|
def _text_scenario(text: str) -> list:
|
|
return [
|
|
TextDeltaEvent(content=text, snapshot=text),
|
|
FinishEvent(stop_reason="stop", input_tokens=10, output_tokens=5, model="mock"),
|
|
]
|
|
|
|
|
|
def _report_scenario(status: str, summary: str, data: dict | None = None) -> list:
|
|
"""Worker calls ``report_to_parent`` and then finishes."""
|
|
return [
|
|
ToolCallEvent(
|
|
tool_use_id="report_1",
|
|
tool_name="report_to_parent",
|
|
tool_input={
|
|
"status": status,
|
|
"summary": summary,
|
|
"data": data or {},
|
|
},
|
|
),
|
|
FinishEvent(stop_reason="tool_calls", input_tokens=10, output_tokens=5, model="mock"),
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def agent_spec() -> AgentSpec:
|
|
return AgentSpec(
|
|
id="test_colony_agent",
|
|
name="Test Colony Agent",
|
|
description="Agent spec used for colony runtime tests.",
|
|
system_prompt="You are a test agent.",
|
|
agent_type="event_loop",
|
|
output_keys=[],
|
|
tool_access_policy="all",
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def goal() -> Goal:
|
|
return Goal(
|
|
id="test-goal",
|
|
name="Test goal",
|
|
description="A test goal for the colony runtime.",
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def event_bus() -> EventBus:
|
|
return EventBus()
|
|
|
|
|
|
def _stub_tool_executor(tool_use: ToolUse) -> ToolResult:
|
|
return ToolResult(tool_use_id=tool_use.tool_use_id, content="ok", is_error=False)
|
|
|
|
|
|
async def _make_colony(
|
|
tmp_path: Path,
|
|
llm: LLMProvider,
|
|
agent_spec: AgentSpec,
|
|
goal: Goal,
|
|
event_bus: EventBus,
|
|
) -> ColonyRuntime:
|
|
storage = tmp_path / "colony_storage"
|
|
storage.mkdir()
|
|
colony = ColonyRuntime(
|
|
agent_spec=agent_spec,
|
|
goal=goal,
|
|
storage_path=storage,
|
|
llm=llm,
|
|
tools=[],
|
|
tool_executor=_stub_tool_executor,
|
|
event_bus=event_bus,
|
|
colony_id="test_colony",
|
|
pipeline_stages=[], # skip pipeline initialisation in tests
|
|
)
|
|
await colony.start()
|
|
return colony
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestColonyRuntimeGoalProperty:
|
|
@pytest.mark.asyncio
|
|
async def test_goal_is_public_property(self, tmp_path, agent_spec, goal, event_bus):
|
|
llm = MockStreamingLLM(scenarios=[_text_scenario("ok")])
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
assert colony.goal is goal
|
|
assert colony.goal.id == "test-goal"
|
|
finally:
|
|
await colony.stop()
|
|
|
|
|
|
class TestStartOverseer:
|
|
@pytest.mark.asyncio
|
|
async def test_start_overseer_creates_persistent_worker(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""Overseer must be a persistent Worker tagged stream_id='overseer'."""
|
|
llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
overseer = await colony.start_overseer(queen_spec=agent_spec)
|
|
assert colony.overseer is overseer
|
|
assert overseer.is_persistent is True
|
|
assert overseer._context.stream_id == "overseer"
|
|
assert overseer.id == f"overseer:{colony.colony_id}"
|
|
# Give the background task a moment to start
|
|
await asyncio.sleep(0.05)
|
|
assert overseer.is_active
|
|
finally:
|
|
await colony.stop()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_overseer_idempotent(self, tmp_path, agent_spec, goal, event_bus):
|
|
llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
first = await colony.start_overseer(queen_spec=agent_spec)
|
|
second = await colony.start_overseer(queen_spec=agent_spec)
|
|
assert first is second
|
|
finally:
|
|
await colony.stop()
|
|
|
|
|
|
class TestReportToParent:
|
|
@pytest.mark.asyncio
|
|
async def test_worker_report_emits_subagent_report_event(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""A worker calling report_to_parent emits SUBAGENT_REPORT with structured data."""
|
|
llm = MockStreamingLLM(
|
|
scenarios=[
|
|
_report_scenario(
|
|
status="success",
|
|
summary="Fetched 5 rows from the API.",
|
|
data={"rows": 5, "table": "honeycomb"},
|
|
),
|
|
]
|
|
)
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
|
|
reports: list[AgentEvent] = []
|
|
lifecycle: list[AgentEvent] = []
|
|
|
|
async def on_report(event: AgentEvent) -> None:
|
|
reports.append(event)
|
|
|
|
async def on_lifecycle(event: AgentEvent) -> None:
|
|
lifecycle.append(event)
|
|
|
|
event_bus.subscribe(event_types=[EventType.SUBAGENT_REPORT], handler=on_report)
|
|
event_bus.subscribe(
|
|
event_types=[EventType.EXECUTION_COMPLETED, EventType.EXECUTION_FAILED],
|
|
handler=on_lifecycle,
|
|
)
|
|
|
|
try:
|
|
worker_ids = await colony.spawn(task="Fetch 5 rows from honeycomb", count=1)
|
|
assert len(worker_ids) == 1
|
|
worker = colony.get_worker(worker_ids[0])
|
|
assert worker is not None
|
|
|
|
# Wait for the worker to finish AND for the SUBAGENT_REPORT event
|
|
# to propagate. On Windows the event loop scheduling differs from
|
|
# POSIX, so a worker can be marked inactive a few ticks before the
|
|
# subscriber callback runs. Waiting on both avoids that race.
|
|
deadline = asyncio.get_event_loop().time() + 5.0
|
|
while (worker.is_active or len(reports) == 0) and asyncio.get_event_loop().time() < deadline:
|
|
await asyncio.sleep(0.05)
|
|
assert not worker.is_active, "Worker did not finish within timeout"
|
|
|
|
# SUBAGENT_REPORT arrived
|
|
assert len(reports) == 1
|
|
ev = reports[0]
|
|
assert ev.data["worker_id"] == worker_ids[0]
|
|
assert ev.data["status"] == "success"
|
|
assert ev.data["summary"] == "Fetched 5 rows from the API."
|
|
assert ev.data["data"] == {"rows": 5, "table": "honeycomb"}
|
|
assert ev.data["task"] == "Fetch 5 rows from honeycomb"
|
|
|
|
# Lifecycle event also fired (EXECUTION_COMPLETED)
|
|
assert len(lifecycle) == 1
|
|
assert lifecycle[0].type == EventType.EXECUTION_COMPLETED
|
|
finally:
|
|
await colony.stop()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_worker_crash_emits_synthesised_failed_report(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""Worker whose AgentLoop raises must still emit SUBAGENT_REPORT.
|
|
|
|
The overseer would otherwise hang waiting for a report from a
|
|
crashed worker. Worker.run()'s except handler is responsible for
|
|
emitting a synthesised failed report.
|
|
"""
|
|
|
|
class CrashingLLM(LLMProvider):
|
|
model: str = "mock"
|
|
stream_calls: list[dict] = []
|
|
|
|
async def stream(self, messages, system="", tools=None, max_tokens=4096):
|
|
self.stream_calls.append({"messages": messages})
|
|
raise RuntimeError("boom — simulated LLM crash")
|
|
yield # pragma: no cover — make this an async generator
|
|
|
|
def complete(self, messages, system="", **kwargs):
|
|
return LLMResponse(content="", model="mock", stop_reason="stop")
|
|
|
|
llm = CrashingLLM()
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
|
|
reports: list[AgentEvent] = []
|
|
|
|
async def on_report(event: AgentEvent) -> None:
|
|
reports.append(event)
|
|
|
|
event_bus.subscribe(event_types=[EventType.SUBAGENT_REPORT], handler=on_report)
|
|
|
|
try:
|
|
ids = await colony.spawn(task="crashing task", count=1)
|
|
worker = colony.get_worker(ids[0])
|
|
|
|
deadline = asyncio.get_event_loop().time() + 5.0
|
|
while worker.is_active and asyncio.get_event_loop().time() < deadline:
|
|
await asyncio.sleep(0.05)
|
|
assert not worker.is_active
|
|
|
|
assert len(reports) >= 1
|
|
r = reports[0]
|
|
assert r.data["worker_id"] == ids[0]
|
|
assert r.data["status"] == "failed"
|
|
finally:
|
|
await colony.stop()
|
|
|
|
|
|
class TestSpawnBatchAndWaitForReports:
|
|
@pytest.mark.asyncio
|
|
async def test_spawn_batch_returns_one_id_per_task(self, tmp_path, agent_spec, goal, event_bus):
|
|
llm = MockStreamingLLM(
|
|
by_task={
|
|
"Fetch batch 1": _report_scenario("success", "batch 1 done"),
|
|
"Fetch batch 2": _report_scenario("success", "batch 2 done"),
|
|
"Fetch batch 3": _report_scenario("success", "batch 3 done"),
|
|
}
|
|
)
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
ids = await colony.spawn_batch(
|
|
tasks=[
|
|
{"task": "Fetch batch 1"},
|
|
{"task": "Fetch batch 2"},
|
|
{"task": "Fetch batch 3"},
|
|
]
|
|
)
|
|
assert len(ids) == 3
|
|
assert len(set(ids)) == 3 # unique IDs
|
|
for wid in ids:
|
|
assert colony.get_worker(wid) is not None
|
|
finally:
|
|
await colony.stop()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_for_worker_reports_collects_all(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""Fan out 3 workers, wait for reports, verify structured list."""
|
|
llm = MockStreamingLLM(
|
|
by_task={
|
|
"batch 1": _report_scenario("success", "w1 done", {"batch": 1, "rows": 10}),
|
|
"batch 2": _report_scenario("success", "w2 done", {"batch": 2, "rows": 15}),
|
|
"batch 3": _report_scenario("failed", "w3 broke", {"batch": 3, "error_code": 503}),
|
|
}
|
|
)
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
ids = await colony.spawn_batch(
|
|
tasks=[
|
|
{"task": "batch 1"},
|
|
{"task": "batch 2"},
|
|
{"task": "batch 3"},
|
|
]
|
|
)
|
|
reports = await colony.wait_for_worker_reports(ids, timeout=10.0)
|
|
assert len(reports) == 3
|
|
|
|
by_id = {r["worker_id"]: r for r in reports}
|
|
assert by_id[ids[0]]["status"] == "success"
|
|
assert by_id[ids[0]]["summary"] == "w1 done"
|
|
assert by_id[ids[0]]["data"] == {"batch": 1, "rows": 10}
|
|
|
|
assert by_id[ids[1]]["status"] == "success"
|
|
assert by_id[ids[1]]["data"] == {"batch": 2, "rows": 15}
|
|
|
|
assert by_id[ids[2]]["status"] == "failed"
|
|
assert by_id[ids[2]]["data"] == {"batch": 3, "error_code": 503}
|
|
finally:
|
|
await colony.stop()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_for_worker_reports_returns_in_input_order(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""Reports must be returned in the same order as the input worker_ids."""
|
|
llm = MockStreamingLLM(
|
|
by_task={
|
|
"task-A": _report_scenario("success", "A"),
|
|
"task-B": _report_scenario("success", "B"),
|
|
"task-C": _report_scenario("success", "C"),
|
|
}
|
|
)
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
ids = await colony.spawn_batch(tasks=[{"task": "task-A"}, {"task": "task-B"}, {"task": "task-C"}])
|
|
reports = await colony.wait_for_worker_reports(ids, timeout=10.0)
|
|
assert [r["worker_id"] for r in reports] == ids
|
|
assert [r["summary"] for r in reports] == ["A", "B", "C"]
|
|
finally:
|
|
await colony.stop()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_for_worker_reports_missing_id(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""Unknown worker_id is reported as failed, not crash."""
|
|
llm = MockStreamingLLM(scenarios=[_text_scenario("noop")])
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
reports = await colony.wait_for_worker_reports(["nonexistent_worker"], timeout=1.0)
|
|
assert len(reports) == 1
|
|
assert reports[0]["worker_id"] == "nonexistent_worker"
|
|
assert reports[0]["status"] == "failed"
|
|
assert reports[0]["error"] == "no_such_worker"
|
|
finally:
|
|
await colony.stop()
|
|
|
|
|
|
class TestSeedConversation:
|
|
@pytest.mark.asyncio
|
|
async def test_seed_conversation_writes_parts_to_storage(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""seed_conversation must write message parts to disk so the
|
|
AgentLoop's NodeConversation picks them up when the overseer
|
|
initialises."""
|
|
llm = MockStreamingLLM(scenarios=[_text_scenario("idle")])
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
seed = [
|
|
{"seq": 0, "role": "user", "content": "What's the plan?"},
|
|
{"seq": 1, "role": "assistant", "content": "Let's fetch data."},
|
|
{"seq": 2, "role": "user", "content": "Do it in parallel."},
|
|
]
|
|
await colony.start_overseer(
|
|
queen_spec=agent_spec,
|
|
seed_conversation=seed,
|
|
)
|
|
overseer = colony.overseer
|
|
assert overseer is not None
|
|
|
|
# Find the storage path used by the overseer's context.
|
|
# It's the colony storage dir + worker storage path inside.
|
|
# The runtime_adapter passed to the context has the storage.
|
|
# Easier check: find parts/ files under the colony storage.
|
|
# The seed_conversation writer uses ctx.storage_path or _storage_path.
|
|
# In our test we didn't configure that, so it falls back to Path(".").
|
|
# Just verify the seed_conversation call didn't raise and the
|
|
# overseer started successfully.
|
|
assert overseer.is_active
|
|
finally:
|
|
await colony.stop()
|
|
|
|
|
|
class TestReportToParentGatingByStream:
|
|
@pytest.mark.asyncio
|
|
async def test_report_to_parent_only_for_worker_streams(self, tmp_path, agent_spec, goal, event_bus):
|
|
"""report_to_parent tool should only be in the worker's tool list,
|
|
not the overseer's."""
|
|
llm = MockStreamingLLM(scenarios=[_text_scenario("ok")])
|
|
colony = await _make_colony(tmp_path, llm, agent_spec, goal, event_bus)
|
|
try:
|
|
# Spawn a parallel worker — its tool list should include report_to_parent
|
|
await colony.spawn(task="test", count=1)
|
|
# After the worker's first LLM call, check the recorded tools
|
|
await asyncio.sleep(0.2) # let the background task run
|
|
assert llm.stream_calls, "Worker never called the LLM"
|
|
worker_tools = llm.stream_calls[0]["tools"]
|
|
tool_names = [t.name for t in (worker_tools or [])]
|
|
assert "report_to_parent" in tool_names
|
|
finally:
|
|
await colony.stop()
|