feat: multi graph agent session

This commit is contained in:
Timothy
2026-02-18 12:46:59 -08:00
parent 5914d28cbe
commit 7dfc75b3e6
26 changed files with 2102 additions and 102 deletions
+1 -1
View File
@@ -119,7 +119,7 @@ async def main():
# 9. Verify Results # 9. Verify Results
if result.success: if result.success:
print(f"\nSuccess!") print("\nSuccess!")
print(f"Path taken: {' -> '.join(result.path)}") print(f"Path taken: {' -> '.join(result.path)}")
print(f"Final output: {result.output.get('final_greeting')}") print(f"Final output: {result.output.get('final_greeting')}")
else: else:
@@ -0,0 +1,19 @@
"""Reusable Agent Guardian — monitors and recovers from agent failures.
Usage::
from framework.agents.guardian import attach_guardian
runner._setup()
attach_guardian(runner._agent_runtime, runner._tool_registry)
await runner._agent_runtime.start()
"""
from .attach import attach_guardian
from .node import ALL_GUARDIAN_TOOLS, guardian_node
__all__ = [
"attach_guardian",
"guardian_node",
"ALL_GUARDIAN_TOOLS",
]
+91
View File
@@ -0,0 +1,91 @@
"""Attach the Agent Guardian to any AgentRuntime.
Usage::
from framework.agents.guardian import attach_guardian
runner._setup()
runtime = runner._agent_runtime
attach_guardian(runtime, runner._tool_registry)
await runtime.start()
Must be called **before** ``runtime.start()`` it injects the
guardian node into the graph and registers an event-driven entry point.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime
from framework.runtime.execution_stream import EntryPointSpec
from .node import ALL_GUARDIAN_TOOLS, guardian_node
logger = logging.getLogger(__name__)
GUARDIAN_ENTRY_POINT = EntryPointSpec(
id="guardian",
name="Agent Guardian",
entry_node="guardian",
trigger_type="event",
trigger_config={
"event_types": ["execution_failed"],
"exclude_own_graph": True,
},
isolation_level="shared",
)
def attach_guardian(
runtime: AgentRuntime,
tool_registry: ToolRegistry,
) -> None:
"""Inject the guardian node into *runtime*'s graph and register its entry point.
1. Registers graph lifecycle tools if not already present.
2. Refreshes the runtime's tool list and executor.
3. Adds the guardian node (with dynamically filtered tools) to the graph.
4. Registers an event-driven entry point that fires on ``execution_failed``.
Must be called **before** ``runtime.start()``.
Raises:
RuntimeError: If the runtime is already running.
"""
from framework.tools.session_graph_tools import register_graph_tools
# 1. Register graph lifecycle tools if not already present
if not tool_registry.has_tool("load_agent"):
register_graph_tools(tool_registry, runtime)
# 2. Refresh tool schemas and executor on the runtime
runtime._tools = list(tool_registry.get_tools().values())
runtime._tool_executor = tool_registry.get_executor()
# 3. Filter guardian tools to only those available in the registry
available = set(tool_registry.get_tools().keys())
filtered_tools = [t for t in ALL_GUARDIAN_TOOLS if t in available]
# Build guardian node with filtered tool list
node = guardian_node.model_copy(update={"tools": filtered_tools})
# Add to the runtime's graph (so register_entry_point validation passes)
runtime.graph.nodes.append(node)
# Mark guardian as reachable in graph-level entry_points so
# GraphSpec.validate() doesn't flag it as unreachable.
runtime.graph.entry_points["guardian"] = "guardian"
# 4. Register event-driven entry point
runtime.register_entry_point(GUARDIAN_ENTRY_POINT)
logger.info(
"Guardian attached with %d tools: %s",
len(filtered_tools),
filtered_tools,
)
+106
View File
@@ -0,0 +1,106 @@
"""Guardian node specification.
The Agent Guardian is an event-driven watchdog that monitors supervised
agent graphs. It triggers on ``execution_failed`` events, assesses
failure severity, checks user presence, and decides: ask the user for
help (if present), attempt an autonomous fix (if away), or escalate
catastrophic failures for post-mortem.
"""
from framework.graph import NodeSpec
# Full tool list. ``attach_guardian()`` filters this at runtime to
# only the tools actually registered in the agent's ToolRegistry so
# that tool validation never fails.
ALL_GUARDIAN_TOOLS = [
# File I/O — available when the agent has hive-tools MCP
"read_file",
"write_file",
"edit_file",
"search_files",
"run_command",
# Graph lifecycle — always registered by attach_guardian()
"load_agent",
"unload_agent",
"start_agent",
"restart_agent",
"get_user_presence",
"list_agents",
]
guardian_node = NodeSpec(
id="guardian",
name="Agent Guardian",
description=(
"Event-driven guardian that monitors supervised agent graphs. "
"Triggers on EXECUTION_FAILED events from secondary graphs, "
"assesses failure severity, and decides: ask the user for help "
"(if present), attempt an autonomous fix (if away), or escalate "
"catastrophic failures for post-mortem."
),
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["failure_event"],
output_keys=["resolution"],
nullable_output_keys=["resolution"],
success_criteria=(
"Failure is resolved — either by user guidance, autonomous fix, or documented escalation."
),
system_prompt="""\
You are the Agent Guardian a watchdog that fires when a supervised \
agent graph fails. Your job: triage, fix, or escalate.
# Context
You receive a failure event from an agent graph. The event contains \
the graph_id, error message, and execution details. You also have \
access to shared session memory and the user presence status.
# Decision Protocol
1. **Assess severity.** Read the error. Is it:
- Transient (timeout, rate limit, network blip) -> auto-retry
- Configuration (bad API key, missing tool) -> needs user input
- Logic bug (wrong output format, infinite loop) -> needs code fix
- Catastrophic (data corruption, unrecoverable) -> escalate
2. **Check user presence.** Call get_user_presence().
- **present** (idle < 2 min): Ask the user for guidance. Present the \
error clearly and suggest options.
- **idle** (2-10 min): Attempt autonomous fix first. If it fails, \
queue a notification for when user returns.
- **away** (> 10 min) or **never_seen**: Attempt autonomous fix. \
Save escalation log via write_file if fix fails.
3. **Act.**
- For transient errors: restart_agent(graph_id), then start_agent.
- For config issues: if user present, ask. If away, log and wait.
- For logic bugs: read the agent's source code, identify the issue, \
fix with edit_file, restart_agent, start_agent.
- For catastrophic: save detailed escalation log, unload the agent.
# Tools
- get_user_presence() -- check if user is active
- list_agents() -- see loaded graphs and status
- load_agent(path) -- load an agent graph
- unload_agent(graph_id) -- remove a graph
- start_agent(graph_id, entry_point, input_data) -- trigger execution
- restart_agent(graph_id) -- unload for reload
- read_file, write_file, edit_file -- inspect/fix agent source code \
(available when the agent's MCP server provides them)
- run_command -- run shell commands (available when provided by MCP)
# Rules
- Be concise. State the problem, your assessment, and your action.
- If asking the user, present the error and 2-3 concrete options.
- After a fix attempt, verify it works before declaring success.
- set_output("resolution", "...") only after the issue is resolved or \
escalated. Use a brief description: "auto-fixed: retry after timeout", \
"escalated: missing API key", "user-resolved: updated config".
""",
# Placeholder — attach_guardian() replaces this with the filtered list
tools=ALL_GUARDIAN_TOOLS,
)
+11 -6
View File
@@ -90,10 +90,10 @@ goal = Goal(
], ],
) )
# Single node — the entire agent is one continuous loop # Nodes — single coder node (guardian is now auto-attached by the framework)
nodes = [coder_node] nodes = [coder_node]
# No edges needed — single forever-alive node # No edges needed — single forever-alive event_loop node
edges = [] edges = []
# Graph configuration # Graph configuration
@@ -102,6 +102,9 @@ entry_points = {"start": "coder"}
pause_nodes = [] pause_nodes = []
terminal_nodes = [] # Forever-alive: loops until user exits terminal_nodes = [] # Forever-alive: loops until user exits
# No async entry points — guardian is now auto-attached via attach_guardian()
async_entry_points = []
# Module-level variables read by AgentRunner.load() # Module-level variables read by AgentRunner.load()
conversation_mode = "continuous" conversation_mode = "continuous"
identity_prompt = ( identity_prompt = (
@@ -126,9 +129,8 @@ class HiveCoderAgent:
""" """
Hive Coder builds Hive agent packages from natural language. Hive Coder builds Hive agent packages from natural language.
Single-node architecture inspired by opencode's while(true) loop. Single-node architecture: the coder runs in a continuous while(true) loop.
One continuous conversation handles discover design implement The guardian watchdog is auto-attached by the framework in TUI mode.
verify present iterate.
""" """
def __init__(self, config=None): def __init__(self, config=None):
@@ -140,6 +142,7 @@ class HiveCoderAgent:
self.entry_points = entry_points self.entry_points = entry_points
self.pause_nodes = pause_nodes self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes self.terminal_nodes = terminal_nodes
self.async_entry_points = async_entry_points
self._graph: GraphSpec | None = None self._graph: GraphSpec | None = None
self._agent_runtime: AgentRuntime | None = None self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None self._tool_registry: ToolRegistry | None = None
@@ -162,6 +165,7 @@ class HiveCoderAgent:
loop_config=loop_config, loop_config=loop_config,
conversation_mode=conversation_mode, conversation_mode=conversation_mode,
identity_prompt=identity_prompt, identity_prompt=identity_prompt,
async_entry_points=self.async_entry_points,
) )
def _setup(self, mock_mode=False) -> None: def _setup(self, mock_mode=False) -> None:
@@ -203,7 +207,7 @@ class HiveCoderAgent:
entry_node=self.entry_node, entry_node=self.entry_node,
trigger_type="manual", trigger_type="manual",
isolation_level="shared", isolation_level="shared",
) ),
] ]
self._agent_runtime = create_agent_runtime( self._agent_runtime = create_agent_runtime(
@@ -215,6 +219,7 @@ class HiveCoderAgent:
tools=tools, tools=tools,
tool_executor=tool_executor, tool_executor=tool_executor,
checkpoint_config=checkpoint_config, checkpoint_config=checkpoint_config,
graph_id="hive_coder",
) )
async def start(self, mock_mode=False) -> None: async def start(self, mock_mode=False) -> None:
@@ -359,6 +359,27 @@ After user confirms satisfaction:
If building another agent, just start the loop again no need to \ If building another agent, just start the loop again no need to \
set_output until the user is done. set_output until the user is done.
## 7. Live Test (optional)
After the user approves, offer to load and run the agent in-session. \
This runs it alongside you, with the Agent Guardian watching for \
failures automatically.
```
load_agent("exports/{name}") # registers as secondary graph
start_agent("{name}") # triggers default entry point
```
If the agent fails, the guardian fires and triages. You can also:
- `list_agents()` see all loaded graphs and status
- `restart_agent("{name}")` then `load_agent` pick up code changes
- `unload_agent("{name}")` remove it from the session
- `get_user_presence()` check if user is around
The agent runs in a shared session: it can read memory you've set and \
its outputs are visible to you. If the guardian escalates a failure, \
you'll see the error and can fix the code, then reload.
""", """,
tools=[ tools=[
"read_file", "read_file",
@@ -377,6 +398,12 @@ set_output until the user is done.
"list_agent_checkpoints", "list_agent_checkpoints",
"get_agent_checkpoint", "get_agent_checkpoint",
"run_agent_tests", "run_agent_tests",
# Graph lifecycle tools (multi-graph sessions)
"load_agent",
"unload_agent",
"start_agent",
"restart_agent",
"get_user_presence",
], ],
) )
+78 -16
View File
@@ -274,10 +274,21 @@ class EventLoopNode(NodeProtocol):
) )
accumulator = OutputAccumulator(store=self._conversation_store) accumulator = OutputAccumulator(store=self._conversation_store)
start_iteration = 0 start_iteration = 0
_restored_recent_responses: list[str] = []
_restored_tool_fingerprints: list[list[tuple[str, str]]] = []
else: else:
# Try crash-recovery restore from store, then fall back to fresh. # Try crash-recovery restore from store, then fall back to fresh.
conversation, accumulator, start_iteration = await self._restore(ctx) restored = await self._restore(ctx)
if conversation is None: if restored is not None:
conversation = restored.conversation
accumulator = restored.accumulator
start_iteration = restored.start_iteration
_restored_recent_responses = restored.recent_responses
_restored_tool_fingerprints = restored.recent_tool_fingerprints
else:
_restored_recent_responses = []
_restored_tool_fingerprints = []
# Fresh conversation: either isolated mode or first node in continuous mode. # Fresh conversation: either isolated mode or first node in continuous mode.
from framework.graph.prompt_composer import _with_datetime from framework.graph.prompt_composer import _with_datetime
@@ -320,10 +331,9 @@ class EventLoopNode(NodeProtocol):
# 4. Publish loop started # 4. Publish loop started
await self._publish_loop_started(stream_id, node_id) await self._publish_loop_started(stream_id, node_id)
# 5. Stall / doom loop detection state # 5. Stall / doom loop detection state (restored from cursor if resuming)
recent_responses: list[str] = [] recent_responses: list[str] = _restored_recent_responses
recent_tool_fingerprints: list[list[tuple[str, str]]] = [] recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
user_interaction_count = 0 # tracks how many times this node blocked for user input
# 6. Main loop # 6. Main loop
for iteration in range(start_iteration, self._config.max_iterations): for iteration in range(start_iteration, self._config.max_iterations):
@@ -620,8 +630,15 @@ class EventLoopNode(NodeProtocol):
# Text-only turn breaks the doom loop chain # Text-only turn breaks the doom loop chain
recent_tool_fingerprints.clear() recent_tool_fingerprints.clear()
# 6g. Write cursor checkpoint # 6g. Write cursor checkpoint (includes stall/doom state for resume)
await self._write_cursor(ctx, conversation, accumulator, iteration) await self._write_cursor(
ctx,
conversation,
accumulator,
iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
)
# 6h. Client-facing input blocking # 6h. Client-facing input blocking
# #
@@ -738,7 +755,6 @@ class EventLoopNode(NodeProtocol):
conversation=conversation if _is_continuous else None, conversation=conversation if _is_continuous else None,
) )
user_interaction_count += 1
recent_responses.clear() recent_responses.clear()
if _cf_auto: if _cf_auto:
@@ -2207,29 +2223,60 @@ class EventLoopNode(NodeProtocol):
# Persistence: restore, cursor, injection, pause # Persistence: restore, cursor, injection, pause
# ------------------------------------------------------------------- # -------------------------------------------------------------------
@dataclass
class _RestoredState:
"""State recovered from a previous checkpoint."""
conversation: NodeConversation
accumulator: OutputAccumulator
start_iteration: int
recent_responses: list[str]
recent_tool_fingerprints: list[list[tuple[str, str]]]
async def _restore( async def _restore(
self, self,
ctx: NodeContext, ctx: NodeContext,
) -> tuple[NodeConversation | None, OutputAccumulator | None, int]: ) -> _RestoredState | None:
"""Attempt to restore from a previous checkpoint.""" """Attempt to restore from a previous checkpoint.
Returns a ``_RestoredState`` with conversation, accumulator, iteration
counter, and stall/doom-loop detection state everything needed to
resume exactly where execution stopped.
"""
if self._conversation_store is None: if self._conversation_store is None:
return None, None, 0 return None
conversation = await NodeConversation.restore(self._conversation_store) conversation = await NodeConversation.restore(self._conversation_store)
if conversation is None: if conversation is None:
return None, None, 0 return None
accumulator = await OutputAccumulator.restore(self._conversation_store) accumulator = await OutputAccumulator.restore(self._conversation_store)
cursor = await self._conversation_store.read_cursor() cursor = await self._conversation_store.read_cursor()
start_iteration = cursor.get("iteration", 0) + 1 if cursor else 0 start_iteration = cursor.get("iteration", 0) + 1 if cursor else 0
# Restore stall/doom-loop detection state
recent_responses: list[str] = cursor.get("recent_responses", []) if cursor else []
raw_fps = cursor.get("recent_tool_fingerprints", []) if cursor else []
recent_tool_fingerprints: list[list[tuple[str, str]]] = [
[tuple(pair) for pair in fps] # type: ignore[misc]
for fps in raw_fps
]
logger.info( logger.info(
f"Restored event loop: iteration={start_iteration}, " f"Restored event loop: iteration={start_iteration}, "
f"messages={conversation.message_count}, " f"messages={conversation.message_count}, "
f"outputs={list(accumulator.values.keys())}" f"outputs={list(accumulator.values.keys())}, "
f"stall_window={len(recent_responses)}, "
f"doom_window={len(recent_tool_fingerprints)}"
)
return EventLoopNode._RestoredState(
conversation=conversation,
accumulator=accumulator,
start_iteration=start_iteration,
recent_responses=recent_responses,
recent_tool_fingerprints=recent_tool_fingerprints,
) )
return conversation, accumulator, start_iteration
async def _write_cursor( async def _write_cursor(
self, self,
@@ -2237,8 +2284,15 @@ class EventLoopNode(NodeProtocol):
conversation: NodeConversation, conversation: NodeConversation,
accumulator: OutputAccumulator, accumulator: OutputAccumulator,
iteration: int, iteration: int,
*,
recent_responses: list[str] | None = None,
recent_tool_fingerprints: list[list[tuple[str, str]]] | None = None,
) -> None: ) -> None:
"""Write checkpoint cursor for crash recovery.""" """Write checkpoint cursor for crash recovery.
Persists iteration counter, accumulator outputs, and stall/doom-loop
detection state so that resume picks up exactly where execution stopped.
"""
if self._conversation_store: if self._conversation_store:
cursor = await self._conversation_store.read_cursor() or {} cursor = await self._conversation_store.read_cursor() or {}
cursor.update( cursor.update(
@@ -2249,6 +2303,14 @@ class EventLoopNode(NodeProtocol):
"outputs": accumulator.to_dict(), "outputs": accumulator.to_dict(),
} }
) )
# Persist stall/doom-loop detection state for reliable resume
if recent_responses is not None:
cursor["recent_responses"] = recent_responses
if recent_tool_fingerprints is not None:
# Convert list[list[tuple]] → list[list[list]] for JSON
cursor["recent_tool_fingerprints"] = [
[list(pair) for pair in fps] for fps in recent_tool_fingerprints
]
await self._conversation_store.write_cursor(cursor) await self._conversation_store.write_cursor(cursor)
async def _drain_injection_queue(self, conversation: NodeConversation) -> int: async def _drain_injection_queue(self, conversation: NodeConversation) -> int:
+49
View File
@@ -1290,6 +1290,36 @@ class GraphExecutor:
# Handle cancellation (e.g., TUI quit) - save as paused instead of failed # Handle cancellation (e.g., TUI quit) - save as paused instead of failed
self.logger.info("⏸ Execution cancelled - saving state for resume") self.logger.info("⏸ Execution cancelled - saving state for resume")
# Flush WIP accumulator outputs from the interrupted node's
# cursor.json into SharedMemory so they survive resume. The
# accumulator writes to cursor.json on every set() call, but
# only writes to SharedMemory when the judge ACCEPTs. Without
# this, edge conditions checking these keys see None on resume.
if current_node_id and self._storage_path:
try:
import json as _json
cursor_path = (
self._storage_path / "conversations" / current_node_id / "cursor.json"
)
if cursor_path.exists():
cursor_data = _json.loads(cursor_path.read_text(encoding="utf-8"))
wip_outputs = cursor_data.get("outputs", {})
for key, value in wip_outputs.items():
if value is not None:
memory.write(key, value, validate=False)
if wip_outputs:
self.logger.info(
"Flushed %d WIP accumulator outputs to memory: %s",
len(wip_outputs),
list(wip_outputs.keys()),
)
except Exception:
self.logger.debug(
"Could not flush accumulator outputs from cursor",
exc_info=True,
)
# Save memory and state for resume # Save memory and state for resume
saved_memory = memory.read_all() saved_memory = memory.read_all()
session_state_out: dict[str, Any] = { session_state_out: dict[str, Any] = {
@@ -1367,6 +1397,25 @@ class GraphExecutor:
execution_quality="failed", execution_quality="failed",
) )
# Flush WIP accumulator outputs (same as CancelledError path)
if current_node_id and self._storage_path:
try:
import json as _json
cursor_path = (
self._storage_path / "conversations" / current_node_id / "cursor.json"
)
if cursor_path.exists():
cursor_data = _json.loads(cursor_path.read_text(encoding="utf-8"))
for key, value in cursor_data.get("outputs", {}).items():
if value is not None:
memory.write(key, value, validate=False)
except Exception:
self.logger.debug(
"Could not flush accumulator outputs from cursor",
exc_info=True,
)
# Save memory and state for potential resume # Save memory and state for potential resume
saved_memory = memory.read_all() saved_memory = memory.read_all()
session_state_out: dict[str, Any] = { session_state_out: dict[str, Any] = {
+2 -5
View File
@@ -36,6 +36,7 @@ from framework.graph import ( # noqa: E402
NodeSpec, NodeSpec,
SuccessCriterion, SuccessCriterion,
) )
# Testing framework imports # Testing framework imports
from framework.testing.prompts import ( # noqa: E402 from framework.testing.prompts import ( # noqa: E402
PYTEST_TEST_FILE_HEADER, PYTEST_TEST_FILE_HEADER,
@@ -665,9 +666,7 @@ def add_node(
# Reject removed node types # Reject removed node types
if node_type in ("function", "llm_tool_use", "llm_generate"): if node_type in ("function", "llm_tool_use", "llm_generate"):
errors.append( errors.append(f"Node type '{node_type}' is no longer supported. Use 'event_loop' instead.")
f"Node type '{node_type}' is no longer supported. Use 'event_loop' instead."
)
if node_type == "router" and not routes_dict: if node_type == "router" and not routes_dict:
errors.append(f"Router node '{node_id}' must specify routes") errors.append(f"Router node '{node_id}' must specify routes")
@@ -1377,7 +1376,6 @@ def validate_graph() -> str:
f"must be a subset of output_keys {node.output_keys}" f"must be a subset of output_keys {node.output_keys}"
) )
# Warn if all event_loop nodes are client_facing (common misconfiguration) # Warn if all event_loop nodes are client_facing (common misconfiguration)
el_nodes = [n for n in session.nodes if n.node_type == "event_loop"] el_nodes = [n for n in session.nodes if n.node_type == "event_loop"]
cf_el_nodes = [n for n in el_nodes if n.client_facing] cf_el_nodes = [n for n in el_nodes if n.client_facing]
@@ -1413,7 +1411,6 @@ def validate_graph() -> str:
"event_loop_nodes": event_loop_nodes, "event_loop_nodes": event_loop_nodes,
"client_facing_nodes": client_facing_nodes, "client_facing_nodes": client_facing_nodes,
"feedback_edges": feedback_edges, "feedback_edges": feedback_edges,
"deprecated_node_types": deprecated_nodes,
} }
) )
+105 -4
View File
@@ -75,6 +75,11 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
default=None, default=None,
help="Resume from a specific checkpoint (requires --resume-session)", help="Resume from a specific checkpoint (requires --resume-session)",
) )
run_parser.add_argument(
"--no-guardian",
action="store_true",
help="Disable the Agent Guardian watchdog in TUI mode",
)
run_parser.set_defaults(func=cmd_run) run_parser.set_defaults(func=cmd_run)
# info command # info command
@@ -206,6 +211,11 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
default=None, default=None,
help="LLM model to use (any LiteLLM-compatible name)", help="LLM model to use (any LiteLLM-compatible name)",
) )
tui_parser.add_argument(
"--no-guardian",
action="store_true",
help="Disable the Agent Guardian watchdog",
)
tui_parser.set_defaults(func=cmd_tui) tui_parser.set_defaults(func=cmd_tui)
# code command (Hive Coder — framework agent builder) # code command (Hive Coder — framework agent builder)
@@ -221,6 +231,11 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
default=None, default=None,
help="LLM model to use (any LiteLLM-compatible name)", help="LLM model to use (any LiteLLM-compatible name)",
) )
code_parser.add_argument(
"--no-guardian",
action="store_true",
help="Disable the Agent Guardian watchdog",
)
code_parser.set_defaults(func=cmd_code) code_parser.set_defaults(func=cmd_code)
# sessions command group (checkpoint/resume management) # sessions command group (checkpoint/resume management)
@@ -462,6 +477,12 @@ def cmd_run(args: argparse.Namespace) -> int:
print(f"\n{e}", file=sys.stderr) print(f"\n{e}", file=sys.stderr)
return return
# Attach guardian watchdog (before start — register_entry_point requires it)
if not getattr(args, "no_guardian", False) and runner._agent_runtime:
from framework.agents.guardian import attach_guardian
attach_guardian(runner._agent_runtime, runner._tool_registry)
# Start runtime before TUI so it's ready for user input # Start runtime before TUI so it's ready for user input
if runner._agent_runtime and not runner._agent_runtime.is_running: if runner._agent_runtime and not runner._agent_runtime.is_running:
await runner._agent_runtime.start() await runner._agent_runtime.start()
@@ -1250,7 +1271,11 @@ def _get_framework_agents_dir() -> Path:
return Path(__file__).resolve().parent.parent / "agents" return Path(__file__).resolve().parent.parent / "agents"
def _launch_agent_tui(agent_path: str | Path, model: str | None = None) -> int: def _launch_agent_tui(
agent_path: str | Path,
model: str | None = None,
no_guardian: bool = False,
) -> int:
"""Load an agent and launch the TUI. Shared by cmd_tui and cmd_code.""" """Load an agent and launch the TUI. Shared by cmd_tui and cmd_code."""
from framework.credentials.models import CredentialError from framework.credentials.models import CredentialError
from framework.runner import AgentRunner from framework.runner import AgentRunner
@@ -1276,6 +1301,12 @@ def _launch_agent_tui(agent_path: str | Path, model: str | None = None) -> int:
print(f"\n{e}", file=sys.stderr) print(f"\n{e}", file=sys.stderr)
return return
# Attach guardian watchdog (before start)
if not no_guardian and runner._agent_runtime:
from framework.agents.guardian import attach_guardian
attach_guardian(runner._agent_runtime, runner._tool_registry)
if runner._agent_runtime and not runner._agent_runtime.is_running: if runner._agent_runtime and not runner._agent_runtime.is_running:
await runner._agent_runtime.start() await runner._agent_runtime.start()
@@ -1349,11 +1380,20 @@ def cmd_tui(args: argparse.Namespace) -> int:
if not agent_path: if not agent_path:
return 1 return 1
return _launch_agent_tui(agent_path, model=args.model) return _launch_agent_tui(
agent_path,
model=args.model,
no_guardian=getattr(args, "no_guardian", False),
)
def cmd_code(args: argparse.Namespace) -> int: def cmd_code(args: argparse.Namespace) -> int:
"""Launch Hive Coder to build agents.""" """Launch Hive Coder with multi-graph support.
Unlike ``_launch_agent_tui``, this sets up graph lifecycle tools and
assigns ``graph_id="hive_coder"`` so the coder can load, supervise,
and restart secondary agent graphs within the same session.
"""
import logging import logging
logging.basicConfig(level=logging.WARNING, format="%(message)s") logging.basicConfig(level=logging.WARNING, format="%(message)s")
@@ -1370,7 +1410,68 @@ def cmd_code(args: argparse.Namespace) -> int:
if fa_str not in sys.path: if fa_str not in sys.path:
sys.path.insert(0, fa_str) sys.path.insert(0, fa_str)
return _launch_agent_tui(hive_coder_path, model=args.model) from framework.credentials.models import CredentialError
from framework.runner import AgentRunner
from framework.tools.session_graph_tools import register_graph_tools
from framework.tui.app import AdenTUI
async def run_with_tui():
try:
runner = AgentRunner.load(hive_coder_path, model=args.model)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
except Exception as e:
print(f"Error loading agent: {e}")
return
if runner._agent_runtime is None:
try:
runner._setup()
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
runtime = runner._agent_runtime
# -- Multi-graph setup --
# Tag the primary graph so events carry graph_id="hive_coder"
runtime._graph_id = "hive_coder"
runtime._active_graph_id = "hive_coder"
# Register graph lifecycle tools (load_agent, unload_agent, etc.)
register_graph_tools(runner._tool_registry, runtime)
# Refresh tool schemas AND executor so streams see the new tools.
# The executor closure references the registry dict by ref, but
# refreshing both is robust against any copy-on-read behavior.
runtime._tools = list(runner._tool_registry.get_tools().values())
runtime._tool_executor = runner._tool_registry.get_executor()
# Attach guardian watchdog (before start — skips re-registering
# graph tools since register_graph_tools() was already called above)
if not getattr(args, "no_guardian", False):
from framework.agents.guardian import attach_guardian
attach_guardian(runtime, runner._tool_registry)
if not runtime.is_running:
await runtime.start()
app = AdenTUI(runtime)
try:
await app.run_async()
except Exception as e:
import traceback
traceback.print_exc()
print(f"TUI error: {e}")
await runner.cleanup_async()
asyncio.run(run_with_tui())
print("TUI session ended.")
return 0
def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]: def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]:
+56
View File
@@ -776,6 +776,7 @@ class AgentRunner:
runtime_log_store=log_store, runtime_log_store=log_store,
checkpoint_config=checkpoint_config, checkpoint_config=checkpoint_config,
config=runtime_config, config=runtime_config,
graph_id=self.graph.id or self.agent_path.name,
) )
# Pass intro_message through for TUI display # Pass intro_message through for TUI display
@@ -1393,6 +1394,61 @@ Respond with JSON only:
type=MessageType.RESPONSE, type=MessageType.RESPONSE,
) )
@classmethod
async def setup_as_secondary(
cls,
agent_path: str | Path,
runtime: AgentRuntime,
graph_id: str | None = None,
) -> str:
"""Load an agent and register it as a secondary graph on *runtime*.
Uses :meth:`AgentRunner.load` to parse the agent, then calls
:meth:`AgentRuntime.add_graph` with the extracted graph, goal,
and entry points.
Args:
agent_path: Path to the agent directory
runtime: The running AgentRuntime to attach to
graph_id: Optional graph identifier (defaults to directory name)
Returns:
The graph_id used for registration
"""
agent_path = Path(agent_path)
runner = cls.load(agent_path)
gid = graph_id or agent_path.name
# Build entry points
entry_points: dict[str, EntryPointSpec] = {}
if runner.graph.entry_node:
entry_points["default"] = EntryPointSpec(
id="default",
name="Default",
entry_node=runner.graph.entry_node,
trigger_type="manual",
isolation_level="shared",
)
for aep in runner.graph.async_entry_points:
entry_points[aep.id] = EntryPointSpec(
id=aep.id,
name=aep.name,
entry_node=aep.entry_node,
trigger_type=aep.trigger_type,
trigger_config=aep.trigger_config,
isolation_level=aep.isolation_level,
priority=aep.priority,
max_concurrent=aep.max_concurrent,
)
await runtime.add_graph(
graph_id=gid,
graph=runner.graph,
goal=runner.goal,
entry_points=entry_points,
)
return gid
def cleanup(self) -> None: def cleanup(self) -> None:
"""Clean up resources (synchronous).""" """Clean up resources (synchronous)."""
# Clean up MCP client connections # Clean up MCP client connections
+30 -7
View File
@@ -1,5 +1,6 @@
"""Tool discovery and registration for agent runner.""" """Tool discovery and registration for agent runner."""
import asyncio
import contextvars import contextvars
import importlib.util import importlib.util
import inspect import inspect
@@ -224,8 +225,19 @@ class ToolRegistry:
Get unified tool executor function. Get unified tool executor function.
Returns a function that dispatches to the appropriate tool executor. Returns a function that dispatches to the appropriate tool executor.
Handles both sync and async tool implementations async results are
wrapped so that ``EventLoopNode._execute_tool`` can await them.
""" """
def _wrap_result(tool_use_id: str, result: Any) -> ToolResult:
if isinstance(result, ToolResult):
return result
return ToolResult(
tool_use_id=tool_use_id,
content=json.dumps(result) if not isinstance(result, str) else result,
is_error=False,
)
def executor(tool_use: ToolUse) -> ToolResult: def executor(tool_use: ToolUse) -> ToolResult:
if tool_use.name not in self._tools: if tool_use.name not in self._tools:
return ToolResult( return ToolResult(
@@ -237,13 +249,24 @@ class ToolRegistry:
registered = self._tools[tool_use.name] registered = self._tools[tool_use.name]
try: try:
result = registered.executor(tool_use.input) result = registered.executor(tool_use.input)
if isinstance(result, ToolResult):
return result # Async tool: wrap the awaitable so the caller can await it
return ToolResult( if asyncio.iscoroutine(result) or asyncio.isfuture(result):
tool_use_id=tool_use.id,
content=json.dumps(result) if not isinstance(result, str) else result, async def _await_and_wrap():
is_error=False, try:
) r = await result
return _wrap_result(tool_use.id, r)
except Exception as exc:
return ToolResult(
tool_use_id=tool_use.id,
content=json.dumps({"error": str(exc)}),
is_error=True,
)
return _await_and_wrap()
return _wrap_result(tool_use.id, result)
except Exception as e: except Exception as e:
return ToolResult( return ToolResult(
tool_use_id=tool_use.id, tool_use_id=tool_use.id,
+402 -32
View File
@@ -48,6 +48,20 @@ class AgentRuntimeConfig:
# Each dict: {"source_id": str, "path": str, "methods": ["POST"], "secret": str|None} # Each dict: {"source_id": str, "path": str, "methods": ["POST"], "secret": str|None}
@dataclass
class _GraphRegistration:
    """Tracks a loaded graph and its runtime resources."""

    # Graph definition and the goal driving its executions.
    graph: "GraphSpec"
    goal: "Goal"
    # Entry point specs keyed by their local (un-namespaced) ep_id.
    entry_points: dict[str, EntryPointSpec]
    streams: dict[str, ExecutionStream]  # ep_id -> stream (NOT namespaced)
    storage_subpath: str  # relative to session root, e.g. "graphs/email_agent"
    # EventBus subscription IDs held by this graph; unsubscribed on teardown.
    event_subscriptions: list[str] = field(default_factory=list)
    # Background timer-trigger tasks; cancelled on teardown.
    timer_tasks: list[asyncio.Task] = field(default_factory=list)
    # ep_id -> time.monotonic() timestamp of the next scheduled timer fire.
    timer_next_fire: dict[str, float] = field(default_factory=dict)
class AgentRuntime: class AgentRuntime:
""" """
Top-level runtime that manages agent lifecycle and concurrent executions. Top-level runtime that manages agent lifecycle and concurrent executions.
@@ -111,6 +125,7 @@ class AgentRuntime:
config: AgentRuntimeConfig | None = None, config: AgentRuntimeConfig | None = None,
runtime_log_store: Any = None, runtime_log_store: Any = None,
checkpoint_config: CheckpointConfig | None = None, checkpoint_config: CheckpointConfig | None = None,
graph_id: str | None = None,
): ):
""" """
Initialize agent runtime. Initialize agent runtime.
@@ -125,6 +140,7 @@ class AgentRuntime:
config: Optional runtime configuration config: Optional runtime configuration
runtime_log_store: Optional RuntimeLogStore for per-execution logging runtime_log_store: Optional RuntimeLogStore for per-execution logging
checkpoint_config: Optional checkpoint configuration for resumable sessions checkpoint_config: Optional checkpoint configuration for resumable sessions
graph_id: Optional identifier for the primary graph (defaults to "primary")
""" """
self.graph = graph self.graph = graph
self.goal = goal self.goal = goal
@@ -132,6 +148,16 @@ class AgentRuntime:
self._runtime_log_store = runtime_log_store self._runtime_log_store = runtime_log_store
self._checkpoint_config = checkpoint_config self._checkpoint_config = checkpoint_config
# Primary graph identity
self._graph_id: str = graph_id or "primary"
# Multi-graph state
self._graphs: dict[str, _GraphRegistration] = {}
self._active_graph_id: str = self._graph_id
# User presence tracking (monotonic timestamp of last inject_input)
self._last_user_input_time: float = 0.0
# Initialize storage # Initialize storage
storage_path_obj = Path(storage_path) if isinstance(storage_path, str) else storage_path storage_path_obj = Path(storage_path) if isinstance(storage_path, str) else storage_path
self._storage = ConcurrentStorage( self._storage = ConcurrentStorage(
@@ -153,15 +179,15 @@ class AgentRuntime:
self._tools = tools or [] self._tools = tools or []
self._tool_executor = tool_executor self._tool_executor = tool_executor
# Entry points and streams # Entry points and streams (primary graph)
self._entry_points: dict[str, EntryPointSpec] = {} self._entry_points: dict[str, EntryPointSpec] = {}
self._streams: dict[str, ExecutionStream] = {} self._streams: dict[str, ExecutionStream] = {}
# Webhook server (created on start if webhook_routes configured) # Webhook server (created on start if webhook_routes configured)
self._webhook_server: Any = None self._webhook_server: Any = None
# Event-driven entry point subscriptions # Event-driven entry point subscriptions (primary graph)
self._event_subscriptions: list[str] = [] self._event_subscriptions: list[str] = []
# Timer tasks for scheduled entry points # Timer tasks for scheduled entry points (primary graph)
self._timer_tasks: list[asyncio.Task] = [] self._timer_tasks: list[asyncio.Task] = []
# Next fire time for each timer entry point (ep_id -> datetime) # Next fire time for each timer entry point (ep_id -> datetime)
self._timer_next_fire: dict[str, float] = {} self._timer_next_fire: dict[str, float] = {}
@@ -246,6 +272,7 @@ class AgentRuntime:
runtime_log_store=self._runtime_log_store, runtime_log_store=self._runtime_log_store,
session_store=self._session_store, session_store=self._session_store,
checkpoint_config=self._checkpoint_config, checkpoint_config=self._checkpoint_config,
graph_id=self._graph_id,
) )
await stream.start() await stream.start()
self._streams[ep_id] = stream self._streams[ep_id] = stream
@@ -291,29 +318,38 @@ class AgentRuntime:
) )
continue continue
# Capture ep_id in closure # Capture ep_id and config in closure
def _make_handler(entry_point_id: str): exclude_own = tc.get("exclude_own_graph", False)
def _make_handler(entry_point_id: str, _exclude_own: bool):
async def _on_event(event): async def _on_event(event):
if self._running and entry_point_id in self._streams: if not self._running or entry_point_id not in self._streams:
# Run in the same session as the primary entry return
# point so memory (e.g. user-defined rules) is # Skip events originating from this graph's own
# shared and logs land in one session directory. # executions (e.g. guardian should not fire on
session_state = self._get_primary_session_state( # hive_coder failures — only secondary graphs).
exclude_entry_point=entry_point_id if _exclude_own and event.graph_id == self._graph_id:
) return
await self.trigger( # Run in the same session as the primary entry
entry_point_id, # point so memory (e.g. user-defined rules) is
{"event": event.to_dict()}, # shared and logs land in one session directory.
session_state=session_state, session_state = self._get_primary_session_state(
) exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{"event": event.to_dict()},
session_state=session_state,
)
return _on_event return _on_event
sub_id = self._event_bus.subscribe( sub_id = self._event_bus.subscribe(
event_types=event_types, event_types=event_types,
handler=_make_handler(ep_id), handler=_make_handler(ep_id, exclude_own),
filter_stream=tc.get("filter_stream"), filter_stream=tc.get("filter_stream"),
filter_node=tc.get("filter_node"), filter_node=tc.get("filter_node"),
filter_graph=tc.get("filter_graph"),
) )
self._event_subscriptions.append(sub_id) self._event_subscriptions.append(sub_id)
@@ -464,6 +500,18 @@ class AgentRuntime:
ep_id, ep_id,
) )
# Register primary graph
self._graphs[self._graph_id] = _GraphRegistration(
graph=self.graph,
goal=self.goal,
entry_points=dict(self._entry_points),
streams=dict(self._streams),
storage_subpath="",
event_subscriptions=list(self._event_subscriptions),
timer_tasks=list(self._timer_tasks),
timer_next_fire=self._timer_next_fire,
)
self._running = True self._running = True
logger.info(f"AgentRuntime started with {len(self._streams)} streams") logger.info(f"AgentRuntime started with {len(self._streams)} streams")
@@ -473,12 +521,17 @@ class AgentRuntime:
return return
async with self._lock: async with self._lock:
# Cancel timer tasks # Stop secondary graphs first
secondary_ids = [gid for gid in self._graphs if gid != self._graph_id]
for gid in secondary_ids:
await self._teardown_graph(gid)
# Cancel primary timer tasks
for task in self._timer_tasks: for task in self._timer_tasks:
task.cancel() task.cancel()
self._timer_tasks.clear() self._timer_tasks.clear()
# Unsubscribe event-driven entry points # Unsubscribe primary event-driven entry points
for sub_id in self._event_subscriptions: for sub_id in self._event_subscriptions:
self._event_bus.unsubscribe(sub_id) self._event_bus.unsubscribe(sub_id)
self._event_subscriptions.clear() self._event_subscriptions.clear()
@@ -488,11 +541,12 @@ class AgentRuntime:
await self._webhook_server.stop() await self._webhook_server.stop()
self._webhook_server = None self._webhook_server = None
# Stop all streams # Stop all primary streams
for stream in self._streams.values(): for stream in self._streams.values():
await stream.stop() await stream.stop()
self._streams.clear() self._streams.clear()
self._graphs.clear()
# Stop storage # Stop storage
await self._storage.stop() await self._storage.stop()
@@ -559,7 +613,280 @@ class AgentRuntime:
raise ValueError(f"Entry point '{entry_point_id}' not found") raise ValueError(f"Entry point '{entry_point_id}' not found")
return await stream.wait_for_completion(exec_id, timeout) return await stream.wait_for_completion(exec_id, timeout)
def _get_primary_session_state(self, exclude_entry_point: str) -> dict[str, Any] | None: # === MULTI-GRAPH MANAGEMENT ===
async def add_graph(
    self,
    graph_id: str,
    graph: "GraphSpec",
    goal: "Goal",
    entry_points: dict[str, EntryPointSpec],
    storage_subpath: str | None = None,
) -> None:
    """Load a secondary graph into this runtime session.

    Creates execution streams for the graph's entry points, sets up
    event/timer triggers, and registers the graph. Shares the same
    EventBus, state.json, and data directory as the primary graph.

    Can be called while the runtime is running.

    Args:
        graph_id: Unique identifier for the graph
        graph: Graph specification
        goal: Goal driving this graph's execution
        entry_points: Entry point specs (ep_id -> spec)
        storage_subpath: Relative path under session root for this
            graph's conversations/checkpoints. Defaults to
            ``"graphs/{graph_id}"``.

    Raises:
        ValueError: If graph_id already registered or entry node missing
    """
    if graph_id in self._graphs:
        raise ValueError(f"Graph '{graph_id}' already registered")
    subpath = storage_subpath or f"graphs/{graph_id}"

    # Validate entry nodes exist in graph before allocating any resources.
    for _ep_id, spec in entry_points.items():
        if graph.get_node(spec.entry_node) is None:
            raise ValueError(f"Entry node '{spec.entry_node}' not found in graph '{graph_id}'")

    # Create streams for each entry point. Stream IDs are namespaced as
    # "{graph_id}::{ep_id}" so they cannot collide with primary streams.
    streams: dict[str, ExecutionStream] = {}
    for ep_id, spec in entry_points.items():
        stream = ExecutionStream(
            stream_id=f"{graph_id}::{ep_id}",
            entry_spec=spec,
            graph=graph,
            goal=goal,
            state_manager=self._state_manager,
            storage=self._storage,
            outcome_aggregator=self._outcome_aggregator,
            event_bus=self._event_bus,
            llm=self._llm,
            tools=self._tools,
            tool_executor=self._tool_executor,
            result_retention_max=self._config.execution_result_max,
            result_retention_ttl_seconds=self._config.execution_result_ttl_seconds,
            runtime_log_store=self._runtime_log_store,
            session_store=self._session_store,
            checkpoint_config=self._checkpoint_config,
            graph_id=graph_id,
        )
        # Streams are only started immediately when the runtime is already
        # running. NOTE(review): if add_graph is called before start(),
        # nothing here starts these streams later — confirm start() covers
        # secondary graphs, or require add_graph after start().
        if self._running:
            await stream.start()
        streams[ep_id] = stream

    # Set up event-driven subscriptions on the shared bus.
    from framework.runtime.event_bus import EventType as _ET

    event_subs: list[str] = []
    for ep_id, spec in entry_points.items():
        if spec.trigger_type != "event":
            continue
        tc = spec.trigger_config
        event_types = [_ET(et) for et in tc.get("event_types", [])]
        if not event_types:
            logger.warning(
                "Entry point '%s::%s' has trigger_type='event' "
                "but no event_types in trigger_config",
                graph_id,
                ep_id,
            )
            continue

        namespaced_ep = f"{graph_id}::{ep_id}"
        exclude_own = tc.get("exclude_own_graph", False)

        # Factory binds the loop variables by value so each handler keeps
        # its own entry point / graph rather than the last loop iteration.
        def _make_handler(entry_point_id: str, gid: str, _exclude_own: bool):
            async def _on_event(event):
                if not self._running or gid not in self._graphs:
                    return
                # Skip events from this graph's own executions
                if _exclude_own and event.graph_id == gid:
                    return
                reg = self._graphs[gid]
                # Strip the "{graph_id}::" prefix to recover the local ep_id.
                local_ep = entry_point_id.split("::", 1)[-1]
                stream = reg.streams.get(local_ep)
                if stream is None:
                    return
                # Reuse the primary session so memory and logs are shared.
                session_state = self._get_primary_session_state(
                    local_ep,
                    source_graph_id=gid,
                )
                await stream.execute(
                    {"event": event.to_dict()},
                    session_state=session_state,
                )

            return _on_event

        sub_id = self._event_bus.subscribe(
            event_types=event_types,
            handler=_make_handler(namespaced_ep, graph_id, exclude_own),
            filter_stream=tc.get("filter_stream"),
            filter_node=tc.get("filter_node"),
            filter_graph=tc.get("filter_graph"),
        )
        event_subs.append(sub_id)

    # Set up timer-driven entry points.
    timer_tasks: list[asyncio.Task] = []
    timer_next_fire: dict[str, float] = {}
    for ep_id, spec in entry_points.items():
        if spec.trigger_type != "timer":
            continue
        tc = spec.trigger_config
        interval = tc.get("interval_minutes")
        run_immediately = tc.get("run_immediately", False)
        # NOTE(review): timers are only created when the runtime is already
        # running; a graph added before start() silently gets no timers.
        if interval and interval > 0 and self._running:

            # Factory again binds loop variables by value for the closure.
            def _make_timer(gid: str, local_ep: str, mins: float, immediate: bool):
                async def _timer_loop():
                    interval_secs = mins * 60
                    if not immediate:
                        timer_next_fire[local_ep] = time.monotonic() + interval_secs
                        await asyncio.sleep(interval_secs)
                    # Loop until the runtime stops or the graph is removed.
                    while self._running and gid in self._graphs:
                        timer_next_fire.pop(local_ep, None)
                        try:
                            reg = self._graphs.get(gid)
                            if not reg:
                                break
                            stream = reg.streams.get(local_ep)
                            if not stream:
                                break
                            session_state = self._get_primary_session_state(
                                local_ep, source_graph_id=gid
                            )
                            await stream.execute(
                                {"event": {"source": "timer", "reason": "scheduled"}},
                                session_state=session_state,
                            )
                        except Exception:
                            # Keep the timer alive across failures; next
                            # fire is rescheduled below either way.
                            logger.error(
                                "Timer trigger failed for '%s::%s'",
                                gid,
                                local_ep,
                                exc_info=True,
                            )
                        timer_next_fire[local_ep] = time.monotonic() + interval_secs
                        await asyncio.sleep(interval_secs)

                return _timer_loop

            task = asyncio.create_task(
                _make_timer(graph_id, ep_id, interval, run_immediately)()
            )
            timer_tasks.append(task)

    # Register only after all resources exist, so _teardown_graph can
    # always clean up everything recorded here.
    self._graphs[graph_id] = _GraphRegistration(
        graph=graph,
        goal=goal,
        entry_points=entry_points,
        streams=streams,
        storage_subpath=subpath,
        event_subscriptions=event_subs,
        timer_tasks=timer_tasks,
        timer_next_fire=timer_next_fire,
    )
    logger.info(
        "Added graph '%s' with %d entry points (%d streams)",
        graph_id,
        len(entry_points),
        len(streams),
    )
async def remove_graph(self, graph_id: str) -> None:
    """Detach and tear down the secondary graph *graph_id*.

    Its streams are stopped, timers cancelled, and event subscriptions
    dropped before the registration is discarded. The primary graph can
    never be removed.

    Args:
        graph_id: Graph to remove

    Raises:
        ValueError: If graph_id is the primary graph or not found
    """
    # Reject the two invalid targets up front.
    if graph_id == self._graph_id:
        raise ValueError("Cannot remove the primary graph")
    if graph_id not in self._graphs:
        raise ValueError(f"Graph '{graph_id}' not found")

    await self._teardown_graph(graph_id)
    logger.info("Removed graph '%s'", graph_id)
async def _teardown_graph(self, graph_id: str) -> None:
    """Internal: release every resource held by *graph_id*, if registered."""
    registration = self._graphs.pop(graph_id, None)
    if registration is None:
        return

    # Cancel timers first so nothing re-triggers mid-teardown.
    for timer_task in registration.timer_tasks:
        timer_task.cancel()

    # Drop this graph's event-bus subscriptions.
    for subscription_id in registration.event_subscriptions:
        self._event_bus.unsubscribe(subscription_id)

    # Stop the execution streams last.
    for exec_stream in registration.streams.values():
        await exec_stream.stop()

    # If focus pointed at the removed graph, fall back to the primary.
    if self._active_graph_id == graph_id:
        self._active_graph_id = self._graph_id
def list_graphs(self) -> list[str]:
    """Return all registered graph IDs, primary graph first."""
    primary = self._graph_id
    ordered = [primary] if primary in self._graphs else []
    ordered.extend(gid for gid in self._graphs if gid != primary)
    return ordered
@property
def graph_id(self) -> str:
    """Identifier of this runtime's primary graph."""
    return self._graph_id
@property
def active_graph_id(self) -> str:
    """Graph currently holding focus (used for TUI input routing)."""
    return self._active_graph_id

@active_graph_id.setter
def active_graph_id(self, value: str) -> None:
    # Only a registered graph may take focus.
    if value not in self._graphs:
        raise ValueError(f"Graph '{value}' not registered")
    self._active_graph_id = value
@property
def user_idle_seconds(self) -> float:
    """How long (seconds) since the user last provided input.

    A value of ``float('inf')`` means no input has arrived yet.
    """
    last_seen = self._last_user_input_time
    if last_seen == 0.0:
        return float("inf")
    return time.monotonic() - last_seen
def get_graph_registration(self, graph_id: str) -> _GraphRegistration | None:
    """Look up the registration record for *graph_id* (``None`` if absent)."""
    return self._graphs.get(graph_id)
def _get_primary_session_state(
self,
exclude_entry_point: str,
*,
source_graph_id: str | None = None,
) -> dict[str, Any] | None:
"""Build session_state so an async entry point runs in the primary session. """Build session_state so an async entry point runs in the primary session.
Looks for an active execution from another stream (the "primary" Looks for an active execution from another stream (the "primary"
@@ -576,6 +903,15 @@ class AgentRuntime:
which is kept up-to-date by ``GraphExecutor._write_progress()`` which is kept up-to-date by ``GraphExecutor._write_progress()``
at every node transition. at every node transition.
Searches across ALL graphs' streams (primary + secondary) so
event-driven entry points on secondary graphs can share the
primary session.
Args:
exclude_entry_point: Entry point ID to skip (the one being triggered)
source_graph_id: Graph the exclude_entry_point belongs to (for
resolving the entry node spec). Defaults to primary graph.
Returns ``None`` if no primary session is active (the webhook Returns ``None`` if no primary session is active (the webhook
execution will just create its own session). execution will just create its own session).
""" """
@@ -583,13 +919,27 @@ class AgentRuntime:
# Determine which memory keys the async entry node needs. # Determine which memory keys the async entry node needs.
allowed_keys: set[str] | None = None allowed_keys: set[str] | None = None
ep_spec = self._entry_points.get(exclude_entry_point) # Look up the entry node from the correct graph
src_graph_id = source_graph_id or self._graph_id
src_reg = self._graphs.get(src_graph_id)
ep_spec = (
src_reg.entry_points.get(exclude_entry_point)
if src_reg
else self._entry_points.get(exclude_entry_point)
)
if ep_spec: if ep_spec:
entry_node = self.graph.get_node(ep_spec.entry_node) graph = src_reg.graph if src_reg else self.graph
entry_node = graph.get_node(ep_spec.entry_node)
if entry_node and entry_node.input_keys: if entry_node and entry_node.input_keys:
allowed_keys = set(entry_node.input_keys) allowed_keys = set(entry_node.input_keys)
for ep_id, stream in self._streams.items(): # Search ALL graphs' streams for an active session
all_streams: list[tuple[str, ExecutionStream]] = []
for _gid, reg in self._graphs.items():
for ep_id, stream in reg.streams.items():
all_streams.append((ep_id, stream))
for ep_id, stream in all_streams:
if ep_id == exclude_entry_point: if ep_id == exclude_entry_point:
continue continue
for exec_id in stream.active_execution_ids: for exec_id in stream.active_execution_ids:
@@ -619,23 +969,37 @@ class AgentRuntime:
) )
return None return None
async def inject_input(self, node_id: str, content: str) -> bool: async def inject_input(self, node_id: str, content: str, graph_id: str | None = None) -> bool:
"""Inject user input into a running client-facing node. """Inject user input into a running client-facing node.
Routes input to the EventLoopNode identified by ``node_id`` Routes input to the EventLoopNode identified by ``node_id``.
across all active streams. Used by the TUI ChatRepl to deliver Searches the specified graph (or active graph) first, then all others.
user responses during client-facing node execution.
Args: Args:
node_id: The node currently waiting for input node_id: The node currently waiting for input
content: The user's input text content: The user's input text
graph_id: Optional graph to search first (defaults to active graph)
Returns: Returns:
True if input was delivered, False if no matching node found True if input was delivered, False if no matching node found
""" """
for stream in self._streams.values(): # Track user presence
if await stream.inject_input(node_id, content): self._last_user_input_time = time.monotonic()
return True
# Search target graph first
target = graph_id or self._active_graph_id
if target in self._graphs:
for stream in self._graphs[target].streams.values():
if await stream.inject_input(node_id, content):
return True
# Then search all other graphs
for gid, reg in self._graphs.items():
if gid == target:
continue
for stream in reg.streams.values():
if await stream.inject_input(node_id, content):
return True
return False return False
async def get_goal_progress(self) -> dict[str, Any]: async def get_goal_progress(self) -> dict[str, Any]:
@@ -696,6 +1060,7 @@ class AgentRuntime:
event_types: list, event_types: list,
handler: Callable, handler: Callable,
filter_stream: str | None = None, filter_stream: str | None = None,
filter_graph: str | None = None,
) -> str: ) -> str:
""" """
Subscribe to agent events. Subscribe to agent events.
@@ -704,6 +1069,7 @@ class AgentRuntime:
event_types: Types of events to receive event_types: Types of events to receive
handler: Async function to call when event occurs handler: Async function to call when event occurs
filter_stream: Only receive events from this stream filter_stream: Only receive events from this stream
filter_graph: Only receive events from this graph
Returns: Returns:
Subscription ID (use to unsubscribe) Subscription ID (use to unsubscribe)
@@ -712,6 +1078,7 @@ class AgentRuntime:
event_types=event_types, event_types=event_types,
handler=handler, handler=handler,
filter_stream=filter_stream, filter_stream=filter_stream,
filter_graph=filter_graph,
) )
def unsubscribe_from_events(self, subscription_id: str) -> bool: def unsubscribe_from_events(self, subscription_id: str) -> bool:
@@ -779,6 +1146,7 @@ def create_agent_runtime(
runtime_log_store: Any = None, runtime_log_store: Any = None,
enable_logging: bool = True, enable_logging: bool = True,
checkpoint_config: CheckpointConfig | None = None, checkpoint_config: CheckpointConfig | None = None,
graph_id: str | None = None,
) -> AgentRuntime: ) -> AgentRuntime:
""" """
Create and configure an AgentRuntime with entry points. Create and configure an AgentRuntime with entry points.
@@ -801,6 +1169,7 @@ def create_agent_runtime(
Set to False to disable logging entirely. Set to False to disable logging entirely.
checkpoint_config: Optional checkpoint configuration for resumable sessions. checkpoint_config: Optional checkpoint configuration for resumable sessions.
If None, uses default checkpointing behavior. If None, uses default checkpointing behavior.
graph_id: Optional identifier for the primary graph (defaults to "primary").
Returns: Returns:
Configured AgentRuntime (not yet started) Configured AgentRuntime (not yet started)
@@ -822,6 +1191,7 @@ def create_agent_runtime(
config=config, config=config,
runtime_log_store=runtime_log_store, runtime_log_store=runtime_log_store,
checkpoint_config=checkpoint_config, checkpoint_config=checkpoint_config,
graph_id=graph_id,
) )
for spec in entry_points: for spec in entry_points:
+13
View File
@@ -95,6 +95,7 @@ class AgentEvent:
data: dict[str, Any] = field(default_factory=dict) data: dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now) timestamp: datetime = field(default_factory=datetime.now)
correlation_id: str | None = None # For tracking related events correlation_id: str | None = None # For tracking related events
graph_id: str | None = None # Which graph emitted this event (multi-graph sessions)
def to_dict(self) -> dict: def to_dict(self) -> dict:
"""Convert to dictionary for serialization.""" """Convert to dictionary for serialization."""
@@ -106,6 +107,7 @@ class AgentEvent:
"data": self.data, "data": self.data,
"timestamp": self.timestamp.isoformat(), "timestamp": self.timestamp.isoformat(),
"correlation_id": self.correlation_id, "correlation_id": self.correlation_id,
"graph_id": self.graph_id,
} }
@@ -123,6 +125,7 @@ class Subscription:
filter_stream: str | None = None # Only receive events from this stream filter_stream: str | None = None # Only receive events from this stream
filter_node: str | None = None # Only receive events from this node filter_node: str | None = None # Only receive events from this node
filter_execution: str | None = None # Only receive events from this execution filter_execution: str | None = None # Only receive events from this execution
filter_graph: str | None = None # Only receive events from this graph
class EventBus: class EventBus:
@@ -182,6 +185,7 @@ class EventBus:
filter_stream: str | None = None, filter_stream: str | None = None,
filter_node: str | None = None, filter_node: str | None = None,
filter_execution: str | None = None, filter_execution: str | None = None,
filter_graph: str | None = None,
) -> str: ) -> str:
""" """
Subscribe to events. Subscribe to events.
@@ -192,6 +196,7 @@ class EventBus:
filter_stream: Only receive events from this stream filter_stream: Only receive events from this stream
filter_node: Only receive events from this node filter_node: Only receive events from this node
filter_execution: Only receive events from this execution filter_execution: Only receive events from this execution
filter_graph: Only receive events from this graph
Returns: Returns:
Subscription ID (use to unsubscribe) Subscription ID (use to unsubscribe)
@@ -206,6 +211,7 @@ class EventBus:
filter_stream=filter_stream, filter_stream=filter_stream,
filter_node=filter_node, filter_node=filter_node,
filter_execution=filter_execution, filter_execution=filter_execution,
filter_graph=filter_graph,
) )
self._subscriptions[sub_id] = subscription self._subscriptions[sub_id] = subscription
@@ -271,6 +277,10 @@ class EventBus:
if subscription.filter_execution and subscription.filter_execution != event.execution_id: if subscription.filter_execution and subscription.filter_execution != event.execution_id:
return False return False
# Check graph filter
if subscription.filter_graph and subscription.filter_graph != event.graph_id:
return False
return True return True
async def _execute_handlers( async def _execute_handlers(
@@ -873,6 +883,7 @@ class EventBus:
stream_id: str | None = None, stream_id: str | None = None,
node_id: str | None = None, node_id: str | None = None,
execution_id: str | None = None, execution_id: str | None = None,
graph_id: str | None = None,
timeout: float | None = None, timeout: float | None = None,
) -> AgentEvent | None: ) -> AgentEvent | None:
""" """
@@ -883,6 +894,7 @@ class EventBus:
stream_id: Filter by stream stream_id: Filter by stream
node_id: Filter by node node_id: Filter by node
execution_id: Filter by execution execution_id: Filter by execution
graph_id: Filter by graph
timeout: Maximum time to wait (seconds) timeout: Maximum time to wait (seconds)
Returns: Returns:
@@ -903,6 +915,7 @@ class EventBus:
filter_stream=stream_id, filter_stream=stream_id,
filter_node=node_id, filter_node=node_id,
filter_execution=execution_id, filter_execution=execution_id,
filter_graph=graph_id,
) )
try: try:
+46 -13
View File
@@ -26,7 +26,7 @@ if TYPE_CHECKING:
from framework.graph.edge import GraphSpec from framework.graph.edge import GraphSpec
from framework.graph.goal import Goal from framework.graph.goal import Goal
from framework.llm.provider import LLMProvider, Tool from framework.llm.provider import LLMProvider, Tool
from framework.runtime.event_bus import EventBus from framework.runtime.event_bus import AgentEvent, EventBus
from framework.runtime.outcome_aggregator import OutcomeAggregator from framework.runtime.outcome_aggregator import OutcomeAggregator
from framework.storage.concurrent import ConcurrentStorage from framework.storage.concurrent import ConcurrentStorage
from framework.storage.session_store import SessionStore from framework.storage.session_store import SessionStore
@@ -34,6 +34,31 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class _GraphScopedEventBus:
    """Thin proxy that stamps ``graph_id`` on every published event.

    The ``GraphExecutor`` and ``EventLoopNode`` emit events via the
    convenience methods on ``EventBus`` (e.g. ``emit_llm_text_delta``).
    Rather than threading ``graph_id`` through every one of those 20+
    methods, this proxy intercepts ``publish()`` and sets ``graph_id``
    before forwarding to the real bus. All other attribute access is
    delegated unchanged.
    """

    __slots__ = ("_bus", "_graph_id")

    def __init__(self, bus: "EventBus", graph_id: str) -> None:
        # The proxy's own two slots are accessed via object.__setattr__/
        # object.__getattribute__ throughout, so a lookup of an unset slot
        # raises AttributeError directly instead of falling into
        # __getattr__ and being delegated to the wrapped bus.
        object.__setattr__(self, "_bus", bus)
        object.__setattr__(self, "_graph_id", graph_id)

    async def publish(self, event: "AgentEvent") -> None:  # type: ignore[override]
        # Stamp the owning graph on the event, then forward to the real bus.
        event.graph_id = object.__getattribute__(self, "_graph_id")
        await object.__getattribute__(self, "_bus").publish(event)

    def __getattr__(self, name: str) -> Any:
        # Invoked only for names not found by normal lookup (i.e. anything
        # other than the two slots and publish) — delegate to the real bus.
        return getattr(object.__getattribute__(self, "_bus"), name)
@dataclass @dataclass
class EntryPointSpec: class EntryPointSpec:
"""Specification for an entry point.""" """Specification for an entry point."""
@@ -117,6 +142,7 @@ class ExecutionStream:
runtime_log_store: Any = None, runtime_log_store: Any = None,
session_store: "SessionStore | None" = None, session_store: "SessionStore | None" = None,
checkpoint_config: CheckpointConfig | None = None, checkpoint_config: CheckpointConfig | None = None,
graph_id: str | None = None,
): ):
""" """
Initialize execution stream. Initialize execution stream.
@@ -136,11 +162,13 @@ class ExecutionStream:
runtime_log_store: Optional RuntimeLogStore for per-execution logging runtime_log_store: Optional RuntimeLogStore for per-execution logging
session_store: Optional SessionStore for unified session storage session_store: Optional SessionStore for unified session storage
checkpoint_config: Optional checkpoint configuration for resumable sessions checkpoint_config: Optional checkpoint configuration for resumable sessions
graph_id: Optional graph identifier for multi-graph sessions
""" """
self.stream_id = stream_id self.stream_id = stream_id
self.entry_spec = entry_spec self.entry_spec = entry_spec
self.graph = graph self.graph = graph
self.goal = goal self.goal = goal
self.graph_id = graph_id
self._state_manager = state_manager self._state_manager = state_manager
self._storage = storage self._storage = storage
self._outcome_aggregator = outcome_aggregator self._outcome_aggregator = outcome_aggregator
@@ -173,6 +201,11 @@ class ExecutionStream:
self._semaphore = asyncio.Semaphore(entry_spec.max_concurrent) self._semaphore = asyncio.Semaphore(entry_spec.max_concurrent)
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
# Graph-scoped event bus (stamps graph_id on published events)
self._scoped_event_bus = self._event_bus
if self._event_bus and self.graph_id:
self._scoped_event_bus = _GraphScopedEventBus(self._event_bus, self.graph_id)
# State # State
self._running = False self._running = False
@@ -185,10 +218,10 @@ class ExecutionStream:
logger.info(f"ExecutionStream '{self.stream_id}' started") logger.info(f"ExecutionStream '{self.stream_id}' started")
# Emit stream started event # Emit stream started event
if self._event_bus: if self._scoped_event_bus:
from framework.runtime.event_bus import AgentEvent, EventType from framework.runtime.event_bus import AgentEvent, EventType
await self._event_bus.publish( await self._scoped_event_bus.publish(
AgentEvent( AgentEvent(
type=EventType.STREAM_STARTED, type=EventType.STREAM_STARTED,
stream_id=self.stream_id, stream_id=self.stream_id,
@@ -251,10 +284,10 @@ class ExecutionStream:
logger.info(f"ExecutionStream '{self.stream_id}' stopped") logger.info(f"ExecutionStream '{self.stream_id}' stopped")
# Emit stream stopped event # Emit stream stopped event
if self._event_bus: if self._scoped_event_bus:
from framework.runtime.event_bus import AgentEvent, EventType from framework.runtime.event_bus import AgentEvent, EventType
await self._event_bus.publish( await self._scoped_event_bus.publish(
AgentEvent( AgentEvent(
type=EventType.STREAM_STOPPED, type=EventType.STREAM_STOPPED,
stream_id=self.stream_id, stream_id=self.stream_id,
@@ -358,8 +391,8 @@ class ExecutionStream:
try: try:
# Emit started event # Emit started event
if self._event_bus: if self._scoped_event_bus:
await self._event_bus.emit_execution_started( await self._scoped_event_bus.emit_execution_started(
stream_id=self.stream_id, stream_id=self.stream_id,
execution_id=execution_id, execution_id=execution_id,
input_data=ctx.input_data, input_data=ctx.input_data,
@@ -404,7 +437,7 @@ class ExecutionStream:
llm=self._llm, llm=self._llm,
tools=self._tools, tools=self._tools,
tool_executor=self._tool_executor, tool_executor=self._tool_executor,
event_bus=self._event_bus, event_bus=self._scoped_event_bus,
stream_id=self.stream_id, stream_id=self.stream_id,
storage_path=exec_storage, storage_path=exec_storage,
runtime_logger=runtime_logger, runtime_logger=runtime_logger,
@@ -454,16 +487,16 @@ class ExecutionStream:
await self._write_session_state(execution_id, ctx, result=result) await self._write_session_state(execution_id, ctx, result=result)
# Emit completion/failure event # Emit completion/failure event
if self._event_bus: if self._scoped_event_bus:
if result.success: if result.success:
await self._event_bus.emit_execution_completed( await self._scoped_event_bus.emit_execution_completed(
stream_id=self.stream_id, stream_id=self.stream_id,
execution_id=execution_id, execution_id=execution_id,
output=result.output, output=result.output,
correlation_id=ctx.correlation_id, correlation_id=ctx.correlation_id,
) )
else: else:
await self._event_bus.emit_execution_failed( await self._scoped_event_bus.emit_execution_failed(
stream_id=self.stream_id, stream_id=self.stream_id,
execution_id=execution_id, execution_id=execution_id,
error=result.error or "Unknown error", error=result.error or "Unknown error",
@@ -541,8 +574,8 @@ class ExecutionStream:
pass # Don't let end_run errors mask the original error pass # Don't let end_run errors mask the original error
# Emit failure event # Emit failure event
if self._event_bus: if self._scoped_event_bus:
await self._event_bus.emit_execution_failed( await self._scoped_event_bus.emit_execution_failed(
stream_id=self.stream_id, stream_id=self.stream_id,
execution_id=execution_id, execution_id=execution_id,
error=str(e), error=str(e),
View File
+325
View File
@@ -0,0 +1,325 @@
"""Graph lifecycle tools for multi-graph sessions.
These tools allow an agent (e.g. hive_coder) to load, unload, start,
restart, and query other agent graphs within the same runtime session.
Usage::
from framework.tools.session_graph_tools import register_graph_tools
register_graph_tools(tool_registry, runtime)
The tools are registered as async Python functions on the ToolRegistry.
They close over the ``AgentRuntime`` instance — no ContextVar is needed,
since the runtime is a stable, long-lived object.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime
logger = logging.getLogger(__name__)
def register_graph_tools(registry: ToolRegistry, runtime: AgentRuntime) -> int:
    """Register graph lifecycle tools bound to *runtime*.

    Each tool is a closure over the long-lived ``AgentRuntime`` instance,
    registered on *registry* together with its LLM-facing ``Tool`` schema.
    Every tool returns a JSON string so results are directly consumable by
    the calling LLM; errors are reported as ``{"error": ...}`` payloads
    rather than raised.

    Args:
        registry: Tool registry the tools are added to.
        runtime: Runtime whose graphs the tools manage.

    Returns:
        The number of tools registered.
    """
    from framework.llm.provider import Tool

    tools_registered = 0

    # --- load_agent -----------------------------------------------------------
    async def load_agent(agent_path: str) -> str:
        """Load an agent graph from disk into the running session.

        The agent is imported from *agent_path* (a directory containing
        ``agent.py``). Its graph, goal, and entry points are registered
        as a secondary graph on the runtime. Returns a JSON summary.
        """
        from framework.runner.runner import AgentRunner
        from framework.runtime.execution_stream import EntryPointSpec

        path = Path(agent_path).resolve()
        if not path.exists():
            return json.dumps({"error": f"Agent path does not exist: {path}"})

        # The graph id is the agent directory name by convention. Reject
        # duplicates *before* importing the agent package — AgentRunner.load
        # is the expensive step (it executes agent.py).
        graph_id = path.name
        if graph_id in runtime.list_graphs():
            return json.dumps({"error": f"Graph '{graph_id}' is already loaded"})

        try:
            runner = AgentRunner.load(path)
        except Exception as exc:
            return json.dumps({"error": f"Failed to load agent: {exc}"})

        # Build entry point specs from the loaded graph.
        entry_points: dict[str, EntryPointSpec] = {}

        # Primary (manually triggered) entry point.
        if runner.graph.entry_node:
            entry_points["default"] = EntryPointSpec(
                id="default",
                name="Default",
                entry_node=runner.graph.entry_node,
                trigger_type="manual",
                isolation_level="shared",
            )

        # Async entry points declared by the graph itself.
        for aep in runner.graph.async_entry_points:
            entry_points[aep.id] = EntryPointSpec(
                id=aep.id,
                name=aep.name,
                entry_node=aep.entry_node,
                trigger_type=aep.trigger_type,
                trigger_config=aep.trigger_config,
                isolation_level=aep.isolation_level,
                priority=aep.priority,
                max_concurrent=aep.max_concurrent,
            )

        await runtime.add_graph(
            graph_id=graph_id,
            graph=runner.graph,
            goal=runner.goal,
            entry_points=entry_points,
        )
        return json.dumps(
            {
                "graph_id": graph_id,
                "entry_points": list(entry_points.keys()),
                "nodes": [n.id for n in runner.graph.nodes],
                "status": "loaded",
            }
        )

    _load_tool = Tool(
        name="load_agent",
        description=(
            "Load an agent graph from disk into the current session. "
            "The agent runs alongside the primary agent, sharing memory and data."
        ),
        parameters={
            "type": "object",
            "properties": {
                "agent_path": {
                    "type": "string",
                    "description": "Path to the agent directory (containing agent.py)",
                },
            },
            "required": ["agent_path"],
        },
    )
    registry.register("load_agent", _load_tool, lambda inputs: load_agent(**inputs))
    tools_registered += 1

    # --- unload_agent ---------------------------------------------------------
    async def unload_agent(graph_id: str) -> str:
        """Stop and remove a secondary agent graph from the session."""
        try:
            await runtime.remove_graph(graph_id)
            return json.dumps({"graph_id": graph_id, "status": "unloaded"})
        except ValueError as exc:
            # remove_graph raises ValueError for unknown ids / the primary graph.
            return json.dumps({"error": str(exc)})

    _unload_tool = Tool(
        name="unload_agent",
        description="Stop and remove a loaded agent graph from the session.",
        parameters={
            "type": "object",
            "properties": {
                "graph_id": {
                    "type": "string",
                    "description": "ID of the graph to unload",
                },
            },
            "required": ["graph_id"],
        },
    )
    registry.register("unload_agent", _unload_tool, lambda inputs: unload_agent(**inputs))
    tools_registered += 1

    # --- start_agent ----------------------------------------------------------
    async def start_agent(
        graph_id: str, entry_point: str = "default", input_data: str = "{}"
    ) -> str:
        """Trigger an entry point on a loaded agent graph.

        *input_data* is a JSON string (or an already-decoded mapping) passed
        as the execution's input. Returns a JSON summary with the new
        execution id.
        """
        reg = runtime.get_graph_registration(graph_id)
        if reg is None:
            return json.dumps({"error": f"Graph '{graph_id}' not found"})
        stream = reg.streams.get(entry_point)
        if stream is None:
            return json.dumps(
                {
                    "error": f"Entry point '{entry_point}' not found on graph '{graph_id}'",
                    "available": list(reg.streams.keys()),
                }
            )
        try:
            data = json.loads(input_data) if isinstance(input_data, str) else input_data
        except json.JSONDecodeError as exc:
            return json.dumps({"error": f"Invalid JSON input: {exc}"})
        # Share the primary session state so secondary graphs see the same
        # conversation/memory context.
        session_state = runtime._get_primary_session_state(entry_point, source_graph_id=graph_id)
        exec_id = await stream.execute(data, session_state=session_state)
        return json.dumps(
            {
                "graph_id": graph_id,
                "entry_point": entry_point,
                "execution_id": exec_id,
                "status": "triggered",
            }
        )

    _start_tool = Tool(
        name="start_agent",
        description="Trigger an entry point on a loaded agent graph to start execution.",
        parameters={
            "type": "object",
            "properties": {
                "graph_id": {
                    "type": "string",
                    "description": "ID of the graph to start",
                },
                "entry_point": {
                    "type": "string",
                    "description": "Entry point to trigger (default: 'default')",
                },
                "input_data": {
                    "type": "string",
                    "description": "JSON string of input data for the execution",
                },
            },
            "required": ["graph_id"],
        },
    )
    registry.register("start_agent", _start_tool, lambda inputs: start_agent(**inputs))
    tools_registered += 1

    # --- restart_agent --------------------------------------------------------
    async def restart_agent(graph_id: str) -> str:
        """Unload a secondary agent graph so it can be reloaded.

        The original agent path is not tracked on the registration, so this
        tool only *unloads* the graph; the caller must follow up with
        ``load_agent`` to reload it and pick up code changes.
        """
        reg = runtime.get_graph_registration(graph_id)
        if reg is None:
            return json.dumps({"error": f"Graph '{graph_id}' not found"})
        if graph_id == runtime.graph_id:
            return json.dumps({"error": "Cannot restart the primary graph"})
        try:
            await runtime.remove_graph(graph_id)
        except ValueError as exc:
            return json.dumps({"error": f"Failed to unload: {exc}"})
        return json.dumps(
            {
                "graph_id": graph_id,
                "status": "unloaded",
                "note": "Use load_agent to reload with updated code",
            }
        )

    _restart_tool = Tool(
        name="restart_agent",
        description=(
            "Unload an agent graph. Use load_agent afterwards to reload with updated code."
        ),
        parameters={
            "type": "object",
            "properties": {
                "graph_id": {
                    "type": "string",
                    "description": "ID of the graph to restart",
                },
            },
            "required": ["graph_id"],
        },
    )
    registry.register("restart_agent", _restart_tool, lambda inputs: restart_agent(**inputs))
    tools_registered += 1

    # --- list_agents ----------------------------------------------------------
    def list_agents() -> str:
        """List all agent graphs in the current session with their status."""
        graphs = []
        for gid in runtime.list_graphs():
            reg = runtime.get_graph_registration(gid)
            if reg is None:
                # Graph was removed between list_graphs() and lookup.
                continue
            graphs.append(
                {
                    "graph_id": gid,
                    "is_primary": gid == runtime.graph_id,
                    "is_active": gid == runtime.active_graph_id,
                    "entry_points": list(reg.entry_points.keys()),
                    "active_executions": sum(
                        len(s.active_execution_ids) for s in reg.streams.values()
                    ),
                }
            )
        return json.dumps({"graphs": graphs})

    _list_tool = Tool(
        name="list_agents",
        description="List all loaded agent graphs and their status.",
        parameters={"type": "object", "properties": {}},
    )
    registry.register("list_agents", _list_tool, lambda inputs: list_agents())
    tools_registered += 1

    # --- get_user_presence ----------------------------------------------------
    def get_user_presence() -> str:
        """Return user idle time and a coarse presence status.

        Buckets: < 2 min -> 'present', < 10 min -> 'idle', otherwise 'away';
        an infinite idle time means the user was never seen this session.
        """
        idle = runtime.user_idle_seconds
        if idle == float("inf"):
            status = "never_seen"
        elif idle < 120:
            status = "present"
        elif idle < 600:
            status = "idle"
        else:
            status = "away"
        return json.dumps(
            {
                # inf is not valid JSON — report null for "never seen".
                "idle_seconds": idle if idle != float("inf") else None,
                "status": status,
            }
        )

    _presence_tool = Tool(
        name="get_user_presence",
        description=(
            "Check if the user is currently active. Returns idle time "
            "and a status of 'present', 'idle', 'away', or 'never_seen'."
        ),
        parameters={"type": "object", "properties": {}},
    )
    registry.register("get_user_presence", _presence_tool, lambda inputs: get_user_presence())
    tools_registered += 1

    logger.info("Registered %d graph lifecycle tools", tools_registered)
    return tools_registered
+68 -3
View File
@@ -394,6 +394,32 @@ class AdenTUI(App):
try: try:
et = event.type et = event.type
# --- Multi-graph filtering ---
# If the event has a graph_id and it's not the active graph,
# show a notification for important events and drop the rest.
if event.graph_id is not None and event.graph_id != self.runtime.active_graph_id:
if et == EventType.CLIENT_INPUT_REQUESTED:
self.notify(
f"[bold]{event.graph_id}[/bold] is waiting for input",
severity="warning",
timeout=10,
)
elif et == EventType.EXECUTION_FAILED:
error = event.data.get("error", "Unknown error")[:60]
self.notify(
f"[bold red]{event.graph_id}[/bold red] failed: {error}",
severity="error",
timeout=10,
)
elif et == EventType.EXECUTION_COMPLETED:
self.notify(
f"[bold green]{event.graph_id}[/bold green] completed",
severity="information",
timeout=5,
)
# All other background events are silently dropped (visible in logs)
return
# --- Chat REPL events --- # --- Chat REPL events ---
if et in (EventType.LLM_TEXT_DELTA, EventType.CLIENT_OUTPUT_DELTA): if et in (EventType.LLM_TEXT_DELTA, EventType.CLIENT_OUTPUT_DELTA):
self.chat_repl.handle_text_delta( self.chat_repl.handle_text_delta(
@@ -418,6 +444,7 @@ class AdenTUI(App):
elif et == EventType.CLIENT_INPUT_REQUESTED: elif et == EventType.CLIENT_INPUT_REQUESTED:
self.chat_repl.handle_input_requested( self.chat_repl.handle_input_requested(
event.node_id or event.data.get("node_id", ""), event.node_id or event.data.get("node_id", ""),
graph_id=event.graph_id,
) )
elif et == EventType.NODE_LOOP_STARTED: elif et == EventType.NODE_LOOP_STARTED:
self.chat_repl.handle_node_started(event.node_id or "") self.chat_repl.handle_node_started(event.node_id or "")
@@ -543,6 +570,27 @@ class AdenTUI(App):
exc_info=True, exc_info=True,
) )
def action_switch_graph(self, graph_id: str) -> None:
"""Switch the active graph focus in the TUI."""
try:
self.runtime.active_graph_id = graph_id
except ValueError:
self.notify(f"Graph '{graph_id}' not found", severity="error", timeout=3)
return
# Update status bar
self.status_bar.set_graph_id(graph_id)
# Update graph view
reg = self.runtime.get_graph_registration(graph_id)
if reg:
self.graph_view.switch_graph(reg.graph)
# Flush chat streaming state
self.chat_repl.flush_streaming()
self.notify(f"Switched to graph: {graph_id}", severity="information", timeout=3)
def save_screenshot(self, filename: str | None = None) -> str: def save_screenshot(self, filename: str | None = None) -> str:
"""Save a screenshot of the current screen as SVG (viewable in browsers). """Save a screenshot of the current screen as SVG (viewable in browsers).
@@ -627,7 +675,19 @@ class AdenTUI(App):
# Find and cancel the execution task - executor will catch and save state # Find and cancel the execution task - executor will catch and save state
task_cancelled = False task_cancelled = False
for stream in self.runtime._streams.values(): # Search active graph first, then all graphs
all_streams = []
active_reg = self.runtime.get_graph_registration(self.runtime.active_graph_id)
if active_reg:
all_streams.extend(active_reg.streams.values())
for gid in self.runtime.list_graphs():
if gid == self.runtime.active_graph_id:
continue
reg = self.runtime.get_graph_registration(gid)
if reg:
all_streams.extend(reg.streams.values())
for stream in all_streams:
exec_id = chat_repl._current_exec_id exec_id = chat_repl._current_exec_id
task = stream._execution_tasks.get(exec_id) task = stream._execution_tasks.get(exec_id)
if task and not task.done(): if task and not task.done():
@@ -677,8 +737,13 @@ class AdenTUI(App):
chat_repl = self.query_one(ChatRepl) chat_repl = self.query_one(ChatRepl)
if chat_repl._current_exec_id: if chat_repl._current_exec_id:
# Find the stream with this execution # Find the stream with this execution across all graphs
for stream in self.runtime._streams.values(): all_streams = []
for gid in self.runtime.list_graphs():
reg = self.runtime.get_graph_registration(gid)
if reg:
all_streams.extend(reg.streams.values())
for stream in all_streams:
exec_id = chat_repl._current_exec_id exec_id = chat_repl._current_exec_id
task = stream._execution_tasks.get(exec_id) task = stream._execution_tasks.get(exec_id)
if task and not task.done(): if task and not task.done():
+115 -6
View File
@@ -112,6 +112,7 @@ class ChatRepl(Vertical):
self._streaming_snapshot: str = "" self._streaming_snapshot: str = ""
self._waiting_for_input: bool = False self._waiting_for_input: bool = False
self._input_node_id: str | None = None self._input_node_id: str | None = None
self._input_graph_id: str | None = None
self._pending_ask_question: str = "" self._pending_ask_question: str = ""
self._active_node_id: str | None = None # Currently executing node self._active_node_id: str | None = None # Currently executing node
self._resume_session = resume_session self._resume_session = resume_session
@@ -203,15 +204,19 @@ class ChatRepl(Vertical):
[bold]/resume[/bold] <session_id> - Resume session by ID [bold]/resume[/bold] <session_id> - Resume session by ID
[bold]/recover[/bold] <session_id> <cp_id> - Recover from specific checkpoint [bold]/recover[/bold] <session_id> <cp_id> - Recover from specific checkpoint
[bold]/pause[/bold] - Pause current execution (Ctrl+Z) [bold]/pause[/bold] - Pause current execution (Ctrl+Z)
[bold]/graphs[/bold] - List loaded graphs and their status
[bold]/graph[/bold] <id> - Switch active graph focus
[bold]/load[/bold] <path> - Load an agent graph into the session
[bold]/unload[/bold] <id> - Remove a graph from the session
[bold]/help[/bold] - Show this help message [bold]/help[/bold] - Show this help message
[dim]Examples:[/dim] [dim]Examples:[/dim]
/sessions [dim]# List all sessions[/dim] /sessions [dim]# List all sessions[/dim]
/sessions session_20260208_143022 [dim]# Show session details[/dim]
/resume [dim]# Show numbered session list[/dim]
/resume 1 [dim]# Resume first listed session[/dim] /resume 1 [dim]# Resume first listed session[/dim]
/resume session_20260208_143022 [dim]# Resume by full session ID[/dim] /graphs [dim]# Show loaded agent graphs[/dim]
/recover session_20260208_143022 cp_xxx [dim]# Recover from specific checkpoint[/dim] /graph email_agent [dim]# Switch focus to email_agent[/dim]
/load exports/email_agent [dim]# Load agent into session[/dim]
/unload email_agent [dim]# Remove agent from session[/dim]
/pause [dim]# Pause (or Ctrl+Z)[/dim] /pause [dim]# Pause (or Ctrl+Z)[/dim]
""") """)
elif cmd == "/sessions": elif cmd == "/sessions":
@@ -254,6 +259,23 @@ class ChatRepl(Vertical):
await self._cmd_recover(session_id, checkpoint_id) await self._cmd_recover(session_id, checkpoint_id)
elif cmd == "/pause": elif cmd == "/pause":
await self._cmd_pause() await self._cmd_pause()
elif cmd == "/graphs":
self._cmd_graphs()
elif cmd == "/graph":
if len(parts) < 2:
self._write_history("[bold red]Usage:[/bold red] /graph <graph_id>")
else:
self._cmd_switch_graph(parts[1].strip())
elif cmd == "/load":
if len(parts) < 2:
self._write_history("[bold red]Usage:[/bold red] /load <agent_path>")
else:
await self._cmd_load_graph(parts[1].strip())
elif cmd == "/unload":
if len(parts) < 2:
self._write_history("[bold red]Usage:[/bold red] /unload <graph_id>")
else:
await self._cmd_unload_graph(parts[1].strip())
else: else:
self._write_history( self._write_history(
f"[bold red]Unknown command:[/bold red] {cmd}\n" f"[bold red]Unknown command:[/bold red] {cmd}\n"
@@ -698,6 +720,90 @@ class ChatRepl(Vertical):
if not task_cancelled: if not task_cancelled:
self._write_history("[bold yellow]Execution already completed[/bold yellow]") self._write_history("[bold yellow]Execution already completed[/bold yellow]")
def _cmd_graphs(self) -> None:
"""List all loaded graphs and their status."""
graphs = self.runtime.list_graphs()
if not graphs:
self._write_history("[dim]No graphs loaded[/dim]")
return
lines = ["[bold cyan]Loaded Graphs:[/bold cyan]"]
for gid in graphs:
reg = self.runtime.get_graph_registration(gid)
if reg is None:
continue
is_primary = gid == self.runtime.graph_id
is_active = gid == self.runtime.active_graph_id
markers = []
if is_primary:
markers.append("primary")
if is_active:
markers.append("active")
marker_str = f" [dim]({', '.join(markers)})[/dim]" if markers else ""
ep_list = ", ".join(reg.entry_points.keys())
active_execs = sum(len(s.active_execution_ids) for s in reg.streams.values())
exec_str = f" [green]{active_execs} running[/green]" if active_execs else ""
lines.append(f" [bold]{gid}[/bold]{marker_str} — eps: {ep_list}{exec_str}")
self._write_history("\n".join(lines))
def _cmd_switch_graph(self, graph_id: str) -> None:
"""Switch the active graph focus."""
try:
self.runtime.active_graph_id = graph_id
except ValueError:
self._write_history(
f"[bold red]Graph '{graph_id}' not found.[/bold red] "
"Use /graphs to see loaded graphs."
)
return
# Tell the app to update the UI
app = self.app
if hasattr(app, "action_switch_graph"):
app.action_switch_graph(graph_id)
else:
self._write_history(f"[bold green]Switched to graph: {graph_id}[/bold green]")
async def _cmd_load_graph(self, agent_path: str) -> None:
"""Load an agent graph into the session."""
from pathlib import Path
path = Path(agent_path).resolve()
if not path.exists():
self._write_history(f"[bold red]Path does not exist:[/bold red] {path}")
return
self._write_history(f"[dim]Loading agent from {path}...[/dim]")
try:
from framework.runner.runner import AgentRunner
graph_id = await AgentRunner.setup_as_secondary(path, self.runtime)
self._write_history(
f"[bold green]Loaded graph '{graph_id}'[/bold green] — "
"use /graphs to see all, /graph to switch"
)
except Exception as e:
self._write_history(f"[bold red]Failed to load agent:[/bold red] {e}")
async def _cmd_unload_graph(self, graph_id: str) -> None:
"""Unload a secondary graph from the session."""
try:
await self.runtime.remove_graph(graph_id)
self._write_history(f"[bold green]Unloaded graph '{graph_id}'[/bold green]")
except ValueError as e:
self._write_history(f"[bold red]Error:[/bold red] {e}")
def flush_streaming(self) -> None:
"""Flush any accumulated streaming text to history.
Called by the app when switching graphs to ensure in-progress
streaming content is preserved before the UI context changes.
"""
if self._streaming_snapshot:
self._write_history(f"[bold blue]Agent:[/bold blue] {self._streaming_snapshot}")
self._streaming_snapshot = ""
def on_mount(self) -> None: def on_mount(self) -> None:
"""Add welcome message and check for resumable sessions.""" """Add welcome message and check for resumable sessions."""
history = self.query_one("#chat-history", RichLog) history = self.query_one("#chat-history", RichLog)
@@ -832,11 +938,13 @@ class ChatRepl(Vertical):
indicator.update("Thinking...") indicator.update("Thinking...")
node_id = self._input_node_id node_id = self._input_node_id
graph_id = self._input_graph_id
self._input_node_id = None self._input_node_id = None
self._input_graph_id = None
try: try:
future = asyncio.run_coroutine_threadsafe( future = asyncio.run_coroutine_threadsafe(
self.runtime.inject_input(node_id, user_input), self.runtime.inject_input(node_id, user_input, graph_id=graph_id),
self._agent_loop, self._agent_loop,
) )
await asyncio.wrap_future(future) await asyncio.wrap_future(future)
@@ -1044,7 +1152,7 @@ class ChatRepl(Vertical):
chat_input.placeholder = "Enter input for agent..." chat_input.placeholder = "Enter input for agent..."
chat_input.focus() chat_input.focus()
def handle_input_requested(self, node_id: str) -> None: def handle_input_requested(self, node_id: str, graph_id: str | None = None) -> None:
"""Handle a client-facing node requesting user input. """Handle a client-facing node requesting user input.
Transitions to 'waiting for input' state: flushes the current Transitions to 'waiting for input' state: flushes the current
@@ -1066,6 +1174,7 @@ class ChatRepl(Vertical):
self._waiting_for_input = True self._waiting_for_input = True
self._input_node_id = node_id or None self._input_node_id = node_id or None
self._input_graph_id = graph_id
indicator = self.query_one("#processing-indicator", Label) indicator = self.query_one("#processing-indicator", Label)
indicator.update("Waiting for your input...") indicator.update("Waiting for your input...")
+20 -6
View File
@@ -52,12 +52,26 @@ class GraphOverview(Vertical):
def __init__(self, runtime: AgentRuntime): def __init__(self, runtime: AgentRuntime):
super().__init__() super().__init__()
self.runtime = runtime self.runtime = runtime
self._override_graph = None # Set by switch_graph() for secondary graphs
self.active_node: str | None = None self.active_node: str | None = None
self.execution_path: list[str] = [] self.execution_path: list[str] = []
# Per-node status strings shown next to the node in the graph display. # Per-node status strings shown next to the node in the graph display.
# e.g. {"planner": "thinking...", "searcher": "web_search..."} # e.g. {"planner": "thinking...", "searcher": "web_search..."}
self._node_status: dict[str, str] = {} self._node_status: dict[str, str] = {}
@property
def _graph(self):
"""The graph currently being displayed (may be a secondary graph)."""
return self._override_graph or self.runtime.graph
def switch_graph(self, graph) -> None:
"""Switch to displaying a different graph and refresh."""
self._override_graph = graph
self.active_node = None
self.execution_path = []
self._node_status = {}
self._display_graph()
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
# Use RichLog for formatted output # Use RichLog for formatted output
yield RichLog(id="graph-display", highlight=True, markup=True) yield RichLog(id="graph-display", highlight=True, markup=True)
@@ -75,7 +89,7 @@ class GraphOverview(Vertical):
def _topo_order(self) -> list[str]: def _topo_order(self) -> list[str]:
"""BFS from entry_node following edges.""" """BFS from entry_node following edges."""
graph = self.runtime.graph graph = self._graph
visited: list[str] = [] visited: list[str] = []
seen: set[str] = set() seen: set[str] = set()
queue = [graph.entry_node] queue = [graph.entry_node]
@@ -102,7 +116,7 @@ class GraphOverview(Vertical):
order_idx = {nid: i for i, nid in enumerate(ordered)} order_idx = {nid: i for i, nid in enumerate(ordered)}
back_edges: list[dict] = [] back_edges: list[dict] = []
for node_id in ordered: for node_id in ordered:
for edge in self.runtime.graph.get_outgoing_edges(node_id): for edge in self._graph.get_outgoing_edges(node_id):
target_idx = order_idx.get(edge.target, -1) target_idx = order_idx.get(edge.target, -1)
source_idx = order_idx.get(node_id, -1) source_idx = order_idx.get(node_id, -1)
if target_idx != -1 and target_idx <= source_idx: if target_idx != -1 and target_idx <= source_idx:
@@ -129,7 +143,7 @@ class GraphOverview(Vertical):
def _render_node_line(self, node_id: str) -> str: def _render_node_line(self, node_id: str) -> str:
"""Render a single node with status symbol and optional status text.""" """Render a single node with status symbol and optional status text."""
graph = self.runtime.graph graph = self._graph
is_terminal = node_id in (graph.terminal_nodes or []) is_terminal = node_id in (graph.terminal_nodes or [])
is_active = node_id == self.active_node is_active = node_id == self.active_node
is_done = node_id in self.execution_path and not is_active is_done = node_id in self.execution_path and not is_active
@@ -160,7 +174,7 @@ class GraphOverview(Vertical):
Back-edges are excluded here they are drawn by the return-channel Back-edges are excluded here they are drawn by the return-channel
overlay in Pass 2. overlay in Pass 2.
""" """
all_edges = self.runtime.graph.get_outgoing_edges(node_id) all_edges = self._graph.get_outgoing_edges(node_id)
if not all_edges: if not all_edges:
return [] return []
@@ -399,7 +413,7 @@ class GraphOverview(Vertical):
display = self.query_one("#graph-display", RichLog) display = self.query_one("#graph-display", RichLog)
display.clear() display.clear()
graph = self.runtime.graph graph = self._graph
display.write(f"[bold cyan]Agent Graph:[/bold cyan] {graph.id}\n") display.write(f"[bold cyan]Agent Graph:[/bold cyan] {graph.id}\n")
ordered = self._topo_order() ordered = self._topo_order()
@@ -515,7 +529,7 @@ class GraphOverview(Vertical):
self._node_status.clear() self._node_status.clear()
self.execution_path.clear() self.execution_path.clear()
entry_node = event.data.get("entry_node") or ( entry_node = event.data.get("entry_node") or (
self.runtime.graph.entry_node if self.runtime else None self._graph.entry_node if self.runtime else None
) )
if entry_node: if entry_node:
self.update_active_node(entry_node) self.update_active_node(entry_node)
@@ -178,7 +178,12 @@ class SelectableRichLog(RichLog):
# Build full text from all lines # Build full text from all lines
all_text = "\n".join(strip.text for strip in self.lines) all_text = "\n".join(strip.text for strip in self.lines)
extracted = sel.extract(all_text) try:
extracted = sel.extract(all_text)
except (IndexError, ValueError):
# Selection coordinates can exceed line count when the virtual
# canvas is larger than the actual content (e.g. after scroll).
return None
return extracted if extracted else None return extracted if extracted else None
def copy_selection(self) -> str | None: def copy_selection(self) -> str | None:
+1 -2
View File
@@ -5,7 +5,7 @@ Focused on minimal success and failure scenarios.
import pytest import pytest
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec from framework.graph.edge import GraphSpec
from framework.graph.executor import GraphExecutor from framework.graph.executor import GraphExecutor
from framework.graph.goal import Goal from framework.graph.goal import Goal
from framework.graph.node import NodeResult, NodeSpec from framework.graph.node import NodeResult, NodeSpec
@@ -158,7 +158,6 @@ class FakeEventBus:
@pytest.mark.asyncio @pytest.mark.asyncio
# ---- Fake event_loop node (registered, so executor won't emit for it) ---- # ---- Fake event_loop node (registered, so executor won't emit for it) ----
class FakeEventLoopNode: class FakeEventLoopNode:
def validate_input(self, ctx): def validate_input(self, ctx):
+112
View File
@@ -0,0 +1,112 @@
# Hive Coder: Meta-Agent Integration Plan
## Problem
The hive_coder agent currently has 7 file I/O tools (`read_file`, `write_file`, `edit_file`, `list_directory`, `search_files`, `run_command`, `undo_changes`) in `tools/coder_tools_server.py`. It can write agent packages but is **not integrated into the Hive ecosystem**:
1. **No dynamic tool discovery** — It references a static list of hive-tools in `reference/framework_guide.md`. It can't discover what MCP tools are actually available or what parameters they accept.
2. **No runtime observability** — It can't inspect sessions, checkpoints, or logs from agents it builds. When something goes wrong, the user has to manually dig through files.
3. **No test execution** — It can't run an agent's test suite structurally (it could use `run_command` with raw pytest, but has no structured test parsing).
## Solution
Add 8 new tools to `coder_tools_server.py` that give hive_coder deep integration with the Hive framework. Update the system prompt to teach the LLM when and how to use these meta-agent capabilities.
---
## New Tools
### 1. Tool Discovery
**`discover_mcp_tools(server_config_path?)`**
Connect to any MCP server and list all available tools with full schemas. Uses `framework.runner.mcp_client.MCPClient` — the same client the runtime uses. Reads a `mcp_servers.json` file (defaults to hive-tools), connects to each server, calls `list_tools()`, returns tool names + descriptions + input schemas, then disconnects.
This replaces the static tools reference. The LLM now discovers tools dynamically before designing an agent.
### 2. Agent Inventory
**`list_agents()`**
Scan `exports/` for agent packages and `~/.hive/agents/` for runtime data. Returns agent names, descriptions (from `__init__.py`), and session counts. Gives the LLM awareness of what already exists.
### 3-7. Session & Checkpoint Inspection
Ported from `agent_builder_server.py` lines 3484-3856. Pure filesystem reads — JSON + pathlib, zero framework imports.
| Tool | Purpose |
|------|---------|
| `list_agent_sessions(agent_name, status?, limit?)` | List sessions, filterable by status |
| `get_agent_session_state(agent_name, session_id)` | Full session state (memory excluded to prevent context bloat) |
| `get_agent_session_memory(agent_name, session_id, key?)` | Read memory contents from a session |
| `list_agent_checkpoints(agent_name, session_id)` | List checkpoints for debugging |
| `get_agent_checkpoint(agent_name, session_id, checkpoint_id?)` | Load a checkpoint's full state |
**Key difference from agent-builder:** These tools accept `agent_name` (e.g. `"deep_research_agent"`) instead of raw `agent_work_dir` paths. They resolve to `~/.hive/agents/{agent_name}/` internally. Friendlier for the LLM.
### 8. Test Execution
**`run_agent_tests(agent_name, test_types?, fail_fast?)`**
Ported from `agent_builder_server.py` lines 2756-2920. Runs pytest on an agent's test suite, sets PYTHONPATH automatically, parses output into structured results (passed/failed/skipped counts, per-test status, failure details).
---
## Files to Modify
### `tools/coder_tools_server.py` (~400 new lines)
Add all 8 tools after the existing `undo_changes` tool:
```
# ── Meta-agent: Tool discovery ────────────────────────────────
# discover_mcp_tools()
# ── Meta-agent: Agent inventory ───────────────────────────────
# list_agents()
# ── Meta-agent: Session & checkpoint inspection ───────────────
# _resolve_hive_agent_path(), _read_session_json(), _scan_agent_sessions(), _truncate_value()
# list_agent_sessions(), get_agent_session_state(), get_agent_session_memory()
# list_agent_checkpoints(), get_agent_checkpoint()
# ── Meta-agent: Test execution ────────────────────────────────
# run_agent_tests()
```
### `exports/hive_coder/nodes/__init__.py`
- Add 8 new tool names to the `tools` list
- Rewrite system prompt "Tools Available" section with meta-agent tools
- Add "Meta-Agent Capabilities" section teaching:
- Tool discovery before designing agents
- Post-build test execution
- Debugging via session/checkpoint inspection
- Agent awareness via `list_agents()`
### `exports/hive_coder/agent.py`
- Update `identity_prompt` to mention dynamic tool discovery and runtime observability
- Add `dynamic-tool-discovery` constraint to the goal
### `exports/hive_coder/reference/framework_guide.md`
Replace static tools list with a note to use `discover_mcp_tools()` instead.
---
## What's NOT in Scope (deferred to v2)
- **Agent notifications / webhook listener** — Requires always-on listener architecture
- **`compare_agent_checkpoints`** — LLM can compare by reading two checkpoints sequentially
- **Runtime log query tools** — Available in hive-tools MCP; `run_command` can access them now
---
## Verification
1. MCP server starts with all 15 tools (7 existing + 8 new)
2. `discover_mcp_tools()` connects to hive-tools and returns real tool schemas
3. Agent validation passes (`default_agent.validate()`)
4. Session tools work against existing data in `~/.hive/agents/`
5. Smoke test: launch in TUI, ask it to discover tools
+75
View File
@@ -0,0 +1,75 @@
# Hive Queen Bee: Native agent-building agent
## Problem
Building a Hive agent today requires manual assembly of 7+ files (`agent.py`, `config.py`, `nodes/__init__.py`, `__init__.py`, `__main__.py`, `mcp_servers.json`, tests) with precise framework conventions — correct imports, entry_points format, conversation_mode values, STEP 1/STEP 2 prompt patterns, nullable_output_keys, and more. A single missing re-export in `__init__.py` silently breaks `AgentRunner.load()`. This is the #1 friction point for new users and a recurring source of bugs even for experienced ones.
There is no tool that understands the framework deeply enough to produce correct agents. General-purpose coding assistants hallucinate tool names, use wrong import paths (`from core.framework...`), create too many thin nodes, forget module-level exports, and produce agents that fail validation.
## Proposal
Build **Hive Coder** (codename "Queen Bee") — a framework-native coding agent that lives inside the framework itself and builds complete, validated agent packages from natural language.
### Design principles
1. **Single-node, forever-alive** — One continuous EventLoopNode conversation handles the full lifecycle (understand, qualify, design, implement, verify, iterate). No artificial phase boundaries that destroy context.
2. **Meta-agent capabilities** — Not just a file writer. Can discover available MCP tools at runtime, inspect sessions/checkpoints of agents it builds, run their test suites, and debug failures.
3. **Self-verifying** — Runs three validation steps after every build: class validation (graph structure), `AgentRunner.load()` (package export contract), and pytest. Fixes its own errors up to 3 attempts.
4. **Honest qualification** — Assesses framework fit before building. If a use case is a poor fit (needs sub-second latency, pure CRUD, massive data pipelines), says so instead of producing a bad agent.
5. **Reference-grounded** — Ships with embedded reference docs (framework guide, file templates, anti-patterns) that it reads before writing code. No reliance on training data for framework specifics.
### Components
#### `hive_coder` agent (`core/framework/agents/hive_coder/`)
| File | Purpose |
|------|---------|
| `agent.py` | Goal, single-node graph, `HiveCoderAgent` class |
| `nodes/__init__.py` | `coder` EventLoopNode with comprehensive system prompt |
| `config.py` | RuntimeConfig with `~/.hive/configuration.json` auto-detection |
| `__main__.py` | Click CLI (`run`, `tui`, `info`, `validate`, `shell`) |
| `reference/framework_guide.md` | Node types, edges, patterns, async entry points |
| `reference/file_templates.md` | Complete code templates for every agent file |
| `reference/anti_patterns.md` | 22 common mistakes with explanations |
#### Coder Tools MCP Server (`tools/coder_tools_server.py`)
Dedicated tool server providing:
- **File I/O**: `read_file` (with line numbers, offset/limit), `write_file` (auto-mkdir), `edit_file` (9-strategy fuzzy matching ported from opencode), `list_directory`, `search_files` (regex)
- **Shell**: `run_command` (timeout, cwd, output truncation)
- **Git**: `undo_changes` (snapshot-based rollback)
- **Meta-agent**: `discover_mcp_tools`, `list_agents`, `list_agent_sessions`, `get_agent_session_state`, `get_agent_session_memory`, `list_agent_checkpoints`, `get_agent_checkpoint`, `run_agent_tests`
All file operations sandboxed to a configurable project root.
#### Framework changes
- `hive code` CLI command — direct launch shortcut
- `hive tui` — discovers framework agents as a source
- `AgentRuntime` — cron expression support (`croniter`) for async entry points
- `prompt_composer` — appends current datetime to system prompts
- `NodeSpec.max_node_visits` — default changed from 1 to 0 (unbounded), matching forever-alive as the standard pattern
- TUI graph view — cron display and hours in countdown
- CredentialError graceful handling in TUI launch
## Acceptance criteria
- [ ] `hive code` launches Hive Coder in the TUI
- [ ] `hive tui` lists framework agents alongside exports/ and examples/
- [ ] Given "build me a research agent that searches the web and summarizes findings", Hive Coder produces a valid package in `exports/` that passes `AgentRunner.load()`
- [ ] Tool discovery works: agent calls `discover_mcp_tools()` before designing, never fabricates tool names
- [ ] Self-verification: agent runs all 3 validation steps and fixes errors before presenting
- [ ] Cron timers fire on schedule (unit tested)
- [ ] `max_node_visits=0` default does not break existing agents or tests
- [ ] Reference docs are accurate and match current framework behavior
## Non-goals
- Multi-agent orchestration (queen spawning worker agents at runtime) — future work
- GUI/web interface — TUI only for v1
- Auto-publishing to a registry — agents are local packages
+288
View File
@@ -0,0 +1,288 @@
# Plan: Multi-Graph Sessions with Guardian Pattern
## Context
The target experience: hive_coder builds an agent (e.g., email automation), loads it into the same runtime session, and acts as its guardian. The email agent runs autonomously while hive_coder watches for failures. On error, hive_coder asks the user for help if they're around, attempts an autonomous fix if they're away, and escalates catastrophic failures for post-mortem.
This requires multiple agent graphs sharing a single `AgentRuntime` session — shared memory and data, but isolated conversations. The existing runtime already has most of the primitives: `ExecutionStream` accepts its own `graph`, `trigger_type="event"` subscribes entry points to the EventBus, and `_get_primary_session_state()` bridges memory across streams.
## Architecture Overview
```
AgentRuntime (shared EventBus, shared state.json, shared data/)
├── hive_coder graph
│ ├── Stream "default" → coder node (client_facing, manual)
│ └── Stream "guardian" → guardian node (event-driven, subscribes to EXECUTION_FAILED)
└── email_agent graph
└── Stream "email_agent::default" → intake node (client_facing, manual)
```
The guardian entry point on hive_coder fires when email_agent emits `EXECUTION_FAILED`. It receives the failure event in its input, reads shared memory for context, and decides: ask user (if present), auto-fix (if away), or escalate (if catastrophic).
## Gap 1: Event Scoping — `graph_id` on Events
**Problem**: EventBus events carry `stream_id` and `node_id` but no `graph_id`. The guardian needs to subscribe to events from a specific graph (email_agent), not a specific stream name.
**Solution**: Add `graph_id: str | None = None` to `AgentEvent` and `filter_graph` to `Subscription`.
### `core/framework/runtime/event_bus.py`
- `AgentEvent` dataclass: add `graph_id: str | None = None` field, include in `to_dict()`
- `Subscription` dataclass: add `filter_graph: str | None = None`
- `subscribe()`: accept `filter_graph` param, pass to `Subscription`
- `_matches()`: check `filter_graph` against `event.graph_id`
### `core/framework/runtime/execution_stream.py`
- `__init__()`: accept `graph_id: str | None = None`, store as `self.graph_id`
- When emitting events via `_event_bus.publish()`: set `event.graph_id = self.graph_id`
## Gap 2: Multi-Graph Runtime — `add_graph()` / `remove_graph()`
**Problem**: `AgentRuntime.__init__` takes a single `GraphSpec`. We need to add/remove graphs dynamically at runtime.
**Solution**: Keep the primary graph on `__init__`. Add methods to register secondary graphs that create their own `ExecutionStream` instances backed by a different graph.
### `core/framework/runtime/agent_runtime.py`
New instance state:
```python
self._graph_id: str = graph_id or "primary" # ID for the primary graph
self._graphs: dict[str, _GraphRegistration] = {} # graph_id -> registration
self._active_graph_id: str = self._graph_id # TUI focus
```
Where `_GraphRegistration` is a simple dataclass:
```python
@dataclass
class _GraphRegistration:
graph: GraphSpec
goal: Goal
entry_points: dict[str, EntryPointSpec]
streams: dict[str, ExecutionStream]
storage_subpath: str # relative to session root, e.g. "graphs/email_agent"
event_subscriptions: list[str] # EventBus subscription IDs
timer_tasks: list[asyncio.Task]
```
New methods:
- `add_graph(graph_id, graph, goal, entry_points, storage_subpath=None)` — creates streams for the graph using graph-scoped storage, sets up event/timer triggers, stamps `graph_id` on all streams. Can be called while running.
- `remove_graph(graph_id)` — stops streams, cancels timers, unsubscribes events, removes registration. Cannot remove primary graph.
- `list_graphs() -> list[str]` — returns all graph IDs
- `active_graph_id` property with setter — TUI uses this to control which graph's events are displayed
Update existing methods:
- `start()`: stamp `self._graph_id` on primary graph streams (via `ExecutionStream.graph_id`)
- `inject_input(node_id, content)`: search active graph's streams first, then all others
- `_get_primary_session_state()`: search across ALL graphs' streams (not just primary's)
- `stop()`: stop all secondary graph streams/timers/subscriptions too
### Storage Layout
```
~/.hive/agents/hive_coder/sessions/{session_id}/
state.json ← SHARED across all graphs
data/ ← SHARED data directory
conversations/coder/ ← hive_coder conversations
graphs/
email_agent/ ← secondary graph storage root
conversations/
intake/
checkpoints/
```
Secondary graph executors get `storage_path = {session_root}/graphs/{graph_id}/` while `state.json` and `data/` remain at the session root. The `resume_session_id` mechanism in `_get_primary_session_state()` already handles this — secondary executions find the primary session's `state.json`.
**Concurrent state.json writes**: For the guardian pattern (sequential: email_agent fails → guardian triggers), no file lock needed. But since both could technically write concurrently, add a simple `fcntl.flock()` wrapper around `_write_progress()` in the executor. Small, defensive change.
## Gap 3: Guardian Pattern — User Presence + Autonomous Recovery
**Problem**: When email_agent fails, hive_coder's guardian entry point must decide: ask user or auto-fix.
**Solution**: User presence is a runtime-level signal. The guardian's system prompt and event data give it enough context to decide.
### User Presence Tracking
Add to `AgentRuntime`:
```python
self._last_user_input_time: float = 0.0 # monotonic timestamp
```
Updated in `inject_input()` (called whenever user types in TUI). Exposed as:
```python
@property
def user_idle_seconds(self) -> float:
if self._last_user_input_time == 0:
return float('inf')
return time.monotonic() - self._last_user_input_time
```
The guardian node's system prompt instructs the LLM: "If user_idle_seconds < 120, ask the user for guidance via the client-facing interaction. If user is away, attempt an autonomous fix."
This is NOT framework logic — it's prompt-driven. The guardian node is a regular `event_loop` node with `client_facing=True` and tools for code editing + agent lifecycle. The LLM decides the strategy based on presence info injected as context.
### Escalation Model
Escalation = save a structured log entry. No special framework support needed. The guardian node uses `save_data("escalation_log.jsonl", ...)` via the existing data tools. The LLM writes:
```json
{"timestamp": "...", "severity": "catastrophic", "agent": "email_agent", "error": "...", "attempted_fixes": [...], "recommended_action": "..."}
```
Post-mortem: user opens `/data escalation_log.jsonl` or the TUI shows a notification linking to it.
## Gap 4: Graph Lifecycle Tools — Stop/Reload/Restart
**Problem**: hive_coder needs to programmatically stop a broken agent, fix its code, reload it, and restart it.
**Solution**: MCP tools accessible to the active agent. Uses `ContextVar` to access the runtime (same pattern as `data_dir`).
### `core/framework/tools/session_graph_tools.py` (NEW)
```python
async def load_agent(agent_path: str) -> str:
"""Load an agent graph into the running session."""
async def unload_agent(graph_id: str) -> str:
"""Stop and remove an agent graph from the session."""
async def start_agent(graph_id: str, entry_point: str = "default", input_data: str = "{}") -> str:
"""Trigger an entry point on a loaded agent graph."""
async def restart_agent(graph_id: str) -> str:
"""Unload and re-load an agent (picks up code changes)."""
async def list_agents() -> str:
"""List all agent graphs in the current session with their status."""
async def get_user_presence() -> str:
"""Return user idle time and presence status."""
```
These tools call `runtime.add_graph()`, `runtime.remove_graph()`, `runtime.trigger()`, etc.
### Registration
These tools are registered via `ToolRegistry` with `CONTEXT_PARAM` for `runtime` (injected by the executor, same as `data_dir`). Only available when the runtime is multi-graph capable (set by `cmd_code()`).
## Gap 5: TUI Integration — Graph Switching + Background Notifications
### `core/framework/tui/app.py`
- `_route_event()`: check `event.graph_id` against `runtime.active_graph_id`
- Events from active graph: route normally (streaming, chat, etc.)
- `CLIENT_INPUT_REQUESTED` from background graph: show notification bar
- `EXECUTION_FAILED` from background graph: show error notification
- `EXECUTION_COMPLETED` from background: show brief completion notice
- Other background events: silent (visible in logs)
- `action_switch_graph(graph_id)`: update `runtime.active_graph_id`, refresh graph view, show header
### `core/framework/tui/widgets/chat_repl.py`
- Track `_input_graph_id: str | None` alongside `_input_node_id`
- `handle_input_requested(node_id, graph_id)`: if background graph, show notification instead of enabling input
- `_submit_input()`: pass `graph_id` to help `inject_input()` route correctly
- New TUI commands:
- `/graphs` — list loaded graphs and their status
- `/graph <id>` — switch active graph focus
- `/load <path>` — load an agent graph into the session
- `/unload <id>` — remove a graph from the session
- On graph switch: flush streaming state, render graph header separator
### `core/framework/tui/widgets/graph_view.py`
- `switch_graph(graph_id)` — re-render the graph visualization for the new active graph
- When multi-graph active: show tab-like header listing all loaded graphs
## Gap 6: CLI + Runner Integration
### `core/framework/runner/cli.py`
- `cmd_code()` creates the hive_coder runtime with `graph_id="hive_coder"`
- Registers `session_graph_tools` with the tool config so hive_coder's LLM can call them
- Sets `runtime._multi_graph_capable = True` flag
### `core/framework/runner/runner.py`
- New method: `setup_as_secondary(runtime, graph_id)` — configures this runner to join an existing `AgentRuntime` as a secondary graph. Uses the existing `AgentRunner.load()` to parse agent.json, then calls `runtime.add_graph()` with the parsed graph/goal/entry_points.
## Gap 7: Reliable Mid-Node Resume
**Problem**: When an EventLoopNode is interrupted (crash, Ctrl+Z, context switch), resume doesn't restore to exactly where execution stopped. Several pieces of in-node state are lost, which changes behavior post-resume. In multi-graph sessions with parallel execution and frequent context switching, these gaps compound.
### What's already restored correctly
- **Conversation history**: All messages persisted to disk immediately via `FileConversationStore._persist()` — one file per message in `parts/NNNNNNNNNN.json`
- **OutputAccumulator values**: Write-through to `cursor.json` on every `accumulator.set()` call
- **Iteration counter**: Written to `cursor.json` at the end of each iteration (step 6g)
- **Orphaned tool calls**: `_repair_orphaned_tool_calls()` patches in-flight tool calls with error messages so the LLM knows to retry
### What's lost — and fixes
#### 1. `user_interaction_count` (CRITICAL)
Resets to 0 on resume. This controls client-facing blocking semantics: before the first interaction, `set_output`-only turns don't prevent blocking (the LLM must present to the user first). After resume, a node that had 3 user interactions behaves as if the user never interacted.
**Fix**: Persist `user_interaction_count` to `cursor.json` alongside `iteration` and `outputs`. Write it in `_write_cursor()` (step 6g), restore in `_restore()`.
**Files**: `core/framework/graph/event_loop_node.py`
#### 2. Accumulator outputs not in SharedMemory
The `OutputAccumulator` writes to `cursor.json` (durable) but only writes to `SharedMemory` when the judge ACCEPTs. On crash, the CancelledError handler captures `memory.read_all()` — which doesn't include the accumulator's WIP values. On resume, edge conditions checking those memory keys see `None`.
**Fix**: In the executor's `CancelledError` handler, read the interrupted node's `cursor.json` and write any accumulator outputs to `memory` before building `session_state_out`. This ensures resume memory includes WIP output values.
**Files**: `core/framework/graph/executor.py` (CancelledError handler, ~line 1289)
#### 3. Stall/doom-loop detection counters
`recent_responses` and `recent_tool_fingerprints` reset to empty lists. A previously near-stalled node gets a fresh detection budget.
**Fix**: Persist these to `cursor.json`. They're small (last N strings). Write in `_write_cursor()`, restore in `_restore()`.
**Files**: `core/framework/graph/event_loop_node.py`
#### 4. `continuous_conversation` at executor level
In continuous mode, the executor's `continuous_conversation` variable is `None` on resume. The node's `_restore()` recovers messages from disk, but the executor doesn't pre-populate this variable until the node returns.
**Fix**: After a resumed node completes, set `continuous_conversation = result.conversation` (this already happens in the normal path at line 1155 — verify it also runs on the resume path).
**Files**: `core/framework/graph/executor.py`
### Multi-graph specific: independent resume per graph
Each graph in a multi-graph session has its own storage subdirectory (`graphs/{graph_id}/`) with its own `conversations/`, `checkpoints/`, and `cursor.json` files. Resume is already per-executor, so each graph resumes independently. The shared `state.json` at the session root captures the union of all graphs' memory — the `fcntl.flock()` wrapper on `_write_progress()` (Gap 2) ensures concurrent writes don't corrupt it.
### Implementation
These fixes are prerequisite to multi-graph and should be done as **Phase 0** before the EventBus changes:
1. Persist `user_interaction_count` + stall/doom counters to `cursor.json`
2. Restore them in `_restore()`
3. Flush accumulator outputs to SharedMemory in executor's CancelledError handler
4. Verify continuous_conversation is set on resume path
## Implementation Phases
### Phase 0: Reliable Mid-Node Resume (prerequisite)
1. `event_loop_node.py` — persist `user_interaction_count`, `recent_responses`, `recent_tool_fingerprints` to `cursor.json` via `_write_cursor()`; restore in `_restore()`
2. `executor.py` — in CancelledError handler, read interrupted node's `cursor.json` accumulator outputs and write to `memory` before building `session_state_out`
3. `executor.py` — verify `continuous_conversation` is populated on resume path
### Phase 1: EventBus Foundation
1. `event_bus.py` — `graph_id` on `AgentEvent`, `filter_graph` on `Subscription` + `_matches()`
2. `execution_stream.py` — accept and stamp `graph_id` on emitted events
### Phase 2: Multi-Graph Runtime
3. `agent_runtime.py` — `_GraphRegistration` dataclass, `add_graph()`, `remove_graph()`, `list_graphs()`, `active_graph_id` property
4. `agent_runtime.py` — update `inject_input()`, `_get_primary_session_state()`, `stop()` for multi-graph
5. `agent_runtime.py` — user presence tracking (`_last_user_input_time`, `user_idle_seconds`)
6. Storage path logic: secondary graphs get `{session_root}/graphs/{graph_id}/`
### Phase 3: Graph Lifecycle Tools
7. `core/framework/tools/session_graph_tools.py` — `load_agent`, `unload_agent`, `start_agent`, `restart_agent`, `list_agents`, `get_user_presence`
8. `runner.py` — `setup_as_secondary()` method
### Phase 4: TUI Integration
9. `app.py` — `graph_id` event filtering, background notifications, `action_switch_graph`
10. `chat_repl.py` — `/graphs`, `/graph`, `/load`, `/unload` commands, graph_id tracking
11. `graph_view.py` — multi-graph header, `switch_graph()`
### Phase 5: hive_coder Integration
12. `cli.py` — `cmd_code()` sets up multi-graph capable runtime, registers graph tools
13. hive_coder's agent config — add guardian entry point with `trigger_type="event"` subscribing to `EXECUTION_FAILED`
14. Guardian node system prompt — presence-aware triage logic (ask user / auto-fix / escalate)
## Backward Compatibility
- Single-graph `hive run exports/my_agent` unchanged: `graph_id` defaults to `None`, no secondary graphs loaded, events carry `graph_id=None`, TUI shows no graph switching UI
- All new fields are optional with `None` defaults
- `_get_primary_session_state()` existing behavior preserved when no secondary graphs exist
## Verification
1. **Unit**: `add_graph()` creates streams with correct `graph_id`, events carry `graph_id`, `filter_graph` works in subscriptions, `inject_input()` routes to correct graph
2. **Integration**: Load hive_coder + email_agent, email_agent fails → guardian fires → reads shared memory → decides action
3. **TUI**: `/graphs` shows both, `/graph` switches, background failure notification appears, input routing works across graphs
4. **Backward compat**: `hive run exports/deep_research_agent --tui` works unchanged
5. **Lifecycle**: `restart_agent` picks up code changes, `unload_agent` cleans up streams and subscriptions
+56
View File
@@ -0,0 +1,56 @@
# feat(queen): Hive Queen Bee — native agent-building agent
## Summary
Introduces **Hive Coder** (codename "Queen Bee"), a framework-native coding agent that builds complete Hive agent packages from natural language descriptions. This is a single-node, forever-alive agent inspired by opencode's `while(true)` loop — one continuous conversation handles the full lifecycle: understand, qualify, design, implement, verify, and iterate.
The agent is deeply integrated with the framework: it can discover available MCP tools at runtime, inspect sessions and checkpoints of agents it builds, run their test suites, and self-verify its own output. It ships with a dedicated MCP tools server (`coder_tools_server.py`) providing rich file I/O, fuzzy-match editing, git snapshots, and shell execution — all scoped to a configurable project root.
## What's included
### New: `hive_coder` agent (`core/framework/agents/hive_coder/`)
- **`agent.py`** — Goal with 4 success criteria and 4 constraints, single-node graph, `HiveCoderAgent` class with full runtime lifecycle (start/stop/trigger_and_wait)
- **`nodes/__init__.py`** — Single `coder` EventLoopNode with a comprehensive system prompt covering coding mandates, tool discovery, meta-agent capabilities, node count rules, implementation templates, and a 6-phase workflow
- **`config.py`** — RuntimeConfig with auto-detection of preferred model from `~/.hive/configuration.json`
- **`__main__.py`** — Click CLI with `run`, `tui`, `info`, `validate`, and `shell` subcommands
- **`reference/`** — Framework guide, file templates, and anti-patterns docs embedded as agent reference material
### New: Coder Tools MCP Server (`tools/coder_tools_server.py`)
- 1500-line MCP server providing 15 tools: `read_file`, `write_file`, `edit_file` (with opencode-style 9-strategy fuzzy matching), `list_directory`, `search_files`, `run_command`, `undo_changes`, `discover_mcp_tools`, `list_agents`, `list_agent_sessions`, `get_agent_session_state`, `get_agent_session_memory`, `list_agent_checkpoints`, `get_agent_checkpoint`, `run_agent_tests`
- Path-scoped security: all file operations sandboxed to project root
- Git-based undo: automatic snapshots before writes with `undo_changes` rollback
### Framework changes
- **`hive code` CLI command** — Direct launch shortcut for Hive Coder via `cmd_code` in `runner/cli.py`
- **`hive tui` updated** — Now discovers framework agents alongside exports/ and examples/
- **Cron timer support** — `AgentRuntime` now supports cron expressions (`croniter`) in addition to fixed-interval timers for async entry points
- **Datetime in system prompts** — `prompt_composer._with_datetime()` appends current datetime to all composed system prompts; EventLoopNode also applies it for isolated conversations
- **`max_node_visits` default → 0** — Changed from 1 to 0 (unbounded) across `NodeSpec` and executor, matching the forever-alive pattern as the standard default
- **TUI graph view** — Timer display updated to show cron expressions and hours in countdown
- **CredentialError handling** — `_setup()` calls in TUI launch paths now catch and display credential errors gracefully
### Tests
- New `test_agent_runtime.py` tests for cron-based timer scheduling
## Architecture
```
User ──▶ [coder] (EventLoopNode, client_facing, forever-alive)
│ Tools: coder_tools_server.py (file I/O, shell, git)
│ + meta-agent tools (discover, inspect, test)
└──▶ loops continuously until user exits
```
Single node. No edges. No terminal nodes. The agent stays alive and handles multiple build requests in one session — context accumulates across interactions.
## Test plan
- [ ] `hive code` launches Hive Coder TUI successfully
- [ ] `hive tui` shows "Framework Agents" as a source option
- [ ] Agent can discover tools via `discover_mcp_tools()`
- [ ] Agent generates a valid agent package from a natural language request
- [ ] Generated packages pass `AgentRunner.load()` validation
- [ ] Cron timer tests pass (`test_agent_runtime.py`)
- [ ] Existing tests unaffected by `max_node_visits` default change