feat: wire queen dms

This commit is contained in:
Timothy
2026-04-08 12:56:00 -07:00
parent cf1e26b012
commit 6e88bb0205
10 changed files with 250 additions and 58 deletions
@@ -401,7 +401,7 @@ async def select_queen(user_message: str, llm: LLMProvider) -> str:
response = await llm.acomplete(
messages=[{"role": "user", "content": user_message}],
system=_QUEEN_SELECTOR_SYSTEM_PROMPT,
max_tokens=64,
max_tokens=2048,
json_mode=True,
)
raw = response.content.strip()
+6 -1
View File
@@ -307,7 +307,12 @@ async def create_queen(
async def _queen_identity_hook(ctx: HookContext) -> HookResult | None:
ensure_default_queens()
trigger = ctx.trigger or ""
queen_id = await select_queen(trigger, _session_llm)
# If the session was pre-bound to a queen (user clicked a specific
# queen in the UI), use that identity instead of LLM auto-selection.
if session.queen_name and session.queen_name != "default":
queen_id = session.queen_name
else:
queen_id = await select_queen(trigger, _session_llm)
try:
profile = load_queen_profile(queen_id)
except FileNotFoundError:
+115 -3
View File
@@ -1,10 +1,12 @@
"""Queen identity profile routes.
- GET /api/queen/profiles list all queen profiles (id, name, title)
- GET /api/queen/{queen_id}/profile get full queen profile
- PATCH /api/queen/{queen_id}/profile update queen profile fields
- GET /api/queen/profiles -- list all queen profiles (id, name, title)
- GET /api/queen/{queen_id}/profile -- get full queen profile
- PATCH /api/queen/{queen_id}/profile -- update queen profile fields
- POST /api/queen/{queen_id}/session -- get or create a persistent session for a queen
"""
import json
import logging
from aiohttp import web
@@ -15,6 +17,7 @@ from framework.agents.queen.queen_profiles import (
load_queen_profile,
update_queen_profile,
)
from framework.config import QUEENS_DIR
logger = logging.getLogger(__name__)
@@ -53,8 +56,117 @@ async def handle_update_profile(request: web.Request) -> web.Response:
return web.json_response({"id": queen_id, **updated})
async def handle_queen_session(request: web.Request) -> web.Response:
"""POST /api/queen/{queen_id}/session -- get or create a persistent session.
If this queen already has a live session, return it.
If not, find the most recent cold session and resume it.
If no session exists at all, create a fresh one.
The session is bound to this queen identity -- ``session.queen_name``
is set so storage routes to ``~/.hive/agents/queens/{queen_id}/sessions/``.
"""
from framework.server.session_manager import SessionManager
queen_id = request.match_info["queen_id"]
manager: SessionManager = request.app["manager"]
ensure_default_queens()
try:
load_queen_profile(queen_id)
except FileNotFoundError:
return web.json_response({"error": f"Queen '{queen_id}' not found"}, status=404)
body = await request.json() if request.can_read_body else {}
initial_prompt = body.get("initial_prompt")
# 1. Check for an existing live session bound to this queen.
# Stop any live sessions bound to a *different* queen so only one
# queen is active at a time.
other_sessions: list[str] = []
for session in manager.list_sessions():
if session.queen_name == queen_id:
return web.json_response({
"session_id": session.id,
"queen_id": queen_id,
"status": "live",
})
other_sessions.append(session.id)
for sid in other_sessions:
try:
await manager.stop_session(sid)
except Exception:
logger.debug("Failed to stop session %s during queen switch", sid)
# 2. Find the most recent cold session for this queen and resume it
queen_sessions_dir = QUEENS_DIR / queen_id / "sessions"
resume_from: str | None = None
if queen_sessions_dir.exists():
try:
candidates = sorted(
(d for d in queen_sessions_dir.iterdir() if d.is_dir()),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if candidates:
resume_from = candidates[0].name
except OSError:
pass
# 3. Create (or resume) the session, pre-bound to this queen
if resume_from:
# Check if the cold session had a worker loaded
meta_path = queen_sessions_dir / resume_from / "meta.json"
agent_path = None
if meta_path.exists():
try:
meta = json.loads(meta_path.read_text(encoding="utf-8"))
agent_path = meta.get("agent_path")
except (json.JSONDecodeError, OSError):
pass
if agent_path:
try:
from framework.server.app import validate_agent_path
agent_path = str(validate_agent_path(agent_path))
session = await manager.create_session_with_worker_graph(
agent_path,
queen_resume_from=resume_from,
initial_prompt=initial_prompt,
queen_name=queen_id,
)
except Exception:
session = await manager.create_session(
queen_resume_from=resume_from,
initial_prompt=initial_prompt,
queen_name=queen_id,
)
else:
session = await manager.create_session(
queen_resume_from=resume_from,
initial_prompt=initial_prompt,
queen_name=queen_id,
)
status = "resumed"
else:
session = await manager.create_session(
initial_prompt=initial_prompt,
queen_name=queen_id,
)
status = "created"
return web.json_response({
"session_id": session.id,
"queen_id": queen_id,
"status": status,
})
def register_routes(app: web.Application) -> None:
"""Register queen profile routes."""
app.router.add_get("/api/queen/profiles", handle_list_profiles)
app.router.add_get("/api/queen/{queen_id}/profile", handle_get_profile)
app.router.add_patch("/api/queen/{queen_id}/profile", handle_update_profile)
app.router.add_post("/api/queen/{queen_id}/session", handle_queen_session)
+10 -1
View File
@@ -177,17 +177,23 @@ class SessionManager:
model: str | None = None,
initial_prompt: str | None = None,
queen_resume_from: str | None = None,
queen_name: str | None = None,
) -> Session:
"""Create a new session with a queen but no worker.
When ``queen_resume_from`` is set the queen writes conversation messages
to that existing session's directory instead of creating a new one.
This preserves full conversation history across server restarts.
When ``queen_name`` is set the session is pre-bound to that queen
identity, skipping LLM auto-selection in the identity hook.
"""
# Reuse the original session ID when cold-restoring
resolved_session_id = queen_resume_from or session_id
session = await self._create_session_core(session_id=resolved_session_id, model=model)
session.queen_resume_from = queen_resume_from
if queen_name:
session.queen_name = queen_name
# Start queen immediately (queen-only, no worker tools yet)
await self._start_queen(session, worker_identity=None, initial_prompt=initial_prompt)
@@ -207,6 +213,7 @@ class SessionManager:
model: str | None = None,
initial_prompt: str | None = None,
queen_resume_from: str | None = None,
queen_name: str | None = None,
) -> Session:
"""Create a session and load a worker in one step.
@@ -251,7 +258,9 @@ class SessionManager:
model=model,
)
session.queen_resume_from = queen_resume_from
if _resume_queen_id:
if queen_name:
session.queen_name = queen_name
elif _resume_queen_id:
session.queen_name = _resume_queen_id
try:
# Load the graph FIRST (before queen) so queen gets full tools
+4 -1
View File
@@ -164,7 +164,10 @@ class SkillsManager:
self._protocols_prompt = "" # all skills use progressive disclosure now
if catalog_prompt:
logger.warning("Skill system produced empty protocols_prompt")
logger.info(
"Skill system ready: catalog=%d chars",
len(catalog_prompt),
)
# ------------------------------------------------------------------
# Hot-reload: watch skill directories for SKILL.md changes.
+39
View File
@@ -0,0 +1,39 @@
import { api } from "./client";
export interface QueenProfile {
id: string;
name: string;
title: string;
summary?: string;
experience?: Array<{ role: string; details: string }>;
skills?: string;
signature_achievement?: string;
}
export interface QueenSessionResult {
session_id: string;
queen_id: string;
status: "live" | "resumed" | "created";
}
export const queensApi = {
/** List all queen profiles (id, name, title). */
list: () =>
api.get<{ queens: Array<{ id: string; name: string; title: string }> }>(
"/queen/profiles",
),
/** Get full profile for a queen. */
getProfile: (queenId: string) =>
api.get<QueenProfile>(`/queen/${queenId}/profile`),
/** Update queen profile fields (partial update). */
updateProfile: (queenId: string, updates: Partial<QueenProfile>) =>
api.patch<QueenProfile>(`/queen/${queenId}/profile`, updates),
/** Get or create a persistent session for a queen. */
getOrCreateSession: (queenId: string, initialPrompt?: string) =>
api.post<QueenSessionResult>(`/queen/${queenId}/session`, {
initial_prompt: initialPrompt,
}),
};
+4
View File
@@ -16,6 +16,10 @@ export interface LiveSession {
queen_phase?: "planning" | "building" | "staging" | "running";
/** Whether the queen's LLM supports image content in messages */
queen_supports_images?: boolean;
/** Selected queen identity ID (e.g. "queen_technology") */
queen_id?: string | null;
/** Selected queen display name (e.g. "Alexandra") */
queen_name?: string | null;
/** Present in 409 conflict responses when worker is still loading */
loading?: boolean;
}
+18 -14
View File
@@ -8,14 +8,14 @@ import {
type ReactNode,
} from "react";
import type { Colony, QueenBee, UserProfile } from "@/types/colony";
import type { DiscoverEntry } from "@/api/types";
import type { DiscoverEntry, LiveSession } from "@/api/types";
import { agentsApi } from "@/api/agents";
import { sessionsApi } from "@/api/sessions";
import { queensApi } from "@/api/queens";
import {
agentSlug,
slugToColonyId,
slugToDisplayName,
getQueenForAgent,
} from "@/lib/colony-registry";
// ── localStorage keys ────────────────────────────────────────────────────────
@@ -118,9 +118,10 @@ export function ColonyProvider({ children }: { children: ReactNode }) {
// Only called on mount, visibility change, and after create/delete.
const fetchColonies = useCallback(async () => {
try {
const [discoverResult, sessionsResult] = await Promise.all([
const [discoverResult, sessionsResult, queenProfilesResult] = await Promise.all([
agentsApi.discover(),
sessionsApi.list().catch(() => ({ sessions: [] as { session_id: string; agent_path: string }[] })),
sessionsApi.list().catch(() => ({ sessions: [] as LiveSession[] })),
queensApi.list().catch(() => ({ queens: [] as { id: string; name: string; title: string }[] })),
]);
// Skip "Framework" agents — those are internal to the hive runtime
@@ -156,16 +157,19 @@ export function ColonyProvider({ children }: { children: ReactNode }) {
};
});
const newQueens: QueenBee[] = newColonies.map((colony) => {
const qi = getQueenForAgent(colony.queenId);
return {
id: colony.queenId,
name: qi.name,
role: qi.role,
colonyId: colony.id,
status: colony.status === "running" ? "online" : "offline",
};
});
// Build queens from backend profiles (not derived from colonies)
const liveQueenIds = new Set(
sessionsResult.sessions
.filter((s) => s.queen_id)
.map((s) => s.queen_id as string),
);
const newQueens: QueenBee[] = queenProfilesResult.queens.map((qp) => ({
id: qp.id,
name: qp.name,
role: qp.title,
status: liveQueenIds.has(qp.id) ? "online" : "offline",
}));
setColonies(newColonies);
setQueens(newQueens);
+51 -36
View File
@@ -4,8 +4,9 @@ import { Loader2 } from "lucide-react";
import ChatPanel, { type ChatMessage, type ImageContent } from "@/components/ChatPanel";
import { executionApi } from "@/api/execution";
import { sessionsApi } from "@/api/sessions";
import { queensApi } from "@/api/queens";
import { useMultiSSE } from "@/hooks/use-sse";
import type { LiveSession, AgentEvent } from "@/api/types";
import type { AgentEvent } from "@/api/types";
import { sseEventToChatMessage } from "@/lib/chat-helpers";
import { useColony } from "@/context/ColonyContext";
import { getQueenForAgent } from "@/lib/colony-registry";
@@ -31,56 +32,70 @@ export default function QueenDM() {
const turnCounterRef = useRef(0);
const queenIterTextRef = useRef<Record<string, Record<number, string>>>({});
const loadingRef = useRef(false);
// Create queen-only session
// Switch queen session when queenId changes
useEffect(() => {
if (!queenId || loadingRef.current) return;
loadingRef.current = true;
setLoading(true);
setMessages([]);
if (!queenId) return;
// Immediately reset UI for the new queen
setSessionId(null);
setMessages([]);
setQueenReady(false);
setLoading(true);
setIsTyping(false);
setIsStreaming(false);
setPendingQuestion(null);
setPendingOptions(null);
setAwaitingInput(false);
turnCounterRef.current = 0;
queenIterTextRef.current = {};
let cancelled = false;
(async () => {
try {
// Check for existing queen-only sessions
const { sessions: allLive } = await sessionsApi.list();
let session: LiveSession | undefined = allLive.find(
(s) => !s.has_worker && !s.agent_path,
);
const result = await queensApi.getOrCreateSession(queenId);
if (cancelled) return;
if (!session) {
session = await sessionsApi.create(undefined, undefined, undefined, undefined, undefined);
}
setSessionId(session.session_id);
setLoading(false);
const sid = result.session_id;
setSessionId(sid);
setQueenReady(true);
// Show typing indicator while the queen initializes (identity hook + first turn)
setIsTyping(true);
// Restore messages
try {
const { events } = await sessionsApi.eventsHistory(session.session_id);
const restored: ChatMessage[] = [];
for (const evt of events) {
const msg = sseEventToChatMessage(evt, "queen-dm", queenName);
if (!msg) continue;
if (evt.stream_id === "queen") msg.role = "queen";
restored.push(msg);
// Restore messages from history
if (result.status === "live" || result.status === "resumed") {
try {
const { events } = await sessionsApi.eventsHistory(sid);
if (cancelled) return;
const restored: ChatMessage[] = [];
for (const evt of events) {
const msg = sseEventToChatMessage(evt, "queen-dm", queenName);
if (!msg) continue;
if (evt.stream_id === "queen") msg.role = "queen";
restored.push(msg);
}
if (restored.length > 0) {
restored.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
if (!cancelled) {
setMessages(restored);
setIsTyping(false);
}
}
} catch {
// No history
}
if (restored.length > 0) {
restored.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
setMessages(restored);
}
} catch {
// No history
}
} catch (err) {
setLoading(false);
} catch {
// Session creation failed
} finally {
loadingRef.current = false;
if (!cancelled) setLoading(false);
}
})();
return () => {
cancelled = true;
};
}, [queenId, queenName]);
// SSE handler
+2 -1
View File
@@ -18,7 +18,8 @@ export interface QueenBee {
id: string;
name: string;
role: string;
colonyId: string;
/** Colony this queen is currently managing (if any). */
colonyId?: string;
status: "online" | "offline";
}