Merge branch 'feature/queen-worker-comm' into fix/queen-recovery

This commit is contained in:
Timothy
2026-03-03 08:02:59 -08:00
9 changed files with 279 additions and 71 deletions
@@ -410,7 +410,10 @@ If list_agent_tools() shows these don't exist, use alternatives \
**Node rules**:
- **2-4 nodes MAX.** Never exceed 4. Merge thin nodes aggressively.
- A node with 0 tools is NOT a real node merge it.
- node_type always "event_loop"
- node_type "event_loop" for all regular graph nodes. Use "gcu" ONLY for
browser automation subagents (see GCU appendix). GCU nodes MUST be in a
parent node's sub_agents list, NEVER connected via edges, and NEVER used
as entry/terminal nodes.
- max_node_visits default is 0 (unbounded) correct for forever-alive. \
Only set >0 in one-shot agents with bounded feedback loops.
- Feedback inputs: nullable_output_keys
@@ -613,6 +616,15 @@ If NO worker is loaded, say so and offer to build one.
- For tasks matching the worker's goal, call start_worker(task).
- For everything else, do it directly.
## When the user clicks Run (external event notification)
When you receive an event that the user clicked Run:
- If the worker started successfully, briefly acknowledge it do NOT \
repeat the full status. The user can see the graph is running.
- If the worker failed to start (credential or structural error), \
explain the problem clearly and help fix it. For credential errors, \
guide the user to set up the missing credentials. For structural \
issues, offer to fix the agent graph directly.
## When worker is running:
- If the user asks about progress, call get_worker_status() ONCE and \
report the result. Do NOT poll in a loop.
@@ -107,3 +107,5 @@ def test_research_routes_back_to_interact(self):
24. **Not using auto_responder for client-facing nodes** — Tests with client-facing nodes hang without an auto-responder that injects input. But note: even WITH auto_responder, forever-alive agents still hang because the graph never terminates. Auto-responder only helps for agents with terminal nodes.
25. **Manually wiring browser tools on event_loop nodes** — If the agent needs browser automation, use `node_type="gcu"` which auto-includes all browser tools and prepends best-practices guidance. Do NOT manually list browser tool names on event_loop nodes — they may not exist in the MCP server or may be incomplete. See the GCU Guide appendix.
26. **Using GCU nodes as regular graph nodes** — GCU nodes (`node_type="gcu"`) are exclusively subagents. They must ONLY appear in a parent node's `sub_agents=["gcu-node-id"]` list and be invoked via `delegate_to_sub_agent()`. They must NEVER be connected via edges, used as entry nodes, or used as terminal nodes. If a GCU node appears as an edge source or target, the graph will fail pre-load validation.
+1 -5
View File
@@ -159,11 +159,7 @@ class CredentialValidationResult:
f" {c.env_var} for {_label(c)}"
f"\n Connect this integration at hive.adenhq.com first."
)
lines.append(
"\nTo fix: run /hive-credentials in Claude Code."
"\nIf you've already set up credentials, "
"restart your terminal to load them."
)
lines.append("\nIf you've already set up credentials, restart your terminal to load them.")
return "\n".join(lines)
+44
View File
@@ -706,4 +706,48 @@ class GraphSpec(BaseModel):
else:
seen_keys[key] = node_id
# GCU nodes must only be used as subagents
gcu_node_ids = {n.id for n in self.nodes if n.node_type == "gcu"}
if gcu_node_ids:
# GCU nodes must not be entry nodes
if self.entry_node in gcu_node_ids:
errors.append(
f"GCU node '{self.entry_node}' is used as entry node. "
"GCU nodes must only be used as subagents via delegate_to_sub_agent()."
)
# GCU nodes must not be terminal nodes
for term in self.terminal_nodes:
if term in gcu_node_ids:
errors.append(
f"GCU node '{term}' is used as terminal node. "
"GCU nodes must only be used as subagents."
)
# GCU nodes must not be connected via edges
for edge in self.edges:
if edge.source in gcu_node_ids:
errors.append(
f"GCU node '{edge.source}' is used as edge source (edge '{edge.id}'). "
"GCU nodes must only be used as subagents, not connected via edges."
)
if edge.target in gcu_node_ids:
errors.append(
f"GCU node '{edge.target}' is used as edge target (edge '{edge.id}'). "
"GCU nodes must only be used as subagents, not connected via edges."
)
# GCU nodes must be referenced in at least one parent's sub_agents
referenced_subagents = set()
for node in self.nodes:
for sa_id in node.sub_agents or []:
referenced_subagents.add(sa_id)
orphaned = gcu_node_ids - referenced_subagents
for nid in orphaned:
errors.append(
f"GCU node '{nid}' is not referenced in any node's sub_agents list. "
"GCU nodes must be declared as subagents of a parent node."
)
return errors
+185
View File
@@ -0,0 +1,185 @@
"""Pre-load validation for agent graphs.
Runs structural and credential checks before MCP servers are spawned.
Fails fast with actionable error messages.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.graph.edge import GraphSpec
from framework.graph.node import NodeSpec
logger = logging.getLogger(__name__)
class PreloadValidationError(Exception):
"""Raised when pre-load validation fails."""
def __init__(self, errors: list[str]):
self.errors = errors
msg = "Pre-load validation failed:\n" + "\n".join(f" - {e}" for e in errors)
super().__init__(msg)
@dataclass
class PreloadResult:
"""Result of pre-load validation."""
valid: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def validate_graph_structure(graph: GraphSpec) -> list[str]:
"""Run graph structural validation (includes GCU subagent-only checks).
Delegates to GraphSpec.validate() which checks entry/terminal nodes,
edge references, reachability, fan-out rules, and GCU constraints.
"""
return graph.validate()
def validate_credentials(
nodes: list[NodeSpec],
*,
interactive: bool = True,
skip: bool = False,
) -> None:
"""Validate agent credentials.
Calls ``validate_agent_credentials`` which performs two-phase validation:
1. Presence check (env var, encrypted store, Aden sync)
2. Health check (lightweight HTTP call to verify the key works)
On failure raises ``CredentialError`` with ``validation_result`` and
``failed_cred_names`` attributes preserved from the upstream check.
In interactive mode (CLI with TTY), attempts recovery via the
credential setup flow before re-raising.
"""
if skip:
return
from framework.credentials.validation import validate_agent_credentials
if not interactive:
# Non-interactive: let CredentialError propagate with full context.
# validate_agent_credentials attaches .validation_result and
# .failed_cred_names to the exception automatically.
validate_agent_credentials(nodes)
return
import sys
from framework.credentials.models import CredentialError
try:
validate_agent_credentials(nodes)
except CredentialError as e:
if not sys.stdin.isatty():
raise
print(f"\n{e}", file=sys.stderr)
from framework.credentials.validation import build_setup_session_from_error
session = build_setup_session_from_error(e, nodes=nodes)
if not session.missing:
raise
result = session.run_interactive()
if not result.success:
# Preserve the original validation_result so callers can
# inspect which credentials are still missing.
exc = CredentialError(
"Credential setup incomplete. Run again after configuring the required credentials."
)
if hasattr(e, "validation_result"):
exc.validation_result = e.validation_result # type: ignore[attr-defined]
if hasattr(e, "failed_cred_names"):
exc.failed_cred_names = e.failed_cred_names # type: ignore[attr-defined]
raise exc from None
# Re-validate after successful setup — this will raise if still broken,
# with fresh validation_result attached to the new exception.
validate_agent_credentials(nodes)
def credential_errors_to_json(exc: Exception) -> dict:
"""Extract structured credential failure details from a CredentialError.
Returns a dict suitable for JSON serialization with enough detail for
the queen to report actionable guidance to the user. Falls back to
``str(exc)`` when rich metadata is not available.
"""
result = getattr(exc, "validation_result", None)
if result is None:
return {
"error": "credentials_required",
"message": str(exc),
}
failed = result.failed
missing = []
for c in failed:
if c.available:
status = "invalid"
elif c.aden_not_connected:
status = "aden_not_connected"
else:
status = "missing"
entry: dict = {
"credential": c.credential_name,
"env_var": c.env_var,
"status": status,
}
if c.tools:
entry["tools"] = c.tools
if c.node_types:
entry["node_types"] = c.node_types
if c.help_url:
entry["help_url"] = c.help_url
if c.validation_message:
entry["validation_message"] = c.validation_message
missing.append(entry)
return {
"error": "credentials_required",
"message": str(exc),
"missing_credentials": missing,
}
def run_preload_validation(
graph: GraphSpec,
*,
interactive: bool = True,
skip_credential_validation: bool = False,
) -> PreloadResult:
"""Run all pre-load validations.
Order:
1. Graph structure (includes GCU subagent-only checks) non-recoverable
2. Credentials potentially recoverable via interactive setup
Raises PreloadValidationError for structural issues.
Raises CredentialError for credential issues.
"""
# 1. Structural validation (calls graph.validate() which includes GCU checks)
graph_errors = validate_graph_structure(graph)
if graph_errors:
raise PreloadValidationError(graph_errors)
# 2. Credential validation
validate_credentials(
graph.nodes,
interactive=interactive,
skip=skip_credential_validation,
)
return PreloadResult(valid=True)
+7 -51
View File
@@ -12,7 +12,6 @@ from typing import TYPE_CHECKING, Any
from framework.config import get_hive_config, get_preferred_model
from framework.credentials.validation import (
ensure_credential_key_env as _ensure_credential_key_env,
validate_agent_credentials,
)
from framework.graph import Goal
from framework.graph.edge import (
@@ -25,6 +24,7 @@ from framework.graph.edge import (
from framework.graph.executor import ExecutionResult
from framework.graph.node import NodeSpec
from framework.llm.provider import LLMProvider, Tool
from framework.runner.preload_validation import run_preload_validation
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
@@ -679,9 +679,13 @@ class AgentRunner:
self._agent_runtime: AgentRuntime | None = None
self._uses_async_entry_points = self.graph.has_async_entry_points()
# Validate credentials before spawning MCP servers.
# Pre-load validation: structural checks + credentials.
# Fails fast with actionable guidance — no MCP noise on screen.
self._validate_credentials()
run_preload_validation(
self.graph,
interactive=self._interactive,
skip_credential_validation=self.skip_credential_validation,
)
# Auto-discover tools from tools.py
tools_path = agent_path / "tools.py"
@@ -698,54 +702,6 @@ class AgentRunner:
if mcp_config_path.exists():
self._load_mcp_servers_from_config(mcp_config_path)
def _validate_credentials(self) -> None:
"""Check that required credentials are available before spawning MCP servers.
If ``interactive`` is True and stdin is a TTY, automatically launches
the interactive credential setup flow so the user can fix the issue
in-place. Re-validates after setup succeeds.
When ``interactive`` is False (e.g. TUI callers), the CredentialError
propagates immediately so the caller can handle it with its own UI.
"""
if self.skip_credential_validation:
return
if not self._interactive:
# Let the CredentialError propagate — caller handles UI.
validate_agent_credentials(self.graph.nodes)
return
import sys
from framework.credentials.models import CredentialError
try:
validate_agent_credentials(self.graph.nodes)
return # All good
except CredentialError as e:
if not sys.stdin.isatty():
raise
# Interactive: show the error then enter credential setup
print(f"\n{e}", file=sys.stderr)
from framework.credentials.validation import build_setup_session_from_error
session = build_setup_session_from_error(e, nodes=self.graph.nodes)
if not session.missing:
raise
result = session.run_interactive()
if not result.success:
raise CredentialError(
"Credential setup incomplete. "
"Run again after configuring the required credentials."
) from None
# Re-validate after setup
validate_agent_credentials(self.graph.nodes)
@staticmethod
def _import_agent_module(agent_path: Path):
"""Import an agent package from its directory path.
+14 -10
View File
@@ -41,7 +41,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any
from framework.credentials.models import CredentialError
from framework.credentials.validation import validate_agent_credentials
from framework.runner.preload_validation import credential_errors_to_json, validate_credentials
from framework.runtime.event_bus import AgentEvent, EventType
from framework.server.app import validate_agent_path
@@ -185,12 +185,15 @@ def register_queen_lifecycle_tools(
cred_error: CredentialError | None = None
try:
await loop.run_in_executor(
None, lambda: validate_agent_credentials(runtime.graph.nodes)
None,
lambda: validate_credentials(
runtime.graph.nodes,
interactive=False,
skip=False,
),
)
except CredentialError as e:
cred_error = e
except Exception as e:
logger.warning("Credential validation failed: %s", e)
runner = getattr(session, "runner", None)
if runner:
@@ -241,6 +244,11 @@ def register_queen_lifecycle_tools(
}
)
except CredentialError as e:
# Build structured error with per-credential details so the
# queen can report exactly what's missing and how to fix it.
error_payload = credential_errors_to_json(e)
error_payload["agent_path"] = str(getattr(session, "worker_path", "") or "")
# Emit SSE event so the frontend opens the credentials modal
bus = getattr(session, "event_bus", None)
if bus is not None:
@@ -248,14 +256,10 @@ def register_queen_lifecycle_tools(
AgentEvent(
type=EventType.CREDENTIALS_REQUIRED,
stream_id="queen",
data={
"error": "credentials_required",
"message": str(e),
"agent_path": str(getattr(session, "worker_path", "") or ""),
},
data=error_payload,
)
)
return json.dumps({"error": "credentials_required", "message": str(e)})
return json.dumps(error_payload)
except Exception as e:
return json.dumps({"error": f"Failed to start worker: {e}"})
+6 -3
View File
@@ -47,8 +47,11 @@ class DummyLLMProvider(LLMProvider):
) -> AsyncIterator[StreamEvent]:
self._call_count += 1
if self._call_count == 1:
# First call: set the output via tool call
# Each execution takes 2 LLM calls:
# - Odd calls (1, 3, 5, ...): set output via tool call
# - Even calls (2, 4, 6, ...): finish with text
if self._call_count % 2 == 1:
# First call of each execution: set the output via tool call
yield ToolCallEvent(
tool_use_id=f"tc_{self._call_count}",
tool_name="set_output",
@@ -56,7 +59,7 @@ class DummyLLMProvider(LLMProvider):
)
yield FinishEvent(stop_reason="tool_use", input_tokens=10, output_tokens=10)
else:
# Subsequent calls: just finish with text
# Second call of each execution: finish with text
yield TextDeltaEvent(content="Done.", snapshot="Done.")
yield FinishEvent(stop_reason="end_turn", input_tokens=5, output_tokens=5)
@@ -106,7 +106,13 @@ fetch_emails_node = NodeSpec(
node_type="event_loop",
client_facing=False,
max_node_visits=0,
input_keys=["rules", "max_emails", "next_page_token", "last_processed_timestamp", "query"],
input_keys=[
"rules",
"max_emails",
"next_page_token",
"last_processed_timestamp",
"query",
],
output_keys=["emails", "next_page_token"],
nullable_output_keys=["next_page_token"],
system_prompt="""\