Merge branch 'feature/browser-use-fix' into feature/hive-experimental-comp-pipeline

This commit is contained in:
Timothy
2026-04-09 08:53:37 -07:00
12 changed files with 389 additions and 164 deletions
+36 -31
View File
@@ -117,9 +117,7 @@ _STRIP_RE = re.compile(
)
_INTERNAL_OPEN_RE = re.compile(
r"<(?:" + "|".join(_INTERNAL_TAGS) + r")>"
)
_INTERNAL_OPEN_RE = re.compile(r"<(?:" + "|".join(_INTERNAL_TAGS) + r")>")
# Matches a trailing `<` that could be the start of an internal tag.
# We build a pattern that matches `<` followed by any prefix of any
# internal tag name (e.g. `<so`, `<contex`, `<think`).
@@ -128,25 +126,42 @@ for _tag in _INTERNAL_TAGS:
for _i in range(1, len(_tag) + 1):
_PARTIAL_PREFIXES.add(_tag[:_i])
_PARTIAL_OPEN_RE = re.compile(
r"<(?:" + "|".join(re.escape(p) for p in sorted(_PARTIAL_PREFIXES, key=len, reverse=True)) + r")$"
r"<(?:"
+ "|".join(re.escape(p) for p in sorted(_PARTIAL_PREFIXES, key=len, reverse=True))
+ r")$"
)
_GENERIC_TAG_RE = re.compile(r"</?[a-zA-Z_][\w-]*\s*/?>")
_GENERIC_TAG_OR_PARTIAL_RE = re.compile(r"<[a-zA-Z_]|</[a-zA-Z_]|<$")
def _strip_internal_tags_from_snapshot(snapshot: str) -> str:
"""Remove all internal tag blocks from the full accumulated text.
Also truncates at any unclosed or partially-opened internal tag
so partial tags never leak to the frontend during streaming.
Also truncates at any unclosed or partially-opened tag so partial
tags never leak to the frontend during streaming.
"""
cleaned = _STRIP_RE.sub("", snapshot)
# Truncate at any fully-opened but unclosed internal tag
m = _INTERNAL_OPEN_RE.search(cleaned)
if m:
cleaned = cleaned[:m.start()]
cleaned = cleaned[: m.start()]
# Truncate at any partial opening tag at the end (e.g. `<social` or `<co`)
m2 = _PARTIAL_OPEN_RE.search(cleaned)
if m2:
cleaned = cleaned[:m2.start()]
cleaned = cleaned[: m2.start()]
# Generic pass: strip any remaining XML-like tags the LLM hallucinated
# (e.g. <professional>, <staging>, </neutral>). These are never
# intentional markup — just remove them outright.
cleaned = _GENERIC_TAG_RE.sub("", cleaned)
# Truncate at any remaining `<` that looks like it could be a tag
# start (followed by a letter) or a bare `<` at end of string.
# During streaming this suppresses partial tags until they resolve.
m3 = _GENERIC_TAG_OR_PARTIAL_RE.search(cleaned)
if m3:
cleaned = cleaned[: m3.start()]
return cleaned
@@ -548,9 +563,7 @@ class AgentLoop(NodeProtocol):
_consecutive_empty_turns: int = 0
# 6. Main loop
logger.debug(
"[AgentLoop.execute] Entering main loop, start_iteration=%d", start_iteration
)
logger.debug("[AgentLoop.execute] Entering main loop, start_iteration=%d", start_iteration)
for iteration in range(start_iteration, self._config.max_iterations):
iter_start = time.time()
logger.debug("[AgentLoop.execute] iteration=%d starting", iteration)
@@ -584,9 +597,7 @@ class AgentLoop(NodeProtocol):
)
# 6b. Drain injection queue
logger.debug(
"[AgentLoop.execute] iteration=%d: draining injection queue...", iteration
)
logger.debug("[AgentLoop.execute] iteration=%d: draining injection queue...", iteration)
drained_injections = await self._drain_injection_queue(conversation, ctx)
logger.debug(
"[AgentLoop.execute] iteration=%d: drained %d injections",
@@ -682,7 +693,9 @@ class AgentLoop(NodeProtocol):
_new_prompt = stamp_prompt_datetime(ctx.dynamic_prompt_provider())
else:
from framework.orchestrator.prompting import build_system_prompt_for_node_context
from framework.orchestrator.prompting import (
build_system_prompt_for_node_context,
)
_new_prompt = build_system_prompt_for_node_context(ctx)
if _new_prompt != conversation.system_prompt:
@@ -755,8 +768,7 @@ class AgentLoop(NodeProtocol):
ctx, conversation, tools, iteration, accumulator
)
logger.debug(
"[AgentLoop.execute] iteration=%d:"
" _run_single_turn completed successfully",
"[AgentLoop.execute] iteration=%d: _run_single_turn completed successfully",
iteration,
)
_turn_ms = int((time.monotonic() - _turn_t0) * 1000)
@@ -830,8 +842,7 @@ class AgentLoop(NodeProtocol):
except Exception as e:
logger.debug(
"[AgentLoop.execute] iteration=%d:"
" Exception in _run_single_turn: %s (%s)",
"[AgentLoop.execute] iteration=%d: Exception in _run_single_turn: %s (%s)",
iteration,
type(e).__name__,
str(e)[:200],
@@ -1006,7 +1017,6 @@ class AgentLoop(NodeProtocol):
and not outputs_set
and not user_input_requested
and not queen_input_requested
)
if truly_empty and accumulator is not None:
missing = self._get_missing_output_keys(
@@ -1265,7 +1275,6 @@ class AgentLoop(NodeProtocol):
_worker_no_tool_turn = (
not real_tool_results
and not outputs_set
and not queen_input_requested
and not user_input_requested
)
@@ -1771,7 +1780,7 @@ class AgentLoop(NodeProtocol):
missing = self._get_missing_output_keys(
accumulator, ctx.node_spec.output_keys, ctx.node_spec.nullable_output_keys
)
if missing and self._judge is not None :
if missing and self._judge is not None:
hint = (
f"Task incomplete. Required outputs not yet produced: {missing}. "
f"Follow your system prompt instructions to complete the work."
@@ -2216,11 +2225,9 @@ class AgentLoop(NodeProtocol):
# Strip internal reasoning tags from the full
# snapshot, then diff against what we already
# emitted to get the new visible delta.
_new_clean = _strip_internal_tags_from_snapshot(
event.snapshot
)
_new_clean = _strip_internal_tags_from_snapshot(event.snapshot)
if len(_new_clean) > len(_clean_snapshot):
_delta = _new_clean[len(_clean_snapshot):]
_delta = _new_clean[len(_clean_snapshot) :]
_clean_snapshot = _new_clean
await self._publish_text_delta(
stream_id,
@@ -2347,14 +2354,13 @@ class AgentLoop(NodeProtocol):
# execution. The LLM will see it on the next inner turn.
if not self._injection_queue.empty():
while not self._injection_queue.empty():
_inj_content, _inj_client, _inj_images = (
self._injection_queue.get_nowait()
)
_inj_content, _inj_client, _inj_images = self._injection_queue.get_nowait()
if _inj_client:
await conversation.add_user_message(_inj_content)
logger.info(
"[%s] Priority-injected user message mid-turn (%d chars)",
node_id, len(_inj_content),
node_id,
len(_inj_content),
)
else:
await conversation.add_user_message(_inj_content)
@@ -3536,4 +3542,3 @@ class AgentLoop(NodeProtocol):
# -------------------------------------------------------------------
# Subagent Execution
# -------------------------------------------------------------------
+3 -9
View File
@@ -961,9 +961,6 @@ def load_agent_config(data: str | dict) -> tuple[GraphSpec, Goal]:
elif nc.tools.policy == "none":
tools_list = []
tool_policy = "none"
elif nc.tools.policy == "all":
tools_list = []
tool_policy = "all"
else:
# Inherit agent-level tool config
if config.tools.policy == "explicit" and config.tools.allowed:
@@ -1037,9 +1034,7 @@ def load_agent_config(data: str | dict) -> tuple[GraphSpec, Goal]:
"max_tokens": config.max_tokens,
"loop_config": dict(config.loop_config),
"conversation_mode": config.conversation_mode,
"identity_prompt": _resolve_template_vars(
config.identity_prompt, tvars
) or "",
"identity_prompt": _resolve_template_vars(config.identity_prompt, tvars) or "",
}
graph = GraphSpec(**graph_kwargs)
@@ -1267,6 +1262,7 @@ class AgentLoader:
os.environ["HIVE_STORAGE_PATH"] = str(self._storage_path)
# MCP tools are loaded by McpRegistryStage in the pipeline during AgentHost.start()
@staticmethod
def _import_agent_module(agent_path: Path):
"""Import an agent package from its directory path.
@@ -2120,9 +2116,7 @@ class AgentLoader:
warnings.append(warning_msg)
except ImportError:
# aden_tools not installed - fall back to direct check
has_llm_nodes = any(
node.node_type == "event_loop" for node in self.graph.nodes
)
has_llm_nodes = any(node.node_type == "event_loop" for node in self.graph.nodes)
if has_llm_nodes:
api_key_env = self._get_api_key_env_var(self.model)
if api_key_env and not os.environ.get(api_key_env):
+45 -12
View File
@@ -10,6 +10,7 @@ This module centralizes:
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any
@@ -18,6 +19,24 @@ from framework.orchestrator.goal import Goal
from framework.orchestrator.node import DataBuffer, NodeContext, NodeProtocol, NodeSpec
from framework.tracker.decision_tracker import DecisionTracker
logger = logging.getLogger(__name__)
# Tool names that are ALWAYS available to every node, regardless of
# the node's explicit tool policy. These are framework essentials that
# agents need unconditionally.
_ALWAYS_AVAILABLE_TOOLS: frozenset[str] = frozenset(
{
"read_file",
"write_file",
"edit_file",
"list_directory",
"search_files",
"hashline_edit",
"set_output",
"escalate",
}
)
@dataclass
class GraphContext:
@@ -128,28 +147,38 @@ def _resolve_available_tools(
"""Select tools available to the current node.
Respects ``node_spec.tool_access_policy``:
- ``"all"`` -- all tools from the registry (no filtering).
- ``"explicit"`` -- only tools whose name appears in ``node_spec.tools``.
If the list is empty, **no tools** are given (default-deny).
- ``"none"`` -- no tools at all.
- ``"explicit"`` -- only tools whose name appears in ``node_spec.tools``
PLUS framework-default tools (read_file, set_output, etc.).
If the list is empty, only defaults are given.
- ``"none"`` -- only framework-default tools (read_file, set_output, etc.).
Framework-default tools (``_ALWAYS_AVAILABLE_TOOLS``) are always included
regardless of policy agents need file I/O and output/escalate to function.
"""
if override_tools is not None:
return list(override_tools)
# Merge override with always-available, dedup by name
names = {t.name for t in override_tools}
extra = [t for t in tools if t.name in _ALWAYS_AVAILABLE_TOOLS and t.name not in names]
return list(override_tools) + extra
policy = getattr(node_spec, "tool_access_policy", "explicit")
# Always include framework-default tools
always_tools = [t for t in tools if t.name in _ALWAYS_AVAILABLE_TOOLS]
if policy == "none":
return []
return always_tools
if policy == "all":
return list(tools)
# "explicit" (default): only tools named in node_spec.tools.
# "explicit" (default): declared tools + framework defaults
if not node_spec.tools:
return []
return always_tools
return [tool for tool in tools if tool.name in node_spec.tools]
declared = set(node_spec.tools)
declared_tools = [
t for t in tools if t.name in declared and t.name not in _ALWAYS_AVAILABLE_TOOLS
]
return always_tools + declared_tools
def _derive_input_data(buffer: DataBuffer, input_keys: list[str]) -> dict[str, Any]:
@@ -283,6 +312,10 @@ def build_node_context_from_graph_context(
gc = graph_context
resolved_override_tools = override_tools
if resolved_override_tools is None and gc.is_continuous and gc.cumulative_tools:
if node_spec.tool_access_policy == "explicit" and node_spec.tools:
declared = set(node_spec.tools) | _ALWAYS_AVAILABLE_TOOLS
resolved_override_tools = [t for t in gc.cumulative_tools if t.name in declared]
else:
resolved_override_tools = list(gc.cumulative_tools)
resolved_inherited_conversation = inherited_conversation
+15 -6
View File
@@ -42,6 +42,7 @@ class McpRegistryStage(PipelineStage):
self._tool_registry = ToolRegistry()
from framework.loader.mcp_registry import MCPRegistry
from framework.orchestrator.files import FILES_MCP_SERVER_NAME
registry = MCPRegistry()
mcp_loaded = False
@@ -52,9 +53,7 @@ class McpRegistryStage(PipelineStage):
if names:
configs = registry.resolve_for_agent(include=names)
if configs:
self._tool_registry.load_registry_servers(
[asdict(c) for c in configs]
)
self._tool_registry.load_registry_servers([asdict(c) for c in configs])
mcp_loaded = True
logger.info(
"[pipeline] McpRegistryStage: loaded %d servers: %s",
@@ -73,14 +72,24 @@ class McpRegistryStage(PipelineStage):
if not mcp_loaded:
configs = registry.resolve_for_agent(profile="all")
if configs:
self._tool_registry.load_registry_servers(
[asdict(c) for c in configs]
)
self._tool_registry.load_registry_servers([asdict(c) for c in configs])
logger.info(
"[pipeline] McpRegistryStage: loaded %d servers (fallback)",
len(configs),
)
# 4. Ensure files-tools is always available — agents need file I/O
# for reading skills, writing data, etc. regardless of config.
loaded_names = set(self._tool_registry._mcp_server_tools.keys())
if FILES_MCP_SERVER_NAME not in loaded_names:
files_configs = registry.resolve_for_agent(include=[FILES_MCP_SERVER_NAME])
if files_configs:
self._tool_registry.load_registry_servers([asdict(c) for c in files_configs])
logger.info(
"[pipeline] McpRegistryStage: injected %s",
FILES_MCP_SERVER_NAME,
)
total = len(self._tool_registry.get_tools())
logger.info("[pipeline] McpRegistryStage: %d tools available", total)
+17 -3
View File
@@ -12,7 +12,7 @@ These are resolved at load time from ``AgentConfig.variables``.
from __future__ import annotations
from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field, model_validator
class ToolAccessConfig(BaseModel):
@@ -20,24 +20,38 @@ class ToolAccessConfig(BaseModel):
Controls which tools a node/agent has access to.
* ``all`` -- every tool from the registry.
* ``explicit`` -- only tools listed in ``allowed`` (default; empty = zero tools).
* ``none`` -- no tools at all.
``all`` is not permitted agents must declare every tool they use.
"""
model_config = ConfigDict(populate_by_name=True)
policy: str = Field(
default="explicit",
description="One of: 'all', 'explicit', 'none'.",
description="One of: 'explicit', 'none'. 'all' is not allowed.",
)
allowed: list[str] = Field(
default_factory=list,
description="Tool names when policy='explicit'.",
alias="tools",
)
denied: list[str] = Field(
default_factory=list,
description="Tool names to deny (applied after allowed).",
)
@model_validator(mode="after")
def _reject_policy_all(self) -> ToolAccessConfig:
if self.policy == "all":
raise ValueError(
"tool policy 'all' is not allowed — "
"list every tool explicitly in 'allowed' instead. "
"This ensures agents only see the tools they need."
)
return self
class NodeConfig(BaseModel):
"""Declarative node definition."""
+24 -10
View File
@@ -1927,10 +1927,7 @@ def register_queen_lifecycle_tools(
# Create agent folder early so flowchart and agent_path are available
# throughout the entire BUILDING phase.
_agent_name = (
agent_name
or phase_state.draft_graph.get("agent_name", "").strip()
)
_agent_name = agent_name or phase_state.draft_graph.get("agent_name", "").strip()
if _agent_name:
from framework.config import COLONIES_DIR
@@ -1967,9 +1964,7 @@ def register_queen_lifecycle_tools(
# Transition to BUILDING phase
await phase_state.switch_to_building(source="tool")
_update_meta_json(
session_manager, manager_session_id, {"phase": "building"}
)
_update_meta_json(session_manager, manager_session_id, {"phase": "building"})
phase_state.build_confirmed = False
# No injection here -- the return message tells the queen what to do.
@@ -3044,8 +3039,7 @@ def register_queen_lifecycle_tools(
if parent_dir not in _sys.path:
_sys.path.insert(0, parent_dir)
stale = [
n for n in _sys.modules
if n == pkg_name or n.startswith(f"{pkg_name}.")
n for n in _sys.modules if n == pkg_name or n.startswith(f"{pkg_name}.")
]
for n in stale:
del _sys.modules[n]
@@ -3206,7 +3200,9 @@ def register_queen_lifecycle_tools(
"properties": {
"agent_path": {
"type": "string",
"description": ("Path to the agent directory (e.g. '~/.hive/colonies/my_agent')"),
"description": (
"Path to the agent directory (e.g. '~/.hive/colonies/my_agent')"
),
},
},
"required": ["agent_path"],
@@ -3231,6 +3227,24 @@ def register_queen_lifecycle_tools(
if runtime is None:
return json.dumps({"error": "No worker loaded in this session."})
# Guard: refuse to start while an execution is already running.
# Calling again would cancel the active one via the
# "Restarted with new execution" path in ExecutionStream.execute(),
# which is almost never what the queen intends.
for graph_id in runtime.list_graphs():
reg = runtime.get_graph_registration(graph_id)
if reg is None:
continue
for _ep_id, stream in reg.streams.items():
if stream.active_execution_ids:
return json.dumps(
{
"error": "Worker is already running.",
"active_execution_ids": list(stream.active_execution_ids),
"hint": "Wait for the worker to finish (WORKER_TERMINAL event) or call stop_agent() before starting a new run.",
}
)
try:
# Pre-flight: validate credentials and resync MCP servers.
loop = asyncio.get_running_loop()
+102
View File
@@ -312,5 +312,107 @@ def register_graph_tools(registry: ToolRegistry, runtime: AgentHost) -> int:
registry.register("get_user_presence", _presence_tool, lambda inputs: get_user_presence())
tools_registered += 1
# --- validate_agent_package -----------------------------------------------
def validate_agent_package(agent_name: str) -> str:
"""Validate an agent's configuration without loading it.
Parses agent.json, checks the schema (including tool policies,
MCP server refs, node wiring), and returns actionable errors.
Call this **before** load_agent to catch problems early.
"""
from framework.server.app import validate_agent_path as _vap
try:
resolved = _vap(agent_name)
except ValueError as e:
return json.dumps({"valid": False, "errors": [str(e)]})
agent_json = resolved / "agent.json"
if not agent_json.exists():
return json.dumps(
{
"valid": False,
"errors": [f"No agent.json found at {resolved}"],
}
)
try:
raw = json.loads(agent_json.read_text(encoding="utf-8"))
except Exception as e:
return json.dumps({"valid": False, "errors": [f"Invalid JSON: {e}"]})
errors: list[str] = []
# 1. Schema validation via AgentConfig
try:
from framework.schemas.agent_config import AgentConfig
AgentConfig(**raw)
except Exception as e:
errors.append(f"agent.json schema error: {e}")
# 2. Tool policy checks on each node
for node in raw.get("nodes", []):
nid = node.get("id", "?")
t = node.get("tools", {})
if isinstance(t, dict):
policy = t.get("policy", "explicit")
if policy == "all":
errors.append(
f"Node '{nid}' uses policy 'all'"
f"list every tool explicitly in 'tools' instead."
)
allowed = t.get("tools") or t.get("allowed") or []
if policy == "explicit" and not allowed:
errors.append(
f"Node '{nid}' has policy 'explicit' but empty tool list — "
f"the node will have ZERO tools. Add tool names."
)
# 3. MCP server refs
mcp = raw.get("mcp_servers", [])
if not mcp:
errors.append(
"No mcp_servers specified — the agent will have no tools. "
'Add at least: [{"name": "hive-tools"}, {"name": "gcu-tools"}]'
)
for ref in mcp:
if isinstance(ref, str):
errors.append(
f"mcp_servers entry '{ref}' is a string, not an object. "
f'Use {{"name": "{ref}"}} instead.'
)
if errors:
return json.dumps({"valid": False, "errors": errors})
return json.dumps({"valid": True, "errors": []})
_validate_tool = Tool(
name="validate_agent_package",
description=(
"Validate an agent's agent.json configuration without loading it. "
"Checks schema, tool policies, MCP server refs, and node wiring. "
"Call BEFORE load_agent to catch problems early. "
"Returns {valid: bool, errors: [string]}."
),
parameters={
"type": "object",
"properties": {
"agent_name": {
"type": "string",
"description": "Name/path of the agent package to validate.",
},
},
"required": ["agent_name"],
},
)
registry.register(
"validate_agent_package",
_validate_tool,
lambda inputs: validate_agent_package(inputs["agent_name"]),
)
tools_registered += 1
logger.info("Registered %d graph lifecycle tools", tools_registered)
return tools_registered
+17 -16
View File
@@ -138,19 +138,22 @@ export function ColonyProvider({ children }: { children: ReactNode }) {
// Map agent_path → session_id + queen_id from live sessions
const liveSessionMap = new Map<string, { sessionId: string; queenId: string | null }>();
for (const s of sessionsResult.sessions) {
liveSessionMap.set(s.agent_path.replace(/\/$/, ""), {
const slug = agentSlug(s.agent_path);
if (slug) {
liveSessionMap.set(slug, {
sessionId: s.session_id,
queenId: s.queen_id ?? null,
});
}
}
// Map agent_path → queen_id from history (most recent session wins)
const historyQueenMap = new Map<string, string>();
for (const s of historyResult.sessions) {
if (s.agent_path && s.queen_id) {
const normalized = s.agent_path.replace(/\/$/, "");
if (!historyQueenMap.has(normalized)) {
historyQueenMap.set(normalized, s.queen_id);
const slug = agentSlug(s.agent_path);
if (slug && !historyQueenMap.has(slug)) {
historyQueenMap.set(slug, s.queen_id);
}
}
}
@@ -160,11 +163,10 @@ export function ColonyProvider({ children }: { children: ReactNode }) {
const newColonies: Colony[] = allAgents.map((agent) => {
const slug = agentSlug(agent.path);
const colonyId = slugToColonyId(slug);
const normalizedPath = agent.path.replace(/\/$/, "");
const liveInfo = liveSessionMap.get(normalizedPath);
const liveInfo = liveSessionMap.get(slug);
const sessionId = liveInfo?.sessionId ?? null;
const isRunning = sessionId !== null;
const queenProfileId = liveInfo?.queenId ?? historyQueenMap.get(normalizedPath) ?? null;
const queenProfileId = liveInfo?.queenId ?? historyQueenMap.get(slug) ?? null;
return {
id: colonyId,
@@ -210,17 +212,16 @@ export function ColonyProvider({ children }: { children: ReactNode }) {
const fetchStatus = useCallback(async () => {
try {
const { sessions } = await sessionsApi.list();
const livePathSet = new Set(
sessions.map((s) => s.agent_path.replace(/\/$/, "")),
);
const liveSlugMap = new Map<string, string>();
for (const s of sessions) {
const slug = agentSlug(s.agent_path);
if (slug) liveSlugMap.set(slug, s.session_id);
}
setColonies((prev) =>
prev.map((c) => {
const normalizedPath = c.agentPath.replace(/\/$/, "");
const isRunning = livePathSet.has(normalizedPath);
const sessionId =
sessions.find((s) => s.agent_path.replace(/\/$/, "") === normalizedPath)
?.session_id ?? null;
return { ...c, status: isRunning ? "running" : "idle", sessionId };
const slug = agentSlug(c.agentPath);
const sessionId = liveSlugMap.get(slug) ?? null;
return { ...c, status: sessionId ? "running" : "idle", sessionId };
}),
);
const liveQueenIds = new Set(
+46 -7
View File
@@ -1,5 +1,5 @@
import { useState, useCallback, useRef, useEffect, useMemo } from "react";
import { useParams } from "react-router-dom";
import { useParams, useLocation } from "react-router-dom";
import { Loader2, WifiOff, KeyRound, FolderOpen, X } from "lucide-react";
import type { GraphNode, NodeStatus } from "@/components/graph-types";
import DraftGraph from "@/components/DraftGraph";
@@ -190,12 +190,20 @@ function defaultAgentState(): AgentState {
export default function ColonyChat() {
const { colonyId } = useParams<{ colonyId: string }>();
const location = useLocation();
const { colonies, markVisited } = useColony();
const { setActions } = useHeaderActions();
// Route state from home page (new chat flow)
const routeState = (location.state || {}) as {
prompt?: string;
agentPath?: string;
};
const isNewChat = colonyId?.startsWith("new-") ?? false;
// Find the colony matching this route
const colony = colonies.find((c) => c.id === colonyId);
const agentPath = colony?.agentPath ?? "";
const agentPath = colony?.agentPath ?? routeState.agentPath ?? "";
const slug = agentPath ? agentSlug(agentPath) : "";
const queenInfo = getQueenForAgent(slug);
const colonyName = colony?.name ?? colonyId ?? "Colony";
@@ -364,6 +372,13 @@ export default function ColonyChat() {
async (sessionId: string, knownGraphId?: string) => {
try {
let graphId = knownGraphId;
if (!graphId) {
// Try session detail first (graph_id is always set when worker is loaded)
try {
const detail = await sessionsApi.get(sessionId);
graphId = detail.graph_id ?? undefined;
} catch { /* fall through */ }
}
if (!graphId) {
const { graphs } = await sessionsApi.graphs(sessionId);
if (!graphs.length) return;
@@ -383,7 +398,31 @@ export default function ColonyChat() {
// ── Session loading ────────────────────────────────────────────────────
const loadSession = useCallback(async () => {
if (!agentPath || loadingRef.current) return;
if (loadingRef.current) return;
// For new chats without an agent, create a queen-only session
if (!agentPath && isNewChat) {
loadingRef.current = true;
updateState({ loading: true, error: null, ready: false, sessionId: null });
try {
const session = await sessionsApi.create(
undefined, undefined, undefined,
routeState.prompt || undefined,
);
updateState({
sessionId: session.session_id,
displayName: "New Chat",
queenPhase: "planning",
loading: false,
ready: true,
});
} catch (err: unknown) {
updateState({ loading: false, error: String(err) });
} finally {
loadingRef.current = false;
}
return;
}
if (!agentPath) return;
loadingRef.current = true;
updateState({ loading: true, error: null, ready: false, sessionId: null });
@@ -505,11 +544,11 @@ export default function ColonyChat() {
} finally {
loadingRef.current = false;
}
}, [agentPath, updateState]);
}, [agentPath, isNewChat, routeState.prompt, updateState]);
// Load session on mount or when agent path changes
useEffect(() => {
if (agentPath) {
if (agentPath || isNewChat) {
// Reset state for new colony
setMessages([]);
setGraphNodes([]);
@@ -521,7 +560,7 @@ export default function ColonyChat() {
loadingRef.current = false;
loadSession();
}
}, [agentPath]); // eslint-disable-line react-hooks/exhaustive-deps
}, [agentPath, isNewChat]); // eslint-disable-line react-hooks/exhaustive-deps
// Fetch graph when session becomes ready
useEffect(() => {
@@ -1280,7 +1319,7 @@ export default function ColonyChat() {
// ── Render ─────────────────────────────────────────────────────────────
if (!colony && !agentState.loading) {
if (!colony && !isNewChat && !agentState.loading) {
return (
<div className="flex-1 flex items-center justify-center">
<p className="text-sm text-muted-foreground">Colony not found: {colonyId}</p>
+23 -56
View File
@@ -1,4 +1,4 @@
"""CSV Tool - Read and manipulate CSV files."""
"""CSV Tool - Read and manipulate CSV files using absolute paths."""
import csv
import os
@@ -6,7 +6,7 @@ import re
from fastmcp import FastMCP
from ..file_system_toolkits.security import get_sandboxed_path
from ..file_system_toolkits.security import resolve_safe_path
def register_tools(mcp: FastMCP) -> None:
@@ -15,7 +15,6 @@ def register_tools(mcp: FastMCP) -> None:
@mcp.tool()
def csv_read(
path: str,
agent_id: str,
limit: int | None = None,
offset: int = 0,
) -> dict:
@@ -23,8 +22,7 @@ def register_tools(mcp: FastMCP) -> None:
Read a CSV file and return its contents.
Args:
path: Path to the CSV file (relative to agent sandbox)
agent_id: Agent identifier
path: Absolute path to the CSV file
limit: Maximum number of rows to return (None = all rows)
offset: Number of rows to skip from the beginning
@@ -34,7 +32,7 @@ def register_tools(mcp: FastMCP) -> None:
if offset < 0 or (limit is not None and limit < 0):
return {"error": "offset and limit must be non-negative"}
try:
secure_path = get_sandboxed_path(path, agent_id)
secure_path = resolve_safe_path(path)
if not os.path.exists(secure_path):
return {"error": f"File not found: {path}"}
@@ -42,7 +40,6 @@ def register_tools(mcp: FastMCP) -> None:
if not path.lower().endswith(".csv"):
return {"error": "File must have .csv extension"}
# Read CSV
with open(secure_path, encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
@@ -51,7 +48,6 @@ def register_tools(mcp: FastMCP) -> None:
columns = list(reader.fieldnames)
# Apply offset and limit
rows = []
for i, row in enumerate(reader):
if i < offset:
@@ -60,7 +56,6 @@ def register_tools(mcp: FastMCP) -> None:
break
rows.append(row)
# Get total row count (re-read for accurate count)
with open(secure_path, encoding="utf-8", newline="") as f:
reader = csv.reader(f)
total_rows = sum(1 for row in reader if any(row)) - 1
@@ -79,6 +74,8 @@ def register_tools(mcp: FastMCP) -> None:
except csv.Error as e:
return {"error": f"CSV parsing error: {str(e)}"}
except ValueError as e:
return {"error": str(e)}
except UnicodeDecodeError:
return {"error": "File encoding error: unable to decode as UTF-8"}
except Exception as e:
@@ -87,7 +84,6 @@ def register_tools(mcp: FastMCP) -> None:
@mcp.tool()
def csv_write(
path: str,
agent_id: str,
columns: list[str],
rows: list[dict],
) -> dict:
@@ -95,8 +91,7 @@ def register_tools(mcp: FastMCP) -> None:
Write data to a new CSV file.
Args:
path: Path to the CSV file (relative to agent sandbox)
agent_id: Agent identifier
path: Absolute path to the CSV file
columns: List of column names for the header
rows: List of dictionaries, each representing a row
@@ -104,7 +99,7 @@ def register_tools(mcp: FastMCP) -> None:
dict with success status and metadata
"""
try:
secure_path = get_sandboxed_path(path, agent_id)
secure_path = resolve_safe_path(path)
if not path.lower().endswith(".csv"):
return {"error": "File must have .csv extension"}
@@ -112,17 +107,14 @@ def register_tools(mcp: FastMCP) -> None:
if not columns:
return {"error": "columns cannot be empty"}
# Create parent directories if needed
parent_dir = os.path.dirname(secure_path)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)
# Write CSV
with open(secure_path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=columns)
writer.writeheader()
for row in rows:
# Only write columns that exist in fieldnames
filtered_row = {k: v for k, v in row.items() if k in columns}
writer.writerow(filtered_row)
@@ -134,28 +126,28 @@ def register_tools(mcp: FastMCP) -> None:
"rows_written": len(rows),
}
except ValueError as e:
return {"error": str(e)}
except Exception as e:
return {"error": f"Failed to write CSV: {str(e)}"}
@mcp.tool()
def csv_append(
path: str,
agent_id: str,
rows: list[dict],
) -> dict:
"""
Append rows to an existing CSV file.
Args:
path: Path to the CSV file (relative to agent sandbox)
agent_id: Agent identifier
path: Absolute path to the CSV file
rows: List of dictionaries to append, keys should match existing columns
Returns:
dict with success status and metadata
"""
try:
secure_path = get_sandboxed_path(path, agent_id)
secure_path = resolve_safe_path(path)
if not os.path.exists(secure_path):
return {"error": f"File not found: {path}. Use csv_write to create a new file."}
@@ -166,25 +158,21 @@ def register_tools(mcp: FastMCP) -> None:
if not rows:
return {"error": "rows cannot be empty"}
# Read existing columns
with open(secure_path, encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
if reader.fieldnames is None:
return {"error": "CSV file is empty or has no headers"}
columns = list(reader.fieldnames)
# Append rows
with open(secure_path, "a", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=columns)
for row in rows:
# Only write columns that exist in fieldnames
filtered_row = {k: v for k, v in row.items() if k in columns}
writer.writerow(filtered_row)
# Get new total row count
with open(secure_path, encoding="utf-8", newline="") as f:
reader = csv.reader(f)
total_rows = sum(1 for row in reader if any(row)) - 1 # Subtract header
total_rows = sum(1 for row in reader if any(row)) - 1
return {
"success": True,
@@ -195,6 +183,8 @@ def register_tools(mcp: FastMCP) -> None:
except csv.Error as e:
return {"error": f"CSV parsing error: {str(e)}"}
except ValueError as e:
return {"error": str(e)}
except UnicodeDecodeError:
return {"error": "File encoding error: unable to decode as UTF-8"}
except Exception as e:
@@ -203,20 +193,18 @@ def register_tools(mcp: FastMCP) -> None:
@mcp.tool()
def csv_info(
path: str,
agent_id: str,
) -> dict:
"""
Get metadata about a CSV file without reading all data.
Args:
path: Path to the CSV file (relative to agent sandbox)
agent_id: Agent identifier
path: Absolute path to the CSV file
Returns:
dict with file metadata (columns, row count, file size)
"""
try:
secure_path = get_sandboxed_path(path, agent_id)
secure_path = resolve_safe_path(path)
if not os.path.exists(secure_path):
return {"error": f"File not found: {path}"}
@@ -224,10 +212,8 @@ def register_tools(mcp: FastMCP) -> None:
if not path.lower().endswith(".csv"):
return {"error": "File must have .csv extension"}
# Get file size
file_size = os.path.getsize(secure_path)
# Read headers and count rows
with open(secure_path, encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
@@ -235,8 +221,6 @@ def register_tools(mcp: FastMCP) -> None:
return {"error": "CSV file is empty or has no headers"}
columns = list(reader.fieldnames)
# Count rows
total_rows = sum(1 for _ in reader)
return {
@@ -250,6 +234,8 @@ def register_tools(mcp: FastMCP) -> None:
except csv.Error as e:
return {"error": f"CSV parsing error: {str(e)}"}
except ValueError as e:
return {"error": str(e)}
except UnicodeDecodeError:
return {"error": "File encoding error: unable to decode as UTF-8"}
except Exception as e:
@@ -258,7 +244,6 @@ def register_tools(mcp: FastMCP) -> None:
@mcp.tool()
def csv_sql(
path: str,
agent_id: str,
query: str,
) -> dict:
"""
@@ -267,27 +252,12 @@ def register_tools(mcp: FastMCP) -> None:
The CSV file is loaded as a table named 'data'. Use standard SQL syntax.
Args:
path: Path to the CSV file (relative to agent sandbox)
agent_id: Agent identifier
path: Absolute path to the CSV file
query: SQL query to execute. The CSV is available as table 'data'.
Example: "SELECT * FROM data WHERE price > 100 ORDER BY name LIMIT 10"
Returns:
dict with query results, columns, and row count
Examples:
# Filter rows
query="SELECT * FROM data WHERE status = 'pending'"
# Aggregate data
query="SELECT category, COUNT(*) as count, "
"AVG(price) as avg_price FROM data GROUP BY category"
# Sort and limit
query="SELECT name, price FROM data ORDER BY price DESC LIMIT 5"
# Search text (case-insensitive)
query="SELECT * FROM data WHERE LOWER(name) LIKE '%phone%'"
"""
try:
import duckdb
@@ -300,7 +270,7 @@ def register_tools(mcp: FastMCP) -> None:
}
try:
secure_path = get_sandboxed_path(path, agent_id)
secure_path = resolve_safe_path(path)
if not os.path.exists(secure_path):
return {"error": f"File not found: {path}"}
@@ -311,13 +281,10 @@ def register_tools(mcp: FastMCP) -> None:
if not query or not query.strip():
return {"error": "query cannot be empty"}
# Security: allow SELECT/WITH only
query_upper = query.lstrip().upper()
if not (query_upper.startswith("SELECT") or query_upper.startswith("WITH")):
return {"error": "Only SELECT queries are allowed for security reasons"}
# Disallowed keywords for security (word-boundary match to avoid
# false positives on column names like created_at, updated_at, etc.)
_WRITE_PATTERN = re.compile(
r"\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|TRUNCATE|EXEC|EXECUTE)\b",
re.IGNORECASE,
@@ -326,7 +293,6 @@ def register_tools(mcp: FastMCP) -> None:
if match:
return {"error": f"'{match.group().upper()}' is not allowed in queries"}
# Block obvious multi-statement / injection attempts
q_lower = query.lower()
for token in [";", "--", "/*", "*/"]:
if token in q_lower:
@@ -334,7 +300,6 @@ def register_tools(mcp: FastMCP) -> None:
con = duckdb.connect(":memory:")
try:
# SAFE: parameter binding (no string interpolation)
con.execute(
"CREATE TABLE data AS SELECT * FROM read_csv_auto(?)",
[str(secure_path)],
@@ -359,6 +324,8 @@ def register_tools(mcp: FastMCP) -> None:
finally:
con.close()
except ValueError as e:
return {"error": str(e)}
except Exception as e:
error_msg = str(e)
if "Catalog Error" in error_msg:
@@ -1,6 +1,40 @@
import os
from pathlib import Path
# Directories that tools are allowed to read/write within.
_ALLOWED_ROOTS: tuple[str, ...] = (
    os.path.expanduser("~/.hive"),
    os.path.expanduser("~/aden/hive/exports"),
)


def resolve_safe_path(path: str) -> str:
    """Resolve *path* to an absolute path and verify it's within allowed roots.

    Accepts both absolute paths and paths relative to ``~/.hive``.
    Relative inputs are anchored at ``~/.hive`` (not the process CWD),
    so results do not depend on where the server was launched.

    Args:
        path: Absolute path, ``~``-prefixed path, or a path relative
            to ``~/.hive``.

    Returns:
        The fully resolved (symlink-free) absolute path as a string.

    Raises:
        ValueError: When *path* is empty/whitespace or resolves outside
            every allowed root.
    """
    path = path.strip()
    if not path:
        raise ValueError("Path cannot be empty.")
    # Expand ~ first; then anchor relative paths at ~/.hive so the
    # documented contract holds regardless of the current directory.
    expanded = os.path.expanduser(path)
    if not os.path.isabs(expanded):
        expanded = os.path.join(os.path.expanduser("~/.hive"), expanded)
    # resolve() eliminates symlinks and ".." components, so the prefix
    # check below cannot be bypassed with "../../" traversal tricks.
    resolved = str(Path(expanded).resolve())
    for root in _ALLOWED_ROOTS:
        # realpath the root too, so both sides of the comparison are
        # symlink-free and the startswith check is meaningful.
        real_root = os.path.realpath(root)
        if resolved == real_root or resolved.startswith(real_root + os.sep):
            return resolved
    raise ValueError(
        f"Access denied: '{path}' is outside allowed directories. "
        f"Use absolute paths under ~/.hive/ or exports/."
    )
# Keep the old API for backward compatibility with non-CSV tools.
# TODO: migrate remaining callers and remove.
# Root under which per-agent sandbox directories are created (joined
# with agent_id by get_sandboxed_path).
AGENT_SANDBOXES_DIR: str = os.path.expanduser("~/.hive/workdir/workspaces/default")
@@ -9,28 +43,20 @@ def get_sandboxed_path(path: str, agent_id: str) -> str:
if not agent_id:
raise ValueError("agent_id is required")
# Ensure agent directory exists
agent_dir = os.path.realpath(os.path.join(AGENT_SANDBOXES_DIR, agent_id, "current"))
os.makedirs(agent_dir, exist_ok=True)
# Normalize whitespace to prevent bypass via leading spaces/tabs
path = path.strip()
# Treat both OS-absolute paths AND Unix-style leading slashes as absolute-style
if os.path.isabs(path) or path.startswith(("/", "\\")):
# Strip exactly one leading separator to make path relative to agent_dir,
# preserving any subsequent separators (e.g. UNC paths like //server/share)
rel_path = path[1:] if path and path[0] in ("/", "\\") else path
final_path = os.path.realpath(os.path.join(agent_dir, rel_path))
else:
final_path = os.path.realpath(os.path.join(agent_dir, path))
# Verify path is within agent_dir
try:
common_prefix = os.path.commonpath([final_path, agent_dir])
except ValueError as err:
# commonpath raises ValueError when paths are on different drives (Windows)
# or when mixing absolute and relative paths
raise ValueError(f"Access denied: Path '{path}' is outside the agent sandbox.") from err
if common_prefix != agent_dir:
+22 -1
View File
@@ -510,7 +510,28 @@ class BeelineBridge:
await asyncio.sleep(0.1)
if not node_id:
return {"ok": False, "error": f"Element not found: {selector}"}
# Check if the element might be inside a Shadow DOM container
shadow_hint = ""
try:
shadow_check = await self.evaluate(tab_id, """
(function() {
var hosts = document.querySelectorAll('[id]');
for (var h of hosts) {
if (h.shadowRoot) return h.id;
}
return null;
})()
""")
shadow_host = (shadow_check or {}).get("result")
if shadow_host:
shadow_hint = (
f" The page has Shadow DOM (host: #{shadow_host}). "
f"Use browser_shadow_query('#{shadow_host} >>> {selector}') "
f"to pierce shadow roots, or browser_evaluate with manual JS traversal."
)
except Exception:
pass
return {"ok": False, "error": f"Element not found: {selector}{shadow_hint}"}
# Scroll into view FIRST to ensure element is rendered
try: