Compare commits

...

2 Commits

Author SHA1 Message Date
bryan 9c33da7b8d added graph validation for client-facing nodes [WP-10] 2026-02-01 18:45:35 -08:00
Anshumaan Saraf 23146c8dae docs: remove duplicate entry in Edge Protocol docstring (#2994)
Fixes #2717

The Edge Types list in edge.py had 'always' listed twice.
Removed the duplicate line.
2026-02-01 15:19:11 +08:00
2 changed files with 273 additions and 1 deletions
+36 -1
View File
@@ -11,7 +11,6 @@ our edges can be created dynamically by a Builder agent based on the goal.
Edge Types:
- always: Always traverse after source completes
- always: Always traverse after source completes
- on_success: Traverse only if source succeeds
- on_failure: Traverse only if source fails
- conditional: Traverse based on expression evaluation (SAFE SUBSET ONLY)
@@ -609,4 +608,40 @@ class GraphSpec(BaseModel):
continue
errors.append(f"Node '{node.id}' is unreachable from entry")
# Client-facing fan-out validation
fan_outs = self.detect_fan_out_nodes()
for source_id, targets in fan_outs.items():
client_facing_targets = [
t
for t in targets
if self.get_node(t) and getattr(self.get_node(t), "client_facing", False)
]
if len(client_facing_targets) > 1:
errors.append(
f"Fan-out from '{source_id}' has multiple client-facing nodes: "
f"{client_facing_targets}. Only one branch may be client-facing."
)
# Output key overlap on parallel event_loop nodes
for source_id, targets in fan_outs.items():
event_loop_targets = [
t
for t in targets
if self.get_node(t) and getattr(self.get_node(t), "node_type", "") == "event_loop"
]
if len(event_loop_targets) > 1:
seen_keys: dict[str, str] = {}
for node_id in event_loop_targets:
node = self.get_node(node_id)
for key in getattr(node, "output_keys", []):
if key in seen_keys:
errors.append(
f"Fan-out from '{source_id}': event_loop nodes "
f"'{seen_keys[key]}' and '{node_id}' both write to "
f"output_key '{key}'. Parallel event_loop nodes must "
f"have disjoint output_keys to prevent last-wins data loss."
)
else:
seen_keys[key] = node_id
return errors
+237
View File
@@ -0,0 +1,237 @@
"""
Tests for client-facing fan-out and event_loop output_key overlap validation.
Validates two rules added to GraphSpec.validate():
1. Fan-out must not have multiple client_facing=True targets.
2. Parallel event_loop nodes must have disjoint output_keys.
"""
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.node import NodeSpec
# ---------------------------------------------------------------------------
# Rule 1: client_facing fan-out
# ---------------------------------------------------------------------------
class TestClientFacingFanOut:
"""Fan-out to multiple client_facing=True targets must be rejected."""
def test_fan_out_two_client_facing_fails(self):
"""Two client-facing targets on the same fan-out -> error."""
graph = GraphSpec(
id="g1",
goal_id="goal1",
entry_node="src",
nodes=[
NodeSpec(id="src", name="src", description="Source node"),
NodeSpec(id="a", name="a", description="Node a", client_facing=True),
NodeSpec(id="b", name="b", description="Node b", client_facing=True),
],
edges=[
EdgeSpec(id="src->a", source="src", target="a", condition=EdgeCondition.ON_SUCCESS),
EdgeSpec(id="src->b", source="src", target="b", condition=EdgeCondition.ON_SUCCESS),
],
)
errors = graph.validate()
cf_errors = [e for e in errors if "multiple client-facing" in e]
assert len(cf_errors) == 1
assert "'src'" in cf_errors[0]
def test_fan_out_one_client_facing_passes(self):
"""Only one client-facing target -> no error."""
graph = GraphSpec(
id="g1",
goal_id="goal1",
entry_node="src",
nodes=[
NodeSpec(id="src", name="src", description="Source node"),
NodeSpec(id="a", name="a", description="Node a", client_facing=True),
NodeSpec(id="b", name="b", description="Node b", client_facing=False),
],
edges=[
EdgeSpec(id="src->a", source="src", target="a", condition=EdgeCondition.ON_SUCCESS),
EdgeSpec(id="src->b", source="src", target="b", condition=EdgeCondition.ON_SUCCESS),
],
)
errors = graph.validate()
cf_errors = [e for e in errors if "multiple client-facing" in e]
assert len(cf_errors) == 0
def test_fan_out_zero_client_facing_passes(self):
"""No client-facing targets at all -> no error."""
graph = GraphSpec(
id="g1",
goal_id="goal1",
entry_node="src",
nodes=[
NodeSpec(id="src", name="src", description="Source node"),
NodeSpec(id="a", name="a", description="Node a"),
NodeSpec(id="b", name="b", description="Node b"),
],
edges=[
EdgeSpec(id="src->a", source="src", target="a", condition=EdgeCondition.ON_SUCCESS),
EdgeSpec(id="src->b", source="src", target="b", condition=EdgeCondition.ON_SUCCESS),
],
)
errors = graph.validate()
cf_errors = [e for e in errors if "multiple client-facing" in e]
assert len(cf_errors) == 0
# ---------------------------------------------------------------------------
# Rule 2: event_loop output_key overlap
# ---------------------------------------------------------------------------
class TestEventLoopOutputKeyOverlap:
"""Parallel event_loop nodes with overlapping output_keys must be rejected."""
def test_overlapping_output_keys_event_loop_fails(self):
"""Two event_loop nodes sharing an output_key -> error."""
graph = GraphSpec(
id="g1",
goal_id="goal1",
entry_node="src",
nodes=[
NodeSpec(id="src", name="src", description="Source node"),
NodeSpec(
id="a",
name="a",
description="Node a",
node_type="event_loop",
output_keys=["status", "shared"],
),
NodeSpec(
id="b",
name="b",
description="Node b",
node_type="event_loop",
output_keys=["result", "shared"],
),
],
edges=[
EdgeSpec(id="src->a", source="src", target="a", condition=EdgeCondition.ON_SUCCESS),
EdgeSpec(id="src->b", source="src", target="b", condition=EdgeCondition.ON_SUCCESS),
],
)
errors = graph.validate()
key_errors = [e for e in errors if "output_key" in e]
assert len(key_errors) == 1
assert "'shared'" in key_errors[0]
def test_disjoint_output_keys_event_loop_passes(self):
"""Two event_loop nodes with disjoint output_keys -> no error."""
graph = GraphSpec(
id="g1",
goal_id="goal1",
entry_node="src",
nodes=[
NodeSpec(id="src", name="src", description="Source node"),
NodeSpec(
id="a",
name="a",
description="Node a",
node_type="event_loop",
output_keys=["status"],
),
NodeSpec(
id="b",
name="b",
description="Node b",
node_type="event_loop",
output_keys=["result"],
),
],
edges=[
EdgeSpec(id="src->a", source="src", target="a", condition=EdgeCondition.ON_SUCCESS),
EdgeSpec(id="src->b", source="src", target="b", condition=EdgeCondition.ON_SUCCESS),
],
)
errors = graph.validate()
key_errors = [e for e in errors if "output_key" in e]
assert len(key_errors) == 0
def test_overlapping_keys_non_event_loop_no_error(self):
"""Non-event_loop nodes with overlapping keys -> no error (last-wins OK)."""
graph = GraphSpec(
id="g1",
goal_id="goal1",
entry_node="src",
nodes=[
NodeSpec(id="src", name="src", description="Source node"),
NodeSpec(
id="a",
name="a",
description="Node a",
node_type="llm_generate",
output_keys=["shared"],
),
NodeSpec(
id="b",
name="b",
description="Node b",
node_type="llm_generate",
output_keys=["shared"],
),
],
edges=[
EdgeSpec(id="src->a", source="src", target="a", condition=EdgeCondition.ON_SUCCESS),
EdgeSpec(id="src->b", source="src", target="b", condition=EdgeCondition.ON_SUCCESS),
],
)
errors = graph.validate()
key_errors = [e for e in errors if "output_key" in e]
assert len(key_errors) == 0
# ---------------------------------------------------------------------------
# Baseline: no fan-out -> no errors from these rules
# ---------------------------------------------------------------------------
class TestNoFanOutUnaffected:
"""Linear graphs should not trigger either validation rule."""
def test_no_fan_out_unaffected(self):
"""Linear chain with client_facing and event_loop nodes -> no errors."""
graph = GraphSpec(
id="g1",
goal_id="goal1",
entry_node="a",
terminal_nodes=["c"],
nodes=[
NodeSpec(id="a", name="a", description="Node a", client_facing=True),
NodeSpec(
id="b",
name="b",
description="Node b",
node_type="event_loop",
output_keys=["x"],
),
NodeSpec(
id="c",
name="c",
description="Node c",
client_facing=True,
node_type="event_loop",
output_keys=["x"],
),
],
edges=[
EdgeSpec(id="a->b", source="a", target="b", condition=EdgeCondition.ON_SUCCESS),
EdgeSpec(id="b->c", source="b", target="c", condition=EdgeCondition.ON_SUCCESS),
],
)
errors = graph.validate()
cf_errors = [e for e in errors if "multiple client-facing" in e]
key_errors = [e for e in errors if "output_key" in e]
assert len(cf_errors) == 0
assert len(key_errors) == 0