feat: add node parameters and fix problems for initialize_agent_package

This commit is contained in:
Richard Tang
2026-03-06 15:54:27 -08:00
parent 0ba781609a
commit 1344d3bb8e
3 changed files with 547 additions and 2189 deletions
File diff suppressed because it is too large Load Diff
+9 -35
View File
@@ -3,42 +3,16 @@
import sys
import os
# Add project paths so imports work
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "core"))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "tools"))
from framework.builder.package_generator import (
BuildSession,
initialize_agent_package,
)
from framework.graph import EdgeCondition, EdgeSpec, Goal, NodeSpec, SuccessCriterion
import framework.builder.package_generator as pg
# Set PROJECT_ROOT before importing
import tools.coder_tools_server as srv
srv.PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Create a minimal build session
session = BuildSession(name="richard_test2")
session.goal = Goal(
id="goal_1",
name="Test Goal",
description="A simple test agent",
success_criteria=[
SuccessCriterion(id="sc_1", description="Completes successfully", metric="llm_judge", target="success")
],
constraints=[],
)
session.nodes = [
NodeSpec(
id="start",
name="Start Node",
description="Entry point",
node_type="event_loop",
input_keys=[],
output_keys=["result"],
system_prompt="You are a helpful assistant.",
),
]
session.edges = []
# Set as active session (in-memory only, no disk persistence)
pg._session = session
# Now call initialize_agent_package
result = initialize_agent_package("richard_test2")
# Access the underlying function (FastMCP wraps it as FunctionTool)
tool = srv.initialize_agent_package
result = tool.fn("richard_test2", nodes=["intake", "process", "review"])
print(result)
+538 -46
View File
@@ -1350,65 +1350,557 @@ def validate_agent_package(agent_name: str) -> str:
# ── Meta-agent: Package initialization ─────────────────────────────────────
def _snake_to_camel(name: str) -> str:
"""Convert snake_case to CamelCase."""
return "".join(word.capitalize() for word in name.split("_"))
def _node_var_name(node_id: str) -> str:
"""Convert node id to a Python variable name."""
return node_id.replace("-", "_") + "_node"
@mcp.tool()
def initialize_agent_package(agent_name: str, agent_json_path: str) -> str:
"""Generate a full Python agent package from an exported agent.json file.
def initialize_agent_package(agent_name: str, nodes: list[str] | None = None) -> str:
"""Scaffold a new agent package with placeholder files.
Reads the agent.json (goal, nodes, edges), validates the graph, and
generates all files needed for a runnable agent in exports/{agent_name}/:
Creates exports/{agent_name}/ with all files needed for a runnable agent:
config.py, nodes/__init__.py, agent.py, __init__.py, __main__.py,
mcp_servers.json, tests/conftest.py, agent.json, README.md.
mcp_servers.json, tests/conftest.py.
Call this INSTEAD of manually writing package files.
After initialization, customize the generated files:
- System prompts and node logic in nodes/__init__.py
- Goal and edges in agent.py
- CLI options in __main__.py
Args:
agent_name: Name for the agent package. Must be snake_case (e.g. 'my_agent').
agent_json_path: Path to the exported agent.json file to import.
nodes: Optional list of node names (snake_case or kebab-case).
If omitted, a single 'start' node is created.
Example: ['intake', 'process', 'review']
Returns:
JSON with files written, validation warnings, and next steps.
JSON with files written and next steps.
"""
resolved = _resolve_path(agent_json_path)
if not os.path.isfile(resolved):
return json.dumps({"success": False, "error": f"File not found: {agent_json_path}"})
import re
env = os.environ.copy()
core_path = os.path.join(PROJECT_ROOT, "core")
exports_path = os.path.join(PROJECT_ROOT, "exports")
fw_agents_path = os.path.join(PROJECT_ROOT, "core", "framework", "agents")
pythonpath = env.get("PYTHONPATH", "")
path_parts = [core_path, exports_path, fw_agents_path, PROJECT_ROOT]
if pythonpath:
path_parts.append(pythonpath)
env["PYTHONPATH"] = os.pathsep.join(path_parts)
if not re.match(r"^[a-z][a-z0-9_]*$", agent_name):
return json.dumps({
"success": False,
"error": (
f"Invalid agent_name '{agent_name}'. Must be snake_case: "
"lowercase letters, numbers, underscores, starting with a letter."
),
})
try:
proc = subprocess.run(
[
"uv", "run", "python", "-m",
"framework.builder.package_generator",
agent_name,
resolved,
],
capture_output=True,
text=True,
timeout=60,
env=env,
cwd=PROJECT_ROOT,
stdin=subprocess.DEVNULL,
if not nodes:
nodes = ["start"]
class_name = _snake_to_camel(agent_name)
human_name = agent_name.replace("_", " ").title()
entry_node = nodes[0]
exports_dir = os.path.join(PROJECT_ROOT, "exports", agent_name)
nodes_dir = os.path.join(exports_dir, "nodes")
tests_dir = os.path.join(exports_dir, "tests")
os.makedirs(nodes_dir, exist_ok=True)
os.makedirs(tests_dir, exist_ok=True)
files_written: dict[str, dict] = {}
def _write(rel_path: str, content: str) -> None:
full = os.path.join(exports_dir, rel_path)
os.makedirs(os.path.dirname(full), exist_ok=True)
with open(full, "w", encoding="utf-8") as f:
f.write(content)
files_written[rel_path] = {
"path": f"exports/{agent_name}/{rel_path}",
"size_bytes": os.path.getsize(full),
}
# -- config.py --
_write("config.py", f'''\
"""Runtime configuration."""
import json
from dataclasses import dataclass, field
from pathlib import Path
def _load_preferred_model() -> str:
"""Load preferred model from ~/.hive/configuration.json."""
config_path = Path.home() / ".hive" / "configuration.json"
if config_path.exists():
try:
with open(config_path) as f:
config = json.load(f)
llm = config.get("llm", {{}})
if llm.get("provider") and llm.get("model"):
return f"{{llm[\'provider\']}}/{{llm[\'model\']}}"
except Exception:
pass
return "anthropic/claude-sonnet-4-20250514"
@dataclass
class RuntimeConfig:
model: str = field(default_factory=_load_preferred_model)
temperature: float = 0.7
max_tokens: int = 40000
api_key: str | None = None
api_base: str | None = None
default_config = RuntimeConfig()
@dataclass
class AgentMetadata:
name: str = "{human_name}"
version: str = "1.0.0"
description: str = "TODO: Add agent description."
intro_message: str = "TODO: Add intro message."
metadata = AgentMetadata()
''')
# -- nodes/__init__.py --
node_specs = []
node_var_names = []
for node_id in nodes:
var = _node_var_name(node_id)
node_var_names.append(var)
is_first = (node_id == entry_node)
node_specs.append(f'''\
{var} = NodeSpec(
id="{node_id}",
name="{node_id.replace("_", " ").replace("-", " ").title()}",
description="TODO: Describe what this node does.",
node_type="event_loop",
client_facing={is_first},
max_node_visits=0,
input_keys=[],
output_keys=[],
nullable_output_keys=[],
success_criteria="TODO: Define success criteria.",
system_prompt="""\\
TODO: Add system prompt for this node.
""",
tools=[],
)''')
nodes_init = f'''\
"""Node definitions for {human_name}."""
from framework.graph import NodeSpec
{chr(10).join(node_specs)}
__all__ = {node_var_names!r}
'''
_write("nodes/__init__.py", nodes_init)
# -- agent.py --
node_imports = ", ".join(node_var_names)
nodes_list = ", ".join(node_var_names)
edge_defs = []
for i in range(len(nodes) - 1):
src, tgt = nodes[i], nodes[i + 1]
edge_defs.append(f'''\
EdgeSpec(
id="{src}-to-{tgt}",
source="{src}",
target="{tgt}",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),''')
edges_str = "\n".join(edge_defs) if edge_defs else " # TODO: Add edges"
_write("agent.py", f'''\
"""Agent graph construction for {human_name}."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import {node_imports}
# Goal definition
goal = Goal(
id="{agent_name}-goal",
name="{human_name}",
description="TODO: Describe the agent's goal.",
success_criteria=[
SuccessCriterion(
id="sc-1",
description="TODO: Define success criterion.",
metric="TODO",
target="TODO",
weight=1.0,
),
],
constraints=[
Constraint(
id="c-1",
description="TODO: Define constraint.",
constraint_type="hard",
category="functional",
),
],
)
# Node list
nodes = [{nodes_list}]
# Edge definitions
edges = [
{edges_str}
]
# Graph configuration
entry_node = "{entry_node}"
entry_points = {{"start": "{entry_node}"}}
pause_nodes = []
terminal_nodes = []
conversation_mode = "continuous"
identity_prompt = "TODO: Add identity prompt."
loop_config = {{
"max_iterations": 100,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
}}
class {class_name}:
def __init__(self, config=None):
self.config = config or default_config
self.goal = goal
self.nodes = nodes
self.edges = edges
self.entry_node = entry_node
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._graph = None
self._agent_runtime = None
self._tool_registry = None
self._storage_path = None
def _build_graph(self):
return GraphSpec(
id="{agent_name}-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
entry_points=self.entry_points,
terminal_nodes=self.terminal_nodes,
pause_nodes=self.pause_nodes,
nodes=self.nodes,
edges=self.edges,
default_model=self.config.model,
max_tokens=self.config.max_tokens,
loop_config=loop_config,
conversation_mode=conversation_mode,
identity_prompt=identity_prompt,
)
if proc.returncode == 0:
return proc.stdout.strip() or json.dumps({"success": True})
else:
return json.dumps({
"success": False,
"error": proc.stderr.strip()[:3000],
"stdout": proc.stdout.strip()[:1000],
})
except subprocess.TimeoutExpired:
return json.dumps({"success": False, "error": "Package generation timed out (60s)"})
except Exception as e:
return json.dumps({"success": False, "error": str(e)})
def _setup(self):
self._storage_path = Path.home() / ".hive" / "agents" / "{agent_name}"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._tool_registry = ToolRegistry()
mcp_config = Path(__file__).parent / "mcp_servers.json"
if mcp_config.exists():
self._tool_registry.load_mcp_config(mcp_config)
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tools = list(self._tool_registry.get_tools().values())
tool_executor = self._tool_registry.get_executor()
self._graph = self._build_graph()
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=[
EntryPointSpec(
id="default",
name="Default",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
),
],
llm=llm,
tools=tools,
tool_executor=tool_executor,
checkpoint_config=CheckpointConfig(
enabled=True,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
),
)
async def start(self):
if self._agent_runtime is None:
self._setup()
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self):
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
entry_point="default",
input_data=None,
timeout=None,
session_state=None,
):
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {{}},
session_state=session_state,
)
async def run(self, context, session_state=None):
await self.start()
try:
result = await self.trigger_and_wait(
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self):
return {{
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {{
"name": self.goal.name,
"description": self.goal.description,
}},
"nodes": [n.id for n in self.nodes],
"edges": [e.id for e in self.edges],
"entry_node": self.entry_node,
"entry_points": self.entry_points,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
}}
def validate(self):
errors, warnings = [], []
node_ids = {{n.id for n in self.nodes}}
for e in self.edges:
if e.source not in node_ids:
errors.append(f"Edge {{e.id}}: source '{{e.source}}' not found")
if e.target not in node_ids:
errors.append(f"Edge {{e.id}}: target '{{e.target}}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{{self.entry_node}}' not found")
for t in self.terminal_nodes:
if t not in node_ids:
errors.append(f"Terminal node '{{t}}' not found")
for ep_id, nid in self.entry_points.items():
if nid not in node_ids:
errors.append(f"Entry point '{{ep_id}}' references unknown node '{{nid}}'")
return {{"valid": len(errors) == 0, "errors": errors, "warnings": warnings}}
default_agent = {class_name}()
''')
# -- __init__.py --
_write("__init__.py", f'''\
"""{human_name} — TODO: Add description."""
from .agent import (
{class_name},
default_agent,
goal,
nodes,
edges,
entry_node,
entry_points,
pause_nodes,
terminal_nodes,
conversation_mode,
identity_prompt,
loop_config,
)
from .config import default_config, metadata
__all__ = [
"{class_name}",
"default_agent",
"goal",
"nodes",
"edges",
"entry_node",
"entry_points",
"pause_nodes",
"terminal_nodes",
"conversation_mode",
"identity_prompt",
"loop_config",
"default_config",
"metadata",
]
''')
# -- __main__.py --
_write("__main__.py", f'''\
"""CLI entry point for {human_name}."""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, {class_name}
def setup_logging(verbose=False, debug=False):
if debug:
level, fmt = logging.DEBUG, "%(asctime)s %(name)s: %(message)s"
elif verbose:
level, fmt = logging.INFO, "%(message)s"
else:
level, fmt = logging.WARNING, "%(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
@click.group()
@click.version_option(version="1.0.0")
def cli():
"""{human_name}."""
pass
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def run(verbose):
"""Execute the agent."""
setup_logging(verbose=verbose)
result = asyncio.run(default_agent.run({{}}))
click.echo(
json.dumps(
{{"success": result.success, "output": result.output}},
indent=2,
default=str,
)
)
sys.exit(0 if result.success else 1)
@cli.command()
def info():
"""Show agent info."""
data = default_agent.info()
click.echo(
f"Agent: {{data[\'name\']}}\\nVersion: {{data[\'version\']}}\\nDescription: {{data[\'description\']}}"
)
click.echo(f"Nodes: {{', '.join(data[\'nodes\'])}}")
click.echo(f"Client-facing: {{', '.join(data[\'client_facing_nodes\'])}}")
@cli.command()
def validate():
"""Validate agent structure."""
v = default_agent.validate()
if v["valid"]:
click.echo("Agent is valid")
else:
click.echo("Errors:")
for e in v["errors"]:
click.echo(f" {{e}}")
sys.exit(0 if v["valid"] else 1)
if __name__ == "__main__":
cli()
''')
# -- mcp_servers.json --
_write("mcp_servers.json", json.dumps({
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../tools",
"description": "Hive tools MCP server",
}
}, indent=2))
# -- tests/conftest.py --
_write("tests/conftest.py", f'''\
"""Test fixtures."""
import sys
from pathlib import Path
import pytest
_repo_root = Path(__file__).resolve().parents[3]
for _p in ["exports", "core"]:
_path = str(_repo_root / _p)
if _path not in sys.path:
sys.path.insert(0, _path)
AGENT_PATH = str(Path(__file__).resolve().parents[1])
@pytest.fixture(scope="session")
def agent_module():
"""Import the agent package for structural validation."""
import importlib
return importlib.import_module(Path(AGENT_PATH).name)
@pytest.fixture(scope="session")
def runner_loaded():
"""Load the agent through AgentRunner (structural only, no LLM needed)."""
from framework.runner.runner import AgentRunner
return AgentRunner.load(AGENT_PATH)
''')
return json.dumps({
"success": True,
"agent_name": agent_name,
"class_name": class_name,
"entry_node": entry_node,
"nodes": nodes,
"files_written": files_written,
"file_count": len(files_written),
"next_steps": [
f"Customize node definitions in exports/{agent_name}/nodes/__init__.py",
f"Define goal and edges in exports/{agent_name}/agent.py",
f"Run validate_agent_package(\"{agent_name}\") to check structure",
],
}, indent=2)
# ── Main ──────────────────────────────────────────────────────────────────