Merge remote-tracking branch 'upstream/feat/open-hive' into feat/sub-agent-framework

This commit is contained in:
Richard Tang
2026-02-26 17:59:15 -08:00
19 changed files with 375 additions and 403 deletions
+81 -43
@@ -70,6 +70,12 @@ class _EscalationReceiver:
# ---------------------------------------------------------------------------
class TurnCancelled(Exception):
"""Raised when a turn is cancelled mid-stream."""
pass
@dataclass
class JudgeVerdict:
"""Result of judge evaluation for the event loop."""
@@ -275,6 +281,7 @@ class EventLoopNode(NodeProtocol):
self._input_ready = asyncio.Event()
self._awaiting_input = False
self._shutdown = False
self._stream_task: asyncio.Task | None = None
# Track which nodes already have an action plan emitted (skip on revisit)
self._action_plan_emitted: set[str] = set()
# Subagent mark_complete: when True, _evaluate returns ACCEPT immediately
@@ -546,6 +553,7 @@ class EventLoopNode(NodeProtocol):
len(conversation.messages),
)
_stream_retry_count = 0
_turn_cancelled = False
while True:
try:
(
@@ -587,6 +595,10 @@ class EventLoopNode(NodeProtocol):
)
break # success — exit retry loop
except TurnCancelled:
_turn_cancelled = True
break
except Exception as e:
# Retry transient errors with exponential backoff
if (
@@ -663,6 +675,12 @@ class EventLoopNode(NodeProtocol):
# Re-raise to maintain existing error handling
raise
if _turn_cancelled:
logger.info("[%s] iter=%d: turn cancelled by user", node_id, iteration)
if ctx.node_spec.client_facing and not ctx.event_triggered:
await self._await_user_input(ctx, prompt="")
continue # back to top of for-iteration loop
# 6e'. Feed actual API token count back for accurate estimation
turn_input = turn_tokens.get("input", 0)
if turn_input > 0:
@@ -792,19 +810,7 @@ class EventLoopNode(NodeProtocol):
)
if ctx.node_spec.client_facing and not ctx.event_triggered:
await conversation.add_user_message(warning_msg)
self._input_ready.clear()
if self._event_bus:
await self._event_bus.emit_client_input_requested(
stream_id=stream_id,
node_id=node_id,
prompt=doom_desc,
execution_id=execution_id,
)
self._awaiting_input = True
try:
await self._input_ready.wait()
finally:
self._awaiting_input = False
await self._await_user_input(ctx, prompt=doom_desc)
recent_tool_fingerprints.clear()
recent_responses.clear()
else:
@@ -1255,6 +1261,16 @@ class EventLoopNode(NodeProtocol):
self._shutdown = True
self._input_ready.set()
def cancel_current_turn(self) -> None:
"""Cancel the current LLM streaming turn instantly.
Unlike signal_shutdown(), which permanently stops the event loop,
this only kills the in-progress HTTP stream via task.cancel().
The queen stays alive for the next user message.
"""
if self._stream_task and not self._stream_task.done():
self._stream_task.cancel()
async def _await_user_input(
self, ctx: NodeContext, prompt: str = "", *, skip_emit: bool = False
) -> bool:
@@ -1272,6 +1288,11 @@ class EventLoopNode(NodeProtocol):
Returns True if input arrived, False if shutdown was signaled.
"""
# If messages arrived while the LLM was processing, skip blocking
# entirely — the next _drain_injection_queue() will pick them up.
if not self._injection_queue.empty():
return True
# Clear BEFORE emitting so that synchronous handlers (e.g. the
# headless stdin handler) can call inject_event() during the emit
# and the signal won't be lost. TUI handlers return immediately
@@ -1367,39 +1388,56 @@ class EventLoopNode(NodeProtocol):
tool_calls: list[ToolCallEvent] = []
_stream_error: StreamErrorEvent | None = None
# Stream LLM response
async for event in ctx.llm.stream(
messages=messages,
system=conversation.system_prompt,
tools=tools if tools else None,
max_tokens=ctx.max_tokens,
):
if isinstance(event, TextDeltaEvent):
accumulated_text = event.snapshot
await self._publish_text_delta(
stream_id,
node_id,
event.content,
event.snapshot,
ctx,
execution_id,
iteration=iteration,
)
# Stream LLM response in a child task so cancel_current_turn()
# can kill it instantly without terminating the queen's main loop.
# Capture loop-scoped variables as defaults to satisfy B023.
async def _do_stream(
_msgs: list = messages, # noqa: B006
_tc: list[ToolCallEvent] = tool_calls, # noqa: B006
) -> None:
nonlocal accumulated_text, _stream_error
async for event in ctx.llm.stream(
messages=_msgs,
system=conversation.system_prompt,
tools=tools if tools else None,
max_tokens=ctx.max_tokens,
):
if isinstance(event, TextDeltaEvent):
accumulated_text = event.snapshot
await self._publish_text_delta(
stream_id,
node_id,
event.content,
event.snapshot,
ctx,
execution_id,
iteration=iteration,
)
elif isinstance(event, ToolCallEvent):
tool_calls.append(event)
elif isinstance(event, ToolCallEvent):
_tc.append(event)
elif isinstance(event, FinishEvent):
token_counts["input"] += event.input_tokens
token_counts["output"] += event.output_tokens
token_counts["stop_reason"] = event.stop_reason
token_counts["model"] = event.model
elif isinstance(event, FinishEvent):
token_counts["input"] += event.input_tokens
token_counts["output"] += event.output_tokens
token_counts["stop_reason"] = event.stop_reason
token_counts["model"] = event.model
elif isinstance(event, StreamErrorEvent):
if not event.recoverable:
raise RuntimeError(f"Stream error: {event.error}")
_stream_error = event
logger.warning("Recoverable stream error: %s", event.error)
elif isinstance(event, StreamErrorEvent):
if not event.recoverable:
raise RuntimeError(f"Stream error: {event.error}")
_stream_error = event
logger.warning("Recoverable stream error: %s", event.error)
self._stream_task = asyncio.create_task(_do_stream())
try:
await self._stream_task
except asyncio.CancelledError:
if accumulated_text:
await conversation.add_assistant_message(content=accumulated_text)
raise TurnCancelled() from None
finally:
self._stream_task = None
# If a recoverable stream error produced an empty response,
# raise so the outer transient-error retry can handle it
-1
@@ -102,4 +102,3 @@ class AnthropicProvider(LLMProvider):
json_mode=json_mode,
max_retries=max_retries,
)
+22 -15
@@ -457,15 +457,17 @@ class LiteLLMProvider(LLMProvider):
# Codex ChatGPT backend requires streaming — delegate to the unified
# async streaming path which properly handles tool calls.
if self._codex_backend:
return asyncio.run(self.acomplete(
messages=messages,
system=system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
max_retries=max_retries,
))
return asyncio.run(
self.acomplete(
messages=messages,
system=system,
tools=tools,
max_tokens=max_tokens,
response_format=response_format,
json_mode=json_mode,
max_retries=max_retries,
)
)
# Prepare messages with system prompt
full_messages = []
@@ -839,7 +841,10 @@ class LiteLLMProvider(LLMProvider):
)
if existing_idx is not None:
idx = existing_idx
elif idx in tool_calls_acc and tool_calls_acc[idx]["id"] not in ("", tc.id):
elif idx in tool_calls_acc and tool_calls_acc[idx]["id"] not in (
"",
tc.id,
):
# Slot taken by a different call — assign new index
idx = max(tool_calls_acc.keys()) + 1
_last_tool_idx = idx
@@ -1013,11 +1018,13 @@ class LiteLLMProvider(LLMProvider):
if isinstance(event, TextDeltaEvent):
content = event.snapshot # snapshot is the accumulated text
elif isinstance(event, ToolCallEvent):
tool_calls.append({
"id": event.tool_use_id,
"name": event.tool_name,
"input": event.tool_input,
})
tool_calls.append(
{
"id": event.tool_use_id,
"name": event.tool_name,
"input": event.tool_input,
}
)
elif isinstance(event, FinishEvent):
input_tokens = event.input_tokens
output_tokens = event.output_tokens
+16
@@ -319,6 +319,21 @@ async def handle_replay(request: web.Request) -> web.Response:
)
async def handle_cancel_queen(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/cancel-queen — cancel the queen's current LLM turn."""
session, err = resolve_session(request)
if err:
return err
queen_executor = session.queen_executor
if queen_executor is None:
return web.json_response({"cancelled": False, "error": "Queen not active"}, status=404)
node = queen_executor.node_registry.get("queen")
if node is None or not hasattr(node, "cancel_current_turn"):
return web.json_response({"cancelled": False, "error": "Queen node not found"}, status=404)
node.cancel_current_turn()
return web.json_response({"cancelled": True})
def register_routes(app: web.Application) -> None:
"""Register execution control routes."""
# Session-primary routes
@@ -328,5 +343,6 @@ def register_routes(app: web.Application) -> None:
app.router.add_post("/api/sessions/{session_id}/pause", handle_stop)
app.router.add_post("/api/sessions/{session_id}/resume", handle_resume)
app.router.add_post("/api/sessions/{session_id}/stop", handle_stop)
app.router.add_post("/api/sessions/{session_id}/cancel-queen", handle_cancel_queen)
app.router.add_post("/api/sessions/{session_id}/replay", handle_replay)
app.router.add_get("/api/sessions/{session_id}/goal-progress", handle_goal_progress)
+6 -5
@@ -167,9 +167,9 @@ class SessionManager:
session, worker_identity=worker_identity, initial_prompt=initial_prompt
)
# Start health judge
if agent_path.name != "hive_coder" and session.worker_runtime:
await self._start_judge(session, session.runner._storage_path)
# Health judge disabled for simplicity.
# if agent_path.name != "hive_coder" and session.worker_runtime:
# await self._start_judge(session, session.runner._storage_path)
except Exception:
# If anything fails, tear down the session
@@ -314,9 +314,10 @@ class SessionManager:
model=model,
)
# Start judge + notify queen (skip for hive_coder itself)
# Notify queen about the loaded worker (skip for hive_coder itself).
# Health judge disabled for simplicity.
if agent_path.name != "hive_coder" and session.worker_runtime:
await self._start_judge(session, session.runner._storage_path)
# await self._start_judge(session, session.runner._storage_path)
await self._notify_queen_worker_loaded(session)
# Emit SSE event so the frontend can update UI
+4 -77
@@ -473,14 +473,12 @@ class AdenTUI(App):
from pathlib import Path
from framework.graph.executor import GraphExecutor
from framework.monitoring import judge_goal, judge_graph
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.core import Runtime
from framework.runtime.event_bus import EventType as _ET
from framework.tools.queen_lifecycle_tools import register_queen_lifecycle_tools
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
log = logging.getLogger("tui.judge")
log = logging.getLogger("tui.queen")
try:
storage_path = Path(storage_path)
@@ -502,64 +500,16 @@ class AdenTUI(App):
worker_graph_id=self.runtime._graph_id,
)
# 2. Storage dirs — global, not per-agent. Queen and judge are
# supervisory components that outlive any single worker.
# 2. Storage dirs — global, not per-agent.
hive_home = Path.home() / ".hive"
judge_dir = hive_home / "judge" / "session" / session_id
judge_dir.mkdir(parents=True, exist_ok=True)
queen_dir = hive_home / "queen" / "session" / session_id
queen_dir.mkdir(parents=True, exist_ok=True)
# ---------------------------------------------------------------
# 3. Health judge — background task, fires every 2 minutes.
# ---------------------------------------------------------------
judge_runtime = Runtime(hive_home / "judge")
monitoring_tools = list(monitoring_registry.get_tools().values())
monitoring_executor = monitoring_registry.get_executor()
# Scoped event buses — stamp graph_id on every event so
# downstream routing (queen-primary mode) can distinguish
# queen/judge/worker events.
# Health judge disabled for simplicity.
from framework.runtime.execution_stream import GraphScopedEventBus
judge_event_bus = GraphScopedEventBus(event_bus, "judge")
queen_event_bus = GraphScopedEventBus(event_bus, "queen")
async def _judge_loop():
interval = 120 # seconds
first = True
while True:
if not first:
await asyncio.sleep(interval)
first = False
try:
executor = GraphExecutor(
runtime=judge_runtime,
llm=llm,
tools=monitoring_tools,
tool_executor=monitoring_executor,
event_bus=judge_event_bus,
stream_id="judge",
storage_path=judge_dir,
loop_config=judge_graph.loop_config,
)
await executor.execute(
graph=judge_graph,
goal=judge_goal,
input_data={
"event": {"source": "timer", "reason": "scheduled"},
},
session_state={"resume_session_id": session_id},
)
except Exception:
log.error("Health judge tick failed", exc_info=True)
self._judge_task = asyncio.run_coroutine_threadsafe(
_judge_loop(),
agent_loop,
)
self._judge_graph_id = "judge"
# ---------------------------------------------------------------
# 4. Queen — persistent interactive conversation.
# Runs a continuous event_loop node that is the user's
@@ -690,31 +640,8 @@ class AdenTUI(App):
self.chat_repl._queen_inject_callback = _inject_queen
# Judge escalation → inject into queen conversation as a message.
async def _on_escalation(event):
ticket = event.data.get("ticket", {})
executor = self._queen_executor
if executor is None:
log.warning("Escalation received but queen executor is None")
return
node = executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
import json as _json
msg = "[ESCALATION TICKET from Health Judge]\n" + _json.dumps(
ticket, indent=2, ensure_ascii=False
)
await node.inject_event(msg)
else:
log.warning("Escalation received but queen node not ready for injection")
self._queen_escalation_sub = event_bus.subscribe(
event_types=[_ET.WORKER_ESCALATION_TICKET],
handler=_on_escalation,
)
self.notify(
"Queen + health judge active",
"Queen active",
severity="information",
timeout=3,
)
+3
@@ -47,6 +47,9 @@ export const executionApi = {
execution_id: executionId,
}),
cancelQueen: (sessionId: string) =>
api.post<{ cancelled: boolean }>(`/sessions/${sessionId}/cancel-queen`),
resume: (sessionId: string, workerSessionId: string, checkpointId?: string) =>
api.post<ResumeResult>(`/sessions/${sessionId}/resume`, {
session_id: workerSessionId,
+21 -9
@@ -1,5 +1,5 @@
import { memo, useState, useRef, useEffect } from "react";
import { Send, Crown, Cpu } from "lucide-react";
import { Send, Square, Crown, Cpu } from "lucide-react";
import { formatAgentDisplayName } from "@/lib/chat-helpers";
import MarkdownContent from "@/components/MarkdownContent";
@@ -24,6 +24,8 @@ interface ChatPanelProps {
awaitingInput?: boolean;
/** When true, the input is disabled (e.g. during loading) */
disabled?: boolean;
/** Called when user clicks the stop button to cancel the queen's current turn */
onCancel?: () => void;
}
const queenColor = "hsl(45,95%,58%)";
@@ -99,7 +101,7 @@ const MessageBubble = memo(function MessageBubble({ msg }: { msg: ChatMessage })
);
}, (prev, next) => prev.msg.id === next.msg.id && prev.msg.content === next.msg.content);
export default function ChatPanel({ messages, onSend, isWaiting, activeThread, awaitingInput, disabled }: ChatPanelProps) {
export default function ChatPanel({ messages, onSend, isWaiting, activeThread, awaitingInput, disabled, onCancel }: ChatPanelProps) {
const [input, setInput] = useState("");
const [readMap, setReadMap] = useState<Record<string, number>>({});
const bottomRef = useRef<HTMLDivElement>(null);
@@ -179,13 +181,23 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, a
disabled={disabled}
className="flex-1 bg-transparent text-sm text-foreground outline-none placeholder:text-muted-foreground disabled:opacity-50 disabled:cursor-not-allowed"
/>
<button
type="submit"
disabled={!input.trim() || disabled}
className="p-2 rounded-lg bg-primary text-primary-foreground disabled:opacity-30 hover:opacity-90 transition-opacity"
>
<Send className="w-4 h-4" />
</button>
{isWaiting && onCancel ? (
<button
type="button"
onClick={onCancel}
className="p-2 rounded-lg bg-destructive text-destructive-foreground hover:opacity-90 transition-opacity"
>
<Square className="w-4 h-4" />
</button>
) : (
<button
type="submit"
disabled={!input.trim() || disabled}
className="p-2 rounded-lg bg-primary text-primary-foreground disabled:opacity-30 hover:opacity-90 transition-opacity"
>
<Send className="w-4 h-4" />
</button>
)}
</div>
</form>
</div>
+12
@@ -864,6 +864,17 @@ export default function Workspace() {
}
}, [agentStates, activeWorker, markAllNodesAs, updateAgentState]);
const handleCancelQueen = useCallback(async () => {
const state = agentStates[activeWorker];
if (!state?.sessionId) return;
try {
await executionApi.cancelQueen(state.sessionId);
} catch {
// Best-effort — queen may have already finished
}
updateAgentState(activeWorker, { isTyping: false });
}, [agentStates, activeWorker, updateAgentState]);
// --- Node log helper (writes into agentStates) ---
const appendNodeLog = useCallback((agentType: string, nodeId: string, line: string) => {
setAgentStates((prev) => {
@@ -1584,6 +1595,7 @@ export default function Workspace() {
<ChatPanel
messages={activeSession.messages}
onSend={handleSend}
onCancel={handleCancelQueen}
activeThread={activeWorker}
isWaiting={activeAgentState?.isTyping ?? false}
awaitingInput={activeAgentState?.awaitingInput ?? false}
+1 -3
@@ -122,9 +122,7 @@ async def test_codex_stream():
if isinstance(event, TextDeltaEvent):
text = event.snapshot
elif isinstance(event, ToolCallEvent):
tool_calls.append(
{"name": event.tool_name, "input": event.tool_input}
)
tool_calls.append({"name": event.tool_name, "input": event.tool_input})
print(f" ToolCall: {event.tool_name}({json.dumps(event.tool_input)})")
elif isinstance(event, FinishEvent):
print(
+10 -4
@@ -6,8 +6,6 @@ Run: .venv/bin/python core/tests/test_codex_eventloop.py
import asyncio
import logging
import sys
from dataclasses import dataclass, field
from typing import Any
from unittest.mock import MagicMock
sys.path.insert(0, "core")
@@ -77,7 +75,9 @@ def make_context(
)
async def run_test(name: str, llm: LiteLLMProvider, system: str, output_keys: list[str]) -> NodeResult:
async def run_test(
name: str, llm: LiteLLMProvider, system: str, output_keys: list[str]
) -> NodeResult:
print(f"\n{'=' * 60}")
print(f"TEST: {name}")
print(f"{'=' * 60}")
@@ -95,6 +95,7 @@ async def run_test(name: str, llm: LiteLLMProvider, system: str, output_keys: li
except Exception as e:
print(f" EXCEPTION: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
return NodeResult(success=False, error=str(e))
@@ -137,7 +138,11 @@ async def main():
if isinstance(event, TextDeltaEvent):
text = event.snapshot
elif isinstance(event, FinishEvent):
print(f" Finish: stop={event.stop_reason} in={event.input_tokens} out={event.output_tokens}")
print(
f" Finish: stop={event.stop_reason}"
f" in={event.input_tokens}"
f" out={event.output_tokens}"
)
elif isinstance(event, StreamErrorEvent):
print(f" StreamError: {event.error} (recoverable={event.recoverable})")
elif isinstance(event, ToolCallEvent):
@@ -148,6 +153,7 @@ async def main():
except Exception as e:
print(f" EXCEPTION: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
print(f"\n{'=' * 60}")
-1
@@ -37,7 +37,6 @@ class FailingLLMProvider(LLMProvider):
raise RuntimeError("LLM unavailable")
async def _build_conversation(*pairs: tuple[str, str]) -> NodeConversation:
"""Build a NodeConversation from (user, assistant) message pairs."""
conv = NodeConversation()
@@ -63,7 +63,6 @@ class MockStreamingLLM(LLMProvider):
return LLMResponse(content="Summary.", model="mock", stop_reason="stop")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
-1
@@ -65,7 +65,6 @@ class MockStreamingLLM(LLMProvider):
return LLMResponse(content=self.complete_response, model="mock", stop_reason="stop")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
+26 -14
@@ -69,7 +69,6 @@ class MockStreamingLLM(LLMProvider):
return LLMResponse(content="Summary of conversation.", model="mock", stop_reason="stop")
# ---------------------------------------------------------------------------
# Helper: build a simple text-only scenario
# ---------------------------------------------------------------------------
@@ -733,14 +732,16 @@ class TestClientFacingExpectingWork:
scenarios=[
# Turn 0: ask user what to do
tool_call_scenario(
"ask_user", {"question": "Continue or generate report?"},
"ask_user",
{"question": "Continue or generate report?"},
tool_use_id="ask_1",
),
# Turn 1: after user responds, LLM outputs text-only (lazy)
text_scenario("Understood, generating the report."),
# Turn 2: after judge RETRY, LLM sets output
tool_call_scenario(
"set_output", {"key": "decision", "value": "generate"},
"set_output",
{"key": "decision", "value": "generate"},
),
# Turn 3: accept
text_scenario("Done."),
@@ -781,7 +782,8 @@ class TestClientFacingExpectingWork:
scenarios=[
# Turn 0: ask user for domain
tool_call_scenario(
"ask_user", {"question": "What domain?"},
"ask_user",
{"question": "What domain?"},
tool_use_id="ask_1",
),
# Turn 1: after user input, outputs monitoring text
@@ -832,12 +834,14 @@ class TestClientFacingExpectingWork:
scenarios=[
# Turn 0: ask user
tool_call_scenario(
"ask_user", {"question": "Ready?"},
"ask_user",
{"question": "Ready?"},
tool_use_id="ask_1",
),
# Turn 1: after user responds, LLM does work (tool call)
tool_call_scenario(
"save_data", {"content": "report.html"},
"save_data",
{"content": "report.html"},
tool_use_id="tool_1",
),
# Turn 2: LLM presents results as text (no tools)
@@ -845,7 +849,8 @@ class TestClientFacingExpectingWork:
text_scenario("Here is your report. Need changes?"),
# Turn 3: after user responds, set output
tool_call_scenario(
"set_output", {"key": "status", "value": "complete"},
"set_output",
{"key": "status", "value": "complete"},
),
# Turn 4: done
text_scenario("All done."),
@@ -856,7 +861,10 @@ class TestClientFacingExpectingWork:
config=LoopConfig(max_iterations=10),
)
ctx = build_ctx(
runtime, spec, memory, llm,
runtime,
spec,
memory,
llm,
tools=[Tool(name="save_data", description="save", parameters={})],
)
@@ -898,12 +906,14 @@ class TestClientFacingExpectingWork:
scenarios=[
# Turn 0: ask user
tool_call_scenario(
"ask_user", {"question": "Generate?"},
"ask_user",
{"question": "Generate?"},
tool_use_id="ask_1",
),
# Turn 1: LLM calls tool but doesn't set output
tool_call_scenario(
"save_data", {"content": "report"},
"save_data",
{"content": "report"},
tool_use_id="tool_1",
),
# Turn 2: judge RETRY (missing "status"). LLM outputs text.
@@ -911,7 +921,8 @@ class TestClientFacingExpectingWork:
text_scenario("Report generated successfully."),
# Turn 3: after second RETRY, LLM finally sets output
tool_call_scenario(
"set_output", {"key": "status", "value": "done"},
"set_output",
{"key": "status", "value": "done"},
),
# Turn 4: accept
text_scenario("Complete."),
@@ -922,7 +933,10 @@ class TestClientFacingExpectingWork:
config=LoopConfig(max_iterations=10),
)
ctx = build_ctx(
runtime, spec, memory, llm,
runtime,
spec,
memory,
llm,
tools=[Tool(name="save_data", description="save", parameters={})],
)
@@ -1263,7 +1277,6 @@ class ErrorThenSuccessLLM(LLMProvider):
return LLMResponse(content="ok", model="mock", stop_reason="stop")
class TestTransientErrorRetry:
"""Test retry-with-backoff for transient LLM errors in EventLoopNode."""
@@ -1630,7 +1643,6 @@ class ToolRepeatLLM(LLMProvider):
)
class TestToolDoomLoopIntegration:
"""Integration tests for doom loop detection in execute().
-1
@@ -53,7 +53,6 @@ class MockLLMProvider(LLMProvider):
)
# ============================================================================
# LLMJudge Tests - Custom Provider
# ============================================================================
+172 -156
@@ -3,9 +3,7 @@
Run: uv run python core/tests/test_two_llm_calls.py
"""
import asyncio
import json
import os
import sys
sys.path.insert(0, "core")
@@ -100,13 +98,21 @@ SCAN_TOOLS = [
# The big one — takes 6 JSON-string params (whole scan results)
Tool(
name="set_output",
description="Set the output for this node. Call this when you are done. scan_results must be a JSON string containing the full consolidated results from all scans.",
description=(
"Set the output for this node. Call this when you are done."
" scan_results must be a JSON string containing the full"
" consolidated results from all scans."
),
parameters={
"type": "object",
"properties": {
"scan_results": {
"type": "string",
"description": "JSON string with consolidated scan results including ssl, headers, dns, ports, tech, and subdomain data.",
"description": (
"JSON string with consolidated scan results"
" including ssl, headers, dns, ports, tech,"
" and subdomain data."
),
},
},
"required": ["scan_results"],
@@ -116,86 +122,209 @@ SCAN_TOOLS = [
# Fake scan results — realistic size to stress-test argument streaming
FAKE_SSL_RESULT = {
"hostname": "example.com", "port": 443, "tls_version": "TLSv1.3",
"cipher": "TLS_AES_256_GCM_SHA384", "cipher_bits": 256,
"hostname": "example.com",
"port": 443,
"tls_version": "TLSv1.3",
"cipher": "TLS_AES_256_GCM_SHA384",
"cipher_bits": 256,
"certificate": {
"subject": "CN=example.com", "issuer": "CN=Let's Encrypt Authority X3",
"not_before": "2025-01-01T00:00:00Z", "not_after": "2026-01-01T00:00:00Z",
"days_until_expiry": 310, "san": ["example.com", "www.example.com"],
"self_signed": False, "sha256_fingerprint": "AB:CD:EF:12:34:56:78:90",
"subject": "CN=example.com",
"issuer": "CN=Let's Encrypt Authority X3",
"not_before": "2025-01-01T00:00:00Z",
"not_after": "2026-01-01T00:00:00Z",
"days_until_expiry": 310,
"san": ["example.com", "www.example.com"],
"self_signed": False,
"sha256_fingerprint": "AB:CD:EF:12:34:56:78:90",
},
"issues": [
{"severity": "low", "finding": "Certificate expiring in 310 days", "remediation": "Monitor expiry"},
{
"severity": "low",
"finding": "Certificate expiring in 310 days",
"remediation": "Monitor expiry",
},
],
"grade_input": {"tls_version_ok": True, "cert_valid": True, "cert_expiring_soon": False, "strong_cipher": True, "self_signed": False},
"grade_input": {
"tls_version_ok": True,
"cert_valid": True,
"cert_expiring_soon": False,
"strong_cipher": True,
"self_signed": False,
},
}
FAKE_HEADERS_RESULT = {
"url": "https://example.com", "status_code": 200,
"url": "https://example.com",
"status_code": 200,
"headers_present": ["Strict-Transport-Security", "X-Content-Type-Options"],
"headers_missing": [
{"header": "Content-Security-Policy", "severity": "high", "description": "No CSP header", "remediation": "Add CSP header"},
{"header": "X-Frame-Options", "severity": "medium", "description": "No X-Frame-Options", "remediation": "Add DENY or SAMEORIGIN"},
{"header": "Permissions-Policy", "severity": "low", "description": "No Permissions-Policy", "remediation": "Add Permissions-Policy"},
{
"header": "Content-Security-Policy",
"severity": "high",
"description": "No CSP header",
"remediation": "Add CSP header",
},
{
"header": "X-Frame-Options",
"severity": "medium",
"description": "No X-Frame-Options",
"remediation": "Add DENY or SAMEORIGIN",
},
{
"header": "Permissions-Policy",
"severity": "low",
"description": "No Permissions-Policy",
"remediation": "Add Permissions-Policy",
},
],
"leaky_headers": [
{"header": "Server", "value": "nginx/1.21.0", "severity": "low", "remediation": "Remove server version"},
{
"header": "Server",
"value": "nginx/1.21.0",
"severity": "low",
"remediation": "Remove server version",
},
],
"grade_input": {"hsts": True, "csp": False, "x_frame_options": False, "x_content_type_options": True, "referrer_policy": False, "permissions_policy": False, "no_leaky_headers": False},
"grade_input": {
"hsts": True,
"csp": False,
"x_frame_options": False,
"x_content_type_options": True,
"referrer_policy": False,
"permissions_policy": False,
"no_leaky_headers": False,
},
}
FAKE_DNS_RESULT = {
"domain": "example.com", "source": "crt.sh",
"spf": {"present": True, "record": "v=spf1 include:_spf.google.com ~all", "policy": "softfail", "issues": []},
"dmarc": {"present": True, "record": "v=DMARC1; p=reject; rua=mailto:dmarc@example.com", "policy": "reject", "issues": []},
"domain": "example.com",
"source": "crt.sh",
"spf": {
"present": True,
"record": "v=spf1 include:_spf.google.com ~all",
"policy": "softfail",
"issues": [],
},
"dmarc": {
"present": True,
"record": "v=DMARC1; p=reject; rua=mailto:dmarc@example.com",
"policy": "reject",
"issues": [],
},
"dkim": {"selectors_found": ["google", "default"], "selectors_missing": []},
"dnssec": {"enabled": False, "issues": [{"severity": "medium", "finding": "DNSSEC not enabled"}]},
"dnssec": {
"enabled": False,
"issues": [{"severity": "medium", "finding": "DNSSEC not enabled"}],
},
"mx_records": ["10 mail.example.com"],
"caa_records": ["0 issue letsencrypt.org"],
"zone_transfer": {"vulnerable": False},
"grade_input": {"spf_present": True, "spf_strict": False, "dmarc_present": True, "dmarc_enforcing": True, "dkim_found": True, "dnssec_enabled": False, "zone_transfer_blocked": True},
"grade_input": {
"spf_present": True,
"spf_strict": False,
"dmarc_present": True,
"dmarc_enforcing": True,
"dkim_found": True,
"dnssec_enabled": False,
"zone_transfer_blocked": True,
},
}
FAKE_PORTS_RESULT = {
"hostname": "example.com", "ip": "93.184.216.34", "ports_scanned": 20,
"hostname": "example.com",
"ip": "93.184.216.34",
"ports_scanned": 20,
"open_ports": [
{"port": 80, "service": "http", "banner": "nginx/1.21.0"},
{"port": 443, "service": "https", "banner": "nginx/1.21.0"},
{"port": 22, "service": "ssh", "banner": "OpenSSH_8.9", "severity": "medium", "finding": "SSH port open", "remediation": "Restrict SSH access"},
{
"port": 22,
"service": "ssh",
"banner": "OpenSSH_8.9",
"severity": "medium",
"finding": "SSH port open",
"remediation": "Restrict SSH access",
},
],
"closed_ports": [21, 23, 25, 53, 110, 143, 993, 995, 3306, 5432, 6379, 8080, 8443, 27017],
"grade_input": {"no_database_ports_exposed": True, "no_admin_ports_exposed": False, "no_legacy_ports_exposed": True, "only_web_ports": False},
"grade_input": {
"no_database_ports_exposed": True,
"no_admin_ports_exposed": False,
"no_legacy_ports_exposed": True,
"only_web_ports": False,
},
}
FAKE_TECH_RESULT = {
"url": "https://example.com",
"server": {"name": "nginx", "version": "1.21.0", "raw": "nginx/1.21.0"},
"framework": "React", "language": "JavaScript", "cms": None,
"framework": "React",
"language": "JavaScript",
"cms": None,
"javascript_libraries": ["react-18.2.0", "lodash-4.17.21", "axios-1.6.0"],
"cdn": "Cloudflare", "analytics": ["Google Analytics"],
"security_txt": True, "robots_txt": True,
"cdn": "Cloudflare",
"analytics": ["Google Analytics"],
"security_txt": True,
"robots_txt": True,
"interesting_paths": ["/admin", "/.env", "/api/docs"],
"cookies": [
{"name": "session", "secure": True, "httponly": True, "samesite": "Strict"},
{"name": "_ga", "secure": False, "httponly": False, "samesite": "None"},
],
"grade_input": {"server_version_hidden": False, "framework_version_hidden": True, "security_txt_present": True, "cookies_secure": False, "cookies_httponly": False},
"grade_input": {
"server_version_hidden": False,
"framework_version_hidden": True,
"security_txt_present": True,
"cookies_secure": False,
"cookies_httponly": False,
},
}
FAKE_SUBDOMAIN_RESULT = {
"domain": "example.com", "source": "crt.sh", "total_found": 8,
"subdomains": ["www.example.com", "mail.example.com", "api.example.com", "staging.example.com", "dev.example.com", "admin.example.com", "cdn.example.com", "blog.example.com"],
"interesting": [
{"subdomain": "staging.example.com", "reason": "staging environment exposed", "severity": "high", "remediation": "Restrict access"},
{"subdomain": "dev.example.com", "reason": "development environment exposed", "severity": "high", "remediation": "Restrict access"},
{"subdomain": "admin.example.com", "reason": "admin panel exposed", "severity": "medium", "remediation": "Add IP restriction"},
"domain": "example.com",
"source": "crt.sh",
"total_found": 8,
"subdomains": [
"www.example.com",
"mail.example.com",
"api.example.com",
"staging.example.com",
"dev.example.com",
"admin.example.com",
"cdn.example.com",
"blog.example.com",
],
"grade_input": {"no_dev_staging_exposed": False, "no_admin_exposed": False, "reasonable_surface_area": True},
"interesting": [
{
"subdomain": "staging.example.com",
"reason": "staging environment exposed",
"severity": "high",
"remediation": "Restrict access",
},
{
"subdomain": "dev.example.com",
"reason": "development environment exposed",
"severity": "high",
"remediation": "Restrict access",
},
{
"subdomain": "admin.example.com",
"reason": "admin panel exposed",
"severity": "medium",
"remediation": "Add IP restriction",
},
],
"grade_input": {
"no_dev_staging_exposed": False,
"no_admin_exposed": False,
"reasonable_surface_area": True,
},
}
def _make_codex_provider():
from framework.config import get_api_base, get_api_key, get_llm_extra_kwargs
api_key = get_api_key()
api_base = get_api_base()
extra_kwargs = get_llm_extra_kwargs()
@@ -219,7 +348,11 @@ async def _stream_and_collect(provider, messages, system, tools):
elif isinstance(event, ToolCallEvent):
tool_calls.append(event)
elif isinstance(event, FinishEvent):
print(f" finish: stop={event.stop_reason} in={event.input_tokens} out={event.output_tokens}")
print(
f" finish: stop={event.stop_reason}"
f" in={event.input_tokens}"
f" out={event.output_tokens}"
)
elif isinstance(event, StreamErrorEvent):
print(f" STREAM ERROR: {event.error}")
return text, tool_calls
@@ -256,122 +389,5 @@ def _validate_tool_args(tool_calls: list[ToolCallEvent]) -> bool:
return ok
async def test_codex_multi_tool_scan():
"""Reproduce the real agent flow: LLM calls 6 scan tools, then set_output with big JSON."""
provider = _make_codex_provider()
if not provider:
print("[scan] SKIP — no Codex subscription")
return
system = (
"You are a security scanning agent. You have access to scanning tools.\n"
"The user will give you scan results. Your job is to consolidate them and "
"call set_output with a JSON string containing ALL the scan results.\n"
"The scan_results value MUST be a valid JSON string containing every scan result provided.\n"
"Do NOT summarize — include the complete data from each scan."
)
# Provide all scan results as tool_result messages so the LLM has to
# consolidate them into one big set_output call.
all_results = {
"ssl": FAKE_SSL_RESULT,
"headers": FAKE_HEADERS_RESULT,
"dns": FAKE_DNS_RESULT,
"ports": FAKE_PORTS_RESULT,
"tech": FAKE_TECH_RESULT,
"subdomains": FAKE_SUBDOMAIN_RESULT,
}
results_json = json.dumps(all_results, indent=2)
print(f" Input scan data size: {len(results_json)} chars")
messages = [
{
"role": "user",
"content": (
"Here are the completed scan results for example.com. "
"Consolidate ALL of them into a single set_output call. "
"The scan_results argument must be a JSON string containing the complete data.\n\n"
f"```json\n{results_json}\n```"
),
},
]
# --- Turn 1: expect set_output tool call with big JSON ---
text, tool_calls = await _stream_and_collect(provider, messages, system, SCAN_TOOLS)
if text:
print(f" text: {text[:200]}{'...' if len(text) > 200 else ''}")
if not tool_calls:
print(" NO TOOL CALLS — expected set_output")
print(f" full text: {text}")
return
valid = _validate_tool_args(tool_calls)
print(f" RESULT: {'OK' if valid else 'TRUNCATED/MALFORMED'}")
async def test_codex_parallel_tool_calls():
"""Ask the LLM to call multiple scan tools at once — tests parallel tool call streaming."""
provider = _make_codex_provider()
if not provider:
print("[parallel] SKIP — no Codex subscription")
return
system = (
"You are a security scanning agent. When asked to scan a target, "
"call ALL relevant scanning tools in parallel in a single response. "
"Always call: ssl_tls_scan, http_headers_scan, dns_security_scan, "
"port_scan, tech_stack_detect, and subdomain_enumerate."
)
messages = [
{"role": "user", "content": "Run a full security scan on example.com"},
]
text, tool_calls = await _stream_and_collect(provider, messages, system, SCAN_TOOLS)
if text:
print(f" text: {text[:200]}{'...' if len(text) > 200 else ''}")
print(f" Total tool calls: {len(tool_calls)}")
valid = _validate_tool_args(tool_calls)
print(f" RESULT: {'OK' if valid else 'TRUNCATED/MALFORMED'}")
async def test_openai_baseline():
"""OpenAI direct — baseline to compare against."""
api_key = OPENAI_API_KEY or os.environ.get("OPENAI_API_KEY")
if not api_key:
print("[openai] SKIP — OPENAI_API_KEY not set")
return
provider = LiteLLMProvider(model="openai/gpt-4o-mini", api_key=api_key)
messages = [{"role": "user", "content": "What is 3+3? Reply with just the number."}]
response = await provider.acomplete(messages=messages, max_tokens=64)
print(f" Response: {response.content!r}")
print(f" tokens: in={response.input_tokens} out={response.output_tokens}")
print(f" RESULT: {'OK' if response.content else 'EMPTY'}")
async def main():
print("=" * 60)
print("Test 1: Codex — parallel tool calls (6 scan tools)")
print("=" * 60)
await test_codex_parallel_tool_calls()
print()
print("=" * 60)
print("Test 2: Codex — big set_output call (~4KB JSON arg)")
print("=" * 60)
await test_codex_multi_tool_scan()
print()
print("=" * 60)
print("Test 3: OpenAI direct — baseline")
print("=" * 60)
await test_openai_baseline()
print()
if __name__ == "__main__":
asyncio.run(main())
pass
-71
@@ -1,71 +0,0 @@
# Plan: Set Up React Frontend for Hive
## Context
The `feat/open-hive` branch has a complete backend HTTP API (aiohttp on port 8787) with CRUD, execution control, sessions, SSE streaming, and more. The server already has SPA static-file serving built in — it looks for `frontend/dist/index.html` and serves it with a catch-all fallback. **No frontend exists yet.** The user has a Lovable.dev design they'll paste pages from later, so the scaffold must be Lovable-compatible (React 18, Vite, Tailwind, shadcn/ui, React Router).
The goal: create a deployable frontend shell with a typed API client layer, so the user can immediately start dropping in Lovable pages.
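As a rough sketch of what that client layer might look like, the base wrapper could be as small as the following; the `/api` prefix matches the decision table below, while the `api.get`/`api.post` helper shape and error handling are assumptions rather than a final design:

```ts
// client.ts: minimal sketch of the base fetch wrapper (assumed shape, not the final implementation)
const BASE = "/api";

async function request<T>(path: string, init?: RequestInit): Promise<T> {
  const res = await fetch(`${BASE}${path}`, {
    headers: { "Content-Type": "application/json" },
    ...init,
  });
  if (!res.ok) throw new Error(`API ${res.status}: ${await res.text()}`);
  return (await res.json()) as T;
}

export const api = {
  get: <T>(path: string) => request<T>(path),
  post: <T>(path: string, body?: unknown) =>
    request<T>(path, {
      method: "POST",
      body: body === undefined ? undefined : JSON.stringify(body),
    }),
};
```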
## Key Decisions
| Decision | Choice | Why |
|----------|--------|-----|
| Location | `core/frontend/` | Keeps frontend co-located with the Python framework inside `core/`. Requires a small tweak to app.py to add `core/frontend/dist` as a lookup candidate. |
| Build tool | **Vite** | SPA output, Lovable uses Vite, CRA deprecated, Next.js is overkill for SPA |
| Package manager | **npm** | Root `package.json` declares `npm@10.2.0` — stay consistent |
| Styling | **Tailwind CSS v4 + shadcn/ui** | Lovable generates these; shadcn copies source into project |
| Routing | **React Router** | Lovable uses it; SPA client-side routing matches backend catch-all |
| Dev proxy | Vite `server.proxy` → `:8787` | Avoids CORS issues, SSE EventSource works through proxy (sketch below) |
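For the dev-proxy row above, a minimal `vite.config.ts` might look like the following; the alias block mirrors the standard shadcn/Vite setup, and the exact proxy options are assumptions beyond the `/api` → `:8787` forwarding the plan calls for:

```ts
// vite.config.ts: sketch, assuming the aiohttp backend listens on localhost:8787
import path from "path";
import react from "@vitejs/plugin-react";
import { defineConfig } from "vite";

export default defineConfig({
  plugins: [react()],
  resolve: {
    // "@" alias so pasted Lovable imports like "@/components/ui/button" resolve
    alias: { "@": path.resolve(__dirname, "./src") },
  },
  server: {
    proxy: {
      // Forward /api (including SSE responses) to the backend to avoid CORS in dev
      "/api": { target: "http://localhost:8787", changeOrigin: true },
    },
  },
});
```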
## Files to Create
```
core/frontend/
├── package.json
├── vite.config.ts
├── tsconfig.json
├── tsconfig.node.json
├── index.html
├── components.json # shadcn config (via npx shadcn@latest init)
├── src/
│ ├── main.tsx # React entry point
│ ├── App.tsx # Router shell
│ ├── index.css # Tailwind imports
│ ├── vite-env.d.ts # Vite type declarations
│ ├── lib/
│ │ └── utils.ts # shadcn cn() utility
│ ├── api/
│ │ ├── client.ts # Base fetch wrapper (/api prefix)
│ │ ├── types.ts # All TS types matching backend responses
│ │ ├── agents.ts # Agent CRUD endpoints
│ │ ├── execution.ts # Trigger, chat, inject, stop, resume
│ │ ├── sessions.ts # Sessions & checkpoints
│ │ ├── graphs.ts # Graph/node inspection
│ │ └── logs.ts # Log retrieval
│ ├── hooks/
│ │ └── use-sse.ts # SSE EventSource hook (sketch below)
│ └── pages/
│ └── index.tsx # Placeholder landing page
```
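For the `use-sse.ts` entry in the tree above, a bare-bones hook might look like this; the JSON-parsing fallback and the shape of the endpoint path are assumptions:

```ts
// use-sse.ts: sketch of an SSE hook built on the browser EventSource API
import { useEffect } from "react";

export function useSSE(url: string, onEvent: (data: unknown) => void) {
  useEffect(() => {
    const source = new EventSource(url); // e.g. an /api/... SSE endpoint (exact path assumed)
    source.onmessage = (e) => {
      try {
        onEvent(JSON.parse(e.data)); // assume JSON payloads
      } catch {
        onEvent(e.data); // fall back to the raw string
      }
    };
    return () => source.close(); // stop the stream when the component unmounts
  }, [url, onEvent]);
}
```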
## Files to Modify
- `core/framework/server/app.py` — add `core/frontend/dist` as a static-file lookup candidate
- `package.json` — add `frontend:dev` and `frontend:build` convenience scripts
- `Makefile` — add `frontend-dev` and `frontend-build` targets
## Lovable Compatibility
When pasting Lovable pages later:
1. **Imports like `@/components/ui/button`** work via the `@` alias
2. **Run `npx shadcn@latest add <component>`** for each UI component a page needs
3. **Add routes** to `App.tsx` — Lovable pages export default React components (see the sketch after this list)
4. **If pages use `@tanstack/react-query`**, install it: `npm install @tanstack/react-query`
5. **Tailwind classes** work out of the box
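Step 3 in practice might look like this; `Dashboard` stands in for a hypothetical pasted Lovable page and is not part of the scaffold:

```tsx
// App.tsx: sketch of the router shell after pasting a Lovable page
import { BrowserRouter, Routes, Route } from "react-router-dom";
import Index from "@/pages/index";
// import Dashboard from "@/pages/Dashboard"; // hypothetical pasted Lovable page

export default function App() {
  return (
    <BrowserRouter>
      <Routes>
        <Route path="/" element={<Index />} />
        {/* <Route path="/dashboard" element={<Dashboard />} /> */}
      </Routes>
    </BrowserRouter>
  );
}
```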
## Verification
1. `cd core/frontend && npm run build` succeeds and produces `core/frontend/dist/index.html`
2. Start backend: `cd core && uv run python -m framework.runner.cli serve` — logs "Serving frontend from ..."
3. Open `http://localhost:8787` — placeholder page renders
4. Dev mode: `cd core/frontend && npm run dev` on `:5173`, API calls proxy to `:8787`
+1 -1
@@ -318,7 +318,7 @@ echo ""
IMPORT_ERRORS=0
# Batch check all imports in single process (reduces subprocess spawning overhead)
CHECK_RESULT=$(uv run python scripts/check_requirements.py framework aden_tools litellm framework.mcp.agent_builder_server 2>&1)
CHECK_RESULT=$(uv run python scripts/check_requirements.py framework aden_tools litellm framework.mcp.agent_builder_server 2>/dev/null)
CHECK_EXIT=$?
# Parse and display results