Files
hive/core/tests/test_event_bus.py
2026-04-23 15:34:07 -07:00

920 lines
31 KiB
Python

"""Tests for EventBus pub/sub event system.
Validates subscription management, event publishing, filtering,
concurrency handling, history operations, and convenience publishers.
"""
from __future__ import annotations
import asyncio
from datetime import datetime
import pytest
from framework.host.event_bus import (
AgentEvent,
EventBus,
EventType,
)
# ---------------------------------------------------------------------------
# AgentEvent dataclass tests
# ---------------------------------------------------------------------------
class TestAgentEvent:
"""Tests for AgentEvent dataclass."""
def test_minimal_construction(self):
"""Event can be created with just type and stream_id."""
event = AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="test_stream")
assert event.type == EventType.EXECUTION_STARTED
assert event.stream_id == "test_stream"
assert event.node_id is None
assert event.execution_id is None
assert event.data == {}
assert event.correlation_id is None
def test_full_construction(self):
"""Event stores all provided fields."""
event = AgentEvent(
type=EventType.TOOL_CALL_COMPLETED,
stream_id="stream_1",
node_id="node_1",
execution_id="exec_123",
data={"result": "success"},
correlation_id="corr_456",
)
assert event.type == EventType.TOOL_CALL_COMPLETED
assert event.stream_id == "stream_1"
assert event.node_id == "node_1"
assert event.execution_id == "exec_123"
assert event.data == {"result": "success"}
assert event.correlation_id == "corr_456"
def test_timestamp_auto_generated(self):
"""Timestamp is auto-generated if not provided."""
before = datetime.now()
event = AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="test")
after = datetime.now()
assert before <= event.timestamp <= after
def test_to_dict_serialization(self):
"""Event can be serialized to dictionary."""
event = AgentEvent(
type=EventType.EXECUTION_COMPLETED,
stream_id="stream_1",
node_id="node_1",
execution_id="exec_1",
data={"output": "result"},
correlation_id="corr_1",
colony_id="colony_1",
)
d = event.to_dict()
assert d["type"] == "execution_completed"
assert d["stream_id"] == "stream_1"
assert d["colony_id"] == "colony_1"
def test_to_dict_includes_run_id(self):
"""run_id is included in to_dict() when set."""
event = AgentEvent(
type=EventType.EXECUTION_STARTED,
stream_id="s1",
run_id="run-abc",
)
d = event.to_dict()
assert d["run_id"] == "run-abc"
def test_to_dict_omits_run_id_when_none(self):
"""run_id is omitted from to_dict() when None."""
event = AgentEvent(
type=EventType.EXECUTION_STARTED,
stream_id="s1",
)
d = event.to_dict()
assert "run_id" not in d
# ---------------------------------------------------------------------------
# Subscription management tests
# ---------------------------------------------------------------------------
class TestSubscriptionManagement:
"""Tests for subscribe/unsubscribe operations."""
def test_subscribe_returns_id(self):
"""subscribe() returns a subscription ID."""
bus = EventBus()
async def handler(event: AgentEvent) -> None:
pass
sub_id = bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler)
assert sub_id.startswith("sub_")
def test_subscribe_increments_id(self):
"""Each subscription gets a unique incremented ID."""
bus = EventBus()
async def handler(event: AgentEvent) -> None:
pass
id1 = bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler)
id2 = bus.subscribe(event_types=[EventType.EXECUTION_COMPLETED], handler=handler)
id3 = bus.subscribe(event_types=[EventType.EXECUTION_FAILED], handler=handler)
assert id1 == "sub_1"
assert id2 == "sub_2"
assert id3 == "sub_3"
def test_unsubscribe_removes_subscription(self):
"""unsubscribe() removes the subscription."""
bus = EventBus()
async def handler(event: AgentEvent) -> None:
pass
sub_id = bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler)
assert sub_id in bus._subscriptions
result = bus.unsubscribe(sub_id)
assert result is True
assert sub_id not in bus._subscriptions
def test_unsubscribe_nonexistent_returns_false(self):
"""unsubscribe() returns False for non-existent subscription."""
bus = EventBus()
result = bus.unsubscribe("sub_nonexistent")
assert result is False
def test_multiple_subscriptions_same_event_type(self):
"""Multiple handlers can subscribe to the same event type."""
bus = EventBus()
received = []
async def handler1(event: AgentEvent) -> None:
received.append("handler1")
async def handler2(event: AgentEvent) -> None:
received.append("handler2")
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler1)
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler2)
assert len(bus._subscriptions) == 2
# ---------------------------------------------------------------------------
# Event publishing tests
# ---------------------------------------------------------------------------
class TestEventPublishing:
"""Tests for publish() and event delivery."""
@pytest.mark.asyncio
async def test_publish_delivers_to_subscriber(self):
"""Published events are delivered to matching subscribers."""
bus = EventBus()
received_events = []
async def handler(event: AgentEvent) -> None:
received_events.append(event)
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler)
event = AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="test")
await bus.publish(event)
assert len(received_events) == 1
assert received_events[0] == event
@pytest.mark.asyncio
async def test_publish_to_multiple_subscribers(self):
"""Event is delivered to all matching subscribers."""
bus = EventBus()
received = []
async def handler1(event: AgentEvent) -> None:
received.append("h1")
async def handler2(event: AgentEvent) -> None:
received.append("h2")
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler1)
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler2)
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="test"))
assert "h1" in received
assert "h2" in received
@pytest.mark.asyncio
async def test_publish_non_matching_type_not_delivered(self):
"""Events with non-matching types are not delivered."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler)
await bus.publish(AgentEvent(type=EventType.EXECUTION_COMPLETED, stream_id="test"))
assert len(received) == 0
@pytest.mark.asyncio
async def test_publish_adds_to_history(self):
"""Published events are added to history."""
bus = EventBus()
event = AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="test")
await bus.publish(event)
history = bus.get_history()
assert len(history) == 1
assert history[0] == event
# ---------------------------------------------------------------------------
# Filter tests
# ---------------------------------------------------------------------------
class TestEventFiltering:
"""Tests for subscription filters."""
@pytest.mark.asyncio
async def test_filter_by_stream(self):
"""filter_stream only receives events from that stream."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event.stream_id)
bus.subscribe(
event_types=[EventType.EXECUTION_STARTED],
handler=handler,
filter_stream="stream_a",
)
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="stream_a"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="stream_b"))
assert received == ["stream_a"]
@pytest.mark.asyncio
async def test_filter_by_node(self):
"""filter_node only receives events from that node."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event.node_id)
bus.subscribe(
event_types=[EventType.NODE_LOOP_STARTED],
handler=handler,
filter_node="node_x",
)
await bus.publish(AgentEvent(type=EventType.NODE_LOOP_STARTED, stream_id="s", node_id="node_x"))
await bus.publish(AgentEvent(type=EventType.NODE_LOOP_STARTED, stream_id="s", node_id="node_y"))
assert received == ["node_x"]
@pytest.mark.asyncio
async def test_filter_by_execution(self):
"""filter_execution only receives events from that execution."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event.execution_id)
bus.subscribe(
event_types=[EventType.EXECUTION_COMPLETED],
handler=handler,
filter_execution="exec_1",
)
await bus.publish(AgentEvent(type=EventType.EXECUTION_COMPLETED, stream_id="s", execution_id="exec_1"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_COMPLETED, stream_id="s", execution_id="exec_2"))
assert received == ["exec_1"]
@pytest.mark.asyncio
async def test_combined_filters(self):
"""Multiple filters are AND-ed together."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(True)
bus.subscribe(
event_types=[EventType.TOOL_CALL_COMPLETED],
handler=handler,
filter_stream="stream_1",
filter_node="node_1",
)
# Matches both filters
await bus.publish(
AgentEvent(
type=EventType.TOOL_CALL_COMPLETED,
stream_id="stream_1",
node_id="node_1",
)
)
# Matches stream but not node
await bus.publish(
AgentEvent(
type=EventType.TOOL_CALL_COMPLETED,
stream_id="stream_1",
node_id="node_2",
)
)
# Matches node but not stream
await bus.publish(
AgentEvent(
type=EventType.TOOL_CALL_COMPLETED,
stream_id="stream_2",
node_id="node_1",
)
)
assert len(received) == 1
@pytest.mark.asyncio
async def test_filter_by_colony(self):
"""filter_colony only receives events from that colony."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event.colony_id)
bus.subscribe(
event_types=[EventType.EXECUTION_STARTED],
handler=handler,
filter_colony="colony_a",
)
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s", colony_id="colony_a"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s", colony_id="colony_b"))
assert received == ["colony_a"]
# ---------------------------------------------------------------------------
# Concurrency tests
# ---------------------------------------------------------------------------
class TestConcurrency:
"""Tests for concurrent handler execution."""
@pytest.mark.asyncio
async def test_handler_error_doesnt_crash_others(self):
"""One handler's error doesn't prevent other handlers from running."""
bus = EventBus()
results = []
async def failing_handler(event: AgentEvent) -> None:
raise ValueError("Handler error!")
async def working_handler(event: AgentEvent) -> None:
results.append("success")
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=failing_handler)
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=working_handler)
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="test"))
assert "success" in results
@pytest.mark.asyncio
async def test_max_concurrent_handlers_respected(self):
"""Semaphore limits concurrent handler executions."""
bus = EventBus(max_concurrent_handlers=2)
concurrent_count = 0
max_concurrent = 0
async def slow_handler(event: AgentEvent) -> None:
nonlocal concurrent_count, max_concurrent
concurrent_count += 1
max_concurrent = max(max_concurrent, concurrent_count)
await asyncio.sleep(0.1)
concurrent_count -= 1
# Subscribe 5 handlers
for _ in range(5):
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=slow_handler)
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="test"))
# Max concurrent should be limited to 2
assert max_concurrent <= 2
# ---------------------------------------------------------------------------
# History and query tests
# ---------------------------------------------------------------------------
class TestHistoryAndQueries:
"""Tests for get_history() and get_stats()."""
@pytest.mark.asyncio
async def test_history_returns_events_most_recent_first(self):
"""get_history() returns events in reverse chronological order."""
bus = EventBus()
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s1"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_COMPLETED, stream_id="s2"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_FAILED, stream_id="s3"))
history = bus.get_history()
assert history[0].stream_id == "s3" # Most recent
assert history[1].stream_id == "s2"
assert history[2].stream_id == "s1" # Oldest
@pytest.mark.asyncio
async def test_history_filter_by_event_type(self):
"""get_history() can filter by event type."""
bus = EventBus()
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_COMPLETED, stream_id="s"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s"))
history = bus.get_history(event_type=EventType.EXECUTION_STARTED)
assert len(history) == 2
assert all(e.type == EventType.EXECUTION_STARTED for e in history)
@pytest.mark.asyncio
async def test_history_filter_by_stream_id(self):
"""get_history() can filter by stream_id."""
bus = EventBus()
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="stream_a"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="stream_b"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="stream_a"))
history = bus.get_history(stream_id="stream_a")
assert len(history) == 2
assert all(e.stream_id == "stream_a" for e in history)
@pytest.mark.asyncio
async def test_history_limit(self):
"""get_history() respects limit parameter."""
bus = EventBus()
for i in range(10):
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id=f"s{i}"))
history = bus.get_history(limit=3)
assert len(history) == 3
@pytest.mark.asyncio
async def test_max_history_enforced(self):
"""EventBus enforces max_history limit."""
bus = EventBus(max_history=5)
for i in range(10):
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id=f"s{i}"))
assert len(bus._event_history) == 5
# Should have the 5 most recent
assert bus._event_history[-1].stream_id == "s9"
assert bus._event_history[0].stream_id == "s5"
@pytest.mark.asyncio
async def test_get_stats(self):
"""get_stats() returns accurate statistics."""
bus = EventBus()
async def handler(event: AgentEvent) -> None:
pass
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler)
bus.subscribe(event_types=[EventType.EXECUTION_COMPLETED], handler=handler)
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_STARTED, stream_id="s"))
await bus.publish(AgentEvent(type=EventType.EXECUTION_COMPLETED, stream_id="s"))
stats = bus.get_stats()
assert stats["total_events"] == 3
assert stats["subscriptions"] == 2
assert stats["events_by_type"]["execution_started"] == 2
assert stats["events_by_type"]["execution_completed"] == 1
# ---------------------------------------------------------------------------
# Wait operations tests
# ---------------------------------------------------------------------------
class TestWaitOperations:
"""Tests for wait_for() async waiting."""
@pytest.mark.asyncio
async def test_wait_for_receives_event(self):
"""wait_for() returns when matching event is published."""
bus = EventBus()
async def publish_later():
await asyncio.sleep(0.05)
await bus.publish(
AgentEvent(
type=EventType.EXECUTION_COMPLETED,
stream_id="test",
execution_id="exec_1",
)
)
asyncio.create_task(publish_later())
event = await bus.wait_for(
event_type=EventType.EXECUTION_COMPLETED,
timeout=1.0,
)
assert event is not None
assert event.type == EventType.EXECUTION_COMPLETED
@pytest.mark.asyncio
async def test_wait_for_timeout_returns_none(self):
"""wait_for() returns None on timeout."""
bus = EventBus()
event = await bus.wait_for(
event_type=EventType.EXECUTION_COMPLETED,
timeout=0.05,
)
assert event is None
@pytest.mark.asyncio
async def test_wait_for_with_filters(self):
"""wait_for() respects filters."""
bus = EventBus()
async def publish_events():
await asyncio.sleep(0.02)
# This one shouldn't match
await bus.publish(
AgentEvent(
type=EventType.EXECUTION_COMPLETED,
stream_id="wrong_stream",
)
)
await asyncio.sleep(0.02)
# This one should match
await bus.publish(
AgentEvent(
type=EventType.EXECUTION_COMPLETED,
stream_id="correct_stream",
)
)
asyncio.create_task(publish_events())
event = await bus.wait_for(
event_type=EventType.EXECUTION_COMPLETED,
stream_id="correct_stream",
timeout=1.0,
)
assert event is not None
assert event.stream_id == "correct_stream"
@pytest.mark.asyncio
async def test_wait_for_cleans_up_subscription(self):
"""wait_for() removes its subscription after completion."""
bus = EventBus()
initial_count = len(bus._subscriptions)
async def publish_later():
await asyncio.sleep(0.02)
await bus.publish(AgentEvent(type=EventType.EXECUTION_COMPLETED, stream_id="s"))
asyncio.create_task(publish_later())
await bus.wait_for(event_type=EventType.EXECUTION_COMPLETED, timeout=1.0)
assert len(bus._subscriptions) == initial_count
# ---------------------------------------------------------------------------
# Convenience publisher tests
# ---------------------------------------------------------------------------
class TestConveniencePublishers:
"""Tests for emit_* convenience methods."""
@pytest.mark.asyncio
async def test_emit_execution_started(self):
"""emit_execution_started publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.EXECUTION_STARTED], handler=handler)
await bus.emit_execution_started(
stream_id="test_stream",
execution_id="exec_1",
input_data={"key": "value"},
correlation_id="corr_1",
)
assert len(received) == 1
assert received[0].type == EventType.EXECUTION_STARTED
assert received[0].stream_id == "test_stream"
assert received[0].execution_id == "exec_1"
assert received[0].data == {"input": {"key": "value"}}
assert received[0].correlation_id == "corr_1"
@pytest.mark.asyncio
async def test_emit_execution_completed(self):
"""emit_execution_completed publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.EXECUTION_COMPLETED], handler=handler)
await bus.emit_execution_completed(
stream_id="s",
execution_id="e",
output={"result": "success"},
)
assert len(received) == 1
assert received[0].type == EventType.EXECUTION_COMPLETED
assert received[0].data == {"output": {"result": "success"}}
@pytest.mark.asyncio
async def test_emit_execution_failed(self):
"""emit_execution_failed publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.EXECUTION_FAILED], handler=handler)
await bus.emit_execution_failed(
stream_id="s",
execution_id="e",
error="Something went wrong",
)
assert len(received) == 1
assert received[0].type == EventType.EXECUTION_FAILED
assert received[0].data == {"error": "Something went wrong"}
@pytest.mark.asyncio
async def test_emit_tool_call_started(self):
"""emit_tool_call_started publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.TOOL_CALL_STARTED], handler=handler)
await bus.emit_tool_call_started(
stream_id="s",
node_id="n",
tool_use_id="tool_1",
tool_name="web_search",
tool_input={"query": "test"},
)
assert len(received) == 1
assert received[0].data["tool_name"] == "web_search"
assert received[0].data["tool_input"] == {"query": "test"}
@pytest.mark.asyncio
async def test_emit_tool_call_completed(self):
"""emit_tool_call_completed publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.TOOL_CALL_COMPLETED], handler=handler)
await bus.emit_tool_call_completed(
stream_id="s",
node_id="n",
tool_use_id="tool_1",
tool_name="web_search",
result="search results",
is_error=False,
)
assert len(received) == 1
assert received[0].data["result"] == "search results"
assert received[0].data["is_error"] is False
@pytest.mark.asyncio
async def test_emit_webhook_received(self):
"""emit_webhook_received publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.WEBHOOK_RECEIVED], handler=handler)
await bus.emit_webhook_received(
source_id="webhook_1",
path="/api/webhook",
method="POST",
headers={"Content-Type": "application/json"},
payload={"data": "test"},
query_params={"token": "abc"},
)
assert len(received) == 1
assert received[0].data["path"] == "/api/webhook"
assert received[0].data["method"] == "POST"
assert received[0].data["payload"] == {"data": "test"}
@pytest.mark.asyncio
async def test_emit_tool_doom_loop(self):
"""emit_tool_doom_loop publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.NODE_TOOL_DOOM_LOOP], handler=handler)
await bus.emit_tool_doom_loop(
stream_id="test_stream",
node_id="node_1",
description="Tool called same endpoint 5 times",
execution_id="exec_1",
)
assert len(received) == 1
assert received[0].type == EventType.NODE_TOOL_DOOM_LOOP
assert received[0].stream_id == "test_stream"
assert received[0].node_id == "node_1"
assert received[0].data["description"] == "Tool called same endpoint 5 times"
@pytest.mark.asyncio
async def test_emit_escalation_requested(self):
"""emit_escalation_requested publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.ESCALATION_REQUESTED], handler=handler)
await bus.emit_escalation_requested(
stream_id="test_stream",
node_id="node_1",
reason="Need human intervention",
context="Complex decision required",
execution_id="exec_1",
)
assert len(received) == 1
assert received[0].type == EventType.ESCALATION_REQUESTED
assert received[0].stream_id == "test_stream"
assert received[0].node_id == "node_1"
assert received[0].data["reason"] == "Need human intervention"
assert received[0].data["context"] == "Complex decision required"
@pytest.mark.asyncio
async def test_emit_llm_turn_complete(self):
"""emit_llm_turn_complete publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.LLM_TURN_COMPLETE], handler=handler)
await bus.emit_llm_turn_complete(
stream_id="test_stream",
node_id="node_1",
stop_reason="end_turn",
model="claude-sonnet-4-20250514",
input_tokens=100,
output_tokens=50,
cached_tokens=30,
cache_creation_tokens=10,
cost_usd=0.0042,
execution_id="exec_1",
iteration=3,
)
assert len(received) == 1
assert received[0].type == EventType.LLM_TURN_COMPLETE
assert received[0].data["stop_reason"] == "end_turn"
assert received[0].data["model"] == "claude-sonnet-4-20250514"
assert received[0].data["input_tokens"] == 100
assert received[0].data["output_tokens"] == 50
# cached / cache_creation are subsets of input — propagated for
# display, NOT additive to input_tokens.
assert received[0].data["cached_tokens"] == 30
assert received[0].data["cache_creation_tokens"] == 10
assert received[0].data["cost_usd"] == 0.0042
assert received[0].data["iteration"] == 3
@pytest.mark.asyncio
async def test_emit_node_action_plan(self):
"""emit_node_action_plan publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.NODE_ACTION_PLAN], handler=handler)
await bus.emit_node_action_plan(
stream_id="test_stream",
node_id="node_1",
plan="1. Search for data\n2. Analyze results\n3. Generate report",
execution_id="exec_1",
)
assert len(received) == 1
assert received[0].type == EventType.NODE_ACTION_PLAN
assert received[0].data["plan"] == "1. Search for data\n2. Analyze results\n3. Generate report"
@pytest.mark.asyncio
async def test_emit_subagent_report(self):
"""emit_subagent_report publishes correct event."""
bus = EventBus()
received = []
async def handler(event: AgentEvent) -> None:
received.append(event)
bus.subscribe(event_types=[EventType.SUBAGENT_REPORT], handler=handler)
await bus.emit_subagent_report(
stream_id="test_stream",
node_id="queen",
subagent_id="worker-1",
message="Task 50% complete",
data={"progress": 0.5},
)
assert len(received) == 1
assert received[0].type == EventType.SUBAGENT_REPORT
assert received[0].data["subagent_id"] == "worker-1"
assert received[0].data["message"] == "Task 50% complete"
assert received[0].data["data"]["progress"] == 0.5
# ---------------------------------------------------------------------------
# EventType enum tests
# ---------------------------------------------------------------------------
class TestEventType:
"""Tests for EventType enum."""
def test_all_event_types_are_strings(self):
"""All EventType values are strings."""
for event_type in EventType:
assert isinstance(event_type.value, str)
def test_event_types_are_unique(self):
"""All EventType values are unique."""
values = [e.value for e in EventType]
assert len(values) == len(set(values))
def test_key_event_types_exist(self):
"""Key event types are defined."""
assert EventType.EXECUTION_STARTED
assert EventType.EXECUTION_COMPLETED
assert EventType.EXECUTION_FAILED
assert EventType.EXECUTION_PAUSED
assert EventType.EXECUTION_RESUMED
assert EventType.TOOL_CALL_STARTED
assert EventType.TOOL_CALL_COMPLETED
assert EventType.WEBHOOK_RECEIVED
assert EventType.NODE_TOOL_DOOM_LOOP
assert EventType.ESCALATION_REQUESTED
assert EventType.LLM_TURN_COMPLETE
assert EventType.NODE_ACTION_PLAN
assert EventType.WORKER_COLONY_LOADED
assert EventType.CREDENTIALS_REQUIRED
assert EventType.EXECUTION_RESURRECTED
assert EventType.QUEEN_PHASE_CHANGED
assert EventType.SUBAGENT_REPORT
assert EventType.TRIGGER_AVAILABLE
assert EventType.TRIGGER_FIRED