backend apis for open hive
This commit is contained in:
@@ -370,6 +370,42 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
|
||||
)
|
||||
setup_creds_parser.set_defaults(func=cmd_setup_credentials)
|
||||
|
||||
# serve command (HTTP API server)
|
||||
serve_parser = subparsers.add_parser(
|
||||
"serve",
|
||||
help="Start HTTP API server",
|
||||
description="Start an HTTP server exposing REST + SSE APIs for agent control.",
|
||||
)
|
||||
serve_parser.add_argument(
|
||||
"--host",
|
||||
type=str,
|
||||
default="127.0.0.1",
|
||||
help="Host to bind (default: 127.0.0.1)",
|
||||
)
|
||||
serve_parser.add_argument(
|
||||
"--port",
|
||||
"-p",
|
||||
type=int,
|
||||
default=8787,
|
||||
help="Port to listen on (default: 8787)",
|
||||
)
|
||||
serve_parser.add_argument(
|
||||
"--agent",
|
||||
"-a",
|
||||
type=str,
|
||||
action="append",
|
||||
default=[],
|
||||
help="Agent path to preload (repeatable)",
|
||||
)
|
||||
serve_parser.add_argument(
|
||||
"--model",
|
||||
"-m",
|
||||
type=str,
|
||||
default=None,
|
||||
help="LLM model for preloaded agents",
|
||||
)
|
||||
serve_parser.set_defaults(func=cmd_serve)
|
||||
|
||||
|
||||
def _load_resume_state(
|
||||
agent_path: str, session_id: str, checkpoint_id: str | None = None
|
||||
@@ -1915,3 +1951,60 @@ def cmd_setup_credentials(args: argparse.Namespace) -> int:
|
||||
|
||||
result = session.run_interactive()
|
||||
return 0 if result.success else 1
|
||||
|
||||
|
||||
def cmd_serve(args: argparse.Namespace) -> int:
    """Start the HTTP API server.

    Preloads any agents given via --agent, binds an aiohttp server on
    args.host:args.port, and blocks until Ctrl+C.
    """
    import logging

    from aiohttp import web

    from framework.server.app import create_app

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    model = getattr(args, "model", None)
    app = create_app(model=model)

    async def run_server():
        manager = app["manager"]

        # Preload agents specified via --agent
        for agent_path in args.agent:
            try:
                slot = await manager.load_agent(agent_path, model=model)
            except Exception as e:
                print(f"Error loading {agent_path}: {e}")
            else:
                print(f"Loaded agent: {slot.id} ({slot.info.name})")

        # Start server using AppRunner/TCPSite (same pattern as webhook_server.py)
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, args.host, args.port)
        await site.start()

        print()
        print(f"Hive API server running on http://{args.host}:{args.port}")
        print(f"Health: http://{args.host}:{args.port}/api/health")
        print(f"Agents loaded: {len(manager.list_agents())}")
        print()
        print("Press Ctrl+C to stop")

        try:
            # Park forever; Ctrl+C cancels this via asyncio.run().
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            pass
        finally:
            # Always release agents and sockets, even on cancellation.
            await manager.shutdown_all()
            await runner.cleanup()

    try:
        asyncio.run(run_server())
    except KeyboardInterrupt:
        print("\nServer stopped.")

    return 0
|
||||
|
||||
@@ -1038,6 +1038,25 @@ class AgentRuntime:
|
||||
"""Get all registered entry points."""
|
||||
return list(self._entry_points.values())
|
||||
|
||||
def find_awaiting_node(self) -> tuple[str | None, str | None]:
|
||||
"""Find a node currently awaiting client input.
|
||||
|
||||
Searches all graphs and streams for an active executor whose
|
||||
node has ``_awaiting_input`` set.
|
||||
|
||||
Returns:
|
||||
(node_id, graph_id) if found, otherwise (None, None).
|
||||
"""
|
||||
for graph_id, reg in self._graphs.items():
|
||||
for stream in reg.streams.values():
|
||||
if not stream.is_awaiting_input:
|
||||
continue
|
||||
for executor in stream._active_executors.values():
|
||||
for node_id, node in executor.node_registry.items():
|
||||
if getattr(node, "_awaiting_input", False):
|
||||
return node_id, graph_id
|
||||
return None, None
|
||||
|
||||
def get_stream(self, entry_point_id: str) -> ExecutionStream | None:
|
||||
"""Get a specific execution stream."""
|
||||
return self._streams.get(entry_point_id)
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
"""HTTP API server for the Hive agent framework."""
|
||||
@@ -0,0 +1,153 @@
|
||||
"""Multi-agent lifecycle manager for the HTTP API server.
|
||||
|
||||
Manages loading, unloading, and listing agents. Each loaded agent
|
||||
is tracked as an AgentSlot holding a runner, runtime, and metadata.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sentinel placed in _slots while an agent is loading (prevents duplicate loads).
|
||||
_LOADING = object()
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentSlot:
|
||||
"""A loaded agent with its runtime resources."""
|
||||
|
||||
id: str
|
||||
agent_path: Path
|
||||
runner: Any # AgentRunner
|
||||
runtime: Any # AgentRuntime
|
||||
info: Any # AgentInfo
|
||||
loaded_at: float
|
||||
|
||||
|
||||
class AgentManager:
|
||||
"""Manages concurrent agent lifecycles.
|
||||
|
||||
Thread-safe via asyncio.Lock. Agents are loaded via run_in_executor
|
||||
(blocking I/O) then started on the event loop — same pattern as
|
||||
tui/app.py.
|
||||
"""
|
||||
|
||||
def __init__(self, model: str | None = None) -> None:
|
||||
self._slots: dict[str, AgentSlot] = {}
|
||||
self._model = model
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def load_agent(
|
||||
self,
|
||||
agent_path: str | Path,
|
||||
agent_id: str | None = None,
|
||||
model: str | None = None,
|
||||
) -> AgentSlot:
|
||||
"""Load an agent from disk and start its runtime.
|
||||
|
||||
Args:
|
||||
agent_path: Path to agent folder (containing agent.json or agent.py).
|
||||
agent_id: Optional identifier; defaults to directory name.
|
||||
model: LLM model override; falls back to manager default.
|
||||
|
||||
Returns:
|
||||
The AgentSlot for the loaded agent.
|
||||
|
||||
Raises:
|
||||
ValueError: If agent_id is already loaded.
|
||||
FileNotFoundError: If agent_path is invalid.
|
||||
"""
|
||||
from framework.runner import AgentRunner
|
||||
|
||||
agent_path = Path(agent_path)
|
||||
resolved_id = agent_id or agent_path.name
|
||||
resolved_model = model or self._model
|
||||
|
||||
async with self._lock:
|
||||
if resolved_id in self._slots:
|
||||
raise ValueError(f"Agent '{resolved_id}' is already loaded")
|
||||
self._slots[resolved_id] = _LOADING # claim slot
|
||||
|
||||
try:
|
||||
# Blocking I/O — load in executor (same as tui/app.py:362-368)
|
||||
loop = asyncio.get_running_loop()
|
||||
runner = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: AgentRunner.load(
|
||||
agent_path,
|
||||
model=resolved_model,
|
||||
interactive=False,
|
||||
),
|
||||
)
|
||||
|
||||
# Setup (LLM provider, runtime, tools)
|
||||
if runner._agent_runtime is None:
|
||||
await loop.run_in_executor(None, runner._setup)
|
||||
|
||||
runtime = runner._agent_runtime
|
||||
|
||||
# Start runtime on event loop
|
||||
if runtime and not runtime.is_running:
|
||||
await runtime.start()
|
||||
|
||||
info = runner.info()
|
||||
|
||||
slot = AgentSlot(
|
||||
id=resolved_id,
|
||||
agent_path=agent_path,
|
||||
runner=runner,
|
||||
runtime=runtime,
|
||||
info=info,
|
||||
loaded_at=time.time(),
|
||||
)
|
||||
|
||||
async with self._lock:
|
||||
self._slots[resolved_id] = slot
|
||||
|
||||
logger.info(f"Agent '{resolved_id}' loaded from {agent_path}")
|
||||
return slot
|
||||
|
||||
except Exception:
|
||||
async with self._lock:
|
||||
self._slots.pop(resolved_id, None)
|
||||
raise
|
||||
|
||||
async def unload_agent(self, agent_id: str) -> bool:
|
||||
"""Unload an agent and release its resources.
|
||||
|
||||
Returns True if the agent was found and unloaded.
|
||||
"""
|
||||
async with self._lock:
|
||||
slot = self._slots.pop(agent_id, None)
|
||||
|
||||
if slot is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
await slot.runner.cleanup_async()
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up agent '{agent_id}': {e}")
|
||||
|
||||
logger.info(f"Agent '{agent_id}' unloaded")
|
||||
return True
|
||||
|
||||
def get_agent(self, agent_id: str) -> AgentSlot | None:
|
||||
slot = self._slots.get(agent_id)
|
||||
if slot is _LOADING:
|
||||
return None
|
||||
return slot
|
||||
|
||||
def list_agents(self) -> list[AgentSlot]:
|
||||
return [s for s in self._slots.values() if s is not _LOADING]
|
||||
|
||||
async def shutdown_all(self) -> None:
|
||||
"""Gracefully unload all agents. Called on server shutdown."""
|
||||
agent_ids = list(self._slots.keys())
|
||||
for agent_id in agent_ids:
|
||||
await self.unload_agent(agent_id)
|
||||
logger.info("All agents unloaded")
|
||||
@@ -0,0 +1,168 @@
|
||||
"""aiohttp Application factory for the Hive HTTP API server."""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from framework.server.agent_manager import AgentManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def safe_path_segment(value: str) -> str:
    """Validate a URL path parameter is a safe filesystem name.

    Raises HTTPBadRequest if the value is empty or contains path
    separators, traversal sequences, or NUL bytes. aiohttp decodes
    ``%2F`` inside route params, so a raw ``{session_id}`` can contain
    ``/`` or ``..`` after decoding.

    Args:
        value: Decoded path parameter supplied by the client.

    Returns:
        The value unchanged, when safe to use as a single path segment.
    """
    # Also reject "" (would address the parent directory when joined)
    # and NUL bytes (truncate paths at the OS level).
    if (
        not value
        or "\x00" in value
        or "/" in value
        or "\\" in value
        or ".." in value
    ):
        raise web.HTTPBadRequest(reason="Invalid path parameter")
    return value
|
||||
|
||||
|
||||
# Allowed CORS origins (localhost on any port)
_CORS_ORIGINS = {"http://localhost", "http://127.0.0.1"}


def _is_cors_allowed(origin: str) -> bool:
    """Check if origin is localhost/127.0.0.1 on any port."""
    if not origin:
        return False
    # Either an exact base match or base followed by an explicit port.
    return any(
        origin == base or origin.startswith(base + ":")
        for base in _CORS_ORIGINS
    )
|
||||
|
||||
|
||||
@web.middleware
async def cors_middleware(request: web.Request, handler):
    """CORS middleware scoped to localhost origins."""
    origin = request.headers.get("Origin", "")

    if request.method == "OPTIONS":
        # Preflight: answer immediately with an empty 204.
        response = web.Response(status=204)
    else:
        try:
            response = await handler(request)
        except web.HTTPException as exc:
            # HTTP error responses still need CORS headers so browsers
            # surface them to the caller.
            response = exc

    if _is_cors_allowed(origin):
        headers = response.headers
        headers["Access-Control-Allow-Origin"] = origin
        headers["Access-Control-Allow-Methods"] = "GET, POST, DELETE, OPTIONS"
        headers["Access-Control-Allow-Headers"] = "Content-Type"
        headers["Access-Control-Max-Age"] = "3600"

    return response
|
||||
|
||||
|
||||
@web.middleware
async def error_middleware(request: web.Request, handler):
    """Catch exceptions and return JSON error responses."""
    try:
        return await handler(request)
    except web.HTTPException:
        # aiohttp's own HTTP exceptions are already valid responses.
        raise
    except Exception as e:
        logger.exception(f"Unhandled error: {e}")
        payload = {"error": str(e), "type": type(e).__name__}
        return web.json_response(payload, status=500)
|
||||
|
||||
|
||||
async def _on_shutdown(app: web.Application) -> None:
    """Gracefully unload all agents on server shutdown."""
    await app["manager"].shutdown_all()
|
||||
|
||||
|
||||
async def handle_health(request: web.Request) -> web.Response:
    """GET /api/health — simple health check."""
    manager: AgentManager = request.app["manager"]
    payload = {
        "status": "ok",
        "agents_loaded": len(manager.list_agents()),
    }
    return web.json_response(payload)
|
||||
|
||||
|
||||
def create_app(model: str | None = None) -> web.Application:
    """Create and configure the aiohttp Application.

    Args:
        model: Default LLM model for agent loading.

    Returns:
        Configured aiohttp Application ready to run.
    """
    # Route modules are imported lazily to avoid import cycles with
    # modules that themselves import from this file.
    from framework.server.routes_agents import register_routes as register_agent_routes
    from framework.server.routes_events import register_routes as register_event_routes
    from framework.server.routes_execution import register_routes as register_execution_routes
    from framework.server.routes_graphs import register_routes as register_graph_routes
    from framework.server.routes_logs import register_routes as register_log_routes
    from framework.server.routes_sessions import register_routes as register_session_routes

    app = web.Application(middlewares=[cors_middleware, error_middleware])

    # Handlers reach the shared manager through the app mapping.
    app["manager"] = AgentManager(model=model)

    # Unload every agent when the server stops.
    app.on_shutdown.append(_on_shutdown)

    # Health check
    app.router.add_get("/api/health", handle_health)

    # API route modules.
    register_agent_routes(app)
    register_execution_routes(app)
    register_event_routes(app)
    register_session_routes(app)
    register_graph_routes(app)
    register_log_routes(app)

    # Static file serving — Option C production mode:
    # if frontend/dist/ exists, serve built frontend files on /.
    _setup_static_serving(app)

    return app
|
||||
|
||||
|
||||
def _setup_static_serving(app: web.Application) -> None:
    """Serve frontend static files if the dist directory exists.

    Registers a catch-all GET route that serves files from the built
    frontend, falling back to index.html for SPA client-side routes.
    Requests that resolve outside the dist directory are refused.
    """
    # Try relative to CWD (repo root) and relative to this file
    candidates = [
        Path("frontend/dist"),
        Path(__file__).resolve().parent.parent.parent.parent / "frontend" / "dist",
    ]

    dist_dir: Path | None = None
    for candidate in candidates:
        if candidate.is_dir() and (candidate / "index.html").exists():
            dist_dir = candidate.resolve()
            break

    if dist_dir is None:
        logger.debug("No frontend/dist found — skipping static file serving")
        return

    logger.info(f"Serving frontend from {dist_dir}")

    async def handle_spa(request: web.Request) -> web.FileResponse:
        """Serve static files with SPA fallback to index.html."""
        rel_path = request.match_info.get("path", "")

        # SECURITY: aiohttp decodes percent-escapes in {path:.*}, so
        # rel_path may contain "..". Resolve and confine to dist_dir —
        # otherwise a crafted request could read arbitrary files.
        try:
            file_path = (dist_dir / rel_path).resolve()
        except (OSError, ValueError):
            file_path = dist_dir

        if file_path.is_relative_to(dist_dir) and file_path.is_file():
            return web.FileResponse(file_path)

        # SPA fallback
        return web.FileResponse(dist_dir / "index.html")

    # Catch-all for SPA — must be registered LAST so /api routes take priority
    app.router.add_get("/{path:.*}", handle_spa)
|
||||
@@ -0,0 +1,188 @@
|
||||
"""Agent CRUD and discovery routes."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from framework.server.agent_manager import AgentManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_manager(request: web.Request) -> AgentManager:
    """Fetch the shared AgentManager stored on the application."""
    manager: AgentManager = request.app["manager"]
    return manager
|
||||
|
||||
|
||||
def _slot_to_dict(slot) -> dict:
    """Serialize an AgentSlot to a JSON-friendly dict."""
    uptime = round(time.time() - slot.loaded_at, 1)
    info = slot.info
    return {
        "id": slot.id,
        "agent_path": str(slot.agent_path),
        "name": info.name,
        "description": info.description,
        "goal": info.goal_name,
        "node_count": info.node_count,
        "loaded_at": slot.loaded_at,
        "uptime_seconds": uptime,
    }
|
||||
|
||||
|
||||
async def handle_discover(request: web.Request) -> web.Response:
    """GET /api/discover — discover agents from filesystem."""
    from framework.tui.screens.agent_picker import discover_agents

    def entry_payload(entry) -> dict:
        # Flatten one discovered entry into JSON-safe fields.
        return {
            "path": str(entry.path),
            "name": entry.name,
            "description": entry.description,
            "category": entry.category,
            "session_count": entry.session_count,
            "node_count": entry.node_count,
            "tool_count": entry.tool_count,
            "tags": entry.tags,
        }

    result = {
        category: [entry_payload(e) for e in entries]
        for category, entries in discover_agents().items()
    }
    return web.json_response(result)
|
||||
|
||||
|
||||
async def handle_list_agents(request: web.Request) -> web.Response:
    """GET /api/agents — list all loaded agents."""
    slots = _get_manager(request).list_agents()
    return web.json_response({"agents": [_slot_to_dict(s) for s in slots]})
|
||||
|
||||
|
||||
async def handle_load_agent(request: web.Request) -> web.Response:
    """POST /api/agents — load an agent from disk.

    Body: {"agent_path": "...", "agent_id": "...", "model": "..."}

    Returns 201 with the agent summary on success; 400 for a malformed
    or incomplete body, 409 if the id is already loaded, 404 for a bad
    path, 500 for unexpected load failures.
    """
    manager = _get_manager(request)

    # A malformed JSON body is the client's fault: answer 400 here
    # rather than letting error_middleware turn it into a 500.
    try:
        body = await request.json()
    except ValueError:
        return web.json_response({"error": "Invalid JSON body"}, status=400)

    agent_path = body.get("agent_path")
    if not agent_path:
        return web.json_response({"error": "agent_path is required"}, status=400)

    agent_id = body.get("agent_id")
    model = body.get("model")

    try:
        slot = await manager.load_agent(agent_path, agent_id=agent_id, model=model)
    except ValueError as e:
        # Duplicate agent id — conflict.
        return web.json_response({"error": str(e)}, status=409)
    except FileNotFoundError as e:
        return web.json_response({"error": str(e)}, status=404)
    except Exception as e:
        logger.exception(f"Error loading agent: {e}")
        return web.json_response({"error": str(e)}, status=500)

    return web.json_response(_slot_to_dict(slot), status=201)
|
||||
|
||||
|
||||
async def handle_get_agent(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id} — get agent details."""
    agent_id = request.match_info["agent_id"]
    slot = _get_manager(request).get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    data = _slot_to_dict(slot)

    runtime = slot.runtime
    if runtime:
        # Enrich with entry points and loaded graphs when a runtime exists.
        data["entry_points"] = [
            {
                "id": ep.id,
                "name": ep.name,
                "entry_node": ep.entry_node,
                "trigger_type": ep.trigger_type,
            }
            for ep in runtime.get_entry_points()
        ]
        data["graphs"] = runtime.list_graphs()

    return web.json_response(data)
|
||||
|
||||
|
||||
async def handle_unload_agent(request: web.Request) -> web.Response:
    """DELETE /api/agents/{agent_id} — unload an agent."""
    agent_id = request.match_info["agent_id"]

    if not await _get_manager(request).unload_agent(agent_id):
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    return web.json_response({"unloaded": agent_id})
|
||||
|
||||
|
||||
async def handle_stats(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/stats — runtime statistics."""
    agent_id = request.match_info["agent_id"]
    slot = _get_manager(request).get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    # A slot without a runtime has no stats to report.
    return web.json_response(slot.runtime.get_stats() if slot.runtime else {})
|
||||
|
||||
|
||||
async def handle_entry_points(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/entry-points — list entry points."""
    agent_id = request.match_info["agent_id"]
    slot = _get_manager(request).get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    entry_points = slot.runtime.get_entry_points() if slot.runtime else []
    payload = [
        {
            "id": ep.id,
            "name": ep.name,
            "entry_node": ep.entry_node,
            "trigger_type": ep.trigger_type,
        }
        for ep in entry_points
    ]
    return web.json_response({"entry_points": payload})
|
||||
|
||||
|
||||
async def handle_graphs(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/graphs — list loaded graphs."""
    agent_id = request.match_info["agent_id"]
    slot = _get_manager(request).get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    # An agent without a runtime has no graphs loaded.
    graphs = slot.runtime.list_graphs() if slot.runtime else []
    return web.json_response({"graphs": graphs})
|
||||
|
||||
|
||||
def register_routes(app: web.Application) -> None:
    """Register agent CRUD routes on the application."""
    router = app.router
    router.add_get("/api/discover", handle_discover)
    router.add_get("/api/agents", handle_list_agents)
    router.add_post("/api/agents", handle_load_agent)
    router.add_get("/api/agents/{agent_id}", handle_get_agent)
    router.add_delete("/api/agents/{agent_id}", handle_unload_agent)
    router.add_get("/api/agents/{agent_id}/stats", handle_stats)
    router.add_get("/api/agents/{agent_id}/entry-points", handle_entry_points)
    router.add_get("/api/agents/{agent_id}/graphs", handle_graphs)
|
||||
@@ -0,0 +1,109 @@
|
||||
"""SSE event streaming route."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from framework.runtime.event_bus import EventType
|
||||
from framework.server.agent_manager import AgentManager
|
||||
from framework.server.sse import SSEResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default event types streamed to clients
|
||||
DEFAULT_EVENT_TYPES = [
|
||||
EventType.CLIENT_OUTPUT_DELTA,
|
||||
EventType.CLIENT_INPUT_REQUESTED,
|
||||
EventType.LLM_TEXT_DELTA,
|
||||
EventType.TOOL_CALL_STARTED,
|
||||
EventType.TOOL_CALL_COMPLETED,
|
||||
EventType.EXECUTION_STARTED,
|
||||
EventType.EXECUTION_COMPLETED,
|
||||
EventType.EXECUTION_FAILED,
|
||||
EventType.EXECUTION_PAUSED,
|
||||
EventType.NODE_LOOP_STARTED,
|
||||
EventType.NODE_LOOP_COMPLETED,
|
||||
EventType.EDGE_TRAVERSED,
|
||||
EventType.GOAL_PROGRESS,
|
||||
]
|
||||
|
||||
# Keepalive interval in seconds
|
||||
KEEPALIVE_INTERVAL = 15.0
|
||||
|
||||
|
||||
def _parse_event_types(query_param: str | None) -> list[EventType]:
    """Parse comma-separated event type names into EventType values.

    Falls back to DEFAULT_EVENT_TYPES if param is empty or invalid.
    """
    if not query_param:
        return DEFAULT_EVENT_TYPES

    parsed: list[EventType] = []
    for raw in query_param.split(","):
        name = raw.strip()
        try:
            parsed.append(EventType(name))
        except ValueError:
            # Ignore unknown names rather than failing the whole request.
            logger.warning(f"Unknown event type filter: {name}")

    return parsed or DEFAULT_EVENT_TYPES
|
||||
|
||||
|
||||
async def handle_events(request: web.Request) -> web.StreamResponse:
    """GET /api/agents/{agent_id}/events — SSE event stream.

    Query params:
        types: Comma-separated event type names to filter (optional).
    """
    manager: AgentManager = request.app["manager"]
    agent_id = request.match_info["agent_id"]
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    bus = slot.runtime.event_bus
    wanted = _parse_event_types(request.query.get("types"))

    # Bounded per-client buffer between the EventBus and the socket.
    pending: asyncio.Queue = asyncio.Queue(maxsize=1000)

    async def on_event(event) -> None:
        """Push event dict into the queue; shed the NEW event when full."""
        try:
            pending.put_nowait(event.to_dict())
        except asyncio.QueueFull:
            # Slow client: drop rather than block the bus.
            pass

    sub_id = bus.subscribe(event_types=wanted, handler=on_event)

    sse = SSEResponse()
    await sse.prepare(request)

    try:
        while True:
            try:
                payload = await asyncio.wait_for(pending.get(), timeout=KEEPALIVE_INTERVAL)
                await sse.send_event(payload, event=payload.get("type"))
            except TimeoutError:
                # Quiet period — keep the connection alive.
                await sse.send_keepalive()
            except ConnectionResetError:
                break
    except asyncio.CancelledError:
        pass
    finally:
        # Always detach from the bus, even on abrupt disconnects.
        bus.unsubscribe(sub_id)
        logger.debug(f"SSE client disconnected from agent '{agent_id}'")

    return sse.response
|
||||
|
||||
|
||||
def register_routes(app: web.Application) -> None:
    """Register SSE event streaming route."""
    # Single streaming endpoint, scoped per loaded agent.
    app.router.add_get("/api/agents/{agent_id}/events", handle_events)
|
||||
@@ -0,0 +1,308 @@
|
||||
"""Execution control routes — trigger, inject, chat, resume, stop, replay."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from framework.server.agent_manager import AgentManager
|
||||
from framework.server.app import safe_path_segment
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_manager(request: web.Request) -> AgentManager:
    """Fetch the shared AgentManager stored on the application."""
    manager: AgentManager = request.app["manager"]
    return manager
|
||||
|
||||
|
||||
def _get_slot_or_404(request: web.Request):
    """Lookup agent slot; returns (slot, None) or (None, error_response)."""
    agent_id = request.match_info["agent_id"]
    slot = _get_manager(request).get_agent(agent_id)
    if slot is not None:
        return slot, None
    error = web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)
    return None, error
|
||||
|
||||
|
||||
async def handle_trigger(request: web.Request) -> web.Response:
    """POST /api/agents/{agent_id}/trigger — start an execution.

    Body: {"entry_point_id": "default", "input_data": {...}, "session_state": {...}?}
    """
    slot, err = _get_slot_or_404(request)
    if err:
        return err

    payload = await request.json()

    execution_id = await slot.runtime.trigger(
        payload.get("entry_point_id", "default"),
        payload.get("input_data", {}),
        session_state=payload.get("session_state"),
    )

    return web.json_response({"execution_id": execution_id})
|
||||
|
||||
|
||||
async def handle_inject(request: web.Request) -> web.Response:
    """POST /api/agents/{agent_id}/inject — inject input into a waiting node.

    Body: {"node_id": "...", "content": "...", "graph_id": "..."}
    """
    slot, err = _get_slot_or_404(request)
    if err:
        return err

    payload = await request.json()

    node_id = payload.get("node_id")
    if not node_id:
        return web.json_response({"error": "node_id is required"}, status=400)

    delivered = await slot.runtime.inject_input(
        node_id,
        payload.get("content", ""),
        graph_id=payload.get("graph_id"),
    )
    return web.json_response({"delivered": delivered})
|
||||
|
||||
|
||||
async def handle_chat(request: web.Request) -> web.Response:
    """POST /api/agents/{agent_id}/chat — convenience endpoint.

    Auto-routes: inject if a node is awaiting input, else trigger.
    Body: {"message": "hello"}
    """
    slot, err = _get_slot_or_404(request)
    if err:
        return err

    payload = await request.json()
    message = payload.get("message", "")
    if not message:
        return web.json_response({"error": "message is required"}, status=400)

    # Deliver to a waiting node when one exists; otherwise start fresh.
    node_id, graph_id = slot.runtime.find_awaiting_node()

    if node_id:
        delivered = await slot.runtime.inject_input(node_id, message, graph_id=graph_id)
        result = {
            "status": "injected",
            "node_id": node_id,
            "delivered": delivered,
        }
    else:
        execution_id = await slot.runtime.trigger(
            "default",
            {"user_request": message},
        )
        result = {
            "status": "started",
            "execution_id": execution_id,
        }

    return web.json_response(result)
|
||||
|
||||
|
||||
async def handle_goal_progress(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/goal-progress — evaluate goal progress."""
    slot, err = _get_slot_or_404(request)
    if err:
        return err

    progress = await slot.runtime.get_goal_progress()
    # default=str keeps non-JSON types (e.g. datetimes) serializable.
    return web.json_response(progress, dumps=lambda obj: json.dumps(obj, default=str))
|
||||
|
||||
|
||||
def _sessions_dir(slot) -> Path:
    """Resolve the sessions directory for an agent slot."""
    # Sessions live under ~/.hive/agents/<agent_dir_name>/sessions.
    return Path.home() / ".hive" / "agents" / slot.agent_path.name / "sessions"
|
||||
|
||||
|
||||
async def handle_resume(request: web.Request) -> web.Response:
    """POST /api/agents/{agent_id}/resume — resume a paused execution.

    Body: {"session_id": "...", "checkpoint_id": "..." (optional)}

    Reads session state (and optionally a checkpoint), builds a
    session_state dict, and calls runtime.trigger() — same pattern as
    the TUI /resume command.
    """
    slot, err = _get_slot_or_404(request)
    if err:
        return err

    payload = await request.json()
    session_id = payload.get("session_id")
    checkpoint_id = payload.get("checkpoint_id")

    if not session_id:
        return web.json_response({"error": "session_id is required"}, status=400)

    # Client-supplied identifiers become filesystem names — sanitize.
    session_id = safe_path_segment(session_id)
    if checkpoint_id:
        checkpoint_id = safe_path_segment(checkpoint_id)

    # Read session state
    state_path = _sessions_dir(slot) / session_id / "state.json"
    if not state_path.exists():
        return web.json_response({"error": "Session not found"}, status=404)

    try:
        state = json.loads(state_path.read_text())
    except (json.JSONDecodeError, OSError) as e:
        return web.json_response({"error": f"Failed to read session: {e}"}, status=500)

    if checkpoint_id:
        # Checkpoint-based recovery (time-travel)
        resume_state = {
            "resume_session_id": session_id,
            "resume_from_checkpoint": checkpoint_id,
        }
    else:
        # Session-state resume (same as TUI /resume)
        progress = state.get("progress", {})
        resume_state = {
            "resume_session_id": session_id,
            "memory": state.get("memory", {}),
            "execution_path": progress.get("path", []),
            "node_visit_counts": progress.get("node_visit_counts", {}),
        }
        paused_at = progress.get("paused_at") or progress.get("resume_from")
        if paused_at:
            resume_state["paused_at"] = paused_at

    # Get entry point and original input data
    entry_points = slot.runtime.get_entry_points()
    if not entry_points:
        return web.json_response({"error": "No entry points available"}, status=400)

    execution_id = await slot.runtime.trigger(
        entry_points[0].id,
        input_data=state.get("input_data", {}),
        session_state=resume_state,
    )

    return web.json_response(
        {
            "execution_id": execution_id,
            "resumed_from": session_id,
            "checkpoint_id": checkpoint_id,
        }
    )
|
||||
|
||||
|
||||
async def handle_stop(request: web.Request) -> web.Response:
    """POST /api/agents/{agent_id}/stop — cancel a running execution.

    Body: {"execution_id": "..."}

    Cancels the execution task via stream.cancel_execution(). The
    GraphExecutor catches CancelledError, saves state, and marks the
    session as paused — so the execution is resumable via /resume.
    Also mounted on /pause as an alias.
    """
    slot, err = _get_slot_or_404(request)
    if err:
        return err

    # Reject malformed request bodies with a 400 instead of letting the
    # JSON decode error bubble up as an unhandled 500.
    try:
        body = await request.json()
    except json.JSONDecodeError:
        return web.json_response({"error": "Invalid JSON body"}, status=400)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON body"}, status=400)

    execution_id = body.get("execution_id")

    if not execution_id:
        return web.json_response({"error": "execution_id is required"}, status=400)

    # Search all graphs/streams for the execution and cancel it
    for graph_id in slot.runtime.list_graphs():
        reg = slot.runtime.get_graph_registration(graph_id)
        if reg is None:
            continue
        for _ep_id, stream in reg.streams.items():
            cancelled = await stream.cancel_execution(execution_id)
            if cancelled:
                return web.json_response(
                    {
                        "stopped": True,
                        "execution_id": execution_id,
                    }
                )

    return web.json_response({"stopped": False, "error": "Execution not found"}, status=404)
|
||||
|
||||
|
||||
async def handle_replay(request: web.Request) -> web.Response:
    """POST /api/agents/{agent_id}/replay — re-run from a checkpoint.

    Body: {"session_id": "...", "checkpoint_id": "..."}

    Loads the specified checkpoint and triggers a new execution using
    the checkpoint's state. Unlike resume, this creates a fresh
    execution rather than continuing the paused one.
    """
    slot, err = _get_slot_or_404(request)
    if err:
        return err

    # Reject malformed request bodies with a 400 instead of letting the
    # JSON decode error bubble up as an unhandled 500.
    try:
        body = await request.json()
    except json.JSONDecodeError:
        return web.json_response({"error": "Invalid JSON body"}, status=400)
    if not isinstance(body, dict):
        return web.json_response({"error": "Invalid JSON body"}, status=400)

    session_id = body.get("session_id")
    checkpoint_id = body.get("checkpoint_id")

    if not session_id:
        return web.json_response({"error": "session_id is required"}, status=400)
    if not checkpoint_id:
        return web.json_response({"error": "checkpoint_id is required"}, status=400)

    # Sanitize before building filesystem paths from client input.
    session_id = safe_path_segment(session_id)
    checkpoint_id = safe_path_segment(checkpoint_id)

    # Verify checkpoint exists
    cp_path = _sessions_dir(slot) / session_id / "checkpoints" / f"{checkpoint_id}.json"
    if not cp_path.exists():
        return web.json_response({"error": "Checkpoint not found"}, status=404)

    entry_points = slot.runtime.get_entry_points()
    if not entry_points:
        return web.json_response({"error": "No entry points available"}, status=400)

    # Use checkpoint-based recovery which creates a new execution
    replay_session_state = {
        "resume_session_id": session_id,
        "resume_from_checkpoint": checkpoint_id,
    }

    execution_id = await slot.runtime.trigger(
        entry_points[0].id,
        input_data={},
        session_state=replay_session_state,
    )

    return web.json_response(
        {
            "execution_id": execution_id,
            "replayed_from": session_id,
            "checkpoint_id": checkpoint_id,
        }
    )
|
||||
|
||||
|
||||
def register_routes(app: web.Application) -> None:
    """Register execution control routes."""
    prefix = "/api/agents/{agent_id}"
    post_routes = [
        ("trigger", handle_trigger),
        ("inject", handle_inject),
        ("chat", handle_chat),
        ("pause", handle_stop),  # alias for /stop
        ("resume", handle_resume),
        ("stop", handle_stop),
        ("replay", handle_replay),
    ]
    for suffix, handler in post_routes:
        app.router.add_post(f"{prefix}/{suffix}", handler)
    app.router.add_get(f"{prefix}/goal-progress", handle_goal_progress)
|
||||
@@ -0,0 +1,191 @@
|
||||
"""Graph and node inspection routes — node list, node detail, node criteria."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from framework.server.agent_manager import AgentManager
|
||||
from framework.server.app import safe_path_segment
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_manager(request: web.Request) -> AgentManager:
|
||||
return request.app["manager"]
|
||||
|
||||
|
||||
def _get_graph_spec(slot, graph_id: str):
    """Get GraphSpec for a graph_id. Returns (graph_spec, None) or (None, error_response)."""
    registration = slot.runtime.get_graph_registration(graph_id)
    if registration is not None:
        return registration.graph, None
    not_found = web.json_response({"error": f"Graph '{graph_id}' not found"}, status=404)
    return None, not_found
|
||||
|
||||
|
||||
def _node_to_dict(node) -> dict:
|
||||
"""Serialize a NodeSpec to a JSON-friendly dict."""
|
||||
return {
|
||||
"id": node.id,
|
||||
"name": node.name,
|
||||
"description": node.description,
|
||||
"node_type": node.node_type,
|
||||
"input_keys": node.input_keys,
|
||||
"output_keys": node.output_keys,
|
||||
"nullable_output_keys": node.nullable_output_keys,
|
||||
"tools": node.tools,
|
||||
"routes": node.routes,
|
||||
"max_retries": node.max_retries,
|
||||
"max_node_visits": node.max_node_visits,
|
||||
"client_facing": node.client_facing,
|
||||
"success_criteria": node.success_criteria,
|
||||
}
|
||||
|
||||
|
||||
async def handle_list_nodes(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/graphs/{graph_id}/nodes — list nodes.

    Returns all nodes in the graph with their static spec. If a session_id
    query param is provided, enriches each node with runtime status from
    the session's progress data (visit counts, failures, current node,
    membership in the execution path).
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    graph_id = request.match_info["graph_id"]
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    graph, err = _get_graph_spec(slot, graph_id)
    if err:
        return err

    nodes = [_node_to_dict(n) for n in graph.nodes]

    # Optionally enrich with session progress
    session_id = request.query.get("session_id")
    if session_id:
        # Sanitize: session_id becomes a filesystem path segment below.
        session_id = safe_path_segment(session_id)
        from pathlib import Path

        # NOTE(review): duplicates the ~/.hive/agents/{name}/sessions layout
        # used by the session/log route modules — keep in sync.
        state_path = (
            Path.home()
            / ".hive"
            / "agents"
            / slot.agent_path.name
            / "sessions"
            / session_id
            / "state.json"
        )
        if state_path.exists():
            try:
                state = json.loads(state_path.read_text())
                progress = state.get("progress", {})
                visit_counts = progress.get("node_visit_counts", {})
                failures = progress.get("nodes_with_failures", [])
                current = progress.get("current_node")
                path = progress.get("path", [])

                # Annotate each static node dict with runtime status.
                for node in nodes:
                    nid = node["id"]
                    node["visit_count"] = visit_counts.get(nid, 0)
                    node["has_failures"] = nid in failures
                    node["is_current"] = nid == current
                    node["in_path"] = nid in path
            except (json.JSONDecodeError, OSError):
                pass  # Skip enrichment on error — static node list still returned

    return web.json_response({"nodes": nodes})
|
||||
|
||||
|
||||
async def handle_get_node(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/graphs/{graph_id}/nodes/{node_id} — node detail.

    Returns the node's static spec plus its outgoing edges (target,
    condition, priority) — i.e. edges whose source is this node.
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    graph_id = request.match_info["graph_id"]
    node_id = request.match_info["node_id"]
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    graph, err = _get_graph_spec(slot, graph_id)
    if err:
        return err

    node_spec = graph.get_node(node_id)
    if node_spec is None:
        return web.json_response({"error": f"Node '{node_id}' not found"}, status=404)

    data = _node_to_dict(node_spec)

    # Include edges originating from this node
    edges = [
        {"target": e.target, "condition": e.condition, "priority": e.priority}
        for e in graph.edges
        if e.source == node_id
    ]
    data["edges"] = edges

    return web.json_response(data)
|
||||
|
||||
|
||||
async def handle_node_criteria(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/graphs/{graph_id}/nodes/{node_id}/criteria

    Returns the success criteria for a node plus any judge verdicts from
    logs (if session_id is provided).
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    graph_id = request.match_info["graph_id"]
    node_id = request.match_info["node_id"]
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    graph, err = _get_graph_spec(slot, graph_id)
    if err:
        return err

    node_spec = graph.get_node(node_id)
    if node_spec is None:
        return web.json_response({"error": f"Node '{node_id}' not found"}, status=404)

    result: dict = {
        "node_id": node_id,
        "success_criteria": node_spec.success_criteria,
        "output_keys": node_spec.output_keys,
    }

    # If session_id provided, look for judge verdicts in logs
    session_id = request.query.get("session_id")
    if session_id:
        # Sanitize before handing the client-supplied value to the log
        # store — consistent with handle_list_nodes, which sanitizes the
        # same query param before filesystem use.
        session_id = safe_path_segment(session_id)
        log_store = getattr(slot.runtime, "_runtime_log_store", None)
        if log_store:
            details = await log_store.load_details(session_id)
            if details:
                node_details = [n for n in details.nodes if n.node_id == node_id]
                if node_details:
                    # Report only the most recent execution of this node.
                    latest = node_details[-1]
                    result["last_execution"] = {
                        "success": latest.success,
                        "error": latest.error,
                        "retry_count": latest.retry_count,
                        "needs_attention": latest.needs_attention,
                        "attention_reasons": latest.attention_reasons,
                    }

    return web.json_response(result, dumps=lambda obj: json.dumps(obj, default=str))
|
||||
|
||||
|
||||
def register_routes(app: web.Application) -> None:
    """Register graph/node inspection routes."""
    node_base = "/api/agents/{agent_id}/graphs/{graph_id}/nodes"
    for path, handler in (
        (node_base, handle_list_nodes),
        (node_base + "/{node_id}", handle_get_node),
        (node_base + "/{node_id}/criteria", handle_node_criteria),
    ):
        app.router.add_get(path, handler)
|
||||
@@ -0,0 +1,134 @@
|
||||
"""Log and observability routes — agent logs, node-scoped logs."""
|
||||
|
||||
import json
import logging
from pathlib import Path

from aiohttp import web

from framework.server.agent_manager import AgentManager
from framework.server.app import safe_path_segment
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_manager(request: web.Request) -> AgentManager:
|
||||
return request.app["manager"]
|
||||
|
||||
|
||||
def _storage_path(slot) -> Path:
|
||||
"""Resolve the storage root for an agent slot."""
|
||||
agent_name = slot.agent_path.name
|
||||
return Path.home() / ".hive" / "agents" / agent_name
|
||||
|
||||
|
||||
async def handle_logs(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/logs — agent-level logs.

    Query params:
        session_id: Scope to a specific session (optional).
        level: "summary" | "details" | "tools" (default: "summary").
        limit: Max results when listing summaries (default: 20).

    Without session_id: returns list of run summaries.
    With session_id: returns the requested log level for that session.
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    log_store = getattr(slot.runtime, "_runtime_log_store", None)
    if log_store is None:
        return web.json_response({"error": "Logging not enabled for this agent"}, status=404)

    session_id = request.query.get("session_id")
    if session_id:
        # Sanitize the client-supplied session id before handing it to the
        # log store (consistent with the session/node route modules).
        session_id = safe_path_segment(session_id)
    level = request.query.get("level", "summary")
    try:
        limit = int(request.query.get("limit", "20"))
    except ValueError:
        # Previously a non-numeric limit raised and surfaced as a 500.
        return web.json_response({"error": "limit must be an integer"}, status=400)

    if not session_id:
        # List run summaries across all sessions
        summaries = await log_store.list_runs(limit=limit)
        return web.json_response(
            {"logs": [s.model_dump() for s in summaries]},
            dumps=lambda obj: json.dumps(obj, default=str),
        )

    # Scoped to a specific session
    if level == "details":
        details = await log_store.load_details(session_id)
        if details is None:
            return web.json_response({"error": "No detail logs found"}, status=404)
        return web.json_response(
            {"session_id": session_id, "nodes": [n.model_dump() for n in details.nodes]},
            dumps=lambda obj: json.dumps(obj, default=str),
        )
    elif level == "tools":
        tool_logs = await log_store.load_tool_logs(session_id)
        if tool_logs is None:
            return web.json_response({"error": "No tool logs found"}, status=404)
        return web.json_response(
            {"session_id": session_id, "steps": [s.model_dump() for s in tool_logs.steps]},
            dumps=lambda obj: json.dumps(obj, default=str),
        )
    else:
        # Default: summary
        summary = await log_store.load_summary(session_id)
        if summary is None:
            return web.json_response({"error": "No summary log found"}, status=404)
        return web.json_response(
            summary.model_dump(),
            dumps=lambda obj: json.dumps(obj, default=str),
        )
|
||||
|
||||
|
||||
async def handle_node_logs(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/graphs/{graph_id}/nodes/{node_id}/logs

    Node-scoped logs. Returns detail and tool log entries filtered to the
    specified node.

    Query params:
        session_id: Required — which session's logs to read.
        level: "details" | "tools" | "all" (default: "all").
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    node_id = request.match_info["node_id"]
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    log_store = getattr(slot.runtime, "_runtime_log_store", None)
    if log_store is None:
        return web.json_response({"error": "Logging not enabled"}, status=404)

    session_id = request.query.get("session_id")
    if not session_id:
        return web.json_response({"error": "session_id query param is required"}, status=400)
    # Sanitize the client-supplied session id before handing it to the log
    # store (consistent with the session/node route modules).
    session_id = safe_path_segment(session_id)

    level = request.query.get("level", "all")
    result: dict = {"session_id": session_id, "node_id": node_id}

    if level in ("details", "all"):
        details = await log_store.load_details(session_id)
        if details:
            result["details"] = [n.model_dump() for n in details.nodes if n.node_id == node_id]

    if level in ("tools", "all"):
        tool_logs = await log_store.load_tool_logs(session_id)
        if tool_logs:
            result["tool_logs"] = [s.model_dump() for s in tool_logs.steps if s.node_id == node_id]

    return web.json_response(result, dumps=lambda obj: json.dumps(obj, default=str))
|
||||
|
||||
|
||||
def register_routes(app: web.Application) -> None:
    """Register log routes."""
    for path, handler in (
        ("/api/agents/{agent_id}/logs", handle_logs),
        (
            "/api/agents/{agent_id}/graphs/{graph_id}/nodes/{node_id}/logs",
            handle_node_logs,
        ),
    ):
        app.router.add_get(path, handler)
|
||||
@@ -0,0 +1,260 @@
|
||||
"""Session browsing routes — list, inspect, delete, restore, messages."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from framework.server.agent_manager import AgentManager
|
||||
from framework.server.app import safe_path_segment
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_manager(request: web.Request) -> AgentManager:
|
||||
return request.app["manager"]
|
||||
|
||||
|
||||
def _sessions_dir(slot) -> Path:
|
||||
"""Resolve the sessions directory for an agent slot.
|
||||
|
||||
Storage layout: ~/.hive/agents/{agent_name}/sessions/
|
||||
"""
|
||||
agent_name = slot.agent_path.name
|
||||
return Path.home() / ".hive" / "agents" / agent_name / "sessions"
|
||||
|
||||
|
||||
async def handle_list_sessions(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/sessions — list sessions.

    Scans the agent's on-disk sessions directory and returns one entry per
    ``session_*`` folder (reverse lexicographic order, so newest first if
    names sort chronologically), with status/progress fields read from each
    session's state.json and a count of its checkpoint files.
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    sess_dir = _sessions_dir(slot)
    if not sess_dir.exists():
        # No sessions directory yet — the agent simply has no history.
        return web.json_response({"sessions": []})

    sessions = []
    for d in sorted(sess_dir.iterdir(), reverse=True):
        if not d.is_dir() or not d.name.startswith("session_"):
            continue

        entry: dict = {"session_id": d.name}

        state_path = d / "state.json"
        if state_path.exists():
            try:
                state = json.loads(state_path.read_text())
                entry["status"] = state.get("status", "unknown")
                entry["started_at"] = state.get("started_at")
                entry["completed_at"] = state.get("completed_at")
                progress = state.get("progress", {})
                entry["steps"] = progress.get("steps_executed", 0)
                entry["paused_at"] = progress.get("paused_at")
            except (json.JSONDecodeError, OSError):
                # Unreadable state file — still list the session, but flag it.
                entry["status"] = "error"

        # Count checkpoints
        cp_dir = d / "checkpoints"
        if cp_dir.exists():
            entry["checkpoint_count"] = sum(1 for f in cp_dir.iterdir() if f.suffix == ".json")
        else:
            entry["checkpoint_count"] = 0

        sessions.append(entry)

    return web.json_response({"sessions": sessions})
|
||||
|
||||
|
||||
async def handle_get_session(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/sessions/{session_id} — session detail.

    Returns the parsed contents of the session's state.json verbatim.
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    session_id = safe_path_segment(request.match_info["session_id"])
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    state_file = _sessions_dir(slot) / session_id / "state.json"
    if not state_file.exists():
        return web.json_response({"error": "Session not found"}, status=404)

    try:
        payload = json.loads(state_file.read_text())
    except (json.JSONDecodeError, OSError) as e:
        return web.json_response({"error": f"Failed to read session: {e}"}, status=500)

    return web.json_response(payload)
|
||||
|
||||
|
||||
async def handle_list_checkpoints(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/sessions/{session_id}/checkpoints

    Lists the session's checkpoint files in reverse filename order, with
    summary fields parsed from each checkpoint's JSON. Unreadable files are
    reported with an "error" marker rather than omitted.
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    session_id = safe_path_segment(request.match_info["session_id"])
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    cp_dir = _sessions_dir(slot) / session_id / "checkpoints"
    if not cp_dir.exists():
        # No checkpoints directory — also covers unknown session ids.
        return web.json_response({"checkpoints": []})

    checkpoints = []
    for f in sorted(cp_dir.iterdir(), reverse=True):
        if f.suffix != ".json":
            continue
        try:
            data = json.loads(f.read_text())
            checkpoints.append(
                {
                    "checkpoint_id": f.stem,
                    "current_node": data.get("current_node"),
                    "next_node": data.get("next_node"),
                    "is_clean": data.get("is_clean", False),
                    "timestamp": data.get("timestamp"),
                }
            )
        except (json.JSONDecodeError, OSError):
            # Surface unreadable checkpoints instead of silently hiding them.
            checkpoints.append({"checkpoint_id": f.stem, "error": "unreadable"})

    return web.json_response({"checkpoints": checkpoints})
|
||||
|
||||
|
||||
async def handle_delete_session(request: web.Request) -> web.Response:
    """DELETE /api/agents/{agent_id}/sessions/{session_id} — delete a session.

    Recursively removes the session directory (state, checkpoints,
    conversations). Irreversible.
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    session_id = safe_path_segment(request.match_info["session_id"])
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    session_path = _sessions_dir(slot) / session_id
    if not session_path.exists():
        return web.json_response({"error": "Session not found"}, status=404)

    try:
        shutil.rmtree(session_path)
    except OSError as e:
        # Previously a filesystem failure surfaced as an unhandled 500 with
        # no JSON body; return an explicit error response instead.
        return web.json_response({"error": f"Failed to delete session: {e}"}, status=500)
    return web.json_response({"deleted": session_id})
|
||||
|
||||
|
||||
async def handle_restore_checkpoint(request: web.Request) -> web.Response:
    """POST /api/agents/{agent_id}/sessions/{session_id}/checkpoints/{checkpoint_id}/restore

    Restore execution from a specific checkpoint. Triggers a new execution
    using the checkpoint state — same mechanism as the replay endpoint but
    scoped under the session/checkpoint path.
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    # Sanitize both path params before building filesystem paths from them.
    session_id = safe_path_segment(request.match_info["session_id"])
    checkpoint_id = safe_path_segment(request.match_info["checkpoint_id"])
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    # Verify checkpoint exists
    cp_path = _sessions_dir(slot) / session_id / "checkpoints" / f"{checkpoint_id}.json"
    if not cp_path.exists():
        return web.json_response({"error": "Checkpoint not found"}, status=404)

    entry_points = slot.runtime.get_entry_points()
    if not entry_points:
        return web.json_response({"error": "No entry points available"}, status=400)

    # Checkpoint-based recovery state (same shape as the replay endpoint).
    restore_session_state = {
        "resume_session_id": session_id,
        "resume_from_checkpoint": checkpoint_id,
    }

    # NOTE(review): always triggers the first entry point — confirm intended
    # behavior for agents with multiple entry points.
    execution_id = await slot.runtime.trigger(
        entry_points[0].id,
        input_data={},
        session_state=restore_session_state,
    )

    return web.json_response(
        {
            "execution_id": execution_id,
            "restored_from": session_id,
            "checkpoint_id": checkpoint_id,
        }
    )
|
||||
|
||||
|
||||
async def handle_messages(request: web.Request) -> web.Response:
    """GET /api/agents/{agent_id}/sessions/{session_id}/messages

    Retrieve chat message history for a session. Reads per-node conversation
    files, merges by sequence number, and returns a unified message list.

    Query params:
        node_id: Scope to a specific node's conversation (optional).
    """
    manager = _get_manager(request)
    agent_id = request.match_info["agent_id"]
    session_id = safe_path_segment(request.match_info["session_id"])
    slot = manager.get_agent(agent_id)

    if slot is None:
        return web.json_response({"error": f"Agent '{agent_id}' not found"}, status=404)

    convs_dir = _sessions_dir(slot) / session_id / "conversations"
    if not convs_dir.exists():
        # No conversations recorded (also covers unknown session ids).
        return web.json_response({"messages": []})

    filter_node = request.query.get("node_id")
    all_messages = []

    # Layout: conversations/{node_id}/parts/*.json — one file per message part.
    for node_dir in convs_dir.iterdir():
        if not node_dir.is_dir():
            continue
        if filter_node and node_dir.name != filter_node:
            continue

        parts_dir = node_dir / "parts"
        if not parts_dir.exists():
            continue

        for part_file in sorted(parts_dir.iterdir()):
            if part_file.suffix != ".json":
                continue
            try:
                part = json.loads(part_file.read_text())
                # Tag each message with its source node for the client.
                part["_node_id"] = node_dir.name
                all_messages.append(part)
            except (json.JSONDecodeError, OSError):
                # Skip unreadable parts rather than failing the whole list.
                continue

    # Sort by sequence number (parts without a "seq" field sort first).
    all_messages.sort(key=lambda m: m.get("seq", 0))

    return web.json_response({"messages": all_messages})
|
||||
|
||||
|
||||
def register_routes(app: web.Application) -> None:
    """Register session browsing routes."""
    base = "/api/agents/{agent_id}/sessions"
    app.router.add_get(base, handle_list_sessions)
    app.router.add_get(base + "/{session_id}", handle_get_session)
    app.router.add_delete(base + "/{session_id}", handle_delete_session)
    app.router.add_get(base + "/{session_id}/checkpoints", handle_list_checkpoints)
    app.router.add_post(
        base + "/{session_id}/checkpoints/{checkpoint_id}/restore",
        handle_restore_checkpoint,
    )
    app.router.add_get(base + "/{session_id}/messages", handle_messages)
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Server-Sent Events helper wrapping aiohttp StreamResponse."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SSEResponse:
    """Thin wrapper around aiohttp StreamResponse for SSE streaming.

    Usage:
        sse = SSEResponse()
        await sse.prepare(request)
        await sse.send_event({"key": "value"}, event="update")
        await sse.send_keepalive()
    """

    def __init__(self) -> None:
        # Populated by prepare(); all senders raise until then.
        self._response: web.StreamResponse | None = None

    async def prepare(self, request: web.Request) -> web.StreamResponse:
        """Prepare the SSE response with correct headers."""
        sse_headers = {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
        self._response = web.StreamResponse(status=200, headers=sse_headers)
        await self._response.prepare(request)
        return self._response

    async def send_event(
        self,
        data: dict,
        event: str | None = None,
        id: str | None = None,
    ) -> None:
        """Serialize and send an SSE event.

        Args:
            data: JSON-serializable dict to send as the data field.
            event: Optional SSE event type.
            id: Optional SSE event id.
        """
        if self._response is None:
            raise RuntimeError("SSEResponse not prepared; call prepare() first")

        # Assemble the wire format: optional id/event lines, then the data
        # line and the blank line that terminates an SSE event.
        frame = ""
        if id is not None:
            frame += f"id: {id}\n"
        if event is not None:
            frame += f"event: {event}\n"
        frame += f"data: {json.dumps(data, default=str)}\n"
        frame += "\n"

        await self._response.write(frame.encode("utf-8"))

    async def send_keepalive(self) -> None:
        """Send an SSE comment as a keepalive heartbeat."""
        if self._response is None:
            raise RuntimeError("SSEResponse not prepared; call prepare() first")
        await self._response.write(b": keepalive\n\n")

    @property
    def response(self) -> web.StreamResponse | None:
        """The underlying StreamResponse, or None before prepare()."""
        return self._response
|
||||
File diff suppressed because it is too large
Load Diff
@@ -21,6 +21,7 @@ dependencies = [
|
||||
[project.optional-dependencies]
|
||||
tui = ["textual>=0.75.0"]
|
||||
webhook = ["aiohttp>=3.9.0"]
|
||||
server = ["aiohttp>=3.9.0"]
|
||||
|
||||
[project.scripts]
|
||||
hive = "framework.cli:main"
|
||||
|
||||
@@ -783,6 +783,9 @@ dependencies = [
|
||||
]
|
||||
|
||||
[package.optional-dependencies]
|
||||
server = [
|
||||
{ name = "aiohttp" },
|
||||
]
|
||||
tui = [
|
||||
{ name = "textual" },
|
||||
]
|
||||
@@ -798,6 +801,7 @@ dev = [
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "aiohttp", marker = "extra == 'server'", specifier = ">=3.9.0" },
|
||||
{ name = "aiohttp", marker = "extra == 'webhook'", specifier = ">=3.9.0" },
|
||||
{ name = "anthropic", specifier = ">=0.40.0" },
|
||||
{ name = "fastmcp", specifier = ">=2.0.0" },
|
||||
@@ -812,7 +816,7 @@ requires-dist = [
|
||||
{ name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" },
|
||||
{ name = "tools", editable = "tools" },
|
||||
]
|
||||
provides-extras = ["tui", "webhook"]
|
||||
provides-extras = ["tui", "webhook", "server"]
|
||||
|
||||
[package.metadata.requires-dev]
|
||||
dev = [
|
||||
|
||||
Reference in New Issue
Block a user