refactor(architecture): revamp

This commit is contained in:
Timothy
2026-04-07 09:19:03 -07:00
parent d10912be15
commit 93dc35dcbb
54 changed files with 2342 additions and 1236 deletions
+3 -3
View File
@@ -28,8 +28,8 @@ try:
from framework.llm import AnthropicProvider # noqa: F401
except ImportError:
pass
from framework.runner import AgentRunner
from framework.runtime.core import Runtime
from framework.runner import AgentLoader
from framework.runtime.core import DecisionTracker # noqa: F401
from framework.schemas.decision import Decision, DecisionEvaluation, Option, Outcome
from framework.schemas.run import Problem, Run, RunSummary
@@ -59,7 +59,7 @@ __all__ = [
"LLMProvider",
"AnthropicProvider",
# Runner
"AgentRunner",
"AgentLoader",
# Testing
"Test",
"TestResult",
+14
View File
@@ -0,0 +1,14 @@
"""Agent loop -- the core agent execution primitive."""
from framework.graph.conversation import ( # noqa: F401
ConversationStore,
Message,
NodeConversation,
)
from framework.graph.event_loop_node import ( # noqa: F401
AgentLoop,
JudgeProtocol,
JudgeVerdict,
LoopConfig,
OutputAccumulator,
)
@@ -0,0 +1,7 @@
"""Agent loop internals -- compaction, judge, tools, subagent execution.
Re-exports from legacy locations for the new import path.
"""
from framework.graph.event_loop.compaction import * # noqa: F401, F403
from framework.graph.event_loop.synthetic_tools import * # noqa: F401, F403
+9 -1
View File
@@ -8,6 +8,14 @@ FRAMEWORK_AGENTS_DIR = Path(__file__).parent
def list_framework_agents() -> list[Path]:
"""List all framework agent directories."""
return sorted(
[p for p in FRAMEWORK_AGENTS_DIR.iterdir() if p.is_dir() and (p / "agent.py").exists()],
[
p
for p in FRAMEWORK_AGENTS_DIR.iterdir()
if p.is_dir()
and (
(p / "agent.json").exists()
or (p / "agent.py").exists()
)
],
key=lambda p: p.name,
)
@@ -28,7 +28,7 @@ from framework.graph.executor import ExecutionResult
from framework.llm import LiteLLMProvider
from framework.runner.mcp_registry import MCPRegistry
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.agent_runtime import AgentHost, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config
@@ -37,7 +37,7 @@ from .nodes import build_tester_node
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from framework.runner import AgentRunner
from framework.runner import AgentLoader
logger = logging.getLogger(__name__)
@@ -233,7 +233,7 @@ requires_account_selection = True
"""Signal TUI to show account picker before starting the agent."""
def configure_for_account(runner: AgentRunner, account: dict) -> None:
def configure_for_account(runner: AgentLoader, account: dict) -> None:
"""Scope the tester node's tools to the selected provider.
Handles both Aden accounts (account= routing) and local accounts
@@ -325,7 +325,7 @@ def _activate_local_account(credential_id: str, alias: str) -> None:
def _configure_aden_node(
runner: AgentRunner,
runner: AgentLoader,
provider: str,
alias: str,
detail: str,
@@ -368,7 +368,7 @@ or any other identifier — always use the alias exactly as shown.
def _configure_local_node(
runner: AgentRunner,
runner: AgentLoader,
provider: str,
alias: str,
identity: dict,
@@ -497,7 +497,7 @@ class CredentialTesterAgent:
def __init__(self, config=None):
self.config = config or default_config
self._selected_account: dict | None = None
self._agent_runtime: AgentRuntime | None = None
self._agent_runtime: AgentHost | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
+34 -23
View File
@@ -112,13 +112,33 @@ def _count_runs(agent_name: str) -> int:
def _extract_agent_stats(agent_path: Path) -> tuple[int, int, list[str]]:
"""Extract node count, tool count, and tags from an agent directory.
Prefers agent.py (AST-parsed) over agent.json for node/tool counts
since agent.json may be stale. Tags are only available from agent.json.
Checks agent.json (declarative) first, then agent.py (legacy).
"""
import ast
node_count, tool_count, tags = 0, 0, []
# Declarative JSON agents (preferred)
agent_json = agent_path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text(encoding="utf-8"))
if isinstance(data, dict):
json_nodes = data.get("nodes", [])
node_count = len(json_nodes)
tools: set[str] = set()
for n in json_nodes:
node_tools = n.get("tools", {})
if isinstance(node_tools, dict):
tools.update(node_tools.get("allowed", []))
elif isinstance(node_tools, list):
tools.update(node_tools)
tool_count = len(tools)
return node_count, tool_count, tags
except Exception:
pass
# Legacy: agent.py (AST-parsed)
agent_py = agent_path / "agent.py"
if agent_py.exists():
try:
@@ -132,21 +152,6 @@ def _extract_agent_stats(agent_path: Path) -> tuple[int, int, list[str]]:
except Exception:
pass
agent_json = agent_path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text(encoding="utf-8"))
json_nodes = data.get("graph", {}).get("nodes", []) or data.get("nodes", [])
if node_count == 0:
node_count = len(json_nodes)
tools: set[str] = set()
for n in json_nodes:
tools.update(n.get("tools", []))
tool_count = len(tools)
tags = data.get("agent", {}).get("tags", [])
except Exception:
pass
return node_count, tool_count, tags
@@ -179,13 +184,19 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
node_count, tool_count, tags = _extract_agent_stats(path)
if not used_config:
agent_json = path / "agent.json"
if agent_json.exists():
# Try agent.json (declarative) for metadata
agent_json_path = path / "agent.json"
if agent_json_path.exists():
try:
data = json.loads(agent_json.read_text(encoding="utf-8"))
meta = data.get("agent", {})
name = meta.get("name", name)
desc = meta.get("description", desc)
data = json.loads(
agent_json_path.read_text(encoding="utf-8"),
)
if isinstance(data, dict):
raw_name = data.get("name", name)
if "-" in raw_name and " " not in raw_name:
raw_name = raw_name.replace("-", " ").title()
name = raw_name
desc = data.get("description", desc)
except Exception:
pass
+46 -32
View File
@@ -177,7 +177,7 @@ _shared_building_knowledge = (
## Paths (MANDATORY)
**Always use RELATIVE paths** \
(e.g. `exports/agent_name/config.py`, `exports/agent_name/nodes/__init__.py`).
(e.g. `exports/agent_name/agent.json`).
**Never use absolute paths** like `/mnt/data/...` or `/workspace/...` — they fail.
The project root is implicit.
@@ -485,40 +485,54 @@ When a user says "my agent is failing" or "debug this agent":
in the planning phase. The draft metadata will pre-populate descriptions, \
goals, success criteria, and node metadata in the generated files.**
Call `initialize_and_build_agent(agent_name, nodes)` to generate all package \
files. The agent_name must be snake_case (e.g., "my_agent"). Pass node names \
Call `initialize_and_build_agent(agent_name, nodes)` to generate the agent. \
The agent_name must be snake_case (e.g., "my_agent"). Pass node names \
as comma-separated string (e.g., "gather,process,review").
The tool creates: config.py, nodes/__init__.py, agent.py, \
__init__.py, __main__.py, mcp_servers.json, tests/conftest.py.
The generated files are **structurally complete** with correct imports, \
class definition, `validate()` method, `default_agent` export, and \
`__init__.py` re-exports. They pass validation as-is.
The tool creates a **declarative** agent as two files:
- `agent.json` — the entire agent definition (goal, nodes, edges, prompts, tools)
- `mcp_servers.json` — tool server config (auto-generated)
`mcp_servers.json` is auto-generated with hive-tools as the default. \
Do NOT manually create or overwrite `mcp_servers.json`.
**There are NO Python files.** No agent.py, config.py, nodes/__init__.py, \
__init__.py, or __main__.py. The framework loads agent.json directly.
### Customizing generated files
### Customizing the generated agent
**CRITICAL: Use `edit_file` to customize TODO placeholders. \
NEVER use `write_file` to rewrite generated files from scratch. \
Rewriting breaks imports, class structure, and causes validation failures.**
Use `edit_file` on `exports/{name}/agent.json` to customize TODO placeholders:
- System prompts (in each node's `system_prompt:` field)
- Tool access (in each node's `tools:` field — use `policy: explicit` with `allowed: [...]`)
- Input/output keys, success criteria, goal description
- Identity prompt (top-level `identity_prompt:` field)
- Edge conditions and routing
Safe to edit with `edit_file`:
- System prompts, tools, input_keys, output_keys, success_criteria in \
nodes/__init__.py
- Goal description, success criteria values, constraint values, edge \
definitions, identity_prompt in agent.py
- CLI options in __main__.py
- For triggers (timers/webhooks), add entries to triggers.json in the \
agent's export directory
**Template variables:** Add a `variables:` section at the top of agent.json \
and use `{{variable_name}}` in system prompts for config injection:
```yaml
variables:
spreadsheet_id: "1ZVx..."
nodes:
- id: start
system_prompt: |
Use spreadsheet: {{spreadsheet_id}}
```
Do NOT modify or rewrite:
- Import statements at top of agent.py (they are correct)
- The agent class definition, `validate()`, `_build_graph()`, `_setup()`, \
or lifecycle methods (start/stop/run)
- `__init__.py` exports (all required variables are already re-exported)
- `default_agent = ClassName()` at bottom of agent.py
### Tool access in nodes
Each node declares its tool access policy:
```yaml
# Explicit list (recommended)
tools:
policy: explicit
allowed: [web_search, save_data]
# All tools from registry (for GCU browser nodes)
tools:
policy: all
# No tools (for handoff/summary nodes)
tools:
policy: none
```
## 6. Verify and Load
@@ -999,12 +1013,12 @@ user says "replan", "go back", "let's redesign", "change the approach", \
If you discover that the agent graph has structural problems — GCU nodes \
in the linear flow, missing edges, wrong node connections, incorrect \
sub-agent assignments — you MUST call replan_agent() and fix the draft. \
Do NOT attempt to fix topology by editing agent.py directly. The graph \
Do NOT attempt to fix topology by editing agent.json directly. The graph \
structure is defined by the draft dissolution code-gen pipeline. \
Editing code to rewire nodes bypasses the flowchart and creates drift \
between what the user sees and what the code does.
Editing the config to rewire nodes bypasses the flowchart and creates drift \
between what the user sees and what the config does.
**WRONG:** "Let me fix agent.py to remove GCU nodes from edges..."
**WRONG:** "Let me fix agent.json to remove GCU nodes from edges..."
**RIGHT:** Call replan_agent(), fix the draft with save_agent_draft(), \
get user approval, then confirm_and_build() the corrected code is \
generated automatically.
@@ -0,0 +1,227 @@
# Declarative Agent File Templates
Agents are defined as a single `agent.yaml` file. No Python code needed.
The runner loads this file directly -- no `agent.py`, `config.py`, or
`nodes/__init__.py` required.
## agent.yaml -- Complete Agent Definition
```yaml
name: my-agent
version: 1.0.0
description: What this agent does.
metadata:
intro_message: Welcome! What would you like me to do?
# Template variables -- substituted into system_prompt and identity_prompt
# via {{variable_name}} syntax. Use this for config values that appear
# in prompts (spreadsheet IDs, API endpoints, account names, etc.)
variables:
spreadsheet_id: "1ZVxWDL..."
sheet_name: "contacts"
goal:
description: What this agent achieves.
success_criteria:
- "First success criterion"
- "Second success criterion"
constraints:
- "Hard constraint the agent must respect"
identity_prompt: |
You are a helpful agent.
conversation_mode: continuous # always "continuous" for Hive agents
loop_config:
max_iterations: 100
max_tool_calls_per_turn: 30
max_context_tokens: 32000
# MCP servers to connect (resolved by name from ~/.hive/mcp_registry/)
mcp_servers:
- name: hive-tools
- name: gcu-tools
nodes:
# Node 1: Process (autonomous entry node)
# The queen handles intake and passes structured input via
# run_agent_with_input(task). NO client-facing intake node.
- id: process
name: Process
description: Execute the task using available tools
max_node_visits: 0 # 0 = unlimited (forever-alive agents)
input_keys: [user_request, feedback]
output_keys: [results]
nullable_output_keys: [feedback]
tools:
policy: explicit
allowed: [web_search, web_scrape, save_data, load_data, list_data_files]
success_criteria: Results are complete and accurate.
system_prompt: |
You are a processing agent. Your task is in memory under "user_request".
If "feedback" is present, this is a revision.
Work in phases:
1. Use tools to gather/process data
2. Analyze results
3. Call set_output in a SEPARATE turn:
- set_output("results", "structured results")
# Node 2: Handoff (autonomous)
- id: handoff
name: Handoff
description: Prepare worker results for queen review
max_node_visits: 0
input_keys: [results, user_request]
output_keys: [next_action, feedback, worker_summary]
nullable_output_keys: [feedback, worker_summary]
tools:
policy: none # handoff nodes don't need tools
success_criteria: Results are packaged for queen decision-making.
system_prompt: |
Do NOT talk to the user directly. The queen is the only user interface.
If blocked, call escalate(reason, context) then set:
- set_output("next_action", "escalated")
- set_output("feedback", "what help is needed")
Otherwise summarize and set:
- set_output("worker_summary", "short summary for queen")
- set_output("next_action", "done") or "revise"
- set_output("feedback", "what to revise") only when revising
edges:
- from_node: process
to_node: handoff
# Feedback loop
- from_node: handoff
to_node: process
condition: conditional
condition_expr: "str(next_action).lower() == 'revise'"
priority: 2
# Escalation loop
- from_node: handoff
to_node: process
condition: conditional
condition_expr: "str(next_action).lower() == 'escalated'"
priority: 3
# Loop back for next task
- from_node: handoff
to_node: process
condition: conditional
condition_expr: "str(next_action).lower() == 'done'"
entry_node: process
terminal_nodes: [] # [] = forever-alive
```
## Key differences from Python templates
| Before (Python) | After (YAML) |
|-------------------------------------|----------------------------------------|
| `agent.py` (250 lines boilerplate) | Not needed |
| `config.py` (dataclass + metadata) | `variables:` + `metadata:` in YAML |
| `nodes/__init__.py` (NodeSpec calls)| `nodes:` list in YAML |
| `__init__.py`, `__main__.py` | Not needed |
| f-string config injection | `{{variable_name}}` templates |
| `mcp_servers.json` (separate file) | `mcp_servers:` in YAML (or keep file) |
## Node types
| Type | Description | Tools |
|--------------|---------------------------------------|--------------------------|
| `event_loop` | LLM-driven orchestration (default) | Explicit list or `none` |
| `gcu` | Browser automation via GCU tools | `policy: all` (auto) |
## Tool access policies
```yaml
# Explicit list (recommended for most nodes)
tools:
policy: explicit
allowed: [web_search, save_data]
# All tools from registry (for GCU browser nodes)
tools:
policy: all
# No tools (for handoff/summary nodes)
tools:
policy: none
```
## Edge conditions
| Condition | When to use |
|---------------|-------------------------------------------------------|
| `on_success` | Default. Next node after current succeeds. |
| `on_failure` | Fallback path when current node fails. |
| `always` | Always traverse regardless of outcome. |
| `conditional` | Evaluate `condition_expr` against shared memory keys. |
| `llm_decide` | Let the LLM decide at runtime. |
## Template variables
Use `{{variable_name}}` in `system_prompt` and `identity_prompt`.
Variables are defined in the top-level `variables:` map.
```yaml
variables:
spreadsheet_id: "1ZVxWDL..."
api_endpoint: "https://api.example.com"
nodes:
- id: start
system_prompt: |
Connect to spreadsheet: {{spreadsheet_id}}
API endpoint: {{api_endpoint}}
```
## Entry points
Default is a single manual entry point. For timer/scheduled triggers:
```yaml
entry_points:
- id: default
trigger_type: manual
- id: daily-check
trigger_type: timer
trigger_config:
interval_minutes: 30
```
## mcp_servers.json -- Still Supported
The `mcp_servers.json` file is still loaded automatically if present alongside
`agent.yaml`. You can also inline servers in the YAML:
```yaml
mcp_servers:
- name: hive-tools
- name: gcu-tools
```
Both approaches work. The JSON file takes precedence for backward compatibility.
## Migration from Python agents
Run the migration tool to convert existing agents:
```bash
uv run python -m framework.tools.migrate_agent exports/my_agent
```
This generates `agent.yaml` from the existing `agent.py` + `nodes/` + `config.py`.
The original files are left untouched. Once verified, you can delete the Python files.
## Files after migration
```
my_agent/
agent.yaml # The only required file
mcp_servers.json # Optional (can inline in YAML)
flowchart.json # Optional (auto-generated)
```
@@ -1,306 +1,194 @@
# Hive Agent Framework Condensed Reference
# Hive Agent Framework -- Condensed Reference
## Architecture
Agents are Python packages in `exports/`:
Agents are declarative JSON configs in `exports/`:
```
exports/my_agent/
├── __init__.py # MUST re-export ALL module-level vars from agent.py
├── __main__.py # CLI (run, tui, info, validate, shell)
├── agent.py # Graph construction (goal, edges, agent class)
├── config.py # Runtime config
├── nodes/__init__.py # Node definitions (NodeSpec)
├── mcp_servers.json # MCP tool server config
└── tests/ # pytest tests
agent.json # The entire agent definition
mcp_servers.json # MCP tool server config (optional, prefer registry refs)
```
## Agent Loading Contract
No Python files. No `__init__.py`, `__main__.py`, `config.py`, or `nodes/`.
`AgentRunner.load()` imports the package (`__init__.py`) and reads these
module-level variables via `getattr()`:
## Agent Loading
| Variable | Required | Default if missing | Consequence |
|----------|----------|--------------------|-------------|
| `goal` | YES | `None` | **FATAL** — "must define goal, nodes, edges" |
| `nodes` | YES | `None` | **FATAL** — same error |
| `edges` | YES | `None` | **FATAL** — same error |
| `entry_node` | no | `nodes[0].id` | Probably wrong node |
| `entry_points` | no | `{}` | **Nodes unreachable** — validation fails |
| `terminal_nodes` | **YES** | `[]` | **FATAL** — graph must have at least one terminal node |
| `pause_nodes` | no | `[]` | OK |
| `conversation_mode` | no | not passed | Isolated mode (no context carryover) |
| `identity_prompt` | no | not passed | No agent-level identity |
| `loop_config` | no | `{}` | No iteration limits |
| `triggers.json` (file) | no | not present | No triggers (timers, webhooks) |
`AgentLoader.load()` reads `agent.json` and builds the execution graph.
If `agent.py` exists (legacy), it's loaded as a Python module instead.
**CRITICAL:** `__init__.py` MUST import and re-export ALL of these from
`agent.py`. Missing exports silently fall back to defaults, causing
hard-to-debug failures.
## agent.json Schema
**Why `default_agent.validate()` is NOT sufficient:**
`validate()` checks the agent CLASS's internal graph (self.nodes, self.edges).
These are always correct because the constructor references agent.py's module
vars directly. But `AgentRunner.load()` reads from the PACKAGE (`__init__.py`),
not the class. So `validate()` passes while `AgentRunner.load()` fails.
Always test with `AgentRunner.load("exports/{name}")` — this is the same
code path the TUI and `hive run` use.
## Goal
Defines success criteria and constraints:
```python
goal = Goal(
id="kebab-case-id",
name="Display Name",
description="What the agent does",
success_criteria=[
SuccessCriterion(id="sc-id", description="...", metric="...", target="...", weight=0.25),
],
constraints=[
Constraint(id="c-id", description="...", constraint_type="hard", category="quality"),
],
)
```json
{
"name": "my-agent",
"version": "1.0.0",
"description": "What this agent does",
"goal": {
"description": "What to achieve",
"success_criteria": ["criterion 1", "criterion 2"],
"constraints": ["constraint 1"]
},
"identity_prompt": "You are a helpful agent.",
"conversation_mode": "continuous",
"loop_config": {
"max_iterations": 100,
"max_tool_calls_per_turn": 30,
"max_context_tokens": 32000
},
"mcp_servers": [
{"name": "hive-tools"},
{"name": "gcu-tools"}
],
"variables": {
"spreadsheet_id": "1ZVx..."
},
"nodes": [...],
"edges": [...],
"entry_node": "process",
"terminal_nodes": []
}
```
- 3-5 success criteria, weights sum to 1.0
- 1-5 constraints (hard/soft, categories: quality, accuracy, interaction, functional)
## NodeSpec Fields
## Template Variables
Use `{{variable_name}}` in `system_prompt` and `identity_prompt`. Variables
are defined in the top-level `variables` object:
```json
{
"variables": {"sheet_id": "1ZVx..."},
"nodes": [{
"id": "start",
"system_prompt": "Use sheet: {{sheet_id}}"
}]
}
```
## Node Fields
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| id | str | required | kebab-case identifier |
| name | str | required | Display name |
| name | str | id | Display name |
| description | str | required | What the node does |
| node_type | str | required | `"event_loop"` or `"gcu"` (browser automation — see GCU Guide appendix) |
| input_keys | list[str] | required | Memory keys this node reads |
| output_keys | list[str] | required | Memory keys this node writes via set_output |
| node_type | str | "event_loop" | `"event_loop"` or `"gcu"` (browser) |
| input_keys | list | [] | Memory keys this node reads |
| output_keys | list | [] | Memory keys this node writes via set_output |
| system_prompt | str | "" | LLM instructions |
| tools | list[str] | [] | Tool names from MCP servers |
| client_facing | bool | False | Deprecated compatibility field. Queen interactivity is implicit; workers should escalate instead |
| nullable_output_keys | list[str] | [] | Keys that may remain unset |
| max_node_visits | int | 0 | 0=unlimited (default); >1 for one-shot feedback loops |
| max_retries | int | 3 | Retries on failure |
| tools | object | {} | Tool access policy (see below) |
| nullable_output_keys | list | [] | Keys that may remain unset |
| max_node_visits | int | 1 | 0=unlimited (for forever-alive agents) |
| success_criteria | str | "" | Natural language for judge evaluation |
| sub_agents | list | [] | Node IDs that can be invoked as subagents |
| client_facing | bool | false | Whether output is shown to user |
## EdgeSpec Fields
## Tool Access Policies
Each node declares its tools via a policy object:
```json
{"tools": {"policy": "explicit", "allowed": ["web_search", "save_data"]}}
{"tools": {"policy": "all"}}
{"tools": {"policy": "none"}}
```
- `explicit` (default): only named tools. Empty `allowed` = zero tools.
- `all`: all tools from registry (for GCU browser nodes).
- `none`: no tools (for handoff/summary nodes).
## Edge Fields
| Field | Type | Description |
|-------|------|-------------|
| id | str | kebab-case identifier |
| source | str | Source node ID |
| target | str | Target node ID |
| condition | EdgeCondition | ON_SUCCESS, ON_FAILURE, ALWAYS, CONDITIONAL |
| condition_expr | str | Python expression evaluated against memory (for CONDITIONAL) |
| priority | int | Positive=forward (evaluated first), negative=feedback (loop-back) |
| from_node | str | Source node ID |
| to_node | str | Target node ID |
| condition | str | `on_success`, `on_failure`, `always`, `conditional` |
| condition_expr | str | Python expression for conditional routing |
| priority | int | Higher = evaluated first |
condition_expr examples:
- `"needs_more_research == True"`
- `"str(next_action).lower() == 'revise'"`
## Key Patterns
### STEP 1/STEP 2 (Client-Facing Nodes)
```
**STEP 1 — Respond to the user (text only, NO tool calls):**
[Present information, ask questions]
**STEP 2 — After the user responds, call set_output:**
- set_output("key", "value based on user response")
```
This prevents premature set_output before user interaction.
### Fewer, Richer Nodes (CRITICAL)
**Hard limit: 3-6 nodes for most agents.** Never exceed 6 unless the user
explicitly requests a complex multi-phase pipeline.
**Hard limit: 3-6 nodes for most agents.** Each node boundary serializes
outputs and destroys in-context information. Merge unless:
1. Client-facing boundary (different interaction models)
2. Disjoint tool sets
3. Parallel execution (fan-out branches)
Each node boundary serializes outputs to the shared buffer and **destroys** all
in-context information: tool call results, intermediate reasoning, conversation
history. A research node that searches, fetches, and analyzes in ONE node keeps
all source material in its conversation context. Split across 3 nodes, each
downstream node only sees the serialized summary string.
**Decision framework — merge unless ANY of these apply:**
1. **Client-facing boundary** — Autonomous and client-facing work MUST be
separate nodes (different interaction models)
2. **Disjoint tool sets** — If tools are fundamentally different (e.g., web
search vs database), separate nodes make sense
3. **Parallel execution** — Fan-out branches must be separate nodes
**Red flags that you have too many nodes:**
- A node with 0 tools (pure LLM reasoning) → merge into predecessor/successor
- A node that sets only 1 trivial output → collapse into predecessor
- Multiple consecutive autonomous nodes → combine into one rich node
- A "report" node that presents analysis → merge into the client-facing node
- A "confirm" or "schedule" node that doesn't call any external service → remove
**Typical agent structure (2 nodes):**
**Typical structure (2 nodes):**
```
process (autonomous) ←→ review (queen-mediated)
```
The queen owns intake — she gathers requirements from the user, then
passes structured input via `run_agent_with_input(task)`. When building
the agent, design the entry node's `input_keys` to match what the queen
will provide at run time. Worker agents should NOT have a client-facing
intake node. Mid-execution review/approval should happen through queen
escalation rather than direct worker HITL.
For simpler agents, just 1 autonomous node:
```
process (autonomous) — loops back to itself
process (autonomous) <-> review (queen-mediated)
```
### nullable_output_keys
For inputs that only arrive on certain edges:
```python
research_node = NodeSpec(
input_keys=["brief", "feedback"],
nullable_output_keys=["feedback"], # Only present on feedback edge
max_node_visits=3,
)
```
### Mutually Exclusive Outputs
For routing decisions:
```python
review_node = NodeSpec(
output_keys=["approved", "feedback"],
nullable_output_keys=["approved", "feedback"], # Node sets one or the other
)
```
### Continuous Loop Pattern
Mark the primary event_loop node as terminal: `terminal_nodes=["process"]`.
The node has `output_keys` and can complete when the agent finishes its work.
Use `conversation_mode="continuous"` to preserve context across transitions.
The queen owns intake. Worker agents should NOT have a client-facing intake
node. Mid-execution review should happen through queen escalation.
### set_output
- Synthetic tool injected by framework
- Call separately from real tool calls (separate turn)
- `set_output("key", "value")` stores to the shared buffer
## Edge Conditions
| Condition | When |
|-----------|------|
| ON_SUCCESS | Node completed successfully |
| ON_FAILURE | Node failed |
| ALWAYS | Unconditional |
| CONDITIONAL | condition_expr evaluates to True against memory |
condition_expr examples:
- `"needs_more_research == True"`
- `"str(next_action).lower() == 'new_agent'"`
- `"feedback is not None"`
## Graph Lifecycle
### Graph Lifecycle
| Pattern | terminal_nodes | When |
|---------|---------------|------|
| **Continuous loop** | `["node-with-output-keys"]` | **DEFAULT for all agents** |
| Continuous loop | `["node-with-output-keys"]` | DEFAULT for all agents |
| Linear | `["last-node"]` | One-shot/batch agents |
**Every graph must have at least one terminal node.** Terminal nodes
define where execution ends. For interactive agents that loop continuously,
mark the primary event_loop node as terminal (it has `output_keys` and can
complete at any point). The framework default for `max_node_visits` is 0
(unbounded), so nodes work correctly in continuous loops without explicit
override. Only set `max_node_visits > 0` in one-shot agents with feedback loops.
Every node must have at least one outgoing edge — no dead ends.
Every graph must have at least one terminal node.
## Continuous Conversation Mode
### Continuous Conversation Mode
`conversation_mode` has ONLY two valid states:
- `"continuous"` recommended for interactive agents
- Omit entirely isolated per-node conversations (each node starts fresh)
- `"continuous"` -- recommended (context carries across node transitions)
- Omit entirely -- isolated per-node conversations
**INVALID values** (do NOT use): `"client_facing"`, `"interactive"`,
`"adaptive"`, `"shared"`. These do not exist in the framework.
When `conversation_mode="continuous"`:
- Same conversation thread carries across node transitions
- Layered system prompts: identity (agent-level) + narrative + focus (per-node)
- Transition markers inserted at boundaries
- Compaction happens opportunistically at phase transitions
**INVALID values:** `"client_facing"`, `"interactive"`, `"shared"`.
## loop_config
Only three valid keys:
```python
loop_config = {
"max_iterations": 100, # Max LLM turns per node visit
"max_tool_calls_per_turn": 20, # Max tool calls per LLM response
"max_context_tokens": 32000, # Triggers conversation compaction
```json
{
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_context_tokens": 32000
}
```
**INVALID keys** (do NOT use): `"strategy"`, `"mode"`, `"timeout"`,
`"temperature"`. These are silently ignored or cause errors.
## Data Tools (Spillover)
For large data that exceeds context:
- `save_data(filename, data)` — Write to session data dir
- `load_data(filename, offset, limit)` — Read with pagination
- `list_data_files()` — List files
- `serve_file_to_user(filename, label)` — Clickable file:// URI
- `save_data(filename, data)` -- write to session data dir
- `load_data(filename, offset, limit)` -- read with pagination
- `list_data_files()` -- list files
- `serve_file_to_user(filename, label)` -- clickable file URI
`data_dir` is auto-injected by framework — LLM never sees it.
`data_dir` is auto-injected by framework.
## Fan-Out / Fan-In
Multiple ON_SUCCESS edges from same source = parallel execution via asyncio.gather().
- Parallel nodes must have disjoint output_keys
- Only one branch may have client_facing nodes
- Fan-in node gets all outputs in the shared buffer
Multiple `on_success` edges from same source = parallel execution.
Parallel nodes must have disjoint output_keys.
## Judge System
- **Implicit** (default): ACCEPTs when LLM finishes with no tool calls and all required outputs set
- **SchemaJudge**: Validates against Pydantic model
- **Custom**: Implement `evaluate(context) -> JudgeVerdict`
Judge is the SOLE acceptance mechanism — no ad-hoc framework gating.
## Triggers (Timers, Webhooks)
For agents that react to external events, create a `triggers.json` file
in the agent's export directory:
```json
[
{
"id": "daily-check",
"name": "Daily Check",
"trigger_type": "timer",
"trigger_config": {"cron": "0 9 * * *"},
"task": "Run the daily check process"
}
]
```
### Key Fields
- `trigger_type`: `"timer"` or `"webhook"`
- `trigger_config`: `{"cron": "0 9 * * *"}` or `{"interval_minutes": 20}`
- `task`: describes what the worker should do when the trigger fires
- Triggers can also be created/removed at runtime via `set_trigger` / `remove_trigger` queen tools
## Tool Discovery
Do NOT rely on a static tool list — it will be outdated. Always call
`list_agent_tools()` with NO arguments first to see ALL available tools.
Only use `group=` or `output_schema=` as follow-up calls after seeing the
full list.
Always call `list_agent_tools()` first to see available tools.
Do NOT rely on a static tool list.
```
list_agent_tools() # ALWAYS call this first
list_agent_tools(group="gmail", output_schema="full") # then drill into a category
list_agent_tools("exports/my_agent/mcp_servers.json") # specific agent's tools
list_agent_tools() # full summary
list_agent_tools(group="gmail", output_schema="full") # drill into category
```
After building, run `validate_agent_package("{name}")` to check everything at once.
Common tool categories (verify via list_agent_tools):
- **Web**: search, scrape, PDF
- **Data**: save/load/append/list data files, serve to user
- **File**: view, write, replace, diff, list, grep
- **Communication**: email, gmail, slack, telegram
- **CRM**: hubspot, apollo, calcom
- **GitHub**: stargazers, user profiles, repos
- **Vision**: image analysis
- **Time**: current time
After building, run `validate_agent_package("{name}")` to check everything.
@@ -22,10 +22,10 @@ def mock_mode():
@pytest_asyncio.fixture(scope="session")
async def runner(tmp_path_factory, mock_mode):
from framework.runner.runner import AgentRunner
from framework.runner.runner import AgentLoader
storage = tmp_path_factory.mktemp("agent_storage")
r = AgentRunner.load(AGENT_PATH, mock_mode=mock_mode, storage_path=storage)
r = AgentLoader.load(AGENT_PATH, mock_mode=mock_mode, storage_path=storage)
r._setup()
yield r
await r.cleanup_async()
+19 -5
View File
@@ -533,7 +533,9 @@ class CredentialSetupSession:
def load_agent_nodes(agent_path: str | Path) -> list:
"""Load NodeSpec list from an agent's agent.py or agent.json.
"""Load NodeSpec list from an agent directory.
Checks agent.json (declarative) first, then agent.py (legacy).
Args:
agent_path: Path to agent directory.
@@ -542,16 +544,28 @@ def load_agent_nodes(agent_path: str | Path) -> list:
List of NodeSpec objects (empty list if agent can't be loaded).
"""
agent_path = Path(agent_path)
agent_json_file = agent_path / "agent.json"
agent_py = agent_path / "agent.py"
agent_json = agent_path / "agent.json"
if agent_py.exists():
if agent_json_file.exists():
return _load_nodes_from_json_declarative(agent_json_file)
elif agent_py.exists():
return _load_nodes_from_python_agent(agent_path)
elif agent_json.exists():
return _load_nodes_from_json_agent(agent_json)
return []
def _load_nodes_from_json_declarative(agent_json: Path) -> list:
"""Load nodes from a declarative JSON agent."""
try:
from framework.runner.runner import load_agent_config
data = json.loads(agent_json.read_text(encoding="utf-8"))
graph, _ = load_agent_config(data)
return list(graph.nodes)
except Exception:
return []
def _load_nodes_from_python_agent(agent_path: Path) -> list:
"""Load nodes from a Python-based agent."""
import importlib.util
+6 -6
View File
@@ -5,20 +5,20 @@ from framework.graph.context_handoff import ContextHandoff, HandoffContext
from framework.graph.conversation import ConversationStore, Message, NodeConversation
from framework.graph.edge import DEFAULT_MAX_TOKENS, EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.event_loop_node import (
EventLoopNode,
AgentLoop,
JudgeProtocol,
JudgeVerdict,
LoopConfig,
OutputAccumulator,
)
from framework.graph.executor import GraphExecutor
from framework.graph.executor import Orchestrator
from framework.graph.goal import Constraint, Goal, GoalStatus, SuccessCriterion
from framework.graph.node import NodeContext, NodeProtocol, NodeResult, NodeSpec
from framework.graph.worker_agent import (
Activation,
FanOutTag,
FanOutTracker,
WorkerAgent,
NodeWorker,
WorkerCompletion,
WorkerLifecycle,
)
@@ -40,13 +40,13 @@ __all__ = [
"GraphSpec",
"DEFAULT_MAX_TOKENS",
# Executor
"GraphExecutor",
"Orchestrator",
# Conversation
"NodeConversation",
"ConversationStore",
"Message",
# Event Loop
"EventLoopNode",
"AgentLoop",
"LoopConfig",
"OutputAccumulator",
"JudgeProtocol",
@@ -55,7 +55,7 @@ __all__ = [
"ContextHandoff",
"HandoffContext",
# Worker Agent
"WorkerAgent",
"NodeWorker",
"WorkerLifecycle",
"WorkerCompletion",
"Activation",
+20 -4
View File
@@ -16,7 +16,7 @@ from typing import Any
from framework.graph.edge import GraphSpec
from framework.graph.goal import Goal
from framework.graph.node import DataBuffer, NodeContext, NodeProtocol, NodeSpec
from framework.runtime.core import Runtime
from framework.runtime.core import DecisionTracker
@dataclass
@@ -26,7 +26,7 @@ class GraphContext:
graph: GraphSpec
goal: Goal
buffer: DataBuffer
runtime: Runtime
runtime: DecisionTracker
llm: Any # LLMProvider
tools: list[Any] # list[Tool]
tool_executor: Any # Callable
@@ -131,11 +131,27 @@ def _resolve_available_tools(
tools: list[Any],
override_tools: list[Any] | None,
) -> list[Any]:
"""Select tools available to the current node."""
"""Select tools available to the current node.
Respects ``node_spec.tool_access_policy``:
- ``"all"`` -- all tools from the registry (no filtering).
- ``"explicit"`` -- only tools whose name appears in ``node_spec.tools``.
If the list is empty, **no tools** are given (default-deny).
- ``"none"`` -- no tools at all.
"""
if override_tools is not None:
return list(override_tools)
policy = getattr(node_spec, "tool_access_policy", "explicit")
if policy == "none":
return []
if policy == "all":
return list(tools)
# "explicit" (default): only tools named in node_spec.tools.
if not node_spec.tools:
return []
@@ -155,7 +171,7 @@ def _derive_input_data(buffer: DataBuffer, input_keys: list[str]) -> dict[str, A
def build_node_context(
*,
runtime: Runtime,
runtime: DecisionTracker,
node_spec: NodeSpec,
buffer: DataBuffer,
goal: Goal,
@@ -22,7 +22,7 @@ from framework.runner.tool_registry import ToolRegistry
from framework.runtime.event_bus import EventBus
if TYPE_CHECKING:
from framework.graph.event_loop_node import EventLoopNode
from framework.graph.event_loop_node import AgentLoop
logger = logging.getLogger(__name__)
@@ -33,7 +33,7 @@ async def execute_subagent(
task: str,
*,
config: LoopConfig,
event_loop_node_cls: type[EventLoopNode],
event_loop_node_cls: type[AgentLoop],
escalation_receiver_cls: Callable[[], Any],
accumulator: OutputAccumulator | None = None,
event_bus: EventBus | None = None,
@@ -176,19 +176,35 @@ async def execute_subagent(
finally:
registry.pop(escalation_id, None)
# 3. Filter tools for subagent
# 3. Filter tools for subagent (respects tool_access_policy).
policy = getattr(subagent_spec, "tool_access_policy", "explicit")
subagent_tool_names = set(subagent_spec.tools or [])
tool_source = ctx.all_tools if ctx.all_tools else ctx.available_tools
# GCU auto-population
if subagent_spec.node_type == "gcu" and not subagent_tool_names:
if policy == "none":
subagent_tools: list = []
elif policy == "all":
# GCU auto-population (existing behavior) or "all" policy.
subagent_tools = [t for t in tool_source if t.name != "delegate_to_sub_agent"]
elif subagent_spec.node_type == "gcu" and not subagent_tool_names:
# GCU nodes without explicit tools get everything (backward compat).
subagent_tools = [t for t in tool_source if t.name != "delegate_to_sub_agent"]
else:
subagent_tools = [
t
for t in tool_source
if t.name in subagent_tool_names and t.name != "delegate_to_sub_agent"
]
# "explicit" (default): only tools named in subagent_spec.tools.
# Default-deny: empty list means zero tools.
if not subagent_tool_names:
logger.info(
"Subagent '%s' has tool_access_policy='explicit' with no tools declared "
"-- running with zero tools (default-deny).",
agent_id,
)
subagent_tools = []
else:
subagent_tools = [
t
for t in tool_source
if t.name in subagent_tool_names and t.name != "delegate_to_sub_agent"
]
missing = subagent_tool_names - {t.name for t in subagent_tools}
if missing:
@@ -223,7 +223,9 @@ def build_delegate_tool(sub_agents: list[str], node_registry: dict[str, Any]) ->
spec = node_registry.get(agent_id)
if spec:
desc = getattr(spec, "description", "(no description)")
agent_descriptions.append(f"- {agent_id}: {desc}")
tools_list = getattr(spec, "tools", [])
tools_desc = ", ".join(tools_list) if tools_list else "(no tools)"
agent_descriptions.append(f"- {agent_id}: {desc} [tools: {tools_desc}]")
else:
agent_descriptions.append(f"- {agent_id}: (not found in registry)")
+18 -18
View File
@@ -224,7 +224,7 @@ OutputAccumulator = event_loop_types.OutputAccumulator
# ---------------------------------------------------------------------------
class EventLoopNode(NodeProtocol):
class AgentLoop(NodeProtocol):
"""Multi-turn LLM streaming loop with tool execution and judge evaluation.
Lifecycle:
@@ -307,7 +307,7 @@ class EventLoopNode(NodeProtocol):
async def execute(self, ctx: NodeContext) -> NodeResult:
"""Run the event loop."""
logger.debug(
"[EventLoopNode.execute] Starting execution for node=%s, stream=%s",
"[AgentLoop.execute] Starting execution for node=%s, stream=%s",
ctx.node_id,
ctx.stream_id,
)
@@ -320,7 +320,7 @@ class EventLoopNode(NodeProtocol):
# Store skill dirs for AS-9 file-read interception in _execute_tool
self._skill_dirs: list[str] = ctx.skill_dirs
logger.debug(
"[EventLoopNode.execute] node_id=%s, execution_id=%s, max_iterations=%d",
"[AgentLoop.execute] node_id=%s, execution_id=%s, max_iterations=%d",
node_id,
execution_id,
self._config.max_iterations,
@@ -565,11 +565,11 @@ class EventLoopNode(NodeProtocol):
# 6. Main loop
logger.debug(
"[EventLoopNode.execute] Entering main loop, start_iteration=%d", start_iteration
"[AgentLoop.execute] Entering main loop, start_iteration=%d", start_iteration
)
for iteration in range(start_iteration, self._config.max_iterations):
iter_start = time.time()
logger.debug("[EventLoopNode.execute] iteration=%d starting", iteration)
logger.debug("[AgentLoop.execute] iteration=%d starting", iteration)
# 6a. Check pause (no current-iteration data yet — only log_node_complete needed)
if await self._check_pause(ctx, conversation, iteration):
@@ -601,18 +601,18 @@ class EventLoopNode(NodeProtocol):
# 6b. Drain injection queue
logger.debug(
"[EventLoopNode.execute] iteration=%d: draining injection queue...", iteration
"[AgentLoop.execute] iteration=%d: draining injection queue...", iteration
)
drained_injections = await self._drain_injection_queue(conversation, ctx)
logger.debug(
"[EventLoopNode.execute] iteration=%d: drained %d injections",
"[AgentLoop.execute] iteration=%d: drained %d injections",
iteration,
drained_injections,
)
# 6b1. Drain trigger queue (framework-level signals)
drained_triggers = await self._drain_trigger_queue(conversation)
logger.debug(
"[EventLoopNode.execute] iteration=%d: drained %d triggers",
"[AgentLoop.execute] iteration=%d: drained %d triggers",
iteration,
drained_triggers,
)
@@ -743,7 +743,7 @@ class EventLoopNode(NodeProtocol):
len(conversation.messages),
)
logger.debug(
"[EventLoopNode.execute] iteration=%d: entering _run_single_turn loop", iteration
"[AgentLoop.execute] iteration=%d: entering _run_single_turn loop", iteration
)
_stream_retry_count = 0
_turn_cancelled = False
@@ -752,7 +752,7 @@ class EventLoopNode(NodeProtocol):
while True:
try:
logger.debug(
"[EventLoopNode.execute] iteration=%d: calling _run_single_turn (retry=%d)",
"[AgentLoop.execute] iteration=%d: calling _run_single_turn (retry=%d)",
iteration,
_stream_retry_count,
)
@@ -773,7 +773,7 @@ class EventLoopNode(NodeProtocol):
ctx, conversation, tools, iteration, accumulator
)
logger.debug(
"[EventLoopNode.execute] iteration=%d:"
"[AgentLoop.execute] iteration=%d:"
" _run_single_turn completed successfully",
iteration,
)
@@ -842,13 +842,13 @@ class EventLoopNode(NodeProtocol):
break # success — exit retry loop
except TurnCancelled:
logger.debug("[EventLoopNode.execute] iteration=%d: TurnCancelled", iteration)
logger.debug("[AgentLoop.execute] iteration=%d: TurnCancelled", iteration)
_turn_cancelled = True
break
except Exception as e:
logger.debug(
"[EventLoopNode.execute] iteration=%d:"
"[AgentLoop.execute] iteration=%d:"
" Exception in _run_single_turn: %s (%s)",
iteration,
type(e).__name__,
@@ -1988,7 +1988,7 @@ class EventLoopNode(NodeProtocol):
image_content: Optional list of OpenAI-style image blocks to attach.
"""
logger.debug(
"[EventLoopNode.inject_event] content_len=%d,"
"[AgentLoop.inject_event] content_len=%d,"
" is_client_input=%s, has_images=%s,"
" queue_size_before=%d",
len(content) if content else 0,
@@ -1998,15 +1998,15 @@ class EventLoopNode(NodeProtocol):
)
try:
await self._injection_queue.put((content, is_client_input, image_content))
logger.debug("[EventLoopNode.inject_event] Message queued successfully")
logger.debug("[AgentLoop.inject_event] Message queued successfully")
except Exception as e:
logger.exception("[EventLoopNode.inject_event] Failed to queue message: %s", e)
logger.exception("[AgentLoop.inject_event] Failed to queue message: %s", e)
raise
try:
self._input_ready.set()
logger.debug("[EventLoopNode.inject_event] _input_ready.set() called")
logger.debug("[AgentLoop.inject_event] _input_ready.set() called")
except Exception as e:
logger.exception("[EventLoopNode.inject_event] Failed to set _input_ready: %s", e)
logger.exception("[AgentLoop.inject_event] Failed to set _input_ready: %s", e)
raise
async def inject_trigger(self, trigger: TriggerEvent) -> None:
+17 -17
View File
@@ -30,7 +30,7 @@ from framework.graph.node import (
from framework.graph.validator import OutputValidator
from framework.llm.provider import LLMProvider, Tool
from framework.observability import set_trace_context
from framework.runtime.core import Runtime
from framework.runtime.core import DecisionTracker
from framework.schemas.checkpoint import Checkpoint
from framework.storage.checkpoint_store import CheckpointStore
from framework.utils.io import atomic_write
@@ -112,7 +112,7 @@ class ParallelExecutionConfig:
branch_timeout_seconds: float = 300.0
class GraphExecutor:
class Orchestrator:
"""
Executes agent graphs.
@@ -133,7 +133,7 @@ class GraphExecutor:
def __init__(
self,
runtime: Runtime,
runtime: DecisionTracker,
llm: LLMProvider | None = None,
tools: list[Tool] | None = None,
tool_executor: Callable | None = None,
@@ -169,7 +169,7 @@ class GraphExecutor:
Initialize the executor.
Args:
runtime: Runtime for decision logging
runtime: DecisionTracker for decision logging
llm: LLM provider for LLM nodes
tools: Available tools
tool_executor: Function to execute tools
@@ -206,7 +206,7 @@ class GraphExecutor:
self.validator = OutputValidator()
self.logger = logging.getLogger(__name__)
self.logger.debug(
"[GraphExecutor.__init__] Created with"
"[Orchestrator.__init__] Created with"
" stream_id=%s, execution_id=%s,"
" initial node_registry keys: %s",
stream_id,
@@ -745,11 +745,11 @@ class GraphExecutor:
# Check registry first
if node_spec.id in self.node_registry:
logger.debug(
"[GraphExecutor._get_node_implementation] Found node '%s' in registry", node_spec.id
"[Orchestrator._get_node_implementation] Found node '%s' in registry", node_spec.id
)
return self.node_registry[node_spec.id]
logger.debug(
"[GraphExecutor._get_node_implementation]"
"[Orchestrator._get_node_implementation]"
" Node '%s' not in registry (keys: %s),"
" creating new",
node_spec.id,
@@ -776,7 +776,7 @@ class GraphExecutor:
if node_spec.node_type in ("event_loop", "gcu"):
# Auto-create EventLoopNode with sensible defaults.
# Custom configs can still be pre-registered via node_registry.
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
from framework.graph.event_loop_node import AgentLoop, LoopConfig
# Create a FileConversationStore if a storage path is available
conv_store = None
@@ -802,7 +802,7 @@ class GraphExecutor:
lc = self._loop_config
default_max_iter = 100 if node_spec.supports_direct_user_io() else 50
node = EventLoopNode(
node = AgentLoop(
event_bus=self._event_bus,
judge=None, # implicit judge: accept when output_keys are filled
config=LoopConfig(
@@ -821,7 +821,7 @@ class GraphExecutor:
# Cache so inject_event() is reachable for queen interaction and escalation routing
self.node_registry[node_spec.id] = node
logger.debug(
"[GraphExecutor._get_node_implementation]"
"[Orchestrator._get_node_implementation]"
" Cached node '%s' in node_registry,"
" registry now has keys: %s",
node_spec.id,
@@ -1007,10 +1007,10 @@ class GraphExecutor:
branch_impl = self._get_node_implementation(node_spec, graph.cleanup_llm_model)
effective_max_retries = node_spec.max_retries
# Only override for actual EventLoopNode instances, not custom NodeProtocol impls
from framework.graph.event_loop_node import EventLoopNode
# Only override for actual AgentLoop instances, not custom NodeProtocol impls
from framework.graph.event_loop_node import AgentLoop as _AgentLoop # noqa: F811
if isinstance(branch_impl, EventLoopNode) and effective_max_retries > 1:
if isinstance(branch_impl, _AgentLoop) and effective_max_retries > 1:
self.logger.warning(
f"EventLoopNode '{node_spec.id}' has "
f"max_retries={effective_max_retries}. Overriding "
@@ -1305,7 +1305,7 @@ class GraphExecutor:
from framework.graph.worker_agent import (
Activation,
FanOutTag,
WorkerAgent,
NodeWorker,
WorkerCompletion,
WorkerLifecycle,
)
@@ -1352,9 +1352,9 @@ class GraphExecutor:
)
# Create one WorkerAgent per node
workers: dict[str, WorkerAgent] = {}
workers: dict[str, NodeWorker] = {}
for node_spec in graph.nodes:
workers[node_spec.id] = WorkerAgent(node_spec=node_spec, graph_context=gc)
workers[node_spec.id] = NodeWorker(node_spec=node_spec, graph_context=gc)
# Identify entry workers (graph entry node, not based on edge count)
# A node can be the entry point AND have incoming feedback edges.
@@ -1455,7 +1455,7 @@ class GraphExecutor:
def _route_activation(
activation: Activation,
workers_map: dict[str, WorkerAgent],
workers_map: dict[str, NodeWorker],
pending_tasks_map: dict[str, asyncio.Task],
*,
has_event_subscription: bool,
+11 -2
View File
@@ -25,7 +25,7 @@ from typing import Any
from pydantic import BaseModel, Field
from framework.llm.provider import LLMProvider, Tool
from framework.runtime.core import Runtime
from framework.runtime.core import DecisionTracker
logger = logging.getLogger(__name__)
@@ -144,6 +144,15 @@ class NodeSpec(BaseModel):
# For LLM nodes
system_prompt: str | None = Field(default=None, description="System prompt for LLM nodes")
tools: list[str] = Field(default_factory=list, description="Tool names this node can use")
tool_access_policy: str = Field(
default="explicit",
description=(
"Tool access policy for this node. "
"'all' = all tools from registry, "
"'explicit' = only tools listed in `tools` (default, recommended), "
"'none' = no tools at all."
),
)
model: str | None = Field(
default=None, description="Specific model to use (defaults to graph default)"
)
@@ -459,7 +468,7 @@ class NodeContext:
"""
# Core runtime
runtime: Runtime
runtime: DecisionTracker
# Node identity
node_id: str
+4 -4
View File
@@ -109,7 +109,7 @@ class RetryState:
# ---------------------------------------------------------------------------
class WorkerAgent:
class NodeWorker:
"""First-class autonomous worker for one node in the graph.
Lifecycle:
@@ -359,7 +359,7 @@ class WorkerAgent:
# Only skip retries for actual EventLoopNode instances (they handle
# retries internally). Custom NodeProtocol impls registered via
# register_node should be retried by the executor.
from framework.graph.event_loop_node import EventLoopNode as _ELN
from framework.graph.event_loop_node import AgentLoop as _ELN
if isinstance(node_impl, _ELN):
max_retries = 0
@@ -609,7 +609,7 @@ class WorkerAgent:
# Auto-create EventLoopNode
if self.node_spec.node_type in ("event_loop", "gcu"):
from framework.graph.event_loop.types import LoopConfig
from framework.graph.event_loop_node import EventLoopNode
from framework.graph.event_loop_node import AgentLoop
from framework.graph.node import warn_if_deprecated_client_facing
conv_store = None
@@ -623,7 +623,7 @@ class WorkerAgent:
warn_if_deprecated_client_facing(self.node_spec)
default_max_iter = 100 if self.node_spec.supports_direct_user_io() else 50
node = EventLoopNode(
node = AgentLoop(
event_bus=gc.event_bus,
judge=None,
config=LoopConfig(
+12
View File
@@ -0,0 +1,12 @@
"""Host layer -- how agents are triggered and hosted."""
from framework.runtime.agent_runtime import ( # noqa: F401
AgentHost,
AgentRuntimeConfig,
create_agent_runtime,
)
from framework.runtime.event_bus import AgentEvent, EventBus, EventType # noqa: F401
from framework.runtime.execution_stream import ( # noqa: F401
EntryPointSpec,
ExecutionManager,
)
+4
View File
@@ -0,0 +1,4 @@
"""Loader layer -- agent loading from disk (JSON config, MCP, credentials)."""
from framework.runner.runner import AgentLoader # noqa: F401
from framework.runner.tool_registry import ToolRegistry # noqa: F401
+19
View File
@@ -0,0 +1,19 @@
"""Orchestrator layer -- how agents are composed via graphs."""
from framework.graph.context import GraphContext # noqa: F401
from framework.graph.edge import ( # noqa: F401
DEFAULT_MAX_TOKENS,
EdgeCondition,
EdgeSpec,
GraphSpec,
)
from framework.graph.executor import Orchestrator # noqa: F401
from framework.graph.goal import Constraint, Goal, GoalStatus, SuccessCriterion # noqa: F401
from framework.graph.node import ( # noqa: F401
DataBuffer,
NodeContext,
NodeProtocol,
NodeResult,
NodeSpec,
)
from framework.graph.worker_agent import NodeWorker # noqa: F401
+32
View File
@@ -0,0 +1,32 @@
"""Pipeline middleware for the agent runtime.
Stages run in order when :meth:`AgentRuntime.trigger` receives a request.
Each stage can pass the context through, transform the input data, or reject
the request entirely. This is the runtime-level analogue of AstrBot's
pipeline architecture and lets operators compose rate limiting, validation,
cost guards, and custom pre/post-processing without patching core code.
"""
from framework.pipeline.registry import (
build_pipeline_from_config,
build_stage,
register,
)
from framework.pipeline.runner import PipelineRunner
from framework.pipeline.stage import (
PipelineContext,
PipelineRejectedError,
PipelineResult,
PipelineStage,
)
__all__ = [
"PipelineContext",
"PipelineRejectedError",
"PipelineResult",
"PipelineRunner",
"PipelineStage",
"build_pipeline_from_config",
"build_stage",
"register",
]
@@ -0,0 +1,44 @@
"""Execution-level middleware protocol.
Unlike :class:`PipelineStage` (which gates ``AgentHost.trigger()`` at the
request level), execution middleware runs at the start of **every** execution
attempt inside ``ExecutionManager._run_execution()`` -- including resurrection
retries.
Use this for concerns that must re-evaluate per attempt:
- Cost tracking (charge per attempt, not per trigger)
- Tool scoping (different tools on retry)
- Checkpoint config overrides
- Per-execution logging/tracing setup
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any
@dataclass
class ExecutionContext:
    """Context passed to execution middleware.

    One instance describes a single execution attempt; per the module
    docstring, middleware runs for every attempt, including resurrection
    retries.
    """

    # Unique id of this execution attempt.
    execution_id: str
    # Id of the execution stream this attempt belongs to.
    stream_id: str
    # Id of the overall run.
    run_id: str
    # Input payload for the execution; middleware may transform it.
    input_data: dict[str, Any]
    # Optional session state carried into the execution.
    session_state: dict[str, Any] | None = None
    # 1-based attempt counter -- presumably bumped on resurrection
    # retries by the caller; confirm against ExecutionManager.
    attempt: int = 1
    # Free-form scratch space for middleware to pass data downstream.
    metadata: dict[str, Any] = field(default_factory=dict)
class ExecutionMiddleware(ABC):
    """Base class for per-execution middleware.

    Unlike request-level pipeline stages, implementations are invoked at
    the start of every execution attempt (see module docstring).
    """

    @abstractmethod
    async def on_execution_start(self, ctx: ExecutionContext) -> ExecutionContext:
        """Called before each execution attempt (including resurrections).

        Modify and return *ctx* to transform execution parameters.
        Raise to abort the execution.
        """
+103
View File
@@ -0,0 +1,103 @@
"""Pipeline stage registry -- maps type names to stage classes.
Stages self-register via the ``@register`` decorator. The
``build_pipeline_from_config`` function reads a declarative config
(from ``~/.hive/configuration.json`` or ``agent.json``) and
instantiates the corresponding stage objects.
Example config::
{
"pipeline": {
"stages": [
{"type": "rate_limit", "order": 200, "config": {"max_requests_per_minute": 60}},
{"type": "cost_guard", "order": 300, "config": {"max_cost_per_request": 0.50}}
]
}
}
"""
from __future__ import annotations
import logging
from typing import Any
from framework.pipeline.runner import PipelineRunner
from framework.pipeline.stage import PipelineStage
logger = logging.getLogger(__name__)
_STAGE_REGISTRY: dict[str, type[PipelineStage]] = {}
def register(name: str):
    """Class decorator that records a pipeline stage under *name*.

    Usage::

        @register("rate_limit")
        class RateLimitStage(PipelineStage):
            ...
    """

    def _record(stage_cls: type[PipelineStage]) -> type[PipelineStage]:
        # Side effect only: remember the class, hand it back unchanged.
        _STAGE_REGISTRY[name] = stage_cls
        return stage_cls

    return _record
def get_registered_stages() -> dict[str, type[PipelineStage]]:
    """Return a snapshot copy of the stage registry (safe to mutate)."""
    return {**_STAGE_REGISTRY}
def build_stage(spec: dict[str, Any]) -> PipelineStage:
    """Instantiate a single stage from a config spec.

    Args:
        spec: Dict with ``type`` (required), ``order`` (optional),
            and ``config`` (optional kwargs dict).

    Raises:
        KeyError: If the stage type is not registered.
    """
    stage_type = spec["type"]
    try:
        stage_cls = _STAGE_REGISTRY[stage_type]
    except KeyError:
        available = ", ".join(sorted(_STAGE_REGISTRY)) or "(none)"
        raise KeyError(
            f"Unknown pipeline stage type '{stage_type}'. "
            f"Available: {available}"
        ) from None
    # Config kwargs are forwarded straight to the stage constructor.
    stage = stage_cls(**spec.get("config", {}))
    if "order" in spec:
        # Instance attribute shadows the class-level default order.
        stage.order = spec["order"]
    return stage
def build_pipeline_from_config(
    stages_config: list[dict[str, Any]],
) -> PipelineRunner:
    """Build a ``PipelineRunner`` from a declarative stages list.

    Each entry is ``{"type": "...", "order": N, "config": {...}}``.
    """
    # Built-in stages register themselves on import; make sure that
    # happened before we resolve type names.
    _ensure_builtins_registered()
    return PipelineRunner([build_stage(entry) for entry in stages_config])
def _ensure_builtins_registered() -> None:
    """Import built-in stage modules so their ``@register`` decorators fire.

    Each module is imported independently so one failing import does not
    prevent the remaining built-ins from registering (the old single
    try-block skipped every module after the first failure).
    """
    import importlib

    # Guard on a known builtin key rather than "registry non-empty":
    # a user-registered custom stage must not suppress builtin loading.
    if "rate_limit" in _STAGE_REGISTRY:
        return
    for module_name in (
        "framework.pipeline.stages.rate_limit",
        "framework.pipeline.stages.input_validation",
        "framework.pipeline.stages.cost_guard",
    ):
        try:
            importlib.import_module(module_name)
        except ImportError:
            # Best-effort: a missing optional stage is not fatal.
            logger.debug("Built-in pipeline stage module unavailable: %s", module_name)
+75
View File
@@ -0,0 +1,75 @@
"""Pipeline runner -- executes registered stages in order."""
from __future__ import annotations
import logging
from typing import Any
from framework.pipeline.stage import (
PipelineContext,
PipelineRejectedError,
PipelineStage,
)
logger = logging.getLogger(__name__)
class PipelineRunner:
    """Executes a list of :class:`PipelineStage` instances in ``order``.

    The runner is the orchestration layer that :class:`AgentRuntime` calls
    on every trigger. Stages execute in ascending ``order`` (ties broken by
    registration order). A stage returning ``reject`` short-circuits the
    pipeline and causes the trigger to raise :class:`PipelineRejectedError`.
    """

    def __init__(self, stages: list[PipelineStage] | None = None) -> None:
        # sorted() is stable, so equal orders keep registration order.
        initial = stages if stages is not None else []
        self._stages: list[PipelineStage] = sorted(initial, key=lambda st: st.order)

    @property
    def stages(self) -> list[PipelineStage]:
        # Hand out a copy so callers cannot disturb our ordering.
        return list(self._stages)

    def add_stage(self, stage: PipelineStage) -> None:
        """Add a stage after construction (for dynamic registration)."""
        self._stages.append(stage)
        self._stages.sort(key=lambda st: st.order)

    async def initialize_all(self) -> None:
        """Call ``initialize`` on every registered stage."""
        for stage in self._stages:
            await stage.initialize()

    async def run(self, ctx: PipelineContext) -> PipelineContext:
        """Run all stages. Raises ``PipelineRejectedError`` on rejection.

        Returns the (possibly transformed) context.
        """
        for stage in self._stages:
            outcome = await stage.process(ctx)
            name = stage.__class__.__name__
            if outcome.action == "reject":
                why = outcome.rejection_reason or "(no reason given)"
                logger.warning("Pipeline rejected by %s: %s", name, why)
                raise PipelineRejectedError(name, why)
            if outcome.action == "transform" and outcome.input_data is not None:
                # Transform stages replace the request payload in place.
                ctx.input_data = outcome.input_data
        return ctx

    async def run_post(self, ctx: PipelineContext, result: Any) -> Any:
        """Run all stages' ``post_process`` hooks in order.

        Each stage can transform the result; the final value is returned.
        Exceptions are logged and swallowed -- post-processing must not
        break a successful execution.
        """
        current = result
        for stage in self._stages:
            try:
                current = await stage.post_process(ctx, current)
            except Exception:
                logger.exception(
                    "Pipeline post_process raised in %s; continuing with previous result",
                    stage.__class__.__name__,
                )
        return current
+69
View File
@@ -0,0 +1,69 @@
"""Pipeline stage base class and request/response types."""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Literal
class PipelineRejectedError(Exception):
    """Raised by ``AgentRuntime.trigger`` when a stage rejects the request."""

    def __init__(self, stage_name: str, reason: str) -> None:
        message = f"Pipeline rejected by {stage_name}: {reason}"
        super().__init__(message)
        # Kept as attributes so callers can branch on the offending stage.
        self.stage_name = stage_name
        self.reason = reason
@dataclass
class PipelineContext:
    """Carries request data through the pipeline.

    Stages can mutate ``metadata`` to pass information downstream -- e.g. a
    cost-estimation stage might write ``metadata["estimated_cost"]`` so a
    budget-guard stage later in the chain can reject requests over budget.
    """

    # Entry point being triggered (used by per-entry-point stages).
    entry_point_id: str
    # Request payload; a stage's "transform" result replaces this.
    input_data: dict[str, Any]
    # Optional id for correlating this request across logs/events.
    correlation_id: str | None = None
    # Optional session state; may carry a "session_id" key (used by
    # the rate-limit stage).
    session_state: dict[str, Any] | None = None
    # Stage-to-stage scratch space (see class docstring).
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class PipelineResult:
    """Outcome of a stage's ``process`` call."""

    # "continue" passes through, "reject" aborts the request,
    # "transform" replaces the context's input_data.
    action: Literal["continue", "reject", "transform"] = "continue"
    # Replacement payload; only honored when action == "transform".
    input_data: dict[str, Any] | None = None
    # Human-readable reason; only meaningful when action == "reject".
    rejection_reason: str | None = None
class PipelineStage(ABC):
    """Base class for all middleware stages.

    Subclasses implement :meth:`process`. The class attribute ``order``
    controls stage ordering (lower runs first; 100 is the default).
    Stages may also implement :meth:`initialize` for one-time setup and
    :meth:`post_process` to decorate the execution result.
    """

    # Ascending sort key for the runner; ties keep registration order.
    order: int = 100

    async def initialize(self) -> None:
        """Called once when the runtime starts."""
        return None

    @abstractmethod
    async def process(self, ctx: PipelineContext) -> PipelineResult:
        """Process the incoming request.

        Return a :class:`PipelineResult` with action ``continue``, ``reject``,
        or ``transform``.
        """

    async def post_process(self, ctx: PipelineContext, result: Any) -> Any:
        """Optional post-execution hook. Default: pass-through."""
        return result
@@ -0,0 +1,11 @@
"""Built-in pipeline stages."""
from framework.pipeline.stages.cost_guard import CostGuardStage
from framework.pipeline.stages.input_validation import InputValidationStage
from framework.pipeline.stages.rate_limit import RateLimitStage
__all__ = [
"CostGuardStage",
"InputValidationStage",
"RateLimitStage",
]
@@ -0,0 +1,35 @@
"""Cost guard stage -- reject requests over a pre-flight budget."""
from __future__ import annotations
from framework.pipeline.registry import register
from framework.pipeline.stage import PipelineContext, PipelineResult, PipelineStage
@register("cost_guard")
class CostGuardStage(PipelineStage):
"""Reject requests whose estimated cost exceeds the per-request budget.
The cost estimate must be populated in ``ctx.metadata["estimated_cost"]``
by an earlier stage (or by the caller). When no estimate is present,
the stage passes through.
"""
order = 300
def __init__(self, max_cost_per_request: float = 1.0) -> None:
self._budget = max_cost_per_request
async def process(self, ctx: PipelineContext) -> PipelineResult:
estimated = ctx.metadata.get("estimated_cost")
if estimated is None:
return PipelineResult(action="continue")
if estimated > self._budget:
return PipelineResult(
action="reject",
rejection_reason=(
f"Estimated cost ${estimated:.4f} exceeds budget "
f"${self._budget:.4f}"
),
)
return PipelineResult(action="continue")
@@ -0,0 +1,47 @@
"""Input validation stage.
Rejects requests whose ``input_data`` does not match the entry point's
declared input schema. Uses a user-provided schema map:
``{entry_point_id: {required_key: expected_type, ...}}``.
"""
from __future__ import annotations
from framework.pipeline.registry import register
from framework.pipeline.stage import PipelineContext, PipelineResult, PipelineStage
@register("input_validation")
class InputValidationStage(PipelineStage):
"""Validate ``input_data`` against per-entry-point schemas.
The schema is a simple dict mapping key -> expected Python type.
For richer validation, substitute a Pydantic-based stage.
"""
order = 100
def __init__(self, schemas: dict[str, dict[str, type]] | None = None) -> None:
self._schemas = schemas or {}
async def process(self, ctx: PipelineContext) -> PipelineResult:
schema = self._schemas.get(ctx.entry_point_id)
if not schema:
return PipelineResult(action="continue")
for key, expected_type in schema.items():
if key not in ctx.input_data:
return PipelineResult(
action="reject",
rejection_reason=f"Missing required input key: '{key}'",
)
value = ctx.input_data[key]
if not isinstance(value, expected_type):
return PipelineResult(
action="reject",
rejection_reason=(
f"Input key '{key}' has type {type(value).__name__}, "
f"expected {expected_type.__name__}"
),
)
return PipelineResult(action="continue")
@@ -0,0 +1,44 @@
"""Per-(entry-point, session) rate limiting stage."""
from __future__ import annotations
import time
from collections import defaultdict
from framework.pipeline.registry import register
from framework.pipeline.stage import PipelineContext, PipelineResult, PipelineStage
@register("rate_limit")
class RateLimitStage(PipelineStage):
    """Reject requests that exceed ``max_requests_per_minute`` per session.

    The key is ``<entry_point_id>:<session_id>``. When no session_id is
    present in ``session_state``, a single shared "default" bucket is used.

    Timestamps come from ``time.monotonic()`` over a sliding 60-second
    window. Buckets that prune down to empty are discarded so the
    timestamp map does not grow without bound as sessions come and go.
    """

    order = 200

    def __init__(self, max_requests_per_minute: int = 60) -> None:
        """Configure the per-bucket request ceiling for a 60s window."""
        self._max_rpm = max_requests_per_minute
        # bucket key -> monotonic timestamps of requests in the last 60s
        self._timestamps: dict[str, list[float]] = defaultdict(list)

    async def process(self, ctx: PipelineContext) -> PipelineResult:
        """Allow or reject the request based on its bucket's recent traffic.

        Rejected requests are not recorded, so they do not extend the
        caller's penalty window.
        """
        session_id = "default"
        if ctx.session_state:
            session_id = str(ctx.session_state.get("session_id", "default"))
        key = f"{ctx.entry_point_id}:{session_id}"
        now = time.monotonic()

        # Prune entries older than 60s. Drop the bucket entirely when it
        # empties — otherwise one list per dead session leaks forever.
        recent = [t for t in self._timestamps[key] if now - t < 60.0]
        if recent:
            self._timestamps[key] = recent
        else:
            self._timestamps.pop(key, None)

        if len(recent) >= self._max_rpm:
            return PipelineResult(
                action="reject",
                rejection_reason=(
                    f"Rate limit exceeded: {self._max_rpm} req/min "
                    f"for session '{session_id}'"
                ),
            )
        self._timestamps[key].append(now)
        return PipelineResult(action="continue")
+2 -2
View File
@@ -8,12 +8,12 @@ from framework.runner.protocol import (
MessageType,
OrchestratorResult,
)
from framework.runner.runner import AgentInfo, AgentRunner, ValidationResult
from framework.runner.runner import AgentInfo, AgentLoader, ValidationResult
from framework.runner.tool_registry import ToolRegistry, tool
__all__ = [
# Single agent
"AgentRunner",
"AgentLoader",
"AgentInfo",
"ValidationResult",
"ToolRegistry",
+37 -19
View File
@@ -19,7 +19,7 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
run_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder (containing agent.json)",
help="Path to agent folder (containing agent.json or agent.py)",
)
run_parser.add_argument(
"--input",
@@ -87,7 +87,7 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
info_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder (containing agent.json)",
help="Path to agent folder (containing agent.json or agent.py)",
)
info_parser.add_argument(
"--json",
@@ -105,7 +105,7 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
validate_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder (containing agent.json)",
help="Path to agent folder (containing agent.json or agent.py)",
)
validate_parser.set_defaults(func=cmd_validate)
@@ -310,7 +310,7 @@ def _prompt_before_start(agent_path: str, runner, model: str | None = None):
Updated runner if user proceeds, None if user aborts.
"""
from framework.credentials.setup import CredentialSetupSession
from framework.runner import AgentRunner
from framework.runner import AgentLoader
while True:
print()
@@ -328,7 +328,7 @@ def _prompt_before_start(agent_path: str, runner, model: str | None = None):
if result.success:
# Reload runner with updated credentials
try:
runner = AgentRunner.load(agent_path, model=model)
runner = AgentLoader.load(agent_path, model=model)
except Exception as e:
print(f"Error reloading agent: {e}")
return None
@@ -342,7 +342,7 @@ def cmd_run(args: argparse.Namespace) -> int:
from framework.credentials.models import CredentialError
from framework.observability import configure_logging
from framework.runner import AgentRunner
from framework.runner import AgentLoader
# Set logging level (quiet by default for cleaner output)
if args.quiet:
@@ -390,7 +390,7 @@ def cmd_run(args: argparse.Namespace) -> int:
# Standard execution
# AgentRunner handles credential setup interactively when stdin is a TTY.
try:
runner = AgentRunner.load(
runner = AgentLoader.load(
args.agent_path,
model=args.model,
)
@@ -528,10 +528,10 @@ def cmd_run(args: argparse.Namespace) -> int:
def cmd_info(args: argparse.Namespace) -> int:
"""Show agent information."""
from framework.credentials.models import CredentialError
from framework.runner import AgentRunner
from framework.runner import AgentLoader
try:
runner = AgentRunner.load(args.agent_path)
runner = AgentLoader.load(args.agent_path)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return 1
@@ -595,10 +595,10 @@ def cmd_info(args: argparse.Namespace) -> int:
def cmd_validate(args: argparse.Namespace) -> int:
"""Validate an exported agent."""
from framework.credentials.models import CredentialError
from framework.runner import AgentRunner
from framework.runner import AgentLoader
try:
runner = AgentRunner.load(args.agent_path)
runner = AgentLoader.load(args.agent_path)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return 1
@@ -632,7 +632,7 @@ def cmd_validate(args: argparse.Namespace) -> int:
def cmd_list(args: argparse.Namespace) -> int:
"""List available agents."""
from framework.runner import AgentRunner
from framework.runner import AgentLoader
directory = Path(args.directory)
if not directory.exists():
@@ -644,7 +644,7 @@ def cmd_list(args: argparse.Namespace) -> int:
for path in directory.iterdir():
if _is_valid_agent_dir(path):
try:
runner = AgentRunner.load(path)
runner = AgentLoader.load(path)
info = runner.info()
agents.append(
{
@@ -775,7 +775,7 @@ def cmd_shell(args: argparse.Namespace) -> int:
from framework.credentials.models import CredentialError
from framework.observability import configure_logging
from framework.runner import AgentRunner
from framework.runner import AgentLoader
configure_logging(level="INFO")
@@ -789,7 +789,7 @@ def cmd_shell(args: argparse.Namespace) -> int:
return 1
try:
runner = AgentRunner.load(agent_path)
runner = AgentLoader.load(agent_path)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return 1
@@ -1004,17 +1004,35 @@ def _get_framework_agents_dir() -> Path:
def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]:
"""Extract name and description from a Python-based agent's config.py.
"""Extract name and description from an agent directory.
Uses AST parsing to safely extract values without executing code.
Checks agent.json first (declarative), then falls back to config.py
(legacy Python). Uses AST parsing for Python to avoid executing code.
Returns (name, description) tuple, with fallbacks if parsing fails.
"""
import ast
config_path = agent_path / "config.py"
fallback_name = agent_path.name.replace("_", " ").title()
fallback_desc = "(Python-based agent)"
# Declarative agent: read from agent.json
agent_json = agent_path / "agent.json"
if agent_json.exists():
try:
import json
data = json.loads(agent_json.read_text(encoding="utf-8"))
if isinstance(data, dict):
name = data.get("name", fallback_name)
# Convert kebab-case to Title Case for display
if "-" in name and " " not in name:
name = name.replace("-", " ").title()
desc = data.get("description", fallback_desc)
return name, desc
except Exception:
pass
config_path = agent_path / "config.py"
if not config_path.exists():
return fallback_name, fallback_desc
@@ -1083,7 +1101,7 @@ def _is_valid_agent_dir(path: Path) -> bool:
def _has_agents(directory: Path) -> bool:
"""Check if a directory contains any valid agents (folders with agent.json or agent.py)."""
"""Check if a directory contains any valid agents."""
if not directory.exists():
return False
return any(_is_valid_agent_dir(p) for p in directory.iterdir())
+186 -18
View File
@@ -25,7 +25,7 @@ from framework.graph.node import NodeSpec
from framework.llm.provider import LLMProvider, Tool
from framework.runner.preload_validation import run_preload_validation
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig, create_agent_runtime
from framework.runtime.agent_runtime import AgentHost, AgentRuntimeConfig, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from framework.runtime.runtime_log_store import RuntimeLogStore
from framework.tools.flowchart_utils import generate_fallback_flowchart
@@ -881,6 +881,173 @@ class ValidationResult:
missing_credentials: list[str] = field(default_factory=list)
def _resolve_template_vars(text: str | None, variables: dict[str, str]) -> str | None:
"""Resolve ``{{variable_name}}`` placeholders in *text*."""
if text is None or not variables:
return text
import re
def _replace(m: re.Match) -> str:
key = m.group(1).strip()
return variables.get(key, m.group(0))
return re.sub(r"\{\{(.+?)\}\}", _replace, text)
def load_agent_config(data: str | dict) -> tuple[GraphSpec, Goal]:
    """Load ``GraphSpec`` and ``Goal`` from a declarative :class:`AgentConfig`.

    The declarative format uses a ``name`` key at the top level, unlike the
    legacy export format which uses ``graph``/``goal`` keys. The runner
    auto-detects the format in :meth:`AgentLoader.load`.

    Template variables in ``config.variables`` are resolved in all
    ``system_prompt`` and ``identity_prompt`` fields via ``{{var_name}}``.

    Args:
        data: JSON string or already-parsed dict matching the
            ``AgentConfig`` schema.

    Returns:
        Tuple of (GraphSpec, Goal)

    Raises:
        json.JSONDecodeError: If *data* is a string and not valid JSON.
        # NOTE(review): AgentConfig.model_validate presumably raises a
        # pydantic ValidationError for schema mismatches — confirm.
    """
    from framework.graph.edge import EdgeCondition, EdgeSpec
    from framework.graph.goal import Constraint, Goal as GoalModel, SuccessCriterion
    from framework.schemas.agent_config import AgentConfig

    if isinstance(data, str):
        data = json.loads(data)
    config = AgentConfig.model_validate(data)
    tvars = config.variables

    # Build Goal: config lists plain strings; wrap them in the richer
    # SuccessCriterion/Constraint models with synthesized ids.
    success_criteria = [
        SuccessCriterion(
            id=f"sc-{i}",
            description=sc,
            metric="llm_judge",
            target="",
        )
        for i, sc in enumerate(config.goal.success_criteria)
    ]
    constraints = [
        Constraint(
            id=f"c-{i}",
            description=c,
            constraint_type="hard",
            category="general",
        )
        for i, c in enumerate(config.goal.constraints)
    ]
    goal = GoalModel(
        id=f"{config.name}-goal",
        name=config.name,
        description=config.goal.description,
        success_criteria=success_criteria,
        constraints=constraints,
    )

    # Build nodes. Unknown edge-condition strings fall back to ON_SUCCESS
    # via condition_map.get() below.
    condition_map = {
        "always": EdgeCondition.ALWAYS,
        "on_success": EdgeCondition.ON_SUCCESS,
        "on_failure": EdgeCondition.ON_FAILURE,
        "conditional": EdgeCondition.CONDITIONAL,
        "llm_decide": EdgeCondition.LLM_DECIDE,
    }
    nodes = []
    for nc in config.nodes:
        # Resolve tool access: node-level config -> agent-level fallback
        if nc.tools.policy == "explicit" and nc.tools.allowed:
            tools_list = nc.tools.allowed
            tool_policy = "explicit"
        elif nc.tools.policy == "none":
            tools_list = []
            tool_policy = "none"
        elif nc.tools.policy == "all":
            tools_list = []
            tool_policy = "all"
        else:
            # Inherit agent-level tool config
            if config.tools.policy == "explicit" and config.tools.allowed:
                tools_list = config.tools.allowed
            else:
                tools_list = []
            tool_policy = config.tools.policy

        node_kwargs: dict = {
            "id": nc.id,
            "name": nc.name or nc.id,
            "description": nc.description or "",
            "node_type": nc.node_type,
            "system_prompt": _resolve_template_vars(nc.system_prompt, tvars),
            "tools": tools_list,
            "tool_access_policy": tool_policy,
            "sub_agents": nc.sub_agents,
            "model": nc.model,
            "input_keys": nc.input_keys,
            "output_keys": nc.output_keys,
            "nullable_output_keys": nc.nullable_output_keys,
            "max_iterations": nc.max_iterations,
            "success_criteria": nc.success_criteria,
            "skip_judge": nc.skip_judge,
        }
        # Optional fields -- only pass when set (avoids overriding defaults)
        if nc.client_facing:
            node_kwargs["client_facing"] = nc.client_facing
        if nc.max_node_visits != 1:
            node_kwargs["max_node_visits"] = nc.max_node_visits
        if nc.failure_criteria:
            node_kwargs["failure_criteria"] = nc.failure_criteria
        if nc.max_retries is not None:
            node_kwargs["max_retries"] = nc.max_retries
        nodes.append(NodeSpec(**node_kwargs))

    # Build edges: ids are synthesized from index + endpoints so they are
    # unique even when the same node pair appears twice.
    edges = []
    for i, ec in enumerate(config.edges):
        edges.append(
            EdgeSpec(
                id=f"e-{i}-{ec.from_node}-{ec.to_node}",
                source=ec.from_node,
                target=ec.to_node,
                condition=condition_map.get(ec.condition, EdgeCondition.ON_SUCCESS),
                condition_expr=ec.condition_expr,
                priority=ec.priority,
                input_mapping=ec.input_mapping,
            )
        )

    # Build entry_points dict for GraphSpec. Each entry point without an
    # explicit entry_node falls back to the graph-level entry_node; when
    # none are declared, a single "default" entry point is synthesized.
    entry_points_dict: dict = {}
    if config.entry_points:
        for ep in config.entry_points:
            entry_points_dict[ep.id] = ep.entry_node or config.entry_node
    else:
        entry_points_dict = {"default": config.entry_node}

    # Build GraphSpec
    graph_kwargs: dict = {
        "id": f"{config.name}-graph",
        "goal_id": goal.id,
        "version": config.version,
        "entry_node": config.entry_node,
        "entry_points": entry_points_dict,
        "terminal_nodes": config.terminal_nodes,
        "pause_nodes": config.pause_nodes,
        "nodes": nodes,
        "edges": edges,
        "max_tokens": config.max_tokens,
        "loop_config": dict(config.loop_config),
        "conversation_mode": config.conversation_mode,
        # Template vars are resolved here too; None resolves to "".
        "identity_prompt": _resolve_template_vars(
            config.identity_prompt, tvars
        ) or "",
    }
    graph = GraphSpec(**graph_kwargs)
    return graph, goal
def load_agent_export(data: str | dict) -> tuple[GraphSpec, Goal]:
"""
Load GraphSpec and Goal from export_graph() output.
@@ -979,7 +1146,7 @@ def load_agent_export(data: str | dict) -> tuple[GraphSpec, Goal]:
return graph, goal
class AgentRunner:
class AgentLoader:
"""
Loads and runs exported agents with minimal boilerplate.
@@ -991,15 +1158,15 @@ class AgentRunner:
Usage:
# Simple usage
runner = AgentRunner.load("exports/outbound-sales-agent")
runner = AgentLoader.load("exports/outbound-sales-agent")
result = await runner.run({"lead_id": "123"})
# With context manager
async with AgentRunner.load("exports/outbound-sales-agent") as runner:
async with AgentLoader.load("exports/outbound-sales-agent") as runner:
result = await runner.run({"lead_id": "123"})
# With custom tools
runner = AgentRunner.load("exports/outbound-sales-agent")
runner = AgentLoader.load("exports/outbound-sales-agent")
runner.register_tool("my_tool", my_tool_func)
result = await runner.run({"lead_id": "123"})
"""
@@ -1027,7 +1194,7 @@ class AgentRunner:
credential_store: Any | None = None,
):
"""
Initialize the runner (use AgentRunner.load() instead).
Initialize the runner (use AgentLoader.load() instead).
Args:
agent_path: Path to agent folder
@@ -1082,7 +1249,7 @@ class AgentRunner:
self._approval_callback: Callable | None = None
# AgentRuntime — unified execution path for all agents
self._agent_runtime: AgentRuntime | None = None
self._agent_runtime: AgentHost | None = None
# Pre-load validation: structural checks + credentials.
# Fails fast with actionable guidance — no MCP noise on screen.
run_preload_validation(
@@ -1158,7 +1325,7 @@ class AgentRunner:
interactive: bool = True,
skip_credential_validation: bool | None = None,
credential_store: Any | None = None,
) -> "AgentRunner":
) -> "AgentLoader":
"""
Load an agent from an export folder.
@@ -1299,21 +1466,22 @@ class AgentRunner:
runner._agent_skills = agent_skills
return runner
# Fallback: load from agent.json (legacy JSON-based agents)
# Fallback: load from agent.json (declarative config)
agent_json_path = agent_path / "agent.json"
if not agent_json_path.is_file():
raise FileNotFoundError(f"No agent.py or agent.json found in {agent_path}")
with open(agent_json_path, encoding="utf-8") as f:
export_data = f.read()
export_data = agent_json_path.read_text(encoding="utf-8")
if not export_data.strip():
raise ValueError(f"Empty agent export file: {agent_json_path}")
raise ValueError(f"Empty agent.json: {agent_json_path}")
try:
graph, goal = load_agent_export(export_data)
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON in agent export file: {agent_json_path}") from exc
parsed = json.loads(export_data)
graph, goal = load_agent_config(parsed)
logger.info(
"Loaded declarative agent config from agent.json (name=%s)",
parsed.get("name"),
)
# Generate flowchart.json if missing (for legacy JSON-based agents)
generate_fallback_flowchart(graph, goal, agent_path)
@@ -2268,7 +2436,7 @@ class AgentRunner:
# Run synchronous cleanup
self.cleanup()
async def __aenter__(self) -> "AgentRunner":
async def __aenter__(self) -> "AgentLoader":
"""Context manager entry."""
self._setup()
if self._agent_runtime is not None:
+2 -2
View File
@@ -1,5 +1,5 @@
"""Runtime core for agent execution."""
from framework.runtime.core import Runtime
from framework.runtime.core import DecisionTracker
__all__ = ["Runtime"]
__all__ = ["DecisionTracker"]
+79 -18
View File
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Any
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.executor import ExecutionResult
from framework.runtime.event_bus import EventBus
from framework.runtime.execution_stream import EntryPointSpec, ExecutionStream
from framework.runtime.execution_stream import EntryPointSpec, ExecutionManager
from framework.runtime.outcome_aggregator import OutcomeAggregator
from framework.runtime.runtime_log_store import RuntimeLogStore
from framework.runtime.shared_state import SharedBufferManager
@@ -30,6 +30,7 @@ if TYPE_CHECKING:
from framework.graph.edge import GraphSpec
from framework.graph.goal import Goal
from framework.llm.provider import LLMProvider, Tool
from framework.pipeline.stage import PipelineStage
from framework.skills.manager import SkillsManagerConfig
logger = logging.getLogger(__name__)
@@ -37,7 +38,7 @@ logger = logging.getLogger(__name__)
@dataclass
class AgentRuntimeConfig:
"""Configuration for AgentRuntime."""
"""Configuration for AgentHost."""
max_concurrent_executions: int = 100
cache_ttl: float = 60.0
@@ -62,14 +63,14 @@ class _GraphRegistration:
graph: "GraphSpec"
goal: "Goal"
entry_points: dict[str, EntryPointSpec]
streams: dict[str, ExecutionStream] # ep_id -> stream (NOT namespaced)
streams: dict[str, ExecutionManager] # ep_id -> stream (NOT namespaced)
storage_subpath: str # relative to session root, e.g. "graphs/email_agent"
event_subscriptions: list[str] = field(default_factory=list)
timer_tasks: list[asyncio.Task] = field(default_factory=list)
timer_next_fire: dict[str, float] = field(default_factory=dict)
class AgentRuntime:
class AgentHost:
"""
Top-level runtime that manages agent lifecycle and concurrent executions.
@@ -142,6 +143,7 @@ class AgentRuntime:
skills_catalog_prompt: str = "",
protocols_prompt: str = "",
skill_dirs: list[str] | None = None,
pipeline_stages: "list[PipelineStage] | None" = None,
):
"""
Initialize agent runtime.
@@ -171,6 +173,7 @@ class AgentRuntime:
skills_catalog_prompt: Deprecated. Pre-rendered skills catalog.
protocols_prompt: Deprecated. Pre-rendered operational protocols.
"""
from framework.pipeline.runner import PipelineRunner
from framework.skills.manager import SkillsManager
self.graph = graph
@@ -180,6 +183,14 @@ class AgentRuntime:
self._checkpoint_config = checkpoint_config
self.accounts_prompt = accounts_prompt
# Pipeline middleware: runs before every trigger() dispatch.
# Accepts either pre-built stage objects or loads from config.
if pipeline_stages:
self._pipeline = PipelineRunner(pipeline_stages)
else:
self._pipeline = self._load_pipeline_from_config()
# --- Skill lifecycle: runtime owns the SkillsManager ---
if skills_manager_config is not None:
# New path: config-driven, runtime handles loading
@@ -251,7 +262,7 @@ class AgentRuntime:
# Entry points and streams (primary graph)
self._entry_points: dict[str, EntryPointSpec] = {}
self._streams: dict[str, ExecutionStream] = {}
self._streams: dict[str, ExecutionManager] = {}
# Webhook server (created on start if webhook_routes configured)
self._webhook_server: Any = None
@@ -275,7 +286,7 @@ class AgentRuntime:
self.intro_message: str = ""
# ------------------------------------------------------------------
# Skill prompt accessors (read by ExecutionStream constructors)
# Skill prompt accessors (read by ExecutionManager constructors)
# ------------------------------------------------------------------
@property
@@ -342,7 +353,7 @@ class AgentRuntime:
# Create streams for each entry point
for ep_id, spec in self._entry_points.items():
stream = ExecutionStream(
stream = ExecutionManager(
stream_id=ep_id,
entry_spec=spec,
graph=self.graph,
@@ -790,9 +801,15 @@ class AgentRuntime:
timer_next_fire=self._timer_next_fire,
)
# Start skill hot-reload watcher (no-op if watchfiles not installed)
await self._skills_manager.start_watching()
# Initialize pipeline stages (one-time setup)
await self._pipeline.initialize_all()
self._running = True
self._timers_paused = False
logger.info(f"AgentRuntime started with {len(self._streams)} streams")
logger.info(f"AgentHost started with {len(self._streams)} streams")
async def stop(self) -> None:
"""Stop the agent runtime and all streams."""
@@ -827,11 +844,14 @@ class AgentRuntime:
self._streams.clear()
self._graphs.clear()
# Stop skill hot-reload watcher
await self._skills_manager.stop_watching()
# Stop storage
await self._storage.stop()
self._running = False
logger.info("AgentRuntime stopped")
logger.info("AgentHost stopped")
def pause_timers(self) -> None:
"""Pause all timer-driven entry points.
@@ -850,7 +870,7 @@ class AgentRuntime:
self,
entry_point_id: str,
graph_id: str | None = None,
) -> ExecutionStream | None:
) -> ExecutionManager | None:
"""Find the stream for an entry point, searching the active graph first.
Lookup order:
@@ -874,6 +894,32 @@ class AgentRuntime:
# Primary graph (also stored in self._streams)
return self._streams.get(entry_point_id)
@staticmethod
def _load_pipeline_from_config():
    """Construct the request pipeline from the user's hive configuration.

    Reads the ``pipeline.stages`` list from ``~/.hive/configuration.json``.
    When the list is absent or empty, an empty (pass-through) pipeline is
    returned instead.
    """
    from framework.config import get_hive_config
    from framework.pipeline.registry import build_pipeline_from_config
    from framework.pipeline.runner import PipelineRunner

    stage_entries = get_hive_config().get("pipeline", {}).get("stages", [])
    if stage_entries:
        return build_pipeline_from_config(stage_entries)
    return PipelineRunner([])
async def _reload_pipeline(self) -> None:
    """Rebuild the pipeline from config and swap it in atomically.

    The replacement is fully initialized before the single assignment to
    ``self._pipeline``, so concurrent dispatches observe either the old
    or the new pipeline — never a half-built one.
    """
    rebuilt = self._load_pipeline_from_config()
    await rebuilt.initialize_all()
    self._pipeline = rebuilt
    logger.info(
        "Pipeline reloaded: %d stages",
        len(rebuilt.stages),
    )
def _prune_idempotency_keys(self) -> None:
"""Prune expired idempotency keys based on TTL and max size."""
ttl = self._config.idempotency_ttl_seconds
@@ -924,7 +970,7 @@ class AgentRuntime:
RuntimeError: If runtime not running
"""
if not self._running:
raise RuntimeError("AgentRuntime is not running")
raise RuntimeError("AgentHost is not running")
# Idempotency check: return cached execution_id for duplicate keys.
if idempotency_key is not None:
@@ -938,6 +984,21 @@ class AgentRuntime:
)
return cached
# Run pipeline middleware (rate limiting, validation, cost guards, ...)
# Raises PipelineRejectedError if any stage rejects.
if self._pipeline.stages:
from framework.pipeline.stage import PipelineContext
pipeline_ctx = PipelineContext(
entry_point_id=entry_point_id,
input_data=input_data,
correlation_id=correlation_id,
session_state=session_state,
)
pipeline_ctx = await self._pipeline.run(pipeline_ctx)
# Stages may have transformed the input_data.
input_data = pipeline_ctx.input_data
stream = self._resolve_stream(entry_point_id, graph_id)
if stream is None:
raise ValueError(f"Entry point '{entry_point_id}' not found")
@@ -1032,9 +1093,9 @@ class AgentRuntime:
graph_log_store = RuntimeLogStore(graph_base / "runtime_logs")
# Create streams for each entry point
streams: dict[str, ExecutionStream] = {}
streams: dict[str, ExecutionManager] = {}
for ep_id, spec in entry_points.items():
stream = ExecutionStream(
stream = ExecutionManager(
stream_id=f"{graph_id}::{ep_id}",
entry_spec=spec,
graph=graph,
@@ -1501,7 +1562,7 @@ class AgentRuntime:
# Search primary graph's streams for an active session.
# Skip isolated streams — they have their own session directories
# and must never be used as a shared session.
all_streams: list[tuple[str, ExecutionStream]] = []
all_streams: list[tuple[str, ExecutionManager]] = []
for _gid, reg in self._graphs.items():
for ep_id, stream in reg.streams.items():
# Skip isolated entry points — they run in their own namespace
@@ -1662,7 +1723,7 @@ class AgentRuntime:
return max(0.0, mono - time.monotonic())
return None
def get_stream(self, entry_point_id: str) -> ExecutionStream | None:
def get_stream(self, entry_point_id: str) -> ExecutionManager | None:
"""Get a specific execution stream."""
return self._streams.get(entry_point_id)
@@ -1851,9 +1912,9 @@ def create_agent_runtime(
skills_catalog_prompt: str = "",
protocols_prompt: str = "",
skill_dirs: list[str] | None = None,
) -> AgentRuntime:
) -> AgentHost:
"""
Create and configure an AgentRuntime with entry points.
Create and configure an AgentHost with entry points.
Convenience factory that creates runtime and registers entry points.
Runtime logging is enabled by default for observability.
@@ -1895,7 +1956,7 @@ def create_agent_runtime(
storage_path_obj = Path(storage_path) if isinstance(storage_path, str) else storage_path
runtime_log_store = RuntimeLogStore(storage_path_obj / "runtime_logs")
runtime = AgentRuntime(
runtime = AgentHost(
graph=graph,
goal=goal,
storage_path=storage_path,
+1 -1
View File
@@ -21,7 +21,7 @@ from framework.storage.concurrent import ConcurrentStorage
logger = logging.getLogger(__name__)
class Runtime:
class DecisionTracker:
"""
The runtime environment that agents execute within.
+26 -11
View File
@@ -19,10 +19,10 @@ from datetime import datetime
from typing import TYPE_CHECKING, Any
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.graph.executor import ExecutionResult, Orchestrator
from framework.runtime.event_bus import EventBus
from framework.runtime.shared_state import IsolationLevel, SharedBufferManager
from framework.runtime.stream_runtime import StreamRuntime, StreamRuntimeAdapter
from framework.runtime.stream_runtime import StreamDecisionTracker, StreamRuntimeAdapter
if TYPE_CHECKING:
from framework.graph.edge import GraphSpec
@@ -133,7 +133,7 @@ class ExecutionContext:
status: str = "pending" # pending, running, completed, failed, paused
class ExecutionStream:
class ExecutionManager:
"""
Manages concurrent executions for a single entry point.
@@ -196,6 +196,7 @@ class ExecutionStream:
colony_worker_sessions_dir: Any = None,
colony_recall_cache: dict[str, str] | None = None,
colony_reflect_llm: Any = None,
execution_middleware: list | None = None,
):
"""
Initialize execution stream.
@@ -255,6 +256,7 @@ class ExecutionStream:
self._colony_worker_sessions_dir = colony_worker_sessions_dir
self._colony_recall_cache = colony_recall_cache
self._colony_reflect_llm = colony_reflect_llm
self._execution_middleware = execution_middleware or []
_es_logger = logging.getLogger(__name__)
if protocols_prompt:
@@ -270,7 +272,7 @@ class ExecutionStream:
)
# Create stream-scoped runtime
self._runtime = StreamRuntime(
self._runtime = StreamDecisionTracker(
stream_id=stream_id,
storage=storage,
outcome_aggregator=outcome_aggregator,
@@ -279,7 +281,7 @@ class ExecutionStream:
# Execution tracking
self._active_executions: dict[str, ExecutionContext] = {}
self._execution_tasks: dict[str, asyncio.Task] = {}
self._active_executors: dict[str, GraphExecutor] = {}
self._active_executors: dict[str, Orchestrator] = {}
self._cancel_reasons: dict[str, str] = {}
self._execution_results: OrderedDict[str, ExecutionResult] = OrderedDict()
self._execution_result_times: dict[str, float] = {}
@@ -704,13 +706,26 @@ class ExecutionStream:
# the executor's session_state (memory + resume_from) carries
# forward so the next attempt resumes at the failed node.
while True:
# Run execution middleware (per-attempt, including resurrections)
if self._execution_middleware:
from framework.pipeline.execution_middleware import (
ExecutionContext as _ExecMwCtx,
)
mw_ctx = _ExecMwCtx(
execution_id=execution_id,
stream_id=self.stream_id,
run_id=ctx.run_id or "",
input_data=_current_input_data or {},
session_state=_current_session_state,
attempt=_resurrection_count + 1,
)
for mw in self._execution_middleware:
mw_ctx = await mw.on_execution_start(mw_ctx)
_current_input_data = mw_ctx.input_data
# Create executor for this execution.
# Each execution gets its own storage under sessions/{exec_id}/
# so conversations, spillover, and data files are all scoped
# to this execution. The executor sets data_dir via execution
# context (contextvars) so data tools and spillover share the
# same session-scoped directory.
executor = GraphExecutor(
executor = Orchestrator(
runtime=runtime_adapter,
llm=self._llm,
tools=self._tools,
+2 -2
View File
@@ -23,7 +23,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class StreamRuntime:
class StreamDecisionTracker:
"""
Thread-safe runtime for a single execution stream.
@@ -431,7 +431,7 @@ class StreamRuntimeAdapter:
by providing the same API as Runtime but routing to a specific execution.
"""
def __init__(self, stream_runtime: StreamRuntime, execution_id: str):
def __init__(self, stream_runtime: StreamDecisionTracker, execution_id: str):
"""
Create adapter for a specific execution.
@@ -20,7 +20,7 @@ from framework.graph import Goal
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.goal import Constraint, SuccessCriterion
from framework.graph.node import NodeSpec
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.agent_runtime import AgentHost, create_agent_runtime
from framework.runtime.event_bus import AgentEvent, EventBus, EventType
from framework.runtime.execution_stream import EntryPointSpec
from framework.runtime.outcome_aggregator import OutcomeAggregator
@@ -396,7 +396,7 @@ class TestAgentRuntime:
def test_register_entry_point(self, sample_graph, sample_goal, temp_storage):
"""Test registering entry points."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -416,7 +416,7 @@ class TestAgentRuntime:
def test_register_duplicate_entry_point_fails(self, sample_graph, sample_goal, temp_storage):
"""Test that duplicate entry point IDs fail."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -436,7 +436,7 @@ class TestAgentRuntime:
def test_register_invalid_entry_node_fails(self, sample_graph, sample_goal, temp_storage):
"""Test that invalid entry nodes fail."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -455,7 +455,7 @@ class TestAgentRuntime:
@pytest.mark.asyncio
async def test_start_stop_lifecycle(self, sample_graph, sample_goal, temp_storage):
"""Test runtime start/stop lifecycle."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -485,7 +485,7 @@ class TestAgentRuntime:
@pytest.mark.asyncio
async def test_trigger_requires_running(self, sample_graph, sample_goal, temp_storage):
"""Test that trigger fails if runtime not running."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -551,7 +551,7 @@ class TestTimerEntryPoints:
@pytest.mark.asyncio
async def test_interval_timer_starts_task(self, sample_graph, sample_goal, temp_storage):
"""Test that interval_minutes timer creates an async task."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -581,7 +581,7 @@ class TestTimerEntryPoints:
@pytest.mark.asyncio
async def test_cron_timer_starts_task(self, sample_graph, sample_goal, temp_storage):
"""Test that cron expression timer creates an async task."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -611,7 +611,7 @@ class TestTimerEntryPoints:
self, sample_graph, sample_goal, temp_storage, caplog
):
"""Test that an invalid cron expression logs a warning and skips."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -640,7 +640,7 @@ class TestTimerEntryPoints:
"""Test that when both cron and interval_minutes are set, cron wins."""
import logging
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -667,7 +667,7 @@ class TestTimerEntryPoints:
@pytest.mark.asyncio
async def test_no_interval_or_cron_warns(self, sample_graph, sample_goal, temp_storage, caplog):
"""Test that timer with neither cron nor interval_minutes logs a warning."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -692,7 +692,7 @@ class TestTimerEntryPoints:
@pytest.mark.asyncio
async def test_cron_immediate_fires_first(self, sample_graph, sample_goal, temp_storage):
"""Test that run_immediately=True with cron doesn't set next_fire before first run."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -733,7 +733,7 @@ class TestCancelAllTasks:
self, sample_graph, sample_goal, temp_storage
):
"""Test that cancel_all_tasks_async returns False with no running tasks."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -759,7 +759,7 @@ class TestCancelAllTasks:
self, sample_graph, sample_goal, temp_storage
):
"""Test that cancel_all_tasks_async cancels a running task and returns True."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -804,7 +804,7 @@ class TestCancelAllTasks:
self, sample_graph, sample_goal, temp_storage
):
"""Test that cancel_all_tasks_async cancels tasks across multiple streams."""
runtime = AgentRuntime(
runtime = AgentHost(
graph=sample_graph,
goal=sample_goal,
storage_path=temp_storage,
@@ -1,4 +1,4 @@
"""Tests for webhook idempotency key support in AgentRuntime.trigger()."""
"""Tests for webhook idempotency key support in AgentHost.trigger()."""
import asyncio
import time
@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, MagicMock
import pytest
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig
from framework.runtime.agent_runtime import AgentHost, AgentRuntimeConfig
def _make_runtime(ttl=300.0, max_keys=10000):
@@ -16,7 +16,7 @@ def _make_runtime(ttl=300.0, max_keys=10000):
Uses ``object.__new__`` to skip ``__init__`` and its heavy dependencies
(storage, LLM, skills) we only need the cache and config for these tests.
"""
runtime = object.__new__(AgentRuntime)
runtime = object.__new__(AgentHost)
runtime._config = AgentRuntimeConfig(idempotency_ttl_seconds=ttl, idempotency_max_keys=max_keys)
runtime._running = True
runtime._lock = asyncio.Lock()
@@ -157,28 +157,28 @@ class TestTriggerIdempotency:
"""trigger() accepts idempotency_key as a keyword argument."""
import inspect
sig = inspect.signature(AgentRuntime.trigger)
sig = inspect.signature(AgentHost.trigger)
assert "idempotency_key" in sig.parameters
def test_idempotency_key_defaults_to_none(self):
"""idempotency_key defaults to None (backward compatible)."""
import inspect
sig = inspect.signature(AgentRuntime.trigger)
sig = inspect.signature(AgentHost.trigger)
assert sig.parameters["idempotency_key"].default is None
def test_trigger_and_wait_accepts_idempotency_key(self):
"""trigger_and_wait() also accepts idempotency_key."""
import inspect
sig = inspect.signature(AgentRuntime.trigger_and_wait)
sig = inspect.signature(AgentHost.trigger_and_wait)
assert "idempotency_key" in sig.parameters
def test_trigger_and_wait_idempotency_key_defaults_to_none(self):
"""trigger_and_wait() idempotency_key defaults to None."""
import inspect
sig = inspect.signature(AgentRuntime.trigger_and_wait)
sig = inspect.signature(AgentHost.trigger_and_wait)
assert sig.parameters["idempotency_key"].default is None
@pytest.mark.asyncio
@@ -3,13 +3,13 @@
from pathlib import Path
from unittest.mock import MagicMock
from framework.graph.executor import GraphExecutor
from framework.graph.executor import Orchestrator
from framework.runtime.runtime_log_store import RuntimeLogStore
from framework.runtime.runtime_logger import RuntimeLogger
def test_graph_executor_uses_custom_session_dir_name_for_runtime_logs():
executor = GraphExecutor(
executor = Orchestrator(
runtime=MagicMock(),
storage_path=Path("/tmp/test-agent/sessions/my-custom-session"),
)
@@ -13,7 +13,7 @@ from unittest.mock import patch
import aiohttp
import pytest
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig
from framework.runtime.agent_runtime import AgentHost, AgentRuntimeConfig # noqa: F401
from framework.runtime.event_bus import AgentEvent, EventBus, EventType
from framework.runtime.execution_stream import EntryPointSpec
from framework.runtime.webhook_server import (
@@ -518,7 +518,7 @@ class TestEventDrivenEntryPoints:
)
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
runtime = AgentHost(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
@@ -589,7 +589,7 @@ class TestEventDrivenEntryPoints:
)
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
runtime = AgentHost(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
@@ -644,7 +644,7 @@ class TestEventDrivenEntryPoints:
graph, goal = self._make_graph_and_goal()
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
runtime = AgentHost(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
@@ -671,7 +671,7 @@ class TestEventDrivenEntryPoints:
graph, goal = self._make_graph_and_goal()
with tempfile.TemporaryDirectory() as tmpdir:
runtime = AgentRuntime(
runtime = AgentHost(
graph=graph,
goal=goal,
storage_path=Path(tmpdir),
+193
View File
@@ -0,0 +1,193 @@
"""Declarative agent configuration schema.
Allows defining agents via JSON/YAML config files instead of Python modules.
The ``AgentConfig`` model is the top-level schema loaded from ``agent.json``.
The runner detects this format by checking for a ``name`` key at the top level.
Template variables
------------------
System prompts and identity_prompt support ``{{variable_name}}`` placeholders.
These are resolved at load time from ``AgentConfig.variables``.
"""
from __future__ import annotations
from pydantic import BaseModel, Field
class ToolAccessConfig(BaseModel):
    """Declarative tool access policy.

    Controls which tools a node/agent has access to.

    * ``all`` -- every tool from the registry.
    * ``explicit`` -- only tools listed in ``allowed`` (default; empty = zero tools).
    * ``none`` -- no tools at all.
    """

    # NOTE(review): typed as plain str, not Literal, so invalid policy values
    # are not rejected at validation time -- confirm this is intentional.
    policy: str = Field(
        default="explicit",
        description="One of: 'all', 'explicit', 'none'.",
    )
    allowed: list[str] = Field(
        default_factory=list,
        description="Tool names when policy='explicit'.",
    )
    # Deny list is applied after the allow list is resolved.
    denied: list[str] = Field(
        default_factory=list,
        description="Tool names to deny (applied after allowed).",
    )
class NodeConfig(BaseModel):
    """Declarative node definition.

    Mirrors the runtime ``NodeSpec``; omitted fields fall back to the
    defaults below so hand-written configs can stay minimal.
    """

    id: str
    name: str | None = None
    description: str | None = None
    node_type: str = Field(
        default="event_loop",
        description="event_loop | gcu",
    )
    # Per-node system prompt; supports {{variable}} placeholders resolved
    # from AgentConfig.variables at load time (see module docstring).
    system_prompt: str | None = None
    # Defaults to policy='explicit' with an empty allow list, i.e. zero tools.
    tools: ToolAccessConfig = Field(default_factory=ToolAccessConfig)
    sub_agents: list[str] = Field(default_factory=list)
    model: str | None = None
    input_keys: list[str] = Field(default_factory=list)
    output_keys: list[str] = Field(default_factory=list)
    nullable_output_keys: list[str] = Field(default_factory=list)
    max_iterations: int = 30
    max_node_visits: int = 1
    client_facing: bool = False
    success_criteria: str | None = None
    failure_criteria: str | None = None
    skip_judge: bool = False
    max_retries: int | None = None
class EdgeConfig(BaseModel):
    """Declarative edge definition connecting two nodes of the graph."""

    from_node: str = Field(description="Source node ID.")
    to_node: str = Field(description="Target node ID.")
    condition: str = Field(
        default="on_success",
        description="always | on_success | on_failure | conditional | llm_decide",
    )
    # Expression evaluated when condition == 'conditional'.
    condition_expr: str | None = None
    # Maps source output keys to target input keys.
    input_mapping: dict[str, str] = Field(default_factory=dict)
    priority: int = 1
class GoalConfig(BaseModel):
    """Simplified goal definition for declarative config.

    Criteria and constraints are plain strings rather than structured
    objects, keeping hand-written YAML/JSON terse.
    """

    description: str
    success_criteria: list[str] = Field(default_factory=list)
    constraints: list[str] = Field(default_factory=list)
class EntryPointConfig(BaseModel):
    """Entry point configuration (how a run of the agent is triggered)."""

    id: str = "default"
    name: str = "Default"
    entry_node: str | None = None  # defaults to AgentConfig.entry_node
    trigger_type: str = Field(
        default="manual",
        description="manual | scheduled | timer",
    )
    # Trigger-specific settings (e.g. schedule details); shape depends on
    # trigger_type -- not validated here.
    trigger_config: dict = Field(default_factory=dict)
    isolation_level: str = "shared"
    max_concurrent: int | None = None
class MCPServerRef(BaseModel):
    """Reference to an MCP server to connect for this agent."""

    name: str
    # Optional server-specific configuration overrides.
    config: dict | None = None
class MetadataConfig(BaseModel):
    """Agent metadata for display / intro messages."""

    # Message shown when the agent starts a client-facing session.
    intro_message: str = ""
class AgentConfig(BaseModel):
    """Top-level declarative agent configuration.

    Load from ``agent.json`` and pass to
    :func:`framework.runner.runner.load_agent_config` to build the
    ``GraphSpec`` + ``Goal`` pair.

    Example (YAML)::

        name: lead-enrichment-agent
        version: 1.0.0
        variables:
          spreadsheet_id: "1ZVx..."
          sheet_name: "contacts"
        goal:
          description: "Enrich leads in Google Sheets"
          success_criteria:
            - "All unprocessed leads enriched"
          constraints:
            - "Browser-only research"
        identity_prompt: |
          You are the Lead Enrichment Agent...
        nodes:
          - id: start
            tools: {policy: explicit, allowed: [google_sheets_get_values]}
            system_prompt: |
              Spreadsheet ID: {{spreadsheet_id}}
        ...
    """

    name: str
    version: str = "1.0.0"
    description: str | None = None
    metadata: MetadataConfig = Field(default_factory=MetadataConfig)

    # Template variables -- substituted into prompts via {{var_name}}
    variables: dict[str, str] = Field(default_factory=dict)

    # Goal
    goal: GoalConfig

    # Graph structure
    nodes: list[NodeConfig]
    edges: list[EdgeConfig]
    entry_node: str
    terminal_nodes: list[str] = Field(default_factory=list)
    pause_nodes: list[str] = Field(default_factory=list)

    # Entry points (if omitted, a single "default" manual entry is created)
    entry_points: list[EntryPointConfig] = Field(default_factory=list)

    # Agent-level tool defaults (nodes inherit unless they override)
    tools: ToolAccessConfig = Field(default_factory=ToolAccessConfig)
    mcp_servers: list[MCPServerRef] = Field(default_factory=list)

    # LLM / execution
    model: str | None = None
    max_tokens: int = 4096
    conversation_mode: str = "continuous"
    identity_prompt: str = ""
    # Defaults for the agent loop; keys mirror LoopConfig settings.
    loop_config: dict = Field(
        default_factory=lambda: {
            "max_iterations": 100,
            "max_tool_calls_per_turn": 30,
            "max_context_tokens": 32000,
        },
    )

    # Pipeline overrides (per-agent, merged with global config)
    pipeline: dict = Field(
        default_factory=dict,
        description="Per-agent pipeline stage overrides. Same format as global pipeline config.",
    )

    # Resource limits
    max_cost_per_run: float | None = None
+6 -6
View File
@@ -66,10 +66,10 @@ async def create_queen(
)
from framework.agents.queen.nodes.thinking_hook import select_expert_persona
from framework.graph.event_loop_node import HookContext, HookResult
from framework.graph.executor import GraphExecutor
from framework.graph.executor import Orchestrator
from framework.runner.mcp_registry import MCPRegistry
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
from framework.runtime.core import DecisionTracker # noqa: F401
from framework.runtime.event_bus import AgentEvent, EventType
from framework.tools.queen_lifecycle_tools import (
QueenPhaseState,
@@ -329,13 +329,13 @@ async def create_queen(
)
# ---- Queen event loop --------------------------------------------
queen_runtime = Runtime(hive_home / "queen")
queen_runtime = DecisionTracker(hive_home / "queen")
async def _queen_loop():
logger.debug("[_queen_loop] Starting queen loop for session %s", session.id)
try:
logger.debug("[_queen_loop] Creating GraphExecutor...")
executor = GraphExecutor(
logger.debug("[_queen_loop] Creating Orchestrator...")
executor = Orchestrator(
runtime=queen_runtime,
llm=session.llm,
tools=queen_tools,
@@ -353,7 +353,7 @@ async def create_queen(
skills_catalog_prompt=phase_state.skills_catalog_prompt,
)
session.queen_executor = executor
logger.debug("[_queen_loop] GraphExecutor created and stored in session.queen_executor")
logger.debug("[_queen_loop] Orchestrator created and stored in session.queen_executor")
# Wire inject_notification so phase switches notify the queen LLM
async def _inject_phase_notification(content: str) -> None:
+2 -2
View File
@@ -280,7 +280,7 @@ class SessionManager:
Sets up the runner, runtime, and session fields. Does NOT notify
the queen -- callers handle that step.
"""
from framework.runner import AgentRunner
from framework.runner import AgentLoader
agent_path = Path(agent_path)
resolved_graph_id = graph_id or agent_path.name
@@ -302,7 +302,7 @@ class SessionManager:
resolved_model = model or session_model or self._model
runner = await loop.run_in_executor(
None,
lambda: AgentRunner.load(
lambda: AgentLoader.load(
agent_path,
model=resolved_model,
interactive=False,
+16
View File
@@ -56,6 +56,16 @@ class SkillDiscovery:
def __init__(self, config: DiscoveryConfig | None = None):
self._config = config or DiscoveryConfig()
self._scanned_dirs: list[Path] = []
@property
def scanned_directories(self) -> list[str]:
    """Return the skill directories that were scanned during discovery.

    Populated after :meth:`discover` runs. Used by the hot-reload
    watcher to know which directories to monitor for changes.

    Directories that no longer exist are filtered out so the watcher is
    never pointed at stale paths.
    """
    return [str(d) for d in self._scanned_dirs if d.exists()]
def discover(self) -> list[ParsedSkill]:
"""Scan all scopes and return deduplicated skill list.
@@ -70,11 +80,13 @@ class SkillDiscovery:
Later entries override earlier ones on name collision.
"""
all_skills: list[ParsedSkill] = []
self._scanned_dirs = []
# Framework scope (lowest precedence)
if not self._config.skip_framework_scope:
framework_dir = Path(__file__).parent / "_default_skills"
if framework_dir.is_dir():
self._scanned_dirs.append(framework_dir)
all_skills.extend(self._scan_scope(framework_dir, "framework"))
# User scope
@@ -84,11 +96,13 @@ class SkillDiscovery:
# Cross-client (lower precedence within user scope)
user_agents = home / ".agents" / "skills"
if user_agents.is_dir():
self._scanned_dirs.append(user_agents)
all_skills.extend(self._scan_scope(user_agents, "user"))
# Hive-specific (higher precedence within user scope)
user_hive = home / ".hive" / "skills"
if user_hive.is_dir():
self._scanned_dirs.append(user_hive)
all_skills.extend(self._scan_scope(user_hive, "user"))
# Project scope (highest precedence)
@@ -98,11 +112,13 @@ class SkillDiscovery:
# Cross-client
project_agents = root / ".agents" / "skills"
if project_agents.is_dir():
self._scanned_dirs.append(project_agents)
all_skills.extend(self._scan_scope(project_agents, "project"))
# Hive-specific
project_hive = root / ".hive" / "skills"
if project_hive.is_dir():
self._scanned_dirs.append(project_hive)
all_skills.extend(self._scan_scope(project_hive, "project"))
resolved = self._resolve_collisions(all_skills)
+92
View File
@@ -68,6 +68,9 @@ class SkillsManager:
self._protocols_prompt: str = ""
self._allowlisted_dirs: list[str] = []
self._default_mgr: object = None # DefaultSkillManager, set after load()
# Hot-reload state
self._watched_dirs: list[str] = []
self._watcher_task: object = None # asyncio.Task, set by start_watching()
# ------------------------------------------------------------------
# Factory for backwards-compat bridge
@@ -124,6 +127,7 @@ class SkillsManager:
discovery = SkillDiscovery(DiscoveryConfig(project_root=self._config.project_root))
discovered = discovery.discover()
self._watched_dirs = discovery.scanned_directories
# Trust-gate project-scope skills (AS-13)
discovered = TrustGate(interactive=self._config.interactive).filter_and_gate(
@@ -173,6 +177,94 @@ class SkillsManager:
else:
logger.warning("Skill system produced empty protocols_prompt")
# ------------------------------------------------------------------
# Hot-reload: watch skill directories for SKILL.md changes.
# ------------------------------------------------------------------
async def start_watching(self) -> None:
    """Start a background task watching skill directories for changes.

    When a ``SKILL.md`` file is added/modified/removed, the cached
    ``skills_catalog_prompt`` is rebuilt. The next node iteration picks
    up the new prompt automatically via the ``dynamic_prompt_provider``.

    Silently no-ops when ``watchfiles`` is not installed or when no
    directories are being watched (e.g. bare mode, no project_root).
    Idempotent: a second call while a watcher is running does nothing.
    """
    import asyncio

    try:
        import watchfiles  # noqa: F401 -- optional dep check
    except ImportError:
        logger.debug("watchfiles not installed; skill hot-reload disabled")
        return
    if not self._watched_dirs:
        logger.debug("No skill directories to watch; hot-reload skipped")
        return
    if self._watcher_task is None:
        self._watcher_task = asyncio.create_task(
            self._watch_loop(),
            name="skills-hot-reload",
        )
        logger.info(
            "Skill hot-reload enabled (watching %d directories)",
            len(self._watched_dirs),
        )
async def stop_watching(self) -> None:
    """Cancel the background watcher task (if running) and await it."""
    import asyncio

    pending = self._watcher_task
    if pending is None:
        return
    # Clear the handle first so start_watching() can be called again.
    self._watcher_task = None
    if not pending.done():  # type: ignore[attr-defined]
        pending.cancel()  # type: ignore[attr-defined]
        try:
            await pending  # type: ignore[misc]
        except asyncio.CancelledError:
            # Cancellation is the expected shutdown path.
            pass
async def _watch_loop(self) -> None:
    """Background coroutine that watches SKILL.md files and triggers reload.

    Batches of filesystem events are debounced (1000 ms) by watchfiles;
    each batch triggers :meth:`_reload`. A failed reload keeps the
    previous prompts; any other unexpected error ends the loop and
    disables hot-reload for the rest of the session.
    """
    import asyncio

    import watchfiles

    def _filter(_change: object, path: str) -> bool:
        # Only SKILL.md files matter; ignore everything else in the
        # watched directories.
        return path.endswith("SKILL.md")

    try:
        async for changes in watchfiles.awatch(
            *self._watched_dirs,
            watch_filter=_filter,
            debounce=1000,
        ):
            paths = [p for _, p in changes]
            logger.info("SKILL.md changes detected: %s", paths)
            try:
                self._reload()
            except Exception:
                logger.exception("Skill reload failed; keeping previous prompts")
    except asyncio.CancelledError:
        # Normal shutdown via stop_watching(); propagate.
        raise
    except Exception:
        logger.exception("Skill watcher crashed; hot-reload disabled for this session")
def _reload(self) -> None:
    """Re-run discovery and rebuild cached prompts.

    Called from the watcher loop whenever SKILL.md files change.
    """
    # Reset loaded flag so _do_load actually re-runs.
    self._loaded = False
    self._do_load()
    self._loaded = True
    logger.info("Skills reloaded: protocols=%d chars, catalog=%d chars",
                len(self._protocols_prompt), len(self._catalog_prompt))
# ------------------------------------------------------------------
# Prompt accessors (consumed by downstream layers)
# ------------------------------------------------------------------
+1 -1
View File
@@ -68,7 +68,7 @@ for _p in ["exports", "core"]:
sys.path.insert(0, _path)
import pytest
from framework.runner.runner import AgentRunner
from framework.runner.runner import AgentLoader
from framework.runtime.event_bus import EventType
AGENT_PATH = Path(__file__).resolve().parents[1]
+273
View File
@@ -0,0 +1,273 @@
"""Migrate a Python-based agent export to declarative agent.yaml.
Usage::
uv run python -m framework.tools.migrate_agent exports/lead_enrichment_agent
Reads agent.py, nodes/__init__.py, config.py, and mcp_servers.json from the
given directory and writes an ``agent.yaml`` file that is equivalent. The
original Python files are left untouched.
After migration, verify with::
uv run python -c "
from framework.runner.runner import load_agent_config
import yaml, pathlib
data = yaml.safe_load(pathlib.Path('exports/lead_enrichment_agent/agent.yaml').read_text())
graph, goal = load_agent_config(data)
print(f'OK: {len(graph.nodes)} nodes, {len(graph.edges)} edges')
"
"""
from __future__ import annotations
import importlib
import importlib.util
import json
import logging
import sys
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
def _import_module_from_path(module_name: str, file_path: Path) -> Any:
"""Import a Python file as a module."""
spec = importlib.util.spec_from_file_location(module_name, file_path)
if spec is None or spec.loader is None:
raise ImportError(f"Cannot import {file_path}")
mod = importlib.util.module_from_spec(spec)
sys.modules[module_name] = mod
spec.loader.exec_module(mod)
return mod
def _node_to_dict(node: Any) -> dict:
"""Convert a NodeSpec instance to a YAML-friendly dict."""
d: dict[str, Any] = {"id": node.id}
if node.name and node.name != node.id:
d["name"] = node.name
if node.description:
d["description"] = node.description
if node.node_type != "event_loop":
d["node_type"] = node.node_type
if node.client_facing:
d["client_facing"] = True
if node.max_node_visits != 1:
d["max_node_visits"] = node.max_node_visits
if node.input_keys:
d["input_keys"] = list(node.input_keys)
if node.output_keys:
d["output_keys"] = list(node.output_keys)
if node.nullable_output_keys:
d["nullable_output_keys"] = list(node.nullable_output_keys)
# Tools
tools_list = list(node.tools) if node.tools else []
if tools_list:
d["tools"] = {"policy": "explicit", "allowed": tools_list}
elif node.node_type == "gcu":
d["tools"] = {"policy": "all"}
else:
d["tools"] = {"policy": "none"}
if node.sub_agents:
d["sub_agents"] = list(node.sub_agents)
if node.success_criteria:
d["success_criteria"] = node.success_criteria
if getattr(node, "failure_criteria", None):
d["failure_criteria"] = node.failure_criteria
if getattr(node, "max_retries", None):
d["max_retries"] = node.max_retries
if getattr(node, "skip_judge", False):
d["skip_judge"] = True
if getattr(node, "max_iterations", 30) != 30:
d["max_iterations"] = node.max_iterations
if node.system_prompt:
d["system_prompt"] = node.system_prompt
return d
def _edge_to_dict(edge: Any) -> dict:
"""Convert an EdgeSpec instance to a YAML-friendly dict."""
d: dict[str, Any] = {
"from_node": edge.source,
"to_node": edge.target,
}
cond = str(edge.condition.value) if hasattr(edge.condition, "value") else str(edge.condition)
if cond != "on_success":
d["condition"] = cond
if edge.condition_expr:
d["condition"] = "conditional"
d["condition_expr"] = edge.condition_expr
if edge.priority and edge.priority != 1:
d["priority"] = edge.priority
if edge.input_mapping:
d["input_mapping"] = dict(edge.input_mapping)
return d
def migrate_agent(agent_dir: str | Path) -> dict:
    """Read a Python-based agent export and return the declarative config dict.

    The returned dict can be serialized to YAML or JSON.

    Imports ``<agent_dir>/agent.py`` (and, when present, ``config.py``) as a
    package, then translates the module-level graph definition into the
    declarative schema. Only non-default values are emitted.

    Side effects: prepends the parent of *agent_dir* to ``sys.path`` (and
    leaves it there) so the agent package and its relative imports resolve.

    Raises:
        FileNotFoundError: when *agent_dir* contains no ``agent.py``.
    """
    agent_dir = Path(agent_dir).resolve()
    agent_py = agent_dir / "agent.py"
    if not agent_py.exists():
        raise FileNotFoundError(f"No agent.py in {agent_dir}")

    # Make the agent importable as a package (handles relative imports)
    parent = str(agent_dir.parent)
    if parent not in sys.path:
        sys.path.insert(0, parent)
    pkg_name = agent_dir.name
    agent_mod = importlib.import_module(f"{pkg_name}.agent")

    # Extract module-level variables; all are optional except the graph core.
    goal = getattr(agent_mod, "goal", None)
    nodes = getattr(agent_mod, "nodes", [])
    edges = getattr(agent_mod, "edges", [])
    entry_node = getattr(agent_mod, "entry_node", "")
    terminal_nodes = getattr(agent_mod, "terminal_nodes", [])
    pause_nodes = getattr(agent_mod, "pause_nodes", [])
    conversation_mode = getattr(agent_mod, "conversation_mode", "continuous")
    identity_prompt = getattr(agent_mod, "identity_prompt", "")
    loop_config = getattr(agent_mod, "loop_config", {})

    # Config / metadata -- config.py is optional and may fail to import.
    config_mod = None
    config_py = agent_dir / "config.py"
    if config_py.exists():
        try:
            config_mod = importlib.import_module(f"{pkg_name}.config")
        except ImportError:
            pass
    metadata = getattr(config_mod, "metadata", None)
    default_config = getattr(config_mod, "default_config", None)

    # Agent name: prefer metadata.name (slugified), else the directory name.
    name = agent_dir.name
    if metadata and hasattr(metadata, "name"):
        name = str(metadata.name).lower().replace(" ", "-")

    # Build config dict
    config: dict[str, Any] = {
        "name": name,
        "version": getattr(metadata, "version", "1.0.0") if metadata else "1.0.0",
    }
    if goal and goal.description:
        config["description"] = goal.description
    # Skip placeholder intro messages still containing "TODO".
    if metadata and hasattr(metadata, "intro_message") and metadata.intro_message:
        intro = metadata.intro_message
        if intro and "TODO" not in intro:
            config["metadata"] = {"intro_message": intro}

    # Variables (detect config fields injected into prompts); LLM plumbing
    # fields are excluded since they are handled by dedicated config keys.
    variables: dict[str, str] = {}
    _SKIP_CONFIG = {"model", "temperature", "max_tokens", "api_key", "api_base"}
    if default_config:
        for attr in dir(default_config):
            if attr.startswith("_") or attr in _SKIP_CONFIG:
                continue
            val = getattr(default_config, attr)
            if isinstance(val, str) and val:
                variables[attr] = val
    if variables:
        config["variables"] = variables

    # Goal -- flatten structured criteria/constraints to plain strings.
    if goal:
        goal_dict: dict[str, Any] = {"description": goal.description}
        if goal.success_criteria:
            goal_dict["success_criteria"] = [sc.description for sc in goal.success_criteria]
        if goal.constraints:
            goal_dict["constraints"] = [c.description for c in goal.constraints]
        config["goal"] = goal_dict

    # Identity / conversation / loop
    if identity_prompt:
        config["identity_prompt"] = identity_prompt
    if conversation_mode and conversation_mode != "continuous":
        config["conversation_mode"] = conversation_mode
    if loop_config:
        config["loop_config"] = dict(loop_config)

    # MCP servers
    mcp_path = agent_dir / "mcp_servers.json"
    if mcp_path.exists():
        with open(mcp_path) as f:
            mcp_data = json.load(f)
        if mcp_data:
            config["mcp_servers"] = [{"name": name} for name in mcp_data]

    # Nodes
    config["nodes"] = [_node_to_dict(n) for n in nodes]

    # Edges
    config["edges"] = [_edge_to_dict(e) for e in edges]

    # Graph structure
    config["entry_node"] = entry_node
    if terminal_nodes:
        config["terminal_nodes"] = terminal_nodes
    if pause_nodes:
        config["pause_nodes"] = pause_nodes

    return config
def write_yaml(config: dict, output_path: Path) -> None:
    """Write config dict to YAML with clean formatting.

    Multiline strings (prompts) are emitted in block style (``|``). The
    representer is registered on a private Dumper subclass so it does not
    leak into PyYAML's global default Dumper for other callers in the
    process (the previous ``yaml.add_representer(str, ...)`` call mutated
    global state on every invocation).

    Args:
        config: The declarative agent config dict to serialize.
        output_path: Destination file path (overwritten if it exists).

    Raises:
        ImportError: when PyYAML is not installed.
    """
    try:
        import yaml
    except ImportError:
        raise ImportError("PyYAML required: uv pip install pyyaml") from None

    class _ConfigDumper(yaml.Dumper):
        """Dumper with a block-style representer for multiline strings."""

    def _str_representer(dumper: yaml.Dumper, data: str) -> Any:
        # Block style keeps prompts readable in the emitted YAML.
        style = "|" if "\n" in data else None
        return dumper.represent_scalar("tag:yaml.org,2002:str", data, style=style)

    _ConfigDumper.add_representer(str, _str_representer)

    with open(output_path, "w") as f:
        yaml.dump(
            config, f,
            Dumper=_ConfigDumper,
            default_flow_style=False, sort_keys=False,
            allow_unicode=True, width=120,
        )
    logger.info("Wrote %s", output_path)
def main() -> None:
    """CLI entry point: migrate ``<agent_dir>`` and write ``agent.yaml`` next to it.

    Exits with status 1 when no agent directory argument is given.
    """
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    if len(sys.argv) < 2:
        print("Usage: uv run python -m framework.tools.migrate_agent <agent_dir>")
        sys.exit(1)

    agent_dir = Path(sys.argv[1])
    config = migrate_agent(agent_dir)
    output = agent_dir / "agent.yaml"
    write_yaml(config, output)
    print(f"Wrote {output}")

    n_nodes = len(config["nodes"])
    n_edges = len(config["edges"])
    print(f"\nMigrated {config['name']}: {n_nodes} nodes, {n_edges} edges")
    # The previous hint advertised a '--verify' flag this CLI does not
    # implement (argv[1] is always treated as the agent directory); point
    # users at the load_agent_config round-trip documented in the module
    # docstring instead.
    print("\nVerify with:")
    print(
        "  uv run python -c \"from framework.runner.runner import load_agent_config; "
        "import yaml, pathlib; "
        f"graph, goal = load_agent_config(yaml.safe_load(pathlib.Path('{output}').read_text())); "
        "print('OK:', len(graph.nodes), 'nodes,', len(graph.edges), 'edges')\""
    )
main()
+52 -43
View File
@@ -56,7 +56,7 @@ from framework.tools.flowchart_utils import (
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime
from framework.runtime.agent_runtime import AgentHost
from framework.runtime.event_bus import EventBus
logger = logging.getLogger(__name__)
@@ -330,7 +330,7 @@ class QueenPhaseState:
)
def build_worker_profile(runtime: AgentRuntime, agent_path: Path | str | None = None) -> str:
def build_worker_profile(runtime: AgentHost, agent_path: Path | str | None = None) -> str:
"""Build a worker capability profile from its graph/goal definition.
Injected into the queen's system prompt so it knows what the worker
@@ -823,7 +823,7 @@ def register_queen_lifecycle_tools(
session: Any = None,
session_id: str | None = None,
# Legacy params — used by TUI when not passing a session object
graph_runtime: AgentRuntime | None = None,
graph_runtime: AgentHost | None = None,
event_bus: EventBus | None = None,
storage_path: Path | None = None,
# Server context — enables load_built_agent tool
@@ -2561,7 +2561,7 @@ def register_queen_lifecycle_tools(
return s
def _build_preamble(
runtime: AgentRuntime,
runtime: AgentHost,
) -> dict[str, Any]:
"""Build the lightweight preamble: status, node, elapsed, iteration.
@@ -2719,7 +2719,7 @@ def register_queen_lifecycle_tools(
return "\n".join(lines)
async def _format_memory(runtime: AgentRuntime) -> str:
async def _format_memory(runtime: AgentHost) -> str:
"""Format the worker's shared buffer snapshot and recent changes."""
from framework.runtime.shared_state import IsolationLevel
@@ -2872,7 +2872,7 @@ def register_queen_lifecycle_tools(
header = f"{total} issue(s) detected."
return header + "\n\n" + "\n".join(lines)
async def _format_progress(runtime: AgentRuntime, bus: EventBus) -> str:
async def _format_progress(runtime: AgentHost, bus: EventBus) -> str:
"""Format goal progress, token consumption, and execution outcomes."""
lines = []
@@ -2928,7 +2928,7 @@ def register_queen_lifecycle_tools(
return "\n".join(lines)
def _build_full_json(
runtime: AgentRuntime,
runtime: AgentHost,
bus: EventBus,
preamble: dict[str, Any],
last_n: int,
@@ -3482,50 +3482,59 @@ def register_queen_lifecycle_tools(
if not resolved_path.exists():
return json.dumps({"error": f"Agent path does not exist: {agent_path}"})
# Pre-check: verify the module exports goal/nodes/edges before
# attempting the full load. This gives the queen an actionable
# error message instead of a cryptic ImportError or TypeError.
try:
import importlib
import sys as _sys
# Pre-check: verify the agent can be loaded before attempting
# the full session load. Declarative (agent.json) agents skip
# the Python import check since AgentRunner.load() handles them.
_has_yaml = (resolved_path / "agent.json").exists()
if not _has_yaml:
# Legacy Python agent: verify module exports goal/nodes/edges
try:
import importlib
import sys as _sys
pkg_name = resolved_path.name
parent_dir = str(resolved_path.resolve().parent)
# Temporarily put parent on sys.path for import
if parent_dir not in _sys.path:
_sys.path.insert(0, parent_dir)
# Evict stale cached modules
stale = [n for n in _sys.modules if n == pkg_name or n.startswith(f"{pkg_name}.")]
for n in stale:
del _sys.modules[n]
pkg_name = resolved_path.name
parent_dir = str(resolved_path.resolve().parent)
if parent_dir not in _sys.path:
_sys.path.insert(0, parent_dir)
stale = [
n for n in _sys.modules
if n == pkg_name or n.startswith(f"{pkg_name}.")
]
for n in stale:
del _sys.modules[n]
mod = importlib.import_module(pkg_name)
missing_attrs = [
attr for attr in ("goal", "nodes", "edges") if getattr(mod, attr, None) is None
]
if missing_attrs:
mod = importlib.import_module(pkg_name)
missing_attrs = [
attr
for attr in ("goal", "nodes", "edges")
if getattr(mod, attr, None) is None
]
if missing_attrs:
return json.dumps(
{
"error": (
f"Agent module '{pkg_name}' is missing module-level "
f"attributes: {', '.join(missing_attrs)}. "
f"Fix: in {pkg_name}/__init__.py, add "
f"'from .agent import {', '.join(missing_attrs)}' "
f"so that 'import {pkg_name}' exposes them at "
f"package level."
)
}
)
except Exception as pre_err:
return json.dumps(
{
"error": (
f"Agent module '{pkg_name}' is missing module-level "
f"attributes: {', '.join(missing_attrs)}. "
f"Fix: in {pkg_name}/__init__.py, add "
f"'from .agent import {', '.join(missing_attrs)}' "
f"so that 'import {pkg_name}' exposes them at package level."
f"Failed to import agent module "
f"'{resolved_path.name}': {pre_err}. "
f"Fix: ensure {resolved_path.name}/__init__.py "
f"exists and can be imported without errors "
f"(check syntax, missing dependencies, and "
f"relative imports)."
)
}
)
except Exception as pre_err:
return json.dumps(
{
"error": (
f"Failed to import agent module '{resolved_path.name}': {pre_err}. "
f"Fix: ensure {resolved_path.name}/__init__.py exists and can be "
f"imported without errors (check syntax, missing dependencies, "
f"and relative imports)."
)
}
)
try:
updated_session = await session_manager.load_graph(
+9 -8
View File
@@ -22,12 +22,12 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime
from framework.runtime.agent_runtime import AgentHost
logger = logging.getLogger(__name__)
def register_graph_tools(registry: ToolRegistry, runtime: AgentRuntime) -> int:
def register_graph_tools(registry: ToolRegistry, runtime: AgentHost) -> int:
"""Register graph lifecycle tools bound to *runtime*.
Returns the number of tools registered.
@@ -41,11 +41,12 @@ def register_graph_tools(registry: ToolRegistry, runtime: AgentRuntime) -> int:
async def load_agent(agent_path: str) -> str:
"""Load an agent graph from disk into the running session.
The agent is imported from *agent_path* (a directory containing
``agent.py``). Its graph, goal, and entry points are registered
as a secondary graph on the runtime. Returns a JSON summary.
The agent is loaded from *agent_path* (a directory containing
``agent.json`` or ``agent.py``). Its graph, goal, and entry points
are registered as a secondary graph on the runtime. Returns a JSON
summary.
"""
from framework.runner.runner import AgentRunner
from framework.runner.runner import AgentLoader
from framework.runtime.execution_stream import EntryPointSpec
from framework.server.app import validate_agent_path
@@ -57,7 +58,7 @@ def register_graph_tools(registry: ToolRegistry, runtime: AgentRuntime) -> int:
return json.dumps({"error": f"Agent path does not exist: {agent_path}"})
try:
runner = AgentRunner.load(path)
runner = AgentLoader.load(path)
except Exception as exc:
return json.dumps({"error": f"Failed to load agent: {exc}"})
@@ -105,7 +106,7 @@ def register_graph_tools(registry: ToolRegistry, runtime: AgentRuntime) -> int:
"properties": {
"agent_path": {
"type": "string",
"description": "Path to the agent directory (containing agent.py)",
"description": "Path to the agent directory",
},
},
"required": ["agent_path"],
+3
View File
@@ -0,0 +1,3 @@
"""Tracker layer -- decision/run logging for Builder analysis."""
from framework.runtime.core import DecisionTracker # noqa: F401
File diff suppressed because it is too large Load Diff