Files
hive/core/tests/test_event_loop_integration.py
T

1237 lines
39 KiB
Python

"""
Integration tests for EventLoopNode lifecycle
Default: mock LLM (fast, deterministic, no API calls).
Set HIVE_TEST_LLM_MODE to any value other than 'mock' (e.g. real) to use a real LLM
(cerebras/zai-glm-4.7 unless overridden).
Set HIVE_TEST_LLM_MODEL=<model> to override the real model.
"""
from __future__ import annotations
import os
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.event_loop_node import (
EventLoopNode,
JudgeVerdict,
LoopConfig,
)
from framework.graph.executor import GraphExecutor
from framework.graph.goal import Goal
from framework.graph.node import (
NodeContext,
NodeProtocol,
NodeResult,
NodeSpec,
SharedMemory,
)
from framework.llm.provider import LLMProvider, LLMResponse, Tool, ToolResult, ToolUse
from framework.llm.stream_events import (
FinishEvent,
StreamEvent,
TextDeltaEvent,
ToolCallEvent,
)
from framework.runtime.core import Runtime
from framework.runtime.event_bus import AgentEvent, EventBus, EventType
# ---------------------------------------------------------------------------
# Config: mock / real toggle
# ---------------------------------------------------------------------------
# Mock mode is on when HIVE_TEST_LLM_MODE is unset or equals "mock"
# (case-insensitive); any other value switches make_llm() to a real provider.
USE_MOCK_LLM = os.getenv("HIVE_TEST_LLM_MODE", "mock").lower() == "mock"
# Model name used for the real provider; overridable via HIVE_TEST_LLM_MODEL.
LLM_MODEL = os.getenv("HIVE_TEST_LLM_MODEL", "cerebras/zai-glm-4.7")
# ---------------------------------------------------------------------------
# ScriptableMockLLMProvider
# ---------------------------------------------------------------------------
@dataclass
class StreamScript:
"""One scripted stream() invocation.
- text only -> yields TextDeltaEvent + FinishEvent (turn ends)
- tool_calls -> yields ToolCallEvent(s) + FinishEvent (node executes tools, calls stream again)
"""
text: str = ""
tool_calls: list[dict] | None = None # [{name, id, input}, ...]
class ScriptableMockLLMProvider(LLMProvider):
    """Mock LLM that plays back a flat list of StreamScript entries.

    Each stream() call consumes the next script, in order, and yields the
    events it describes. Once the scripts run out, stream() yields a
    placeholder text turn so the caller can still finish. complete() and
    complete_with_tools() return a fixed summary response (the original note
    says complete() is used by _generate_compaction_summary).
    """

    def __init__(self, scripts: list[StreamScript] | None = None):
        """Store a private copy of *scripts*; None means no scripts."""
        self._scripts: list[StreamScript] = list(scripts or [])
        # Number of scripts consumed so far by stream().
        self._call_index = 0
        self.model = "mock-scriptable"

    def complete(
        self,
        messages: list[dict[str, Any]],
        system: str = "",
        tools: list[Tool] | None = None,
        max_tokens: int = 1024,
        response_format: dict[str, Any] | None = None,
        json_mode: bool = False,
        max_retries: int | None = None,
    ) -> LLMResponse:
        """Return a canned summary response; every argument is ignored."""
        return LLMResponse(
            content="Conversation summary for compaction.",
            model=self.model,
            input_tokens=10,
            output_tokens=10,
        )

    def complete_with_tools(
        self,
        messages: list[dict[str, Any]],
        system: str = "",
        tools: list[Tool] | None = None,
        tool_executor: Callable[[ToolUse], ToolResult] | None = None,
        max_iterations: int = 10,
        max_tokens: int = 1024,
    ) -> LLMResponse:
        """Delegate to complete(); the tool executor is never invoked."""
        return self.complete(messages, system, tools, max_tokens)

    async def stream(
        self,
        messages: list[dict[str, Any]],
        system: str = "",
        tools: list[Tool] | None = None,
        max_tokens: int = 4096,
    ) -> AsyncIterator[StreamEvent]:
        """Yield the events of the next script, or a placeholder turn.

        A script with tool calls yields one ToolCallEvent per call, then its
        text (if any), then a FinishEvent with stop_reason "tool_use". A
        text-only script yields its text (if any) and a FinishEvent with
        stop_reason "end_turn". The arguments are not inspected.
        """
        if self._call_index >= len(self._scripts):
            # Scripts exhausted: emit a placeholder text turn so the node can
            # still terminate instead of waiting on an empty stream.
            yield TextDeltaEvent(content="(no more scripts)", snapshot="(no more scripts)")
            yield FinishEvent(stop_reason="end_turn", input_tokens=5, output_tokens=5)
            return

        script = self._scripts[self._call_index]
        self._call_index += 1

        if script.tool_calls:
            for position, tc in enumerate(script.tool_calls):
                # Fallback id for calls scripted without "id". The first call
                # keeps the historical "tc_<n>" form; later calls in the same
                # script get a positional suffix so ids never collide.
                fallback_id = f"tc_{self._call_index}"
                if position:
                    fallback_id = f"{fallback_id}_{position}"
                yield ToolCallEvent(
                    tool_use_id=tc.get("id", fallback_id),
                    tool_name=tc["name"],
                    tool_input=tc.get("input", {}),
                )
            if script.text:
                yield TextDeltaEvent(content=script.text, snapshot=script.text)
            yield FinishEvent(stop_reason="tool_use", input_tokens=10, output_tokens=10)
        else:
            # Text-only response.
            if script.text:
                yield TextDeltaEvent(content=script.text, snapshot=script.text)
            yield FinishEvent(stop_reason="end_turn", input_tokens=10, output_tokens=10)
# ---------------------------------------------------------------------------
# MockConversationStore
# ---------------------------------------------------------------------------
class MockConversationStore:
"""In-memory ConversationStore for testing persistence and restore."""
def __init__(self) -> None:
self._parts: dict[int, dict[str, Any]] = {}
self._meta: dict[str, Any] | None = None
self._cursor: dict[str, Any] | None = None
async def write_part(self, seq: int, data: dict[str, Any]) -> None:
self._parts[seq] = data
async def read_parts(self) -> list[dict[str, Any]]:
return [self._parts[k] for k in sorted(self._parts)]
async def write_meta(self, data: dict[str, Any]) -> None:
self._meta = data
async def read_meta(self) -> dict[str, Any] | None:
return self._meta
async def write_cursor(self, data: dict[str, Any]) -> None:
self._cursor = data
async def read_cursor(self) -> dict[str, Any] | None:
return self._cursor
async def delete_parts_before(self, seq: int) -> None:
keys_to_delete = [k for k in self._parts if k < seq]
for k in keys_to_delete:
del self._parts[k]
async def close(self) -> None:
pass
async def destroy(self) -> None:
self._parts.clear()
self._meta = None
self._cursor = None
# ---------------------------------------------------------------------------
# Judge helpers
# ---------------------------------------------------------------------------
class AlwaysAcceptJudge:
    """Judge whose verdict is ACCEPT regardless of the context."""

    async def evaluate(self, context: dict[str, Any]) -> JudgeVerdict:
        """Return an ACCEPT verdict; *context* is not inspected."""
        verdict = JudgeVerdict(action="ACCEPT")
        return verdict
class AlwaysRetryJudge:
    """Judge whose verdict is RETRY (with fixed feedback) regardless of context."""

    async def evaluate(self, context: dict[str, Any]) -> JudgeVerdict:
        """Return a RETRY verdict; *context* is not inspected."""
        verdict = JudgeVerdict(action="RETRY", feedback="Try harder.")
        return verdict
class CountingJudge:
    """Judge that answers RETRY for the first N evaluations, then ACCEPT."""

    def __init__(self, retry_count: int = 1):
        self._retry_count = retry_count
        # Number of evaluate() calls seen so far.
        self._calls = 0

    async def evaluate(self, context: dict[str, Any]) -> JudgeVerdict:
        """Count this call and return RETRY until the retry budget is used up."""
        self._calls += 1
        attempt = self._calls
        if attempt > self._retry_count:
            return JudgeVerdict(action="ACCEPT")
        return JudgeVerdict(action="RETRY", feedback=f"Retry {attempt}")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_llm(scripts: list[StreamScript] | None = None) -> LLMProvider:
    """Return the LLM provider matching the configured test mode.

    In mock mode the scripts drive a ScriptableMockLLMProvider; in real mode
    they are ignored and a LiteLLMProvider for LLM_MODEL is returned.
    """
    if not USE_MOCK_LLM:
        # Imported lazily so mock-mode runs never import the LiteLLM provider.
        from framework.llm.litellm import LiteLLMProvider

        return LiteLLMProvider(model=LLM_MODEL)
    return ScriptableMockLLMProvider(scripts)
def make_tool_executor(results_map: dict[str, str]) -> Callable:
    """Build a tool executor that answers from a fixed name -> result map.

    Tools missing from *results_map* produce an error ToolResult whose content
    names the unknown tool.
    """

    def executor(tool_use: ToolUse) -> ToolResult:
        name = tool_use.name
        unknown = name not in results_map
        payload = f"Unknown tool: {name}" if unknown else results_map[name]
        return ToolResult(
            tool_use_id=tool_use.id,
            content=payload,
            is_error=unknown,
        )

    return executor
def make_ctx(
    node_id: str = "test_node",
    llm: LLMProvider | None = None,
    output_keys: list[str] | None = None,
    input_keys: list[str] | None = None,
    input_data: dict[str, Any] | None = None,
    system_prompt: str = "You are a test assistant.",
    client_facing: bool = False,
    available_tools: list[Tool] | None = None,
) -> NodeContext:
    """Assemble a NodeContext for exercising an EventLoopNode directly.

    The context gets a Runtime mock, a fresh SharedMemory and an event_loop
    NodeSpec built from the arguments; None collections become empty ones.
    """
    # Runtime double constrained to the Runtime spec, with explicit stubs.
    runtime = MagicMock(spec=Runtime)
    runtime.start_run = MagicMock(return_value="run_id")
    runtime.decide = MagicMock(return_value="dec_id")
    for method_name in ("record_outcome", "end_run", "report_problem", "set_node"):
        setattr(runtime, method_name, MagicMock())

    return NodeContext(
        runtime=runtime,
        node_id=node_id,
        node_spec=NodeSpec(
            id=node_id,
            name=f"Test {node_id}",
            description="test node",
            node_type="event_loop",
            output_keys=output_keys or [],
            input_keys=input_keys or [],
            system_prompt=system_prompt,
            client_facing=client_facing,
        ),
        memory=SharedMemory(),
        input_data=input_data or {},
        llm=llm,
        available_tools=available_tools or [],
    )
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def runtime():
    """Provide a Runtime mock with stubbed run/decision bookkeeping methods."""
    mock_runtime = MagicMock(spec=Runtime)
    mock_runtime.start_run = MagicMock(return_value="test_run_id")
    mock_runtime.decide = MagicMock(return_value="test_decision_id")
    for method_name in ("record_outcome", "end_run", "report_problem", "set_node"):
        setattr(mock_runtime, method_name, MagicMock())
    return mock_runtime
@pytest.fixture
def event_bus():
    """Provide a real (non-mocked) EventBus instance."""
    bus = EventBus()
    return bus
@pytest.fixture(autouse=True)
def fast_sleep(monkeypatch):
    """Replace asyncio.sleep with an AsyncMock for every test.

    This keeps backoff delays (exponential backoff, per the original note)
    from slowing the suite down.
    """
    instant_sleep = AsyncMock()
    monkeypatch.setattr("asyncio.sleep", instant_sleep)
# ===========================================================================
# Group 1: Core Lifecycle
# ===========================================================================
@pytest.mark.asyncio
async def test_event_loop_node_in_graph(runtime):
    """An EventLoopNode registered on a GraphExecutor runs and yields output."""
    # Turn 1 calls set_output("result", "ok"); turn 2 is plain text, which ends
    # the turn (original note: the implicit judge accepts once all keys are set).
    set_result = {"name": "set_output", "id": "tc_1", "input": {"key": "result", "value": "ok"}}
    llm = make_llm(
        [
            StreamScript(tool_calls=[set_result]),
            StreamScript(text="Done."),
        ]
    )

    graph = GraphSpec(
        id="test_graph",
        goal_id="test_goal",
        name="Test Graph",
        entry_node="el_node",
        nodes=[
            NodeSpec(
                id="el_node",
                name="Event Loop Node",
                description="test event loop",
                node_type="event_loop",
                output_keys=["result"],
            )
        ],
        edges=[],
        terminal_nodes=["el_node"],
    )
    goal = Goal(id="test_goal", name="Test Goal", description="test")

    executor = GraphExecutor(runtime=runtime, llm=llm)
    executor.register_node("el_node", EventLoopNode(config=LoopConfig(max_iterations=5)))

    outcome = await executor.execute(graph, goal, {})

    assert outcome.success
    if not USE_MOCK_LLM:
        # A real model chooses its own value, so only the key is checked.
        assert "result" in outcome.output
    else:
        assert outcome.output.get("result") == "ok"
@pytest.mark.asyncio
async def test_event_loop_with_event_bus():
    """Loop lifecycle events reach the EventBus, in start/iterate/complete order."""
    lifecycle = [
        EventType.NODE_LOOP_STARTED,
        EventType.NODE_LOOP_ITERATION,
        EventType.NODE_LOOP_COMPLETED,
    ]
    seen: list[AgentEvent] = []

    async def collect(event: AgentEvent) -> None:
        seen.append(event)

    bus = EventBus()
    bus.subscribe(event_types=list(lifecycle), handler=collect)

    llm = make_llm([StreamScript(text="All done.")])
    ctx = make_ctx(llm=llm, output_keys=[])
    node = EventLoopNode(event_bus=bus, config=LoopConfig(max_iterations=5))

    result = await node.execute(ctx)

    assert result.success
    kinds = [event.type for event in seen]
    for expected in lifecycle:
        assert expected in kinds

    # The first occurrence of each event type must follow the lifecycle order.
    first_started, first_iteration, first_completed = (kinds.index(k) for k in lifecycle)
    assert first_started < first_iteration < first_completed
@pytest.mark.asyncio
async def test_event_loop_tool_execution():
    """A custom tool is executed and tool start/complete events are published."""
    tool_events: list[AgentEvent] = []

    async def collect(event: AgentEvent) -> None:
        tool_events.append(event)

    bus = EventBus()
    bus.subscribe(
        event_types=[EventType.TOOL_CALL_STARTED, EventType.TOOL_CALL_COMPLETED],
        handler=collect,
    )

    # Turn 1: call the custom search_crm tool.
    crm_call = {"name": "search_crm", "id": "tc_crm", "input": {"query": "TechCorp"}}
    # Turn 2: publish the result through set_output.
    output_call = {
        "name": "set_output",
        "id": "tc_so",
        "input": {"key": "result", "value": "Found: TechCorp"},
    }
    llm = make_llm(
        [
            StreamScript(tool_calls=[crm_call]),
            StreamScript(tool_calls=[output_call]),
            # Turn 3: plain text finish.
            StreamScript(text="Search complete."),
        ]
    )

    ctx = make_ctx(llm=llm, output_keys=["result"])
    ctx.available_tools = [
        Tool(
            name="search_crm",
            description="Search CRM",
            parameters={"type": "object", "properties": {"query": {"type": "string"}}},
        )
    ]

    node = EventLoopNode(
        event_bus=bus,
        tool_executor=make_tool_executor({"search_crm": "Found: TechCorp"}),
        config=LoopConfig(max_iterations=5),
    )
    result = await node.execute(ctx)

    assert result.success
    # Both tool lifecycle events must have been observed on the bus.
    published = [event.type for event in tool_events]
    assert EventType.TOOL_CALL_STARTED in published
    assert EventType.TOOL_CALL_COMPLETED in published
# ===========================================================================
# Group 2: Output Collection
# ===========================================================================
@pytest.mark.asyncio
async def test_event_loop_set_output():
    """Values passed to the set_output tool end up in NodeResult.output."""
    scripts = [
        # stream 1: set lead_score
        StreamScript(
            tool_calls=[
                {"name": "set_output", "id": "tc_1", "input": {"key": "lead_score", "value": "87"}}
            ],
        ),
        # stream 2: set company
        StreamScript(
            tool_calls=[
                {
                    "name": "set_output",
                    "id": "tc_2",
                    "input": {"key": "company", "value": "TechCorp"},
                }
            ],
        ),
        # stream 3: text finish
        StreamScript(text="Outputs set."),
    ]
    llm = make_llm(scripts)
    ctx = make_ctx(llm=llm, output_keys=["lead_score", "company"])
    node = EventLoopNode(config=LoopConfig(max_iterations=5))

    result = await node.execute(ctx)

    assert result.success
    if USE_MOCK_LLM:
        assert result.output == {"lead_score": 87, "company": "TechCorp"}
    else:
        for key in ("lead_score", "company"):
            assert key in result.output
            value = result.output[key]
            # Output values may be non-strings (the mock run above turns "87"
            # into 87), so len() would raise TypeError on an int. Compare the
            # string form to check "non-empty" instead.
            assert value is not None and str(value) != ""
@pytest.mark.asyncio
async def test_event_loop_missing_output_keys_retried():
    """The node keeps iterating until every declared output key has been set."""

    def set_output_turn(call_id: str, key: str, value: str) -> StreamScript:
        # One scripted turn consisting of a single set_output tool call.
        call = {"name": "set_output", "id": call_id, "input": {"key": key, "value": value}}
        return StreamScript(tool_calls=[call])

    llm = make_llm(
        [
            # Iteration 1: only "score" is set, "reason" is still missing.
            set_output_turn("tc_1", "score", "87"),
            StreamScript(text="Scored the lead."),
            # Iteration 2 (after the implicit retry feedback): set "reason".
            set_output_turn("tc_2", "reason", "good fit"),
            StreamScript(text="Complete."),
        ]
    )
    ctx = make_ctx(llm=llm, output_keys=["score", "reason"])
    node = EventLoopNode(config=LoopConfig(max_iterations=10))

    result = await node.execute(ctx)

    assert result.success
    for key in ("score", "reason"):
        assert key in result.output
    if USE_MOCK_LLM:
        assert result.output["score"] == 87
        assert result.output["reason"] == "good fit"
# ===========================================================================
# Group 3: Compaction
# ===========================================================================
@pytest.mark.asyncio
async def test_event_loop_conversation_compaction():
    """A long conversation under a small token budget still yields its output."""
    # Four iterations' worth of turns: CountingJudge(retry_count=3) answers
    # RETRY three times and ACCEPT on the fourth evaluation.
    scripts: list[StreamScript] = []
    for round_no in range(4):
        set_result = {
            "name": "set_output",
            "id": f"tc_{round_no}",
            "input": {"key": "result", "value": f"val_{round_no}"},
        }
        scripts.extend(
            [
                StreamScript(tool_calls=[set_result]),
                # Padded text so the history outgrows max_history_tokens.
                StreamScript(text=f"Iteration {round_no} done. " + "x" * 200),
            ]
        )

    llm = make_llm(scripts)
    ctx = make_ctx(llm=llm, output_keys=["result"])
    node = EventLoopNode(
        judge=CountingJudge(retry_count=3),
        config=LoopConfig(max_iterations=10, max_history_tokens=200),
    )

    result = await node.execute(ctx)

    assert result.success
    assert "result" in result.output
# ===========================================================================
# Group 4: Crash Recovery
# ===========================================================================
@pytest.mark.asyncio
async def test_event_loop_checkpoint_and_restore():
    """A run that dies at max iterations can be resumed from the same store."""
    store = MockConversationStore()

    def set_output_turn(call_id: str, key: str, value: str) -> StreamScript:
        # One scripted turn consisting of a single set_output tool call.
        call = {"name": "set_output", "id": call_id, "input": {"key": key, "value": value}}
        return StreamScript(tool_calls=[call])

    # --- Phase 1: two iterations, judge always retries -> fails at the limit.
    first_llm = ScriptableMockLLMProvider(
        [
            set_output_turn("tc_p1", "score", "50"),
            StreamScript(text="Phase 1 iter 0."),
            set_output_turn("tc_p1b", "score", "60"),
            StreamScript(text="Phase 1 iter 1."),
        ]
    )
    first_ctx = make_ctx(node_id="el_restore", llm=first_llm, output_keys=["score", "reason"])
    first_node = EventLoopNode(
        judge=AlwaysRetryJudge(),
        config=LoopConfig(max_iterations=2),
        conversation_store=store,
    )
    first_result = await first_node.execute(first_ctx)

    # Phase 1 must fail because the iteration budget ran out.
    assert not first_result.success
    assert "max iterations" in first_result.error.lower()

    # The store must hold persisted conversation state (meta plus parts).
    assert await store.read_meta() is not None
    assert len(await store.read_parts()) > 0

    # Original note: the cursor may be overwritten by the conversation's
    # _persist (writing {next_seq}) after _write_cursor (writing
    # {iteration, ...}); last write wins, which is expected. Restore only
    # needs meta and parts to exist.

    # --- Phase 2: resume with a higher limit and the implicit judge, which
    # accepts once all keys are present. Since the cursor's "outputs" may have
    # been overwritten, both keys are set again to be safe.
    second_llm = ScriptableMockLLMProvider(
        [
            set_output_turn("tc_p2a", "score", "75"),
            set_output_turn("tc_p2b", "reason", "recovered"),
            StreamScript(text="Phase 2 done."),
        ]
    )
    second_ctx = make_ctx(node_id="el_restore", llm=second_llm, output_keys=["score", "reason"])
    second_node = EventLoopNode(
        config=LoopConfig(max_iterations=10),
        conversation_store=store,
    )
    second_result = await second_node.execute(second_ctx)

    assert second_result.success
    assert "score" in second_result.output
    assert "reason" in second_result.output
# ===========================================================================
# Group 5: External Injection
# ===========================================================================
@pytest.mark.asyncio
async def test_event_loop_external_injection():
    """Text passed to inject_event() shows up in the persisted conversation."""
    store = MockConversationStore()
    llm = ScriptableMockLLMProvider(
        [
            StreamScript(text="First response."),
            StreamScript(text="Second response after injection."),
        ]
    )
    ctx = make_ctx(llm=llm, output_keys=[])
    node = EventLoopNode(
        judge=CountingJudge(retry_count=1),  # RETRY once, then ACCEPT
        config=LoopConfig(max_iterations=5),
        conversation_store=store,
    )

    # The event is queued before execute() starts; per the original note it is
    # drained at the start of an iteration.
    await node.inject_event("Priority: CEO email")
    result = await node.execute(ctx)

    assert result.success
    # The injected text must appear somewhere in the stored message parts.
    stored_parts = await store.read_parts()
    transcript = " ".join(part.get("content", "") for part in stored_parts)
    assert "[External event]: Priority: CEO email" in transcript
# ===========================================================================
# Group 6: Pause/Resume
# ===========================================================================
@pytest.mark.asyncio
async def test_event_loop_pause_and_resume():
    """A paused run returns early; a second run on the same store completes."""
    store = MockConversationStore()
    wanted_keys = ["partial", "final"]

    def set_output_turn(call_id: str, key: str, value: str) -> StreamScript:
        # One scripted turn consisting of a single set_output tool call.
        call = {"name": "set_output", "id": call_id, "input": {"key": key, "value": value}}
        return StreamScript(tool_calls=[call])

    # --- Phase 1: pause_requested=True in the input -> immediate return.
    paused_llm = ScriptableMockLLMProvider(
        [
            set_output_turn("tc_p", "partial", "started"),
            StreamScript(text="Should not reach here in phase 1."),
        ]
    )
    paused_ctx = make_ctx(
        llm=paused_llm,
        output_keys=list(wanted_keys),
        input_data={"pause_requested": True},
    )
    paused_node = EventLoopNode(
        config=LoopConfig(max_iterations=5),
        conversation_store=store,
    )
    paused_result = await paused_node.execute(paused_ctx)

    # Original note: pausing reports success right away, before any LLM call.
    assert paused_result.success

    # --- Phase 2: run again without the pause flag.
    resumed_llm = ScriptableMockLLMProvider(
        [
            set_output_turn("tc_r1", "partial", "resumed"),
            set_output_turn("tc_r2", "final", "done"),
            StreamScript(text="Resume complete."),
        ]
    )
    resumed_ctx = make_ctx(llm=resumed_llm, output_keys=list(wanted_keys), input_data={})
    resumed_node = EventLoopNode(
        config=LoopConfig(max_iterations=10),
        conversation_store=store,
    )
    resumed_result = await resumed_node.execute(resumed_ctx)

    assert resumed_result.success
    assert "final" in resumed_result.output
# ===========================================================================
# Group 7: Executor Retry Enforcement
# ===========================================================================
class AlwaysFailsNode(NodeProtocol):
    """Node that fails on every execution and counts how often it was run."""

    def __init__(self):
        # Incremented once per execute() call.
        self.attempt_count = 0

    async def execute(self, ctx: NodeContext) -> NodeResult:
        """Record the attempt and return a failed NodeResult naming it."""
        self.attempt_count += 1
        message = f"Permanent error (attempt {self.attempt_count})"
        return NodeResult(success=False, error=message)
@pytest.mark.asyncio
async def test_event_loop_no_executor_retry(runtime):
    """A custom failing node declared as event_loop is attempted max_retries times.

    The registered node is a plain NodeProtocol implementation, not an
    EventLoopNode, and the assertion expects all three attempts to happen.
    """
    failing_node = AlwaysFailsNode()

    graph = GraphSpec(
        id="test_graph",
        goal_id="test_goal",
        name="Test Graph",
        entry_node="el_fail",
        nodes=[
            NodeSpec(
                id="el_fail",
                name="Failing Event Loop",
                description="event loop that fails",
                node_type="event_loop",
                max_retries=3,
                output_keys=["result"],
            )
        ],
        edges=[],
        terminal_nodes=["el_fail"],
    )
    goal = Goal(id="test_goal", name="Test", description="test")

    executor = GraphExecutor(runtime=runtime)
    executor.register_node("el_fail", failing_node)
    outcome = await executor.execute(graph, goal, {})

    assert not outcome.success
    # Custom nodes keep their max_retries.
    assert failing_node.attempt_count == 3
# ===========================================================================
# Group 8: Context Handoff
# ===========================================================================
@pytest.mark.asyncio
async def test_context_handoff_between_nodes(runtime):
    """Outputs of two chained event_loop nodes both appear in the final result."""
    # Turns for the enrichment node: set lead_score, then finish.
    enrichment_scripts = [
        StreamScript(
            tool_calls=[
                {"name": "set_output", "id": "tc_e", "input": {"key": "lead_score", "value": "92"}}
            ],
        ),
        StreamScript(text="Enrichment complete."),
    ]
    # Turns for the strategy node: set strategy, then finish.
    strategy_scripts = [
        StreamScript(
            tool_calls=[
                {
                    "name": "set_output",
                    "id": "tc_s",
                    "input": {"key": "strategy", "value": "premium"},
                }
            ],
        ),
        StreamScript(text="Strategy determined."),
    ]
    enrichment_llm = ScriptableMockLLMProvider(enrichment_scripts)

    enrichment_spec = NodeSpec(
        id="enrichment",
        name="Enrichment",
        description="Enrich lead data",
        node_type="event_loop",
        output_keys=["lead_score"],
    )
    # Original note: input_keys stays empty so the scoped memory allows reading
    # all keys. EventLoopNode._check_pause() reads "pause_requested" from
    # memory and a restrictive scope would block it; lead_score still arrives
    # through the edge's input_data mapping.
    strategy_spec = NodeSpec(
        id="strategy",
        name="Strategy",
        description="Determine strategy",
        node_type="event_loop",
        output_keys=["strategy"],
    )
    handoff_edge = EdgeSpec(
        id="e_to_s",
        source="enrichment",
        target="strategy",
        condition=EdgeCondition.ON_SUCCESS,
    )
    graph = GraphSpec(
        id="handoff_graph",
        goal_id="test_goal",
        name="Handoff Graph",
        entry_node="enrichment",
        nodes=[enrichment_spec, strategy_spec],
        edges=[handoff_edge],
        terminal_nodes=["strategy"],
    )
    goal = Goal(id="test_goal", name="Handoff Test", description="test context handoff")

    executor = GraphExecutor(runtime=runtime, llm=enrichment_llm)
    for node_name in ("enrichment", "strategy"):
        executor.register_node(node_name, EventLoopNode(config=LoopConfig(max_iterations=5)))

    # The executor hands one LLM (self.llm) to every node via the context, so
    # per-node providers are not possible here. Workaround: swap in a single
    # provider that plays the enrichment turns followed by the strategy turns.
    executor.llm = ScriptableMockLLMProvider(enrichment_scripts + strategy_scripts)

    outcome = await executor.execute(graph, goal, {})

    assert outcome.success
    for key in ("lead_score", "strategy"):
        assert key in outcome.output
    if USE_MOCK_LLM:
        assert outcome.output["lead_score"] == 92
        assert outcome.output["strategy"] == "premium"
# ===========================================================================
# Group 9: Client I/O
# ===========================================================================
@pytest.mark.asyncio
@pytest.mark.skip(reason="Hangs in non-interactive shells (client-facing blocks on stdin)")
async def test_client_facing_node_streams_output():
    """A client-facing node publishes CLIENT_OUTPUT_DELTA, not LLM_TEXT_DELTA."""
    captured: list[AgentEvent] = []

    async def collect(event: AgentEvent) -> None:
        captured.append(event)

    bus = EventBus()
    bus.subscribe(
        event_types=[EventType.CLIENT_OUTPUT_DELTA, EventType.LLM_TEXT_DELTA],
        handler=collect,
    )

    llm = make_llm([StreamScript(text="Hello, user!")])
    ctx = make_ctx(llm=llm, output_keys=[], client_facing=True)
    node = EventLoopNode(event_bus=bus, config=LoopConfig(max_iterations=5))

    # Original note: a text-only reply on a client-facing node does not block
    # (ask_user is never called), so no shutdown workaround is needed.
    # NOTE(review): the skip reason above says otherwise - confirm which holds.
    result = await node.execute(ctx)

    assert result.success
    kinds = [event.type for event in captured]
    assert EventType.CLIENT_OUTPUT_DELTA in kinds
    # LLM_TEXT_DELTA is reserved for internal nodes and must not appear.
    assert EventType.LLM_TEXT_DELTA not in kinds

    # Every client delta must be attributed to the default test node id.
    client_deltas = [event for event in captured if event.type == EventType.CLIENT_OUTPUT_DELTA]
    assert all(event.node_id == "test_node" for event in client_deltas)
@pytest.mark.asyncio
async def test_internal_node_no_client_output():
    """A non-client-facing node publishes LLM_TEXT_DELTA, not CLIENT_OUTPUT_DELTA."""
    captured: list[AgentEvent] = []

    async def collect(event: AgentEvent) -> None:
        captured.append(event)

    bus = EventBus()
    bus.subscribe(
        event_types=[EventType.CLIENT_OUTPUT_DELTA, EventType.LLM_TEXT_DELTA],
        handler=collect,
    )

    llm = make_llm([StreamScript(text="Internal processing.")])
    ctx = make_ctx(llm=llm, output_keys=[], client_facing=False)
    node = EventLoopNode(event_bus=bus, config=LoopConfig(max_iterations=5))

    result = await node.execute(ctx)

    assert result.success
    kinds = [event.type for event in captured]
    assert EventType.LLM_TEXT_DELTA in kinds
    assert EventType.CLIENT_OUTPUT_DELTA not in kinds
# ===========================================================================
# Group 10: Full Pipeline
# ===========================================================================
@pytest.mark.asyncio
async def test_mixed_node_graph(runtime):
    """A custom node -> EventLoopNode -> custom node pipeline runs end to end."""

    class LoadLeadsNode(NodeProtocol):
        """Writes a fixed list of leads to memory and returns it as output."""

        async def execute(self, ctx: NodeContext) -> NodeResult:
            leads = ["lead_A", "lead_B", "lead_C"]
            ctx.memory.write("leads", leads)
            return NodeResult(success=True, output={"leads": leads})

    class FormatOutputNode(NodeProtocol):
        """Builds a report string from the summary found in input or memory."""

        async def execute(self, ctx: NodeContext) -> NodeResult:
            # Memory is always consulted; it only serves as the default when
            # input_data has no "summary" entry.
            fallback = ctx.memory.read("summary") or "no summary"
            summary = ctx.input_data.get("summary", fallback)
            report = f"Report: {summary}"
            ctx.memory.write("report", report)
            return NodeResult(success=True, output={"report": report})

    # Turns for the middle (event loop) node: publish a summary, then finish.
    el_llm = ScriptableMockLLMProvider(
        [
            StreamScript(
                tool_calls=[
                    {
                        "name": "set_output",
                        "id": "tc_sum",
                        "input": {"key": "summary", "value": "3 leads processed"},
                    }
                ],
            ),
            StreamScript(text="Processing complete."),
        ]
    )

    # All three specs are declared as event_loop nodes.
    load_spec = NodeSpec(
        id="load",
        name="Load Leads",
        description="Load lead data",
        node_type="event_loop",
        output_keys=["leads"],
    )
    process_spec = NodeSpec(
        id="process",
        name="Process Leads",
        description="Process leads with LLM",
        node_type="event_loop",
        output_keys=["summary"],
    )
    format_spec = NodeSpec(
        id="format",
        name="Format Output",
        description="Format final report",
        node_type="event_loop",
        output_keys=["report"],
    )
    load_to_process = EdgeSpec(
        id="e1", source="load", target="process", condition=EdgeCondition.ON_SUCCESS
    )
    process_to_format = EdgeSpec(
        id="e2", source="process", target="format", condition=EdgeCondition.ON_SUCCESS
    )
    graph = GraphSpec(
        id="pipeline_graph",
        goal_id="test_goal",
        name="Pipeline Graph",
        entry_node="load",
        nodes=[load_spec, process_spec, format_spec],
        edges=[load_to_process, process_to_format],
        terminal_nodes=["format"],
    )
    goal = Goal(id="test_goal", name="Pipeline Test", description="test full pipeline")

    executor = GraphExecutor(runtime=runtime, llm=el_llm)
    executor.register_node("load", LoadLeadsNode())
    executor.register_node("process", EventLoopNode(config=LoopConfig(max_iterations=5)))
    executor.register_node("format", FormatOutputNode())

    outcome = await executor.execute(graph, goal, {})

    assert outcome.success
    for key in ("summary", "report"):
        assert key in outcome.output
    if USE_MOCK_LLM:
        assert "3 leads processed" in outcome.output["summary"]
# ===========================================================================
# Group 11: Validation
# ===========================================================================
@pytest.mark.asyncio
async def test_fan_out_rejects_overlapping_output_keys(runtime):
    """A fan-out whose parallel branches share an output key is rejected.

    Both branches declare output key "result". The test expects execution to
    fail with an "Invalid graph" error rather than run the branches. The
    original note explains why: a last-wins memory strategy could otherwise
    corrupt data.
    """

    def scripted_node(call_id: str, key: str, value: str, closing: str) -> list[StreamScript]:
        # Two turns: one set_output tool call followed by a text finish.
        call = {"name": "set_output", "id": call_id, "input": {"key": key, "value": value}}
        return [StreamScript(tool_calls=[call]), StreamScript(text=closing)]

    scripts_a = scripted_node("tc_a", "result", "from_A", "A done.")
    scripts_b = scripted_node("tc_b", "result", "from_B", "B done.")

    source_spec = NodeSpec(
        id="source",
        name="Source",
        description="Source node",
        node_type="event_loop",
        output_keys=["trigger"],
    )
    branch_a_spec = NodeSpec(
        id="branch_a",
        name="Branch A",
        description="Parallel branch A",
        node_type="event_loop",
        output_keys=["result"],
    )
    branch_b_spec = NodeSpec(
        id="branch_b",
        name="Branch B",
        description="Parallel branch B",
        node_type="event_loop",
        output_keys=["result"],  # deliberately the same key as branch A
    )
    fan_out_edges = [
        EdgeSpec(id="e_a", source="source", target="branch_a", condition=EdgeCondition.ON_SUCCESS),
        EdgeSpec(id="e_b", source="source", target="branch_b", condition=EdgeCondition.ON_SUCCESS),
    ]
    graph = GraphSpec(
        id="fanout_graph",
        goal_id="test_goal",
        name="Fan Out Graph",
        entry_node="source",
        nodes=[source_spec, branch_a_spec, branch_b_spec],
        edges=fan_out_edges,
        terminal_nodes=["branch_a", "branch_b"],
    )
    goal = Goal(id="test_goal", name="Fanout Test", description="test fanout")

    # Script order: the source node's turns first, then A's, then B's.
    source_scripts = scripted_node("tc_src", "trigger", "go", "Source done.")
    all_llm = ScriptableMockLLMProvider([*source_scripts, *scripts_a, *scripts_b])

    executor = GraphExecutor(runtime=runtime, llm=all_llm)
    for node_name in ("source", "branch_a", "branch_b"):
        executor.register_node(node_name, EventLoopNode(config=LoopConfig(max_iterations=5)))

    outcome = await executor.execute(graph, goal, {})

    # Original note: GraphSpec.validate() catches overlapping output_keys on
    # parallel event_loop branches and rejects the graph before execution.
    assert not outcome.success
    assert "Invalid graph" in outcome.error
# ===========================================================================
# Group 12: Edge Cases
# ===========================================================================
@pytest.mark.asyncio
async def test_max_iterations_exceeded():
    """A loop that is never accepted fails once max_iterations is reached."""
    # Three replies for a two-iteration budget; the third is spare.
    replies = [StreamScript(text=f"Response {number}.") for number in (1, 2, 3)]
    llm = ScriptableMockLLMProvider(replies)
    ctx = make_ctx(llm=llm, output_keys=[])
    node = EventLoopNode(
        judge=AlwaysRetryJudge(),
        config=LoopConfig(max_iterations=2),
    )

    result = await node.execute(ctx)

    assert not result.success
    assert "max iterations" in result.error.lower()
@pytest.mark.asyncio
async def test_stall_detection():
    """Repeating the same response up to the stall threshold fails the node."""
    # The threshold is 3 identical replies; a fourth is scripted as a spare.
    repeated = [StreamScript(text="I am stuck") for _ in range(4)]
    llm = ScriptableMockLLMProvider(repeated)
    ctx = make_ctx(llm=llm, output_keys=[])
    node = EventLoopNode(
        judge=AlwaysRetryJudge(),
        config=LoopConfig(stall_detection_threshold=3, max_iterations=10),
    )

    result = await node.execute(ctx)

    assert not result.success
    assert "stall" in result.error.lower()