Merge branch 'feat/queen-profile' into feature/hive-experimental-comp-pipeline

This commit is contained in:
Timothy
2026-04-08 12:07:21 -07:00
11 changed files with 602 additions and 194 deletions
+9 -4
View File
@@ -52,13 +52,18 @@ def _get_last_active(agent_path: Path) -> str | None:
except Exception:
continue
# 2. Queen sessions
# 2. Queen sessions (scan all queen identity directories)
from framework.config import QUEENS_DIR
queen_sessions_dir = QUEENS_DIR / "default" / "sessions"
if queen_sessions_dir.exists():
if QUEENS_DIR.exists():
resolved = agent_path.resolve()
for d in queen_sessions_dir.iterdir():
for queen_dir in QUEENS_DIR.iterdir():
if not queen_dir.is_dir():
continue
sessions_dir = queen_dir / "sessions"
if not sessions_dir.exists():
continue
for d in sessions_dir.iterdir():
if not d.is_dir():
continue
meta_file = d / "meta.json"
@@ -1,138 +0,0 @@
"""Queen thinking hook — persona + communication style classifier.
Fires once when the queen enters building mode at session start.
Makes a single non-streaming LLM call (acting as an HR Director) to select
the best-fit expert persona for the user's request AND classify the user's
communication style, then returns a PersonaResult containing both.
This is designed to activate the model's latent domain expertise — a CFO
persona on a financial question, a Lawyer on a legal question, etc. while
also adapting the Queen's communication approach to the individual user.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.llm.provider import LLMProvider
logger = logging.getLogger(__name__)
# System prompt for the single-shot "HR Director" classifier call made by
# select_expert_persona(). The model must answer with bare JSON carrying
# exactly three keys: "role", "persona", "style".
# NOTE(review): several sentences below read as if a dash/em-dash was lost in
# transit ("this person don't rediscover", "JSON object no markdown",
# "internal alias do not include") — confirm against the original prompt text
# before editing; the string is kept verbatim here.
_HR_SYSTEM_PROMPT = """\
You are an expert HR Director and communication consultant at a world-class firm.
A new request has arrived. You must:
1. Identify which professional role best serves this request.
2. Read the user's signals to determine HOW to communicate with them.
For communication style, look for:
- Technical depth: Do they use precise terms? Do they ask "how" or "what"?
- Pace: Short messages = fast and direct. Long explanations = exploratory.
- Tone: Are they casual ("hey, can you...") or formal ("I need a system that...")?
If cross-session memory is provided, factor in what is already known about this \
person don't rediscover what's already understood.
Reply with ONLY a valid JSON object no markdown, no prose, no explanation:
{"role": "<job title>", "persona": "<2-3 sentence first-person identity statement>", \
"style": "<one of: peer-technical, mentor-guiding, consultant-structured>"}
Rules:
- Choose from any real professional role: CFO, CEO, CTO, Lawyer, Data Scientist,
Product Manager, Security Engineer, DevOps Engineer, Software Architect,
HR Director, Marketing Director, Business Analyst, UX Designer,
Financial Analyst, Operations Director, Legal Counsel, etc.
- The persona statement must be written in first person ("I am..." or "I have...").
- Select the role whose domain knowledge most directly applies to solving the request.
- If the request is clearly about coding or building software systems, pick Software Architect.
- "Queen" is your internal alias do not include it in the persona.
- For style: "peer-technical" for users who demonstrate domain expertise, \
"mentor-guiding" for users who are learning or exploring, \
"consultant-structured" for users who want structured, accountable delivery.
- Default to "peer-technical" if signals are ambiguous.
"""
# Communication style directives injected into the Queen's system prompt.
# Keys must match the "style" values the HR classifier can return (see
# _HR_SYSTEM_PROMPT); select_expert_persona() falls back to "peer-technical"
# for any unknown key.
_STYLE_DIRECTIVES: dict[str, str] = {
    # User demonstrates domain expertise — skip preambles, argue technically.
    "peer-technical": (
        "## Communication Style: Peer\n\n"
        "This person is technical. Use precise language, skip high-level "
        "overviews they already know, and get into specifics quickly. "
        "When they push back on a design choice, engage with the technical "
        "argument directly."
    ),
    # User is learning — surface the reasoning behind decisions.
    "mentor-guiding": (
        "## Communication Style: Guide\n\n"
        "This person is learning or exploring. Explain your reasoning as you "
        "go — not patronizingly, but so they can follow the logic. When you "
        "make a design choice, briefly say why. Offer to go deeper on anything."
    ),
    # User wants accountable delivery — structured summaries and options.
    "consultant-structured": (
        "## Communication Style: Structured\n\n"
        "This person wants structured, accountable delivery. Lead with "
        "summaries and options. Number your proposals. Be explicit about "
        "trade-offs. Avoid open-ended questions — give them choices to react to."
    ),
}
@dataclass
class PersonaResult:
    """Result of persona + style classification.

    Produced by select_expert_persona(); both fields are plain prompt text
    intended for injection into the Queen's system prompt.
    """

    persona_prefix: str  # e.g. "You are a CFO. I am a CFO with 20 years..."
    style_directive: str  # e.g. "## Communication Style: Peer\n\n..."
async def select_expert_persona(
    user_message: str,
    llm: LLMProvider,
    *,
    memory_context: str = "",
) -> PersonaResult | None:
    """Classify the user's request into an expert persona + communication style.

    Performs exactly one non-streaming ``acomplete()`` call against the
    session LLM and parses its JSON reply. Any failure — blank input,
    provider error, malformed JSON, missing fields — yields ``None`` so the
    queen falls back gracefully to its default character with no style
    directive.

    Args:
        user_message: The user's opening message for the session.
        llm: The session LLM provider.
        memory_context: Optional cross-session memory to inform style classification.

    Returns:
        A PersonaResult with persona_prefix and style_directive, or None on failure.
    """
    if not user_message.strip():
        return None

    # Memory context, when supplied, rides below the user's message.
    prompt = f"{user_message}\n\n{memory_context}" if memory_context else user_message

    try:
        response = await llm.acomplete(
            messages=[{"role": "user", "content": prompt}],
            system=_HR_SYSTEM_PROMPT,
            max_tokens=1024,
            json_mode=True,
        )
        raw = response.content.strip()
        payload = json.loads(raw)
        role = payload.get("role", "").strip()
        persona = payload.get("persona", "").strip()
        style_key = payload.get("style", "peer-technical").strip()
        if not (role and persona):
            logger.warning("Thinking hook: empty role/persona in response: %r", raw)
            return None
        # Unknown style keys silently degrade to the peer-technical directive.
        directive = _STYLE_DIRECTIVES.get(style_key, _STYLE_DIRECTIVES["peer-technical"])
        logger.info("Thinking hook: selected persona — %s, style — %s", role, style_key)
        return PersonaResult(
            persona_prefix=f"You are a {role}. {persona}",
            style_directive=directive,
        )
    except Exception:
        logger.warning("Thinking hook: persona classification failed", exc_info=True)
        return None
@@ -0,0 +1,417 @@
"""Queen identity profiles -- static queen personas stored as YAML files.
Each queen has a unique identity (Head of Technology, Head of Growth, etc.)
stored in ``~/.hive/agents/queens/{queen_id}/profile.yaml``. Profiles are
initialized with rich defaults and can be edited via the API.
At session start, a lightweight LLM classifier selects the best-matching
queen for the user's request, and the profile is injected into the system
prompt.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any
import yaml
from framework.config import QUEENS_DIR
if TYPE_CHECKING:
from framework.llm.provider import LLMProvider
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Default queen profiles
# ---------------------------------------------------------------------------
# Keyed by queen_id — also the on-disk directory name under QUEENS_DIR.
# Each value is the YAML-serializable default profile that
# ensure_default_queens() writes when no profile.yaml exists yet.
# NOTE(review): year ranges like "2020Present" / "20172020" and counts like
# "1025 engineers" appear to have lost a dash separator ("2020–Present",
# "10–25") somewhere upstream — confirm against the intended profile text;
# values are kept verbatim here.
DEFAULT_QUEENS: dict[str, dict[str, Any]] = {
    "queen_technology": {
        "name": "Alexandra",
        "title": "Head of Technology",
        "summary": (
            "Technical leader with 12+ years building scalable systems for "
            "startups from 0 to $50M ARR. Expert in translating business ideas "
            "into robust, cost-efficient architectures."
        ),
        "experience": [
            {
                "role": "Head of Technology — Multiple Startups (2020Present)",
                "details": [
                    "Led architecture for 5 startups (2 successful exits)",
                    "Reduced cloud costs by 40% through infrastructure redesign",
                    "Hired and managed teams of 1025 engineers",
                ],
            },
            {
                "role": "VP Engineering — SaaSCo (20172020)",
                "details": [
                    "Scaled platform to 1M+ users",
                    "Migrated monolith to microservices",
                ],
            },
        ],
        "skills": "System design, cloud infrastructure, hiring, DevOps, scalability",
        "signature_achievement": "Built MVP and scaled to 500k users without major rewrite",
    },
    "queen_growth": {
        "name": "Marcus",
        "title": "Head of Growth",
        "summary": (
            "Growth strategist who has taken three B2B SaaS products from "
            "launch to $10M+ ARR. Deep expertise in acquisition funnels, "
            "retention loops, and data-driven experimentation."
        ),
        "experience": [
            {
                "role": "Head of Growth — ScaleUp Inc. (2021Present)",
                "details": [
                    "Grew MRR from $200k to $2.5M in 18 months",
                    "Built growth team of 8 across acquisition, activation, and retention",
                    "Designed referral program generating 30% of new signups",
                ],
            },
            {
                "role": "Senior Growth Manager — RapidLaunch (20182021)",
                "details": [
                    "Led product-led growth strategy reaching 500k free users",
                    "Reduced CAC by 55% through organic channel optimization",
                ],
            },
        ],
        "skills": "Growth modeling, A/B testing, funnel optimization, PLG strategy, analytics",
        "signature_achievement": "Built self-serve acquisition engine that drove 70% of revenue with zero sales team",
    },
    "queen_product_strategy": {
        "name": "Sophia",
        "title": "Head of Product Strategy",
        "summary": (
            "Product leader with a track record of defining and executing "
            "product vision for early-stage startups. Bridges user research, "
            "business strategy, and engineering to ship products people love."
        ),
        "experience": [
            {
                "role": "Head of Product — NovaTech (2020Present)",
                "details": [
                    "Defined product roadmap that drove 3x user growth in one year",
                    "Introduced OKR framework aligning product, engineering, and sales",
                    "Led discovery sprints that identified $5M untapped market segment",
                ],
            },
            {
                "role": "Senior Product Manager — BuildFast (20172020)",
                "details": [
                    "Shipped 12 major features with 95% on-time delivery",
                    "Grew NPS from 32 to 67 through systematic user feedback loops",
                ],
            },
        ],
        "skills": "Product roadmapping, user research, prioritization frameworks, go-to-market strategy",
        "signature_achievement": "Pivoted failing product into market leader within 9 months by redefining ICP and value proposition",
    },
    "queen_finance_fundraising": {
        "name": "Daniel",
        "title": "Head of Finance & Fundraising",
        "summary": (
            "Finance executive who has raised over $150M across seed to Series C "
            "rounds. Expert in financial modeling, unit economics, and investor "
            "relations for high-growth startups."
        ),
        "experience": [
            {
                "role": "Head of Finance — VentureScale (2019Present)",
                "details": [
                    "Led $45M Series B raise at 12x revenue multiple",
                    "Built financial planning infrastructure from scratch",
                    "Reduced burn rate by 25% while maintaining growth trajectory",
                ],
            },
            {
                "role": "Finance Director — FinBridge Capital (20162019)",
                "details": [
                    "Advised 20+ startups on fundraising strategy and cap table management",
                    "Structured convertible notes and SAFEs for early-stage companies",
                ],
            },
        ],
        "skills": "Financial modeling, fundraising strategy, investor relations, cap table management, unit economics",
        "signature_achievement": "Closed oversubscribed Series A in 3 weeks with 40+ inbound investor inquiries",
    },
    "queen_legal": {
        "name": "Catherine",
        "title": "Head of Legal",
        "summary": (
            "Startup legal counsel with deep expertise in corporate governance, "
            "IP protection, and regulatory compliance. Has guided 15+ startups "
            "through incorporations, funding rounds, and exits."
        ),
        "experience": [
            {
                "role": "General Counsel — TechLegal Partners (2019Present)",
                "details": [
                    "Structured legal frameworks for startups across 5 jurisdictions",
                    "Negotiated $200M+ in commercial contracts",
                    "Managed IP portfolio of 30+ patents and trademarks",
                ],
            },
            {
                "role": "Corporate Attorney — Whitfield & Associates (20152019)",
                "details": [
                    "Led due diligence for 12 M&A transactions",
                    "Drafted and negotiated term sheets for Series A through C rounds",
                ],
            },
        ],
        "skills": "Corporate law, IP protection, contract negotiation, regulatory compliance, employment law",
        "signature_achievement": "Saved client $3M by identifying and resolving IP ownership dispute before Series B close",
    },
    "queen_brand_design": {
        "name": "Elena",
        "title": "Head of Brand & Design",
        "summary": (
            "Creative director who builds brand identities that drive business "
            "results. Expert in translating startup vision into cohesive visual "
            "systems, messaging frameworks, and user experiences."
        ),
        "experience": [
            {
                "role": "Head of Brand & Design — StudioPulse (2020Present)",
                "details": [
                    "Built brand identity for 10+ funded startups from zero",
                    "Designed design systems adopted by engineering teams of 20+",
                    "Led rebrand that increased conversion rate by 35%",
                ],
            },
            {
                "role": "Senior Design Lead — CreativeForge (20172020)",
                "details": [
                    "Managed team of 6 designers across brand, product, and marketing",
                    "Established design ops practice reducing design-to-dev handoff time by 60%",
                ],
            },
        ],
        "skills": "Brand strategy, visual identity, design systems, UX design, creative direction",
        "signature_achievement": "Created brand identity for pre-launch startup that became recognizable industry name within 6 months",
    },
    "queen_talent": {
        "name": "James",
        "title": "Head of Talent",
        "summary": (
            "People leader who has built high-performing teams from founding "
            "stage to 200+ employees. Expert in recruiting strategy, culture "
            "building, and organizational design for fast-scaling startups."
        ),
        "experience": [
            {
                "role": "Head of Talent — HyperGrowth Labs (2020Present)",
                "details": [
                    "Scaled team from 15 to 180 in 18 months with 92% retention",
                    "Built recruiting engine processing 5,000+ candidates per quarter",
                    "Designed compensation framework competitive across 12 markets",
                ],
            },
            {
                "role": "Senior Recruiter — TalentBridge (20172020)",
                "details": [
                    "Placed 100+ engineering and leadership hires at Series AC startups",
                    "Reduced average time-to-hire from 45 to 22 days",
                ],
            },
        ],
        "skills": "Recruiting strategy, organizational design, culture building, compensation planning, employer branding",
        "signature_achievement": "Built engineering team of 50 in 6 months with zero external agency spend",
    },
    "queen_operations": {
        "name": "Rachel",
        "title": "Head of Operations",
        "summary": (
            "Operations leader who builds the systems that let startups scale "
            "without chaos. Expert in process design, vendor management, and "
            "cross-functional coordination."
        ),
        "experience": [
            {
                "role": "Head of Operations — OptiFlow (2020Present)",
                "details": [
                    "Designed operational playbooks supporting 10x revenue growth",
                    "Managed $8M annual vendor budget with 20% cost reduction",
                    "Built cross-functional workflows connecting sales, product, and support",
                ],
            },
            {
                "role": "Operations Manager — StreamLine Co. (20172020)",
                "details": [
                    "Automated 40% of manual operational processes",
                    "Led office expansion across 3 new markets",
                ],
            },
        ],
        "skills": "Process optimization, vendor management, cross-functional coordination, project management, systems thinking",
        "signature_achievement": "Built operational infrastructure that supported 5x team growth with zero additional ops hires",
    },
}
# ---------------------------------------------------------------------------
# Profile CRUD
# ---------------------------------------------------------------------------
def ensure_default_queens() -> None:
    """Write any missing default queen profiles to disk.

    Idempotent: a queen that already has a ``profile.yaml`` is left
    untouched, so user edits survive repeated calls.
    """
    for queen_id, defaults in DEFAULT_QUEENS.items():
        target = QUEENS_DIR / queen_id / "profile.yaml"
        if target.exists():
            continue
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(yaml.safe_dump(defaults, sort_keys=False, allow_unicode=True))
    logger.info("Queen profiles ensured at %s", QUEENS_DIR)
def list_queens() -> list[dict[str, str]]:
    """Summarize every queen profile on disk as ``{id, name, title}`` dicts."""
    if not QUEENS_DIR.is_dir():
        return []
    summaries: list[dict[str, str]] = []
    for profile_path in sorted(QUEENS_DIR.glob("*/profile.yaml")):
        # A profile that fails to parse is logged and skipped rather than
        # failing the whole listing.
        try:
            data = yaml.safe_load(profile_path.read_text())
            summaries.append(
                {
                    "id": profile_path.parent.name,
                    "name": data.get("name", ""),
                    "title": data.get("title", ""),
                }
            )
        except Exception:
            logger.warning("Failed to read queen profile %s", profile_path)
    return summaries
def load_queen_profile(queen_id: str) -> dict[str, Any]:
    """Read and return a queen's full profile from disk.

    Raises:
        FileNotFoundError: If no profile.yaml exists for ``queen_id``.
    """
    path = QUEENS_DIR / queen_id / "profile.yaml"
    if not path.exists():
        raise FileNotFoundError(f"Queen profile not found: {queen_id}")
    return yaml.safe_load(path.read_text())
def update_queen_profile(queen_id: str, updates: dict[str, Any]) -> dict[str, Any]:
    """Merge partial updates into an existing queen profile and persist.

    Args:
        queen_id: Directory name of the queen under QUEENS_DIR.
        updates: Top-level keys to overwrite in the stored profile.

    Returns:
        The full updated profile.

    Raises:
        FileNotFoundError: If the profile doesn't exist.
    """
    profile_path = QUEENS_DIR / queen_id / "profile.yaml"
    if not profile_path.exists():
        raise FileNotFoundError(f"Queen profile not found: {queen_id}")
    # yaml.safe_load returns None for an empty file — treat that as an empty
    # profile instead of crashing on the .update() call below.
    data = yaml.safe_load(profile_path.read_text()) or {}
    data.update(updates)
    profile_path.write_text(yaml.safe_dump(data, sort_keys=False, allow_unicode=True))
    return data
# ---------------------------------------------------------------------------
# Prompt formatting
# ---------------------------------------------------------------------------
def format_queen_identity_prompt(profile: dict[str, Any]) -> str:
    """Render a queen profile dict as a markdown identity section for the system prompt."""
    name = profile.get("name", "the Queen")
    title = profile.get("title", "Senior Advisor")
    summary = profile.get("summary", "")
    sections = [f"# Your Identity\n\nYou are {name}, {title}.\n\n{summary}"]

    # Experience renders as a nested bullet list, one top-level bullet per role.
    if profile.get("experience"):
        exp_lines = ["\n\n## Experience"]
        for entry in profile["experience"]:
            exp_lines.append(f"\n- **{entry.get('role', '')}**")
            exp_lines.extend(f"  - {detail}" for detail in entry.get("details", []))
        sections.append("\n".join(exp_lines))

    if profile.get("skills"):
        sections.append(f"\n\n## Core Skills\n\n{profile['skills']}")
    if profile.get("signature_achievement"):
        sections.append(f"\n\n## Signature Achievement\n\n{profile['signature_achievement']}")
    return "".join(sections)
# ---------------------------------------------------------------------------
# Queen selection (lightweight LLM classifier)
# ---------------------------------------------------------------------------
# System prompt for the single-shot routing call made by select_queen().
# The queen IDs listed here must stay in sync with the keys of DEFAULT_QUEENS.
# NOTE(review): "JSON object no markdown" looks like it lost a dash separator
# upstream — confirm; the string is kept verbatim.
_QUEEN_SELECTOR_SYSTEM_PROMPT = """\
You are a routing classifier. Given a user's request, select the single best-matching \
queen identity from the list below.
Queens:
- queen_technology: Technical architecture, software engineering, infrastructure, DevOps, system design
- queen_growth: User acquisition, retention, growth experiments, PLG, marketing funnels, analytics
- queen_product_strategy: Product vision, roadmapping, user research, feature prioritization, go-to-market
- queen_finance_fundraising: Financial modeling, fundraising, investor relations, cap tables, unit economics, budgeting
- queen_legal: Contracts, IP, compliance, corporate governance, employment law, regulatory matters
- queen_brand_design: Brand identity, visual design, UX, design systems, creative direction, messaging
- queen_talent: Hiring, recruiting, team building, culture, compensation, organizational design
- queen_operations: Process optimization, vendor management, cross-functional coordination, project management
Reply with ONLY a valid JSON object no markdown, no prose:
{"queen_id": "<one of the IDs above>"}
Rules:
- Pick the queen whose domain most directly applies to the user's request.
- If the request is about building software, coding, or technical systems, pick queen_technology.
- If the request spans multiple domains, pick the one most central to the ask.
- If truly ambiguous, default to queen_technology.
"""

# Fallback used whenever classification cannot produce a valid queen ID.
_DEFAULT_QUEEN_ID = "queen_technology"
async def select_queen(user_message: str, llm: LLMProvider) -> str:
    """Route a user message to the best-matching queen identity ID.

    Issues one non-streaming classifier call and validates the returned
    ``queen_id`` against DEFAULT_QUEENS. Every failure path — blank input,
    provider error, malformed JSON, unknown ID — resolves to the technology
    queen so session startup never blocks on classification.
    """
    if not user_message.strip():
        return _DEFAULT_QUEEN_ID
    try:
        reply = await llm.acomplete(
            messages=[{"role": "user", "content": user_message}],
            system=_QUEEN_SELECTOR_SYSTEM_PROMPT,
            max_tokens=64,
            json_mode=True,
        )
        payload = json.loads(reply.content.strip())
        queen_id = payload.get("queen_id", "").strip()
    except Exception:
        logger.warning("Queen selection failed, falling back to %s", _DEFAULT_QUEEN_ID, exc_info=True)
        return _DEFAULT_QUEEN_ID
    if queen_id not in DEFAULT_QUEENS:
        logger.warning("Queen selector returned unknown ID %r, falling back", queen_id)
        return _DEFAULT_QUEEN_ID
    logger.info("Queen selector: selected %s for request", queen_id)
    return queen_id
+2 -2
View File
@@ -148,8 +148,8 @@ class EventType(StrEnum):
# Queen phase changes (building <-> staging <-> running)
QUEEN_PHASE_CHANGED = "queen_phase_changed"
# Queen thinking hook — persona selected for the current building session
QUEEN_PERSONA_SELECTED = "queen_persona_selected"
# Queen identity — which queen profile was selected for this session
QUEEN_IDENTITY_SELECTED = "queen_identity_selected"
# Subagent reports (one-way progress updates from sub-agents)
SUBAGENT_REPORT = "subagent_report"
+2
View File
@@ -259,6 +259,7 @@ def create_app(model: str | None = None) -> web.Application:
from framework.server.routes_execution import register_routes as register_execution_routes
from framework.server.routes_graphs import register_routes as register_graph_routes
from framework.server.routes_logs import register_routes as register_log_routes
from framework.server.routes_queens import register_routes as register_queen_routes
from framework.server.routes_sessions import register_routes as register_session_routes
register_credential_routes(app)
@@ -267,6 +268,7 @@ def create_app(model: str | None = None) -> web.Application:
register_session_routes(app)
register_graph_routes(app)
register_log_routes(app)
register_queen_routes(app)
# Static file serving — Option C production mode
# If frontend/dist/ exists, serve built frontend files on /
+28 -11
View File
@@ -64,7 +64,12 @@ async def create_queen(
_queen_tools_staging,
_shared_building_knowledge,
)
from framework.agents.queen.nodes.thinking_hook import select_expert_persona
from framework.agents.queen.queen_profiles import (
ensure_default_queens,
format_queen_identity_prompt,
load_queen_profile,
select_queen,
)
from framework.agent_loop.agent_loop import HookContext, HookResult
from framework.loader.mcp_registry import MCPRegistry
from framework.loader.tool_registry import ToolRegistry
@@ -271,7 +276,7 @@ async def create_queen(
except Exception:
logger.debug("Queen skill loading failed (non-fatal)", exc_info=True)
# ---- Persona hook ------------------------------------------------
# ---- Queen identity hook -----------------------------------------
_session_llm = session.llm
_session_event_bus = session.event_bus
@@ -299,20 +304,32 @@ async def create_queen(
filter_stream="queen",
)
async def _persona_hook(ctx: HookContext) -> HookResult | None:
async def _queen_identity_hook(ctx: HookContext) -> HookResult | None:
ensure_default_queens()
trigger = ctx.trigger or ""
result = await select_expert_persona(trigger, _session_llm, memory_context="")
if not result:
queen_id = await select_queen(trigger, _session_llm)
try:
profile = load_queen_profile(queen_id)
except FileNotFoundError:
logger.warning("Queen profile %s not found after selection", queen_id)
return None
# Store on phase_state so persona/style persist across dynamic prompt refreshes
phase_state.persona_prefix = result.persona_prefix
phase_state.style_directive = result.style_directive
identity_prompt = format_queen_identity_prompt(profile)
# Store on phase_state so identity persists across dynamic prompt refreshes
phase_state.queen_id = queen_id
phase_state.queen_profile = profile
phase_state.queen_identity_prompt = identity_prompt
# Route session storage to ~/.hive/agents/queens/{queen_id}/sessions/
session.queen_name = queen_id
if _session_event_bus is not None:
await _session_event_bus.publish(
AgentEvent(
type=EventType.QUEEN_PERSONA_SELECTED,
type=EventType.QUEEN_IDENTITY_SELECTED,
stream_id="queen",
data={"persona": result.persona_prefix},
data={
"queen_id": queen_id,
"name": profile.get("name", ""),
"title": profile.get("title", ""),
},
)
)
@@ -351,7 +368,7 @@ async def create_queen(
adjusted_node = _orig_node.model_copy(update=node_updates)
_queen_loop_config = {
**_base_loop_config,
"hooks": {"session_start": [_persona_hook]},
"hooks": {"session_start": [_queen_identity_hook]},
}
# ---- Queen event loop (AgentLoop directly, no Orchestrator) -------
+60
View File
@@ -0,0 +1,60 @@
"""Queen identity profile routes.
- GET /api/queen/profiles list all queen profiles (id, name, title)
- GET /api/queen/{queen_id}/profile get full queen profile
- PATCH /api/queen/{queen_id}/profile update queen profile fields
"""
import logging
from aiohttp import web
from framework.agents.queen.queen_profiles import (
ensure_default_queens,
list_queens,
load_queen_profile,
update_queen_profile,
)
logger = logging.getLogger(__name__)
async def handle_list_profiles(request: web.Request) -> web.Response:
    """GET /api/queen/profiles — list all queen profiles.

    Defaults are materialized first so a fresh install still lists queens.
    """
    ensure_default_queens()
    return web.json_response({"queens": list_queens()})
async def handle_get_profile(request: web.Request) -> web.Response:
    """GET /api/queen/{queen_id}/profile — return one queen's full profile, or 404."""
    queen_id = request.match_info["queen_id"]
    ensure_default_queens()
    try:
        profile = load_queen_profile(queen_id)
    except FileNotFoundError:
        return web.json_response({"error": f"Queen '{queen_id}' not found"}, status=404)
    else:
        return web.json_response({"id": queen_id, **profile})
async def handle_update_profile(request: web.Request) -> web.Response:
    """PATCH /api/queen/{queen_id}/profile — merge request fields into a queen profile.

    Returns 400 for a non-object or unparseable body, 404 for an unknown queen.
    """
    queen_id = request.match_info["queen_id"]
    try:
        payload = await request.json()
    except Exception:
        return web.json_response({"error": "Invalid JSON body"}, status=400)
    if not isinstance(payload, dict):
        return web.json_response({"error": "Body must be a JSON object"}, status=400)
    try:
        merged = update_queen_profile(queen_id, payload)
    except FileNotFoundError:
        return web.json_response({"error": f"Queen '{queen_id}' not found"}, status=404)
    else:
        return web.json_response({"id": queen_id, **merged})
def register_routes(app: web.Application) -> None:
    """Attach the queen profile endpoints to the application's router."""
    router = app.router
    router.add_get("/api/queen/profiles", handle_list_profiles)
    router.add_get("/api/queen/{queen_id}/profile", handle_get_profile)
    router.add_patch("/api/queen/{queen_id}/profile", handle_update_profile)
+11 -6
View File
@@ -63,6 +63,8 @@ def _session_to_live_dict(session) -> dict:
if phase_state
else ("staging" if session.graph_runtime else "planning"),
"queen_supports_images": supports_image_tool_results(queen_model) if queen_model else True,
"queen_id": getattr(phase_state, "queen_id", None) if phase_state else None,
"queen_name": (phase_state.queen_profile or {}).get("name") if phase_state else None,
}
@@ -583,9 +585,9 @@ async def handle_session_events_history(request: web.Request) -> web.Response:
"""
session_id = request.match_info["session_id"]
from framework.server.session_manager import _queen_session_dir
from framework.server.session_manager import _find_queen_session_dir
queen_dir = _queen_session_dir(session_id)
queen_dir = _find_queen_session_dir(session_id)
events_path = queen_dir / "events.jsonl"
if not events_path.exists():
return web.json_response({"events": [], "session_id": session_id})
@@ -647,9 +649,9 @@ async def handle_delete_history_session(request: web.Request) -> web.Response:
await manager.stop_session(session_id)
# Delete the queen session directory from disk
from framework.server.session_manager import _queen_session_dir
from framework.server.session_manager import _find_queen_session_dir
queen_session_dir = _queen_session_dir(session_id)
queen_session_dir = _find_queen_session_dir(session_id)
if queen_session_dir.exists() and queen_session_dir.is_dir():
try:
shutil.rmtree(queen_session_dir)
@@ -701,9 +703,12 @@ async def handle_reveal_session_folder(request: web.Request) -> web.Response:
session = manager.get_session(session_id)
storage_session_id = (session.queen_resume_from or session.id) if session else session_id
if session:
from framework.server.session_manager import _queen_session_dir
folder = _queen_session_dir(storage_session_id)
folder = _queen_session_dir(storage_session_id, session.queen_name)
else:
from framework.server.session_manager import _find_queen_session_dir
folder = _find_queen_session_dir(storage_session_id)
folder.mkdir(parents=True, exist_ok=True)
try:
+42 -12
View File
@@ -30,6 +30,18 @@ def _queen_session_dir(session_id: str, queen_name: str = "default") -> Path:
return QUEENS_DIR / queen_name / "sessions" / session_id
def _find_queen_session_dir(session_id: str) -> Path:
    """Locate ``session_id`` under any queen identity directory.

    Scans every ``QUEENS_DIR/{queen}/sessions/`` for the session; when no
    match exists (or QUEENS_DIR is absent) the default queen's path is
    returned so callers always receive a usable Path.
    """
    if QUEENS_DIR.exists():
        candidates = (
            entry / "sessions" / session_id
            for entry in QUEENS_DIR.iterdir()
            if entry.is_dir()
        )
        for candidate in candidates:
            if candidate.exists():
                return candidate
    return _queen_session_dir(session_id)
@dataclass
class Session:
"""A live session with a queen and optional worker."""
@@ -210,13 +222,15 @@ class SessionManager:
# When cold-restoring, check meta.json for the phase — if the agent
# was still being built we must NOT try to load the worker (the code
# is incomplete and will fail to import).
_resume_queen_id: str | None = None
if queen_resume_from:
_resume_phase = None
_meta_path = _queen_session_dir(queen_resume_from) / "meta.json"
_meta_path = _find_queen_session_dir(queen_resume_from) / "meta.json"
if _meta_path.exists():
try:
_meta = json.loads(_meta_path.read_text(encoding="utf-8"))
_resume_phase = _meta.get("phase")
_resume_queen_id = _meta.get("queen_id")
except (json.JSONDecodeError, OSError):
pass
if _resume_phase in ("building", "planning"):
@@ -237,6 +251,8 @@ class SessionManager:
model=model,
)
session.queen_resume_from = queen_resume_from
if _resume_queen_id:
session.queen_name = _resume_queen_id
try:
# Load the graph FIRST (before queen) so queen gets full tools
await self._load_worker_core(
@@ -802,7 +818,10 @@ class SessionManager:
_existing_meta = json.loads(_meta_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
_new_meta: dict = {"created_at": time.time()}
_new_meta: dict = {
"created_at": time.time(),
"queen_id": session.queen_name,
}
if _agent_name is not None:
_new_meta["agent_name"] = _agent_name
if session.worker_path is not None:
@@ -1094,7 +1113,7 @@ class SessionManager:
~/.hive/agents/queens/{name}/sessions/{session_id}/conversations/. Returns None when
no data is found so callers can fall through to a 404.
"""
queen_dir = _queen_session_dir(session_id)
queen_dir = _find_queen_session_dir(session_id)
convs_dir = queen_dir / "conversations"
if not convs_dir.exists():
return None
@@ -1149,21 +1168,28 @@ class SessionManager:
@staticmethod
def list_cold_sessions() -> list[dict]:
"""Return metadata for every queen session directory on disk, newest first."""
queen_sessions_dir = QUEENS_DIR / "default" / "sessions"
if not queen_sessions_dir.exists():
if not QUEENS_DIR.exists():
return []
results: list[dict] = []
# Collect session dirs from all queen identities
all_session_dirs: list[Path] = []
try:
entries = sorted(
queen_sessions_dir.iterdir(),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for queen_dir in QUEENS_DIR.iterdir():
if not queen_dir.is_dir():
continue
sessions_dir = queen_dir / "sessions"
if sessions_dir.exists():
for d in sessions_dir.iterdir():
if d.is_dir():
all_session_dirs.append(d)
except OSError:
return []
for d in entries:
# Sort all sessions by mtime, newest first
all_session_dirs.sort(key=lambda p: p.stat().st_mtime, reverse=True)
results: list[dict] = []
for d in all_session_dirs:
if not d.is_dir():
continue
try:
@@ -1238,6 +1264,9 @@ class SessionManager:
except OSError:
pass
# Derive queen_id from directory structure: queens/{queen_id}/sessions/{session_id}
queen_id = d.parent.parent.name if d.parent.name == "sessions" else None
results.append(
{
"session_id": d.name,
@@ -1249,6 +1278,7 @@ class SessionManager:
"agent_path": agent_path,
"last_message": last_message,
"message_count": message_count,
"queen_id": queen_id,
}
)
@@ -128,10 +128,11 @@ class QueenPhaseState:
# Community skills catalog (XML) — appended after protocols
skills_catalog_prompt: str = ""
# Persona and communication style (set once at session start by persona hook,
# persisted here so they survive dynamic prompt refreshes across iterations).
persona_prefix: str = "" # e.g. "You are a CFO. I am a CFO with 20 years..."
style_directive: str = "" # e.g. "## Communication Style: Peer\n\n..."
# Queen identity (set once at session start by queen identity hook,
# persisted here so it survives dynamic prompt refreshes across iterations).
queen_id: str | None = None
queen_profile: dict | None = None
queen_identity_prompt: str = ""
# Cached global recall block — populated async by recall_selector after each turn.
_cached_global_recall_block: str = ""
@@ -164,11 +165,9 @@ class QueenPhaseState:
base = self.prompt_building
parts = []
if self.persona_prefix:
parts.append(self.persona_prefix)
if self.queen_identity_prompt:
parts.append(self.queen_identity_prompt)
parts.append(base)
if self.style_directive:
parts.append(self.style_directive)
if self.skills_catalog_prompt:
parts.append(self.skills_catalog_prompt)
if self.protocols_prompt:
+11
View File
@@ -2078,6 +2078,17 @@ fi
echo ""
# ============================================================
# Initialize Queen Profiles
# ============================================================
echo -n " ⬡ queen profiles... "
if uv run python -c "from framework.agents.queen.queen_profiles import ensure_default_queens; ensure_default_queens()" > /dev/null 2>&1; then
echo -e "${GREEN}ok${NC}"
else
echo -e "${YELLOW}skipped${NC} ${DIM}(non-fatal)${NC}"
fi
# ============================================================
# Success!
# ============================================================