Compare commits

...

36 Commits

Author SHA1 Message Date
Timothy 384a1f0560 fix: credential loading 2026-02-24 18:40:39 -08:00
Timothy 72a31c2a65 fix: credential validity, update api readme 2026-02-24 18:11:10 -08:00
bryan e68695ee92 merge 2026-02-24 17:43:29 -08:00
Timothy 6d102382bd fix: session id issues 2026-02-24 17:42:09 -08:00
bryan 56335927e7 change from agentid to session id 2026-02-24 15:53:14 -08:00
Timothy a3fe994b22 fix: remove duplicative queen session starter api 2026-02-24 15:14:02 -08:00
Timothy 5754bdcc78 Merge branch 'feature/session-manager' into feat/open-hive 2026-02-24 15:01:01 -08:00
Timothy eef2fa9ffb feature: session manager, superceding agent manager 2026-02-24 15:00:09 -08:00
bryan 7286907cd4 multiple agent session running 2026-02-24 14:56:24 -08:00
Timothy 1fbb431f1b Merge branch 'fix/globalize-queen-judge' into feat/open-hive 2026-02-24 13:28:29 -08:00
Timothy 0ad52b90d8 fix: globalize queen and judge agent's storage 2026-02-24 13:27:33 -08:00
bryan c44b12cc8b remove subgraph, persistent tabs, node action plan 2026-02-24 12:42:07 -08:00
Timothy 8381c95617 Merge branch 'fix/session-loading-isolation' into feat/open-hive 2026-02-24 11:18:48 -08:00
Timothy 3963855d1d fix: isolate session loading 2026-02-24 11:02:58 -08:00
bryan 7a7ece1805 markdown support, removed subgraph, stop button 2026-02-24 10:40:24 -08:00
bryan 28a71b70a8 readme for http apis 2026-02-24 09:22:56 -08:00
bryan 33d3a13fde Merge branch 'feature/concurrent-judge-runtime' into feat/open-hive 2026-02-24 09:11:42 -08:00
bryan 5ea278a08d integrated queen, worker, judge 2026-02-24 09:09:28 -08:00
bryan c1d5952ad9 Merge branch 'feature/concurrent-judge-runtime' into feat/open-hive 2026-02-24 08:07:31 -08:00
bryan 72673e12fb remove mock data 2026-02-24 08:02:08 -08:00
bryan 3951ee1a7d Merge branch 'main' into feat/open-hive 2026-02-24 07:28:42 -08:00
bryan 1afde51c7b additional graph update 2026-02-24 07:28:11 -08:00
bryan cbeef18f0a wip graph 2026-02-24 07:27:48 -08:00
bryan ecb038c955 chat now creates multiple chats msgs 2026-02-23 19:07:54 -08:00
bryan 96084fea16 wip chat 2026-02-23 17:41:12 -08:00
bryan de9226aae0 credentials 2026-02-23 14:11:16 -08:00
bryan 08819b1609 Merge branch 'main' into feat/open-hive 2026-02-23 11:13:32 -08:00
bryan fcfaca6bd0 Merge branch 'main' into feat/open-hive 2026-02-22 19:50:39 -08:00
bryan 4c7d9ab0fb added click cursor and rename dashboard to workspace 2026-02-22 19:21:37 -08:00
bryan 061aec4b3d my agents configured 2026-02-22 19:04:48 -08:00
bryan 30bd0e483a home page and mock chatroom 2026-02-22 18:03:02 -08:00
bryan 2f31a92d31 Merge branch 'main' into feat/open-hive 2026-02-22 16:06:44 -08:00
bryan 35738c8279 react structure 2026-02-22 14:52:15 -08:00
bryan 6661934fed harden server apis and agent loading 2026-02-20 18:28:52 -08:00
bryan 263d35bbd6 Merge branch 'main' into feat/open-hive 2026-02-20 18:09:01 -08:00
bryan d4788e147a backend apis for open hive 2026-02-20 18:01:51 -08:00
87 changed files with 15537 additions and 19161 deletions
+1
View File
@@ -46,6 +46,7 @@ coverage/
# TypeScript
*.tsbuildinfo
vite.config.d.ts
# Python
__pycache__/
+7 -1
View File
@@ -1,4 +1,4 @@
.PHONY: lint format check test install-hooks help
.PHONY: lint format check test install-hooks help frontend-dev frontend-build
help: ## Show this help
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | \
@@ -26,3 +26,9 @@ test: ## Run all tests
install-hooks: ## Install pre-commit hooks
uv pip install pre-commit
pre-commit install
frontend-dev: ## Start frontend dev server
cd core/frontend && npm run dev
frontend-build: ## Build frontend for production
cd core/frontend && npm run build
@@ -13,6 +13,7 @@ from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import coder_node, queen_node
# ticket_receiver is no longer needed — the queen runs as an independent
# GraphExecutor and receives escalation tickets via inject_event().
# Keeping the import commented for reference:
@@ -36,7 +36,7 @@ Analyze imports, structure, and style in reference agents.
- **Verify assumptions.** Never assume a class, import, or pattern \
exists. Read actual source to confirm. Search if unsure.
- **Discover tools dynamically.** NEVER reference tools from static \
docs. Always run discover_mcp_tools() to see what actually exists.
docs. Always run list_agent_tools() to see what actually exists.
- **Professional objectivity.** If a use case is a poor fit for the \
framework, say so. Technical accuracy over validation.
- **Concise.** No emojis. No preambles. No postambles. Substance only.
@@ -55,8 +55,12 @@ errors yourself. Don't declare success until validation passes.
- undo_changes(path?) restore from git snapshot
## Meta-Agent
- list_agent_tools(server_config_path?) list all tool names available \
for agent building, grouped by category. Call this FIRST before designing.
- discover_mcp_tools(server_config_path?) connect to MCP servers \
and list all available tools with full schemas. Default: hive-tools.
and list all available tools with full schemas. Use for parameter details.
- validate_agent_tools(agent_path) validate that all tools declared \
in an agent's nodes actually exist. Call after building.
- list_agents() list all agent packages in exports/ with session counts
- list_agent_sessions(agent_name, status?, limit?) list sessions
- get_agent_session_state(agent_name, session_id) full session state
@@ -71,14 +75,15 @@ You are not just a file writer. You have deep integration with the \
Hive framework:
## Tool Discovery (MANDATORY before designing)
Before designing any agent, run discover_mcp_tools() to see what \
tools are actually available from the hive-tools MCP server. This \
returns full schemas with parameter names, types, and descriptions. \
NEVER guess tool names or parameters from memory. The tool catalog \
is the ground truth.
Before designing any agent, run list_agent_tools() to get all \
available tool names. ONLY use tools from this list in your node \
definitions. NEVER guess or fabricate tool names from memory.
To check a specific agent's tools:
discover_mcp_tools("exports/{agent_name}/mcp_servers.json")
For full parameter schemas when you need details:
discover_mcp_tools()
To check a specific agent's configured tools:
list_agent_tools("exports/{agent_name}/mcp_servers.json")
## Agent Awareness
Run list_agents() to see what agents already exist. Read their code \
@@ -116,7 +121,7 @@ Ask only what you CANNOT infer. Fill blanks with domain knowledge.
## 2. Qualify
Assess framework fit honestly. Run discover_mcp_tools() to check \
Assess framework fit honestly. Run list_agent_tools() to check \
what tools exist. Read the framework guide:
read_file("core/framework/agents/hive_coder/reference/framework_guide.md")
@@ -277,7 +282,7 @@ STEP 2 — After user responds, call set_output:
**Tools** NEVER fabricate tool names. Common hallucinations: \
csv_read, csv_write, csv_append, file_upload, database_query. \
If discover_mcp_tools() shows these don't exist, use alternatives \
If list_agent_tools() shows these don't exist, use alternatives \
(e.g. save_data/load_data for data persistence).
**Node rules**:
@@ -322,7 +327,7 @@ triggers, use `AsyncEntryPointSpec` (from framework.graph.edge) and \
## 5. Verify
Run THREE validation steps after writing. All must pass:
Run FOUR validation steps after writing. All must pass:
**Step A Class validation** (checks graph structure):
```
@@ -341,7 +346,15 @@ This catches missing __init__.py exports, bad conversation_mode, \
invalid loop_config, and unreachable nodes. If Step A passes but \
Step B fails, the problem is in __init__.py exports.
**Step C Run tests:**
**Step C Tool validation** (checks that declared tools actually exist \
in the agent's MCP servers — catches hallucinated tool names):
```
validate_agent_tools("exports/{name}")
```
If any tools are missing: fix the node definitions to use only tools \
that exist. Run list_agent_tools() to see what's available.
**Step D Run tests:**
```
run_agent_tests("{name}")
```
@@ -374,22 +387,20 @@ set_output until the user is done.
## 7. Live Test (optional)
After the user approves, offer to load and run the agent in-session. \
This runs it alongside you.
After the user approves, offer to load and run the agent in-session.
If running with a queen (server/frontend):
```
load_built_agent("exports/{name}") # loads as the session worker
```
The frontend updates automatically the user sees the agent's graph, \
the tab renames, and you can delegate via start_worker(task).
If running standalone (TUI):
```
load_agent("exports/{name}") # registers as secondary graph
start_agent("{name}") # triggers default entry point
```
You can also:
- `list_agents()` see all loaded graphs and status
- `restart_agent("{name}")` then `load_agent` pick up code changes
- `unload_agent("{name}")` remove it from the session
- `get_user_presence()` check if user is around
The agent runs in a shared session: it can read memory you've set and \
its outputs are visible to you.
""",
tools=[
"read_file",
@@ -400,7 +411,9 @@ its outputs are visible to you.
"run_command",
"undo_changes",
# Meta-agent tools
"list_agent_tools",
"discover_mcp_tools",
"validate_agent_tools",
"list_agents",
"list_agent_sessions",
"get_agent_session_state",
@@ -426,7 +439,7 @@ ticket_triage_node = NodeSpec(
"via event-driven entry point and decides: dismiss or notify the operator."
),
node_type="event_loop",
client_facing=True, # Operator can chat with queen once connected (Ctrl+Q)
client_facing=True, # Operator can chat with queen once connected (Ctrl+Q)
max_node_visits=0,
input_keys=["ticket"],
output_keys=["intervention_decision"],
@@ -500,6 +513,7 @@ queen_node = NodeSpec(
"undo_changes",
# Meta-agent (from coder-tools MCP)
"discover_mcp_tools",
"validate_agent_tools",
"list_agents",
"list_agent_sessions",
"get_agent_session_state",
@@ -515,6 +529,8 @@ queen_node = NodeSpec(
# Monitoring
"get_worker_health_summary",
"notify_operator",
# Agent loading
"load_built_agent",
],
system_prompt="""\
You are the Queen the user's primary interface. You are a coding agent \
@@ -530,7 +546,7 @@ Analyze imports, structure, and style in reference agents.
- **Verify assumptions.** Never assume a class, import, or pattern \
exists. Read actual source to confirm. Search if unsure.
- **Discover tools dynamically.** NEVER reference tools from static \
docs. Always run discover_mcp_tools() to see what actually exists.
docs. Always run list_agent_tools() to see what actually exists.
- **Self-verify.** After writing code, run validation and tests. Fix \
errors yourself. Don't declare success until validation passes.
- **Concise.** No emojis. No preambles. No postambles. Substance only.
@@ -547,8 +563,12 @@ errors yourself. Don't declare success until validation passes.
- undo_changes(path?) restore from git snapshot
## Meta-Agent
- list_agent_tools(server_config_path?) list all tool names available \
for agent building, grouped by category. Call this FIRST before designing.
- discover_mcp_tools(server_config_path?) connect to MCP servers \
and list all available tools with full schemas. Default: hive-tools.
and list all available tools with full schemas. Use for parameter details.
- validate_agent_tools(agent_path) validate that all tools declared \
in an agent's nodes actually exist. Call after building.
- list_agents() list all agent packages in exports/ with session counts
- list_agent_sessions(agent_name, status?, limit?) list sessions
- get_agent_session_state(agent_name, session_id) full session state
@@ -571,23 +591,33 @@ Use this to relay user instructions or concerns.
- notify_operator(ticket_id, analysis, urgency) Alert the user about a \
critical issue. Use sparingly.
## Agent Loading
- load_built_agent(agent_path) Load a newly built agent as the worker in \
this session. Call after building and validating an agent to make it \
available immediately. The user sees the graph update and can interact \
with it without leaving the session.
# Behavior
## Direct coding
You can do any coding task directly reading files, writing code, running \
commands, building agents, debugging. You have the same tools as the worker. \
For quick tasks (reading code, small edits, debugging), do them yourself.
commands, building agents, debugging. For quick tasks, do them yourself.
## Worker delegation
For large, autonomous tasks (building a full agent, running a long pipeline), \
delegate to the worker via start_worker(task). The worker runs in the \
background while you remain available to the user.
The worker is a specialized agent (see Worker Profile at the end of this \
prompt). It can ONLY do what its goal and tools allow.
**Decision rule read the Worker Profile first:**
- The user's request directly matches the worker's goal start_worker(task)
- Anything else do it yourself. Do NOT reframe user requests into \
subtasks to justify delegation.
- Building, modifying, or configuring agents is ALWAYS your job. Never \
delegate agent construction to the worker, even as a "research" subtask.
## When idle (worker not running):
- Greet the user. Ask what they want to build or do.
- For quick tasks, do them directly.
- For large tasks, call start_worker(task) with a clear task description. \
Summarize what you told the worker.
- Greet the user. Mention what the worker can do.
- For tasks matching the worker's goal, call start_worker(task).
- For everything else, do it directly.
## When worker is running:
- If the user asks about progress, call get_worker_status().
@@ -607,7 +637,7 @@ You do not need to relay it. The user will come back to you after responding.
When building Hive agent packages, follow this workflow:
## 1. Understand & Qualify
Hear what the user wants. Run discover_mcp_tools() to check tool availability. \
Hear what the user wants. Run list_agent_tools() to check tool availability. \
Read the framework guide:
read_file("core/framework/agents/hive_coder/reference/framework_guide.md")
@@ -626,12 +656,20 @@ Write files: config.py, nodes/__init__.py, agent.py, __init__.py, \
__main__.py, mcp_servers.json, tests/.
## 4. Verify
Run THREE validation steps:
Run FOUR validation steps:
run_command("python -c 'from {name} import default_agent; print(default_agent.validate())'")
run_command("python -c 'from framework.runner.runner import AgentRunner; \\
r = AgentRunner.load(\"exports/{name}\"); print(\"OK\")'")
validate_agent_tools("exports/{name}")
run_agent_tests("{name}")
## 5. Load into Session
After building and verifying, load the agent into the current session:
load_built_agent("exports/{name}")
This makes the agent available immediately the user sees its graph, \
the tab name updates, and you can delegate to it via start_worker(). \
Do NOT tell the user to run `python -m {name} run` load it here.
# Style
- Concise. No fluff. Direct.
@@ -652,7 +690,9 @@ ALL_QUEEN_TOOLS = [
"run_command",
"undo_changes",
# Meta-agent (from coder-tools MCP)
"list_agent_tools",
"discover_mcp_tools",
"validate_agent_tools",
"list_agents",
"list_agent_sessions",
"get_agent_session_state",
@@ -668,6 +708,8 @@ ALL_QUEEN_TOOLS = [
# Monitoring
"get_worker_health_summary",
"notify_operator",
# Agent loading
"load_built_agent",
]
__all__ = [
@@ -24,7 +24,7 @@
9. **Invalid `loop_config` keys** — Only three valid keys: `max_iterations` (int), `max_tool_calls_per_turn` (int), `max_history_tokens` (int). Keys like `"strategy"`, `"mode"`, `"timeout"` are NOT valid and are silently ignored or cause errors.
10. **Fabricating tools that don't exist** — Never guess tool names. Always verify via `discover_mcp_tools()`. Common hallucinations: `csv_read`, `csv_write`, `csv_append`, `file_upload`, `database_query`. If a required tool doesn't exist, redesign the agent to use tools that DO exist (e.g., `save_data`/`load_data` for data persistence).
10. **Fabricating tools that don't exist** — Never guess tool names. Always verify via `list_agent_tools()` before designing and `validate_agent_tools()` after building. Common hallucinations: `csv_read`, `csv_write`, `csv_append`, `file_upload`, `database_query`, `bulk_fetch_emails`. If a required tool doesn't exist, redesign the agent to use tools that DO exist (e.g., `save_data`/`load_data` for data persistence).
## Design Errors
@@ -413,16 +413,18 @@ See `exports/gmail_inbox_guardian/agent.py` for a complete example with:
## Tool Discovery
Do NOT rely on a static tool list — it will be outdated. Always use
`discover_mcp_tools()` to get the current tool catalog from the
hive-tools MCP server. This returns full schemas including parameter
names, types, and descriptions.
`list_agent_tools()` to get available tool names grouped by category.
For full schemas with parameter details, use `discover_mcp_tools()`.
```
discover_mcp_tools() # default: hive-tools
discover_mcp_tools("exports/my_agent/mcp_servers.json") # specific agent
list_agent_tools() # all available tools
list_agent_tools("exports/my_agent/mcp_servers.json") # specific agent
discover_mcp_tools() # full schemas with params
```
Common tool categories (verify via discover_mcp_tools):
After building, validate tools exist: `validate_agent_tools("exports/{name}")`
Common tool categories (verify via list_agent_tools):
- **Web**: search, scrape, PDF
- **Data**: save/load/append/list data files, serve to user
- **File**: view, write, replace, diff, list, grep
+9 -8
View File
@@ -193,23 +193,24 @@ class AdenCachedStorage(CredentialStorage):
logger.debug(f"Using cached credential '{credential_id}'")
return local_cred
# Try to fetch from Aden
# If nothing local, there's nothing to refresh from Aden.
# sync_all() already fetched all available credentials — anything
# not in local storage doesn't exist on the Aden server.
if local_cred is None:
return None
# Try to refresh stale local credential from Aden
try:
aden_cred = self._aden_provider.fetch_from_aden(credential_id)
if aden_cred:
# Update local cache
self.save(aden_cred)
logger.debug(f"Fetched credential '{credential_id}' from Aden")
return aden_cred
except Exception as e:
logger.warning(f"Failed to fetch '{credential_id}' from Aden: {e}")
logger.info(f"Using stale cached credential '{credential_id}'")
return local_cred
# Fall back to local cache if Aden fails
if local_cred:
logger.info(f"Using stale cached credential '{credential_id}'")
return local_cred
# Return local credential if it exists (may be None)
return local_cred
def load_all_for_provider(self, provider_name: str) -> list[CredentialObject]:
+28 -8
View File
@@ -164,8 +164,20 @@ class CredentialSetupSession:
return cls(missing)
@classmethod
def from_agent_path(cls, agent_path: str | Path) -> CredentialSetupSession:
"""Create a setup session for an agent by path."""
def from_agent_path(
cls,
agent_path: str | Path,
*,
missing_only: bool = True,
) -> CredentialSetupSession:
"""Create a setup session for an agent by path.
Args:
agent_path: Path to agent folder.
missing_only: If True (default), only include credentials that
are NOT yet available. If False, include all required
credentials regardless of availability.
"""
agent_path = Path(agent_path)
# Load agent to get nodes
@@ -180,8 +192,8 @@ class CredentialSetupSession:
# JSON-based agent
nodes = _load_nodes_from_json_agent(agent_json)
missing = detect_missing_credentials_from_nodes(nodes)
return cls(missing)
creds = detect_missing_credentials_from_nodes(nodes, missing_only=missing_only)
return cls(creds)
def run_interactive(self) -> SetupResult:
"""Run the interactive setup flow."""
@@ -552,15 +564,23 @@ class CredentialSetupSession:
self._print("")
def detect_missing_credentials_from_nodes(nodes: list) -> list[MissingCredential]:
def detect_missing_credentials_from_nodes(
nodes: list,
*,
missing_only: bool = True,
) -> list[MissingCredential]:
"""
Detect missing credentials for a list of nodes.
Detect credentials required by a list of nodes.
Args:
nodes: List of NodeSpec objects
missing_only: If True (default), only return credentials that are
NOT yet available. If False, return ALL required credentials
regardless of availability.
Returns:
List of MissingCredential objects for credentials that need setup
(or all required credentials when missing_only=False).
"""
try:
from aden_tools.credentials import CREDENTIAL_SPECS
@@ -617,7 +637,7 @@ def detect_missing_credentials_from_nodes(nodes: list) -> list[MissingCredential
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
if spec.required and (not missing_only or not store.is_available(cred_id)):
affected_tools = sorted(t for t in required_tools if t in spec.tools)
missing.append(
MissingCredential(
@@ -643,7 +663,7 @@ def detect_missing_credentials_from_nodes(nodes: list) -> list[MissingCredential
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
if spec.required and (not missing_only or not store.is_available(cred_id)):
affected_types = sorted(t for t in node_types if t in spec.node_types)
missing.append(
MissingCredential(
+9
View File
@@ -396,6 +396,11 @@ class CredentialStore:
Returns:
CredentialObject if found, None otherwise.
"""
# LLMs sometimes pass "provider/alias" as the alias (e.g. "google/wrok"
# instead of just "wrok"). Strip the provider prefix when present.
if alias.startswith(f"{provider_name}/"):
alias = alias[len(provider_name) + 1 :]
if hasattr(self._storage, "load_by_alias"):
return self._storage.load_by_alias(provider_name, alias)
@@ -422,6 +427,10 @@ class CredentialStore:
"""
return self.get_credential(credential_id, refresh_if_needed=False) is not None
def exists(self, credential_id: str) -> bool:
"""Check if a credential exists in storage without triggering provider fetches."""
return self._storage.exists(credential_id)
# --- Validation ---
def validate_for_usage(self, credential_id: str) -> list[str]:
+5
View File
@@ -86,6 +86,11 @@ def _presync_aden_tokens(credential_specs: dict) -> None:
if os.environ.get(spec.env_var):
continue # Already set — don't overwrite
cred_id = spec.credential_id or name
# sync_all() already fetched everything available from Aden.
# Skip credentials not in the store — they aren't connected,
# so fetching individually would fail with "Invalid integration ID".
if not aden_store.exists(cred_id):
continue
try:
value = aden_store.get_key(cred_id, spec.credential_key)
if value:
+7
View File
@@ -30,6 +30,8 @@ class Message:
# Phase-aware compaction metadata (continuous mode)
phase_id: str | None = None
is_transition_marker: bool = False
# True when this message is real human input (from /chat), not a system prompt
is_client_input: bool = False
def to_llm_dict(self) -> dict[str, Any]:
"""Convert to OpenAI-format message dict."""
@@ -67,6 +69,8 @@ class Message:
d["phase_id"] = self.phase_id
if self.is_transition_marker:
d["is_transition_marker"] = self.is_transition_marker
if self.is_client_input:
d["is_client_input"] = self.is_client_input
return d
@classmethod
@@ -81,6 +85,7 @@ class Message:
is_error=data.get("is_error", False),
phase_id=data.get("phase_id"),
is_transition_marker=data.get("is_transition_marker", False),
is_client_input=data.get("is_client_input", False),
)
@@ -248,6 +253,7 @@ class NodeConversation:
content: str,
*,
is_transition_marker: bool = False,
is_client_input: bool = False,
) -> Message:
msg = Message(
seq=self._next_seq,
@@ -255,6 +261,7 @@ class NodeConversation:
content=content,
phase_id=self._current_phase,
is_transition_marker=is_transition_marker,
is_client_input=is_client_input,
)
self._messages.append(msg)
self._next_seq += 1
+109 -17
View File
@@ -202,11 +202,13 @@ class EventLoopNode(NodeProtocol):
self._config = config or LoopConfig()
self._tool_executor = tool_executor
self._conversation_store = conversation_store
self._injection_queue: asyncio.Queue[str] = asyncio.Queue()
self._injection_queue: asyncio.Queue[tuple[str, bool]] = asyncio.Queue()
# Client-facing input blocking state
self._input_ready = asyncio.Event()
self._awaiting_input = False
self._shutdown = False
# Track which nodes already have an action plan emitted (skip on revisit)
self._action_plan_emitted: set[str] = set()
def validate_input(self, ctx: NodeContext) -> list[str]:
"""Validate hard requirements only.
@@ -372,6 +374,16 @@ class EventLoopNode(NodeProtocol):
# 4. Publish loop started
await self._publish_loop_started(stream_id, node_id, execution_id)
# 4b. Fire-and-forget action plan generation (once per node per lifetime)
if (
start_iteration == 0
and ctx.llm
and self._event_bus
and node_id not in self._action_plan_emitted
):
self._action_plan_emitted.add(node_id)
asyncio.create_task(self._generate_action_plan(ctx, stream_id, node_id, execution_id))
# 5. Stall / doom loop detection state (restored from cursor if resuming)
recent_responses: list[str] = _restored_recent_responses
recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
@@ -562,7 +574,9 @@ class EventLoopNode(NodeProtocol):
node_id,
iteration,
)
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
return NodeResult(
success=True,
@@ -713,7 +727,9 @@ class EventLoopNode(NodeProtocol):
if _cf_block:
if self._shutdown:
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
if ctx.runtime_logger:
@@ -763,7 +779,9 @@ class EventLoopNode(NodeProtocol):
got_input = await self._await_user_input(ctx)
logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
if not got_input:
await self._publish_loop_completed(stream_id, node_id, iteration + 1, execution_id)
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
_continue_count += 1
if ctx.runtime_logger:
@@ -1031,7 +1049,9 @@ class EventLoopNode(NodeProtocol):
continue
# 7. Max iterations exhausted
await self._publish_loop_completed(stream_id, node_id, self._config.max_iterations, execution_id)
await self._publish_loop_completed(
stream_id, node_id, self._config.max_iterations, execution_id
)
latency_ms = int((time.time() - start_time) * 1000)
if ctx.runtime_logger:
ctx.runtime_logger.log_node_complete(
@@ -1060,14 +1080,19 @@ class EventLoopNode(NodeProtocol):
conversation=conversation if _is_continuous else None,
)
async def inject_event(self, content: str) -> None:
"""Inject an external event into the running loop.
async def inject_event(self, content: str, *, is_client_input: bool = False) -> None:
"""Inject an external event or user input into the running loop.
The content becomes a user message prepended to the next iteration.
Thread-safe via asyncio.Queue.
Also unblocks _await_user_input() if the node is waiting.
Args:
content: The message text.
is_client_input: True when the message originates from a real
human user (e.g. /chat endpoint), False for external events.
"""
await self._injection_queue.put(content)
await self._injection_queue.put((content, is_client_input))
self._input_ready.set()
def signal_shutdown(self) -> None:
@@ -1194,7 +1219,11 @@ class EventLoopNode(NodeProtocol):
if isinstance(event, TextDeltaEvent):
accumulated_text = event.snapshot
await self._publish_text_delta(
stream_id, node_id, event.content, event.snapshot, ctx,
stream_id,
node_id,
event.content,
event.snapshot,
ctx,
execution_id,
)
@@ -1279,7 +1308,11 @@ class EventLoopNode(NodeProtocol):
executed_in_batch += 1
await self._publish_tool_started(
stream_id, node_id, tc.tool_use_id, tc.tool_name, tc.tool_input,
stream_id,
node_id,
tc.tool_use_id,
tc.tool_name,
tc.tool_input,
execution_id,
)
logger.info(
@@ -2395,7 +2428,8 @@ class EventLoopNode(NodeProtocol):
_is_continuous = getattr(ctx, "continuous_mode", False)
phase_filter = None if _is_continuous else ctx.node_id
conversation = await NodeConversation.restore(
self._conversation_store, phase_id=phase_filter,
self._conversation_store,
phase_id=phase_filter,
)
if conversation is None:
return None
@@ -2468,12 +2502,17 @@ class EventLoopNode(NodeProtocol):
count = 0
while not self._injection_queue.empty():
try:
content = self._injection_queue.get_nowait()
content, is_client_input = self._injection_queue.get_nowait()
logger.info(
"[drain] injected message: %s",
"[drain] injected message (client_input=%s): %s",
is_client_input,
content[:200] if content else "(empty)",
)
await conversation.add_user_message(f"[External event]: {content}")
# Real user input is stored as-is; external events get a prefix
if is_client_input:
await conversation.add_user_message(content, is_client_input=True)
else:
await conversation.add_user_message(f"[External event]: {content}")
count += 1
except asyncio.QueueEmpty:
break
@@ -2515,7 +2554,9 @@ class EventLoopNode(NodeProtocol):
# EventBus publishing helpers
# -------------------------------------------------------------------
async def _publish_loop_started(self, stream_id: str, node_id: str, execution_id: str = "") -> None:
async def _publish_loop_started(
self, stream_id: str, node_id: str, execution_id: str = ""
) -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_started(
stream_id=stream_id,
@@ -2524,7 +2565,56 @@ class EventLoopNode(NodeProtocol):
execution_id=execution_id,
)
async def _publish_iteration(self, stream_id: str, node_id: str, iteration: int, execution_id: str = "") -> None:
async def _generate_action_plan(
self,
ctx: NodeContext,
stream_id: str,
node_id: str,
execution_id: str,
) -> None:
"""Generate a brief action plan via LLM and emit it as an SSE event.
Runs as a fire-and-forget task so it never blocks the main loop.
"""
try:
system_prompt = ctx.node_spec.system_prompt or ""
# Trim to keep the prompt small
prompt_summary = system_prompt[:500]
if len(system_prompt) > 500:
prompt_summary += "..."
tool_names = [t.name for t in ctx.available_tools]
output_keys = ctx.node_spec.output_keys or []
prompt = (
f'You are about to work on a task as node "{node_id}".\n\n'
f"System prompt:\n{prompt_summary}\n\n"
f"Tools available: {tool_names}\n"
f"Required outputs: {output_keys}\n\n"
f"Write a brief action plan (2-5 bullet points) describing "
f"what you will do to complete this task. Be specific and concise.\n"
f"Return ONLY the plan text, no preamble."
)
response = await ctx.llm.acomplete(
messages=[{"role": "user", "content": prompt}],
max_tokens=300,
)
plan = response.content.strip()
if plan and self._event_bus:
await self._event_bus.emit_node_action_plan(
stream_id=stream_id,
node_id=node_id,
plan=plan,
execution_id=execution_id,
)
except Exception as e:
logger.warning("Action plan generation failed for node '%s': %s", node_id, e)
async def _publish_iteration(
self, stream_id: str, node_id: str, iteration: int, execution_id: str = ""
) -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_iteration(
stream_id=stream_id,
@@ -2533,7 +2623,9 @@ class EventLoopNode(NodeProtocol):
execution_id=execution_id,
)
async def _publish_loop_completed(self, stream_id: str, node_id: str, iterations: int, execution_id: str = "") -> None:
async def _publish_loop_completed(
self, stream_id: str, node_id: str, iterations: int, execution_id: str = ""
) -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_completed(
stream_id=stream_id,
+12 -10
View File
@@ -771,7 +771,8 @@ class GraphExecutor:
# Emit node-started event (skip event_loop nodes — they emit their own)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_started(
stream_id=self._stream_id, node_id=current_node_id,
stream_id=self._stream_id,
node_id=current_node_id,
execution_id=self._execution_id,
)
@@ -782,7 +783,9 @@ class GraphExecutor:
# Emit node-completed event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_completed(
stream_id=self._stream_id, node_id=current_node_id, iterations=1,
stream_id=self._stream_id,
node_id=current_node_id,
iterations=1,
execution_id=self._execution_id,
)
@@ -1361,9 +1364,7 @@ class GraphExecutor:
try:
import json as _json
cursor_path = (
self._storage_path / "conversations" / "cursor.json"
)
cursor_path = self._storage_path / "conversations" / "cursor.json"
if cursor_path.exists():
cursor_data = _json.loads(cursor_path.read_text(encoding="utf-8"))
wip_outputs = cursor_data.get("outputs", {})
@@ -1464,9 +1465,7 @@ class GraphExecutor:
try:
import json as _json
cursor_path = (
self._storage_path / "conversations" / "cursor.json"
)
cursor_path = self._storage_path / "conversations" / "cursor.json"
if cursor_path.exists():
cursor_data = _json.loads(cursor_path.read_text(encoding="utf-8"))
for key, value in cursor_data.get("outputs", {}).items():
@@ -1949,7 +1948,8 @@ class GraphExecutor:
# Emit node-started event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_started(
stream_id=self._stream_id, node_id=branch.node_id,
stream_id=self._stream_id,
node_id=branch.node_id,
execution_id=self._execution_id,
)
@@ -1974,7 +1974,9 @@ class GraphExecutor:
# Emit node-completed event (skip event_loop nodes)
if self._event_bus and node_spec.node_type != "event_loop":
await self._event_bus.emit_node_loop_completed(
stream_id=self._stream_id, node_id=branch.node_id, iterations=1,
stream_id=self._stream_id,
node_id=branch.node_id,
iterations=1,
execution_id=self._execution_id,
)
+48
View File
@@ -328,6 +328,20 @@ class LiteLLMProvider(LLMProvider):
f"Full request dumped to: {dump_path}"
)
# finish_reason=length means the model exhausted max_tokens
# before producing content. Retrying with the same max_tokens
# will never help — return immediately instead of looping.
if finish_reason == "length":
max_tok = kwargs.get("max_tokens", "unset")
logger.error(
f"[retry] {model} returned empty content with "
f"finish_reason=length (max_tokens={max_tok}). "
f"The model exhausted its token budget before "
f"producing visible output. Increase max_tokens "
f"or use a different model. Not retrying."
)
return response
if attempt == retries:
logger.error(
f"[retry] GAVE UP on {model} after {retries + 1} "
@@ -621,6 +635,20 @@ class LiteLLMProvider(LLMProvider):
f"Full request dumped to: {dump_path}"
)
# finish_reason=length means the model exhausted max_tokens
# before producing content. Retrying with the same max_tokens
# will never help — return immediately instead of looping.
if finish_reason == "length":
max_tok = kwargs.get("max_tokens", "unset")
logger.error(
f"[async-retry] {model} returned empty content with "
f"finish_reason=length (max_tokens={max_tok}). "
f"The model exhausted its token budget before "
f"producing visible output. Increase max_tokens "
f"or use a different model. Not retrying."
)
return response
if attempt == retries:
logger.error(
f"[async-retry] GAVE UP on {model} after {retries + 1} "
@@ -903,6 +931,7 @@ class LiteLLMProvider(LLMProvider):
tool_calls_acc: dict[int, dict[str, str]] = {}
input_tokens = 0
output_tokens = 0
stream_finish_reason: str | None = None
try:
response = await litellm.acompletion(**kwargs) # type: ignore[union-attr]
@@ -938,6 +967,7 @@ class LiteLLMProvider(LLMProvider):
# --- Finish ---
if choice.finish_reason:
stream_finish_reason = choice.finish_reason
for _idx, tc_data in sorted(tool_calls_acc.items()):
try:
parsed_args = json.loads(tc_data["arguments"])
@@ -992,6 +1022,24 @@ class LiteLLMProvider(LLMProvider):
for event in tail_events:
yield event
return
# finish_reason=length means the model exhausted
# max_tokens before producing content. Retrying with
# the same max_tokens will never help.
if stream_finish_reason == "length":
max_tok = kwargs.get("max_tokens", "unset")
logger.error(
f"[stream] {self.model} returned empty content "
f"with finish_reason=length "
f"(max_tokens={max_tok}). The model exhausted "
f"its token budget before producing visible "
f"output. Increase max_tokens or use a "
f"different model. Not retrying."
)
for event in tail_events:
yield event
return
wait = _compute_retry_delay(attempt)
token_count, token_method = _estimate_tokens(
self.model,
+5 -5
View File
@@ -108,8 +108,8 @@ judge_node = NodeSpec(
"degradation pattern is detected."
),
node_type="event_loop",
client_facing=False, # Autonomous monitor, not interactive
max_node_visits=0, # Unbounded — runs on every timer tick
client_facing=False, # Autonomous monitor, not interactive
max_node_visits=0, # Unbounded — runs on every timer tick
input_keys=[],
output_keys=["health_verdict"],
nullable_output_keys=["health_verdict"],
@@ -244,15 +244,15 @@ judge_graph = GraphSpec(
version="1.0.0",
entry_node="judge",
entry_points={"health_check": "judge"},
terminal_nodes=[], # Forever-alive: fires on every timer tick
terminal_nodes=[], # Forever-alive: fires on every timer tick
pause_nodes=[],
nodes=[judge_node],
edges=[],
conversation_mode="continuous", # Conversation persists across timer ticks
async_entry_points=[HEALTH_JUDGE_ENTRY_POINT],
loop_config={
"max_iterations": 10, # One check shouldn't take many turns
"max_iterations": 10, # One check shouldn't take many turns
"max_tool_calls_per_turn": 3, # get_summary + optionally emit_ticket
"max_history_tokens": 16000, # Compact — judge only needs recent context
"max_history_tokens": 16000, # Compact — judge only needs recent context
},
)
+95
View File
@@ -360,6 +360,42 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
)
setup_creds_parser.set_defaults(func=cmd_setup_credentials)
# serve command (HTTP API server)
serve_parser = subparsers.add_parser(
"serve",
help="Start HTTP API server",
description="Start an HTTP server exposing REST + SSE APIs for agent control.",
)
serve_parser.add_argument(
"--host",
type=str,
default="127.0.0.1",
help="Host to bind (default: 127.0.0.1)",
)
serve_parser.add_argument(
"--port",
"-p",
type=int,
default=8787,
help="Port to listen on (default: 8787)",
)
serve_parser.add_argument(
"--agent",
"-a",
type=str,
action="append",
default=[],
help="Agent path to preload (repeatable)",
)
serve_parser.add_argument(
"--model",
"-m",
type=str,
default=None,
help="LLM model for preloaded agents",
)
serve_parser.set_defaults(func=cmd_serve)
def _load_resume_state(
agent_path: str, session_id: str, checkpoint_id: str | None = None
@@ -1891,3 +1927,62 @@ def cmd_setup_credentials(args: argparse.Namespace) -> int:
result = session.run_interactive()
return 0 if result.success else 1
def cmd_serve(args: argparse.Namespace) -> int:
"""Start the HTTP API server."""
import logging
from aiohttp import web
from framework.server.app import create_app
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
model = getattr(args, "model", None)
app = create_app(model=model)
async def run_server():
manager = app["manager"]
# Preload agents specified via --agent
for agent_path in args.agent:
try:
session = await manager.create_session_with_worker(agent_path, model=model)
info = session.worker_info
name = info.name if info else session.worker_id
print(f"Loaded agent: {session.worker_id} ({name})")
except Exception as e:
print(f"Error loading {agent_path}: {e}")
# Start server using AppRunner/TCPSite (same pattern as webhook_server.py)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, args.host, args.port)
await site.start()
print()
print(f"Hive API server running on http://{args.host}:{args.port}")
print(f"Health: http://{args.host}:{args.port}/api/health")
print(f"Agents loaded: {sum(1 for s in manager.list_sessions() if s.worker_runtime)}")
print()
print("Press Ctrl+C to stop")
# Run forever until interrupted
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
pass
finally:
await manager.shutdown_all()
await runner.cleanup()
try:
asyncio.run(run_server())
except KeyboardInterrupt:
print("\nServer stopped.")
return 0
+13 -26
View File
@@ -479,37 +479,21 @@ class AgentRunner:
def _import_agent_module(agent_path: Path):
"""Import an agent package from its directory path.
Tries package import first (works when exports/ is on sys.path,
which cli.py:_configure_paths() ensures). Falls back to direct
file import of agent.py via importlib.util.
Ensures the agent's parent directory is on sys.path so the package
can be imported normally (supports relative imports within the agent).
"""
import importlib
import sys
package_name = agent_path.name
parent_dir = str(agent_path.resolve().parent)
# Try importing as a package (works when exports/ is on sys.path)
try:
return importlib.import_module(package_name)
except ImportError:
pass
# Ensure the parent directory is on sys.path so the agent package
# is importable (e.g., exports/ or examples/templates/)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
# Fallback: import agent.py directly via file path
import importlib.util
agent_py = agent_path / "agent.py"
if not agent_py.exists():
raise FileNotFoundError(
f"No importable agent found at {agent_path}. "
f"Expected a Python package with agent.py."
)
spec = importlib.util.spec_from_file_location(
f"{package_name}.agent",
agent_py,
submodule_search_locations=[str(agent_path)],
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
return importlib.import_module(package_name)
@classmethod
def load(
@@ -729,7 +713,7 @@ class AgentRunner:
"""
self._approval_callback = callback
def _setup(self) -> None:
def _setup(self, event_bus=None) -> None:
"""Set up runtime, LLM, and executor."""
# Configure structured logging (auto-detects JSON vs human-readable)
from framework.observability import configure_logging
@@ -863,6 +847,7 @@ class AgentRunner:
accounts_prompt=accounts_prompt,
accounts_data=accounts_data,
tool_provider_map=tool_provider_map,
event_bus=event_bus,
)
def _get_api_key_env_var(self, model: str) -> str | None:
@@ -947,6 +932,7 @@ class AgentRunner:
accounts_prompt: str = "",
accounts_data: list[dict] | None = None,
tool_provider_map: dict[str, str] | None = None,
event_bus=None,
) -> None:
"""Set up multi-entry-point execution using AgentRuntime."""
# Convert AsyncEntryPointSpec to EntryPointSpec for AgentRuntime
@@ -1021,6 +1007,7 @@ class AgentRunner:
accounts_prompt=accounts_prompt,
accounts_data=accounts_data,
tool_provider_map=tool_provider_map,
event_bus=event_bus,
)
# Pass intro_message through for TUI display
+221 -30
View File
@@ -19,9 +19,9 @@ from framework.graph.executor import ExecutionResult
from framework.runtime.event_bus import EventBus
from framework.runtime.execution_stream import EntryPointSpec, ExecutionStream
from framework.runtime.outcome_aggregator import OutcomeAggregator
from framework.runtime.runtime_log_store import RuntimeLogStore
from framework.runtime.shared_state import SharedStateManager
from framework.storage.concurrent import ConcurrentStorage
from framework.runtime.runtime_log_store import RuntimeLogStore
from framework.storage.session_store import SessionStore
if TYPE_CHECKING:
@@ -130,6 +130,7 @@ class AgentRuntime:
accounts_prompt: str = "",
accounts_data: list[dict] | None = None,
tool_provider_map: dict[str, str] | None = None,
event_bus: "EventBus | None" = None,
):
"""
Initialize agent runtime.
@@ -148,6 +149,9 @@ class AgentRuntime:
accounts_prompt: Connected accounts block for system prompt injection
accounts_data: Raw account data for per-node prompt generation
tool_provider_map: Tool name to provider name mapping for account routing
event_bus: Optional external EventBus. If provided, the runtime shares
this bus instead of creating its own. Used by SessionManager to
share a single bus between queen, worker, and judge.
"""
self.graph = graph
self.goal = goal
@@ -179,7 +183,7 @@ class AgentRuntime:
# Initialize shared components
self._state_manager = SharedStateManager()
self._event_bus = EventBus(max_history=self._config.max_history)
self._event_bus = event_bus or EventBus(max_history=self._config.max_history)
self._outcome_aggregator = OutcomeAggregator(goal, self._event_bus)
# LLM and tools
@@ -205,6 +209,7 @@ class AgentRuntime:
# State
self._running = False
self._timers_paused = False
self._lock = asyncio.Lock()
# Optional greeting shown to user on TUI load (set by AgentRunner)
@@ -420,19 +425,63 @@ class AgentRuntime:
)
await asyncio.sleep(max(0, sleep_secs))
while self._running:
# Calculate next fire time upfront (used by skip paths too)
cron = croniter(expr, datetime.now())
next_dt = cron.get_next(datetime)
sleep_secs = (next_dt - datetime.now()).total_seconds()
# Gate: skip tick if timers are explicitly paused
if self._timers_paused:
logger.debug(
"Cron '%s': paused, skipping tick",
entry_point_id,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + sleep_secs
)
await asyncio.sleep(max(0, sleep_secs))
continue
# Gate: skip tick if previous execution still running
_stream = self._streams.get(entry_point_id)
if _stream and _stream.active_execution_ids:
logger.debug(
"Cron '%s': execution already in progress, skipping tick",
entry_point_id,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + sleep_secs
)
await asyncio.sleep(max(0, sleep_secs))
continue
self._timer_next_fire.pop(entry_point_id, None)
try:
ep_spec = self._entry_points.get(entry_point_id)
is_isolated = ep_spec and ep_spec.isolation_level == "isolated"
if is_isolated:
if _persistent_session_id:
session_state = {"resume_session_id": _persistent_session_id}
session_state = {
"resume_session_id": _persistent_session_id
}
else:
session_state = None
else:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
# Gate: skip tick if no active session
if session_state is None:
logger.debug(
"Cron '%s': no active primary session, skipping tick",
entry_point_id,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + sleep_secs
)
await asyncio.sleep(max(0, sleep_secs))
continue
exec_id = await self.trigger(
entry_point_id,
{
@@ -490,19 +539,58 @@ class AgentRuntime:
)
await asyncio.sleep(interval_secs)
while self._running:
# Gate: skip tick if timers are explicitly paused
if self._timers_paused:
logger.debug(
"Timer '%s': paused, skipping tick",
entry_point_id,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + interval_secs
)
await asyncio.sleep(interval_secs)
continue
# Gate: skip tick if previous execution still running
_stream = self._streams.get(entry_point_id)
if _stream and _stream.active_execution_ids:
logger.debug(
"Timer '%s': execution already in progress, skipping tick",
entry_point_id,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + interval_secs
)
await asyncio.sleep(interval_secs)
continue
self._timer_next_fire.pop(entry_point_id, None)
try:
ep_spec = self._entry_points.get(entry_point_id)
is_isolated = ep_spec and ep_spec.isolation_level == "isolated"
if is_isolated:
if _persistent_session_id:
session_state = {"resume_session_id": _persistent_session_id}
session_state = {
"resume_session_id": _persistent_session_id
}
else:
session_state = None
else:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
# Gate: skip tick if no active session
if session_state is None:
logger.debug(
"Timer '%s': no active primary session, skipping tick",
entry_point_id,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + interval_secs
)
await asyncio.sleep(interval_secs)
continue
exec_id = await self.trigger(
entry_point_id,
{
@@ -562,6 +650,7 @@ class AgentRuntime:
)
self._running = True
self._timers_paused = False
logger.info(f"AgentRuntime started with {len(self._streams)} streams")
async def stop(self) -> None:
@@ -603,6 +692,19 @@ class AgentRuntime:
self._running = False
logger.info("AgentRuntime stopped")
def pause_timers(self) -> None:
"""Pause all timer-driven entry points.
Timers will skip their ticks until ``resume_timers()`` is called.
"""
self._timers_paused = True
logger.info("Timers paused")
def resume_timers(self) -> None:
"""Resume timer-driven entry points after a pause."""
self._timers_paused = False
logger.info("Timers resumed")
def _resolve_stream(
self,
entry_point_id: str,
@@ -847,12 +949,18 @@ class AgentRuntime:
if interval and interval > 0 and self._running:
logger.info(
"Creating timer for '%s::%s': interval=%s min, immediate=%s, loop=%s",
graph_id, ep_id, interval, run_immediately,
graph_id,
ep_id,
interval,
run_immediately,
id(asyncio.get_event_loop()),
)
def _make_timer(
gid: str, local_ep: str, mins: float, immediate: bool,
gid: str,
local_ep: str,
mins: float,
immediate: bool,
):
async def _timer_loop():
interval_secs = mins * 60
@@ -863,12 +971,38 @@ class AgentRuntime:
logger.info(
"Timer loop started for '%s::%s' (sleep %ss)",
gid, local_ep, interval_secs,
gid,
local_ep,
interval_secs,
)
if not immediate:
timer_next_fire[local_ep] = time.monotonic() + interval_secs
await asyncio.sleep(interval_secs)
while self._running and gid in self._graphs:
# Gate: skip tick if timers are explicitly paused
if self._timers_paused:
logger.debug(
"Timer '%s::%s': paused, skipping tick",
gid,
local_ep,
)
timer_next_fire[local_ep] = time.monotonic() + interval_secs
await asyncio.sleep(interval_secs)
continue
# Gate: skip tick if previous execution still running
_reg = self._graphs.get(gid)
_stream = _reg.streams.get(local_ep) if _reg else None
if _stream and _stream.active_execution_ids:
logger.debug(
"Timer '%s::%s': execution already in progress, skipping tick",
gid,
local_ep,
)
timer_next_fire[local_ep] = time.monotonic() + interval_secs
await asyncio.sleep(interval_secs)
continue
logger.info("Timer firing for '%s::%s'", gid, local_ep)
timer_next_fire.pop(local_ep, None)
try:
@@ -878,28 +1012,47 @@ class AgentRuntime:
break
stream = reg.streams.get(local_ep)
if not stream:
logger.warning("Timer: no stream '%s' in '%s', stopping", local_ep, gid)
logger.warning(
"Timer: no stream '%s' in '%s', stopping", local_ep, gid
)
break
# Isolated entry points get their own session;
# shared ones join the primary session.
ep_spec = reg.entry_points.get(local_ep)
if ep_spec and ep_spec.isolation_level == "isolated":
if _persistent_session_id:
session_state = {"resume_session_id": _persistent_session_id}
session_state = {
"resume_session_id": _persistent_session_id
}
else:
session_state = None
else:
session_state = self._get_primary_session_state(
local_ep, source_graph_id=gid
)
# Gate: skip tick if no active session
if session_state is None:
logger.debug(
"Timer '%s::%s': no active primary session, skipping tick",
gid,
local_ep,
)
timer_next_fire[local_ep] = time.monotonic() + interval_secs
await asyncio.sleep(interval_secs)
continue
exec_id = await stream.execute(
{"event": {"source": "timer", "reason": "scheduled"}},
session_state=session_state,
)
# Remember session ID for reuse on next tick
if not _persistent_session_id and ep_spec and ep_spec.isolation_level == "isolated":
if (
not _persistent_session_id
and ep_spec
and ep_spec.isolation_level == "isolated"
):
_persistent_session_id = exec_id
except Exception as exc:
except Exception:
logger.error(
"Timer trigger failed for '%s::%s'",
gid,
@@ -1119,7 +1272,14 @@ class AgentRuntime:
)
return None
async def inject_input(self, node_id: str, content: str, graph_id: str | None = None) -> bool:
async def inject_input(
self,
node_id: str,
content: str,
graph_id: str | None = None,
*,
is_client_input: bool = False,
) -> bool:
"""Inject user input into a running client-facing node.
Routes input to the EventLoopNode identified by ``node_id``.
@@ -1129,6 +1289,8 @@ class AgentRuntime:
node_id: The node currently waiting for input
content: The user's input text
graph_id: Optional graph to search first (defaults to active graph)
is_client_input: True when the message originates from a real
human user (e.g. /chat endpoint), False for external events.
Returns:
True if input was delivered, False if no matching node found
@@ -1140,7 +1302,7 @@ class AgentRuntime:
target = graph_id or self._active_graph_id
if target in self._graphs:
for stream in self._graphs[target].streams.values():
if await stream.inject_input(node_id, content):
if await stream.inject_input(node_id, content, is_client_input=is_client_input):
return True
# Then search all other graphs
@@ -1148,7 +1310,7 @@ class AgentRuntime:
if gid == target:
continue
for stream in reg.streams.values():
if await stream.inject_input(node_id, content):
if await stream.inject_input(node_id, content, is_client_input=is_client_input):
return True
return False
@@ -1206,9 +1368,26 @@ class AgentRuntime:
# Fallback: primary graph
return list(self._entry_points.values())
def get_stream(self, entry_point_id: str, graph_id: str | None = None) -> ExecutionStream | None:
"""Get a specific execution stream (searches active graph first)."""
return self._resolve_stream(entry_point_id, graph_id)
def get_stream(self, entry_point_id: str) -> ExecutionStream | None:
"""Get a specific execution stream."""
return self._streams.get(entry_point_id)
def find_awaiting_node(self) -> tuple[str | None, str | None]:
"""Find a node that is currently awaiting user input.
Searches all graphs and their streams for any active executor
whose node has ``_awaiting_input`` set to ``True``.
Returns:
(node_id, graph_id) if found, else (None, None).
"""
for graph_id, reg in self._graphs.items():
for stream in reg.streams.values():
for executor in stream._active_executors.values():
for node_id, node in executor.node_registry.items():
if getattr(node, "_awaiting_input", False):
return node_id, graph_id
return None, None
def get_execution_result(
self,
@@ -1284,14 +1463,16 @@ class AgentRuntime:
active = stream.active_execution_ids
if not active:
continue
result.append({
"graph_id": graph_id,
"stream_id": stream.stream_id,
"entry_point_id": ep_id,
"active_execution_ids": active,
"is_awaiting_input": stream.is_awaiting_input,
"waiting_nodes": stream.get_waiting_nodes(),
})
result.append(
{
"graph_id": graph_id,
"stream_id": stream.stream_id,
"entry_point_id": ep_id,
"active_execution_ids": active,
"is_awaiting_input": stream.is_awaiting_input,
"waiting_nodes": stream.get_waiting_nodes(),
}
)
return result
def get_waiting_nodes(self) -> list[dict[str, Any]]:
@@ -1304,11 +1485,13 @@ class AgentRuntime:
for graph_id, reg in self._graphs.items():
for _ep_id, stream in reg.streams.items():
for waiting in stream.get_waiting_nodes():
result.append({
"graph_id": graph_id,
"stream_id": stream.stream_id,
**waiting,
})
result.append(
{
"graph_id": graph_id,
"stream_id": stream.stream_id,
**waiting,
}
)
return result
# === PROPERTIES ===
@@ -1333,6 +1516,11 @@ class AgentRuntime:
"""Access the webhook server (None if no webhook entry points)."""
return self._webhook_server
@property
def timers_paused(self) -> bool:
"""True when timer-driven entry points are paused (e.g. by stop_worker)."""
return self._timers_paused
@property
def is_running(self) -> bool:
"""Check if runtime is running."""
@@ -1358,6 +1546,7 @@ def create_agent_runtime(
accounts_prompt: str = "",
accounts_data: list[dict] | None = None,
tool_provider_map: dict[str, str] | None = None,
event_bus: "EventBus | None" = None,
) -> AgentRuntime:
"""
Create and configure an AgentRuntime with entry points.
@@ -1383,6 +1572,7 @@ def create_agent_runtime(
graph_id: Optional identifier for the primary graph (defaults to "primary").
accounts_data: Raw account data for per-node prompt generation.
tool_provider_map: Tool name to provider name mapping for account routing.
event_bus: Optional external EventBus to share with other components.
Returns:
Configured AgentRuntime (not yet started)
@@ -1408,6 +1598,7 @@ def create_agent_runtime(
accounts_prompt=accounts_prompt,
accounts_data=accounts_data,
tool_provider_map=tool_provider_map,
event_bus=event_bus,
)
for spec in entry_points:
+6 -6
View File
@@ -27,13 +27,13 @@ class EscalationTicket(BaseModel):
# Problem characterization (filled by judge via LLM deliberation)
severity: Literal["low", "medium", "high", "critical"]
cause: str # Human-readable: "Node has produced 18 RETRY verdicts..."
cause: str # Human-readable: "Node has produced 18 RETRY verdicts..."
judge_reasoning: str # Judge's own deliberation chain
suggested_action: str # "Restart node", "Human review", "Kill session", etc.
suggested_action: str # "Restart node", "Human review", "Kill session", etc.
# Evidence
recent_verdicts: list[str] # e.g. ["RETRY", "RETRY", "CONTINUE", "RETRY"]
total_steps_checked: int # How many steps the judge saw
steps_since_last_accept: int # Steps with no ACCEPT verdict
recent_verdicts: list[str] # e.g. ["RETRY", "RETRY", "CONTINUE", "RETRY"]
total_steps_checked: int # How many steps the judge saw
steps_since_last_accept: int # Steps with no ACCEPT verdict
stall_minutes: float | None # Wall-clock minutes since last new log step (None if active)
evidence_snippet: str # Brief excerpt from recent LLM output or error
evidence_snippet: str # Brief excerpt from recent LLM output or error
+22
View File
@@ -83,6 +83,7 @@ class EventType(StrEnum):
NODE_LOOP_STARTED = "node_loop_started"
NODE_LOOP_ITERATION = "node_loop_iteration"
NODE_LOOP_COMPLETED = "node_loop_completed"
NODE_ACTION_PLAN = "node_action_plan"
# LLM streaming observability
LLM_TEXT_DELTA = "llm_text_delta"
@@ -128,6 +129,9 @@ class EventType(StrEnum):
WORKER_ESCALATION_TICKET = "worker_escalation_ticket"
QUEEN_INTERVENTION_REQUESTED = "queen_intervention_requested"
# Worker lifecycle (session manager → frontend)
WORKER_LOADED = "worker_loaded"
@dataclass
class AgentEvent:
@@ -533,6 +537,24 @@ class EventBus:
)
)
async def emit_node_action_plan(
self,
stream_id: str,
node_id: str,
plan: str,
execution_id: str | None = None,
) -> None:
"""Emit node action plan event."""
await self.publish(
AgentEvent(
type=EventType.NODE_ACTION_PLAN,
stream_id=stream_id,
node_id=node_id,
execution_id=execution_id,
data={"plan": plan},
)
)
# === LLM STREAMING PUBLISHERS ===
async def emit_llm_text_delta(
+10 -3
View File
@@ -360,7 +360,13 @@ class ExecutionStream:
)
)
async def inject_input(self, node_id: str, content: str) -> bool:
async def inject_input(
self,
node_id: str,
content: str,
*,
is_client_input: bool = False,
) -> bool:
"""Inject user input into a running client-facing EventLoopNode.
Searches active executors for a node matching ``node_id`` and calls
@@ -371,7 +377,7 @@ class ExecutionStream:
for executor in self._active_executors.values():
node = executor.node_registry.get(node_id)
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(content)
await node.inject_event(content, is_client_input=is_client_input)
return True
return False
@@ -564,6 +570,7 @@ class ExecutionStream:
await self._write_session_state(execution_id, ctx, result=result)
# Emit completion/failure event
# (skip for pauses — executor already emitted execution_paused)
if self._scoped_event_bus:
if result.success:
await self._scoped_event_bus.emit_execution_completed(
@@ -572,7 +579,7 @@ class ExecutionStream:
output=result.output,
correlation_id=ctx.correlation_id,
)
else:
elif not result.paused_at:
await self._scoped_event_bus.emit_execution_failed(
stream_id=self.stream_id,
execution_id=execution_id,
+346
View File
@@ -0,0 +1,346 @@
# Hive Server
HTTP API backend for the Hive agent framework. Built on **aiohttp**, fully async, serving the frontend workspace and external clients.
## Architecture
Sessions are the primary entity. A session owns an EventBus + LLM and always has a queen executor. Workers are optional — they can be loaded into and unloaded from a session at any time.
```
Session {
event_bus # owned by session, shared with queen + worker
llm # owned by session
queen_executor # always present
worker_runtime? # optional — loaded/unloaded independently
}
```
## Structure
```
server/
├── app.py # Application factory, middleware, static serving
├── session_manager.py # Session lifecycle (create/load worker/unload/stop)
├── sse.py # Server-Sent Events helper
├── routes_sessions.py # Session lifecycle, info, worker-session browsing, discovery
├── routes_execution.py # Trigger, inject, chat, stop, resume, replay
├── routes_events.py # SSE event streaming
├── routes_graphs.py # Graph topology & node inspection
├── routes_logs.py # Execution logs (summary/details/tools)
├── routes_credentials.py # Credential management & validation
├── routes_agents.py # Legacy backward-compat routes
└── tests/
└── test_api.py # Full test suite with mocked runtimes
```
## Core Components
### `app.py` — Application Factory
`create_app(model)` builds the aiohttp `Application` with:
- **CORS middleware** — allows localhost origins
- **Error middleware** — catches exceptions, returns JSON errors
- **Static serving** — serves the frontend SPA with index.html fallback
- **Graceful shutdown** — stops all sessions on exit
### `session_manager.py` — Session Lifecycle Manager
Manages `Session` objects. Key methods:
- **`create_session()`** — creates EventBus + LLM, starts queen (no worker)
- **`create_session_with_worker()`** — one-step: session + worker + judge
- **`load_worker()`** — loads agent into existing session, starts judge
- **`unload_worker()`** — removes worker + judge, queen stays alive
- **`stop_session()`** — tears down everything (worker + queen)
Three-conversation model:
1. **Queen** — persistent interactive executor for user chat (always present)
2. **Worker**`AgentRuntime` that executes graphs (optional)
3. **Judge** — timer-driven background executor for health monitoring (active when worker is loaded)
### `sse.py` — SSE Helper
Thin wrapper around `aiohttp.StreamResponse` for Server-Sent Events with keepalive pings.
## API Reference
All session-scoped routes use the `session_id` returned from `POST /api/sessions`.
### Discovery
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/api/discover` | Discover agents from filesystem |
Returns agents grouped by category with metadata (name, description, node count, tags, etc.).
### Session Lifecycle
| Method | Route | Description |
|--------|-------|-------------|
| `POST` | `/api/sessions` | Create a session |
| `GET` | `/api/sessions` | List all active sessions |
| `GET` | `/api/sessions/{session_id}` | Session detail (includes entry points + graphs if worker loaded) |
| `DELETE` | `/api/sessions/{session_id}` | Stop session entirely |
**Create session** has two modes:
```jsonc
// Queen-only session (no worker)
POST /api/sessions
{}
// or with custom ID:
{ "session_id": "my-custom-id" }
// Session with worker (one-step)
POST /api/sessions
{
"agent_path": "exports/my-agent",
"agent_id": "custom-worker-name", // optional
"model": "claude-sonnet-4-20250514" // optional
}
```
- Returns `201` with session object on success
- Returns `409` with `{"loading": true}` if agent is currently loading
- Returns `404` if agent_path doesn't exist
**Get session** returns `202` with `{"loading": true}` while loading, `404` if not found.
### Worker Lifecycle
| Method | Route | Description |
|--------|-------|-------------|
| `POST` | `/api/sessions/{session_id}/worker` | Load a worker into session |
| `DELETE` | `/api/sessions/{session_id}/worker` | Unload worker (queen stays alive) |
```jsonc
// Load worker into existing session
POST /api/sessions/{session_id}/worker
{
"agent_path": "exports/my-agent",
"worker_id": "custom-name", // optional
"model": "..." // optional
}
// Unload worker
DELETE /api/sessions/{session_id}/worker
```
### Execution Control
| Method | Route | Description |
|--------|-------|-------------|
| `POST` | `/api/sessions/{session_id}/trigger` | Start a new execution |
| `POST` | `/api/sessions/{session_id}/inject` | Inject input into a waiting node |
| `POST` | `/api/sessions/{session_id}/chat` | Smart chat routing |
| `POST` | `/api/sessions/{session_id}/stop` | Cancel a running execution |
| `POST` | `/api/sessions/{session_id}/pause` | Alias for stop |
| `POST` | `/api/sessions/{session_id}/resume` | Resume a paused execution |
| `POST` | `/api/sessions/{session_id}/replay` | Re-run from a checkpoint |
| `GET` | `/api/sessions/{session_id}/goal-progress` | Evaluate goal progress |
**Trigger:**
```jsonc
POST /api/sessions/{session_id}/trigger
{
"entry_point_id": "default",
"input_data": { "query": "research topic X" },
"session_state": {} // optional
}
// Returns: { "execution_id": "..." }
```
**Chat** routes messages with priority:
1. Worker awaiting input -> inject into worker node
2. Queen active -> inject into queen conversation
3. Neither available -> 503
```jsonc
POST /api/sessions/{session_id}/chat
{ "message": "hello" }
// Returns: { "status": "injected"|"queen", "delivered": true }
```
**Inject** into a specific node:
```jsonc
POST /api/sessions/{session_id}/inject
{ "node_id": "gather_info", "content": "user response", "graph_id": "main" }
```
**Stop:**
```jsonc
POST /api/sessions/{session_id}/stop
{ "execution_id": "..." }
```
**Resume:**
```jsonc
POST /api/sessions/{session_id}/resume
{
"session_id": "session_20260224_...", // worker session to resume
"checkpoint_id": "cp_..." // optional — resumes from latest if omitted
}
```
**Replay** (re-run from checkpoint):
```jsonc
POST /api/sessions/{session_id}/replay
{
"session_id": "session_20260224_...",
"checkpoint_id": "cp_..." // required
}
```
### SSE Event Streaming
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/api/sessions/{session_id}/events` | SSE event stream |
```
GET /api/sessions/{session_id}/events
GET /api/sessions/{session_id}/events?types=CLIENT_OUTPUT_DELTA,EXECUTION_COMPLETED
```
Keepalive ping every 15s. Streams from the session's EventBus (covers both queen and worker events).
Default event types: `CLIENT_OUTPUT_DELTA`, `CLIENT_INPUT_REQUESTED`, `LLM_TEXT_DELTA`, `TOOL_CALL_STARTED`, `TOOL_CALL_COMPLETED`, `EXECUTION_STARTED`, `EXECUTION_COMPLETED`, `EXECUTION_FAILED`, `EXECUTION_PAUSED`, `NODE_LOOP_STARTED`, `NODE_LOOP_ITERATION`, `NODE_LOOP_COMPLETED`, `NODE_ACTION_PLAN`, `EDGE_TRAVERSED`, `GOAL_PROGRESS`, `QUEEN_INTERVENTION_REQUESTED`, `WORKER_ESCALATION_TICKET`, `NODE_INTERNAL_OUTPUT`, `NODE_STALLED`, `NODE_RETRY`, `NODE_TOOL_DOOM_LOOP`, `CONTEXT_COMPACTED`, `WORKER_LOADED`.
### Session Info
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/api/sessions/{session_id}/stats` | Runtime statistics |
| `GET` | `/api/sessions/{session_id}/entry-points` | List entry points |
| `GET` | `/api/sessions/{session_id}/graphs` | List loaded graph IDs |
### Graph & Node Inspection
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/api/sessions/{session_id}/graphs/{graph_id}/nodes` | List nodes + edges |
| `GET` | `/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}` | Node detail + outgoing edges |
| `GET` | `/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}/criteria` | Success criteria + last execution info |
| `GET` | `/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}/tools` | Resolved tool metadata |
**List nodes** supports optional enrichment with session progress:
```
GET /api/sessions/{session_id}/graphs/{graph_id}/nodes?session_id=worker_session_id
```
Adds `visit_count`, `has_failures`, `is_current`, `in_path` to each node.
### Logs
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/api/sessions/{session_id}/logs` | Session-level logs |
| `GET` | `/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}/logs` | Node-scoped logs |
```
# List recent runs
GET /api/sessions/{session_id}/logs?level=summary&limit=20
# Detailed per-node execution for a specific worker session
GET /api/sessions/{session_id}/logs?session_id=ws_id&level=details
# Tool call logs
GET /api/sessions/{session_id}/logs?session_id=ws_id&level=tools
# Node-scoped (requires session_id query param)
GET .../nodes/{node_id}/logs?session_id=ws_id&level=all
```
Log levels: `summary` (run stats), `details` (per-node execution), `tools` (tool calls + LLM text).
### Worker Session Browsing
Browse persisted execution runs on disk.
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/api/sessions/{session_id}/worker-sessions` | List worker sessions |
| `GET` | `/api/sessions/{session_id}/worker-sessions/{ws_id}` | Worker session state |
| `DELETE` | `/api/sessions/{session_id}/worker-sessions/{ws_id}` | Delete worker session |
| `GET` | `/api/sessions/{session_id}/worker-sessions/{ws_id}/checkpoints` | List checkpoints |
| `POST` | `/api/sessions/{session_id}/worker-sessions/{ws_id}/checkpoints/{cp_id}/restore` | Restore from checkpoint |
| `GET` | `/api/sessions/{session_id}/worker-sessions/{ws_id}/messages` | Get conversation messages |
**Messages** support filtering:
```
GET .../messages?node_id=gather_info # filter by node
GET .../messages?client_only=true # only user inputs + client-facing assistant outputs
```
### Credentials
| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/api/credentials` | List credential metadata (no secrets) |
| `POST` | `/api/credentials` | Save a credential |
| `GET` | `/api/credentials/{credential_id}` | Get credential metadata |
| `DELETE` | `/api/credentials/{credential_id}` | Delete a credential |
| `POST` | `/api/credentials/check-agent` | Validate agent credentials |
**Save credential:**
```jsonc
POST /api/credentials
{ "credential_id": "brave_search", "keys": { "api_key": "BSA..." } }
```
**Check agent credentials** — two-phase validation (same as runtime startup):
```jsonc
POST /api/credentials/check-agent
{
"agent_path": "exports/my-agent",
"verify": true // optional, default true — run health checks
}
// Returns:
{
"required": [
{
"credential_name": "brave_search",
"credential_id": "brave_search",
"env_var": "BRAVE_SEARCH_API_KEY",
"description": "Brave Search API key",
"help_url": "https://...",
"tools": ["brave_web_search"],
"node_types": [],
"available": true,
"valid": true, // true/false/null (null = not checked)
"validation_message": "OK", // human-readable health check result
"direct_api_key_supported": true,
"aden_supported": true,
"credential_key": "api_key"
}
]
}
```
When `verify: true`, runs health checks (lightweight HTTP calls) against each available credential to confirm it actually works — not just that it exists.
## Key Patterns
- **Session-primary** — sessions are the lookup key for all routes, workers are optional children
- **Per-request manager access** — routes get `SessionManager` via `request.app["manager"]`
- **Path validation** — user-provided path segments validated with `safe_path_segment()` to prevent directory traversal
- **Event-driven streaming** — per-client buffer queues (max 1000 events) with 15s keepalive pings
- **Shared EventBus** — session owns the bus, queen and worker both publish to it, SSE always connects to `session.event_bus`
- **No secrets in responses** — credential endpoints never return secret values
## Storage Paths
```
~/.hive/
├── queen/session/{session_id}/ # Queen conversation state
├── judge/session/{session_id}/ # Judge state
├── agents/{agent_name}/sessions/ # Worker execution sessions
└── credentials/ # Encrypted credential store
```
## Running Tests
```bash
pytest framework/server/tests/ -v
```
+1
View File
@@ -0,0 +1 @@
"""HTTP API server for the Hive agent framework."""
+36
View File
@@ -0,0 +1,36 @@
"""Backward-compatibility shim.
The primary implementation is now in ``session_manager.py``.
This module re-exports ``SessionManager`` as ``AgentManager`` and
keeps ``AgentSlot`` for test compatibility.
"""
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from framework.server.session_manager import Session, SessionManager # noqa: F401
@dataclass
class AgentSlot:
"""Legacy data class — kept for test compatibility only.
New code should use ``Session`` from ``session_manager``.
"""
id: str
agent_path: Path
runner: Any
runtime: Any
info: Any
loaded_at: float
queen_executor: Any = None
queen_task: asyncio.Task | None = None
judge_task: asyncio.Task | None = None
escalation_sub: str | None = None
# Backward compat alias
AgentManager = SessionManager
+209
View File
@@ -0,0 +1,209 @@
"""aiohttp Application factory for the Hive HTTP API server."""
import logging
from pathlib import Path
from aiohttp import web
from framework.server.session_manager import Session, SessionManager
logger = logging.getLogger(__name__)
def safe_path_segment(value: str) -> str:
"""Validate a URL path parameter is a safe filesystem name.
Raises HTTPBadRequest if the value contains path separators or
traversal sequences. aiohttp decodes ``%2F`` inside route params,
so a raw ``{session_id}`` can contain ``/`` or ``..`` after decoding.
"""
if "/" in value or "\\" in value or ".." in value:
raise web.HTTPBadRequest(reason="Invalid path parameter")
return value
def resolve_session(request: web.Request):
"""Resolve a Session from {session_id} in the URL.
Returns (session, None) on success or (None, error_response) on failure.
"""
manager: SessionManager = request.app["manager"]
sid = request.match_info["session_id"]
session = manager.get_session(sid)
if not session:
return None, web.json_response({"error": f"Session '{sid}' not found"}, status=404)
return session, None
def sessions_dir(session: Session) -> Path:
"""Resolve the worker sessions directory for a session.
Storage layout: ~/.hive/agents/{agent_name}/sessions/
Requires a worker to be loaded (worker_path must be set).
"""
if session.worker_path is None:
raise ValueError("No worker loaded — no worker sessions directory")
agent_name = session.worker_path.name
return Path.home() / ".hive" / "agents" / agent_name / "sessions"
# Allowed CORS origins (localhost on any port)
_CORS_ORIGINS = {"http://localhost", "http://127.0.0.1"}
def _is_cors_allowed(origin: str) -> bool:
"""Check if origin is localhost/127.0.0.1 on any port."""
if not origin:
return False
for base in _CORS_ORIGINS:
if origin == base or origin.startswith(base + ":"):
return True
return False
@web.middleware
async def cors_middleware(request: web.Request, handler):
"""CORS middleware scoped to localhost origins."""
origin = request.headers.get("Origin", "")
# Handle preflight
if request.method == "OPTIONS":
response = web.Response(status=204)
else:
try:
response = await handler(request)
except web.HTTPException as exc:
response = exc
if _is_cors_allowed(origin):
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Methods"] = "GET, POST, DELETE, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type"
response.headers["Access-Control-Max-Age"] = "3600"
return response
@web.middleware
async def error_middleware(request: web.Request, handler):
"""Catch exceptions and return JSON error responses."""
try:
return await handler(request)
except web.HTTPException:
raise # Let aiohttp handle its own HTTP exceptions
except Exception as e:
logger.exception(f"Unhandled error: {e}")
return web.json_response(
{"error": str(e), "type": type(e).__name__},
status=500,
)
async def _on_shutdown(app: web.Application) -> None:
"""Gracefully unload all agents on server shutdown."""
manager: SessionManager = app["manager"]
await manager.shutdown_all()
async def handle_health(request: web.Request) -> web.Response:
"""GET /api/health — simple health check."""
manager: SessionManager = request.app["manager"]
sessions = manager.list_sessions()
return web.json_response(
{
"status": "ok",
"sessions": len(sessions),
"agents_loaded": sum(1 for s in sessions if s.worker_runtime is not None),
}
)
def create_app(model: str | None = None) -> web.Application:
"""Create and configure the aiohttp Application.
Args:
model: Default LLM model for agent loading.
Returns:
Configured aiohttp Application ready to run.
"""
app = web.Application(middlewares=[cors_middleware, error_middleware])
# Store manager on app for handlers
app["manager"] = SessionManager(model=model)
# Initialize credential store
from framework.credentials.store import CredentialStore
try:
from framework.credentials.validation import ensure_credential_key_env
ensure_credential_key_env()
app["credential_store"] = CredentialStore.with_aden_sync()
except Exception:
logger.debug("Encrypted credential store unavailable, using in-memory fallback")
app["credential_store"] = CredentialStore.for_testing({})
# Register shutdown hook
app.on_shutdown.append(_on_shutdown)
# Health check
app.router.add_get("/api/health", handle_health)
# Register route modules
from framework.server.routes_credentials import register_routes as register_credential_routes
from framework.server.routes_events import register_routes as register_event_routes
from framework.server.routes_execution import register_routes as register_execution_routes
from framework.server.routes_graphs import register_routes as register_graph_routes
from framework.server.routes_logs import register_routes as register_log_routes
from framework.server.routes_sessions import register_routes as register_session_routes
register_credential_routes(app)
register_execution_routes(app)
register_event_routes(app)
register_session_routes(app)
register_graph_routes(app)
register_log_routes(app)
# Static file serving — Option C production mode
# If frontend/dist/ exists, serve built frontend files on /
_setup_static_serving(app)
return app
def _setup_static_serving(app: web.Application) -> None:
"""Serve frontend static files if the dist directory exists."""
# Try: CWD/frontend/dist, core/frontend/dist, repo_root/frontend/dist
_here = Path(__file__).resolve().parent # core/framework/server/
candidates = [
Path("frontend/dist"),
_here.parent.parent / "frontend" / "dist", # core/frontend/dist
_here.parent.parent.parent / "frontend" / "dist", # repo_root/frontend/dist
]
dist_dir: Path | None = None
for candidate in candidates:
if candidate.is_dir() and (candidate / "index.html").exists():
dist_dir = candidate.resolve()
break
if dist_dir is None:
logger.debug("No frontend/dist found — skipping static file serving")
return
logger.info(f"Serving frontend from {dist_dir}")
async def handle_spa(request: web.Request) -> web.FileResponse:
"""Serve static files with SPA fallback to index.html."""
rel_path = request.match_info.get("path", "")
file_path = (dist_dir / rel_path).resolve()
if file_path.is_file() and file_path.is_relative_to(dist_dir):
return web.FileResponse(file_path)
# SPA fallback
return web.FileResponse(dist_dir / "index.html")
# Catch-all for SPA — must be registered LAST so /api routes take priority
app.router.add_get("/{path:.*}", handle_spa)
+213
View File
@@ -0,0 +1,213 @@
"""Credential CRUD routes."""
import logging
from aiohttp import web
from pydantic import SecretStr
from framework.credentials.models import CredentialKey, CredentialObject
from framework.credentials.store import CredentialStore
logger = logging.getLogger(__name__)
def _get_store(request: web.Request) -> CredentialStore:
return request.app["credential_store"]
def _credential_to_dict(cred: CredentialObject) -> dict:
"""Serialize a CredentialObject to JSON — never include secret values."""
return {
"credential_id": cred.id,
"credential_type": str(cred.credential_type),
"key_names": list(cred.keys.keys()),
"created_at": cred.created_at.isoformat() if cred.created_at else None,
"updated_at": cred.updated_at.isoformat() if cred.updated_at else None,
}
async def handle_list_credentials(request: web.Request) -> web.Response:
"""GET /api/credentials — list all credential metadata (no secrets)."""
store = _get_store(request)
cred_ids = store.list_credentials()
credentials = []
for cid in cred_ids:
cred = store.get_credential(cid, refresh_if_needed=False)
if cred:
credentials.append(_credential_to_dict(cred))
return web.json_response({"credentials": credentials})
async def handle_get_credential(request: web.Request) -> web.Response:
"""GET /api/credentials/{credential_id} — get single credential metadata."""
credential_id = request.match_info["credential_id"]
store = _get_store(request)
cred = store.get_credential(credential_id, refresh_if_needed=False)
if cred is None:
return web.json_response({"error": f"Credential '{credential_id}' not found"}, status=404)
return web.json_response(_credential_to_dict(cred))
async def handle_save_credential(request: web.Request) -> web.Response:
"""POST /api/credentials — store a credential.
Body: {"credential_id": "...", "keys": {"key_name": "value", ...}}
"""
store = _get_store(request)
body = await request.json()
credential_id = body.get("credential_id")
keys = body.get("keys")
if not credential_id or not keys or not isinstance(keys, dict):
return web.json_response({"error": "credential_id and keys are required"}, status=400)
cred = CredentialObject(
id=credential_id,
keys={k: CredentialKey(name=k, value=SecretStr(v)) for k, v in keys.items()},
)
store.save_credential(cred)
return web.json_response({"saved": credential_id}, status=201)
async def handle_delete_credential(request: web.Request) -> web.Response:
"""DELETE /api/credentials/{credential_id} — delete a credential."""
credential_id = request.match_info["credential_id"]
store = _get_store(request)
deleted = store.delete_credential(credential_id)
if not deleted:
return web.json_response({"error": f"Credential '{credential_id}' not found"}, status=404)
return web.json_response({"deleted": True})
async def handle_check_agent(request: web.Request) -> web.Response:
"""POST /api/credentials/check-agent — check and validate agent credentials.
Uses the same two-phase validation as agent startup:
1. Presence is the credential available (env, encrypted store, Aden)?
2. Health check does the credential actually work (lightweight HTTP call)?
Body: {"agent_path": "...", "verify": true}
"""
body = await request.json()
agent_path = body.get("agent_path")
verify = body.get("verify", True)
if not agent_path:
return web.json_response({"error": "agent_path is required"}, status=400)
try:
import os
from framework.credentials.setup import CredentialSetupSession
from framework.credentials.storage import CompositeStorage, EncryptedFileStorage, EnvVarStorage
from framework.credentials.validation import _presync_aden_tokens, ensure_credential_key_env
# Load env vars from shell config (same as runtime startup)
ensure_credential_key_env()
# Build a proper store with env + encrypted storage (same as validate_agent_credentials)
try:
from aden_tools.credentials import CREDENTIAL_SPECS
except ImportError:
CREDENTIAL_SPECS = {}
if os.environ.get("ADEN_API_KEY") and CREDENTIAL_SPECS:
_presync_aden_tokens(CREDENTIAL_SPECS)
env_mapping = {
(spec.credential_id or name): spec.env_var
for name, spec in CREDENTIAL_SPECS.items()
}
env_storage = EnvVarStorage(env_mapping=env_mapping)
if os.environ.get("HIVE_CREDENTIAL_KEY"):
storage = CompositeStorage(primary=env_storage, fallbacks=[EncryptedFileStorage()])
else:
storage = env_storage
store = CredentialStore(storage=storage)
# Detect required credentials from agent graph
session = CredentialSetupSession.from_agent_path(agent_path, missing_only=False)
# Health check function (may not be available)
check_health = None
if verify:
try:
from aden_tools.credentials import check_credential_health
check_health = check_credential_health
except ImportError:
pass
required = []
for mc in session.missing:
cred_id = mc.credential_id or mc.credential_name
available = store.is_available(cred_id)
entry = {
"credential_name": mc.credential_name,
"credential_id": cred_id,
"env_var": mc.env_var,
"description": mc.description,
"help_url": mc.help_url,
"tools": mc.tools,
"node_types": mc.node_types,
"available": available,
"direct_api_key_supported": mc.direct_api_key_supported,
"aden_supported": mc.aden_supported,
"credential_key": mc.credential_key,
"valid": None, # null = not checked
"validation_message": None,
}
# Phase 2: health check for available credentials
if available and verify and check_health:
spec = CREDENTIAL_SPECS.get(mc.credential_name)
if spec and spec.health_check_endpoint:
value = store.get(cred_id)
if value:
try:
result = check_health(
mc.credential_name,
value,
health_check_endpoint=spec.health_check_endpoint,
health_check_method=spec.health_check_method,
)
entry["valid"] = result.valid
entry["validation_message"] = result.message
except Exception as exc:
entry["valid"] = False
entry["validation_message"] = f"Health check error: {exc}"
required.append(entry)
# Determine overall status
any_missing = any(not r["available"] for r in required)
any_invalid = any(r["valid"] is False for r in required)
all_valid = not any_missing and not any_invalid
response = {"required": required, "all_valid": all_valid}
if any_missing or any_invalid:
reasons = []
if any_missing:
names = [r["credential_name"] for r in required if not r["available"]]
reasons.append(f"missing: {', '.join(names)}")
if any_invalid:
names = [r["credential_name"] for r in required if r["valid"] is False]
reasons.append(f"invalid: {', '.join(names)}")
response["error"] = "; ".join(reasons)
return web.json_response(response, status=424)
return web.json_response(response)
except Exception as e:
logger.exception(f"Error checking agent credentials: {e}")
return web.json_response({"error": str(e)}, status=500)
def register_routes(app: web.Application) -> None:
"""Register credential routes on the application."""
# check-agent must be registered BEFORE the {credential_id} wildcard
app.router.add_post("/api/credentials/check-agent", handle_check_agent)
app.router.add_get("/api/credentials", handle_list_credentials)
app.router.add_post("/api/credentials", handle_save_credential)
app.router.add_get("/api/credentials/{credential_id}", handle_get_credential)
app.router.add_delete("/api/credentials/{credential_id}", handle_delete_credential)
+123
View File
@@ -0,0 +1,123 @@
"""SSE event streaming route."""
import asyncio
import logging
from aiohttp import web
from framework.runtime.event_bus import EventType
from framework.server.app import resolve_session
logger = logging.getLogger(__name__)
# Default event types streamed to clients
DEFAULT_EVENT_TYPES = [
EventType.CLIENT_OUTPUT_DELTA,
EventType.CLIENT_INPUT_REQUESTED,
EventType.LLM_TEXT_DELTA,
EventType.TOOL_CALL_STARTED,
EventType.TOOL_CALL_COMPLETED,
EventType.EXECUTION_STARTED,
EventType.EXECUTION_COMPLETED,
EventType.EXECUTION_FAILED,
EventType.EXECUTION_PAUSED,
EventType.NODE_LOOP_STARTED,
EventType.NODE_LOOP_ITERATION,
EventType.NODE_LOOP_COMPLETED,
EventType.NODE_ACTION_PLAN,
EventType.EDGE_TRAVERSED,
EventType.GOAL_PROGRESS,
EventType.QUEEN_INTERVENTION_REQUESTED,
EventType.WORKER_ESCALATION_TICKET,
EventType.NODE_INTERNAL_OUTPUT,
EventType.NODE_STALLED,
EventType.NODE_RETRY,
EventType.NODE_TOOL_DOOM_LOOP,
EventType.CONTEXT_COMPACTED,
EventType.WORKER_LOADED,
]
# Keepalive interval in seconds
KEEPALIVE_INTERVAL = 15.0
def _parse_event_types(query_param: str | None) -> list[EventType]:
"""Parse comma-separated event type names into EventType values.
Falls back to DEFAULT_EVENT_TYPES if param is empty or invalid.
"""
if not query_param:
return DEFAULT_EVENT_TYPES
result = []
for name in query_param.split(","):
name = name.strip()
try:
result.append(EventType(name))
except ValueError:
logger.warning(f"Unknown event type filter: {name}")
return result or DEFAULT_EVENT_TYPES
async def handle_events(request: web.Request) -> web.StreamResponse:
"""SSE event stream for a session.
Query params:
types: Comma-separated event type names to filter (optional).
"""
session, err = resolve_session(request)
if err:
return err
# Session always has an event_bus — no runtime guard needed
event_bus = session.event_bus
event_types = _parse_event_types(request.query.get("types"))
# Per-client buffer queue
queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
async def on_event(event) -> None:
"""Push event dict into queue; drop if full."""
try:
queue.put_nowait(event.to_dict())
except asyncio.QueueFull:
pass # Drop oldest-undelivered; client will catch up
# Subscribe to EventBus
from framework.server.sse import SSEResponse
sub_id = event_bus.subscribe(
event_types=event_types,
handler=on_event,
)
sse = SSEResponse()
await sse.prepare(request)
try:
while True:
try:
data = await asyncio.wait_for(queue.get(), timeout=KEEPALIVE_INTERVAL)
await sse.send_event(data)
except TimeoutError:
await sse.send_keepalive()
except (ConnectionResetError, ConnectionError):
break
except RuntimeError as exc:
if "closing transport" in str(exc).lower():
break
raise
except asyncio.CancelledError:
pass
finally:
event_bus.unsubscribe(sub_id)
logger.debug("SSE client disconnected from session '%s'", session.id)
return sse.response
def register_routes(app: web.Application) -> None:
"""Register SSE event streaming routes."""
# Session-primary route
app.router.add_get("/api/sessions/{session_id}/events", handle_events)
+305
View File
@@ -0,0 +1,305 @@
"""Execution control routes — trigger, inject, chat, resume, stop, replay."""
import json
import logging
from aiohttp import web
from framework.server.app import resolve_session, safe_path_segment, sessions_dir
logger = logging.getLogger(__name__)
async def handle_trigger(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/trigger — start an execution.
Body: {"entry_point_id": "default", "input_data": {...}, "session_state": {...}?}
"""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
body = await request.json()
entry_point_id = body.get("entry_point_id", "default")
input_data = body.get("input_data", {})
session_state = body.get("session_state") or {}
# Scope the worker execution to the live session ID
if "resume_session_id" not in session_state:
session_state["resume_session_id"] = session.id
execution_id = await session.worker_runtime.trigger(
entry_point_id,
input_data,
session_state=session_state,
)
return web.json_response({"execution_id": execution_id})
async def handle_inject(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/inject — inject input into a waiting node.
Body: {"node_id": "...", "content": "...", "graph_id": "..."}
"""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
body = await request.json()
node_id = body.get("node_id")
content = body.get("content", "")
graph_id = body.get("graph_id")
if not node_id:
return web.json_response({"error": "node_id is required"}, status=400)
delivered = await session.worker_runtime.inject_input(node_id, content, graph_id=graph_id)
return web.json_response({"delivered": delivered})
async def handle_chat(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/chat — convenience endpoint.
Routing priority:
1. Worker awaiting input inject into worker node
2. Queen active inject into queen conversation
3. Error no handler available
Body: {"message": "hello"}
"""
session, err = resolve_session(request)
if err:
return err
body = await request.json()
message = body.get("message", "")
if not message:
return web.json_response({"error": "message is required"}, status=400)
# 1. Check if worker is awaiting input → inject to worker
if session.worker_runtime:
node_id, graph_id = session.worker_runtime.find_awaiting_node()
if node_id:
delivered = await session.worker_runtime.inject_input(
node_id,
message,
graph_id=graph_id,
is_client_input=True,
)
return web.json_response(
{
"status": "injected",
"node_id": node_id,
"delivered": delivered,
}
)
# 2. Queen active → inject into queen conversation
queen_executor = session.queen_executor
if queen_executor is not None:
node = queen_executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(message, is_client_input=True)
return web.json_response(
{
"status": "queen",
"delivered": True,
}
)
# 3. No queen or worker available
return web.json_response({"error": "No worker or queen available"}, status=503)
async def handle_goal_progress(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/goal-progress — evaluate goal progress."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
progress = await session.worker_runtime.get_goal_progress()
return web.json_response(progress, dumps=lambda obj: json.dumps(obj, default=str))
async def handle_resume(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/resume — resume a paused execution.
Body: {"session_id": "...", "checkpoint_id": "..." (optional)}
"""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
body = await request.json()
worker_session_id = body.get("session_id")
checkpoint_id = body.get("checkpoint_id")
if not worker_session_id:
return web.json_response({"error": "session_id is required"}, status=400)
worker_session_id = safe_path_segment(worker_session_id)
if checkpoint_id:
checkpoint_id = safe_path_segment(checkpoint_id)
# Read session state
session_dir = sessions_dir(session) / worker_session_id
state_path = session_dir / "state.json"
if not state_path.exists():
return web.json_response({"error": "Session not found"}, status=404)
try:
state = json.loads(state_path.read_text())
except (json.JSONDecodeError, OSError) as e:
return web.json_response({"error": f"Failed to read session: {e}"}, status=500)
if checkpoint_id:
resume_session_state = {
"resume_session_id": worker_session_id,
"resume_from_checkpoint": checkpoint_id,
}
else:
progress = state.get("progress", {})
paused_at = progress.get("paused_at") or progress.get("resume_from")
resume_session_state = {
"resume_session_id": worker_session_id,
"memory": state.get("memory", {}),
"execution_path": progress.get("path", []),
"node_visit_counts": progress.get("node_visit_counts", {}),
}
if paused_at:
resume_session_state["paused_at"] = paused_at
entry_points = session.worker_runtime.get_entry_points()
if not entry_points:
return web.json_response({"error": "No entry points available"}, status=400)
input_data = state.get("input_data", {})
execution_id = await session.worker_runtime.trigger(
entry_points[0].id,
input_data=input_data,
session_state=resume_session_state,
)
return web.json_response(
{
"execution_id": execution_id,
"resumed_from": worker_session_id,
"checkpoint_id": checkpoint_id,
}
)
async def handle_stop(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/stop — cancel a running execution.
Body: {"execution_id": "..."}
"""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
body = await request.json()
execution_id = body.get("execution_id")
if not execution_id:
return web.json_response({"error": "execution_id is required"}, status=400)
for graph_id in session.worker_runtime.list_graphs():
reg = session.worker_runtime.get_graph_registration(graph_id)
if reg is None:
continue
for _ep_id, stream in reg.streams.items():
cancelled = await stream.cancel_execution(execution_id)
if cancelled:
return web.json_response(
{
"stopped": True,
"execution_id": execution_id,
}
)
return web.json_response({"stopped": False, "error": "Execution not found"}, status=404)
async def handle_replay(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/replay — re-run from a checkpoint.
Body: {"session_id": "...", "checkpoint_id": "..."}
"""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
body = await request.json()
worker_session_id = body.get("session_id")
checkpoint_id = body.get("checkpoint_id")
if not worker_session_id:
return web.json_response({"error": "session_id is required"}, status=400)
if not checkpoint_id:
return web.json_response({"error": "checkpoint_id is required"}, status=400)
worker_session_id = safe_path_segment(worker_session_id)
checkpoint_id = safe_path_segment(checkpoint_id)
cp_path = sessions_dir(session) / worker_session_id / "checkpoints" / f"{checkpoint_id}.json"
if not cp_path.exists():
return web.json_response({"error": "Checkpoint not found"}, status=404)
entry_points = session.worker_runtime.get_entry_points()
if not entry_points:
return web.json_response({"error": "No entry points available"}, status=400)
replay_session_state = {
"resume_session_id": worker_session_id,
"resume_from_checkpoint": checkpoint_id,
}
execution_id = await session.worker_runtime.trigger(
entry_points[0].id,
input_data={},
session_state=replay_session_state,
)
return web.json_response(
{
"execution_id": execution_id,
"replayed_from": worker_session_id,
"checkpoint_id": checkpoint_id,
}
)
def register_routes(app: web.Application) -> None:
"""Register execution control routes."""
# Session-primary routes
app.router.add_post("/api/sessions/{session_id}/trigger", handle_trigger)
app.router.add_post("/api/sessions/{session_id}/inject", handle_inject)
app.router.add_post("/api/sessions/{session_id}/chat", handle_chat)
app.router.add_post("/api/sessions/{session_id}/pause", handle_stop)
app.router.add_post("/api/sessions/{session_id}/resume", handle_resume)
app.router.add_post("/api/sessions/{session_id}/stop", handle_stop)
app.router.add_post("/api/sessions/{session_id}/replay", handle_replay)
app.router.add_get("/api/sessions/{session_id}/goal-progress", handle_goal_progress)
+224
View File
@@ -0,0 +1,224 @@
"""Graph and node inspection routes — node list, node detail, node criteria."""
import json
import logging
from aiohttp import web
from framework.server.app import resolve_session, safe_path_segment
logger = logging.getLogger(__name__)
def _get_graph_spec(session, graph_id: str):
"""Get GraphSpec for a graph_id. Returns (graph_spec, None) or (None, error_response)."""
if not session.worker_runtime:
return None, web.json_response({"error": "No worker loaded in this session"}, status=503)
reg = session.worker_runtime.get_graph_registration(graph_id)
if reg is None:
return None, web.json_response({"error": f"Graph '{graph_id}' not found"}, status=404)
return reg.graph, None
def _node_to_dict(node) -> dict:
"""Serialize a NodeSpec to a JSON-friendly dict."""
return {
"id": node.id,
"name": node.name,
"description": node.description,
"node_type": node.node_type,
"input_keys": node.input_keys,
"output_keys": node.output_keys,
"nullable_output_keys": node.nullable_output_keys,
"tools": node.tools,
"routes": node.routes,
"max_retries": node.max_retries,
"max_node_visits": node.max_node_visits,
"client_facing": node.client_facing,
"success_criteria": node.success_criteria,
"system_prompt": node.system_prompt or "",
}
async def handle_list_nodes(request: web.Request) -> web.Response:
"""List nodes in a graph."""
session, err = resolve_session(request)
if err:
return err
graph_id = request.match_info["graph_id"]
graph, err = _get_graph_spec(session, graph_id)
if err:
return err
nodes = [_node_to_dict(n) for n in graph.nodes]
# Optionally enrich with session progress
worker_session_id = request.query.get("session_id")
if worker_session_id and session.worker_path:
worker_session_id = safe_path_segment(worker_session_id)
from pathlib import Path
state_path = (
Path.home()
/ ".hive"
/ "agents"
/ session.worker_path.name
/ "sessions"
/ worker_session_id
/ "state.json"
)
if state_path.exists():
try:
state = json.loads(state_path.read_text())
progress = state.get("progress", {})
visit_counts = progress.get("node_visit_counts", {})
failures = progress.get("nodes_with_failures", [])
current = progress.get("current_node")
path = progress.get("path", [])
for node in nodes:
nid = node["id"]
node["visit_count"] = visit_counts.get(nid, 0)
node["has_failures"] = nid in failures
node["is_current"] = nid == current
node["in_path"] = nid in path
except (json.JSONDecodeError, OSError):
pass
edges = [
{"source": e.source, "target": e.target, "condition": e.condition, "priority": e.priority}
for e in graph.edges
]
return web.json_response(
{
"nodes": nodes,
"edges": edges,
"entry_node": graph.entry_node,
}
)
async def handle_get_node(request: web.Request) -> web.Response:
"""Get node detail."""
session, err = resolve_session(request)
if err:
return err
graph_id = request.match_info["graph_id"]
node_id = request.match_info["node_id"]
graph, err = _get_graph_spec(session, graph_id)
if err:
return err
node_spec = graph.get_node(node_id)
if node_spec is None:
return web.json_response({"error": f"Node '{node_id}' not found"}, status=404)
data = _node_to_dict(node_spec)
edges = [
{"target": e.target, "condition": e.condition, "priority": e.priority}
for e in graph.edges
if e.source == node_id
]
data["edges"] = edges
return web.json_response(data)
async def handle_node_criteria(request: web.Request) -> web.Response:
"""Get node success criteria and last execution info."""
session, err = resolve_session(request)
if err:
return err
graph_id = request.match_info["graph_id"]
node_id = request.match_info["node_id"]
graph, err = _get_graph_spec(session, graph_id)
if err:
return err
node_spec = graph.get_node(node_id)
if node_spec is None:
return web.json_response({"error": f"Node '{node_id}' not found"}, status=404)
result: dict = {
"node_id": node_id,
"success_criteria": node_spec.success_criteria,
"output_keys": node_spec.output_keys,
}
worker_session_id = request.query.get("session_id")
if worker_session_id and session.worker_runtime:
log_store = getattr(session.worker_runtime, "_runtime_log_store", None)
if log_store:
details = await log_store.load_details(worker_session_id)
if details:
node_details = [n for n in details.nodes if n.node_id == node_id]
if node_details:
latest = node_details[-1]
result["last_execution"] = {
"success": latest.success,
"error": latest.error,
"retry_count": latest.retry_count,
"needs_attention": latest.needs_attention,
"attention_reasons": latest.attention_reasons,
}
return web.json_response(result, dumps=lambda obj: json.dumps(obj, default=str))
async def handle_node_tools(request: web.Request) -> web.Response:
"""Get tools available to a node."""
session, err = resolve_session(request)
if err:
return err
graph_id = request.match_info["graph_id"]
node_id = request.match_info["node_id"]
graph, err = _get_graph_spec(session, graph_id)
if err:
return err
node_spec = graph.get_node(node_id)
if node_spec is None:
return web.json_response({"error": f"Node '{node_id}' not found"}, status=404)
tools_out = []
registry = getattr(session.runner, "_tool_registry", None) if session.runner else None
all_tools = registry.get_tools() if registry else {}
for name in node_spec.tools:
tool = all_tools.get(name)
if tool:
tools_out.append(
{
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters,
}
)
else:
tools_out.append({"name": name, "description": "", "parameters": {}})
return web.json_response({"tools": tools_out})
def register_routes(app: web.Application) -> None:
"""Register graph/node inspection routes."""
# Session-primary routes
app.router.add_get("/api/sessions/{session_id}/graphs/{graph_id}/nodes", handle_list_nodes)
app.router.add_get(
"/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}", handle_get_node
)
app.router.add_get(
"/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}/criteria",
handle_node_criteria,
)
app.router.add_get(
"/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}/tools",
handle_node_tools,
)
+114
View File
@@ -0,0 +1,114 @@
"""Log and observability routes — agent logs, node-scoped logs."""
import json
import logging
from aiohttp import web
from framework.server.app import resolve_session
logger = logging.getLogger(__name__)
async def handle_logs(request: web.Request) -> web.Response:
"""Session-level logs.
Query params:
session_id: Scope to a specific worker session (optional).
level: "summary" | "details" | "tools" (default: "summary").
limit: Max results when listing summaries (default: 20).
"""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
log_store = getattr(session.worker_runtime, "_runtime_log_store", None)
if log_store is None:
return web.json_response({"error": "Logging not enabled for this agent"}, status=404)
worker_session_id = request.query.get("session_id")
level = request.query.get("level", "summary")
try:
limit = min(int(request.query.get("limit", "20")), 1000)
except (ValueError, TypeError):
limit = 20
if not worker_session_id:
summaries = await log_store.list_runs(limit=limit)
return web.json_response(
{"logs": [s.model_dump() for s in summaries]},
dumps=lambda obj: json.dumps(obj, default=str),
)
if level == "details":
details = await log_store.load_details(worker_session_id)
if details is None:
return web.json_response({"error": "No detail logs found"}, status=404)
return web.json_response(
{"session_id": worker_session_id, "nodes": [n.model_dump() for n in details.nodes]},
dumps=lambda obj: json.dumps(obj, default=str),
)
elif level == "tools":
tool_logs = await log_store.load_tool_logs(worker_session_id)
if tool_logs is None:
return web.json_response({"error": "No tool logs found"}, status=404)
return web.json_response(
{"session_id": worker_session_id, "steps": [s.model_dump() for s in tool_logs.steps]},
dumps=lambda obj: json.dumps(obj, default=str),
)
else:
summary = await log_store.load_summary(worker_session_id)
if summary is None:
return web.json_response({"error": "No summary log found"}, status=404)
return web.json_response(
summary.model_dump(),
dumps=lambda obj: json.dumps(obj, default=str),
)
async def handle_node_logs(request: web.Request) -> web.Response:
"""Node-scoped logs."""
session, err = resolve_session(request)
if err:
return err
node_id = request.match_info["node_id"]
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
log_store = getattr(session.worker_runtime, "_runtime_log_store", None)
if log_store is None:
return web.json_response({"error": "Logging not enabled"}, status=404)
worker_session_id = request.query.get("session_id")
if not worker_session_id:
return web.json_response({"error": "session_id query param is required"}, status=400)
level = request.query.get("level", "all")
result: dict = {"session_id": worker_session_id, "node_id": node_id}
if level in ("details", "all"):
details = await log_store.load_details(worker_session_id)
if details:
result["details"] = [n.model_dump() for n in details.nodes if n.node_id == node_id]
if level in ("tools", "all"):
tool_logs = await log_store.load_tool_logs(worker_session_id)
if tool_logs:
result["tool_logs"] = [s.model_dump() for s in tool_logs.steps if s.node_id == node_id]
return web.json_response(result, dumps=lambda obj: json.dumps(obj, default=str))
def register_routes(app: web.Application) -> None:
"""Register log routes."""
# Session-primary routes
app.router.add_get("/api/sessions/{session_id}/logs", handle_logs)
app.router.add_get(
"/api/sessions/{session_id}/graphs/{graph_id}/nodes/{node_id}/logs",
handle_node_logs,
)
+613
View File
@@ -0,0 +1,613 @@
"""Session lifecycle, info, and worker-session browsing routes.
Session-primary routes:
- POST /api/sessions create session (with or without worker)
- GET /api/sessions list all active sessions
- GET /api/sessions/{session_id} session detail
- DELETE /api/sessions/{session_id} stop session entirely
- POST /api/sessions/{session_id}/worker load a worker into session
- DELETE /api/sessions/{session_id}/worker unload worker from session
- GET /api/sessions/{session_id}/stats runtime statistics
- GET /api/sessions/{session_id}/entry-points list entry points
- GET /api/sessions/{session_id}/graphs list graph IDs
Worker session browsing (persisted execution runs on disk):
- GET /api/sessions/{session_id}/worker-sessions list
- GET /api/sessions/{session_id}/worker-sessions/{ws_id} detail
- DELETE /api/sessions/{session_id}/worker-sessions/{ws_id} delete
- GET /api/sessions/{session_id}/worker-sessions/{ws_id}/checkpoints list CPs
- POST /api/sessions/{session_id}/worker-sessions/{ws_id}/checkpoints/{cp}/restore
- GET /api/sessions/{session_id}/worker-sessions/{ws_id}/messages messages
"""
import json
import logging
import shutil
import time
from pathlib import Path
from aiohttp import web
from framework.server.app import resolve_session, safe_path_segment, sessions_dir
from framework.server.session_manager import SessionManager
logger = logging.getLogger(__name__)
def _get_manager(request: web.Request) -> SessionManager:
return request.app["manager"]
def _session_to_live_dict(session) -> dict:
"""Serialize a live Session to the session-primary JSON shape."""
info = session.worker_info
return {
"session_id": session.id,
"worker_id": session.worker_id,
"worker_name": info.name if info else session.worker_id,
"has_worker": session.worker_runtime is not None,
"agent_path": str(session.worker_path) if session.worker_path else "",
"description": info.description if info else "",
"goal": info.goal_name if info else "",
"node_count": info.node_count if info else 0,
"loaded_at": session.loaded_at,
"uptime_seconds": round(time.time() - session.loaded_at, 1),
"intro_message": getattr(session.runner, "intro_message", "") or "",
}
# ------------------------------------------------------------------
# Session lifecycle
# ------------------------------------------------------------------
async def handle_create_session(request: web.Request) -> web.Response:
"""POST /api/sessions — create a session.
Body: {
"agent_path": "..." (optional if provided, creates session with worker),
"agent_id": "..." (optional worker ID override),
"session_id": "..." (optional custom session ID),
"model": "..." (optional),
}
When agent_path is provided, creates a session with a worker in one step
(equivalent to the old POST /api/agents). Otherwise creates a queen-only
session that can later have a worker loaded via POST /sessions/{id}/worker.
"""
manager = _get_manager(request)
body = await request.json() if request.can_read_body else {}
agent_path = body.get("agent_path")
agent_id = body.get("agent_id")
session_id = body.get("session_id")
model = body.get("model")
try:
if agent_path:
# One-step: create session + load worker
session = await manager.create_session_with_worker(
agent_path,
agent_id=agent_id,
model=model,
)
else:
# Queen-only session
session = await manager.create_session(
session_id=session_id,
model=model,
)
except ValueError as e:
msg = str(e)
if "currently loading" in msg:
resolved_id = agent_id or (Path(agent_path).name if agent_path else "")
return web.json_response(
{"error": msg, "worker_id": resolved_id, "loading": True},
status=409,
)
return web.json_response({"error": msg}, status=409)
except FileNotFoundError as e:
return web.json_response({"error": str(e)}, status=404)
except Exception as e:
logger.exception("Error creating session: %s", e)
return web.json_response({"error": str(e)}, status=500)
return web.json_response(_session_to_live_dict(session), status=201)
async def handle_list_live_sessions(request: web.Request) -> web.Response:
"""GET /api/sessions — list all active sessions."""
manager = _get_manager(request)
sessions = [_session_to_live_dict(s) for s in manager.list_sessions()]
return web.json_response({"sessions": sessions})
async def handle_get_live_session(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id} — get session detail."""
manager = _get_manager(request)
session_id = request.match_info["session_id"]
session = manager.get_session(session_id)
if session is None:
if manager.is_loading(session_id):
return web.json_response(
{"session_id": session_id, "loading": True},
status=202,
)
return web.json_response(
{"error": f"Session '{session_id}' not found"},
status=404,
)
data = _session_to_live_dict(session)
if session.worker_runtime:
data["entry_points"] = [
{
"id": ep.id,
"name": ep.name,
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
}
for ep in session.worker_runtime.get_entry_points()
]
data["graphs"] = session.worker_runtime.list_graphs()
return web.json_response(data)
async def handle_stop_session(request: web.Request) -> web.Response:
"""DELETE /api/sessions/{session_id} — stop a session entirely."""
manager = _get_manager(request)
session_id = request.match_info["session_id"]
stopped = await manager.stop_session(session_id)
if not stopped:
return web.json_response(
{"error": f"Session '{session_id}' not found"},
status=404,
)
return web.json_response({"session_id": session_id, "stopped": True})
# ------------------------------------------------------------------
# Worker lifecycle
# ------------------------------------------------------------------
async def handle_load_worker(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/worker — load a worker into a session.
Body: {"agent_path": "...", "worker_id": "..." (optional), "model": "..." (optional)}
"""
manager = _get_manager(request)
session_id = request.match_info["session_id"]
body = await request.json()
agent_path = body.get("agent_path")
if not agent_path:
return web.json_response({"error": "agent_path is required"}, status=400)
worker_id = body.get("worker_id")
model = body.get("model")
try:
session = await manager.load_worker(
session_id,
agent_path,
worker_id=worker_id,
model=model,
)
except ValueError as e:
return web.json_response({"error": str(e)}, status=409)
except FileNotFoundError as e:
return web.json_response({"error": str(e)}, status=404)
except Exception as e:
logger.exception("Error loading worker: %s", e)
return web.json_response({"error": str(e)}, status=500)
return web.json_response(_session_to_live_dict(session))
async def handle_unload_worker(request: web.Request) -> web.Response:
"""DELETE /api/sessions/{session_id}/worker — unload worker, keep queen alive."""
manager = _get_manager(request)
session_id = request.match_info["session_id"]
removed = await manager.unload_worker(session_id)
if not removed:
session = manager.get_session(session_id)
if session is None:
return web.json_response(
{"error": f"Session '{session_id}' not found"},
status=404,
)
return web.json_response(
{"error": "No worker loaded in this session"},
status=409,
)
return web.json_response({"session_id": session_id, "worker_unloaded": True})
# ------------------------------------------------------------------
# Session info (worker details)
# ------------------------------------------------------------------
async def handle_session_stats(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/stats — runtime statistics."""
manager = _get_manager(request)
session_id = request.match_info["session_id"]
session = manager.get_session(session_id)
if session is None:
return web.json_response(
{"error": f"Session '{session_id}' not found"},
status=404,
)
stats = session.worker_runtime.get_stats() if session.worker_runtime else {}
return web.json_response(stats)
async def handle_session_entry_points(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/entry-points — list entry points."""
manager = _get_manager(request)
session_id = request.match_info["session_id"]
session = manager.get_session(session_id)
if session is None:
return web.json_response(
{"error": f"Session '{session_id}' not found"},
status=404,
)
eps = session.worker_runtime.get_entry_points() if session.worker_runtime else []
return web.json_response(
{
"entry_points": [
{
"id": ep.id,
"name": ep.name,
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
}
for ep in eps
]
}
)
async def handle_session_graphs(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/graphs — list loaded graphs."""
manager = _get_manager(request)
session_id = request.match_info["session_id"]
session = manager.get_session(session_id)
if session is None:
return web.json_response(
{"error": f"Session '{session_id}' not found"},
status=404,
)
graphs = session.worker_runtime.list_graphs() if session.worker_runtime else []
return web.json_response({"graphs": graphs})
# ------------------------------------------------------------------
# Worker session browsing (persisted execution runs on disk)
# ------------------------------------------------------------------
async def handle_list_worker_sessions(request: web.Request) -> web.Response:
"""List worker sessions on disk."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_path:
return web.json_response({"sessions": []})
sess_dir = sessions_dir(session)
if not sess_dir.exists():
return web.json_response({"sessions": []})
sessions = []
for d in sorted(sess_dir.iterdir(), reverse=True):
if not d.is_dir() or not d.name.startswith("session_"):
continue
entry: dict = {"session_id": d.name}
state_path = d / "state.json"
if state_path.exists():
try:
state = json.loads(state_path.read_text())
entry["status"] = state.get("status", "unknown")
entry["started_at"] = state.get("started_at")
entry["completed_at"] = state.get("completed_at")
progress = state.get("progress", {})
entry["steps"] = progress.get("steps_executed", 0)
entry["paused_at"] = progress.get("paused_at")
except (json.JSONDecodeError, OSError):
entry["status"] = "error"
cp_dir = d / "checkpoints"
if cp_dir.exists():
entry["checkpoint_count"] = sum(1 for f in cp_dir.iterdir() if f.suffix == ".json")
else:
entry["checkpoint_count"] = 0
sessions.append(entry)
return web.json_response({"sessions": sessions})
async def handle_get_worker_session(request: web.Request) -> web.Response:
"""Get worker session detail from disk."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_path:
return web.json_response({"error": "No worker loaded"}, status=503)
# Support both URL param names: ws_id (new) or session_id (legacy)
ws_id = request.match_info.get("ws_id") or request.match_info.get("session_id", "")
ws_id = safe_path_segment(ws_id)
state_path = sessions_dir(session) / ws_id / "state.json"
if not state_path.exists():
return web.json_response({"error": "Session not found"}, status=404)
try:
state = json.loads(state_path.read_text())
except (json.JSONDecodeError, OSError) as e:
return web.json_response({"error": f"Failed to read session: {e}"}, status=500)
return web.json_response(state)
async def handle_list_checkpoints(request: web.Request) -> web.Response:
"""List checkpoints for a worker session."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_path:
return web.json_response({"error": "No worker loaded"}, status=503)
ws_id = request.match_info.get("ws_id") or request.match_info.get("session_id", "")
ws_id = safe_path_segment(ws_id)
cp_dir = sessions_dir(session) / ws_id / "checkpoints"
if not cp_dir.exists():
return web.json_response({"checkpoints": []})
checkpoints = []
for f in sorted(cp_dir.iterdir(), reverse=True):
if f.suffix != ".json":
continue
try:
data = json.loads(f.read_text())
checkpoints.append(
{
"checkpoint_id": f.stem,
"current_node": data.get("current_node"),
"next_node": data.get("next_node"),
"is_clean": data.get("is_clean", False),
"timestamp": data.get("timestamp"),
}
)
except (json.JSONDecodeError, OSError):
checkpoints.append({"checkpoint_id": f.stem, "error": "unreadable"})
return web.json_response({"checkpoints": checkpoints})
async def handle_delete_worker_session(request: web.Request) -> web.Response:
"""Delete a worker session from disk."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_path:
return web.json_response({"error": "No worker loaded"}, status=503)
ws_id = request.match_info.get("ws_id") or request.match_info.get("session_id", "")
ws_id = safe_path_segment(ws_id)
session_path = sessions_dir(session) / ws_id
if not session_path.exists():
return web.json_response({"error": "Session not found"}, status=404)
shutil.rmtree(session_path)
return web.json_response({"deleted": ws_id})
async def handle_restore_checkpoint(request: web.Request) -> web.Response:
"""Restore from a checkpoint."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded in this session"}, status=503)
ws_id = request.match_info.get("ws_id") or request.match_info.get("session_id", "")
ws_id = safe_path_segment(ws_id)
checkpoint_id = safe_path_segment(request.match_info["checkpoint_id"])
cp_path = sessions_dir(session) / ws_id / "checkpoints" / f"{checkpoint_id}.json"
if not cp_path.exists():
return web.json_response({"error": "Checkpoint not found"}, status=404)
entry_points = session.worker_runtime.get_entry_points()
if not entry_points:
return web.json_response({"error": "No entry points available"}, status=400)
restore_session_state = {
"resume_session_id": ws_id,
"resume_from_checkpoint": checkpoint_id,
}
execution_id = await session.worker_runtime.trigger(
entry_points[0].id,
input_data={},
session_state=restore_session_state,
)
return web.json_response(
{
"execution_id": execution_id,
"restored_from": ws_id,
"checkpoint_id": checkpoint_id,
}
)
async def handle_messages(request: web.Request) -> web.Response:
"""Get messages for a worker session."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_path:
return web.json_response({"error": "No worker loaded"}, status=503)
ws_id = request.match_info.get("ws_id") or request.match_info.get("session_id", "")
ws_id = safe_path_segment(ws_id)
convs_dir = sessions_dir(session) / ws_id / "conversations"
if not convs_dir.exists():
return web.json_response({"messages": []})
filter_node = request.query.get("node_id")
all_messages = []
for node_dir in convs_dir.iterdir():
if not node_dir.is_dir():
continue
if filter_node and node_dir.name != filter_node:
continue
parts_dir = node_dir / "parts"
if not parts_dir.exists():
continue
for part_file in sorted(parts_dir.iterdir()):
if part_file.suffix != ".json":
continue
try:
part = json.loads(part_file.read_text())
part["_node_id"] = node_dir.name
all_messages.append(part)
except (json.JSONDecodeError, OSError):
continue
all_messages.sort(key=lambda m: m.get("seq", 0))
client_only = request.query.get("client_only", "").lower() in ("true", "1")
if client_only:
client_facing_nodes: set[str] = set()
if session.runner and hasattr(session.runner, "graph"):
for node in session.runner.graph.nodes:
if node.client_facing:
client_facing_nodes.add(node.id)
if client_facing_nodes:
all_messages = [
m
for m in all_messages
if not m.get("is_transition_marker")
and m["role"] != "tool"
and not (m["role"] == "assistant" and m.get("tool_calls"))
and (
(m["role"] == "user" and m.get("is_client_input"))
or (m["role"] == "assistant" and m.get("_node_id") in client_facing_nodes)
)
]
return web.json_response({"messages": all_messages})
# ------------------------------------------------------------------
# Agent discovery (not session-specific)
# ------------------------------------------------------------------
async def handle_discover(request: web.Request) -> web.Response:
"""GET /api/discover — discover agents from filesystem."""
from framework.tui.screens.agent_picker import discover_agents
manager = _get_manager(request)
loaded_paths = {str(s.worker_path) for s in manager.list_sessions() if s.worker_path}
groups = discover_agents()
result = {}
for category, entries in groups.items():
result[category] = [
{
"path": str(entry.path),
"name": entry.name,
"description": entry.description,
"category": entry.category,
"session_count": entry.session_count,
"node_count": entry.node_count,
"tool_count": entry.tool_count,
"tags": entry.tags,
"last_active": entry.last_active,
"is_loaded": str(entry.path) in loaded_paths,
}
for entry in entries
]
return web.json_response(result)
# ------------------------------------------------------------------
# Route registration
# ------------------------------------------------------------------
def register_routes(app: web.Application) -> None:
"""Register session routes."""
# Discovery
app.router.add_get("/api/discover", handle_discover)
# Session lifecycle
app.router.add_post("/api/sessions", handle_create_session)
app.router.add_get("/api/sessions", handle_list_live_sessions)
app.router.add_get("/api/sessions/{session_id}", handle_get_live_session)
app.router.add_delete("/api/sessions/{session_id}", handle_stop_session)
# Worker lifecycle
app.router.add_post("/api/sessions/{session_id}/worker", handle_load_worker)
app.router.add_delete("/api/sessions/{session_id}/worker", handle_unload_worker)
# Session info
app.router.add_get("/api/sessions/{session_id}/stats", handle_session_stats)
app.router.add_get("/api/sessions/{session_id}/entry-points", handle_session_entry_points)
app.router.add_get("/api/sessions/{session_id}/graphs", handle_session_graphs)
# Worker session browsing (session-primary)
app.router.add_get("/api/sessions/{session_id}/worker-sessions", handle_list_worker_sessions)
app.router.add_get(
"/api/sessions/{session_id}/worker-sessions/{ws_id}", handle_get_worker_session
)
app.router.add_delete(
"/api/sessions/{session_id}/worker-sessions/{ws_id}", handle_delete_worker_session
)
app.router.add_get(
"/api/sessions/{session_id}/worker-sessions/{ws_id}/checkpoints",
handle_list_checkpoints,
)
app.router.add_post(
"/api/sessions/{session_id}/worker-sessions/{ws_id}/checkpoints/{checkpoint_id}/restore",
handle_restore_checkpoint,
)
app.router.add_get(
"/api/sessions/{session_id}/worker-sessions/{ws_id}/messages",
handle_messages,
)
+666
View File
@@ -0,0 +1,666 @@
"""Session-primary lifecycle manager for the HTTP API server.
Sessions (queen) are the primary entity. Workers are optional and can be
loaded/unloaded while the queen stays alive.
Architecture:
- Session owns EventBus + LLM, shared with queen and worker
- Queen is always present once a session starts
- Worker is optional loaded into an existing session
- Judge is active only when a worker is loaded
"""
import asyncio
import json
import logging
import time
import uuid
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class Session:
"""A live session with a queen and optional worker."""
id: str
event_bus: Any # EventBus — owned by session
llm: Any # LLMProvider — owned by session
loaded_at: float
# Queen (always present once started)
queen_executor: Any = None # GraphExecutor for queen input injection
queen_task: asyncio.Task | None = None
# Worker (optional)
worker_id: str | None = None
worker_path: Path | None = None
runner: Any | None = None # AgentRunner
worker_runtime: Any | None = None # AgentRuntime
worker_info: Any | None = None # AgentInfo
# Judge (active when worker is loaded)
judge_task: asyncio.Task | None = None
escalation_sub: str | None = None
class SessionManager:
"""Manages session lifecycles.
Thread-safe via asyncio.Lock. Workers are loaded via run_in_executor
(blocking I/O) then started on the event loop.
"""
def __init__(self, model: str | None = None) -> None:
self._sessions: dict[str, Session] = {}
self._loading: set[str] = set()
self._model = model
self._lock = asyncio.Lock()
# ------------------------------------------------------------------
# Session lifecycle
# ------------------------------------------------------------------
async def _create_session_core(
self,
session_id: str | None = None,
model: str | None = None,
) -> Session:
"""Create session infrastructure (EventBus, LLM) without starting queen.
Internal helper use create_session() or create_session_with_worker().
"""
from framework.config import RuntimeConfig
from framework.llm.litellm import LiteLLMProvider
from framework.runtime.event_bus import EventBus
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
resolved_id = session_id or f"session_{ts}_{uuid.uuid4().hex[:8]}"
async with self._lock:
if resolved_id in self._sessions:
raise ValueError(f"Session '{resolved_id}' already exists")
# Load LLM config from ~/.hive/configuration.json
rc = RuntimeConfig(model=model or self._model or RuntimeConfig().model)
# Session owns these — shared with queen and worker
llm = LiteLLMProvider(
model=rc.model,
api_key=rc.api_key,
api_base=rc.api_base,
**rc.extra_kwargs,
)
event_bus = EventBus()
session = Session(
id=resolved_id,
event_bus=event_bus,
llm=llm,
loaded_at=time.time(),
)
async with self._lock:
self._sessions[resolved_id] = session
return session
async def create_session(
self,
session_id: str | None = None,
model: str | None = None,
) -> Session:
"""Create a new session with a queen but no worker.
The queen starts immediately with MCP coding tools.
A worker can be loaded later via load_worker().
"""
session = await self._create_session_core(session_id=session_id, model=model)
# Start queen immediately (queen-only, no worker tools yet)
await self._start_queen(session, worker_identity=None)
logger.info("Session '%s' created (queen-only)", session.id)
return session
async def create_session_with_worker(
self,
agent_path: str | Path,
agent_id: str | None = None,
model: str | None = None,
) -> Session:
"""Create a session and load a worker in one step.
Backward-compatible with the old POST /api/agents flow.
Loads the worker FIRST so the queen starts with full lifecycle
and monitoring tools available.
The session gets an auto-generated unique ID. The agent name
becomes the worker_id (used by the frontend as backendAgentId).
"""
from framework.tools.queen_lifecycle_tools import build_worker_profile
agent_path = Path(agent_path)
resolved_worker_id = agent_id or agent_path.name
# Auto-generate session ID (not the agent name)
session = await self._create_session_core(model=model)
try:
# Load worker FIRST (before queen) so queen gets full tools
await self._load_worker_core(
session,
agent_path,
worker_id=resolved_worker_id,
model=model,
)
# Start queen with worker profile + lifecycle + monitoring tools
worker_identity = (
build_worker_profile(session.worker_runtime) if session.worker_runtime else None
)
await self._start_queen(session, worker_identity=worker_identity)
# Start health judge
if agent_path.name != "hive_coder" and session.worker_runtime:
await self._start_judge(session, session.runner._storage_path)
except Exception:
# If anything fails, tear down the session
await self.stop_session(session.id)
raise
return session
# ------------------------------------------------------------------
# Worker lifecycle
# ------------------------------------------------------------------
async def _load_worker_core(
self,
session: Session,
agent_path: str | Path,
worker_id: str | None = None,
model: str | None = None,
) -> None:
"""Load a worker agent into a session (core logic).
Sets up the runner, runtime, and session fields. Does NOT start the
judge or notify the queen callers handle those steps.
"""
from framework.runner import AgentRunner
agent_path = Path(agent_path)
resolved_worker_id = worker_id or agent_path.name
if session.worker_runtime is not None:
raise ValueError(f"Session '{session.id}' already has worker '{session.worker_id}'")
async with self._lock:
if session.id in self._loading:
raise ValueError(f"Session '{session.id}' is currently loading a worker")
self._loading.add(session.id)
try:
# Blocking I/O — load in executor
loop = asyncio.get_running_loop()
resolved_model = model or self._model
runner = await loop.run_in_executor(
None,
lambda: AgentRunner.load(
agent_path,
model=resolved_model,
interactive=False,
),
)
# Setup with session's event bus
if runner._agent_runtime is None:
await loop.run_in_executor(
None,
lambda: runner._setup(event_bus=session.event_bus),
)
runtime = runner._agent_runtime
# Start runtime on event loop
if runtime and not runtime.is_running:
await runtime.start()
info = runner.info()
# Update session
session.worker_id = resolved_worker_id
session.worker_path = agent_path
session.runner = runner
session.worker_runtime = runtime
session.worker_info = info
async with self._lock:
self._loading.discard(session.id)
logger.info(
"Worker '%s' loaded into session '%s'",
resolved_worker_id,
session.id,
)
except Exception:
async with self._lock:
self._loading.discard(session.id)
raise
async def load_worker(
self,
session_id: str,
agent_path: str | Path,
worker_id: str | None = None,
model: str | None = None,
) -> Session:
"""Load a worker agent into an existing session (with running queen).
Starts the worker runtime, health judge, and notifies the queen.
"""
agent_path = Path(agent_path)
session = self._sessions.get(session_id)
if session is None:
raise ValueError(f"Session '{session_id}' not found")
await self._load_worker_core(
session,
agent_path,
worker_id=worker_id,
model=model,
)
# Start judge + notify queen (skip for hive_coder itself)
if agent_path.name != "hive_coder" and session.worker_runtime:
await self._start_judge(session, session.runner._storage_path)
await self._notify_queen_worker_loaded(session)
# Emit SSE event so the frontend can update UI
await self._emit_worker_loaded(session)
return session
async def unload_worker(self, session_id: str) -> bool:
"""Unload the worker from a session. Queen stays alive."""
session = self._sessions.get(session_id)
if session is None:
return False
if session.worker_runtime is None:
return False
# Stop judge + escalation
self._stop_judge(session)
# Cleanup worker
if session.runner:
try:
await session.runner.cleanup_async()
except Exception as e:
logger.error("Error cleaning up worker '%s': %s", session.worker_id, e)
worker_id = session.worker_id
session.worker_id = None
session.worker_path = None
session.runner = None
session.worker_runtime = None
session.worker_info = None
# Notify queen
await self._notify_queen_worker_unloaded(session)
logger.info("Worker '%s' unloaded from session '%s'", worker_id, session_id)
return True
# ------------------------------------------------------------------
# Session teardown
# ------------------------------------------------------------------
async def stop_session(self, session_id: str) -> bool:
"""Stop a session entirely — unload worker + cancel queen."""
async with self._lock:
session = self._sessions.pop(session_id, None)
if session is None:
return False
# Stop judge
self._stop_judge(session)
# Stop queen
if session.queen_task is not None:
session.queen_task.cancel()
session.queen_task = None
session.queen_executor = None
# Cleanup worker
if session.runner:
try:
await session.runner.cleanup_async()
except Exception as e:
logger.error("Error cleaning up worker: %s", e)
logger.info("Session '%s' stopped", session_id)
return True
# ------------------------------------------------------------------
# Queen startup
# ------------------------------------------------------------------
async def _start_queen(
self,
session: Session,
worker_identity: str | None,
) -> None:
"""Start the queen executor for a session."""
from framework.agents.hive_coder.agent import (
queen_goal,
queen_graph as _queen_graph,
)
from framework.graph.executor import GraphExecutor
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
hive_home = Path.home() / ".hive"
queen_dir = hive_home / "queen" / "session" / session.id
queen_dir.mkdir(parents=True, exist_ok=True)
# Register MCP coding tools
queen_registry = ToolRegistry()
import framework.agents.hive_coder as _hive_coder_pkg
hive_coder_dir = Path(_hive_coder_pkg.__file__).parent
mcp_config = hive_coder_dir / "mcp_servers.json"
if mcp_config.exists():
try:
queen_registry.load_mcp_config(mcp_config)
logger.info("Queen: loaded MCP tools from %s", mcp_config)
except Exception:
logger.warning("Queen: MCP config failed to load", exc_info=True)
# Always register lifecycle tools — they check session.worker_runtime
# at call time, so they work even if no worker is loaded yet.
from framework.tools.queen_lifecycle_tools import register_queen_lifecycle_tools
register_queen_lifecycle_tools(
queen_registry,
session=session,
session_id=session.id,
)
# Monitoring tools need concrete worker paths — only register when present
if session.worker_runtime:
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
register_worker_monitoring_tools(
queen_registry,
session.event_bus,
session.worker_path,
stream_id="queen",
worker_graph_id=session.worker_runtime._graph_id,
)
queen_tools = list(queen_registry.get_tools().values())
queen_tool_executor = queen_registry.get_executor()
# Build queen graph with adjusted prompt + tools
_orig_node = _queen_graph.nodes[0]
base_prompt = _orig_node.system_prompt or ""
if worker_identity is None:
worker_identity = (
"\n\n# Worker Profile\n"
"No worker agent loaded. You are operating independently.\n"
"Handle all tasks directly using your coding tools."
)
registered_tool_names = set(queen_registry.get_tools().keys())
declared_tools = _orig_node.tools or []
available_tools = [t for t in declared_tools if t in registered_tool_names]
node_updates: dict = {
"system_prompt": base_prompt + worker_identity,
}
if set(available_tools) != set(declared_tools):
missing = sorted(set(declared_tools) - registered_tool_names)
if missing:
logger.warning("Queen: tools not available: %s", missing)
node_updates["tools"] = available_tools
adjusted_node = _orig_node.model_copy(update=node_updates)
queen_graph = _queen_graph.model_copy(update={"nodes": [adjusted_node]})
queen_runtime = Runtime(hive_home / "queen")
async def _queen_loop():
try:
executor = GraphExecutor(
runtime=queen_runtime,
llm=session.llm,
tools=queen_tools,
tool_executor=queen_tool_executor,
event_bus=session.event_bus,
stream_id="queen",
storage_path=queen_dir,
loop_config=queen_graph.loop_config,
)
session.queen_executor = executor
logger.info(
"Queen starting with %d tools: %s",
len(queen_tools),
[t.name for t in queen_tools],
)
await executor.execute(
graph=queen_graph,
goal=queen_goal,
input_data={"greeting": "Session started."},
session_state={"resume_session_id": session.id},
)
logger.warning("Queen executor returned (should be forever-alive)")
except Exception:
logger.error("Queen conversation crashed", exc_info=True)
finally:
session.queen_executor = None
session.queen_task = asyncio.create_task(_queen_loop())
# ------------------------------------------------------------------
# Judge startup / teardown
# ------------------------------------------------------------------
async def _start_judge(
self,
session: Session,
worker_storage_path: str | Path,
) -> None:
"""Start the health judge for a session's worker."""
from framework.graph.executor import GraphExecutor
from framework.monitoring import judge_goal, judge_graph
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
from framework.runtime.event_bus import EventType as _ET
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
worker_storage_path = Path(worker_storage_path)
try:
# Monitoring tools
monitoring_registry = ToolRegistry()
register_worker_monitoring_tools(
monitoring_registry,
session.event_bus,
worker_storage_path,
worker_graph_id=session.worker_runtime._graph_id,
)
hive_home = Path.home() / ".hive"
judge_dir = hive_home / "judge" / "session" / session.id
judge_dir.mkdir(parents=True, exist_ok=True)
judge_runtime = Runtime(hive_home / "judge")
monitoring_tools = list(monitoring_registry.get_tools().values())
monitoring_executor = monitoring_registry.get_executor()
async def _judge_loop():
interval = 120
first = True
while True:
if not first:
await asyncio.sleep(interval)
first = False
try:
executor = GraphExecutor(
runtime=judge_runtime,
llm=session.llm,
tools=monitoring_tools,
tool_executor=monitoring_executor,
event_bus=session.event_bus,
stream_id="judge",
storage_path=judge_dir,
loop_config=judge_graph.loop_config,
)
await executor.execute(
graph=judge_graph,
goal=judge_goal,
input_data={
"event": {"source": "timer", "reason": "scheduled"},
},
session_state={"resume_session_id": session.id},
)
except Exception:
logger.error("Health judge tick failed", exc_info=True)
session.judge_task = asyncio.create_task(_judge_loop())
# Escalation: judge → queen
async def _on_escalation(event):
ticket = event.data.get("ticket", {})
executor = session.queen_executor
if executor is None:
logger.warning("Escalation received but queen executor is None")
return
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
msg = "[ESCALATION TICKET from Health Judge]\n" + json.dumps(
ticket, indent=2, ensure_ascii=False
)
await node.inject_event(msg)
else:
logger.warning("Escalation received but queen node not ready")
session.escalation_sub = session.event_bus.subscribe(
event_types=[_ET.WORKER_ESCALATION_TICKET],
handler=_on_escalation,
)
logger.info("Judge started for session '%s'", session.id)
except Exception as e:
logger.error(
"Failed to start judge for session '%s': %s",
session.id,
e,
exc_info=True,
)
def _stop_judge(self, session: Session) -> None:
"""Cancel judge task and unsubscribe escalation events."""
if session.judge_task is not None:
session.judge_task.cancel()
session.judge_task = None
if session.escalation_sub is not None:
try:
session.event_bus.unsubscribe(session.escalation_sub)
except Exception:
pass
session.escalation_sub = None
# ------------------------------------------------------------------
# Queen notifications
# ------------------------------------------------------------------
async def _notify_queen_worker_loaded(self, session: Session) -> None:
"""Inject a system message into the queen about the loaded worker."""
from framework.tools.queen_lifecycle_tools import build_worker_profile
executor = session.queen_executor
if executor is None:
return
node = executor.node_registry.get("queen")
if node is None or not hasattr(node, "inject_event"):
return
profile = build_worker_profile(session.worker_runtime)
await node.inject_event(f"[SYSTEM] Worker loaded.{profile}")
async def _emit_worker_loaded(self, session: Session) -> None:
"""Publish a WORKER_LOADED event so the frontend can update."""
from framework.runtime.event_bus import AgentEvent, EventType
info = session.worker_info
await session.event_bus.publish(
AgentEvent(
type=EventType.WORKER_LOADED,
stream_id="queen",
data={
"worker_id": session.worker_id,
"worker_name": info.name if info else session.worker_id,
"agent_path": str(session.worker_path) if session.worker_path else "",
"goal": info.goal_name if info else "",
"node_count": info.node_count if info else 0,
},
)
)
async def _notify_queen_worker_unloaded(self, session: Session) -> None:
"""Notify the queen that the worker has been unloaded."""
executor = session.queen_executor
if executor is None:
return
node = executor.node_registry.get("queen")
if node is None or not hasattr(node, "inject_event"):
return
await node.inject_event(
"[SYSTEM] Worker unloaded. You are now operating independently. "
"Handle all tasks directly using your coding tools."
)
# ------------------------------------------------------------------
# Lookups
# ------------------------------------------------------------------
def get_session(self, session_id: str) -> Session | None:
return self._sessions.get(session_id)
def get_session_by_worker_id(self, worker_id: str) -> Session | None:
"""Find a session by its loaded worker's ID."""
for s in self._sessions.values():
if s.worker_id == worker_id:
return s
return None
def get_session_for_agent(self, agent_id: str) -> Session | None:
"""Resolve an agent_id to a session (backward compat).
Checks session.id first, then session.worker_id.
"""
s = self._sessions.get(agent_id)
if s:
return s
return self.get_session_by_worker_id(agent_id)
def is_loading(self, session_id: str) -> bool:
return session_id in self._loading
def list_sessions(self) -> list[Session]:
return list(self._sessions.values())
async def shutdown_all(self) -> None:
"""Gracefully stop all sessions. Called on server shutdown."""
session_ids = list(self._sessions.keys())
for sid in session_ids:
await self.stop_session(sid)
logger.info("All sessions stopped")
+73
View File
@@ -0,0 +1,73 @@
"""Server-Sent Events helper wrapping aiohttp StreamResponse."""
import json
import logging
from aiohttp import web
logger = logging.getLogger(__name__)
class SSEResponse:
"""Thin wrapper around aiohttp StreamResponse for SSE streaming.
Usage:
sse = SSEResponse()
await sse.prepare(request)
await sse.send_event({"key": "value"}, event="update")
await sse.send_keepalive()
"""
def __init__(self) -> None:
self._response: web.StreamResponse | None = None
async def prepare(self, request: web.Request) -> web.StreamResponse:
"""Prepare the SSE response with correct headers."""
self._response = web.StreamResponse(
status=200,
headers={
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
await self._response.prepare(request)
return self._response
async def send_event(
self,
data: dict,
event: str | None = None,
id: str | None = None,
) -> None:
"""Serialize and send an SSE event.
Args:
data: JSON-serializable dict to send as the data field.
event: Optional SSE event type.
id: Optional SSE event id.
"""
if self._response is None:
raise RuntimeError("SSEResponse not prepared; call prepare() first")
parts: list[str] = []
if id is not None:
parts.append(f"id: {id}\n")
if event is not None:
parts.append(f"event: {event}\n")
payload = json.dumps(data, default=str)
parts.append(f"data: {payload}\n")
parts.append("\n")
await self._response.write("".join(parts).encode("utf-8"))
async def send_keepalive(self) -> None:
"""Send an SSE comment as a keepalive heartbeat."""
if self._response is None:
raise RuntimeError("SSEResponse not prepared; call prepare() first")
await self._response.write(b": keepalive\n\n")
@property
def response(self) -> web.StreamResponse | None:
return self._response
File diff suppressed because it is too large Load Diff
+260 -48
View File
@@ -1,18 +1,33 @@
"""Queen lifecycle tools for worker management.
These tools give the Queen agent control over the worker agent's lifecycle.
They close over a reference to the worker's ``AgentRuntime`` and the shared
``EventBus``, following the same pattern as ``session_graph_tools.py``.
They close over a session-like object that provides ``worker_runtime``,
allowing late-binding access to the worker (which may be loaded/unloaded
dynamically).
Usage::
from framework.tools.queen_lifecycle_tools import register_queen_lifecycle_tools
# Server path — pass a Session object
register_queen_lifecycle_tools(
registry=queen_tool_registry,
worker_runtime=worker_runtime,
session=session,
session_id=session.id,
)
# TUI path — wrap bare references in an adapter
from framework.tools.queen_lifecycle_tools import WorkerSessionAdapter
adapter = WorkerSessionAdapter(
worker_runtime=runtime,
event_bus=event_bus,
storage_path=storage_path,
worker_path=storage_path,
)
register_queen_lifecycle_tools(
registry=queen_tool_registry,
session=adapter,
session_id=session_id,
)
"""
@@ -20,8 +35,9 @@ from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
@@ -31,20 +47,108 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
@dataclass
class WorkerSessionAdapter:
"""Adapter for TUI compatibility.
Wraps bare worker_runtime + event_bus + storage_path into a
session-like object that queen lifecycle tools can use.
"""
worker_runtime: Any # AgentRuntime
event_bus: Any # EventBus
worker_path: Path | None = None
def build_worker_profile(runtime: AgentRuntime) -> str:
"""Build a worker capability profile from its graph/goal definition.
Injected into the queen's system prompt so it knows what the worker
can and cannot do enabling correct delegation decisions.
"""
graph = runtime.graph
goal = runtime.goal
lines = ["\n\n# Worker Profile"]
lines.append(f"Agent: {runtime.graph_id}")
lines.append(f"Goal: {goal.name}")
if goal.description:
lines.append(f"Description: {goal.description}")
if goal.success_criteria:
lines.append("\n## Success Criteria")
for sc in goal.success_criteria:
lines.append(f"- {sc.description}")
if goal.constraints:
lines.append("\n## Constraints")
for c in goal.constraints:
lines.append(f"- {c.description}")
if graph.nodes:
lines.append("\n## Processing Stages")
for node in graph.nodes:
lines.append(f"- {node.id}: {node.description or node.name}")
all_tools: set[str] = set()
for node in graph.nodes:
if node.tools:
all_tools.update(node.tools)
if all_tools:
lines.append(f"\n## Worker Tools\n{', '.join(sorted(all_tools))}")
lines.append("\nStatus at session start: idle (not started).")
return "\n".join(lines)
def register_queen_lifecycle_tools(
registry: "ToolRegistry",
worker_runtime: "AgentRuntime",
event_bus: "EventBus",
registry: ToolRegistry,
session: Any = None,
session_id: str | None = None,
# Legacy params — used by TUI when not passing a session object
worker_runtime: AgentRuntime | None = None,
event_bus: EventBus | None = None,
storage_path: Path | None = None,
# Server context — enables load_built_agent tool
session_manager: Any = None,
manager_session_id: str | None = None,
) -> int:
"""Register queen lifecycle tools bound to *worker_runtime*.
"""Register queen lifecycle tools.
Args:
session: A Session or WorkerSessionAdapter with ``worker_runtime``
attribute. The tools read ``session.worker_runtime`` on each
call, supporting late-binding (worker loaded/unloaded).
session_id: Shared session ID so the worker uses the same session
scope as the queen and judge.
worker_runtime: (Legacy) Direct runtime reference. If ``session``
is not provided, a WorkerSessionAdapter is created from
worker_runtime + event_bus + storage_path.
session_manager: (Server only) The SessionManager instance, needed
for ``load_built_agent`` to hot-load a worker.
manager_session_id: (Server only) The session's ID in the manager,
used with ``session_manager.load_worker()``.
Returns the number of tools registered.
"""
# Build session adapter from legacy params if needed
if session is None:
if worker_runtime is None:
raise ValueError("Either session or worker_runtime must be provided")
session = WorkerSessionAdapter(
worker_runtime=worker_runtime,
event_bus=event_bus,
worker_path=storage_path,
)
from framework.llm.provider import Tool
tools_registered = 0
def _get_runtime():
"""Get current worker runtime from session (late-binding)."""
return getattr(session, "worker_runtime", None)
# --- start_worker ---------------------------------------------------------
async def start_worker(task: str) -> str:
@@ -53,20 +157,34 @@ def register_queen_lifecycle_tools(
Triggers the worker's default entry point with the given task.
Returns immediately the worker runs asynchronously.
"""
try:
# Get session state from any prior execution for memory continuity
session_state = worker_runtime._get_primary_session_state("default")
runtime = _get_runtime()
if runtime is None:
return json.dumps({"error": "No worker loaded in this session."})
exec_id = await worker_runtime.trigger(
try:
# Resume timers in case they were paused by a previous stop_worker
worker_runtime.resume_timers()
# Get session state from any prior execution for memory continuity
session_state = runtime._get_primary_session_state("default") or {}
# Use the shared session ID so queen, judge, and worker all
# scope their conversations to the same session.
if session_id:
session_state["resume_session_id"] = session_id
exec_id = await runtime.trigger(
entry_point_id="default",
input_data={"user_request": task},
session_state=session_state,
)
return json.dumps({
"status": "started",
"execution_id": exec_id,
"task": task,
})
return json.dumps(
{
"status": "started",
"execution_id": exec_id,
"task": task,
}
)
except Exception as e:
return json.dumps({"error": f"Failed to start worker: {e}"})
@@ -97,15 +215,19 @@ def register_queen_lifecycle_tools(
Stops the worker gracefully. Returns the IDs of cancelled executions.
"""
runtime = _get_runtime()
if runtime is None:
return json.dumps({"error": "No worker loaded in this session."})
cancelled = []
graph_id = worker_runtime.graph_id
graph_id = runtime.graph_id
# Get the primary graph's streams
reg = worker_runtime.get_graph_registration(graph_id)
reg = runtime.get_graph_registration(graph_id)
if reg is None:
return json.dumps({"error": "Worker graph not found"})
for ep_id, stream in reg.streams.items():
for _ep_id, stream in reg.streams.items():
for exec_id in list(stream.active_execution_ids):
try:
ok = await stream.cancel_execution(exec_id)
@@ -114,15 +236,21 @@ def register_queen_lifecycle_tools(
except Exception as e:
logger.warning("Failed to cancel %s: %s", exec_id, e)
return json.dumps({
"status": "stopped" if cancelled else "no_active_executions",
"cancelled": cancelled,
})
# Pause timers so the next tick doesn't restart execution
worker_runtime.pause_timers()
return json.dumps(
{
"status": "stopped" if cancelled else "no_active_executions",
"cancelled": cancelled,
"timers_paused": True,
}
)
_stop_tool = Tool(
name="stop_worker",
description=(
"Cancel the worker agent's active execution. "
"Cancel the worker agent's active execution and pause its timers. "
"The worker stops gracefully. No parameters needed."
),
parameters={"type": "object", "properties": {}},
@@ -137,9 +265,13 @@ def register_queen_lifecycle_tools(
Returns worker identity, execution state, active node, and iteration count.
"""
graph_id = worker_runtime.graph_id
goal = worker_runtime.goal
reg = worker_runtime.get_graph_registration(graph_id)
runtime = _get_runtime()
if runtime is None:
return json.dumps({"status": "not_loaded", "message": "No worker loaded."})
graph_id = runtime.graph_id
goal = runtime.goal
reg = runtime.get_graph_registration(graph_id)
if reg is None:
return json.dumps({"status": "not_loaded"})
@@ -151,17 +283,21 @@ def register_queen_lifecycle_tools(
active_execs = []
for ep_id, stream in reg.streams.items():
for exec_id in stream.active_execution_ids:
active_execs.append({
"execution_id": exec_id,
"entry_point": ep_id,
})
active_execs.append(
{
"execution_id": exec_id,
"entry_point": ep_id,
}
)
if not active_execs:
return json.dumps({
**base,
"status": "idle",
"message": "Worker has no active executions.",
})
return json.dumps(
{
**base,
"status": "idle",
"message": "Worker has no active executions.",
}
)
# Check if the worker is waiting for user input
waiting_nodes = []
@@ -198,8 +334,12 @@ def register_queen_lifecycle_tools(
Injects the message into the worker's active node conversation.
Use this to relay user instructions or concerns to the worker.
"""
graph_id = worker_runtime.graph_id
reg = worker_runtime.get_graph_registration(graph_id)
runtime = _get_runtime()
if runtime is None:
return json.dumps({"error": "No worker loaded in this session."})
graph_id = runtime.graph_id
reg = runtime.get_graph_registration(graph_id)
if reg is None:
return json.dumps({"error": "Worker graph not found"})
@@ -210,15 +350,19 @@ def register_queen_lifecycle_tools(
target_node_id = injectable[0]["node_id"]
ok = await stream.inject_input(target_node_id, content)
if ok:
return json.dumps({
"status": "delivered",
"node_id": target_node_id,
"content_preview": content[:100],
})
return json.dumps(
{
"status": "delivered",
"node_id": target_node_id,
"content_preview": content[:100],
}
)
return json.dumps({
"error": "No active worker node found — worker may be idle.",
})
return json.dumps(
{
"error": "No active worker node found — worker may be idle.",
}
)
_inject_tool = Tool(
name="inject_worker_message",
@@ -243,5 +387,73 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- load_built_agent (server context only) --------------------------------
if session_manager is not None and manager_session_id is not None:
async def load_built_agent(agent_path: str) -> str:
"""Load a newly built agent as the worker in this session.
After building and validating an agent, call this to make it
available immediately. The user will see the agent's graph and
can interact with it without opening a new tab.
"""
runtime = _get_runtime()
if runtime is not None:
return json.dumps(
{
"error": "A worker is already loaded in this session. "
"Unload it first or open a new tab."
}
)
resolved_path = Path(agent_path).resolve()
if not resolved_path.exists():
return json.dumps({"error": f"Agent path does not exist: {resolved_path}"})
try:
updated_session = await session_manager.load_worker(
manager_session_id,
str(resolved_path),
)
info = updated_session.worker_info
return json.dumps(
{
"status": "loaded",
"worker_id": updated_session.worker_id,
"worker_name": info.name if info else updated_session.worker_id,
"goal": info.goal_name if info else "",
"node_count": info.node_count if info else 0,
}
)
except Exception as e:
return json.dumps({"error": f"Failed to load agent: {e}"})
_load_built_tool = Tool(
name="load_built_agent",
description=(
"Load a newly built agent as the worker in this session. "
"After building and validating an agent, call this with the agent's "
"path (e.g. 'exports/my_agent') to make it available immediately. "
"The user will see the agent's graph and can interact with it."
),
parameters={
"type": "object",
"properties": {
"agent_path": {
"type": "string",
"description": ("Path to the agent directory (e.g. 'exports/my_agent')"),
},
},
"required": ["agent_path"],
},
)
registry.register(
"load_built_agent",
_load_built_tool,
lambda inputs: load_built_agent(**inputs),
)
tools_registered += 1
logger.info("Registered %d queen lifecycle tools", tools_registered)
return tools_registered
@@ -42,8 +42,8 @@ _DEFAULT_LAST_N_STEPS = 40
def register_worker_monitoring_tools(
registry: "ToolRegistry",
event_bus: "EventBus",
registry: ToolRegistry,
event_bus: EventBus,
storage_path: Path,
stream_id: str = "judge",
worker_graph_id: str | None = None,
@@ -101,8 +101,7 @@ def register_worker_monitoring_tools(
return json.dumps({"error": "No sessions found — worker has not started yet"})
candidates = [
d for d in sessions_dir.iterdir()
if d.is_dir() and (d / "state.json").exists()
d for d in sessions_dir.iterdir() if d.is_dir() and (d / "state.json").exists()
]
if not candidates:
return json.dumps({"error": "No sessions found — worker has not started yet"})
@@ -219,7 +218,9 @@ def register_worker_monitoring_tools(
},
"last_n_steps": {
"type": "integer",
"description": f"How many recent log steps to include (default {_DEFAULT_LAST_N_STEPS})",
"description": (
f"How many recent log steps to include (default {_DEFAULT_LAST_N_STEPS})"
),
},
},
"required": [],
+37 -40
View File
@@ -241,8 +241,8 @@ class AdenTUI(App):
# Health judge + queen monitoring graphs (loaded alongside worker agents)
self._queen_graph_id: str | None = None
self._judge_graph_id: str | None = None
self._judge_task = None # concurrent.futures.Future for the judge loop
self._queen_task = None # concurrent.futures.Future for the queen loop
self._judge_task = None # concurrent.futures.Future for the judge loop
self._queen_task = None # concurrent.futures.Future for the queen loop
self._queen_executor = None # GraphExecutor for queen input injection
self._queen_escalation_sub = None # EventBus subscription for queen
@@ -434,9 +434,7 @@ class AdenTUI(App):
if self.runtime and not self.runtime.is_running:
try:
agent_loop = self.chat_repl._agent_loop
future = asyncio.run_coroutine_threadsafe(
self.runtime.start(), agent_loop
)
future = asyncio.run_coroutine_threadsafe(self.runtime.start(), agent_loop)
await asyncio.wrap_future(future)
except Exception as e:
self.status_bar.set_graph_id("")
@@ -470,6 +468,8 @@ class AdenTUI(App):
into the worker runtime. The worker is completely untouched.
"""
import asyncio
import uuid
from datetime import datetime
from pathlib import Path
from framework.graph.executor import GraphExecutor
@@ -488,6 +488,10 @@ class AdenTUI(App):
llm = self.runtime._llm
agent_loop = self.chat_repl._agent_loop
# Generate a shared session ID for queen, judge, and worker.
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
session_id = f"session_{ts}_{uuid.uuid4().hex[:8]}"
# 1. Monitoring tools (health summary, emit ticket, notify operator).
# Registered on a standalone registry — NOT merged into the worker.
monitoring_registry = ToolRegistry()
@@ -498,17 +502,18 @@ class AdenTUI(App):
worker_graph_id=self.runtime._graph_id,
)
# 2. Storage dirs — under worker's base path but completely owned
# by the judge/queen. Worker never writes here.
judge_dir = storage_path / "graphs" / "judge" / "session"
# 2. Storage dirs — global, not per-agent. Queen and judge are
# supervisory components that outlive any single worker.
hive_home = Path.home() / ".hive"
judge_dir = hive_home / "judge" / "session" / session_id
judge_dir.mkdir(parents=True, exist_ok=True)
queen_dir = storage_path / "graphs" / "queen" / "session"
queen_dir = hive_home / "queen" / "session" / session_id
queen_dir.mkdir(parents=True, exist_ok=True)
# ---------------------------------------------------------------
# 3. Health judge — background task, fires every 2 minutes.
# ---------------------------------------------------------------
judge_runtime = Runtime(storage_path / "graphs" / "judge")
judge_runtime = Runtime(hive_home / "judge")
monitoring_tools = list(monitoring_registry.get_tools().values())
monitoring_executor = monitoring_registry.get_executor()
@@ -544,13 +549,14 @@ class AdenTUI(App):
input_data={
"event": {"source": "timer", "reason": "scheduled"},
},
session_state={"resume_session_id": "persistent"},
session_state={"resume_session_id": session_id},
)
except Exception:
log.error("Health judge tick failed", exc_info=True)
self._judge_task = asyncio.run_coroutine_threadsafe(
_judge_loop(), agent_loop,
_judge_loop(),
agent_loop,
)
self._judge_graph_id = "judge"
@@ -585,6 +591,7 @@ class AdenTUI(App):
worker_runtime=self.runtime,
event_bus=event_bus,
storage_path=storage_path,
session_id=session_id,
)
register_worker_monitoring_tools(
queen_registry,
@@ -596,21 +603,10 @@ class AdenTUI(App):
queen_tools = list(queen_registry.get_tools().values())
queen_tool_executor = queen_registry.get_executor()
# Build worker identity to inject into the queen's system prompt.
# This must be in the system prompt (not input_data) because
# persistent sessions restore the old conversation and skip
# _build_initial_message — the queen would lose context.
worker_graph_id = self.runtime._graph_id
worker_goal_name = getattr(self.runtime.goal, "name", worker_graph_id)
worker_goal_desc = getattr(self.runtime.goal, "description", "")
worker_identity = (
f"\n\n# Current Session\n"
f"Worker agent: {worker_graph_id}\n"
f"Goal: {worker_goal_name}\n"
)
if worker_goal_desc:
worker_identity += f"Description: {worker_goal_desc}\n"
worker_identity += "Status at session start: idle (not started)."
# Build worker profile for queen's system prompt.
from framework.tools.queen_lifecycle_tools import build_worker_profile
worker_identity = build_worker_profile(self.runtime)
# Adjust queen graph: filter tools to what's registered and
# append worker identity to the system prompt.
@@ -631,7 +627,7 @@ class AdenTUI(App):
adjusted_node = _orig_queen_node.model_copy(update=node_updates)
queen_graph = queen_graph.model_copy(update={"nodes": [adjusted_node]})
queen_runtime = Runtime(storage_path / "graphs" / "queen")
queen_runtime = Runtime(hive_home / "queen")
async def _queen_loop():
try:
@@ -658,7 +654,7 @@ class AdenTUI(App):
graph=queen_graph,
goal=queen_goal,
input_data={"greeting": "Session started."},
session_state={"resume_session_id": "persistent"},
session_state={"resume_session_id": session_id},
)
# Should never reach here — queen is forever-alive.
log.warning(
@@ -671,7 +667,8 @@ class AdenTUI(App):
self._queen_executor = None
self._queen_task = asyncio.run_coroutine_threadsafe(
_queen_loop(), agent_loop,
_queen_loop(),
agent_loop,
)
self._queen_graph_id = "queen"
@@ -700,9 +697,9 @@ class AdenTUI(App):
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
import json as _json
msg = (
"[ESCALATION TICKET from Health Judge]\n"
+ _json.dumps(ticket, indent=2, ensure_ascii=False)
msg = "[ESCALATION TICKET from Health Judge]\n" + _json.dumps(
ticket, indent=2, ensure_ascii=False
)
await node.inject_event(msg)
else:
@@ -1041,9 +1038,7 @@ class AdenTUI(App):
if not coder_runtime.is_running:
try:
agent_loop = self.chat_repl._agent_loop
future = asyncio.run_coroutine_threadsafe(
coder_runtime.start(), agent_loop
)
future = asyncio.run_coroutine_threadsafe(coder_runtime.start(), agent_loop)
await asyncio.wrap_future(future)
except Exception as e:
self.notify(f"Failed to start coder runtime: {e}", severity="error")
@@ -1340,9 +1335,7 @@ class AdenTUI(App):
return
elif et == EventType.EXECUTION_FAILED:
error = event.data.get("error", "Unknown error")[:200]
self._inject_worker_status_into_queen(
f"Worker execution failed: {error}"
)
self._inject_worker_status_into_queen(f"Worker execution failed: {error}")
return
elif et in (
EventType.LLM_TEXT_DELTA,
@@ -1365,7 +1358,11 @@ class AdenTUI(App):
# --- Multi-graph filtering (non-queen mode) ---
# If the event has a graph_id and it's not the active graph,
# show a notification for important events and drop the rest.
if not _queen_active and event.graph_id is not None and event.graph_id != self.runtime.active_graph_id:
if (
not _queen_active
and event.graph_id is not None
and event.graph_id != self.runtime.active_graph_id
):
if et == EventType.CLIENT_INPUT_REQUESTED:
self.notify(
f"[bold]{event.graph_id}[/bold] is waiting for input",
+38 -10
View File
@@ -37,6 +37,29 @@ class AgentEntry:
node_count: int = 0
tool_count: int = 0
tags: list[str] = field(default_factory=list)
last_active: str | None = None
def _get_last_active(agent_name: str) -> str | None:
"""Return the most recent updated_at timestamp across all sessions."""
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if not sessions_dir.exists():
return None
latest: str | None = None
for session_dir in sessions_dir.iterdir():
if not session_dir.is_dir() or not session_dir.name.startswith("session_"):
continue
state_file = session_dir / "state.json"
if not state_file.exists():
continue
try:
data = json.loads(state_file.read_text())
ts = data.get("timestamps", {}).get("updated_at")
if ts and (latest is None or ts > latest):
latest = ts
except Exception:
continue
return latest
def _count_sessions(agent_name: str) -> int:
@@ -85,20 +108,24 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
if not _is_valid_agent_dir(path):
continue
# config.py is source of truth for name/description
name, desc = _extract_python_agent_metadata(path)
config_fallback_name = path.name.replace("_", " ").title()
used_config = name != config_fallback_name
agent_json = path / "agent.json"
node_count, tool_count, tags = 0, 0, []
if agent_json.exists():
try:
data = json.loads(agent_json.read_text())
meta = data.get("agent", {})
name = meta.get("name", path.name)
desc = meta.get("description", "")
except Exception:
name = path.name
desc = "(error reading agent.json)"
node_count, tool_count, tags = _extract_agent_stats(agent_json)
else:
name, desc = _extract_python_agent_metadata(path)
if not used_config:
# config.py didn't provide values, fall back to agent.json
try:
data = json.loads(agent_json.read_text())
meta = data.get("agent", {})
name = meta.get("name", name)
desc = meta.get("description", desc)
except Exception:
pass
entries.append(
AgentEntry(
@@ -110,6 +137,7 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
node_count=node_count,
tool_count=tool_count,
tags=tags,
last_active=_get_last_active(path.name),
)
)
if entries:
+1 -4
View File
@@ -1294,10 +1294,7 @@ class ChatRepl(Vertical):
# Get entry points for the active graph, preferring manual
# (interactive) ones over event/timer-driven ones.
entry_points = self.runtime.get_entry_points()
manual_eps = [
ep for ep in entry_points
if ep.trigger_type in ("manual", "api")
]
manual_eps = [ep for ep in entry_points if ep.trigger_type in ("manual", "api")]
if not manual_eps:
manual_eps = entry_points # fallback: use whatever is available
if not manual_eps:
+21
View File
@@ -0,0 +1,21 @@
{
"$schema": "https://ui.shadcn.com/schema.json",
"style": "default",
"rsc": false,
"tsx": true,
"tailwind": {
"config": "",
"css": "src/index.css",
"baseColor": "neutral",
"cssVariables": true,
"prefix": ""
},
"aliases": {
"components": "@/components",
"utils": "@/lib/utils",
"ui": "@/components/ui",
"lib": "@/lib",
"hooks": "@/hooks"
},
"iconLibrary": "lucide"
}
+13
View File
@@ -0,0 +1,13 @@
<!DOCTYPE html>
<html lang="en" class="dark">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="icon" type="image/png" href="/favicon.png" />
<title>Hive</title>
</head>
<body>
<div id="root"></div>
<script type="module" src="/src/main.tsx"></script>
</body>
</html>
+4368
View File
File diff suppressed because it is too large Load Diff
+33
View File
@@ -0,0 +1,33 @@
{
"name": "hive-frontend",
"private": true,
"version": "0.1.0",
"type": "module",
"scripts": {
"dev": "vite",
"build": "tsc -b && vite build",
"preview": "vite preview",
"test": "vitest run"
},
"dependencies": {
"clsx": "^2.1.1",
"lucide-react": "^0.575.0",
"react": "^18.3.1",
"react-dom": "^18.3.1",
"react-markdown": "^10.1.0",
"react-router-dom": "^7.1.0",
"remark-gfm": "^4.0.1",
"tailwind-merge": "^3.5.0"
},
"devDependencies": {
"@tailwindcss/vite": "^4.0.0",
"@types/node": "^25.3.0",
"@types/react": "^18.3.18",
"@types/react-dom": "^18.3.5",
"@vitejs/plugin-react": "^4.3.4",
"tailwindcss": "^4.0.0",
"typescript": "~5.6.2",
"vite": "^6.0.0",
"vitest": "^4.0.18"
}
}
Binary file not shown.

After

Width:  |  Height:  |  Size: 438 KiB

+16
View File
@@ -0,0 +1,16 @@
import { Routes, Route } from "react-router-dom";
import Home from "./pages/home";
import MyAgents from "./pages/my-agents";
import Workspace from "./pages/workspace";
function App() {
return (
<Routes>
<Route path="/" element={<Home />} />
<Route path="/my-agents" element={<MyAgents />} />
<Route path="/workspace" element={<Workspace />} />
</Routes>
);
}
export default App;
+6
View File
@@ -0,0 +1,6 @@
import { api } from "./client";
import type { DiscoverResult } from "./types";
export const agentsApi = {
discover: () => api.get<DiscoverResult>("/discover"),
};
+41
View File
@@ -0,0 +1,41 @@
const API_BASE = "/api";
export class ApiError extends Error {
constructor(
public status: number,
public body: { error: string; type?: string; [key: string]: unknown },
) {
super(body.error);
this.name = "ApiError";
}
}
async function request<T>(path: string, options: RequestInit = {}): Promise<T> {
const url = `${API_BASE}${path}`;
const response = await fetch(url, {
...options,
headers: {
"Content-Type": "application/json",
...options.headers,
},
});
if (!response.ok) {
const body = await response
.json()
.catch(() => ({ error: response.statusText }));
throw new ApiError(response.status, body);
}
return response.json();
}
export const api = {
get: <T>(path: string) => request<T>(path),
post: <T>(path: string, body?: unknown) =>
request<T>(path, {
method: "POST",
body: body ? JSON.stringify(body) : undefined,
}),
delete: <T>(path: string) => request<T>(path, { method: "DELETE" }),
};
+63
View File
@@ -0,0 +1,63 @@
import { api } from "./client";
export interface CredentialInfo {
credential_id: string;
credential_type: string;
key_names: string[];
created_at: string | null;
updated_at: string | null;
}
export interface AgentCredentialRequirement {
credential_name: string;
credential_id: string;
env_var: string;
description: string;
help_url: string;
tools: string[];
node_types: string[];
available: boolean;
valid: boolean | null;
validation_message: string | null;
direct_api_key_supported: boolean;
aden_supported: boolean;
credential_key: string;
}
export interface CheckAgentResponse {
required: AgentCredentialRequirement[];
all_valid: boolean;
error?: string;
}
export const credentialsApi = {
list: () =>
api.get<{ credentials: CredentialInfo[] }>("/credentials"),
get: (credentialId: string) =>
api.get<CredentialInfo>(`/credentials/${credentialId}`),
save: (credentialId: string, keys: Record<string, string>) =>
api.post<{ saved: string }>("/credentials", {
credential_id: credentialId,
keys,
}),
delete: (credentialId: string) =>
api.delete<{ deleted: boolean }>(`/credentials/${credentialId}`),
checkAgent: async (agentPath: string): Promise<CheckAgentResponse> => {
const url = `/api/credentials/check-agent`;
const response = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ agent_path: agentPath }),
});
const body = await response.json();
// 424 = validation failures, but body still has the required array
if (response.ok || response.status === 424) {
return body as CheckAgentResponse;
}
throw new Error(body.error || response.statusText);
},
};
+64
View File
@@ -0,0 +1,64 @@
import { api } from "./client";
import type {
TriggerResult,
InjectResult,
ChatResult,
StopResult,
ResumeResult,
ReplayResult,
GoalProgress,
} from "./types";
export const executionApi = {
trigger: (
sessionId: string,
entryPointId: string,
inputData: Record<string, unknown>,
sessionState?: Record<string, unknown>,
) =>
api.post<TriggerResult>(`/sessions/${sessionId}/trigger`, {
entry_point_id: entryPointId,
input_data: inputData,
session_state: sessionState,
}),
inject: (
sessionId: string,
nodeId: string,
content: string,
graphId?: string,
) =>
api.post<InjectResult>(`/sessions/${sessionId}/inject`, {
node_id: nodeId,
content,
graph_id: graphId,
}),
chat: (sessionId: string, message: string) =>
api.post<ChatResult>(`/sessions/${sessionId}/chat`, { message }),
stop: (sessionId: string, executionId: string) =>
api.post<StopResult>(`/sessions/${sessionId}/stop`, {
execution_id: executionId,
}),
pause: (sessionId: string, executionId: string) =>
api.post<StopResult>(`/sessions/${sessionId}/pause`, {
execution_id: executionId,
}),
resume: (sessionId: string, workerSessionId: string, checkpointId?: string) =>
api.post<ResumeResult>(`/sessions/${sessionId}/resume`, {
session_id: workerSessionId,
checkpoint_id: checkpointId,
}),
replay: (sessionId: string, workerSessionId: string, checkpointId: string) =>
api.post<ReplayResult>(`/sessions/${sessionId}/replay`, {
session_id: workerSessionId,
checkpoint_id: checkpointId,
}),
goalProgress: (sessionId: string) =>
api.get<GoalProgress>(`/sessions/${sessionId}/goal-progress`),
};
+29
View File
@@ -0,0 +1,29 @@
import { api } from "./client";
import type { GraphTopology, NodeDetail, NodeCriteria, ToolInfo } from "./types";
export const graphsApi = {
nodes: (sessionId: string, graphId: string, workerSessionId?: string) =>
api.get<GraphTopology>(
`/sessions/${sessionId}/graphs/${graphId}/nodes${workerSessionId ? `?session_id=${workerSessionId}` : ""}`,
),
node: (sessionId: string, graphId: string, nodeId: string) =>
api.get<NodeDetail>(
`/sessions/${sessionId}/graphs/${graphId}/nodes/${nodeId}`,
),
nodeCriteria: (
sessionId: string,
graphId: string,
nodeId: string,
workerSessionId?: string,
) =>
api.get<NodeCriteria>(
`/sessions/${sessionId}/graphs/${graphId}/nodes/${nodeId}/criteria${workerSessionId ? `?session_id=${workerSessionId}` : ""}`,
),
nodeTools: (sessionId: string, graphId: string, nodeId: string) =>
api.get<{ tools: ToolInfo[] }>(
`/sessions/${sessionId}/graphs/${graphId}/nodes/${nodeId}/tools`,
),
};
+40
View File
@@ -0,0 +1,40 @@
import { api } from "./client";
import type { LogEntry, LogNodeDetail, LogToolStep } from "./types";
export const logsApi = {
list: (sessionId: string, limit?: number) =>
api.get<{ logs: LogEntry[] }>(
`/sessions/${sessionId}/logs${limit ? `?limit=${limit}` : ""}`,
),
summary: (sessionId: string, workerSessionId: string) =>
api.get<LogEntry>(
`/sessions/${sessionId}/logs?session_id=${workerSessionId}&level=summary`,
),
details: (sessionId: string, workerSessionId: string) =>
api.get<{ session_id: string; nodes: LogNodeDetail[] }>(
`/sessions/${sessionId}/logs?session_id=${workerSessionId}&level=details`,
),
tools: (sessionId: string, workerSessionId: string) =>
api.get<{ session_id: string; steps: LogToolStep[] }>(
`/sessions/${sessionId}/logs?session_id=${workerSessionId}&level=tools`,
),
nodeLogs: (
sessionId: string,
graphId: string,
nodeId: string,
workerSessionId: string,
level?: string,
) =>
api.get<{
session_id: string;
node_id: string;
details?: LogNodeDetail[];
tool_logs?: LogToolStep[];
}>(
`/sessions/${sessionId}/graphs/${graphId}/nodes/${nodeId}/logs?session_id=${workerSessionId}${level ? `&level=${level}` : ""}`,
),
};
+102
View File
@@ -0,0 +1,102 @@
import { api } from "./client";
import type {
LiveSession,
LiveSessionDetail,
SessionSummary,
SessionDetail,
Checkpoint,
Message,
EntryPoint,
} from "./types";
export const sessionsApi = {
// --- Session lifecycle ---
/** Create a session. If agentPath is provided, loads worker in one step. */
create: (agentPath?: string, agentId?: string, model?: string) =>
api.post<LiveSession>("/sessions", {
agent_path: agentPath,
agent_id: agentId,
model,
}),
/** List all active sessions. */
list: () => api.get<{ sessions: LiveSession[] }>("/sessions"),
/** Get session detail (includes entry_points, graphs when worker is loaded). */
get: (sessionId: string) =>
api.get<LiveSessionDetail>(`/sessions/${sessionId}`),
/** Stop a session entirely. */
stop: (sessionId: string) =>
api.delete<{ session_id: string; stopped: boolean }>(
`/sessions/${sessionId}`,
),
// --- Worker lifecycle ---
loadWorker: (
sessionId: string,
agentPath: string,
workerId?: string,
model?: string,
) =>
api.post<LiveSession>(`/sessions/${sessionId}/worker`, {
agent_path: agentPath,
worker_id: workerId,
model,
}),
unloadWorker: (sessionId: string) =>
api.delete<{ session_id: string; worker_unloaded: boolean }>(
`/sessions/${sessionId}/worker`,
),
// --- Session info ---
stats: (sessionId: string) =>
api.get<Record<string, unknown>>(`/sessions/${sessionId}/stats`),
entryPoints: (sessionId: string) =>
api.get<{ entry_points: EntryPoint[] }>(
`/sessions/${sessionId}/entry-points`,
),
graphs: (sessionId: string) =>
api.get<{ graphs: string[] }>(`/sessions/${sessionId}/graphs`),
// --- Worker session browsing (persisted execution runs) ---
workerSessions: (sessionId: string) =>
api.get<{ sessions: SessionSummary[] }>(
`/sessions/${sessionId}/worker-sessions`,
),
workerSession: (sessionId: string, wsId: string) =>
api.get<SessionDetail>(
`/sessions/${sessionId}/worker-sessions/${wsId}`,
),
deleteWorkerSession: (sessionId: string, wsId: string) =>
api.delete<{ deleted: string }>(
`/sessions/${sessionId}/worker-sessions/${wsId}`,
),
checkpoints: (sessionId: string, wsId: string) =>
api.get<{ checkpoints: Checkpoint[] }>(
`/sessions/${sessionId}/worker-sessions/${wsId}/checkpoints`,
),
restore: (sessionId: string, wsId: string, checkpointId: string) =>
api.post<{ execution_id: string }>(
`/sessions/${sessionId}/worker-sessions/${wsId}/checkpoints/${checkpointId}/restore`,
),
messages: (sessionId: string, wsId: string, nodeId?: string) => {
const params = new URLSearchParams({ client_only: "true" });
if (nodeId) params.set("node_id", nodeId);
return api.get<{ messages: Message[] }>(
`/sessions/${sessionId}/worker-sessions/${wsId}/messages?${params}`,
);
},
};
+276
View File
@@ -0,0 +1,276 @@
// --- Session types (primary) ---
export interface LiveSession {
session_id: string;
worker_id: string | null;
worker_name: string | null;
has_worker: boolean;
agent_path: string;
description: string;
goal: string;
node_count: number;
loaded_at: number;
uptime_seconds: number;
intro_message?: string;
/** Present in 409 conflict responses when worker is still loading */
loading?: boolean;
}
export interface LiveSessionDetail extends LiveSession {
entry_points?: EntryPoint[];
graphs?: string[];
}
export interface EntryPoint {
id: string;
name: string;
entry_node: string;
trigger_type: string;
}
export interface DiscoverEntry {
path: string;
name: string;
description: string;
category: string;
session_count: number;
node_count: number;
tool_count: number;
tags: string[];
last_active: string | null;
is_loaded: boolean;
}
/** Keyed by category name. */
export type DiscoverResult = Record<string, DiscoverEntry[]>;
// --- Execution types ---
export interface TriggerResult {
execution_id: string;
}
export interface InjectResult {
delivered: boolean;
}
export interface ChatResult {
status: "started" | "injected" | "queen";
execution_id?: string;
node_id?: string;
delivered?: boolean;
}
export interface StopResult {
stopped: boolean;
execution_id?: string;
error?: string;
}
export interface ResumeResult {
execution_id: string;
resumed_from: string;
checkpoint_id: string | null;
}
export interface ReplayResult {
execution_id: string;
replayed_from: string;
checkpoint_id: string;
}
export interface GoalProgress {
progress: number;
criteria: unknown[];
}
// --- Session types ---
export interface SessionSummary {
session_id: string;
status?: string;
started_at?: string | null;
completed_at?: string | null;
steps?: number;
paused_at?: string | null;
checkpoint_count: number;
}
export interface SessionDetail {
status: string;
started_at: string;
completed_at: string | null;
input_data: Record<string, unknown>;
memory: Record<string, unknown>;
progress: {
current_node: string | null;
paused_at: string | null;
steps_executed: number;
path: string[];
node_visit_counts: Record<string, number>;
nodes_with_failures: string[];
resume_from?: string;
};
}
export interface Checkpoint {
checkpoint_id: string;
current_node: string | null;
next_node: string | null;
is_clean: boolean;
timestamp: string | null;
error?: string;
}
export interface Message {
seq: number;
role: string;
content: string;
_node_id: string;
is_transition_marker?: boolean;
is_client_input?: boolean;
tool_calls?: unknown[];
[key: string]: unknown;
}
// --- Graph / Node types ---
export interface NodeSpec {
id: string;
name: string;
description: string;
node_type: string;
input_keys: string[];
output_keys: string[];
nullable_output_keys: string[];
tools: string[];
routes: Record<string, string>;
max_retries: number;
max_node_visits: number;
client_facing: boolean;
success_criteria: string | null;
system_prompt: string;
// Runtime enrichment (when session_id provided)
visit_count?: number;
has_failures?: boolean;
is_current?: boolean;
in_path?: boolean;
}
export interface EdgeInfo {
target: string;
condition: string;
priority: number;
}
export interface NodeDetail extends NodeSpec {
edges: EdgeInfo[];
}
export interface GraphEdge {
source: string;
target: string;
condition: string;
priority: number;
}
export interface GraphTopology {
nodes: NodeSpec[];
edges: GraphEdge[];
entry_node: string;
}
export interface NodeCriteria {
node_id: string;
success_criteria: string | null;
output_keys: string[];
last_execution?: {
success: boolean;
error: string | null;
retry_count: number;
needs_attention: boolean;
attention_reasons: string[];
};
}
// --- Tool info types ---
export interface ToolInfo {
name: string;
description: string;
parameters: Record<string, unknown>;
}
// --- Log types ---
export interface LogEntry {
[key: string]: unknown;
}
export interface LogNodeDetail {
node_id: string;
node_name: string;
success: boolean;
error?: string;
retry_count?: number;
needs_attention?: boolean;
attention_reasons?: string[];
total_steps: number;
}
export interface LogToolStep {
node_id: string;
step_index: number;
llm_text: string;
[key: string]: unknown;
}
// --- SSE Event types ---
export type EventTypeName =
| "execution_started"
| "execution_completed"
| "execution_failed"
| "execution_paused"
| "execution_resumed"
| "state_changed"
| "state_conflict"
| "goal_progress"
| "goal_achieved"
| "constraint_violation"
| "stream_started"
| "stream_stopped"
| "node_loop_started"
| "node_loop_iteration"
| "node_loop_completed"
| "node_action_plan"
| "llm_text_delta"
| "llm_reasoning_delta"
| "tool_call_started"
| "tool_call_completed"
| "client_output_delta"
| "client_input_requested"
| "node_internal_output"
| "node_input_blocked"
| "node_stalled"
| "node_tool_doom_loop"
| "judge_verdict"
| "output_key_set"
| "node_retry"
| "edge_traversed"
| "context_compacted"
| "webhook_received"
| "custom"
| "escalation_requested"
| "worker_loaded";
export interface AgentEvent {
type: EventTypeName;
stream_id: string;
node_id: string | null;
execution_id: string | null;
data: Record<string, unknown>;
timestamp: string;
correlation_id: string | null;
graph_id: string | null;
}
+518
View File
@@ -0,0 +1,518 @@
import { memo, useMemo, useState, useRef } from "react";
import { Play, Pause, Loader2, CheckCircle2 } from "lucide-react";
export type NodeStatus = "running" | "complete" | "pending" | "error" | "looping";
export interface GraphNode {
id: string;
label: string;
status: NodeStatus;
next?: string[];
backEdges?: string[];
iterations?: number;
maxIterations?: number;
statusLabel?: string;
edgeLabels?: Record<string, string>;
}
type RunState = "idle" | "deploying" | "running";
interface AgentGraphProps {
nodes: GraphNode[];
title: string;
onNodeClick?: (node: GraphNode) => void;
onRun?: () => void;
onPause?: () => void;
version?: string;
runState?: RunState;
}
// --- Extracted RunButton so hover state survives parent re-renders ---
interface RunButtonProps {
runState: RunState;
disabled: boolean;
onRun: () => void;
onPause: () => void;
btnRef: React.Ref<HTMLButtonElement>;
}
const RunButton = memo(function RunButton({ runState, disabled, onRun, onPause, btnRef }: RunButtonProps) {
const [hovered, setHovered] = useState(false);
const showPause = runState === "running" && hovered;
return (
<button
ref={btnRef}
onClick={runState === "running" ? onPause : onRun}
disabled={runState === "deploying" || disabled}
onMouseEnter={() => setHovered(true)}
onMouseLeave={() => setHovered(false)}
className={`flex items-center gap-1.5 px-2.5 py-1 rounded-md text-[11px] font-semibold transition-all duration-200 ${
showPause
? "bg-amber-500/15 text-amber-400 border border-amber-500/40 hover:bg-amber-500/25 active:scale-95 cursor-pointer"
: runState === "running"
? "bg-green-500/15 text-green-400 border border-green-500/30 cursor-pointer"
: runState === "deploying"
? "bg-primary/10 text-primary border border-primary/20 cursor-default"
: disabled
? "bg-muted/30 text-muted-foreground/40 border border-border/20 cursor-not-allowed"
: "bg-primary/10 text-primary border border-primary/20 hover:bg-primary/20 hover:border-primary/40 active:scale-95"
}`}
>
{runState === "deploying" ? (
<Loader2 className="w-3 h-3 animate-spin" />
) : showPause ? (
<Pause className="w-3 h-3 fill-current" />
) : runState === "running" ? (
<CheckCircle2 className="w-3 h-3" />
) : (
<Play className="w-3 h-3 fill-current" />
)}
{runState === "deploying" ? "Deploying\u2026" : showPause ? "Pause" : runState === "running" ? "Running" : "Run"}
</button>
);
});
const NODE_W_MAX = 180;
const NODE_H = 44;
const GAP_Y = 48;
const TOP_Y = 30;
const MARGIN_LEFT = 20;
const MARGIN_RIGHT = 50; // space for back-edge arcs
const SVG_BASE_W = 320;
const GAP_X = 12;
// Unified amber/gold palette
const statusColors: Record<NodeStatus, { dot: string; bg: string; border: string; glow: string }> = {
running: {
dot: "hsl(45,95%,58%)",
bg: "hsl(45,95%,58%,0.08)",
border: "hsl(45,95%,58%,0.5)",
glow: "hsl(45,95%,58%,0.15)",
},
looping: {
dot: "hsl(38,90%,55%)",
bg: "hsl(38,90%,55%,0.08)",
border: "hsl(38,90%,55%,0.5)",
glow: "hsl(38,90%,55%,0.15)",
},
complete: {
dot: "hsl(43,70%,45%)",
bg: "hsl(43,70%,45%,0.05)",
border: "hsl(43,70%,45%,0.25)",
glow: "none",
},
pending: {
dot: "hsl(35,15%,28%)",
bg: "hsl(35,10%,12%)",
border: "hsl(35,10%,20%)",
glow: "none",
},
error: {
dot: "hsl(0,65%,55%)",
bg: "hsl(0,65%,55%,0.06)",
border: "hsl(0,65%,55%,0.3)",
glow: "hsl(0,65%,55%,0.1)",
},
};
function formatLabel(id: string): string {
return id
.split("-")
.map((w) => w.charAt(0).toUpperCase() + w.slice(1))
.join(" ");
}
export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, onPause, version, runState: externalRunState }: AgentGraphProps) {
const [localRunState, setLocalRunState] = useState<RunState>("idle");
const runState = externalRunState ?? localRunState;
const runBtnRef = useRef<HTMLButtonElement>(null);
const handleRun = () => {
if (runState !== "idle") return;
if (onRun) {
onRun();
} else {
setLocalRunState("deploying");
setTimeout(() => setLocalRunState("running"), 1800);
setTimeout(() => setLocalRunState("idle"), 5000);
}
};
const idxMap = useMemo(() => Object.fromEntries(nodes.map((n, i) => [n.id, i])), [nodes]);
const backEdges = useMemo(() => {
const edges: { fromIdx: number; toIdx: number }[] = [];
nodes.forEach((n, i) => {
(n.next || []).forEach((toId) => {
const toIdx = idxMap[toId];
if (toIdx !== undefined && toIdx <= i) edges.push({ fromIdx: i, toIdx });
});
(n.backEdges || []).forEach((toId) => {
const toIdx = idxMap[toId];
if (toIdx !== undefined) edges.push({ fromIdx: i, toIdx });
});
});
return edges;
}, [nodes, idxMap]);
const forwardEdges = useMemo(() => {
const edges: { fromIdx: number; toIdx: number; fanCount: number; fanIndex: number; label?: string }[] = [];
nodes.forEach((n, i) => {
const targets = (n.next || [])
.map((toId) => ({ toId, toIdx: idxMap[toId] }))
.filter((t): t is { toId: string; toIdx: number } => t.toIdx !== undefined && t.toIdx > i);
targets.forEach(({ toId, toIdx }, fi) => {
edges.push({
fromIdx: i,
toIdx,
fanCount: targets.length,
fanIndex: fi,
label: n.edgeLabels?.[toId],
});
});
});
return edges;
}, [nodes, idxMap]);
// --- Layer-based layout computation ---
const layout = useMemo(() => {
if (nodes.length === 0) {
return { layers: [] as number[], cols: [] as number[], maxCols: 1, nodeW: NODE_W_MAX, colSpacing: 0, firstColX: MARGIN_LEFT };
}
// 1. Build reverse adjacency from forward edges (who are the parents of each node)
const parents = new Map<number, number[]>();
nodes.forEach((_, i) => parents.set(i, []));
forwardEdges.forEach((e) => {
parents.get(e.toIdx)!.push(e.fromIdx);
});
// 2. Assign layers via longest-path from entry
const layers = new Array(nodes.length).fill(0);
for (let i = 0; i < nodes.length; i++) {
const pars = parents.get(i) || [];
if (pars.length > 0) {
layers[i] = Math.max(...pars.map((p) => layers[p])) + 1;
}
}
// 3. Group nodes by layer
const layerGroups = new Map<number, number[]>();
layers.forEach((l, i) => {
const group = layerGroups.get(l) || [];
group.push(i);
layerGroups.set(l, group);
});
// 4. Compute max columns and dynamic node width
let maxCols = 1;
layerGroups.forEach((group) => {
maxCols = Math.max(maxCols, group.length);
});
const usableW = SVG_BASE_W - MARGIN_LEFT - MARGIN_RIGHT;
const nodeW = Math.min(NODE_W_MAX, Math.floor((usableW - (maxCols - 1) * GAP_X) / maxCols));
const colSpacing = nodeW + GAP_X;
const totalNodesW = maxCols * nodeW + (maxCols - 1) * GAP_X;
const firstColX = MARGIN_LEFT + (usableW - totalNodesW) / 2;
// 5. Assign columns within each layer (centered, ordered by parent column)
const cols = new Array(nodes.length).fill(0);
layerGroups.forEach((group) => {
if (group.length === 1) {
// Center single node: place at middle column
cols[group[0]] = (maxCols - 1) / 2;
} else {
// Sort group by average parent column to reduce crossings
const sorted = [...group].sort((a, b) => {
const aParents = parents.get(a) || [];
const bParents = parents.get(b) || [];
const aAvg = aParents.length > 0 ? aParents.reduce((s, p) => s + cols[p], 0) / aParents.length : 0;
const bAvg = bParents.length > 0 ? bParents.reduce((s, p) => s + cols[p], 0) / bParents.length : 0;
return aAvg - bAvg;
});
// Spread evenly, centered within maxCols
const offset = (maxCols - group.length) / 2;
sorted.forEach((nodeIdx, i) => {
cols[nodeIdx] = offset + i;
});
}
});
return { layers, cols, maxCols, nodeW, colSpacing, firstColX };
}, [nodes, forwardEdges]);
if (nodes.length === 0) {
return (
<div className="flex flex-col h-full">
<div className="px-5 pt-4 pb-2 flex items-center justify-between">
<div className="flex items-center gap-2">
<p className="text-[11px] text-muted-foreground font-medium uppercase tracking-wider">Pipeline</p>
{version && (
<span className="text-[10px] font-mono font-medium text-muted-foreground/60 border border-border/30 rounded px-1 py-0.5 leading-none">
{version}
</span>
)}
</div>
<RunButton runState={runState} disabled={nodes.length === 0} onRun={handleRun} onPause={onPause ?? (() => {})} btnRef={runBtnRef} />
</div>
<div className="flex-1 flex items-center justify-center px-5">
<p className="text-xs text-muted-foreground/60 text-center italic">No pipeline configured yet.<br/>Chat with the Queen to get started.</p>
</div>
</div>
);
}
const { layers, cols, nodeW, colSpacing, firstColX } = layout;
const nodePos = (i: number) => ({
x: firstColX + cols[i] * colSpacing,
y: TOP_Y + layers[i] * (NODE_H + GAP_Y),
});
const maxLayer = nodes.length > 0 ? Math.max(...layers) : 0;
const svgHeight = TOP_Y * 2 + (maxLayer + 1) * NODE_H + maxLayer * GAP_Y + 10;
const backEdgeSpace = backEdges.length > 0 ? MARGIN_RIGHT + backEdges.length * 18 : 20;
const svgWidth = Math.max(SVG_BASE_W, firstColX + layout.maxCols * nodeW + (layout.maxCols - 1) * GAP_X + backEdgeSpace);
// Check if a skip-level forward edge would collide with intermediate nodes
const hasCollision = (fromLayer: number, toLayer: number, fromX: number, toX: number): boolean => {
const minX = Math.min(fromX, toX);
const maxX = Math.max(fromX, toX) + nodeW;
for (let i = 0; i < nodes.length; i++) {
const l = layers[i];
if (l > fromLayer && l < toLayer) {
const nx = firstColX + cols[i] * colSpacing;
// Check horizontal overlap
if (nx < maxX && nx + nodeW > minX) return true;
}
}
return false;
};
const renderForwardEdge = (edge: { fromIdx: number; toIdx: number; fanCount: number; fanIndex: number; label?: string }, i: number) => {
const from = nodePos(edge.fromIdx);
const to = nodePos(edge.toIdx);
const fromCenterX = from.x + nodeW / 2;
const toCenterX = to.x + nodeW / 2;
const y1 = from.y + NODE_H;
const y2 = to.y;
// Fan-out: spread exit points across the source node's bottom
let startX = fromCenterX;
if (edge.fanCount > 1) {
const spread = nodeW * 0.5;
const step = edge.fanCount > 1 ? spread / (edge.fanCount - 1) : 0;
startX = fromCenterX - spread / 2 + edge.fanIndex * step;
}
const midY = (y1 + y2) / 2;
const fromLayer = layers[edge.fromIdx];
const toLayer = layers[edge.toIdx];
const skipsLayers = toLayer - fromLayer > 1;
let d: string;
if (skipsLayers && hasCollision(fromLayer, toLayer, from.x, to.x)) {
// Route around intermediate nodes: curve to the left
const detourX = Math.min(from.x, to.x) - nodeW * 0.4;
d = `M ${startX} ${y1} C ${startX} ${y1 + 20}, ${detourX} ${y1 + 20}, ${detourX} ${midY} S ${toCenterX} ${y2 - 20} ${toCenterX} ${y2}`;
} else {
// Standard bezier: from source bottom to target top
d = `M ${startX} ${y1} C ${startX} ${midY}, ${toCenterX} ${midY}, ${toCenterX} ${y2}`;
}
const fromNode = nodes[edge.fromIdx];
const isActive = fromNode.status === "complete" || fromNode.status === "running" || fromNode.status === "looping";
const strokeColor = isActive ? "hsl(43,70%,45%,0.35)" : "hsl(35,10%,20%)";
const arrowColor = isActive ? "hsl(43,70%,45%,0.5)" : "hsl(35,10%,22%)";
return (
<g key={`fwd-${i}`}>
<path d={d} fill="none" stroke={strokeColor} strokeWidth={1.5} />
<polygon
points={`${toCenterX - 4},${y2 - 6} ${toCenterX + 4},${y2 - 6} ${toCenterX},${y2 - 1}`}
fill={arrowColor}
/>
{edge.label && (
<text
x={(startX + toCenterX) / 2 + 8}
y={midY - 2}
fill="hsl(35,15%,40%)"
fontSize={9}
fontStyle="italic"
>
{edge.label}
</text>
)}
</g>
);
};
const renderBackEdge = (edge: { fromIdx: number; toIdx: number }, i: number) => {
const from = nodePos(edge.fromIdx);
const to = nodePos(edge.toIdx);
const rightX = Math.max(from.x, to.x) + nodeW;
const rightOffset = 28 + i * 18;
const startX = from.x + nodeW;
const startY = from.y + NODE_H / 2;
const endX = to.x + nodeW;
const endY = to.y + NODE_H / 2;
const curveX = rightX + rightOffset;
const r = 12;
const fromNode = nodes[edge.fromIdx];
const isActive = fromNode.status === "complete" || fromNode.status === "running" || fromNode.status === "looping";
const color = isActive ? "hsl(38,80%,50%,0.3)" : "hsl(35,10%,20%)";
// Bezier curve with rounded corners
const path = `M ${startX} ${startY} C ${startX + r} ${startY}, ${curveX} ${startY}, ${curveX} ${startY - r} L ${curveX} ${endY + r} C ${curveX} ${endY}, ${endX + r} ${endY}, ${endX + 6} ${endY}`;
return (
<g key={`back-${i}`}>
<path d={path} fill="none" stroke={color} strokeWidth={1.5} strokeDasharray="4 3" />
<polygon
points={`${endX + 6},${endY - 3} ${endX + 6},${endY + 3} ${endX},${endY}`}
fill={isActive ? "hsl(38,80%,50%,0.45)" : "hsl(35,10%,22%)"}
/>
</g>
);
};
const renderNode = (node: GraphNode, i: number) => {
const pos = nodePos(i);
const isActive = node.status === "running" || node.status === "looping";
const isDone = node.status === "complete";
const colors = statusColors[node.status];
const clipId = `clip-label-${node.id}`;
return (
<g key={node.id} onClick={() => onNodeClick?.(node)} style={{ cursor: onNodeClick ? "pointer" : "default" }}>
{/* Ambient glow for active nodes */}
{isActive && (
<>
<rect
x={pos.x - 4} y={pos.y - 4}
width={nodeW + 8} height={NODE_H + 8}
rx={16} fill={colors.glow}
/>
<rect
x={pos.x - 2} y={pos.y - 2}
width={nodeW + 4} height={NODE_H + 4}
rx={14} fill="none" stroke={colors.dot} strokeWidth={1} opacity={0.25}
style={{ animation: "pulse-ring 2.5s ease-out infinite" }}
/>
</>
)}
{/* Node background */}
<rect
x={pos.x} y={pos.y}
width={nodeW} height={NODE_H}
rx={12}
fill={colors.bg}
stroke={colors.border}
strokeWidth={isActive ? 1.5 : 1}
/>
{/* Status dot */}
<circle cx={pos.x + 18} cy={pos.y + NODE_H / 2} r={4.5} fill={colors.dot} />
{isActive && (
<circle cx={pos.x + 18} cy={pos.y + NODE_H / 2} r={7} fill="none" stroke={colors.dot} strokeWidth={1} opacity={0.3}>
<animate attributeName="r" values="7;11;7" dur="2s" repeatCount="indefinite" />
<animate attributeName="opacity" values="0.3;0;0.3" dur="2s" repeatCount="indefinite" />
</circle>
)}
{/* Check mark for complete */}
{isDone && (
<text
x={pos.x + 18} y={pos.y + NODE_H / 2 + 1}
fill={colors.dot} fontSize={8} fontWeight={700}
textAnchor="middle" dominantBaseline="middle"
>
&#x2713;
</text>
)}
{/* Label -- properly capitalized, clipped for narrow nodes */}
<clipPath id={clipId}>
<rect x={pos.x + 30} y={pos.y} width={nodeW - 38} height={NODE_H} />
</clipPath>
<text
x={pos.x + 32} y={pos.y + NODE_H / 2}
fill={isActive ? "hsl(45,90%,85%)" : isDone ? "hsl(40,20%,75%)" : "hsl(35,10%,45%)"}
fontSize={nodeW < 140 ? 10.5 : 12.5}
fontWeight={isActive ? 600 : isDone ? 500 : 400}
dominantBaseline="middle"
letterSpacing="0.01em"
clipPath={`url(#${clipId})`}
>
{formatLabel(node.id)}
</text>
{/* Status label for active nodes */}
{node.statusLabel && isActive && (
<text
x={pos.x + nodeW + 10} y={pos.y + NODE_H / 2}
fill="hsl(45,80%,60%)" fontSize={10.5} fontStyle="italic"
dominantBaseline="middle" opacity={0.8}
>
{node.statusLabel}
</text>
)}
{/* Iteration badge */}
{node.iterations !== undefined && node.iterations > 0 && (
<g>
<rect
x={pos.x + nodeW - 36} y={pos.y + NODE_H / 2 - 8}
width={26} height={16} rx={8}
fill={colors.dot} opacity={0.15}
/>
<text
x={pos.x + nodeW - 23} y={pos.y + NODE_H / 2}
fill={colors.dot} fontSize={9} fontWeight={600}
textAnchor="middle" dominantBaseline="middle" opacity={0.8}
>
{node.iterations}{node.maxIterations ? `/${node.maxIterations}` : "\u00d7"}
</text>
</g>
)}
</g>
);
};
return (
<div className="flex flex-col h-full">
{/* Compact sub-label */}
<div className="px-5 pt-4 pb-2 flex items-center justify-between">
<div className="flex items-center gap-2">
<p className="text-[11px] text-muted-foreground font-medium uppercase tracking-wider">Pipeline</p>
{version && (
<span className="text-[10px] font-mono font-medium text-muted-foreground/60 border border-border/30 rounded px-1 py-0.5 leading-none">
{version}
</span>
)}
</div>
<RunButton runState={runState} disabled={nodes.length === 0} onRun={handleRun} onPause={onPause ?? (() => {})} btnRef={runBtnRef} />
</div>
{/* Graph */}
<div className="flex-1 overflow-y-auto overflow-x-hidden px-3 pb-5">
<svg
width={svgWidth}
height={svgHeight}
viewBox={`0 0 ${svgWidth} ${svgHeight}`}
className="select-none"
style={{ fontFamily: "'Inter', system-ui, sans-serif" }}
>
{forwardEdges.map((e, i) => renderForwardEdge(e, i))}
{backEdges.map((e, i) => renderBackEdge(e, i))}
{nodes.map((n, i) => renderNode(n, i))}
</svg>
</div>
</div>
);
}
+193
View File
@@ -0,0 +1,193 @@
import { memo, useState, useRef, useEffect } from "react";
import { Send, Crown, Cpu } from "lucide-react";
import { formatAgentDisplayName } from "@/lib/chat-helpers";
import MarkdownContent from "@/components/MarkdownContent";
export interface ChatMessage {
id: string;
agent: string;
agentColor: string;
content: string;
timestamp: string;
type?: "system" | "agent" | "user";
role?: "queen" | "worker";
/** Which worker thread this message belongs to (worker agent name) */
thread?: string;
}
interface ChatPanelProps {
messages: ChatMessage[];
onSend: (message: string, thread: string) => void;
isWaiting?: boolean;
activeThread: string;
/** When true, the agent is waiting for user input — changes placeholder text */
awaitingInput?: boolean;
/** When true, the input is disabled (e.g. during loading) */
disabled?: boolean;
}
const queenColor = "hsl(45,95%,58%)";
function getColor(_agent: string, role?: "queen" | "worker"): string {
if (role === "queen") return queenColor;
return "hsl(220,60%,55%)";
}
const MessageBubble = memo(function MessageBubble({ msg }: { msg: ChatMessage }) {
const isUser = msg.type === "user";
const isQueen = msg.role === "queen";
const color = getColor(msg.agent, msg.role);
if (msg.type === "system") {
return (
<div className="flex justify-center py-1">
<span className="text-[11px] text-muted-foreground bg-muted/60 px-3 py-1.5 rounded-full">
{msg.content}
</span>
</div>
);
}
if (isUser) {
return (
<div className="flex justify-end">
<div className="max-w-[75%] bg-primary text-primary-foreground text-sm leading-relaxed rounded-2xl rounded-br-md px-4 py-3">
<p className="whitespace-pre-wrap break-words">{msg.content}</p>
</div>
</div>
);
}
return (
<div className="flex gap-3">
<div
className={`flex-shrink-0 ${isQueen ? "w-9 h-9" : "w-7 h-7"} rounded-xl flex items-center justify-center`}
style={{
backgroundColor: `${color}18`,
border: `1.5px solid ${color}35`,
boxShadow: isQueen ? `0 0 12px ${color}20` : undefined,
}}
>
{isQueen ? (
<Crown className="w-4 h-4" style={{ color }} />
) : (
<Cpu className="w-3.5 h-3.5" style={{ color }} />
)}
</div>
<div className={`flex-1 min-w-0 ${isQueen ? "max-w-[85%]" : "max-w-[75%]"}`}>
<div className="flex items-center gap-2 mb-1">
<span className={`font-medium ${isQueen ? "text-sm" : "text-xs"}`} style={{ color }}>
{msg.agent}
</span>
<span
className={`text-[10px] font-medium px-1.5 py-0.5 rounded-md ${
isQueen ? "bg-primary/15 text-primary" : "bg-muted text-muted-foreground"
}`}
>
{isQueen ? "Queen" : "Worker"}
</span>
</div>
<div
className={`text-sm leading-relaxed rounded-2xl rounded-tl-md px-4 py-3 ${
isQueen ? "border border-primary/20 bg-primary/5" : "bg-muted/60"
}`}
>
<MarkdownContent content={msg.content} />
</div>
</div>
</div>
);
}, (prev, next) => prev.msg.id === next.msg.id && prev.msg.content === next.msg.content);
export default function ChatPanel({ messages, onSend, isWaiting, activeThread, awaitingInput, disabled }: ChatPanelProps) {
const [input, setInput] = useState("");
const [readMap, setReadMap] = useState<Record<string, number>>({});
const bottomRef = useRef<HTMLDivElement>(null);
const threadMessages = messages.filter((m) => {
if (m.type === "system" && !m.thread) return false;
return m.thread === activeThread;
});
console.log('[ChatPanel] render: messages:', messages.length, 'threadMessages:', threadMessages.length, 'activeThread:', activeThread, 'threads:', [...new Set(messages.map(m => m.thread))]);
// Mark current thread as read
useEffect(() => {
const count = messages.filter((m) => m.thread === activeThread).length;
setReadMap((prev) => ({ ...prev, [activeThread]: count }));
}, [activeThread, messages]);
// Suppress unused var
void readMap;
const lastMsg = threadMessages[threadMessages.length - 1];
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [threadMessages.length, lastMsg?.content]);
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim()) return;
onSend(input.trim(), activeThread);
setInput("");
};
const activeWorkerLabel = formatAgentDisplayName(activeThread);
return (
<div className="flex flex-col h-full min-w-0">
{/* Compact sub-header */}
<div className="px-5 pt-4 pb-2 flex items-center gap-2">
<p className="text-[11px] text-muted-foreground font-medium uppercase tracking-wider">Conversation</p>
</div>
{/* Messages */}
<div className="flex-1 overflow-auto scrollbar-hide px-5 py-4 space-y-3">
{threadMessages.map((msg) => (
<MessageBubble key={msg.id} msg={msg} />
))}
{isWaiting && (
<div className="flex gap-3">
<div className="w-7 h-7 rounded-xl bg-muted flex items-center justify-center">
<Cpu className="w-3.5 h-3.5 text-muted-foreground" />
</div>
<div className="bg-muted/60 rounded-2xl rounded-tl-md px-4 py-3">
<div className="flex gap-1.5">
<span className="w-1.5 h-1.5 rounded-full bg-muted-foreground animate-bounce" style={{ animationDelay: "0ms" }} />
<span className="w-1.5 h-1.5 rounded-full bg-muted-foreground animate-bounce" style={{ animationDelay: "150ms" }} />
<span className="w-1.5 h-1.5 rounded-full bg-muted-foreground animate-bounce" style={{ animationDelay: "300ms" }} />
</div>
</div>
</div>
)}
<div ref={bottomRef} />
</div>
{/* Input */}
<form onSubmit={handleSubmit} className="p-4 border-t border-border">
<div className="flex items-center gap-3 bg-muted/40 rounded-xl px-4 py-2.5 border border-border focus-within:border-primary/40 transition-colors">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder={
disabled
? "Connecting to agent..."
: awaitingInput
? "Agent is waiting for your response..."
: `Message ${activeWorkerLabel}...`
}
disabled={disabled}
className="flex-1 bg-transparent text-sm text-foreground outline-none placeholder:text-muted-foreground disabled:opacity-50 disabled:cursor-not-allowed"
/>
<button
type="submit"
disabled={!input.trim() || disabled}
className="p-2 rounded-lg bg-primary text-primary-foreground disabled:opacity-30 hover:opacity-90 transition-opacity"
>
<Send className="w-4 h-4" />
</button>
</div>
</form>
</div>
);
}
@@ -0,0 +1,387 @@
import { useState, useEffect, useCallback } from "react";
import { KeyRound, Check, AlertCircle, X, Shield, Loader2, Trash2, ExternalLink } from "lucide-react";
import { credentialsApi, type AgentCredentialRequirement, type CheckAgentResponse } from "@/api/credentials";
export interface Credential {
id: string;
name: string;
description: string;
icon: string;
connected: boolean;
required: boolean;
}
/** Create fresh (disconnected) credentials for an agent type.
* Real credentials are fetched from the backend via agentPath — this returns
* an empty list as a safe default until the backend responds. */
export function createFreshCredentials(_agentType: string): Credential[] {
return [];
}
/** Clone credentials from an existing set (for new instances of the same agent) */
export function cloneCredentials(existing: Credential[]): Credential[] {
return existing.map(c => ({ ...c }));
}
/** Check if all required credentials are connected */
export function allRequiredCredentialsMet(creds: Credential[]): boolean {
return creds.filter(c => c.required).every(c => c.connected);
}
// Internal display type for the modal
interface CredentialRow {
id: string;
name: string;
description: string;
icon: string;
connected: boolean;
required: boolean;
credentialKey: string; // key name within the credential (e.g., "api_key")
adenSupported: boolean; // whether this credential uses OAuth via Aden
valid: boolean | null; // true = health check passed, false = failed, null = not checked
validationMessage: string | null;
}
function requirementToRow(r: AgentCredentialRequirement): CredentialRow {
return {
id: r.credential_id,
name: r.credential_name,
description: r.description,
icon: "\uD83D\uDD11",
connected: r.available,
required: true,
credentialKey: r.credential_key || "api_key",
adenSupported: r.aden_supported,
valid: r.valid,
validationMessage: r.validation_message,
};
}
// Module-level cache: credential requirements are static per agent path.
// Cleared on save/delete so the next fetch picks up updated availability.
const credentialCache = new Map<string, AgentCredentialRequirement[]>();
interface CredentialsModalProps {
agentType: string;
agentLabel: string;
open: boolean;
onClose: () => void;
agentPath?: string;
onCredentialChange?: () => void;
// Legacy props — still accepted for backward compat but ignored when backend is available
credentials?: Credential[];
onToggleCredential?: (credId: string) => void;
}
export default function CredentialsModal({
agentType,
agentLabel,
open,
onClose,
agentPath,
onCredentialChange,
credentials: legacyCredentials,
onToggleCredential,
}: CredentialsModalProps) {
const [rows, setRows] = useState<CredentialRow[]>([]);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const [editingId, setEditingId] = useState<string | null>(null);
const [inputValue, setInputValue] = useState("");
const [saving, setSaving] = useState(false);
const fetchStatus = useCallback(async () => {
setError(null);
try {
if (agentPath) {
// Check cache first — credential requirements are static per agent
const cached = credentialCache.get(agentPath);
if (cached) {
setRows(cached.map(requirementToRow));
setLoading(false);
return;
}
// Real agent — ask backend what credentials it actually needs
setLoading(true);
const result: CheckAgentResponse = await credentialsApi.checkAgent(agentPath);
credentialCache.set(agentPath, result.required);
setRows(result.required.map(requirementToRow));
} else {
// No real path — no credentials to show
setRows([]);
}
} catch {
// Backend unavailable — fall back to legacy props or empty
if (legacyCredentials) {
setRows(legacyCredentials.map(c => ({
...c,
credentialKey: "api_key",
adenSupported: false,
valid: null,
validationMessage: null,
})));
} else {
setRows([]);
}
} finally {
setLoading(false);
}
}, [agentPath, agentType, legacyCredentials]);
// Fetch on open
useEffect(() => {
if (open) {
fetchStatus();
setEditingId(null);
setInputValue("");
}
}, [open, fetchStatus]);
const handleConnect = async (row: CredentialRow) => {
if (row.adenSupported) {
// OAuth credential — redirect to Aden platform
window.open("https://hive.adenhq.com/", "_blank", "noopener");
return;
}
if (editingId === row.id) {
// Already editing — save
if (!inputValue.trim()) return;
setSaving(true);
try {
await credentialsApi.save(row.id, { [row.credentialKey]: inputValue.trim() });
setEditingId(null);
setInputValue("");
if (agentPath) credentialCache.delete(agentPath);
onCredentialChange?.();
await fetchStatus();
} catch {
setError(`Failed to save ${row.name}`);
} finally {
setSaving(false);
}
} else {
// Start editing — show inline API key input
setEditingId(row.id);
setInputValue("");
}
};
const handleDisconnect = async (row: CredentialRow) => {
setSaving(true);
try {
await credentialsApi.delete(row.id);
if (agentPath) credentialCache.delete(agentPath);
onCredentialChange?.();
await fetchStatus();
} catch {
// Backend unavailable — fall back to legacy toggle
onToggleCredential?.(row.id);
} finally {
setSaving(false);
}
};
if (!open) return null;
const connectedCount = rows.filter(c => c.connected).length;
const requiredCount = rows.filter(c => c.required).length;
const requiredConnected = rows.filter(c => c.required && c.connected).length;
const invalidCount = rows.filter(c => c.valid === false).length;
const missingCount = requiredCount - requiredConnected;
const allRequiredMet = requiredConnected === requiredCount && invalidCount === 0;
return (
<>
{/* Backdrop */}
<div className="fixed inset-0 z-50 bg-black/60 backdrop-blur-sm" onClick={onClose} />
{/* Modal */}
<div className="fixed inset-0 z-50 flex items-center justify-center p-4 pointer-events-none">
<div className="bg-card border border-border rounded-xl shadow-2xl w-full max-w-md pointer-events-auto">
{/* Header */}
<div className="flex items-center justify-between px-5 py-4 border-b border-border/60">
<div className="flex items-center gap-3">
<div className="w-8 h-8 rounded-lg bg-primary/10 border border-primary/20 flex items-center justify-center">
<KeyRound className="w-4 h-4 text-primary" />
</div>
<div>
<h2 className="text-sm font-semibold text-foreground">Credentials</h2>
<p className="text-[11px] text-muted-foreground">{agentLabel}</p>
</div>
</div>
<button onClick={onClose} className="p-1.5 rounded-md hover:bg-muted/60 text-muted-foreground hover:text-foreground transition-colors">
<X className="w-4 h-4" />
</button>
</div>
{/* Status banner */}
{!loading && (
<div className={`mx-5 mt-4 px-3 py-2.5 rounded-lg border text-xs font-medium flex items-center gap-2 ${
allRequiredMet
? "bg-emerald-500/10 border-emerald-500/20 text-emerald-600"
: "bg-destructive/5 border-destructive/20 text-destructive"
}`}>
{allRequiredMet ? (
<>
<Shield className="w-3.5 h-3.5" />
All required credentials connected ({connectedCount}/{rows.length} total)
</>
) : (
<>
<AlertCircle className="w-3.5 h-3.5" />
{missingCount > 0 && `${missingCount} missing`}
{missingCount > 0 && invalidCount > 0 && ", "}
{invalidCount > 0 && `${invalidCount} invalid`}
</>
)}
</div>
)}
{/* Error banner */}
{error && (
<div className="mx-5 mt-2 px-3 py-2 rounded-lg border border-destructive/20 bg-destructive/5 text-xs text-destructive">
{error}
</div>
)}
{/* Loading state */}
{loading && (
<div className="p-8 flex items-center justify-center">
<Loader2 className="w-5 h-5 animate-spin text-muted-foreground" />
</div>
)}
{/* Credential list */}
{!loading && (
<div className="p-5 space-y-2">
{rows.map((row) => (
<div key={row.id}>
<div
className={`flex items-center gap-3 px-3 py-3 rounded-lg border transition-colors ${
row.connected && row.valid !== false
? "border-primary/20 bg-primary/[0.03]"
: row.valid === false
? "border-destructive/30 bg-destructive/[0.03]"
: "border-border/60 bg-muted/20"
}`}
>
<span className="text-lg flex-shrink-0">{row.icon}</span>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2">
<span className="text-sm font-medium text-foreground">{row.name}</span>
{row.required && (
<span className={`text-[9px] font-semibold uppercase tracking-wider px-1.5 py-0.5 rounded ${
row.connected
? "text-emerald-600/70 bg-emerald-500/10"
: "text-destructive/70 bg-destructive/10"
}`}>
Required
</span>
)}
</div>
<p className="text-[11px] text-muted-foreground mt-0.5">{row.description}</p>
{row.valid === false && row.validationMessage && (
<p className="text-[11px] text-destructive mt-0.5">{row.validationMessage}</p>
)}
</div>
{row.connected ? (
<div className="flex items-center gap-1 flex-shrink-0">
{row.valid === false ? (
<span className="flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium bg-destructive/10 text-destructive" title={row.validationMessage || "Invalid"}>
<AlertCircle className="w-3 h-3" />
Invalid
</span>
) : (
<span className="flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium bg-primary/10 text-primary">
<Check className="w-3 h-3" />
Connected
</span>
)}
<button
onClick={() => handleDisconnect(row)}
disabled={saving}
className="p-1.5 rounded-md text-muted-foreground hover:text-destructive hover:bg-destructive/10 transition-colors"
title="Disconnect"
>
<Trash2 className="w-3 h-3" />
</button>
</div>
) : (
<button
onClick={() => handleConnect(row)}
disabled={saving}
className="flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium bg-muted/60 text-foreground hover:bg-muted transition-colors flex-shrink-0"
>
{row.adenSupported ? (
<>
<ExternalLink className="w-3 h-3" />
Authorize
</>
) : (
<>
<KeyRound className="w-3 h-3" />
Connect
</>
)}
</button>
)}
</div>
{/* Inline API key input */}
{editingId === row.id && !row.connected && (
<div className="mt-1.5 flex gap-2 px-3">
<input
type="password"
value={inputValue}
onChange={(e) => setInputValue(e.target.value)}
onKeyDown={(e) => {
if (e.key === "Enter") handleConnect(row);
if (e.key === "Escape") { setEditingId(null); setInputValue(""); }
}}
placeholder={`Paste your ${row.name} API key...`}
autoFocus
className="flex-1 px-3 py-1.5 rounded-md border border-border bg-background text-xs text-foreground placeholder:text-muted-foreground focus:outline-none focus:ring-1 focus:ring-primary/40"
/>
<button
onClick={() => handleConnect(row)}
disabled={saving || !inputValue.trim()}
className="px-3 py-1.5 rounded-md text-xs font-medium bg-primary text-primary-foreground hover:bg-primary/90 disabled:opacity-50 disabled:cursor-not-allowed transition-colors"
>
{saving ? <Loader2 className="w-3 h-3 animate-spin" /> : "Save"}
</button>
<button
onClick={() => { setEditingId(null); setInputValue(""); }}
className="px-2 py-1.5 rounded-md text-xs text-muted-foreground hover:bg-muted transition-colors"
>
Cancel
</button>
</div>
)}
</div>
))}
</div>
)}
{/* Footer */}
{!loading && (
<div className="px-5 pb-4">
<button
onClick={onClose}
disabled={!allRequiredMet}
className={`w-full py-2.5 rounded-lg text-sm font-medium transition-colors ${
allRequiredMet
? "bg-primary text-primary-foreground hover:bg-primary/90"
: "bg-muted text-muted-foreground cursor-not-allowed"
}`}
>
{allRequiredMet ? "Done" : missingCount > 0 ? "Connect required credentials to continue" : "Fix invalid credentials to continue"}
</button>
</div>
)}
</div>
</div>
</>
);
}
@@ -0,0 +1,97 @@
import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import type { Components } from "react-markdown";
import { cn } from "@/lib/utils";
const components: Components = {
// Headers: same size as body text, just bold — keeps chat bubbles compact
h1: ({ children }) => <h1 className="font-bold mt-3 mb-1 first:mt-0">{children}</h1>,
h2: ({ children }) => <h2 className="font-bold mt-2 mb-1 first:mt-0">{children}</h2>,
h3: ({ children }) => <h3 className="font-semibold mt-2 mb-1 first:mt-0">{children}</h3>,
// Paragraphs: preserve whitespace and line breaks (matches existing plain-text behavior)
p: ({ children }) => <p className="whitespace-pre-wrap break-words mb-2 last:mb-0">{children}</p>,
// Lists
ul: ({ children }) => <ul className="list-disc pl-4 mb-2 last:mb-0 space-y-0.5">{children}</ul>,
ol: ({ children }) => <ol className="list-decimal pl-4 mb-2 last:mb-0 space-y-0.5">{children}</ol>,
li: ({ children }) => <li>{children}</li>,
// Inline code
code: ({ className, children, ...props }) => {
const isBlock = className?.includes("language-");
if (isBlock) {
return (
<code className={cn("text-xs", className)} {...props}>
{children}
</code>
);
}
return (
<code className="bg-muted px-1 py-0.5 rounded text-[13px] font-mono">
{children}
</code>
);
},
// Code blocks
pre: ({ children }) => (
<pre className="bg-muted/80 rounded-lg p-3 overflow-x-auto text-xs font-mono my-2 last:mb-0">
{children}
</pre>
),
// Links
a: ({ href, children }) => (
<a
href={href}
target="_blank"
rel="noopener noreferrer"
className="text-primary underline underline-offset-2 hover:opacity-80"
>
{children}
</a>
),
// Tables
table: ({ children }) => (
<div className="overflow-x-auto my-2 last:mb-0">
<table className="text-xs border-collapse w-full">{children}</table>
</div>
),
th: ({ children }) => (
<th className="border border-border px-2 py-1 text-left font-semibold bg-muted/40">
{children}
</th>
),
td: ({ children }) => <td className="border border-border px-2 py-1">{children}</td>,
// Blockquotes
blockquote: ({ children }) => (
<blockquote className="border-l-2 border-primary/40 pl-3 my-2 text-muted-foreground italic">
{children}
</blockquote>
),
// Horizontal rules
hr: () => <hr className="border-border my-3" />,
// Strong & emphasis inherit naturally from <strong>/<em> defaults — no overrides needed
};
const remarkPlugins = [remarkGfm];
interface MarkdownContentProps {
content: string;
className?: string;
}
export default function MarkdownContent({ content, className }: MarkdownContentProps) {
return (
<div className={cn("break-words text-foreground", className)}>
<ReactMarkdown remarkPlugins={remarkPlugins} components={components}>
{content}
</ReactMarkdown>
</div>
);
}
@@ -0,0 +1,406 @@
import { useState, useEffect, useRef } from "react";
import { X, Cpu, Zap, Clock, RotateCcw, CheckCircle2, AlertCircle, Loader2, ChevronDown, ChevronRight, Copy, Check, Terminal, Wrench, BookOpen, GitBranch, Bot } from "lucide-react";
import type { GraphNode, NodeStatus } from "./AgentGraph";
import type { NodeSpec, ToolInfo, NodeCriteria } from "../api/types";
import { graphsApi } from "../api/graphs";
import { logsApi } from "../api/logs";
import MarkdownContent from "./MarkdownContent";
interface Tool {
name: string;
description: string;
icon: string;
credentials?: ToolCredential[];
}
interface ToolCredential {
key: string;
label: string;
connected: boolean;
value?: string;
}
interface NodeDetailPanelProps {
node: GraphNode | null;
nodeSpec?: NodeSpec | null;
sessionId?: string;
graphId?: string;
workerSessionId?: string | null;
nodeLogs?: string[];
actionPlan?: string;
onClose: () => void;
}
const statusConfig: Record<NodeStatus, { label: string; color: string; Icon: React.FC<{ className?: string }> }> = {
running: { label: "Running", color: "hsl(45,95%,58%)", Icon: ({ className }) => <Loader2 className={`${className} animate-spin`} /> },
looping: { label: "Looping", color: "hsl(38,90%,55%)", Icon: ({ className }) => <RotateCcw className={`${className} animate-spin`} style={{ animationDuration: "2s" }} /> },
complete: { label: "Complete", color: "hsl(43,70%,45%)", Icon: ({ className }) => <CheckCircle2 className={className} /> },
pending: { label: "Pending", color: "hsl(220,15%,45%)", Icon: ({ className }) => <Clock className={className} /> },
error: { label: "Error", color: "hsl(0,65%,55%)", Icon: ({ className }) => <AlertCircle className={className} /> },
};
function formatNodeId(id: string): string {
return id.split("-").map(w => w.charAt(0).toUpperCase() + w.slice(1)).join(" ");
}
function CredentialRow({ cred }: { cred: ToolCredential }) {
return (
<div className="flex items-center justify-between px-3 py-2 rounded-lg bg-background/60 border border-border/30 mt-1.5">
<div className="flex items-center gap-2 min-w-0">
<span className={`w-1.5 h-1.5 rounded-full flex-shrink-0 ${cred.connected ? "bg-primary" : "bg-muted-foreground/40"}`} />
<span className="text-[11px] text-muted-foreground font-medium truncate">{cred.label}</span>
</div>
{cred.connected ? (
<span className="text-[10px] text-primary/80 font-medium flex-shrink-0 ml-2">Connected</span>
) : (
<button className="text-[10px] px-2 py-0.5 rounded-md bg-primary/15 text-primary border border-primary/25 font-semibold hover:bg-primary/25 transition-colors flex-shrink-0 ml-2">
Connect
</button>
)}
</div>
);
}
function ToolRow({ tool }: { tool: Tool }) {
const [expanded, setExpanded] = useState(false);
const hasCreds = tool.credentials && tool.credentials.length > 0;
return (
<div className="rounded-xl border border-border/20 overflow-hidden">
<button
onClick={() => hasCreds && setExpanded(v => !v)}
className={`w-full flex items-start gap-3 p-3 bg-muted/30 hover:bg-muted/50 transition-colors text-left ${!hasCreds ? "cursor-default" : ""}`}
>
<span className="text-base leading-none mt-0.5 flex-shrink-0">{tool.icon}</span>
<div className="min-w-0 flex-1">
<p className="text-xs font-medium text-foreground">{tool.name}</p>
<p className="text-[11px] text-muted-foreground mt-0.5 leading-relaxed">{tool.description}</p>
</div>
{hasCreds && (
<span className="flex-shrink-0 mt-0.5">
{expanded
? <ChevronDown className="w-3 h-3 text-muted-foreground" />
: <ChevronRight className="w-3 h-3 text-muted-foreground" />
}
</span>
)}
</button>
{expanded && hasCreds && (
<div className="px-3 pb-3 bg-muted/20 border-t border-border/15">
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mt-2 mb-1">Credentials</p>
{tool.credentials!.map(cred => (
<CredentialRow key={cred.key} cred={cred} />
))}
</div>
)}
</div>
);
}
function LogsTab({ nodeId, isActive: _isActive, sessionId, graphId, workerSessionId, nodeLogs }: { nodeId: string; isActive: boolean; sessionId?: string; graphId?: string; workerSessionId?: string | null; nodeLogs?: string[] }) {
const [historicalLines, setHistoricalLines] = useState<string[]>([]);
const bottomRef = useRef<HTMLDivElement>(null);
// Fetch historical logs when session is available (post-execution viewing)
useEffect(() => {
if (sessionId && graphId && workerSessionId) {
logsApi.nodeLogs(sessionId, graphId, nodeId, workerSessionId)
.then(r => {
const realLines: string[] = [];
if (r.details) {
for (const d of r.details) {
realLines.push(`[LOG] ${d.node_name}${d.success ? "SUCCESS" : "FAILED"}${d.error ? ` (${d.error})` : ""}${d.total_steps} steps`);
}
}
if (r.tool_logs) {
for (const s of r.tool_logs) {
realLines.push(`[STEP ${s.step_index}] ${s.llm_text.slice(0, 120)}${s.llm_text.length > 120 ? "..." : ""}`);
}
}
if (realLines.length > 0) {
setHistoricalLines(realLines);
}
})
.catch(() => { /* keep fallback on error */ });
}
}, [sessionId, graphId, nodeId, workerSessionId]);
// Resolve which lines to display: live SSE logs > historical > default
const lines = (nodeLogs && nodeLogs.length > 0)
? nodeLogs
: historicalLines.length > 0
? historicalLines
: ["[--:--:--] INFO Awaiting execution..."];
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [lines]);
return (
<div className="flex-1 overflow-auto scrollbar-hide bg-background/80 rounded-xl border border-border/20 font-mono text-[10.5px] leading-relaxed p-3">
{lines.map((line, i) => {
const isWarn = line.includes(" WARN ");
const isErr = line.includes(" ERROR ");
const isDebug = line.includes(" DEBUG ");
return (
<div
key={i}
className={isErr ? "text-red-400" : isWarn ? "text-yellow-400/80" : isDebug ? "text-muted-foreground/50" : "text-green-400/70"}
>
{line}
</div>
);
})}
<div ref={bottomRef} />
</div>
);
}
function SystemPromptTab({ systemPrompt }: { systemPrompt?: string }) {
const prompt = systemPrompt || "";
const [copied, setCopied] = useState(false);
const handleCopy = () => {
navigator.clipboard.writeText(prompt);
setCopied(true);
setTimeout(() => setCopied(false), 1500);
};
if (!prompt) {
return (
<div className="flex-1 flex items-center justify-center">
<p className="text-xs text-muted-foreground/60 italic text-center">No system prompt configured</p>
</div>
);
}
return (
<div className="flex-1 overflow-auto scrollbar-hide flex flex-col gap-2">
<div className="flex items-center justify-between">
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider">System Prompt</p>
<button
onClick={handleCopy}
className="flex items-center gap-1 text-[10px] text-muted-foreground hover:text-foreground transition-colors"
>
{copied ? <Check className="w-3 h-3 text-primary" /> : <Copy className="w-3 h-3" />}
{copied ? "Copied" : "Copy"}
</button>
</div>
<textarea
readOnly
value={prompt}
className="flex-1 min-h-[240px] w-full rounded-xl bg-muted/30 border border-border/20 text-[11px] text-muted-foreground leading-relaxed p-3 font-mono resize-none focus:outline-none focus:border-border/40"
/>
</div>
);
}
function SubagentsTab() {
return (
<div className="flex-1 flex items-center justify-center">
<p className="text-xs text-muted-foreground/60 italic text-center">No subagents assigned to this node.</p>
</div>
);
}
type Tab = "overview" | "tools" | "logs" | "prompt" | "subagents";
const tabs: { id: Tab; label: string; Icon: React.FC<{ className?: string }> }[] = [
{ id: "overview", label: "Overview", Icon: ({ className }) => <GitBranch className={className} /> },
{ id: "tools", label: "Tools", Icon: ({ className }) => <Wrench className={className} /> },
{ id: "logs", label: "Logs", Icon: ({ className }) => <Terminal className={className} /> },
{ id: "prompt", label: "Prompt", Icon: ({ className }) => <BookOpen className={className} /> },
{ id: "subagents", label: "Subagents", Icon: ({ className }) => <Bot className={className} /> },
];
export default function NodeDetailPanel({ node, nodeSpec, sessionId, graphId, workerSessionId, nodeLogs, actionPlan, onClose }: NodeDetailPanelProps) {
const [activeTab, setActiveTab] = useState<Tab>("overview");
const [realTools, setRealTools] = useState<ToolInfo[] | null>(null);
const [realCriteria, setRealCriteria] = useState<NodeCriteria | null>(null);
useEffect(() => {
setActiveTab("overview");
setRealTools(null);
setRealCriteria(null);
}, [node?.id]);
// Fetch real tool descriptions when Tools tab is active and session is loaded
useEffect(() => {
if (activeTab === "tools" && sessionId && graphId && node) {
graphsApi.nodeTools(sessionId, graphId, node.id)
.then(r => setRealTools(r.tools))
.catch(() => setRealTools(null));
}
}, [activeTab, sessionId, graphId, node?.id]);
// Fetch real criteria when Overview tab is active and session is loaded
useEffect(() => {
if (activeTab === "overview" && sessionId && graphId && node) {
graphsApi.nodeCriteria(sessionId, graphId, node.id, workerSessionId || undefined)
.then(r => setRealCriteria(r))
.catch(() => setRealCriteria(null));
}
}, [activeTab, sessionId, graphId, node?.id, workerSessionId]);
if (!node) return null;
const status = statusConfig[node.status];
const StatusIcon = status.Icon;
const isActive = node.status === "running" || node.status === "looping";
return (
<div className="flex flex-col h-full border-l border-border/40 bg-card/20 animate-in slide-in-from-right">
{/* Header */}
<div className="px-4 pt-4 pb-3 border-b border-border/30 flex items-start justify-between gap-2 flex-shrink-0">
<div className="flex items-start gap-3 min-w-0">
<div
className="w-8 h-8 rounded-lg flex items-center justify-center flex-shrink-0 mt-0.5"
style={{ backgroundColor: `${status.color}18`, border: `1.5px solid ${status.color}35` }}
>
<Cpu className="w-3.5 h-3.5" style={{ color: status.color }} />
</div>
<div className="min-w-0">
<h3 className="text-sm font-semibold text-foreground leading-tight">{formatNodeId(node.id)}</h3>
<div className="flex items-center gap-1.5 mt-1">
<span style={{ color: status.color }}><StatusIcon className="w-3 h-3 flex-shrink-0" /></span>
<span className="text-[11px] font-medium" style={{ color: status.color }}>{status.label}</span>
{node.iterations !== undefined && node.iterations > 0 && (
<>
<span className="text-muted-foreground/40 text-[10px]">&middot;</span>
<span className="text-[11px] text-muted-foreground">
{node.iterations}{node.maxIterations ? `/${node.maxIterations}` : ""} iterations
</span>
</>
)}
</div>
</div>
</div>
<button
onClick={onClose}
className="p-1 rounded-md text-muted-foreground hover:text-foreground hover:bg-muted/50 transition-colors flex-shrink-0"
>
<X className="w-3.5 h-3.5" />
</button>
</div>
{/* Status label */}
{node.statusLabel && (
<div className="px-4 py-2 border-b border-border/20 flex-shrink-0">
<div className="flex items-center gap-2 text-[11px] text-muted-foreground bg-muted/40 rounded-lg px-3 py-2">
<Zap className="w-3 h-3 text-primary flex-shrink-0" />
<span className="italic">{node.statusLabel}</span>
</div>
</div>
)}
{/* Tab bar */}
<div className="flex border-b border-border/30 flex-shrink-0 px-2 pt-1 overflow-x-auto scrollbar-hide">
{tabs.map(tab => (
<button
key={tab.id}
onClick={() => setActiveTab(tab.id)}
className={`flex items-center gap-1.5 px-3 py-2 text-[11px] font-medium border-b-2 transition-colors -mb-px ${
activeTab === tab.id
? "border-primary text-primary"
: "border-transparent text-muted-foreground hover:text-foreground"
}`}
>
<tab.Icon className="w-3 h-3" />
{tab.label}
</button>
))}
</div>
{/* Tab content */}
<div className="flex-1 overflow-auto scrollbar-hide px-4 py-4 flex flex-col gap-3">
{activeTab === "overview" && (
<>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider">Action Plan</p>
{actionPlan ? (
<div className="rounded-lg border border-border/30 bg-background/60 px-3 py-2.5 text-[11px] leading-relaxed text-foreground/80">
<MarkdownContent content={actionPlan} />
</div>
) : (
<div className="flex items-center justify-center py-6">
<p className="text-[11px] text-muted-foreground/50 italic">Action plan will appear when node starts running</p>
</div>
)}
{(() => {
if (realCriteria && realCriteria.success_criteria) {
const criteriaLines = realCriteria.success_criteria.split("\n").filter(l => l.trim());
const passed = realCriteria.last_execution?.success ?? null;
return (
<div className="mt-1">
<div className="flex items-center justify-between mb-2">
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider">Judge Criteria</p>
{passed !== null && (
<span className={`text-[10px] font-medium px-2 py-0.5 rounded-full ${passed ? "bg-[hsl(43,70%,45%)]/15 text-[hsl(43,70%,45%)]" : "bg-red-500/15 text-red-400"}`}>
{passed ? "Passed" : "Failed"}
</span>
)}
</div>
<div className="flex flex-col gap-1.5">
{criteriaLines.map((line, i) => (
<div key={i} className="flex items-start gap-2">
<div className={`mt-0.5 w-3.5 h-3.5 rounded-full flex-shrink-0 flex items-center justify-center border ${passed ? "border-transparent bg-[hsl(43,70%,45%)]" : "border-border/40 bg-muted/30"}`}>
{passed && (
<svg viewBox="0 0 8 8" className="w-2 h-2" fill="none">
<path d="M1.5 4l2 2 3-3" stroke="white" strokeWidth="1.2" strokeLinecap="round" strokeLinejoin="round"/>
</svg>
)}
</div>
<span className={`text-[11px] leading-relaxed ${passed ? "text-foreground/70" : "text-foreground/80"}`}>{line}</span>
</div>
))}
</div>
</div>
);
}
return null;
})()}
{node.next && node.next.length > 0 && (
<div className="mt-2">
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-2">Sends to</p>
<div className="flex flex-wrap gap-1.5">
{node.next.map((n) => (
<span key={n} className="text-[11px] px-2.5 py-1 rounded-full bg-primary/10 text-primary border border-primary/20 font-medium">
{formatNodeId(n)}
</span>
))}
</div>
</div>
)}
</>
)}
{activeTab === "tools" && (
<div className="space-y-2">
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1">Tools & Integrations</p>
{realTools && realTools.length > 0
? realTools.map((t, i) => (
<ToolRow key={i} tool={{ name: t.name, description: t.description || "No description available", icon: "\ud83d\udd27" }} />
))
: (
<div className="flex items-center justify-center py-6">
<p className="text-[11px] text-muted-foreground/50 italic">No tools available</p>
</div>
)
}
</div>
)}
{activeTab === "logs" && (
<LogsTab nodeId={node.id} isActive={isActive} sessionId={sessionId} graphId={graphId} workerSessionId={workerSessionId} nodeLogs={nodeLogs} />
)}
{activeTab === "prompt" && (
<SystemPromptTab systemPrompt={nodeSpec?.system_prompt} />
)}
{activeTab === "subagents" && (
<SubagentsTab />
)}
</div>
</div>
);
}
+152
View File
@@ -0,0 +1,152 @@
import { useState, useCallback } from "react";
import { useNavigate } from "react-router-dom";
import { Crown, X } from "lucide-react";
import { loadPersistedTabs, savePersistedTabs, TAB_STORAGE_KEY, type PersistedTabState } from "@/lib/tab-persistence";
export interface TopBarTab {
agentType: string;
label: string;
isActive: boolean;
hasRunning: boolean;
}
interface TopBarProps {
/** Live tabs from workspace state. When omitted, reads from localStorage. */
tabs?: TopBarTab[];
/** Called when a tab is clicked (workspace overrides to setActiveWorker). */
onTabClick?: (agentType: string) => void;
/** Called when a tab's X is clicked (workspace overrides for SSE teardown). */
onCloseTab?: (agentType: string) => void;
/** Whether close buttons are shown. Defaults to true when >1 tab. */
canCloseTabs?: boolean;
/** Content rendered right after the tab strip (e.g. + button). */
afterTabs?: React.ReactNode;
/** Right-side slot for page-specific controls (e.g. credentials). */
children?: React.ReactNode;
}
export default function TopBar({ tabs: tabsProp, onTabClick, onCloseTab, canCloseTabs, afterTabs, children }: TopBarProps) {
const navigate = useNavigate();
// Fallback: read persisted tabs when no live tabs provided
const [persisted, setPersisted] = useState<PersistedTabState | null>(() =>
tabsProp ? null : loadPersistedTabs()
);
const tabs: TopBarTab[] = tabsProp ?? deriveTabs(persisted);
const showClose = canCloseTabs ?? true;
const handleTabClick = useCallback((agentType: string) => {
if (onTabClick) {
onTabClick(agentType);
} else {
navigate(`/workspace?agent=${encodeURIComponent(agentType)}`);
}
}, [onTabClick, navigate]);
const handleCloseTab = useCallback((agentType: string, e: React.MouseEvent) => {
e.stopPropagation();
if (onCloseTab) {
onCloseTab(agentType);
return;
}
// Fallback: update localStorage directly (non-workspace pages)
setPersisted(prev => {
if (!prev) return null;
const nextTabs = prev.tabs.filter(t => t.agentType !== agentType);
if (nextTabs.length === 0) {
localStorage.removeItem(TAB_STORAGE_KEY);
return null;
}
const removedIds = new Set(prev.tabs.filter(t => t.agentType === agentType).map(t => t.id));
const nextSessions = { ...prev.sessions };
for (const id of removedIds) delete nextSessions[id];
const nextActiveSession = { ...prev.activeSessionByAgent };
delete nextActiveSession[agentType];
const nextActiveWorker = prev.activeWorker === agentType
? nextTabs[0].agentType
: prev.activeWorker;
const nextState: PersistedTabState = {
tabs: nextTabs,
activeSessionByAgent: nextActiveSession,
activeWorker: nextActiveWorker,
sessions: nextSessions,
};
savePersistedTabs(nextState);
return nextState;
});
}, [onCloseTab]);
return (
<div className="relative h-12 flex items-center justify-between px-5 border-b border-border/60 bg-card/50 backdrop-blur-sm flex-shrink-0">
<div className="flex items-center gap-3 min-w-0">
<button onClick={() => navigate("/")} className="flex items-center gap-2 hover:opacity-80 transition-opacity flex-shrink-0">
<Crown className="w-4 h-4 text-primary" />
<span className="text-sm font-semibold text-primary">Hive</span>
</button>
{tabs.length > 0 && (
<>
<span className="text-border text-xs flex-shrink-0">|</span>
<div className="flex items-center gap-0.5 min-w-0 overflow-x-auto scrollbar-hide">
{tabs.map((tab) => (
<button
key={tab.agentType}
onClick={() => handleTabClick(tab.agentType)}
className={`group flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium transition-colors whitespace-nowrap flex-shrink-0 ${
tab.isActive
? "bg-primary/15 text-primary"
: "text-muted-foreground hover:text-foreground hover:bg-muted/50"
}`}
>
{tab.hasRunning && (
<span className="relative flex h-1.5 w-1.5 flex-shrink-0">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full bg-primary opacity-60" />
<span className="relative inline-flex rounded-full h-1.5 w-1.5 bg-primary" />
</span>
)}
<span>{tab.label}</span>
{showClose && (
<X
className="w-3 h-3 opacity-0 group-hover:opacity-60 hover:!opacity-100 transition-opacity"
onClick={(e) => handleCloseTab(tab.agentType, e)}
/>
)}
</button>
))}
</div>
{afterTabs}
</>
)}
</div>
{children && (
<div className="flex items-center gap-1 flex-shrink-0">
{children}
</div>
)}
</div>
);
}
/** Derive TopBarTab[] from persisted localStorage state (used outside workspace). */
function deriveTabs(persisted: PersistedTabState | null): TopBarTab[] {
if (!persisted) return [];
const seen = new Set<string>();
const tabs: TopBarTab[] = [];
for (const tab of persisted.tabs) {
if (seen.has(tab.agentType)) continue;
seen.add(tab.agentType);
const sessionData = persisted.sessions?.[tab.id];
const hasRunning = sessionData?.graphNodes?.some(
(n) => n.status === "running" || n.status === "looping"
) ?? false;
tabs.push({
agentType: tab.agentType,
label: tab.label,
isActive: false, // no active tab outside workspace
hasRunning,
});
}
return tabs;
}
+126
View File
@@ -0,0 +1,126 @@
import { useEffect, useRef, useCallback, useState } from "react";
import type { AgentEvent, EventTypeName } from "@/api/types";
interface UseSSEOptions {
sessionId: string;
eventTypes?: EventTypeName[];
onEvent?: (event: AgentEvent) => void;
enabled?: boolean;
}
export function useSSE({
sessionId,
eventTypes,
onEvent,
enabled = true,
}: UseSSEOptions) {
const [connected, setConnected] = useState(false);
const [lastEvent, setLastEvent] = useState<AgentEvent | null>(null);
const eventSourceRef = useRef<EventSource | null>(null);
const onEventRef = useRef(onEvent);
onEventRef.current = onEvent;
const typesKey = eventTypes?.join(",") ?? "";
useEffect(() => {
if (!enabled || !sessionId) return;
let url = `/api/sessions/${sessionId}/events`;
if (eventTypes?.length) {
url += `?types=${eventTypes.join(",")}`;
}
const es = new EventSource(url);
eventSourceRef.current = es;
es.onopen = () => setConnected(true);
es.onerror = () => setConnected(false);
const handler = (e: MessageEvent) => {
try {
const event: AgentEvent = JSON.parse(e.data);
setLastEvent(event);
onEventRef.current?.(event);
} catch {
// Ignore parse errors (keepalive comments)
}
};
es.onmessage = handler;
return () => {
es.close();
eventSourceRef.current = null;
setConnected(false);
};
}, [sessionId, enabled, typesKey]);
const close = useCallback(() => {
eventSourceRef.current?.close();
eventSourceRef.current = null;
setConnected(false);
}, []);
return { connected, lastEvent, close };
}
// --- Multi-session SSE hook ---
interface UseMultiSSEOptions {
/** Map of agentType → backendSessionId. Only non-empty IDs get an EventSource. */
sessions: Record<string, string>;
onEvent: (agentType: string, event: AgentEvent) => void;
}
/**
* Manages one EventSource per loaded session. Diffs `sessions` on each render:
* opens new connections, closes removed ones, leaves existing ones alone.
*/
export function useMultiSSE({ sessions, onEvent }: UseMultiSSEOptions) {
const onEventRef = useRef(onEvent);
onEventRef.current = onEvent;
const sourcesRef = useRef(new Map<string, EventSource>());
// Diff-based open/close — runs on every `sessions` change
useEffect(() => {
const current = sourcesRef.current;
const desired = new Set(Object.keys(sessions));
// Close connections for sessions no longer in the map
for (const [agentType, es] of current) {
if (!desired.has(agentType)) {
es.close();
current.delete(agentType);
}
}
// Open connections for newly added sessions
for (const [agentType, sessionId] of Object.entries(sessions)) {
if (!sessionId || current.has(agentType)) continue;
const url = `/api/sessions/${sessionId}/events`;
const es = new EventSource(url);
es.onmessage = (e: MessageEvent) => {
try {
const event: AgentEvent = JSON.parse(e.data);
console.log('[SSE] received:', agentType, event.type, event.stream_id, event.node_id);
onEventRef.current(agentType, event);
} catch {
// Ignore parse errors (keepalive comments)
}
};
current.set(agentType, es);
}
}, [sessions]);
// Close all on unmount only
useEffect(() => {
return () => {
for (const es of sourcesRef.current.values()) es.close();
sourcesRef.current.clear();
};
}, []);
}
+115
View File
@@ -0,0 +1,115 @@
@import "tailwindcss";
@custom-variant dark (&:is(.dark *));
@theme {
--color-background: hsl(var(--background));
--color-foreground: hsl(var(--foreground));
--color-card: hsl(var(--card));
--color-card-foreground: hsl(var(--card-foreground));
--color-popover: hsl(var(--popover));
--color-popover-foreground: hsl(var(--popover-foreground));
--color-primary: hsl(var(--primary));
--color-primary-foreground: hsl(var(--primary-foreground));
--color-secondary: hsl(var(--secondary));
--color-secondary-foreground: hsl(var(--secondary-foreground));
--color-muted: hsl(var(--muted));
--color-muted-foreground: hsl(var(--muted-foreground));
--color-accent: hsl(var(--accent));
--color-accent-foreground: hsl(var(--accent-foreground));
--color-destructive: hsl(var(--destructive));
--color-destructive-foreground: hsl(var(--destructive-foreground));
--color-border: hsl(var(--border));
--color-input: hsl(var(--input));
--color-ring: hsl(var(--ring));
--radius-sm: calc(var(--radius) - 4px);
--radius-md: calc(var(--radius) - 2px);
--radius-lg: var(--radius);
--radius-xl: calc(var(--radius) + 4px);
}
@layer base {
:root {
--background: 0 0% 100%;
--foreground: 0 0% 3.9%;
--card: 0 0% 100%;
--card-foreground: 0 0% 3.9%;
--popover: 0 0% 100%;
--popover-foreground: 0 0% 3.9%;
--primary: 45 93% 47%;
--primary-foreground: 0 0% 2%;
--secondary: 0 0% 96.1%;
--secondary-foreground: 0 0% 9%;
--muted: 0 0% 96.1%;
--muted-foreground: 0 0% 45.1%;
--accent: 0 0% 96.1%;
--accent-foreground: 0 0% 9%;
--destructive: 0 84.2% 60.2%;
--destructive-foreground: 0 0% 98%;
--border: 0 0% 89.8%;
--input: 0 0% 89.8%;
--ring: 45 93% 47%;
--radius: 0.5rem;
}
.dark {
--background: 240 6% 6%;
--foreground: 0 0% 95%;
--card: 240 5% 8%;
--card-foreground: 0 0% 95%;
--popover: 240 5% 8%;
--popover-foreground: 0 0% 95%;
--primary: 45 93% 47%;
--primary-foreground: 0 0% 2%;
--secondary: 240 3.7% 15.9%;
--secondary-foreground: 0 0% 98%;
--muted: 240 3.7% 15.9%;
--muted-foreground: 240 5% 64.9%;
--accent: 240 3.7% 15.9%;
--accent-foreground: 0 0% 98%;
--destructive: 0 62.8% 50.6%;
--destructive-foreground: 0 0% 98%;
--border: 240 3.7% 15.9%;
--input: 240 3.7% 15.9%;
--ring: 45 93% 47%;
}
}
@layer base {
* {
@apply border-border;
}
body {
@apply bg-background text-foreground;
}
button {
cursor: pointer;
}
}
/* Scrollbar hide utility */
.scrollbar-hide {
-ms-overflow-style: none;
scrollbar-width: none;
}
.scrollbar-hide::-webkit-scrollbar {
display: none;
}
/* Pulse ring animation for SVG elements */
@keyframes pulse-ring {
0% { opacity: 0.25; transform: scale(1); }
50% { opacity: 0; transform: scale(1.05); }
100% { opacity: 0.25; transform: scale(1); }
}
/* Slide-in animation */
@keyframes slide-in-from-right {
from { transform: translateX(10px); opacity: 0; }
to { transform: translateX(0); opacity: 1; }
}
.animate-in.slide-in-from-right {
animation: slide-in-from-right 0.2s ease-out;
}
+302
View File
@@ -0,0 +1,302 @@
import { describe, it, expect } from "vitest";
import { backendMessageToChatMessage, sseEventToChatMessage, formatAgentDisplayName } from "./chat-helpers";
import type { AgentEvent, Message } from "@/api/types";
// ---------------------------------------------------------------------------
// backendMessageToChatMessage
// ---------------------------------------------------------------------------
describe("backendMessageToChatMessage", () => {
it("converts a user message", () => {
const msg: Message = { seq: 1, role: "user", content: "hello", _node_id: "chat" };
const result = backendMessageToChatMessage(msg, "inbox-management");
expect(result.type).toBe("user");
expect(result.agent).toBe("You");
expect(result.role).toBeUndefined();
expect(result.content).toBe("hello");
expect(result.thread).toBe("inbox-management");
});
it("converts an assistant message with node_id as agent", () => {
const msg: Message = { seq: 2, role: "assistant", content: "hi", _node_id: "intake" };
const result = backendMessageToChatMessage(msg, "inbox-management");
expect(result.agent).toBe("intake");
expect(result.role).toBe("worker");
expect(result.type).toBeUndefined();
});
it("defaults agent to 'Agent' when _node_id is empty", () => {
const msg: Message = { seq: 3, role: "assistant", content: "ok", _node_id: "" };
const result = backendMessageToChatMessage(msg, "inbox-management");
expect(result.agent).toBe("Agent");
});
it("produces deterministic ID from seq", () => {
const msg: Message = { seq: 42, role: "user", content: "test", _node_id: "x" };
const result = backendMessageToChatMessage(msg, "thread");
expect(result.id).toBe("backend-42");
});
it("passes through the thread parameter", () => {
const msg: Message = { seq: 1, role: "user", content: "hi", _node_id: "x" };
const result = backendMessageToChatMessage(msg, "my-thread");
expect(result.thread).toBe("my-thread");
});
it("uses agentDisplayName instead of node_id when provided", () => {
const msg: Message = { seq: 2, role: "assistant", content: "hi", _node_id: "intake" };
const result = backendMessageToChatMessage(msg, "thread", "Competitive Intel Agent");
expect(result.agent).toBe("Competitive Intel Agent");
});
it("still shows 'You' for user messages even when agentDisplayName is provided", () => {
const msg: Message = { seq: 1, role: "user", content: "hello", _node_id: "chat" };
const result = backendMessageToChatMessage(msg, "thread", "My Agent");
expect(result.agent).toBe("You");
});
});
// ---------------------------------------------------------------------------
// sseEventToChatMessage
// ---------------------------------------------------------------------------
function makeEvent(overrides: Partial<AgentEvent>): AgentEvent {
return {
type: "execution_started",
stream_id: "s1",
node_id: null,
execution_id: null,
data: {},
timestamp: "2026-01-01T00:00:00Z",
correlation_id: null,
graph_id: null,
...overrides,
};
}
describe("sseEventToChatMessage", () => {
it("converts client_output_delta to streaming message with snapshot", () => {
const event = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: "abc",
data: { content: "hello", snapshot: "hello world" },
});
const result = sseEventToChatMessage(event, "inbox-management");
expect(result).not.toBeNull();
expect(result!.id).toBe("stream-abc-chat");
expect(result!.content).toBe("hello world");
expect(result!.role).toBe("worker");
expect(result!.agent).toBe("chat");
});
it("produces same ID for same execution_id + node_id (enables upsert)", () => {
const event1 = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: "abc",
data: { snapshot: "first" },
});
const event2 = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: "abc",
data: { snapshot: "second" },
});
expect(sseEventToChatMessage(event1, "t")!.id).toBe(
sseEventToChatMessage(event2, "t")!.id,
);
});
it("uses turnId for message ID when provided", () => {
const event = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: null,
data: { snapshot: "hello" },
});
const result = sseEventToChatMessage(event, "t", undefined, 3);
expect(result!.id).toBe("stream-3-chat");
});
it("different turnIds produce different message IDs (separate bubbles)", () => {
const event = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: null,
data: { snapshot: "hello" },
});
const r1 = sseEventToChatMessage(event, "t", undefined, 1);
const r2 = sseEventToChatMessage(event, "t", undefined, 2);
expect(r1!.id).not.toBe(r2!.id);
});
it("same turnId produces same ID within a turn (enables streaming upsert)", () => {
const e1 = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: null,
data: { snapshot: "partial" },
});
const e2 = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: null,
data: { snapshot: "partial response" },
});
expect(sseEventToChatMessage(e1, "t", undefined, 5)!.id).toBe(
sseEventToChatMessage(e2, "t", undefined, 5)!.id,
);
});
it("falls back to execution_id when turnId is not provided", () => {
const event = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: "exec-123",
data: { snapshot: "hello" },
});
const result = sseEventToChatMessage(event, "t");
expect(result!.id).toBe("stream-exec-123-chat");
});
it("falls back to '0' when both turnId and execution_id are null", () => {
const event = makeEvent({
type: "client_output_delta",
node_id: "chat",
execution_id: null,
data: { snapshot: "hello" },
});
const result = sseEventToChatMessage(event, "t");
expect(result!.id).toBe("stream-0-chat");
});
it("converts client_input_requested with prompt to message", () => {
const event = makeEvent({
type: "client_input_requested",
node_id: "chat",
execution_id: "abc",
data: { prompt: "What next?" },
});
const result = sseEventToChatMessage(event, "t");
expect(result).not.toBeNull();
expect(result!.content).toBe("What next?");
expect(result!.role).toBe("worker");
});
it("returns null for client_input_requested without prompt", () => {
const event = makeEvent({
type: "client_input_requested",
node_id: "chat",
execution_id: "abc",
data: { prompt: "" },
});
expect(sseEventToChatMessage(event, "t")).toBeNull();
});
it("converts execution_failed to system error message", () => {
const event = makeEvent({
type: "execution_failed",
execution_id: "abc",
data: { error: "timeout" },
});
const result = sseEventToChatMessage(event, "t");
expect(result).not.toBeNull();
expect(result!.type).toBe("system");
expect(result!.content).toContain("timeout");
});
it("returns null for execution_started (no chat message)", () => {
const event = makeEvent({ type: "execution_started", execution_id: "abc" });
expect(sseEventToChatMessage(event, "t")).toBeNull();
});
it("uses agentDisplayName instead of node_id when provided", () => {
const event = makeEvent({
type: "client_output_delta",
node_id: "research",
execution_id: "abc",
data: { snapshot: "results" },
});
const result = sseEventToChatMessage(event, "t", "Competitive Intel Agent");
expect(result).not.toBeNull();
expect(result!.agent).toBe("Competitive Intel Agent");
});
it("converts llm_text_delta with snapshot to worker message", () => {
const event = makeEvent({
type: "llm_text_delta",
node_id: "news-search",
execution_id: "abc",
data: { content: "Searching", snapshot: "Searching for news articles..." },
});
const result = sseEventToChatMessage(event, "t");
expect(result).not.toBeNull();
expect(result!.id).toBe("stream-abc-news-search");
expect(result!.content).toBe("Searching for news articles...");
expect(result!.role).toBe("worker");
expect(result!.agent).toBe("news-search");
});
it("returns null for llm_text_delta with empty snapshot", () => {
const event = makeEvent({
type: "llm_text_delta",
node_id: "news-search",
execution_id: "abc",
data: { content: "", snapshot: "" },
});
expect(sseEventToChatMessage(event, "t")).toBeNull();
});
it("uses node_id (not agentDisplayName) for llm_text_delta", () => {
const event = makeEvent({
type: "llm_text_delta",
node_id: "news-search",
execution_id: "abc",
data: { snapshot: "results" },
});
const result = sseEventToChatMessage(event, "t", "Competitive Intel Agent");
expect(result).not.toBeNull();
expect(result!.agent).toBe("news-search");
});
it("still uses 'System' for execution_failed even when agentDisplayName is provided", () => {
const event = makeEvent({
type: "execution_failed",
execution_id: "abc",
data: { error: "boom" },
});
const result = sseEventToChatMessage(event, "t", "My Agent");
expect(result!.agent).toBe("System");
});
});
// ---------------------------------------------------------------------------
// formatAgentDisplayName
// ---------------------------------------------------------------------------
describe("formatAgentDisplayName", () => {
it("converts underscored agent name to title case", () => {
expect(formatAgentDisplayName("competitive_intel_agent")).toBe("Competitive Intel Agent");
});
it("strips -graph suffix", () => {
expect(formatAgentDisplayName("competitive_intel_agent-graph")).toBe("Competitive Intel Agent");
});
it("strips _graph suffix", () => {
expect(formatAgentDisplayName("my_agent_graph")).toBe("My Agent");
});
it("converts hyphenated names to title case", () => {
expect(formatAgentDisplayName("inbox-management")).toBe("Inbox Management");
});
it("takes the last path segment", () => {
expect(formatAgentDisplayName("examples/templates/job_hunter")).toBe("Job Hunter");
});
it("handles a single word", () => {
expect(formatAgentDisplayName("agent")).toBe("Agent");
});
});
+141
View File
@@ -0,0 +1,141 @@
/**
* Pure functions for converting backend messages and SSE events into ChatMessage objects.
* No React dependencies — just JSON in, object out.
*/
import type { ChatMessage } from "@/components/ChatPanel";
import type { AgentEvent, Message } from "@/api/types";
/**
* Derive a human-readable display name from a raw agent identifier.
*
* Examples:
* "competitive_intel_agent" → "Competitive Intel Agent"
* "competitive_intel_agent-graph" → "Competitive Intel Agent"
* "inbox-management" → "Inbox Management"
* "job_hunter" → "Job Hunter"
*/
export function formatAgentDisplayName(raw: string): string {
// Take the last path segment (in case it's a path like "examples/templates/foo")
const base = raw.split("/").pop() || raw;
// Strip common suffixes like "-graph" or "_graph"
const stripped = base.replace(/[-_]graph$/, "");
// Replace underscores and hyphens with spaces, then title-case each word
return stripped
.replace(/[_-]/g, " ")
.replace(/\b\w/g, (c) => c.toUpperCase())
.trim();
}
/**
* Convert a backend Message (from sessionsApi.messages()) into a ChatMessage.
* When agentDisplayName is provided, it is used as the sender for all agent
* messages instead of the raw node_id.
*/
export function backendMessageToChatMessage(
msg: Message,
thread: string,
agentDisplayName?: string,
): ChatMessage {
return {
id: `backend-${msg.seq}`,
agent: msg.role === "user" ? "You" : agentDisplayName || msg._node_id || "Agent",
agentColor: "",
content: msg.content,
timestamp: "",
type: msg.role === "user" ? "user" : undefined,
role: msg.role === "user" ? undefined : "worker",
thread,
};
}
/**
* Convert an SSE AgentEvent into a ChatMessage, or null if the event
* doesn't produce a visible chat message.
* When agentDisplayName is provided, it is used as the sender for all agent
* messages instead of the raw node_id.
*/
export function sseEventToChatMessage(
event: AgentEvent,
thread: string,
agentDisplayName?: string,
turnId?: number,
): ChatMessage | null {
// turnId disambiguates messages across response turns. Within a single
// turn the ID stays stable so the upsert logic can replace the previous
// snapshot (streaming). Across turns, different turnIds produce different
// IDs so each response gets its own bubble.
const idKey = turnId != null ? String(turnId) : (event.execution_id ?? "0");
switch (event.type) {
case "client_output_delta": {
const snapshot = (event.data?.snapshot as string) || (event.data?.content as string) || "";
if (!snapshot) return null;
return {
id: `stream-${idKey}-${event.node_id}`,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: snapshot,
timestamp: "",
role: "worker",
thread,
};
}
case "client_input_requested": {
const prompt = (event.data?.prompt as string) || "";
if (!prompt) return null;
return {
id: `input-req-${idKey}-${event.node_id}`,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: prompt,
timestamp: "",
role: "worker",
thread,
};
}
case "llm_text_delta": {
const snapshot = (event.data?.snapshot as string) || (event.data?.content as string) || "";
if (!snapshot) return null;
return {
id: `stream-${idKey}-${event.node_id}`,
agent: event.node_id || "Agent",
agentColor: "",
content: snapshot,
timestamp: "",
role: "worker",
thread,
};
}
case "execution_paused": {
return {
id: `paused-${event.execution_id}`,
agent: "System",
agentColor: "",
content: "Execution paused by user",
timestamp: "",
type: "system",
thread,
};
}
case "execution_failed": {
const error = (event.data?.error as string) || "Execution failed";
return {
id: `error-${event.execution_id}`,
agent: "System",
agentColor: "",
content: `Error: ${error}`,
timestamp: "",
type: "system",
thread,
};
}
default:
return null;
}
}
@@ -0,0 +1,260 @@
import { describe, it, expect } from "vitest";
import { topologyToGraphNodes } from "./graph-converter";
import type { GraphTopology, NodeSpec } from "@/api/types";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeNode(id: string, overrides: Partial<NodeSpec> = {}): NodeSpec {
return {
id,
name: id,
description: "",
node_type: "event_loop",
input_keys: [],
output_keys: [],
nullable_output_keys: [],
tools: [],
routes: {},
max_retries: 3,
max_node_visits: 0,
client_facing: false,
success_criteria: null,
system_prompt: "",
...overrides,
};
}
// ---------------------------------------------------------------------------
// Edge classification
// ---------------------------------------------------------------------------
describe("edge classification", () => {
it("linear chain: all edges in next[], no backEdges", () => {
const topology: GraphTopology = {
nodes: [makeNode("A"), makeNode("B"), makeNode("C")],
edges: [
{ source: "A", target: "B", condition: "on_success", priority: 0 },
{ source: "B", target: "C", condition: "on_success", priority: 0 },
],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result).toHaveLength(3);
const a = result.find((n) => n.id === "A")!;
const b = result.find((n) => n.id === "B")!;
const c = result.find((n) => n.id === "C")!;
expect(a.next).toEqual(["B"]);
expect(a.backEdges).toBeUndefined();
expect(b.next).toEqual(["C"]);
expect(b.backEdges).toBeUndefined();
expect(c.next).toBeUndefined();
expect(c.backEdges).toBeUndefined();
});
it("loop edge: classified as backEdge", () => {
const topology: GraphTopology = {
nodes: [makeNode("A"), makeNode("B"), makeNode("C")],
edges: [
{ source: "A", target: "B", condition: "on_success", priority: 0 },
{ source: "B", target: "C", condition: "on_success", priority: 0 },
{ source: "C", target: "A", condition: "on_success", priority: 0 },
],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
const c = result.find((n) => n.id === "C")!;
expect(c.next).toBeUndefined();
expect(c.backEdges).toEqual(["A"]);
});
it("diamond/fan-out: multiple next targets", () => {
const topology: GraphTopology = {
nodes: [makeNode("A"), makeNode("B"), makeNode("C"), makeNode("D")],
edges: [
{ source: "A", target: "B", condition: "on_success", priority: 0 },
{ source: "A", target: "C", condition: "on_failure", priority: 1 },
{ source: "B", target: "D", condition: "on_success", priority: 0 },
{ source: "C", target: "D", condition: "on_success", priority: 0 },
],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
const a = result.find((n) => n.id === "A")!;
expect(a.next).toEqual(expect.arrayContaining(["B", "C"]));
expect(a.next).toHaveLength(2);
});
});
// ---------------------------------------------------------------------------
// Status mapping
// ---------------------------------------------------------------------------
describe("status mapping", () => {
it("no enrichment: all nodes pending", () => {
const topology: GraphTopology = {
nodes: [makeNode("A"), makeNode("B")],
edges: [
{ source: "A", target: "B", condition: "on_success", priority: 0 },
],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result.every((n) => n.status === "pending")).toBe(true);
});
it("is_current: running", () => {
const topology: GraphTopology = {
nodes: [makeNode("A", { is_current: true, visit_count: 1, in_path: true })],
edges: [],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result[0].status).toBe("running");
});
it("is_current + visit_count > 1: looping", () => {
const topology: GraphTopology = {
nodes: [makeNode("A", { is_current: true, visit_count: 3, in_path: true })],
edges: [],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result[0].status).toBe("looping");
});
it("in_path + visited + not current: complete", () => {
const topology: GraphTopology = {
nodes: [makeNode("A", { in_path: true, visit_count: 1, is_current: false })],
edges: [],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result[0].status).toBe("complete");
});
it("has_failures: error", () => {
const topology: GraphTopology = {
nodes: [makeNode("A", { has_failures: true, in_path: true, visit_count: 1 })],
edges: [],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result[0].status).toBe("error");
});
});
// ---------------------------------------------------------------------------
// Iteration tracking
// ---------------------------------------------------------------------------
describe("iteration tracking", () => {
it("visit_count maps to iterations", () => {
const topology: GraphTopology = {
nodes: [makeNode("A", { visit_count: 3, in_path: true })],
edges: [],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result[0].iterations).toBe(3);
});
it("max_node_visits maps to maxIterations", () => {
const topology: GraphTopology = {
nodes: [makeNode("A", { max_node_visits: 5, visit_count: 1, in_path: true })],
edges: [],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result[0].maxIterations).toBe(5);
});
it("max_node_visits == 0 (unlimited): maxIterations omitted", () => {
const topology: GraphTopology = {
nodes: [makeNode("A", { max_node_visits: 0, visit_count: 1, in_path: true })],
edges: [],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result[0].maxIterations).toBeUndefined();
});
});
// ---------------------------------------------------------------------------
// Edge labels
// ---------------------------------------------------------------------------
describe("edge labels", () => {
it("conditional edges produce edgeLabels, on_success/always do not", () => {
const topology: GraphTopology = {
nodes: [makeNode("A"), makeNode("B"), makeNode("C"), makeNode("D")],
edges: [
{ source: "A", target: "B", condition: "conditional", priority: 0 },
{ source: "A", target: "C", condition: "on_failure", priority: 1 },
{ source: "B", target: "D", condition: "on_success", priority: 0 },
{ source: "C", target: "D", condition: "always", priority: 0 },
],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
const a = result.find((n) => n.id === "A")!;
const b = result.find((n) => n.id === "B")!;
const c = result.find((n) => n.id === "C")!;
// A has conditional + on_failure edges → both get labels
expect(a.edgeLabels).toEqual({ B: "conditional", C: "on_failure" });
// B has on_success → no label
expect(b.edgeLabels).toBeUndefined();
// C has always → no label
expect(c.edgeLabels).toBeUndefined();
});
});
// ---------------------------------------------------------------------------
// Node ordering
// ---------------------------------------------------------------------------
describe("node ordering", () => {
it("nodes returned in BFS walk order from entry_node, not input order", () => {
const topology: GraphTopology = {
// Input order: C, A, B — but BFS from A should yield A, B, C
nodes: [makeNode("C"), makeNode("A"), makeNode("B")],
edges: [
{ source: "A", target: "B", condition: "on_success", priority: 0 },
{ source: "B", target: "C", condition: "on_success", priority: 0 },
],
entry_node: "A",
};
const result = topologyToGraphNodes(topology);
expect(result.map((n) => n.id)).toEqual(["A", "B", "C"]);
});
it("empty topology returns empty array", () => {
const topology: GraphTopology = {
nodes: [],
edges: [],
entry_node: "",
};
const result = topologyToGraphNodes(topology);
expect(result).toEqual([]);
});
});
+130
View File
@@ -0,0 +1,130 @@
import type { GraphTopology, NodeSpec } from "@/api/types";
import type { GraphNode, NodeStatus } from "@/components/AgentGraph";
/**
* Convert a backend GraphTopology (nodes + edges + entry_node) into
* the GraphNode[] shape that AgentGraph renders.
*
* Three jobs:
* 1. Order nodes via BFS from entry_node
* 2. Classify edges as forward (next) or backward (backEdges)
* 3. Map session enrichment fields to NodeStatus
*/
export function topologyToGraphNodes(topology: GraphTopology): GraphNode[] {
const { nodes, edges, entry_node } = topology;
if (nodes.length === 0) return [];
// Build adjacency list: source → [target, ...]
const adj = new Map<string, string[]>();
for (const e of edges) {
const list = adj.get(e.source) || [];
list.push(e.target);
adj.set(e.source, list);
}
// BFS from entry_node to determine walk order + position map
const order: string[] = [];
const position = new Map<string, number>();
const visited = new Set<string>();
const start = entry_node || nodes[0].id;
const queue = [start];
visited.add(start);
while (queue.length > 0) {
const id = queue.shift()!;
position.set(id, order.length);
order.push(id);
for (const target of adj.get(id) || []) {
if (!visited.has(target)) {
visited.add(target);
queue.push(target);
}
}
}
// Add any nodes not reachable from entry (shouldn't happen in valid graphs)
for (const n of nodes) {
if (!visited.has(n.id)) {
position.set(n.id, order.length);
order.push(n.id);
}
}
// Build a node lookup
const nodeMap = new Map<string, NodeSpec>();
for (const n of nodes) {
nodeMap.set(n.id, n);
}
// Classify edges per source node
const nextMap = new Map<string, string[]>();
const backMap = new Map<string, string[]>();
for (const e of edges) {
const srcPos = position.get(e.source) ?? 0;
const tgtPos = position.get(e.target) ?? 0;
if (tgtPos <= srcPos) {
// Back edge (target is at same or earlier position in BFS)
const list = backMap.get(e.source) || [];
list.push(e.target);
backMap.set(e.source, list);
} else {
// Forward edge
const list = nextMap.get(e.source) || [];
list.push(e.target);
nextMap.set(e.source, list);
}
}
// Build edge condition labels (only for non-trivial conditions)
const edgeLabelMap = new Map<string, Record<string, string>>();
for (const e of edges) {
if (e.condition !== "always" && e.condition !== "on_success") {
const labels = edgeLabelMap.get(e.source) || {};
labels[e.target] = e.condition;
edgeLabelMap.set(e.source, labels);
}
}
// Build GraphNode[] in BFS order
return order.map((id) => {
const spec = nodeMap.get(id);
const next = nextMap.get(id);
const back = backMap.get(id);
const labels = edgeLabelMap.get(id);
const result: GraphNode = {
id,
label: spec?.name || id,
status: mapStatus(spec),
...(next && next.length > 0 ? { next } : {}),
...(back && back.length > 0 ? { backEdges: back } : {}),
...(labels ? { edgeLabels: labels } : {}),
};
// Iteration tracking from session enrichment
if (spec?.visit_count !== undefined && spec.visit_count > 0) {
result.iterations = spec.visit_count;
}
if (spec?.max_node_visits !== undefined && spec.max_node_visits > 0) {
result.maxIterations = spec.max_node_visits;
}
return result;
});
}
function mapStatus(spec: NodeSpec | undefined): NodeStatus {
if (!spec) return "pending";
if (spec.has_failures) return "error";
if (spec.is_current) {
return (spec.visit_count ?? 0) > 1 ? "looping" : "running";
}
if (spec.in_path && (spec.visit_count ?? 0) > 0) return "complete";
return "pending";
}
+49
View File
@@ -0,0 +1,49 @@
/**
* Shared tab persistence utilities for workspace sessions.
* Used by both TopBar and workspace.tsx.
*/
import type { ChatMessage } from "@/components/ChatPanel";
import type { GraphNode } from "@/components/AgentGraph";
export const TAB_STORAGE_KEY = "hive:workspace-tabs";
export interface PersistedTabState {
tabs: Array<{ id: string; agentType: string; label: string }>;
activeSessionByAgent: Record<string, string>;
activeWorker: string;
sessions?: Record<string, { messages: ChatMessage[]; graphNodes: GraphNode[] }>;
}
export function loadPersistedTabs(): PersistedTabState | null {
try {
const raw = localStorage.getItem(TAB_STORAGE_KEY);
if (!raw) return null;
const parsed = JSON.parse(raw);
if (!Array.isArray(parsed.tabs) || parsed.tabs.length === 0) return null;
return parsed as PersistedTabState;
} catch {
return null;
}
}
const MAX_PERSISTED_MESSAGES = 50;
export function savePersistedTabs(state: PersistedTabState): void {
try {
const capped = { ...state };
if (capped.sessions) {
const trimmed: typeof capped.sessions = {};
for (const [id, data] of Object.entries(capped.sessions)) {
trimmed[id] = {
messages: data.messages.slice(-MAX_PERSISTED_MESSAGES),
graphNodes: data.graphNodes,
};
}
capped.sessions = trimmed;
}
localStorage.setItem(TAB_STORAGE_KEY, JSON.stringify(capped));
} catch {
// localStorage full or unavailable — silently ignore
}
}
+6
View File
@@ -0,0 +1,6 @@
import { type ClassValue, clsx } from "clsx";
import { twMerge } from "tailwind-merge";
export function cn(...inputs: ClassValue[]) {
return twMerge(clsx(inputs));
}
+10
View File
@@ -0,0 +1,10 @@
import ReactDOM from "react-dom/client";
import { BrowserRouter } from "react-router-dom";
import App from "./App";
import "./index.css";
ReactDOM.createRoot(document.getElementById("root")!).render(
<BrowserRouter>
<App />
</BrowserRouter>
);
+243
View File
@@ -0,0 +1,243 @@
import { useState, useEffect } from "react";
import { useNavigate } from "react-router-dom";
import { Crown, Mail, Briefcase, Shield, Search, Newspaper, ArrowRight, Hexagon, Send, Bot } from "lucide-react";
import TopBar from "@/components/TopBar";
import type { LucideIcon } from "lucide-react";
import { agentsApi } from "@/api/agents";
import type { DiscoverEntry } from "@/api/types";
// --- Icon and color maps (backend can't serve icons) ---
const AGENT_ICONS: Record<string, LucideIcon> = {
email_inbox_management: Mail,
job_hunter: Briefcase,
vulnerability_assessment: Shield,
deep_research_agent: Search,
tech_news_reporter: Newspaper,
};
const AGENT_COLORS: Record<string, string> = {
email_inbox_management: "hsl(38,80%,55%)",
job_hunter: "hsl(30,85%,58%)",
vulnerability_assessment: "hsl(15,70%,52%)",
deep_research_agent: "hsl(210,70%,55%)",
tech_news_reporter: "hsl(270,60%,55%)",
};
function agentSlug(path: string): string {
return path.replace(/\/$/, "").split("/").pop() || path;
}
// --- Generic prompt hints (not tied to specific agents) ---
const promptHints = [
"Check my inbox for urgent emails",
"Find senior engineer roles that match my profile",
"Research the latest trends in AI agents",
"Run a security scan on my domain",
];
export default function Home() {
const navigate = useNavigate();
const [inputValue, setInputValue] = useState("");
const [showAgents, setShowAgents] = useState(false);
const [agents, setAgents] = useState<DiscoverEntry[]>([]);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
// Fetch agents on mount so data is ready when user toggles
useEffect(() => {
setLoading(true);
agentsApi
.discover()
.then((result) => {
const examples = result["Examples"] || [];
setAgents(examples);
})
.catch((err) => {
setError(err.message || "Failed to load agents");
})
.finally(() => {
setLoading(false);
});
}, []);
const handleSelect = (agentPath: string) => {
navigate(`/workspace?agent=${encodeURIComponent(agentPath)}`);
};
const handlePromptHint = (text: string) => {
navigate(`/workspace?agent=new-agent&prompt=${encodeURIComponent(text)}`);
};
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (inputValue.trim()) {
navigate(`/workspace?agent=new-agent&prompt=${encodeURIComponent(inputValue.trim())}`);
}
};
return (
<div className="min-h-screen bg-background flex flex-col">
<TopBar />
{/* Main content */}
<div className="flex-1 flex flex-col items-center justify-center p-6">
<div className="w-full max-w-2xl">
{/* Queen Bee greeting */}
<div className="text-center mb-8">
<div
className="inline-flex w-12 h-12 rounded-2xl items-center justify-center mb-4"
style={{
backgroundColor: "hsl(45,95%,58%,0.1)",
border: "1.5px solid hsl(45,95%,58%,0.25)",
boxShadow: "0 0 24px hsl(45,95%,58%,0.08)",
}}
>
<Crown className="w-6 h-6 text-primary" />
</div>
<h1 className="text-xl font-semibold text-foreground mb-1.5">What can I help you with?</h1>
<p className="text-sm text-muted-foreground">
I'm your Queen Bee I create and coordinate worker agents to handle tasks for you.
</p>
</div>
{/* Chat input */}
<form onSubmit={handleSubmit} className="mb-6">
<div className="relative border border-border/60 rounded-xl bg-card/50 hover:border-primary/30 focus-within:border-primary/40 transition-colors shadow-sm">
<input
value={inputValue}
onChange={(e) => setInputValue(e.target.value)}
onKeyDown={(e) => {
if (e.key === "Enter") {
e.preventDefault();
handleSubmit(e);
}
}}
placeholder="Describe a task for the hive..."
className="w-full bg-transparent px-5 py-3 pr-12 text-sm text-foreground placeholder:text-muted-foreground/60 focus:outline-none rounded-xl"
/>
<div className="absolute right-3 top-1/2 -translate-y-1/2">
<button
type="submit"
disabled={!inputValue.trim()}
className="w-7 h-7 rounded-lg bg-primary/90 hover:bg-primary text-primary-foreground flex items-center justify-center transition-colors disabled:opacity-30 disabled:cursor-not-allowed"
>
<Send className="w-3.5 h-3.5" />
</button>
</div>
</div>
</form>
{/* Action buttons */}
<div className="flex items-center justify-center gap-3 mb-6">
<button
onClick={() => setShowAgents(!showAgents)}
className="inline-flex items-center gap-2 text-sm font-medium px-4 py-2 rounded-lg border border-border/60 text-muted-foreground hover:text-foreground hover:border-primary/30 hover:bg-primary/[0.03] transition-all"
>
<Hexagon className="w-4 h-4 text-primary/60" />
<span>Try a sample agent</span>
<ArrowRight className={`w-3.5 h-3.5 transition-transform duration-200 ${showAgents ? "rotate-90" : ""}`} />
</button>
<button
onClick={() => navigate("/my-agents")}
className="inline-flex items-center gap-2 text-sm font-medium px-4 py-2 rounded-lg border border-border/60 text-muted-foreground hover:text-foreground hover:border-primary/30 hover:bg-primary/[0.03] transition-all"
>
<Bot className="w-4 h-4 text-primary/60" />
<span>My Agents</span>
</button>
</div>
{/* Prompt hint pills */}
<div className="flex flex-wrap justify-center gap-2 mb-6">
{promptHints.map((hint) => (
<button
key={hint}
onClick={() => handlePromptHint(hint)}
className="text-xs text-muted-foreground hover:text-foreground border border-border/50 hover:border-primary/30 rounded-full px-3.5 py-1.5 transition-all hover:bg-primary/[0.03]"
>
{hint}
</button>
))}
</div>
{/* Agent cards — revealed on toggle */}
{showAgents && (
<div className="animate-in fade-in slide-in-from-bottom-2 duration-300">
{loading && (
<div className="text-center py-8 text-sm text-muted-foreground">Loading agents...</div>
)}
{error && (
<div className="text-center py-8 text-sm text-destructive">{error}</div>
)}
{!loading && !error && agents.length === 0 && (
<div className="text-center py-8 text-sm text-muted-foreground">No sample agents found.</div>
)}
{!loading && !error && agents.length > 0 && (
<div className="grid grid-cols-3 gap-3">
{agents.map((agent) => {
const slug = agentSlug(agent.path);
const Icon = AGENT_ICONS[slug] || Hexagon;
const color = AGENT_COLORS[slug] || "hsl(45,95%,58%)";
return (
<button
key={agent.path}
onClick={() => handleSelect(agent.path)}
className="text-left rounded-xl border border-border/60 p-4 transition-all duration-200 hover:border-primary/30 hover:bg-primary/[0.03] group relative overflow-hidden h-full flex flex-col"
>
<div className="flex flex-col flex-1">
<div className="flex items-center gap-3 mb-2.5">
<div
className="w-9 h-9 rounded-lg flex items-center justify-center flex-shrink-0"
style={{
backgroundColor: `${color}15`,
border: `1.5px solid ${color}30`,
}}
>
<Icon className="w-4 h-4" style={{ color }} />
</div>
<h3 className="text-sm font-semibold text-foreground group-hover:text-primary transition-colors">
{agent.name}
</h3>
</div>
<p className="text-xs text-muted-foreground leading-relaxed mb-3 line-clamp-2">
{agent.description}
</p>
<div className="flex gap-1.5 flex-wrap mt-auto">
{agent.tags.length > 0 ? (
agent.tags.map((tag) => (
<span
key={tag}
className="text-[10px] font-medium px-2 py-0.5 rounded-full bg-muted/60 text-muted-foreground"
>
{tag}
</span>
))
) : (
<>
{agent.node_count > 0 && (
<span className="text-[10px] font-medium px-2 py-0.5 rounded-full bg-muted/60 text-muted-foreground">
{agent.node_count} nodes
</span>
)}
{agent.tool_count > 0 && (
<span className="text-[10px] font-medium px-2 py-0.5 rounded-full bg-muted/60 text-muted-foreground">
{agent.tool_count} tools
</span>
)}
</>
)}
</div>
</div>
</button>
);
})}
</div>
)}
</div>
)}
</div>
</div>
</div>
);
}
+128
View File
@@ -0,0 +1,128 @@
import { useState, useEffect } from "react";
import { useNavigate } from "react-router-dom";
import { Bot, Activity, Moon, Plus } from "lucide-react";
import TopBar from "@/components/TopBar";
import { agentsApi } from "@/api/agents";
import type { DiscoverEntry } from "@/api/types";
function timeAgo(iso: string): string {
const diff = Date.now() - new Date(iso).getTime();
const seconds = Math.floor(diff / 1000);
if (seconds < 60) return "Just now";
const minutes = Math.floor(seconds / 60);
if (minutes < 60) return `${minutes} min ago`;
const hours = Math.floor(minutes / 60);
if (hours < 24) return `${hours} hour${hours !== 1 ? "s" : ""} ago`;
const days = Math.floor(hours / 24);
return `${days} day${days !== 1 ? "s" : ""} ago`;
}
export default function MyAgents() {
const navigate = useNavigate();
const [agents, setAgents] = useState<DiscoverEntry[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
agentsApi
.discover()
.then((result) => {
setAgents(result["Your Agents"] || []);
})
.catch((err) => {
setError(err.message || "Failed to load agents");
})
.finally(() => {
setLoading(false);
});
}, []);
const activeCount = agents.filter((a) => a.is_loaded).length;
const idleCount = agents.length - activeCount;
return (
<div className="min-h-screen bg-background flex flex-col">
<TopBar />
{/* Content */}
<div className="flex-1 p-6 md:p-10 max-w-5xl mx-auto w-full">
<div className="flex items-center justify-between mb-8">
<div>
<h1 className="text-xl font-semibold text-foreground">My Agents</h1>
<p className="text-sm text-muted-foreground mt-1">
{activeCount} active · {idleCount} idle
</p>
</div>
<button
onClick={() => navigate("/workspace?agent=new-agent")}
className="flex items-center gap-2 px-4 py-2 rounded-lg bg-primary text-primary-foreground text-sm font-medium hover:bg-primary/90 transition-colors"
>
<Plus className="w-4 h-4" />
New Agent
</button>
</div>
{loading && (
<div className="text-center py-16 text-sm text-muted-foreground">Loading agents...</div>
)}
{error && (
<div className="text-center py-16 text-sm text-destructive">{error}</div>
)}
{!loading && !error && agents.length === 0 && (
<div className="text-center py-16 text-sm text-muted-foreground">No agents found in exports/</div>
)}
{!loading && !error && agents.length > 0 && (
<div className="grid grid-cols-1 sm:grid-cols-2 lg:grid-cols-3 gap-4">
{agents.map((agent) => (
<button
key={agent.path}
onClick={() => navigate(`/workspace?agent=${encodeURIComponent(agent.path)}`)}
className="group text-left rounded-xl border border-border/60 bg-card/50 p-5 hover:border-primary/40 hover:bg-card transition-all duration-200"
>
<div className="flex items-start justify-between mb-3">
<div className="p-2 rounded-lg bg-muted/60">
<Bot className="w-4 h-4 text-muted-foreground group-hover:text-primary transition-colors" />
</div>
<div className="flex items-center gap-1.5">
{agent.is_loaded ? (
<>
<span className="relative flex h-2 w-2">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full bg-primary opacity-50" />
<span className="relative inline-flex rounded-full h-2 w-2 bg-primary" />
</span>
<span className="text-xs font-medium text-primary">Active</span>
</>
) : (
<>
<Moon className="w-3 h-3 text-muted-foreground" />
<span className="text-xs text-muted-foreground">Idle</span>
</>
)}
</div>
</div>
<h3 className="text-sm font-semibold text-foreground mb-1 group-hover:text-primary transition-colors">
{agent.name}
</h3>
<p className="text-xs text-muted-foreground leading-relaxed mb-4 line-clamp-2">
{agent.description}
</p>
<div className="flex items-center justify-between text-xs text-muted-foreground">
<div className="flex items-center gap-1">
<Activity className="w-3 h-3" />
<span>
{agent.session_count} session{agent.session_count !== 1 ? "s" : ""}
</span>
</div>
<span>{agent.last_active ? timeAgo(agent.last_active) : "Never run"}</span>
</div>
</button>
))}
</div>
)}
</div>
</div>
);
}
File diff suppressed because it is too large Load Diff
+1
View File
@@ -0,0 +1 @@
/// <reference types="vite/client" />
+25
View File
@@ -0,0 +1,25 @@
{
"compilerOptions": {
"target": "ES2020",
"useDefineForClassFields": true,
"lib": ["ES2020", "DOM", "DOM.Iterable"],
"module": "ESNext",
"skipLibCheck": true,
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"isolatedModules": true,
"moduleDetection": "force",
"noEmit": true,
"jsx": "react-jsx",
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"noFallthroughCasesInSwitch": true,
"baseUrl": ".",
"paths": {
"@/*": ["./src/*"]
}
},
"include": ["src"],
"references": [{ "path": "./tsconfig.node.json" }]
}
+21
View File
@@ -0,0 +1,21 @@
{
"compilerOptions": {
"target": "ES2022",
"lib": ["ES2023"],
"module": "ESNext",
"skipLibCheck": true,
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"isolatedModules": true,
"moduleDetection": "force",
"composite": true,
"emitDeclarationOnly": true,
"declaration": true,
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"noFallthroughCasesInSwitch": true,
"types": ["node"]
},
"include": ["vite.config.ts"]
}
+21
View File
@@ -0,0 +1,21 @@
import { defineConfig } from "vite";
import react from "@vitejs/plugin-react";
import tailwindcss from "@tailwindcss/vite";
import path from "path";
export default defineConfig({
plugins: [react(), tailwindcss()],
resolve: {
alias: {
"@": path.resolve(__dirname, "./src"),
},
},
server: {
proxy: {
"/api": {
target: "http://localhost:8787",
changeOrigin: true,
},
},
},
});
+1
View File
@@ -21,6 +21,7 @@ dependencies = [
[project.optional-dependencies]
tui = ["textual>=0.75.0"]
webhook = ["aiohttp>=3.9.0"]
server = ["aiohttp>=3.9.0"]
[project.scripts]
hive = "framework.cli:main"
@@ -84,11 +84,13 @@ def run(
sys.exit(1)
context: dict[str, Any] = {
"competitors_input": json.dumps({
"competitors": competitors_data,
"focus_areas": [a.strip() for a in focus_areas.split(",")],
"report_frequency": frequency,
})
"competitors_input": json.dumps(
{
"competitors": competitors_data,
"focus_areas": [a.strip() for a in focus_areas.split(",")],
"report_frequency": frequency,
}
)
}
result = asyncio.run(default_agent.run(context))
@@ -1,7 +1,14 @@
"""Agent graph construction for Competitive Intelligence Agent."""
from typing import Any, TYPE_CHECKING
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint, NodeSpec
from framework.graph import (
EdgeSpec,
EdgeCondition,
Goal,
SuccessCriterion,
Constraint,
NodeSpec,
)
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
@@ -296,7 +303,9 @@ class CompetitiveIntelAgent:
session_state=session_state,
)
async def run(self, context: dict[str, Any], session_state: dict[str, Any] | None = None) -> ExecutionResult:
async def run(
self, context: dict[str, Any], session_state: dict[str, Any] | None = None
) -> ExecutionResult:
"""
Run the agent (convenience method for single execution).
@@ -371,5 +380,6 @@ class CompetitiveIntelAgent:
"warnings": warnings,
}
# Create default instance
default_agent: CompetitiveIntelAgent = CompetitiveIntelAgent()
@@ -5,9 +5,11 @@ from framework.config import RuntimeConfig
default_config: RuntimeConfig = RuntimeConfig()
@dataclass
class AgentMetadata:
"""Metadata for the Competitive Intelligence Agent."""
name: str = "Competitive Intelligence Agent"
version: str = "1.0.0"
description: str = (
@@ -21,4 +23,5 @@ class AgentMetadata:
"and I'll research them across websites, news, and GitHub to produce a detailed digest."
)
metadata: AgentMetadata = AgentMetadata()
@@ -10,7 +10,12 @@ intake_node: NodeSpec = NodeSpec(
node_type="event_loop",
client_facing=True,
input_keys=["competitors_input"],
output_keys=["competitors", "focus_areas", "report_frequency", "has_github_competitors"],
output_keys=[
"competitors",
"focus_areas",
"report_frequency",
"has_github_competitors",
],
system_prompt="""\
You are a competitive intelligence intake specialist. Your job is to gather the
information needed to run a competitive analysis.
@@ -86,7 +86,9 @@ def _get_access_token(account: str = "") -> str:
credentials = CredentialStoreAdapter.default()
if account:
token = credentials.get_by_alias("google", account)
# Strip provider prefix if LLM passes "google/alias" format
clean_account = account.removeprefix("google/")
token = credentials.get_by_alias("google", clean_account)
else:
token = credentials.get("google")
if token:
+4 -18884
View File
File diff suppressed because it is too large Load Diff
+4 -1
View File
@@ -9,7 +9,10 @@
},
"license": "Apache-2.0",
"scripts": {
"test:duplicates": "bun test scripts/auto-close-duplicates"
"test:duplicates": "bun test scripts/auto-close-duplicates",
"frontend:dev": "cd core/frontend && npm run dev",
"frontend:build": "cd core/frontend && npm run build",
"frontend:preview": "cd core/frontend && npm run preview"
},
"devDependencies": {
"@types/node": "^20.10.0",
+71
View File
@@ -0,0 +1,71 @@
# Plan: Set Up React Frontend for Hive
## Context
The `feat/open-hive` branch has a complete backend HTTP API (aiohttp on port 8787) with CRUD, execution control, sessions, SSE streaming, and more. The server already has SPA static-file serving built in — it looks for `frontend/dist/index.html` and serves it with a catch-all fallback. **No frontend exists yet.** The user has a Lovable.dev design they'll paste pages from later, so the scaffold must be Lovable-compatible (React 18, Vite, Tailwind, shadcn/ui, React Router).
The goal: create a deployable frontend shell with a typed API client layer, so the user can immediately start dropping in Lovable pages.
## Key Decisions
| Decision | Choice | Why |
|----------|--------|-----|
| Location | `core/frontend/` | Keeps frontend co-located with the Python framework inside `core/`. Requires a small tweak to app.py to add `core/frontend/dist` as a lookup candidate. |
| Build tool | **Vite** | SPA output, Lovable uses Vite, CRA deprecated, Next.js is overkill for SPA |
| Package manager | **npm** | Root `package.json` declares `npm@10.2.0` — stay consistent |
| Styling | **Tailwind CSS v4 + shadcn/ui** | Lovable generates these; shadcn copies source into project |
| Routing | **React Router** | Lovable uses it; SPA client-side routing matches backend catch-all |
| Dev proxy | Vite `server.proxy``:8787` | Avoids CORS issues, SSE EventSource works through proxy |
## Files to Create
```
core/frontend/
├── package.json
├── vite.config.ts
├── tsconfig.json
├── tsconfig.node.json
├── index.html
├── components.json # shadcn config (via npx shadcn@latest init)
├── src/
│ ├── main.tsx # React entry point
│ ├── App.tsx # Router shell
│ ├── index.css # Tailwind imports
│ ├── vite-env.d.ts # Vite type declarations
│ ├── lib/
│ │ └── utils.ts # shadcn cn() utility
│ ├── api/
│ │ ├── client.ts # Base fetch wrapper (/api prefix)
│ │ ├── types.ts # All TS types matching backend responses
│ │ ├── agents.ts # Agent CRUD endpoints
│ │ ├── execution.ts # Trigger, chat, inject, stop, resume
│ │ ├── sessions.ts # Sessions & checkpoints
│ │ ├── graphs.ts # Graph/node inspection
│ │ └── logs.ts # Log retrieval
│ ├── hooks/
│ │ └── use-sse.ts # SSE EventSource hook
│ └── pages/
│ └── index.tsx # Placeholder landing page
```
## Files to Modify
- `core/framework/server/app.py` — add `core/frontend/dist` as a static-file lookup candidate
- `package.json` — add `frontend:dev` and `frontend:build` convenience scripts
- `Makefile` — add `frontend-dev` and `frontend-build` targets
## Lovable Compatibility
When pasting Lovable pages later:
1. **Imports like `@/components/ui/button`** work via the `@` alias
2. **Run `npx shadcn@latest add <component>`** for each UI component a page needs
3. **Add routes** to `App.tsx` — Lovable pages export default React components
4. **If pages use `@tanstack/react-query`**, install it: `npm install @tanstack/react-query`
5. **Tailwind classes** work out of the box
## Verification
1. `cd core/frontend && npm run build` succeeds and produces `core/frontend/dist/index.html`
2. Start backend: `cd core && uv run python -m framework.runner.cli serve` — logs "Serving frontend from ..."
3. Open `http://localhost:8787` — placeholder page renders
4. Dev mode: `cd core/frontend && npm run dev` on `:5173`, API calls proxy to `:8787`
+213
View File
@@ -884,6 +884,218 @@ def discover_mcp_tools(server_config_path: str = "") -> str:
return json.dumps(result, indent=2, default=str)
# ── Meta-agent: Agent tool catalog ────────────────────────────────────────
@mcp.tool()
def list_agent_tools(server_config_path: str = "") -> str:
"""List all tools available for agent building from the hive-tools MCP server.
Returns tool names grouped by category. Use this BEFORE designing an agent
to know exactly which tools exist. Only use tools from this list in node
definitions never guess or fabricate tool names.
Args:
server_config_path: Path to mcp_servers.json. Default: tools/mcp_servers.json
(the standard hive-tools server). Can also point to an agent's config
to see what tools that specific agent has access to.
Returns:
JSON with tool names grouped by prefix (e.g. gmail_*, slack_*, etc.)
"""
# Resolve config path
if not server_config_path:
candidates = [
os.path.join(PROJECT_ROOT, "tools", "mcp_servers.json"),
os.path.join(PROJECT_ROOT, "mcp_servers.json"),
]
config_path = None
for c in candidates:
if os.path.isfile(c):
config_path = c
break
if not config_path:
return json.dumps({"error": "No mcp_servers.json found"})
else:
config_path = _resolve_path(server_config_path)
if not os.path.isfile(config_path):
return json.dumps({"error": f"Config not found: {server_config_path}"})
try:
with open(config_path, encoding="utf-8") as f:
servers_config = json.load(f)
except (json.JSONDecodeError, OSError) as e:
return json.dumps({"error": f"Failed to read config: {e}"})
try:
from framework.runner.mcp_client import MCPClient, MCPServerConfig
except ImportError:
return json.dumps({"error": "Cannot import MCPClient"})
all_tools: list[dict] = []
errors = []
config_dir = os.path.dirname(config_path)
for server_name, server_conf in servers_config.items():
cwd = server_conf.get("cwd", "")
if cwd and not os.path.isabs(cwd):
cwd = os.path.abspath(os.path.join(config_dir, cwd))
try:
config = MCPServerConfig(
name=server_name,
transport=server_conf.get("transport", "stdio"),
command=server_conf.get("command"),
args=server_conf.get("args", []),
env=server_conf.get("env", {}),
cwd=cwd or None,
url=server_conf.get("url"),
headers=server_conf.get("headers", {}),
)
client = MCPClient(config)
client.connect()
for tool in client.list_tools():
all_tools.append({"name": tool.name, "description": tool.description})
client.disconnect()
except Exception as e:
errors.append({"server": server_name, "error": str(e)})
# Group by prefix (e.g., gmail_, slack_, stripe_)
groups: dict[str, list[str]] = {}
for t in sorted(all_tools, key=lambda x: x["name"]):
parts = t["name"].split("_", 1)
prefix = parts[0] if len(parts) > 1 else "general"
groups.setdefault(prefix, []).append(t["name"])
result: dict = {
"total": len(all_tools),
"tools_by_category": groups,
"all_tool_names": sorted(t["name"] for t in all_tools),
}
if errors:
result["errors"] = errors
return json.dumps(result, indent=2)
# ── Meta-agent: Agent tool validation ─────────────────────────────────────
@mcp.tool()
def validate_agent_tools(agent_path: str) -> str:
"""Validate that all tools declared in an agent's nodes exist in its MCP servers.
Connects to the agent's configured MCP servers, discovers available tools,
then checks every node's declared tools against what actually exists.
Use this after building an agent to catch hallucinated or misspelled tool names.
Args:
agent_path: Path to agent directory (e.g. "exports/my_agent")
Returns:
JSON with validation result: pass/fail, missing tools per node, available tools
"""
resolved = _resolve_path(agent_path)
if not os.path.isdir(resolved):
return json.dumps({"error": f"Agent directory not found: {agent_path}"})
# --- Discover available tools from agent's MCP servers ---
mcp_config_path = os.path.join(resolved, "mcp_servers.json")
if not os.path.isfile(mcp_config_path):
return json.dumps({"error": f"No mcp_servers.json found in {agent_path}"})
try:
from framework.runner.mcp_client import MCPClient, MCPServerConfig
except ImportError:
return json.dumps({"error": "Cannot import MCPClient"})
available_tools: set[str] = set()
discovery_errors = []
config_dir = os.path.dirname(mcp_config_path)
try:
with open(mcp_config_path, encoding="utf-8") as f:
servers_config = json.load(f)
except (json.JSONDecodeError, OSError) as e:
return json.dumps({"error": f"Failed to read mcp_servers.json: {e}"})
for server_name, server_conf in servers_config.items():
cwd = server_conf.get("cwd", "")
if cwd and not os.path.isabs(cwd):
cwd = os.path.abspath(os.path.join(config_dir, cwd))
try:
config = MCPServerConfig(
name=server_name,
transport=server_conf.get("transport", "stdio"),
command=server_conf.get("command"),
args=server_conf.get("args", []),
env=server_conf.get("env", {}),
cwd=cwd or None,
url=server_conf.get("url"),
headers=server_conf.get("headers", {}),
)
client = MCPClient(config)
client.connect()
for tool in client.list_tools():
available_tools.add(tool.name)
client.disconnect()
except Exception as e:
discovery_errors.append({"server": server_name, "error": str(e)})
# --- Load agent nodes and extract declared tools ---
agent_py = os.path.join(resolved, "agent.py")
if not os.path.isfile(agent_py):
return json.dumps({"error": f"No agent.py found in {agent_path}"})
import importlib
import importlib.util
import sys
package_name = os.path.basename(resolved)
parent_dir = os.path.dirname(os.path.abspath(resolved))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
try:
agent_module = importlib.import_module(package_name)
except Exception as e:
return json.dumps({"error": f"Failed to import agent: {e}"})
nodes = getattr(agent_module, "nodes", None)
if not nodes:
return json.dumps({"error": "Agent module has no 'nodes' attribute"})
# --- Validate declared vs available ---
missing_by_node: dict[str, list[str]] = {}
for node in nodes:
node_tools = getattr(node, "tools", None) or []
missing = [t for t in node_tools if t not in available_tools]
if missing:
node_name = getattr(node, "name", None) or getattr(node, "id", "unknown")
node_id = getattr(node, "id", "unknown")
missing_by_node[f"{node_name} (id={node_id})"] = sorted(missing)
result: dict = {
"valid": len(missing_by_node) == 0,
"agent": agent_path,
"available_tool_count": len(available_tools),
}
if missing_by_node:
result["missing_tools"] = missing_by_node
result["message"] = (
f"FAIL: {sum(len(v) for v in missing_by_node.values())} tool(s) declared "
f"in nodes do not exist. Run discover_mcp_tools() to see available tools "
f"and fix the node definitions."
)
else:
result["message"] = "PASS: All declared tools exist in the agent's MCP servers."
if discovery_errors:
result["discovery_errors"] = discovery_errors
return json.dumps(result, indent=2)
# ── Meta-agent: Agent inventory ───────────────────────────────────────────
@@ -905,6 +1117,7 @@ def list_agents() -> str:
scan_dirs = [
(os.path.join(PROJECT_ROOT, "core", "framework", "agents"), "framework"),
(os.path.join(PROJECT_ROOT, "exports"), "user"),
(os.path.join(PROJECT_ROOT, "examples", "templates"), "example"),
]
for scan_dir, source in scan_dirs:
Generated
+5 -1
View File
@@ -808,6 +808,9 @@ dependencies = [
]
[package.optional-dependencies]
server = [
{ name = "aiohttp" },
]
tui = [
{ name = "textual" },
]
@@ -823,6 +826,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiohttp", marker = "extra == 'server'", specifier = ">=3.9.0" },
{ name = "aiohttp", marker = "extra == 'webhook'", specifier = ">=3.9.0" },
{ name = "anthropic", specifier = ">=0.40.0" },
{ name = "fastmcp", specifier = ">=2.0.0" },
@@ -837,7 +841,7 @@ requires-dist = [
{ name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" },
{ name = "tools", editable = "tools" },
]
provides-extras = ["tui", "webhook"]
provides-extras = ["tui", "webhook", "server"]
[package.metadata.requires-dev]
dev = [