Compare commits

...

16 Commits

Author SHA1 Message Date
Timothy 135477e516 feat: agent idling detection 2026-03-01 20:14:35 -08:00
Timothy 28dce63682 fix: conversation ordering 2026-03-01 18:56:41 -08:00
Timothy 313ac952e0 Merge branch 'feature/tool-pill-v2' into feature/queen-worker-comm 2026-03-01 18:33:54 -08:00
Timothy 0633d5130b fix: command line refresh frontend build 2026-03-01 18:33:43 -08:00
Timothy 995e487b49 Merge branch 'feature/tool-pill-v2' into feature/queen-worker-comm 2026-03-01 18:26:49 -08:00
Timothy 64b58b57e0 fix: remove reddish color 2026-03-01 18:26:27 -08:00
Timothy c6465908df feat: colorful tool pills 2026-03-01 18:11:57 -08:00
Timothy ca96bcc09f fix: add pending question content to worker status 2026-03-01 18:11:15 -08:00
Timothy 65ee628fae fix: tool pill turn id 2026-03-01 17:58:31 -08:00
Timothy 02043614e5 feat: consolidate worker status report, fix conversation order 2026-03-01 17:56:27 -08:00
Timothy 212b9bf9d4 fix: load agent 2026-03-01 16:26:55 -08:00
Timothy 6070c30a88 Merge branch 'feat/open-hive' into feature/queen-worker-comm 2026-03-01 16:06:43 -08:00
Timothy 8a653e51bc feat: separate worker and queen input 2026-03-01 15:50:28 -08:00
Timothy Zhang 677bee6fe5 Merge branch 'feat/open-hive' of https://github.com/adenhq/hive into feat/open-hive 2026-02-27 19:55:54 -08:00
Timothy Zhang de27bfe76f fix: windows competibility 2026-02-27 19:55:48 -08:00
bryan 9c3a11d7bb chore: remove load agent 2026-02-27 19:14:35 -08:00
22 changed files with 921 additions and 398 deletions
+1 -1
View File
@@ -761,7 +761,7 @@ class GraphBuilder:
path = self.storage_path / f"{session_id}.json"
if not path.exists():
raise FileNotFoundError(f"Session not found: {session_id}")
return BuildSession.model_validate_json(path.read_text())
return BuildSession.model_validate_json(path.read_text(encoding="utf-8"))
@classmethod
def list_sessions(cls, storage_path: Path | str | None = None) -> list[str]:
+1 -1
View File
@@ -164,7 +164,7 @@ def _read_credential_key_file() -> str | None:
"""Read the credential key from ``~/.hive/secrets/credential_key``."""
try:
if CREDENTIAL_KEY_PATH.is_file():
value = CREDENTIAL_KEY_PATH.read_text().strip()
value = CREDENTIAL_KEY_PATH.read_text(encoding="utf-8").strip()
if value:
return value
except Exception:
+32 -7
View File
@@ -103,7 +103,12 @@ FEEDBACK: (reason if RETRY, empty if ACCEPT)"""
def _extract_recent_context(conversation: NodeConversation, max_messages: int = 10) -> str:
"""Extract recent conversation messages for evaluation."""
"""Extract recent conversation messages for evaluation.
Includes tool-call summaries from assistant messages so the judge
can see what tools were invoked (especially set_output values) even
when the assistant message body is empty.
"""
messages = conversation.messages
recent = messages[-max_messages:] if len(messages) > max_messages else messages
@@ -112,8 +117,24 @@ def _extract_recent_context(conversation: NodeConversation, max_messages: int =
role = msg.role.upper()
content = msg.content or ""
# Truncate long tool results
if msg.role == "tool" and len(content) > 200:
content = content[:200] + "..."
if msg.role == "tool" and len(content) > 500:
content = content[:500] + "..."
# For assistant messages with empty content but tool_calls,
# summarise the tool calls so the judge knows what happened.
if msg.role == "assistant" and not content.strip():
tool_calls = getattr(msg, "tool_calls", None)
if tool_calls:
tc_parts = []
for tc in tool_calls:
fn = tc.get("function", {}) if isinstance(tc, dict) else {}
name = fn.get("name", "")
args = fn.get("arguments", "")
if name == "set_output":
# Show the value so the judge can evaluate content quality
tc_parts.append(f" called {name}({args[:1000]})")
else:
tc_parts.append(f" called {name}(...)")
content = "Tool calls:\n" + "\n".join(tc_parts)
if content.strip():
parts.append(f"[{role}]: {content.strip()}")
@@ -125,6 +146,10 @@ def _format_outputs(accumulator_state: dict[str, Any]) -> str:
Lists and dicts get structural formatting so the judge can assess
quantity and structure, not just a truncated stringification.
String values are given a generous limit (2000 chars) so the judge
can verify substantive content (e.g. a research brief with key
questions, scope boundaries, and deliverables).
"""
if not accumulator_state:
return "(none)"
@@ -144,12 +169,12 @@ def _format_outputs(accumulator_state: dict[str, Any]) -> str:
val_str += f"\n ... and {len(value) - 8} more"
elif isinstance(value, dict):
val_str = str(value)
if len(val_str) > 400:
val_str = val_str[:400] + "..."
if len(val_str) > 2000:
val_str = val_str[:2000] + "..."
else:
val_str = str(value)
if len(val_str) > 300:
val_str = val_str[:300] + "..."
if len(val_str) > 2000:
val_str = val_str[:2000] + "..."
parts.append(f" {key}: {val_str}")
return "\n".join(parts)
+3 -3
View File
@@ -1887,7 +1887,7 @@ def import_from_export(
return json.dumps({"success": False, "error": f"File not found: {agent_json_path}"})
try:
data = json.loads(path.read_text())
data = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError as e:
return json.dumps({"success": False, "error": f"Invalid JSON: {e}"})
@@ -3009,7 +3009,7 @@ def debug_test(
# Find which file contains the test
test_file = None
for py_file in tests_dir.glob("test_*.py"):
content = py_file.read_text()
content = py_file.read_text(encoding="utf-8")
if f"def {test_name}" in content or f"async def {test_name}" in content:
test_file = py_file
break
@@ -3161,7 +3161,7 @@ def list_tests(
tests = []
for test_file in sorted(tests_dir.glob("test_*.py")):
try:
content = test_file.read_text()
content = test_file.read_text(encoding="utf-8")
tree = ast.parse(content)
# Find all async function definitions that start with "test_"
+64 -2
View File
@@ -428,7 +428,7 @@ def _load_resume_state(
if not cp_path.exists():
return None
try:
cp_data = json.loads(cp_path.read_text())
cp_data = json.loads(cp_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
return {
@@ -444,7 +444,7 @@ def _load_resume_state(
if not state_path.exists():
return None
try:
state_data = json.loads(state_path.read_text())
state_data = json.loads(state_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return None
progress = state_data.get("progress", {})
@@ -1941,12 +1941,74 @@ def _open_browser(url: str) -> None:
pass # Best-effort — don't crash if browser can't open
def _build_frontend() -> bool:
"""Build the frontend if source is newer than dist. Returns True if dist exists."""
import subprocess
# Find the frontend directory relative to this file or cwd
candidates = [
Path("core/frontend"),
Path(__file__).resolve().parent.parent.parent / "frontend",
]
frontend_dir: Path | None = None
for c in candidates:
if (c / "package.json").is_file():
frontend_dir = c.resolve()
break
if frontend_dir is None:
return False
dist_dir = frontend_dir / "dist"
src_dir = frontend_dir / "src"
# Skip build if dist is up-to-date (newest src file older than dist index.html)
index_html = dist_dir / "index.html"
if index_html.exists() and src_dir.is_dir():
dist_mtime = index_html.stat().st_mtime
needs_build = False
for f in src_dir.rglob("*"):
if f.is_file() and f.stat().st_mtime > dist_mtime:
needs_build = True
break
if not needs_build:
return True
# Need to build
print("Building frontend...")
try:
# Ensure deps are installed
subprocess.run(
["npm", "install", "--no-fund", "--no-audit"],
cwd=frontend_dir,
check=True,
capture_output=True,
)
subprocess.run(
["npm", "run", "build"],
cwd=frontend_dir,
check=True,
capture_output=True,
)
print("Frontend built.")
return True
except FileNotFoundError:
print("Node.js not found — skipping frontend build.")
return dist_dir.is_dir()
except subprocess.CalledProcessError as exc:
stderr = exc.stderr.decode(errors="replace") if exc.stderr else ""
print(f"Frontend build failed: {stderr[:500]}")
return dist_dir.is_dir()
def cmd_serve(args: argparse.Namespace) -> int:
"""Start the HTTP API server."""
import logging
from aiohttp import web
_build_frontend()
from framework.server.app import create_app
logging.basicConfig(
+54 -10
View File
@@ -411,7 +411,10 @@ class AgentRuntime:
)
continue
def _make_cron_timer(entry_point_id: str, expr: str, immediate: bool):
def _make_cron_timer(
entry_point_id: str, expr: str, immediate: bool,
idle_timeout: float = 300,
):
async def _cron_loop():
from croniter import croniter
@@ -442,12 +445,21 @@ class AgentRuntime:
await asyncio.sleep(max(0, sleep_secs))
continue
# Gate: skip tick if previous execution still running
# Gate: skip tick if agent is actively working.
# If the execution is idle (no LLM/tool activity
# beyond idle_timeout) let the timer proceed —
# execute() will cancel the stale execution.
_stream = self._streams.get(entry_point_id)
if _stream and _stream.active_execution_ids:
if (
_stream
and _stream.active_execution_ids
and _stream.agent_idle_seconds < idle_timeout
):
logger.debug(
"Cron '%s': execution already in progress, skipping tick",
"Cron '%s': agent actively working (idle %.0fs < %ds), skipping tick",
entry_point_id,
_stream.agent_idle_seconds,
idle_timeout,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + sleep_secs
@@ -517,7 +529,10 @@ class AgentRuntime:
return _cron_loop
task = asyncio.create_task(
_make_cron_timer(ep_id, cron_expr, run_immediately)()
_make_cron_timer(
ep_id, cron_expr, run_immediately,
idle_timeout=tc.get("idle_timeout_seconds", 300),
)()
)
self._timer_tasks.append(task)
logger.info(
@@ -529,7 +544,10 @@ class AgentRuntime:
elif interval and interval > 0:
# Fixed interval mode (original behavior)
def _make_timer(entry_point_id: str, mins: float, immediate: bool):
def _make_timer(
entry_point_id: str, mins: float, immediate: bool,
idle_timeout: float = 300,
):
async def _timer_loop():
interval_secs = mins * 60
_persistent_session_id: str | None = None
@@ -551,12 +569,18 @@ class AgentRuntime:
await asyncio.sleep(interval_secs)
continue
# Gate: skip tick if previous execution still running
# Gate: skip tick if agent is actively working.
_stream = self._streams.get(entry_point_id)
if _stream and _stream.active_execution_ids:
if (
_stream
and _stream.active_execution_ids
and _stream.agent_idle_seconds < idle_timeout
):
logger.debug(
"Timer '%s': execution already in progress, skipping tick",
"Timer '%s': agent actively working (idle %.0fs < %ds), skipping tick",
entry_point_id,
_stream.agent_idle_seconds,
idle_timeout,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + interval_secs
@@ -621,7 +645,12 @@ class AgentRuntime:
return _timer_loop
task = asyncio.create_task(_make_timer(ep_id, interval, run_immediately)())
task = asyncio.create_task(
_make_timer(
ep_id, interval, run_immediately,
idle_timeout=tc.get("idle_timeout_seconds", 300),
)()
)
self._timer_tasks.append(task)
logger.info(
"Started timer for entry point '%s' every %s min%s",
@@ -1174,6 +1203,21 @@ class AgentRuntime:
return float("inf")
return time.monotonic() - self._last_user_input_time
@property
def agent_idle_seconds(self) -> float:
"""Seconds since any stream last had activity (LLM call, tool call, etc.).
Returns the *minimum* idle time across all streams with active
executions. Returns ``float('inf')`` if nothing is running.
"""
min_idle = float("inf")
for reg in self._graphs.values():
for stream in reg.streams.values():
idle = stream.agent_idle_seconds
if idle < min_idle:
min_idle = idle
return min_idle
def get_graph_registration(self, graph_id: str) -> _GraphRegistration | None:
"""Get the registration for a specific graph (or None)."""
return self._graphs.get(graph_id)
+57 -3
View File
@@ -32,6 +32,19 @@ if TYPE_CHECKING:
from framework.storage.concurrent import ConcurrentStorage
from framework.storage.session_store import SessionStore
class ExecutionAlreadyRunningError(RuntimeError):
"""Raised when attempting to start an execution on a stream that already has one running."""
def __init__(self, stream_id: str, active_ids: list[str]):
self.stream_id = stream_id
self.active_ids = active_ids
super().__init__(
f"Stream '{stream_id}' already has an active execution: {active_ids}. "
"Concurrent executions on the same stream are not allowed."
)
logger = logging.getLogger(__name__)
@@ -56,9 +69,11 @@ class GraphScopedEventBus(EventBus):
# (subscriptions, history, semaphore, etc.) to the real bus.
self._real_bus = bus
self._scope_graph_id = graph_id
self.last_activity_time: float = time.monotonic()
async def publish(self, event: "AgentEvent") -> None: # type: ignore[override]
event.graph_id = self._scope_graph_id
self.last_activity_time = time.monotonic()
await self._real_bus.publish(event)
# --- Delegate state-reading methods to the real bus ---
@@ -233,9 +248,13 @@ class ExecutionStream:
self._lock = asyncio.Lock()
# Graph-scoped event bus (stamps graph_id on published events)
self._scoped_event_bus = self._event_bus
if self._event_bus and self.graph_id:
self._scoped_event_bus = GraphScopedEventBus(self._event_bus, self.graph_id)
# Always wrap in GraphScopedEventBus so we can track last_activity_time.
if self._event_bus:
self._scoped_event_bus = GraphScopedEventBus(
self._event_bus, self.graph_id or ""
)
else:
self._scoped_event_bus = None
# State
self._running = False
@@ -265,6 +284,21 @@ class ExecutionStream:
"""Return IDs of all currently active executions."""
return list(self._active_executions.keys())
@property
def agent_idle_seconds(self) -> float:
"""Seconds since the last agent activity (LLM call, tool call, node transition).
Returns ``float('inf')`` if no event bus is attached or no events have
been published yet. When there are no active executions, also returns
``float('inf')`` (nothing to be idle *about*).
"""
if not self._active_executions:
return float("inf")
bus = self._scoped_event_bus
if isinstance(bus, GraphScopedEventBus):
return time.monotonic() - bus.last_activity_time
return float("inf")
@property
def is_awaiting_input(self) -> bool:
"""True when an active execution is blocked waiting for client input."""
@@ -404,6 +438,26 @@ class ExecutionStream:
if not self._running:
raise RuntimeError(f"ExecutionStream '{self.stream_id}' is not running")
# Only one execution may run on a stream at a time — concurrent
# executions corrupt shared session state. Cancel any running
# execution before starting the new one. The cancelled execution
# writes its state to disk before cleanup, and the new execution
# runs in the same session directory (via resume_session_id).
active = self.active_execution_ids
for eid in active:
logger.info(
"Cancelling running execution %s on stream '%s' before starting new one",
eid, self.stream_id,
)
executor = self._active_executors.get(eid)
if executor:
for node in executor.node_registry.values():
if hasattr(node, "signal_shutdown"):
node.signal_shutdown()
if hasattr(node, "cancel_current_turn"):
node.cancel_current_turn()
await self.cancel_execution(eid)
# When resuming, reuse the original session ID so the execution
# continues in the same session directory instead of creating a new one.
resume_session_id = session_state.get("resume_session_id") if session_state else None
+54 -28
View File
@@ -92,12 +92,10 @@ async def handle_inject(request: web.Request) -> web.Response:
async def handle_chat(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/chat — convenience endpoint.
"""POST /api/sessions/{session_id}/chat — send a message to the queen.
Routing priority:
1. Worker awaiting input inject into worker node
2. Queen active inject into queen conversation
3. Error no handler available
The input box is permanently connected to the queen agent.
Worker input is handled separately via /worker-input.
Body: {"message": "hello"}
"""
@@ -111,26 +109,6 @@ async def handle_chat(request: web.Request) -> web.Response:
if not message:
return web.json_response({"error": "message is required"}, status=400)
# 1. Check if worker is awaiting input → inject to worker
if session.worker_runtime:
node_id, graph_id = session.worker_runtime.find_awaiting_node()
if node_id:
delivered = await session.worker_runtime.inject_input(
node_id,
message,
graph_id=graph_id,
is_client_input=True,
)
return web.json_response(
{
"status": "injected",
"node_id": node_id,
"delivered": delivered,
}
)
# 2. Queen active → inject into queen conversation
queen_executor = session.queen_executor
if queen_executor is not None:
node = queen_executor.node_registry.get("queen")
@@ -143,8 +121,47 @@ async def handle_chat(request: web.Request) -> web.Response:
}
)
# 3. No queen or worker available
return web.json_response({"error": "No worker or queen available"}, status=503)
return web.json_response({"error": "Queen not available"}, status=503)
async def handle_worker_input(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/worker-input — send input to waiting worker node.
Auto-discovers the worker node currently awaiting input and injects the message.
Returns 404 if no worker node is awaiting input.
Body: {"message": "..."}
"""
session, err = resolve_session(request)
if err:
return err
body = await request.json()
message = body.get("message", "")
if not message:
return web.json_response({"error": "message is required"}, status=400)
if not session.worker_runtime:
return web.json_response({"error": "No worker loaded"}, status=503)
node_id, graph_id = session.worker_runtime.find_awaiting_node()
if not node_id:
return web.json_response({"error": "No worker node awaiting input"}, status=404)
delivered = await session.worker_runtime.inject_input(
node_id,
message,
graph_id=graph_id,
is_client_input=True,
)
return web.json_response(
{
"status": "injected",
"node_id": node_id,
"delivered": delivered,
}
)
async def handle_goal_progress(request: web.Request) -> web.Response:
@@ -190,7 +207,7 @@ async def handle_resume(request: web.Request) -> web.Response:
return web.json_response({"error": "Session not found"}, status=404)
try:
state = json.loads(state_path.read_text())
state = json.loads(state_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError) as e:
return web.json_response({"error": f"Failed to read session: {e}"}, status=500)
@@ -255,6 +272,14 @@ async def handle_stop(request: web.Request) -> web.Response:
if reg is None:
continue
for _ep_id, stream in reg.streams.items():
# Signal shutdown on active nodes to abort in-flight LLM streams
for executor in stream._active_executors.values():
for node in executor.node_registry.values():
if hasattr(node, "signal_shutdown"):
node.signal_shutdown()
if hasattr(node, "cancel_current_turn"):
node.cancel_current_turn()
cancelled = await stream.cancel_execution(execution_id)
if cancelled:
return web.json_response(
@@ -340,6 +365,7 @@ def register_routes(app: web.Application) -> None:
app.router.add_post("/api/sessions/{session_id}/trigger", handle_trigger)
app.router.add_post("/api/sessions/{session_id}/inject", handle_inject)
app.router.add_post("/api/sessions/{session_id}/chat", handle_chat)
app.router.add_post("/api/sessions/{session_id}/worker-input", handle_worker_input)
app.router.add_post("/api/sessions/{session_id}/pause", handle_stop)
app.router.add_post("/api/sessions/{session_id}/resume", handle_resume)
app.router.add_post("/api/sessions/{session_id}/stop", handle_stop)
+1 -1
View File
@@ -79,7 +79,7 @@ async def handle_list_nodes(request: web.Request) -> web.Response:
)
if state_path.exists():
try:
state = json.loads(state_path.read_text())
state = json.loads(state_path.read_text(encoding="utf-8"))
progress = state.get("progress", {})
visit_counts = progress.get("node_visit_counts", {})
failures = progress.get("nodes_with_failures", [])
+11 -7
View File
@@ -369,7 +369,7 @@ async def handle_list_worker_sessions(request: web.Request) -> web.Response:
state_path = d / "state.json"
if state_path.exists():
try:
state = json.loads(state_path.read_text())
state = json.loads(state_path.read_text(encoding="utf-8"))
entry["status"] = state.get("status", "unknown")
entry["started_at"] = state.get("started_at")
entry["completed_at"] = state.get("completed_at")
@@ -408,7 +408,7 @@ async def handle_get_worker_session(request: web.Request) -> web.Response:
return web.json_response({"error": "Session not found"}, status=404)
try:
state = json.loads(state_path.read_text())
state = json.loads(state_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError) as e:
return web.json_response({"error": f"Failed to read session: {e}"}, status=500)
@@ -436,7 +436,7 @@ async def handle_list_checkpoints(request: web.Request) -> web.Response:
if f.suffix != ".json":
continue
try:
data = json.loads(f.read_text())
data = json.loads(f.read_text(encoding="utf-8"))
checkpoints.append(
{
"checkpoint_id": f.stem,
@@ -546,13 +546,14 @@ async def handle_messages(request: web.Request) -> web.Response:
if part_file.suffix != ".json":
continue
try:
part = json.loads(part_file.read_text())
part = json.loads(part_file.read_text(encoding="utf-8"))
part["_node_id"] = node_dir.name
part.setdefault("created_at", part_file.stat().st_mtime)
all_messages.append(part)
except (json.JSONDecodeError, OSError):
continue
all_messages.sort(key=lambda m: m.get("seq", 0))
all_messages.sort(key=lambda m: m.get("created_at", m.get("seq", 0)))
client_only = request.query.get("client_only", "").lower() in ("true", "1")
if client_only:
@@ -600,13 +601,16 @@ async def handle_queen_messages(request: web.Request) -> web.Response:
if part_file.suffix != ".json":
continue
try:
part = json.loads(part_file.read_text())
part = json.loads(part_file.read_text(encoding="utf-8"))
part["_node_id"] = node_dir.name
# Use file mtime as created_at so frontend can order
# queen and worker messages chronologically.
part.setdefault("created_at", part_file.stat().st_mtime)
all_messages.append(part)
except (json.JSONDecodeError, OSError):
continue
all_messages.sort(key=lambda m: m.get("seq", 0))
all_messages.sort(key=lambda m: m.get("created_at", m.get("seq", 0)))
# Filter to client-facing messages only
all_messages = [
+2 -2
View File
@@ -277,13 +277,13 @@ class SessionManager:
if not state_path.exists():
continue
try:
state = json.loads(state_path.read_text())
state = json.loads(state_path.read_text(encoding="utf-8"))
if state.get("status") != "active":
continue
state["status"] = "cancelled"
state.setdefault("result", {})["error"] = "Stale session: runtime restarted"
state.setdefault("timestamps", {})["updated_at"] = datetime.now().isoformat()
state_path.write_text(json.dumps(state, indent=2))
state_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
logger.info(
"Marked stale session '%s' as cancelled for agent '%s'", d.name, agent_path.name
)
+2 -2
View File
@@ -95,7 +95,7 @@ class CheckpointStore:
return None
try:
return Checkpoint.model_validate_json(checkpoint_path.read_text())
return Checkpoint.model_validate_json(checkpoint_path.read_text(encoding="utf-8"))
except Exception as e:
logger.error(f"Failed to load checkpoint {checkpoint_id}: {e}")
return None
@@ -123,7 +123,7 @@ class CheckpointStore:
return None
try:
return CheckpointIndex.model_validate_json(self.index_path.read_text())
return CheckpointIndex.model_validate_json(self.index_path.read_text(encoding="utf-8"))
except Exception as e:
logger.error(f"Failed to load checkpoint index: {e}")
return None
+2 -2
View File
@@ -114,7 +114,7 @@ class SessionStore:
if not state_path.exists():
return None
return SessionState.model_validate_json(state_path.read_text())
return SessionState.model_validate_json(state_path.read_text(encoding="utf-8"))
return await asyncio.to_thread(_read)
@@ -151,7 +151,7 @@ class SessionStore:
continue
try:
state = SessionState.model_validate_json(state_path.read_text())
state = SessionState.model_validate_json(state_path.read_text(encoding="utf-8"))
# Apply filters
if status and state.status != status:
+2 -2
View File
@@ -190,7 +190,7 @@ def cmd_test_debug(args: argparse.Namespace) -> int:
# Find which file contains the test
test_file = None
for py_file in tests_dir.glob("test_*.py"):
content = py_file.read_text()
content = py_file.read_text(encoding="utf-8")
if f"def {test_name}" in content or f"async def {test_name}" in content:
test_file = py_file
break
@@ -238,7 +238,7 @@ def _scan_test_files(tests_dir: Path) -> list[dict]:
for test_file in sorted(tests_dir.glob("test_*.py")):
try:
content = test_file.read_text()
content = test_file.read_text(encoding="utf-8")
tree = ast.parse(content)
for node in ast.walk(tree):
+304 -62
View File
@@ -158,6 +158,11 @@ def register_queen_lifecycle_tools(
# --- start_worker ---------------------------------------------------------
# How long to wait for credential validation + MCP resync before
# proceeding with trigger anyway. These are pre-flight checks that
# should not block the queen indefinitely.
_START_PREFLIGHT_TIMEOUT = 15 # seconds
async def start_worker(task: str) -> str:
"""Start the worker agent with a task description.
@@ -169,25 +174,47 @@ def register_queen_lifecycle_tools(
return json.dumps({"error": "No worker loaded in this session."})
try:
# Validate credentials before running — same deferred check as
# handle_trigger. Runs in executor because validate_agent_credentials
# makes blocking HTTP health-check calls.
# Pre-flight: validate credentials and resync MCP servers.
# Both are blocking I/O (HTTP health-checks, subprocess spawns)
# so they run in a thread-pool executor. We cap the total
# preflight time so the queen never hangs waiting.
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None, lambda: validate_agent_credentials(runtime.graph.nodes)
)
# Resync MCP servers if credentials were added since the worker loaded
# (e.g. user connected an OAuth account mid-session via Aden UI).
runner = getattr(session, "runner", None)
if runner:
async def _preflight():
cred_error: CredentialError | None = None
try:
await loop.run_in_executor(
None,
lambda: runner._tool_registry.resync_mcp_servers_if_needed(),
None, lambda: validate_agent_credentials(runtime.graph.nodes)
)
except CredentialError as e:
cred_error = e
except Exception as e:
logger.warning("MCP resync failed: %s", e)
logger.warning("Credential validation failed: %s", e)
runner = getattr(session, "runner", None)
if runner:
try:
await loop.run_in_executor(
None,
lambda: runner._tool_registry.resync_mcp_servers_if_needed(),
)
except Exception as e:
logger.warning("MCP resync failed: %s", e)
# Re-raise CredentialError after MCP resync so both steps
# get a chance to run before we bail.
if cred_error is not None:
raise cred_error
try:
await asyncio.wait_for(_preflight(), timeout=_START_PREFLIGHT_TIMEOUT)
except TimeoutError:
logger.warning(
"start_worker preflight timed out after %ds — proceeding with trigger",
_START_PREFLIGHT_TIMEOUT,
)
except CredentialError:
raise # handled below
# Resume timers in case they were paused by a previous stop_worker
runtime.resume_timers()
@@ -254,30 +281,40 @@ def register_queen_lifecycle_tools(
# --- stop_worker ----------------------------------------------------------
async def stop_worker() -> str:
"""Cancel all active worker executions.
"""Cancel all active worker executions across all graphs.
Stops the worker gracefully. Returns the IDs of cancelled executions.
Stops the worker immediately. Returns the IDs of cancelled executions.
"""
runtime = _get_runtime()
if runtime is None:
return json.dumps({"error": "No worker loaded in this session."})
cancelled = []
graph_id = runtime.graph_id
# Get the primary graph's streams
reg = runtime.get_graph_registration(graph_id)
if reg is None:
return json.dumps({"error": "Worker graph not found"})
# Iterate ALL registered graphs — multiple entrypoint requests
# can spawn executions in different graphs within the same session.
for graph_id in runtime.list_graphs():
reg = runtime.get_graph_registration(graph_id)
if reg is None:
continue
for _ep_id, stream in reg.streams.items():
for exec_id in list(stream.active_execution_ids):
try:
ok = await stream.cancel_execution(exec_id)
if ok:
cancelled.append(exec_id)
except Exception as e:
logger.warning("Failed to cancel %s: %s", exec_id, e)
for _ep_id, stream in reg.streams.items():
# Signal shutdown on all active EventLoopNodes first so they
# exit cleanly and cancel their in-flight LLM streams.
for executor in stream._active_executors.values():
for node in executor.node_registry.values():
if hasattr(node, "signal_shutdown"):
node.signal_shutdown()
if hasattr(node, "cancel_current_turn"):
node.cancel_current_turn()
for exec_id in list(stream.active_execution_ids):
try:
ok = await stream.cancel_execution(exec_id)
if ok:
cancelled.append(exec_id)
except Exception as e:
logger.warning("Failed to cancel %s: %s", exec_id, e)
# Pause timers so the next tick doesn't restart execution
runtime.pause_timers()
@@ -303,10 +340,24 @@ def register_queen_lifecycle_tools(
# --- get_worker_status ----------------------------------------------------
async def get_worker_status() -> str:
"""Check if the worker is idle, running, or waiting for user input.
def _get_event_bus():
"""Get the session's event bus for querying history."""
return getattr(session, "event_bus", None)
Returns worker identity, execution state, active node, and iteration count.
async def get_worker_status(last_n: int = 20) -> str:
"""Comprehensive worker status: state, execution details, and recent activity.
Returns everything the queen needs in a single call:
- Identity and high-level state (idle / running / waiting_for_input)
- Active execution details (elapsed time, current node, iteration)
- Running tool calls (started but not yet completed)
- Recent completed tool calls (name, success/error)
- Node transitions (execution path)
- Retries, stalls, and constraint violations
- Goal progress and token consumption
Args:
last_n: Number of recent events to include per category (default 20).
"""
runtime = _get_runtime()
if runtime is None:
@@ -318,55 +369,246 @@ def register_queen_lifecycle_tools(
if reg is None:
return json.dumps({"status": "not_loaded"})
base = {
result: dict[str, Any] = {
"worker_graph_id": graph_id,
"worker_goal": getattr(goal, "name", graph_id),
}
# --- Execution state ---
active_execs = []
for ep_id, stream in reg.streams.items():
for exec_id in stream.active_execution_ids:
active_execs.append(
{
"execution_id": exec_id,
"entry_point": ep_id,
}
)
exec_info: dict[str, Any] = {
"execution_id": exec_id,
"entry_point": ep_id,
}
ctx = stream.get_context(exec_id)
if ctx:
from datetime import datetime
elapsed = (datetime.now() - ctx.started_at).total_seconds()
exec_info["elapsed_seconds"] = round(elapsed, 1)
exec_info["exec_status"] = ctx.status
active_execs.append(exec_info)
if not active_execs:
return json.dumps(
{
**base,
"status": "idle",
"message": "Worker has no active executions.",
}
result["status"] = "idle"
result["message"] = "Worker has no active executions."
else:
waiting_nodes = []
for _ep_id, stream in reg.streams.items():
waiting_nodes.extend(stream.get_waiting_nodes())
result["status"] = "waiting_for_input" if waiting_nodes else "running"
result["active_executions"] = active_execs
if waiting_nodes:
result["waiting_node_id"] = waiting_nodes[0]["node_id"]
result["agent_idle_seconds"] = round(runtime.agent_idle_seconds, 1)
# --- EventBus enrichment ---
bus = _get_event_bus()
if not bus:
return json.dumps(result)
try:
# Pending user question (from ask_user tool)
if result.get("status") == "waiting_for_input":
input_events = bus.get_history(
event_type=EventType.CLIENT_INPUT_REQUESTED, limit=1
)
if input_events:
prompt = input_events[0].data.get("prompt", "")
if prompt:
result["pending_question"] = prompt
# Current node
edge_events = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=1)
if edge_events:
target = edge_events[0].data.get("target_node")
if target:
result["current_node"] = target
# Current iteration
iter_events = bus.get_history(event_type=EventType.NODE_LOOP_ITERATION, limit=1)
if iter_events:
result["current_iteration"] = iter_events[0].data.get("iteration")
# Running tool calls (started but not yet completed)
tool_started = bus.get_history(
event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2
)
tool_completed = bus.get_history(
event_type=EventType.TOOL_CALL_COMPLETED, limit=last_n * 2
)
completed_ids = {
evt.data.get("tool_use_id")
for evt in tool_completed
if evt.data.get("tool_use_id")
}
running = [
evt
for evt in tool_started
if evt.data.get("tool_use_id")
and evt.data.get("tool_use_id") not in completed_ids
]
if running:
result["running_tools"] = [
{
"tool": evt.data.get("tool_name"),
"node": evt.node_id,
"started_at": evt.timestamp.isoformat(),
"input_preview": str(evt.data.get("tool_input", ""))[:200],
}
for evt in running
]
# Check if the worker is waiting for user input
waiting_nodes = []
for _ep_id, stream in reg.streams.items():
waiting_nodes.extend(stream.get_waiting_nodes())
# Recent completed tool calls
if tool_completed:
result["recent_tool_calls"] = [
{
"tool": evt.data.get("tool_name"),
"error": bool(evt.data.get("is_error")),
"node": evt.node_id,
"time": evt.timestamp.isoformat(),
}
for evt in tool_completed[:last_n]
]
status = "waiting_for_input" if waiting_nodes else "running"
result = {
**base,
"status": status,
"active_executions": active_execs,
}
if waiting_nodes:
result["waiting_node_id"] = waiting_nodes[0]["node_id"]
return json.dumps(result)
# Node transitions
edges = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=last_n)
if edges:
result["node_transitions"] = [
{
"from": evt.data.get("source_node"),
"to": evt.data.get("target_node"),
"condition": evt.data.get("edge_condition"),
"time": evt.timestamp.isoformat(),
}
for evt in edges
]
# Retries
retries = bus.get_history(event_type=EventType.NODE_RETRY, limit=last_n)
if retries:
result["retries"] = [
{
"node": evt.node_id,
"retry_count": evt.data.get("retry_count"),
"error": evt.data.get("error", "")[:200],
"time": evt.timestamp.isoformat(),
}
for evt in retries
]
# Stalls and doom loops
stalls = bus.get_history(event_type=EventType.NODE_STALLED, limit=5)
doom_loops = bus.get_history(event_type=EventType.NODE_TOOL_DOOM_LOOP, limit=5)
issues = []
for evt in stalls:
issues.append({
"type": "stall",
"node": evt.node_id,
"reason": evt.data.get("reason", "")[:200],
"time": evt.timestamp.isoformat(),
})
for evt in doom_loops:
issues.append({
"type": "tool_doom_loop",
"node": evt.node_id,
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
})
if issues:
result["issues"] = issues
# Constraint violations
violations = bus.get_history(event_type=EventType.CONSTRAINT_VIOLATION, limit=5)
if violations:
result["constraint_violations"] = [
{
"constraint": evt.data.get("constraint_id"),
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
}
for evt in violations
]
# Goal progress
try:
progress = await runtime.get_goal_progress()
if progress:
result["goal_progress"] = progress
except Exception:
pass
# Token summary
llm_events = bus.get_history(
event_type=EventType.LLM_TURN_COMPLETE, limit=200
)
if llm_events:
total_in = sum(
evt.data.get("input_tokens", 0) or 0 for evt in llm_events
)
total_out = sum(
evt.data.get("output_tokens", 0) or 0 for evt in llm_events
)
result["token_summary"] = {
"llm_turns": len(llm_events),
"input_tokens": total_in,
"output_tokens": total_out,
"total_tokens": total_in + total_out,
}
# Execution completions/failures
exec_completed = bus.get_history(
event_type=EventType.EXECUTION_COMPLETED, limit=5
)
exec_failed = bus.get_history(
event_type=EventType.EXECUTION_FAILED, limit=5
)
if exec_completed or exec_failed:
result["execution_outcomes"] = []
for evt in exec_completed:
result["execution_outcomes"].append({
"outcome": "completed",
"execution_id": evt.execution_id,
"time": evt.timestamp.isoformat(),
})
for evt in exec_failed:
result["execution_outcomes"].append({
"outcome": "failed",
"execution_id": evt.execution_id,
"error": evt.data.get("error", "")[:200],
"time": evt.timestamp.isoformat(),
})
except Exception:
pass # Non-critical enrichment
return json.dumps(result, default=str, ensure_ascii=False)
_status_tool = Tool(
name="get_worker_status",
description=(
"Check the worker agent's current state: idle (no execution), "
"running (actively processing), or waiting_for_input (blocked on "
"user response). Returns execution details."
"Get comprehensive worker status: state (idle/running/waiting_for_input), "
"execution details (elapsed time, current node, iteration), "
"recent tool calls, running tools, node transitions, retries, "
"stalls, constraint violations, goal progress, and token consumption. "
"One call gives the queen a complete picture."
),
parameters={"type": "object", "properties": {}},
parameters={
"type": "object",
"properties": {
"last_n": {
"type": "integer",
"description": "Number of recent events per category (default 20)",
},
},
"required": [],
},
)
registry.register(
"get_worker_status", _status_tool, lambda inputs: get_worker_status(**inputs)
)
registry.register("get_worker_status", _status_tool, lambda inputs: get_worker_status())
tools_registered += 1
# --- inject_worker_message ------------------------------------------------
+4 -4
View File
@@ -53,7 +53,7 @@ def _get_last_active(agent_name: str) -> str | None:
if not state_file.exists():
continue
try:
data = json.loads(state_file.read_text())
data = json.loads(state_file.read_text(encoding="utf-8"))
ts = data.get("timestamps", {}).get("updated_at")
if ts and (latest is None or ts > latest):
latest = ts
@@ -84,7 +84,7 @@ def _extract_agent_stats(agent_path: Path) -> tuple[int, int, list[str]]:
agent_py = agent_path / "agent.py"
if agent_py.exists():
try:
tree = ast.parse(agent_py.read_text())
tree = ast.parse(agent_py.read_text(encoding="utf-8"))
for node in ast.walk(tree):
# Find `nodes = [...]` assignment
if isinstance(node, ast.Assign):
@@ -99,7 +99,7 @@ def _extract_agent_stats(agent_path: Path) -> tuple[int, int, list[str]]:
agent_json = agent_path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text())
data = json.loads(agent_json.read_text(encoding="utf-8"))
json_nodes = data.get("nodes", [])
if node_count == 0:
node_count = len(json_nodes)
@@ -150,7 +150,7 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
agent_json = path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text())
data = json.loads(agent_json.read_text(encoding="utf-8"))
meta = data.get("agent", {})
name = meta.get("name", name)
desc = meta.get("description", desc)
+3
View File
@@ -37,6 +37,9 @@ export const executionApi = {
chat: (sessionId: string, message: string) =>
api.post<ChatResult>(`/sessions/${sessionId}/chat`, { message }),
workerInput: (sessionId: string, message: string) =>
api.post<ChatResult>(`/sessions/${sessionId}/worker-input`, { message }),
stop: (sessionId: string, executionId: string) =>
api.post<StopResult>(`/sessions/${sessionId}/stop`, {
execution_id: executionId,
+2
View File
@@ -131,6 +131,8 @@ export interface Message {
is_transition_marker?: boolean;
is_client_input?: boolean;
tool_calls?: unknown[];
/** Epoch seconds from file mtime — used for cross-conversation ordering */
created_at?: number;
[key: string]: unknown;
}
+164 -58
View File
@@ -1,6 +1,5 @@
import { memo, useState, useRef, useEffect } from "react";
import { Send, Square, Crown, Cpu, Check, ChevronRight, Loader2 } from "lucide-react";
import { formatAgentDisplayName } from "@/lib/chat-helpers";
import { Send, Square, Crown, Cpu, Check, Loader2, Reply } from "lucide-react";
import MarkdownContent from "@/components/MarkdownContent";
export interface ChatMessage {
@@ -9,10 +8,12 @@ export interface ChatMessage {
agentColor: string;
content: string;
timestamp: string;
type?: "system" | "agent" | "user" | "tool_status";
type?: "system" | "agent" | "user" | "tool_status" | "worker_input_request";
role?: "queen" | "worker";
/** Which worker thread this message belongs to (worker agent name) */
thread?: string;
/** Epoch ms when this message was first created — used for ordering queen/worker interleaving */
createdAt?: number;
}
interface ChatPanelProps {
@@ -20,30 +21,48 @@ interface ChatPanelProps {
onSend: (message: string, thread: string) => void;
isWaiting?: boolean;
activeThread: string;
/** When true, the agent is waiting for user input — changes placeholder text */
awaitingInput?: boolean;
/** When true, the worker is waiting for user input — shows inline reply box */
workerAwaitingInput?: boolean;
/** When true, the input is disabled (e.g. during loading) */
disabled?: boolean;
/** Called when user clicks the stop button to cancel the queen's current turn */
onCancel?: () => void;
/** Called when user submits a reply to the worker's input request */
onWorkerReply?: (message: string) => void;
}
const queenColor = "hsl(45,95%,58%)";
const workerColor = "hsl(220,60%,55%)";
function getColor(_agent: string, role?: "queen" | "worker"): string {
if (role === "queen") return queenColor;
return "hsl(220,60%,55%)";
return workerColor;
}
// Honey-drizzle palette — based on color-hex.com/color-palette/80116
// #8e4200 · #db6f02 · #ff9624 · #ffb825 · #ffd69c + adjacent warm tones
const TOOL_HEX = [
"#db6f02", // rich orange
"#ffb825", // golden yellow
"#ff9624", // bright orange
"#c48820", // warm bronze
"#e89530", // honey
"#d4a040", // goldenrod
"#cc7a10", // caramel
"#e5a820", // sunflower
];
function toolHex(name: string): string {
let hash = 0;
for (let i = 0; i < name.length; i++) hash = (hash * 31 + name.charCodeAt(i)) | 0;
return TOOL_HEX[Math.abs(hash) % TOOL_HEX.length];
}
function ToolActivityRow({ content }: { content: string }) {
const [expanded, setExpanded] = useState(false);
let tools: { name: string; done: boolean }[] = [];
let allDone = false;
try {
const parsed = JSON.parse(content);
tools = parsed.tools || [];
allDone = parsed.allDone ?? false;
} catch {
// Legacy plain-text fallback
return (
@@ -57,53 +76,132 @@ function ToolActivityRow({ content }: { content: string }) {
if (tools.length === 0) return null;
const total = tools.length;
// Group by tool name → count done vs running
const grouped = new Map<string, { done: number; running: number }>();
for (const t of tools) {
const entry = grouped.get(t.name) || { done: 0, running: 0 };
if (t.done) entry.done++;
else entry.running++;
grouped.set(t.name, entry);
}
if (allDone && !expanded) {
return (
<div className="flex gap-3 pl-10">
<button
onClick={() => setExpanded(true)}
className="flex items-center gap-1.5 text-[11px] text-muted-foreground hover:text-foreground transition-colors"
>
<ChevronRight className="w-3 h-3" />
<Check className="w-3 h-3 text-emerald-500" />
<span>{total} tool{total === 1 ? "" : "s"} used</span>
</button>
</div>
);
// Build pill list: running first, then done
const runningPills: { name: string; count: number }[] = [];
const donePills: { name: string; count: number }[] = [];
for (const [name, counts] of grouped) {
if (counts.running > 0) runningPills.push({ name, count: counts.running });
if (counts.done > 0) donePills.push({ name, count: counts.done });
}
return (
<div className="flex gap-3 pl-10">
<div className="flex flex-wrap items-center gap-1.5">
{allDone && (
<button onClick={() => setExpanded(false)} className="text-muted-foreground hover:text-foreground transition-colors">
<ChevronRight className="w-3 h-3 rotate-90" />
</button>
)}
{tools.map((t, i) => (
<span
key={i}
className={`inline-flex items-center gap-1 text-[11px] px-2 py-0.5 rounded-full border ${
t.done
? "text-emerald-600 bg-emerald-500/10 border-emerald-500/20"
: "text-muted-foreground bg-muted/40 border-border/40"
}`}
>
{t.done ? (
<Check className="w-2.5 h-2.5" />
) : (
{runningPills.map((p) => {
const hex = toolHex(p.name);
return (
<span
key={`run-${p.name}`}
className="inline-flex items-center gap-1 text-[11px] px-2.5 py-0.5 rounded-full"
style={{ color: hex, backgroundColor: `${hex}18`, border: `1px solid ${hex}35` }}
>
<Loader2 className="w-2.5 h-2.5 animate-spin" />
)}
{t.name}
</span>
))}
{p.name}
{p.count > 1 && (
<span className="text-[10px] font-medium opacity-70">×{p.count}</span>
)}
</span>
);
})}
{donePills.map((p) => {
const hex = toolHex(p.name);
return (
<span
key={`done-${p.name}`}
className="inline-flex items-center gap-1 text-[11px] px-2.5 py-0.5 rounded-full"
style={{ color: hex, backgroundColor: `${hex}18`, border: `1px solid ${hex}35` }}
>
<Check className="w-2.5 h-2.5" />
{p.name}
{p.count > 1 && (
<span className="text-[10px] opacity-80">×{p.count}</span>
)}
</span>
);
})}
</div>
</div>
);
}
/** Inline reply box that appears below a worker's input request in the chat thread. */
function WorkerInputReply({ onSubmit, disabled }: { onSubmit: (text: string) => void; disabled?: boolean }) {
const [value, setValue] = useState("");
const [sent, setSent] = useState(false);
const inputRef = useRef<HTMLTextAreaElement>(null);
useEffect(() => {
if (!disabled && !sent) inputRef.current?.focus();
}, [disabled, sent]);
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (!value.trim() || sent) return;
onSubmit(value.trim());
setSent(true);
};
if (sent) {
return (
<div className="ml-10 flex items-center gap-1.5 text-[11px] text-muted-foreground py-1">
<Check className="w-3 h-3 text-emerald-500" />
<span>Response sent</span>
</div>
);
}
return (
<form onSubmit={handleSubmit} className="ml-10 mt-1">
<div
className="flex items-center gap-2 rounded-xl px-3 py-2 border transition-colors"
style={{
backgroundColor: `${workerColor}08`,
borderColor: `${workerColor}30`,
}}
>
<Reply className="w-3.5 h-3.5 flex-shrink-0" style={{ color: workerColor }} />
<textarea
ref={inputRef}
rows={1}
value={value}
onChange={(e) => {
setValue(e.target.value);
const ta = e.target;
ta.style.height = "auto";
ta.style.height = `${Math.min(ta.scrollHeight, 120)}px`;
}}
onKeyDown={(e) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
handleSubmit(e);
}
}}
placeholder="Reply to worker..."
disabled={disabled}
className="flex-1 bg-transparent text-sm text-foreground outline-none placeholder:text-muted-foreground disabled:opacity-50 resize-none overflow-y-auto"
/>
<button
type="submit"
disabled={!value.trim() || disabled}
className="p-1.5 rounded-lg transition-opacity disabled:opacity-30 hover:opacity-90"
style={{ backgroundColor: workerColor, color: "white" }}
>
<Send className="w-3.5 h-3.5" />
</button>
</div>
</form>
);
}
const MessageBubble = memo(function MessageBubble({ msg }: { msg: ChatMessage }) {
const isUser = msg.type === "user";
const isQueen = msg.role === "queen";
@@ -174,7 +272,7 @@ const MessageBubble = memo(function MessageBubble({ msg }: { msg: ChatMessage })
);
}, (prev, next) => prev.msg.id === next.msg.id && prev.msg.content === next.msg.content);
export default function ChatPanel({ messages, onSend, isWaiting, activeThread, awaitingInput, disabled, onCancel }: ChatPanelProps) {
export default function ChatPanel({ messages, onSend, isWaiting, activeThread, workerAwaitingInput, disabled, onCancel, onWorkerReply }: ChatPanelProps) {
const [input, setInput] = useState("");
const [readMap, setReadMap] = useState<Record<string, number>>({});
const bottomRef = useRef<HTMLDivElement>(null);
@@ -197,7 +295,7 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, a
const lastMsg = threadMessages[threadMessages.length - 1];
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [threadMessages.length, lastMsg?.content]);
}, [threadMessages.length, lastMsg?.content, workerAwaitingInput]);
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
@@ -207,7 +305,16 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, a
if (textareaRef.current) textareaRef.current.style.height = "auto";
};
const activeWorkerLabel = formatAgentDisplayName(activeThread);
// Find the last worker message to attach the inline reply box below.
// For explicit ask_user, this will be the worker_input_request message.
// For auto-block, this will be the last client_output_delta streamed message.
const lastWorkerMsgIdx = workerAwaitingInput
? threadMessages.reduce(
(last, m, i) =>
m.role === "worker" && m.type !== "tool_status" && m.type !== "system" ? i : last,
-1,
)
: -1;
return (
<div className="flex flex-col h-full min-w-0">
@@ -218,8 +325,13 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, a
{/* Messages */}
<div className="flex-1 overflow-auto px-5 py-4 space-y-3">
{threadMessages.map((msg) => (
<MessageBubble key={msg.id} msg={msg} />
{threadMessages.map((msg, idx) => (
<div key={msg.id}>
<MessageBubble msg={msg} />
{idx === lastWorkerMsgIdx && onWorkerReply && (
<WorkerInputReply onSubmit={onWorkerReply} />
)}
</div>
))}
{isWaiting && (
@@ -239,7 +351,7 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, a
<div ref={bottomRef} />
</div>
{/* Input */}
{/* Input — always connected to Queen */}
<form onSubmit={handleSubmit} className="p-4 border-t border-border">
<div className="flex items-center gap-3 bg-muted/40 rounded-xl px-4 py-2.5 border border-border focus-within:border-primary/40 transition-colors">
<textarea
@@ -258,13 +370,7 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, a
handleSubmit(e);
}
}}
placeholder={
disabled
? "Connecting to agent..."
: awaitingInput
? "Agent is waiting for your response..."
: `Message ${activeWorkerLabel}...`
}
placeholder={disabled ? "Connecting to agent..." : "Message Queen Bee..."}
disabled={disabled}
className="flex-1 bg-transparent text-sm text-foreground outline-none placeholder:text-muted-foreground disabled:opacity-50 disabled:cursor-not-allowed resize-none overflow-y-auto"
/>
+15 -14
View File
@@ -37,8 +37,11 @@ export function backendMessageToChatMessage(
thread: string,
agentDisplayName?: string,
): ChatMessage {
// Use file-mtime created_at (epoch seconds → ms) for cross-conversation
// ordering; fall back to seq for backwards compatibility.
const createdAt = msg.created_at ? msg.created_at * 1000 : msg.seq;
return {
id: `backend-${msg.seq}`,
id: `backend-${msg._node_id}-${msg.seq}`,
agent: msg.role === "user" ? "You" : agentDisplayName || msg._node_id || "Agent",
agentColor: "",
content: msg.content,
@@ -46,6 +49,7 @@ export function backendMessageToChatMessage(
type: msg.role === "user" ? "user" : undefined,
role: msg.role === "user" ? undefined : "worker",
thread,
createdAt,
};
}
@@ -67,6 +71,8 @@ export function sseEventToChatMessage(
const eid = event.execution_id ?? "";
const tid = turnId != null ? String(turnId) : "";
const idKey = eid && tid ? `${eid}-${tid}` : eid || tid || `t-${Date.now()}`;
// Use the backend event timestamp for message ordering
const createdAt = event.timestamp ? new Date(event.timestamp).getTime() : Date.now();
switch (event.type) {
case "client_output_delta": {
@@ -86,22 +92,14 @@ export function sseEventToChatMessage(
timestamp: "",
role: "worker",
thread,
createdAt,
};
}
case "client_input_requested": {
const prompt = (event.data?.prompt as string) || "";
if (!prompt) return null;
return {
id: `input-req-${idKey}-${event.node_id}`,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: prompt,
timestamp: "",
role: "worker",
thread,
};
}
case "client_input_requested":
// Handled explicitly in handleSSEEvent (workspace.tsx) so it can
// create a worker_input_request message and set awaitingInput state.
return null;
case "llm_text_delta": {
const snapshot = (event.data?.snapshot as string) || (event.data?.content as string) || "";
@@ -114,6 +112,7 @@ export function sseEventToChatMessage(
timestamp: "",
role: "worker",
thread,
createdAt,
};
}
@@ -126,6 +125,7 @@ export function sseEventToChatMessage(
timestamp: "",
type: "system",
thread,
createdAt,
};
}
@@ -139,6 +139,7 @@ export function sseEventToChatMessage(
timestamp: "",
type: "system",
thread,
createdAt,
};
}
+137 -174
View File
@@ -182,80 +182,6 @@ function NewTabPopover({ open, onClose, anchorRef, discoverAgents, onFromScratch
);
}
// --- LoadAgentPopover ---
interface LoadAgentPopoverProps {
open: boolean;
onClose: () => void;
anchorRef: React.RefObject<HTMLButtonElement | null>;
discoverAgents: DiscoverEntry[];
onSelect: (agentPath: string) => void;
}
function LoadAgentPopover({ open, onClose, anchorRef, discoverAgents, onSelect }: LoadAgentPopoverProps) {
const [pos, setPos] = useState<{ top: number; right: number } | null>(null);
const ref = useRef<HTMLDivElement>(null);
useEffect(() => {
if (open && anchorRef.current) {
const rect = anchorRef.current.getBoundingClientRect();
setPos({ top: rect.bottom + 4, right: window.innerWidth - rect.right });
}
}, [open, anchorRef]);
useEffect(() => {
if (!open) return;
const handler = (e: MouseEvent) => {
if (
ref.current && !ref.current.contains(e.target as Node) &&
anchorRef.current && !anchorRef.current.contains(e.target as Node)
) onClose();
};
document.addEventListener("mousedown", handler);
return () => document.removeEventListener("mousedown", handler);
}, [open, onClose, anchorRef]);
useEffect(() => {
if (!open) return;
const handler = (e: KeyboardEvent) => { if (e.key === "Escape") onClose(); };
document.addEventListener("keydown", handler);
return () => document.removeEventListener("keydown", handler);
}, [open, onClose]);
if (!open || !pos) return null;
return ReactDOM.createPortal(
<div
ref={ref}
style={{ position: "fixed", top: pos.top, right: pos.right, zIndex: 9999 }}
className="w-60 rounded-xl border border-border/60 bg-card shadow-xl shadow-black/30 overflow-hidden"
>
<div className="flex items-center gap-2 px-3 py-2.5 border-b border-border/40">
<span className="text-xs font-semibold text-muted-foreground uppercase tracking-wider">
Load Agent
</span>
</div>
<div className="p-1.5 flex flex-col max-h-64 overflow-y-auto">
{discoverAgents.map(agent => (
<button
key={agent.path}
onClick={() => { onSelect(agent.path); onClose(); }}
className="flex items-center gap-2.5 w-full px-3 py-2 rounded-lg text-left transition-colors hover:bg-muted/60 text-foreground"
>
<div className="w-6 h-6 rounded-md bg-muted/80 flex items-center justify-center flex-shrink-0">
<Bot className="w-3.5 h-3.5 text-muted-foreground" />
</div>
<span className="text-sm font-medium">{agent.name}</span>
</button>
))}
{discoverAgents.length === 0 && (
<p className="text-xs text-muted-foreground px-3 py-2">No agents found</p>
)}
</div>
</div>,
document.body
);
}
function fmtLogTs(ts: string): string {
try {
const d = new Date(ts);
@@ -280,6 +206,8 @@ interface AgentBackendState {
graphId: string | null;
nodeSpecs: NodeSpec[];
awaitingInput: boolean;
/** The message ID of the current worker input request (for inline reply box) */
workerInputMessageId: string | null;
workerRunState: "idle" | "deploying" | "running";
currentExecutionId: string | null;
nodeLogs: Record<string, string[]>;
@@ -301,6 +229,7 @@ function defaultAgentState(): AgentBackendState {
graphId: null,
nodeSpecs: [],
awaitingInput: false,
workerInputMessageId: null,
workerRunState: "idle",
currentExecutionId: null,
nodeLogs: {},
@@ -399,9 +328,6 @@ export default function Workspace() {
const [selectedNode, setSelectedNode] = useState<GraphNode | null>(null);
const [newTabOpen, setNewTabOpen] = useState(false);
const newTabBtnRef = useRef<HTMLButtonElement>(null);
const [loadAgentOpen, setLoadAgentOpen] = useState(false);
const [loadingWorker, setLoadingWorker] = useState(false);
const loadAgentBtnRef = useRef<HTMLButtonElement>(null);
// Ref mirror of sessionsByAgent so SSE callback can read current graph
// state without adding sessionsByAgent to its dependency array.
@@ -489,7 +415,7 @@ export default function Workspace() {
const errorMsg: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: `Failed to trigger run: ${errMsg}`,
timestamp: "", type: "system", thread: activeWorker,
timestamp: "", type: "system", thread: activeWorker, createdAt: Date.now(),
};
return { ...s, messages: [...s.messages, errorMsg] };
}),
@@ -546,7 +472,7 @@ export default function Workspace() {
if (prompt) {
const userMsg: ChatMessage = {
id: makeId(), agent: "You", agentColor: "",
content: prompt, timestamp: "", type: "user", thread: agentType,
content: prompt, timestamp: "", type: "user", thread: agentType, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
@@ -677,6 +603,7 @@ export default function Workspace() {
// Check worker session status (detects running worker).
// Only restore messages when rejoining an existing backend session.
let isWorkerRunning = false;
const restoredMsgs: ChatMessage[] = [];
try {
const { sessions: workerSessions } = await sessionsApi.workerSessions(session.session_id);
const resumable = workerSessions.find(
@@ -686,16 +613,8 @@ export default function Workspace() {
if (isResumedSession && resumable) {
const { messages } = await sessionsApi.messages(session.session_id, resumable.session_id);
if (messages.length > 0) {
const chatMsgs = messages.map((m: Message) =>
backendMessageToChatMessage(m, agentType, displayName),
);
setSessionsByAgent((prev) => ({
...prev,
[agentType]: (prev[agentType] || []).map((s, i) =>
i === 0 ? { ...s, messages: [...s.messages, ...chatMsgs] } : s,
),
}));
for (const m of messages as Message[]) {
restoredMsgs.push(backendMessageToChatMessage(m, agentType, displayName));
}
}
} catch {
@@ -706,26 +625,27 @@ export default function Workspace() {
if (isResumedSession) {
try {
const { messages: queenMsgs } = await sessionsApi.queenMessages(session.session_id);
if (queenMsgs.length > 0) {
const chatMsgs = queenMsgs.map((m: Message) => {
const msg = backendMessageToChatMessage(m, agentType, "Queen Bee");
if (msg) msg.role = "queen";
return msg;
}).filter(Boolean);
if (chatMsgs.length > 0) {
setSessionsByAgent((prev) => ({
...prev,
[agentType]: (prev[agentType] || []).map((s, i) =>
i === 0 ? { ...s, messages: [...chatMsgs, ...s.messages] } : s,
),
}));
}
for (const m of queenMsgs as Message[]) {
const msg = backendMessageToChatMessage(m, agentType, "Queen Bee");
msg.role = "queen";
restoredMsgs.push(msg);
}
} catch {
// Queen messages not available — not critical
}
}
// Merge queen + worker messages in chronological order
if (restoredMsgs.length > 0) {
restoredMsgs.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
setSessionsByAgent((prev) => ({
...prev,
[agentType]: (prev[agentType] || []).map((s, i) =>
i === 0 ? { ...s, messages: [...restoredMsgs, ...s.messages] } : s,
),
}));
}
updateAgentState(agentType, {
ready: true,
loading: false,
@@ -875,7 +795,7 @@ export default function Workspace() {
const errorMsg: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: `Failed to pause: ${errMsg}`,
timestamp: "", type: "system", thread: activeWorker,
timestamp: "", type: "system", thread: activeWorker, createdAt: Date.now(),
};
return { ...s, messages: [...s.messages, errorMsg] };
}),
@@ -917,20 +837,27 @@ export default function Workspace() {
// --- SSE event handler ---
const upsertChatMessage = useCallback(
(agentType: string, chatMsg: ChatMessage) => {
console.log('[UPSERT] agentType:', agentType, 'msgId:', chatMsg.id, 'thread:', chatMsg.thread, 'role:', chatMsg.role, 'content:', chatMsg.content?.slice(0, 40));
setSessionsByAgent((prev) => {
const sessions = prev[agentType] || [];
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
console.log('[UPSERT-inner] sessions:', sessions.length, 'activeId:', activeId, 'sessionIds:', sessions.map(s => s.id));
return {
...prev,
[agentType]: sessions.map((s) => {
if (s.id !== activeId) return s;
const idx = s.messages.findIndex((m) => m.id === chatMsg.id);
const newMessages =
idx >= 0
? s.messages.map((m, i) => (i === idx ? chatMsg : m))
: [...s.messages, chatMsg];
let newMessages: ChatMessage[];
if (idx >= 0) {
// Update existing message in place, preserve position
newMessages = s.messages.map((m, i) =>
i === idx ? { ...chatMsg, createdAt: m.createdAt ?? chatMsg.createdAt } : m,
);
} else {
// Append — SSE events arrive in server-timestamp order via the
// shared EventBus, so arrival order already interleaves queen
// and worker correctly. Local user messages are always created
// before their server responses, so append is safe there too.
newMessages = [...s.messages, chatMsg];
}
return { ...s, messages: newMessages };
}),
};
@@ -950,7 +877,14 @@ export default function Workspace() {
const displayName = isQueen ? "Queen Bee" : (agentDisplayName || undefined);
const role = isQueen ? "queen" as const : "worker" as const;
const ts = fmtLogTs(event.timestamp);
const currentTurn = turnCounterRef.current[agentType] ?? 0;
// Turn counter is per-stream so queen and worker tool pills don't
// interfere. A worker node_loop_iteration no longer increments
// the queen's turn counter (which would cause pill ID mismatches
// between tool_call_started and tool_call_completed).
const turnKey = `${agentType}:${streamId}`;
const currentTurn = turnCounterRef.current[turnKey] ?? 0;
// Backend event timestamp for correct queen/worker message ordering
const eventCreatedAt = event.timestamp ? new Date(event.timestamp).getTime() : Date.now();
// Mark queen as ready on the first queen SSE event
if (isQueen && !agentStates[agentType]?.queenReady) {
@@ -960,7 +894,7 @@ export default function Workspace() {
switch (event.type) {
case "execution_started":
if (isQueen) {
turnCounterRef.current[agentType] = currentTurn + 1;
turnCounterRef.current[turnKey] = currentTurn + 1;
updateAgentState(agentType, { isTyping: true });
} else {
// Warn if prior LLM snapshots are being dropped (edge case: execution_completed never arrived)
@@ -968,7 +902,7 @@ export default function Workspace() {
if (Object.keys(priorSnapshots).length > 0) {
console.debug(`[hive] execution_started: dropping ${Object.keys(priorSnapshots).length} unflushed LLM snapshot(s)`);
}
turnCounterRef.current[agentType] = currentTurn + 1;
turnCounterRef.current[turnKey] = currentTurn + 1;
updateAgentState(agentType, {
isTyping: true,
isStreaming: false,
@@ -998,6 +932,7 @@ export default function Workspace() {
isTyping: false,
isStreaming: false,
awaitingInput: false,
workerInputMessageId: null,
workerRunState: "idle",
currentExecutionId: null,
llmSnapshots: {},
@@ -1041,17 +976,48 @@ export default function Workspace() {
}
if (event.type === "client_input_requested") {
updateAgentState(agentType, { awaitingInput: true, isTyping: false, isStreaming: false });
console.log('[CLIENT_INPUT_REQ] stream_id:', streamId, 'isQueen:', isQueen, 'node_id:', event.node_id, 'prompt:', (event.data?.prompt as string)?.slice(0, 80), 'agentType:', agentType);
if (isQueen) {
updateAgentState(agentType, { awaitingInput: true, isTyping: false, isStreaming: false });
} else {
// Worker input request.
// If the prompt is non-empty (explicit ask_user), create a visible
// message bubble. For auto-block (empty prompt), the worker's text
// was already streamed via client_output_delta — just activate the
// reply box below the last worker message.
const eid = event.execution_id ?? "";
const prompt = (event.data?.prompt as string) || "";
if (prompt) {
const workerInputMsg: ChatMessage = {
id: `worker-input-${eid}-${event.node_id || Date.now()}`,
agent: displayName || event.node_id || "Worker",
agentColor: "",
content: prompt,
timestamp: "",
type: "worker_input_request",
role: "worker",
thread: agentType,
createdAt: eventCreatedAt,
};
console.log('[CLIENT_INPUT_REQ] creating worker_input_request msg:', workerInputMsg.id, 'content:', prompt.slice(0, 80));
upsertChatMessage(agentType, workerInputMsg);
}
updateAgentState(agentType, {
awaitingInput: true,
isTyping: false,
isStreaming: false,
});
}
}
if (event.type === "execution_paused") {
updateAgentState(agentType, { isTyping: false, isStreaming: false, awaitingInput: false });
updateAgentState(agentType, { isTyping: false, isStreaming: false, awaitingInput: false, workerInputMessageId: null });
if (!isQueen) {
updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null });
markAllNodesAs(agentType, ["running", "looping"], "pending");
}
}
if (event.type === "execution_failed") {
updateAgentState(agentType, { isTyping: false, isStreaming: false, awaitingInput: false });
updateAgentState(agentType, { isTyping: false, isStreaming: false, awaitingInput: false, workerInputMessageId: null });
if (!isQueen) {
updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null });
if (event.node_id) {
@@ -1066,7 +1032,7 @@ export default function Workspace() {
}
case "node_loop_started":
turnCounterRef.current[agentType] = currentTurn + 1;
turnCounterRef.current[turnKey] = currentTurn + 1;
updateAgentState(agentType, { isTyping: true, activeToolCalls: {} });
if (!isQueen && event.node_id) {
const sessions = sessionsRef.current[agentType] || [];
@@ -1082,8 +1048,8 @@ export default function Workspace() {
break;
case "node_loop_iteration":
turnCounterRef.current[agentType] = currentTurn + 1;
updateAgentState(agentType, { isStreaming: false, activeToolCalls: {} });
turnCounterRef.current[turnKey] = currentTurn + 1;
updateAgentState(agentType, { isStreaming: false, activeToolCalls: {}, awaitingInput: false });
if (!isQueen && event.node_id) {
const pendingText = agentStates[agentType]?.llmSnapshots[event.node_id];
if (pendingText?.trim()) {
@@ -1166,6 +1132,7 @@ export default function Workspace() {
type: "tool_status",
role,
thread: agentType,
createdAt: eventCreatedAt,
});
return {
...prev,
@@ -1211,6 +1178,7 @@ export default function Workspace() {
type: "tool_status",
role,
thread: agentType,
createdAt: eventCreatedAt,
});
return {
...prev,
@@ -1379,12 +1347,12 @@ export default function Workspace() {
if (!allRequiredCredentialsMet(activeSession.credentials)) {
const userMsg: ChatMessage = {
id: makeId(), agent: "You", agentColor: "",
content: text, timestamp: "", type: "user", thread,
content: text, timestamp: "", type: "user", thread, createdAt: Date.now(),
};
const promptMsg: ChatMessage = {
id: makeId(), agent: "Queen Bee", agentColor: "",
content: "Before we get started, you'll need to configure your credentials. Click the **Credentials** button in the top bar to connect the required integrations for this agent.",
timestamp: "", role: "queen" as const, thread,
timestamp: "", role: "queen" as const, thread, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
@@ -1397,7 +1365,7 @@ export default function Workspace() {
const userMsg: ChatMessage = {
id: makeId(), agent: "You", agentColor: "",
content: text, timestamp: "", type: "user", thread,
content: text, timestamp: "", type: "user", thread, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
@@ -1408,31 +1376,12 @@ export default function Workspace() {
updateAgentState(activeWorker, { isTyping: true });
if (state?.sessionId && state?.ready) {
executionApi.chat(state.sessionId, text).then((result) => {
if (result.status === "started") {
// Queen wasn't ready — backend triggered worker directly
updateAgentState(activeWorker, {
currentExecutionId: result.execution_id || null,
workerRunState: "running",
});
const notice: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: "The queen wasn't ready yet — your message triggered an agent run directly.",
timestamp: "", type: "system", thread,
};
setSessionsByAgent(prev => ({
...prev,
[activeWorker]: prev[activeWorker].map(s =>
s.id === activeSession.id ? { ...s, messages: [...s.messages, notice] } : s
),
}));
}
}).catch((err: unknown) => {
executionApi.chat(state.sessionId, text).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
const errorChatMsg: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: `Failed to send message: ${errMsg}`,
timestamp: "", type: "system", thread,
timestamp: "", type: "system", thread, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
@@ -1446,7 +1395,7 @@ export default function Workspace() {
const errorMsg: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: "Cannot send message: backend is not connected. Please wait for the agent to load.",
timestamp: "", type: "system", thread,
timestamp: "", type: "system", thread, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
@@ -1458,11 +1407,48 @@ export default function Workspace() {
}
}, [activeWorker, activeSession, agentStates, updateAgentState]);
// --- handleWorkerReply: send user input to the worker via dedicated endpoint ---
const handleWorkerReply = useCallback((text: string) => {
if (!activeSession) return;
const state = agentStates[activeWorker];
if (!state?.sessionId || !state?.ready) return;
// Add user reply to chat thread
const userMsg: ChatMessage = {
id: makeId(), agent: "You", agentColor: "",
content: text, timestamp: "", type: "user", thread: activeWorker, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
[activeWorker]: prev[activeWorker].map(s =>
s.id === activeSession.id ? { ...s, messages: [...s.messages, userMsg] } : s
),
}));
// Clear awaiting state optimistically
updateAgentState(activeWorker, { awaitingInput: false, workerInputMessageId: null, isTyping: true });
executionApi.workerInput(state.sessionId, text).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
const errorChatMsg: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: `Failed to send to worker: ${errMsg}`,
timestamp: "", type: "system", thread: activeWorker, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
[activeWorker]: prev[activeWorker].map(s =>
s.id === activeSession.id ? { ...s, messages: [...s.messages, errorChatMsg] } : s
),
}));
updateAgentState(activeWorker, { isTyping: false, isStreaming: false });
});
}, [activeWorker, activeSession, agentStates, updateAgentState]);
const handleLoadAgent = useCallback(async (agentPath: string) => {
const state = agentStates[activeWorker];
if (!state?.sessionId) return;
setLoadingWorker(true);
try {
await sessionsApi.loadWorker(state.sessionId, agentPath);
// Success: worker_loaded SSE event will handle UI updates automatically
@@ -1472,7 +1458,6 @@ export default function Workspace() {
const body = err.body as Record<string, unknown>;
setCredentialAgentPath((body.agent_path as string) || null);
setCredentialsOpen(true);
setLoadingWorker(false);
return;
}
@@ -1481,7 +1466,7 @@ export default function Workspace() {
const errorMsg: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: `Failed to load agent: ${errMsg}`,
timestamp: "", type: "system", thread: activeWorker,
timestamp: "", type: "system", thread: activeWorker, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
@@ -1489,10 +1474,9 @@ export default function Workspace() {
s.id === activeId ? { ...s, messages: [...s.messages, errorMsg] } : s
),
}));
} finally {
setLoadingWorker(false);
}
}, [activeWorker, agentStates]);
void handleLoadAgent; // Used by load-agent modal (wired dynamically)
const closeAgentTab = useCallback((agentType: string) => {
setSelectedNode(null);
@@ -1592,30 +1576,6 @@ export default function Workspace() {
</>
}
>
{activeWorker === "new-agent" && activeAgentState?.ready && !activeAgentState?.graphId && (
<>
<button
ref={loadAgentBtnRef}
onClick={() => setLoadAgentOpen(o => !o)}
disabled={loadingWorker}
className="flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium text-muted-foreground hover:text-foreground hover:bg-muted/50 transition-colors flex-shrink-0"
>
{loadingWorker ? (
<Loader2 className="w-3.5 h-3.5 animate-spin" />
) : (
<Bot className="w-3.5 h-3.5" />
)}
Load Agent
</button>
<LoadAgentPopover
open={loadAgentOpen}
onClose={() => setLoadAgentOpen(false)}
anchorRef={loadAgentBtnRef}
discoverAgents={discoverAgents}
onSelect={handleLoadAgent}
/>
</>
)}
<button
onClick={() => setCredentialsOpen(true)}
className="flex items-center gap-1.5 px-3 py-1.5 rounded-md text-xs font-medium text-muted-foreground hover:text-foreground hover:bg-muted/50 transition-colors flex-shrink-0"
@@ -1697,9 +1657,12 @@ export default function Workspace() {
messages={activeSession.messages}
onSend={handleSend}
onCancel={handleCancelQueen}
onWorkerReply={handleWorkerReply}
activeThread={activeWorker}
isWaiting={(activeAgentState?.isTyping && !activeAgentState?.isStreaming) ?? false}
awaitingInput={activeAgentState?.awaitingInput ?? false}
workerAwaitingInput={
(activeAgentState?.awaitingInput && activeAgentState?.workerRunState === "running") ?? false
}
disabled={
(activeAgentState?.loading ?? true) ||
!(activeAgentState?.queenReady)
+6 -15
View File
@@ -307,8 +307,9 @@ Write-Host ""
Write-Step -Number "1" -Text "Step 1: Checking Python..."
# On Windows "python3.x" aliases don't exist; prefer "python" then "python3"
$PythonCmd = $null
foreach ($candidate in @("python3.13", "python3.12", "python3.11", "python3", "python")) {
foreach ($candidate in @("python", "python3", "python3.13", "python3.12", "python3.11")) {
try {
$ver = & $candidate -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>$null
if ($LASTEXITCODE -eq 0 -and $ver) {
@@ -326,17 +327,7 @@ foreach ($candidate in @("python3.13", "python3.12", "python3.11", "python3", "p
}
if (-not $PythonCmd) {
# Try plain "python" as final fallback (common on Windows)
try {
$ver = & python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>$null
if ($LASTEXITCODE -eq 0) {
Write-Color -Text "Python $ver found but 3.11+ is required." -Color Red
} else {
Write-Color -Text "Python is not installed." -Color Red
}
} catch {
Write-Color -Text "Python is not installed." -Color Red
}
Write-Color -Text "Python 3.11+ is not installed or not on PATH." -Color Red
Write-Host ""
Write-Host "Please install Python 3.11+ from https://python.org"
Write-Host " - Make sure to check 'Add Python to PATH' during installation"
@@ -673,7 +664,7 @@ $imports = @(
$modulesToCheck = @("framework", "aden_tools", "litellm", "framework.mcp.agent_builder_server")
try {
$checkOutput = & uv run $PythonCmd scripts/check_requirements.py @modulesToCheck 2>&1 | Out-String
$checkOutput = & uv run python scripts/check_requirements.py @modulesToCheck 2>&1 | Out-String
$resultJson = $null
# Try to parse JSON result
@@ -1241,7 +1232,7 @@ $verifyErrors = 0
$verifyModules = @("framework", "aden_tools")
try {
$verifyOutput = & uv run $PythonCmd scripts/check_requirements.py @verifyModules 2>&1 | Out-String
$verifyOutput = & uv run python scripts/check_requirements.py @verifyModules 2>&1 | Out-String
$verifyJson = $null
try {
@@ -1251,7 +1242,7 @@ try {
# Fall back to basic checks if JSON parsing fails
foreach ($mod in $verifyModules) {
Write-Host " $([char]0x2B21) $mod... " -NoNewline
$null = & uv run $PythonCmd -c "import $mod" 2>&1
$null = & uv run python -c "import $mod" 2>&1
if ($LASTEXITCODE -eq 0) { Write-Ok "ok" }
else { Write-Fail "failed"; $verifyErrors++ }
}