refactor(hive): home hive dir structure

This commit is contained in:
Timothy
2026-04-07 19:21:16 -07:00
parent 6b86c602c7
commit b3759db83b
11 changed files with 287 additions and 42 deletions
+19 -5
View File
@@ -27,8 +27,8 @@ def _get_last_active(agent_path: Path) -> str | None:
"""Return the most recent updated_at timestamp across all sessions.
Checks both worker sessions (``~/.hive/agents/{name}/sessions/``) and
queen sessions (``~/.hive/queen/session/``) whose ``meta.json`` references
the same *agent_path*.
queen sessions (``~/.hive/agents/queens/default/sessions/``) whose
``meta.json`` references the same *agent_path*.
"""
from datetime import datetime
@@ -53,7 +53,9 @@ def _get_last_active(agent_path: Path) -> str | None:
continue
# 2. Queen sessions
queen_sessions_dir = Path.home() / ".hive" / "queen" / "session"
from framework.config import QUEENS_DIR
queen_sessions_dir = QUEENS_DIR / "default" / "sessions"
if queen_sessions_dir.exists():
resolved = agent_path.resolve()
for d in queen_sessions_dir.iterdir():
@@ -163,13 +165,20 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
_is_valid_agent_dir,
)
from framework.config import COLONIES_DIR
groups: dict[str, list[AgentEntry]] = {}
sources = [
("Your Agents", Path("exports")),
("Your Agents", COLONIES_DIR),
("Your Agents", Path("exports")), # compat fallback
("Framework", _get_framework_agents_dir()),
("Examples", Path("examples/templates")),
]
# Track seen agent directory names to avoid duplicates when the same
# agent exists in both colonies/ and exports/ (colonies takes priority).
_seen_agent_names: set[str] = set()
for category, base_dir in sources:
if not base_dir.exists():
continue
@@ -177,6 +186,9 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
for path in sorted(base_dir.iterdir(), key=lambda p: p.name):
if not _is_valid_agent_dir(path):
continue
if path.name in _seen_agent_names:
continue
_seen_agent_names.add(path.name)
name, desc = _extract_python_agent_metadata(path)
config_fallback_name = path.name.replace("_", " ").title()
@@ -215,6 +227,8 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
)
)
if entries:
groups[category] = entries
existing = groups.get(category, [])
existing.extend(entries)
groups[category] = existing
return groups
@@ -450,11 +450,11 @@ When a user says "my agent is failing" or "debug this agent":
**You should only reach this step after the user has approved the draft design \
and you have called `confirm_and_build(agent_name="my_agent")`.**
`confirm_and_build` created the agent directory at `exports/{name}/`. \
`confirm_and_build` created the agent directory (returned in agent_path). \
Now write the complete agent config directly:
```
write_file("exports/{name}/agent.json", <complete JSON config>)
write_file("<colony_path>/agent.json", <complete JSON config>)
```
The agent.json must include ALL of these in one write:
@@ -516,7 +516,7 @@ validation, tests) and returns a consolidated result. If anything \
fails: read the error, fix with read_file+write_file, re-validate. Up to 3x.
When validation passes, immediately call \
`load_built_agent("exports/{name}")` to load the agent into the \
`load_built_agent("<agent_path>")` to load the agent into the \
session. This switches to STAGING phase and shows the graph in the \
visualizer. Do NOT wait for user input between validation and loading.
"""
+28 -7
View File
@@ -1,9 +1,15 @@
"""Queen global memory helpers.
Global memory lives in ``~/.hive/queen/global_memory/`` and stores durable
cross-session knowledge about the user (profile, preferences, environment,
feedback). Each memory is an individual ``.md`` file with optional YAML
frontmatter (name, type, description).
Memory hierarchy::
~/.hive/memories/
global/ # shared across all queens and colonies
colonies/{name}/ # colony-scoped memories
agents/queens/{name}/ # queen-specific memories
agents/{name}/ # per-worker-agent memories
Each memory is an individual ``.md`` file with optional YAML frontmatter
(name, type, description).
"""
from __future__ import annotations
@@ -21,7 +27,7 @@ logger = logging.getLogger(__name__)
GLOBAL_MEMORY_CATEGORIES: tuple[str, ...] = ("profile", "preference", "environment", "feedback")
_HIVE_QUEEN_DIR = Path.home() / ".hive" / "queen"
from framework.config import MEMORIES_DIR
MAX_FILES: int = 200
MAX_FILE_SIZE_BYTES: int = 4096 # 4 KB hard limit per memory file
@@ -31,8 +37,23 @@ _HEADER_LINE_LIMIT: int = 30
def global_memory_dir() -> Path:
"""Return the queen-global memory directory."""
return _HIVE_QUEEN_DIR / "global_memory"
"""Return the global memory directory (shared across all queens/colonies)."""
return MEMORIES_DIR / "global"
def colony_memory_dir(colony_name: str) -> Path:
"""Return the memory directory for a named colony."""
return MEMORIES_DIR / "colonies" / colony_name
def queen_memory_dir(queen_name: str = "default") -> Path:
"""Return the memory directory for a named queen."""
return MEMORIES_DIR / "agents" / "queens" / queen_name
def agent_memory_dir(agent_name: str) -> Path:
"""Return the memory directory for a worker agent."""
return MEMORIES_DIR / "agents" / agent_name
# ---------------------------------------------------------------------------
@@ -2,7 +2,7 @@
A lightweight side agent that runs after each queen LLM turn. It inspects
recent conversation messages and extracts durable user knowledge into
individual memory files in ``~/.hive/queen/global_memory/``.
individual memory files in ``~/.hive/memories/global/``.
Two reflection types:
- **Short reflection**: after conversational queen turns. Distills
+35 -1
View File
@@ -14,11 +14,45 @@ from typing import Any
from framework.orchestrator.edge import DEFAULT_MAX_TOKENS
# ---------------------------------------------------------------------------
# Hive home directory structure
# ---------------------------------------------------------------------------
HIVE_HOME = Path.home() / ".hive"
QUEENS_DIR = HIVE_HOME / "agents" / "queens"
COLONIES_DIR = HIVE_HOME / "colonies"
MEMORIES_DIR = HIVE_HOME / "memories"
def queen_dir(queen_name: str = "default") -> Path:
"""Return the storage directory for a named queen agent."""
return QUEENS_DIR / queen_name
def colony_dir(colony_name: str) -> Path:
"""Return the directory for a named colony."""
return COLONIES_DIR / colony_name
def memory_dir(scope: str, name: str | None = None) -> Path:
"""Return memory dir for a scope.
Examples::
memory_dir("global") -> ~/.hive/memories/global
memory_dir("colonies", "my_agent") -> ~/.hive/memories/colonies/my_agent
memory_dir("agents/queens", "default")-> ~/.hive/memories/agents/queens/default
memory_dir("agents", "worker_name") -> ~/.hive/memories/agents/worker_name
"""
base = MEMORIES_DIR / scope
return base / name if name else base
# ---------------------------------------------------------------------------
# Low-level config file access
# ---------------------------------------------------------------------------
HIVE_CONFIG_FILE = Path.home() / ".hive" / "configuration.json"
HIVE_CONFIG_FILE = HIVE_HOME / "configuration.json"
# Hive LLM router endpoint (Anthropic-compatible).
# litellm's Anthropic handler appends /v1/messages, so this is just the base host.
+6 -2
View File
@@ -28,8 +28,11 @@ def _get_allowed_agent_roots() -> tuple[Path, ...]:
"""
global _ALLOWED_AGENT_ROOTS
if _ALLOWED_AGENT_ROOTS is None:
from framework.config import COLONIES_DIR
_ALLOWED_AGENT_ROOTS = (
(_REPO_ROOT / "exports").resolve(),
COLONIES_DIR.resolve(), # ~/.hive/colonies/
(_REPO_ROOT / "exports").resolve(), # compat fallback
(_REPO_ROOT / "examples").resolve(),
(Path.home() / ".hive" / "agents").resolve(),
)
@@ -53,7 +56,8 @@ def validate_agent_path(agent_path: str | Path) -> Path:
if resolved.is_relative_to(root) and resolved != root:
return resolved
raise ValueError(
"agent_path must be inside an allowed directory (exports/, examples/, or ~/.hive/agents/)"
"agent_path must be inside an allowed directory "
"(~/.hive/colonies/, exports/, examples/, or ~/.hive/agents/)"
)
+1 -1
View File
@@ -345,7 +345,7 @@ async def create_queen(
if set(available_tools) != set(declared_tools):
missing = sorted(set(declared_tools) - registered_tool_names)
if missing:
logger.warning("Queen: tools not available: %s", missing)
logger.debug("Queen: tools not yet available (registered on worker load): %s", missing)
node_updates["tools"] = available_tools
adjusted_node = _orig_node.model_copy(update=node_updates)
+11 -5
View File
@@ -583,7 +583,9 @@ async def handle_session_events_history(request: web.Request) -> web.Response:
"""
session_id = request.match_info["session_id"]
queen_dir = Path.home() / ".hive" / "queen" / "session" / session_id
from framework.server.session_manager import _queen_session_dir
queen_dir = _queen_session_dir(session_id)
events_path = queen_dir / "events.jsonl"
if not events_path.exists():
return web.json_response({"events": [], "session_id": session_id})
@@ -608,7 +610,7 @@ async def handle_session_events_history(request: web.Request) -> web.Response:
async def handle_session_history(request: web.Request) -> web.Response:
"""GET /api/sessions/history — all queen sessions on disk (live + cold).
Returns every session directory under ~/.hive/queen/session/, newest first.
Returns every queen session directory on disk, newest first.
Live sessions have ``live: true, cold: false``; sessions that survived a
server restart have ``live: false, cold: true``.
"""
@@ -634,7 +636,7 @@ async def handle_delete_history_session(request: web.Request) -> web.Response:
"""DELETE /api/sessions/history/{session_id} — permanently remove a session.
Stops the live session (if still running) and deletes the queen session
directory from disk at ~/.hive/queen/session/{session_id}/.
directory from disk.
This is the frontend 'delete from history' action.
"""
manager = _get_manager(request)
@@ -645,7 +647,9 @@ async def handle_delete_history_session(request: web.Request) -> web.Response:
await manager.stop_session(session_id)
# Delete the queen session directory from disk
queen_session_dir = Path.home() / ".hive" / "queen" / "session" / session_id
from framework.server.session_manager import _queen_session_dir
queen_session_dir = _queen_session_dir(session_id)
if queen_session_dir.exists() and queen_session_dir.is_dir():
try:
shutil.rmtree(queen_session_dir)
@@ -697,7 +701,9 @@ async def handle_reveal_session_folder(request: web.Request) -> web.Response:
session = manager.get_session(session_id)
storage_session_id = (session.queen_resume_from or session.id) if session else session_id
folder = Path.home() / ".hive" / "queen" / "session" / storage_session_id
from framework.server.session_manager import _queen_session_dir
folder = _queen_session_dir(storage_session_id)
folder.mkdir(parents=True, exist_ok=True)
try:
+24 -10
View File
@@ -19,11 +19,17 @@ from datetime import datetime
from pathlib import Path
from typing import Any
from framework.config import QUEENS_DIR
from framework.host.triggers import TriggerDefinition
logger = logging.getLogger(__name__)
def _queen_session_dir(session_id: str, queen_name: str = "default") -> Path:
"""Return the on-disk directory for a queen session."""
return QUEENS_DIR / queen_name / "sessions" / session_id
@dataclass
class Session:
"""A live session with a queen and optional worker."""
@@ -67,6 +73,10 @@ class Session:
queen_resume_from: str | None = None
# Queen session directory (set during _start_queen, used for shutdown reflection)
queen_dir: Path | None = None
# Multi-queen support: which queen profile this session uses
queen_name: str = "default"
# Colony name: set when a worker is loaded from a colony
colony_name: str | None = None
class SessionManager:
@@ -86,6 +96,14 @@ class SessionManager:
# reflections) so they aren't garbage-collected before completion.
self._background_tasks: set[asyncio.Task] = set()
# Run one-time v2 directory structure migration
from framework.storage.migrate_v2 import run_migration
try:
run_migration()
except Exception:
logger.warning("v2 migration failed (non-fatal)", exc_info=True)
# ------------------------------------------------------------------
# Session lifecycle
# ------------------------------------------------------------------
@@ -194,9 +212,7 @@ class SessionManager:
# is incomplete and will fail to import).
if queen_resume_from:
_resume_phase = None
_meta_path = (
Path.home() / ".hive" / "queen" / "session" / queen_resume_from / "meta.json"
)
_meta_path = _queen_session_dir(queen_resume_from) / "meta.json"
if _meta_path.exists():
try:
_meta = json.loads(_meta_path.read_text(encoding="utf-8"))
@@ -536,7 +552,7 @@ class SessionManager:
# Update meta.json so cold-restore can discover this session by agent_path
storage_session_id = session.queen_resume_from or session.id
meta_path = Path.home() / ".hive" / "queen" / "session" / storage_session_id / "meta.json"
meta_path = _queen_session_dir(storage_session_id, session.queen_name) / "meta.json"
try:
_agent_name = (
session.worker_info.name
@@ -755,13 +771,11 @@ class SessionManager:
session.queen_executor,
)
hive_home = Path.home() / ".hive"
# Determine which session directory to use for queen storage.
# When queen_resume_from is set we write to the ORIGINAL session's
# directory so that all messages accumulate in one place.
storage_session_id = session.queen_resume_from or session.id
queen_dir = hive_home / "queen" / "session" / storage_session_id
queen_dir = _queen_session_dir(storage_session_id, session.queen_name)
queen_dir.mkdir(parents=True, exist_ok=True)
session.queen_dir = queen_dir
@@ -1076,10 +1090,10 @@ class SessionManager:
"""Return disk metadata for a session that is no longer live in memory.
Checks whether queen conversation files exist at
~/.hive/queen/session/{session_id}/conversations/. Returns None when
~/.hive/agents/queens/{name}/sessions/{session_id}/conversations/. Returns None when
no data is found so callers can fall through to a 404.
"""
queen_dir = Path.home() / ".hive" / "queen" / "session" / session_id
queen_dir = _queen_session_dir(session_id)
convs_dir = queen_dir / "conversations"
if not convs_dir.exists():
return None
@@ -1134,7 +1148,7 @@ class SessionManager:
@staticmethod
def list_cold_sessions() -> list[dict]:
"""Return metadata for every queen session directory on disk, newest first."""
queen_sessions_dir = Path.home() / ".hive" / "queen" / "session"
queen_sessions_dir = QUEENS_DIR / "default" / "sessions"
if not queen_sessions_dir.exists():
return []
+145
View File
@@ -0,0 +1,145 @@
"""One-time migration to the v2 ~/.hive/ directory structure.
Moves:
- exports/{name}/ -> ~/.hive/colonies/{name}/
- ~/.hive/queen/session/{id}/ -> ~/.hive/agents/queens/default/sessions/{id}/
- ~/.hive/queen/global_memory/ -> ~/.hive/memories/global/
Runs automatically on first startup when the marker file is absent.
Safe to re-run (skips already-migrated items).
"""
from __future__ import annotations
import json
import logging
import shutil
from pathlib import Path
from framework.config import COLONIES_DIR, HIVE_HOME, MEMORIES_DIR, QUEENS_DIR
logger = logging.getLogger(__name__)
_MIGRATION_MARKER = HIVE_HOME / ".migrated-v2"
def needs_migration() -> bool:
    """Return True if the v2 migration has not yet run."""
    # The marker file is written only after a successful full migration,
    # so its absence means the migration is still pending.
    marker_written = _MIGRATION_MARKER.exists()
    return not marker_written
def run_migration(*, exports_dir: Path | None = None) -> None:
    """Run the full v2 migration. Idempotent and safe to re-run.

    Steps (each individually skips already-migrated items):
      1. Copy ``exports/{name}/`` agent dirs into the colonies directory.
      2. Move legacy queen sessions to the per-queen sessions directory.
      3. Copy legacy global memories into the new memories directory.
      4. Remove the legacy ``~/.hive/queen/`` tree.

    A marker file is written at the end so subsequent calls return
    immediately.

    Args:
        exports_dir: Override for the legacy exports directory
            (defaults to ``Path("exports")``); used by tests.
    """
    if not needs_migration():
        return
    logger.info("migrate_v2: starting ~/.hive structure migration")

    source_exports = exports_dir if exports_dir is not None else Path("exports")
    _migrate_colonies(source_exports)
    _migrate_queen_sessions()
    _migrate_memories()
    _cleanup_old_queen_dir()

    # Record completion so the migration never runs twice.
    HIVE_HOME.mkdir(parents=True, exist_ok=True)
    _MIGRATION_MARKER.write_text("1\n", encoding="utf-8")
    logger.info("migrate_v2: migration complete")
def _migrate_colonies(exports_dir: Path) -> None:
    """Copy exports/{name}/ -> ~/.hive/colonies/{name}/."""
    if not exports_dir.exists():
        return
    COLONIES_DIR.mkdir(parents=True, exist_ok=True)
    copied = 0
    for candidate in sorted(exports_dir.iterdir()):
        # Only visible directories are agent exports; skip files and dot-dirs.
        if not candidate.is_dir() or candidate.name.startswith("."):
            continue
        destination = COLONIES_DIR / candidate.name
        if destination.exists():
            # Already migrated (or a colony with the same name) -- keep it.
            continue
        try:
            shutil.copytree(candidate, destination)
        except OSError:
            logger.warning("migrate_v2: failed to copy %s", candidate, exc_info=True)
        else:
            copied += 1
    if copied:
        logger.info("migrate_v2: copied %d agent(s) from exports/ to colonies/", copied)
def _migrate_queen_sessions() -> None:
    """Move ~/.hive/queen/session/{id}/ -> ~/.hive/agents/queens/default/sessions/{id}/.

    Idempotent: session directories that already exist at the new location
    are left untouched. Individual failures are logged and skipped so one
    bad directory cannot abort the whole migration.
    """
    old_sessions = HIVE_HOME / "queen" / "session"
    if not old_sessions.exists():
        return
    new_sessions = QUEENS_DIR / "default" / "sessions"
    new_sessions.mkdir(parents=True, exist_ok=True)
    migrated = 0
    for session_dir in sorted(old_sessions.iterdir()):
        if not session_dir.is_dir():
            continue
        target = new_sessions / session_dir.name
        if target.exists():
            # Already migrated on a previous (possibly interrupted) run.
            continue
        try:
            # shutil.move (rather than Path.rename) also works when source
            # and destination live on different filesystems, e.g. when part
            # of ~/.hive is a mount point or symlink; rename would raise
            # EXDEV there. shutil.Error subclasses OSError, so it is caught.
            shutil.move(str(session_dir), str(target))
            migrated += 1
        except OSError:
            logger.warning(
                "migrate_v2: failed to move session %s", session_dir, exc_info=True
            )
    if migrated:
        logger.info("migrate_v2: moved %d queen session(s) to new path", migrated)
def _migrate_memories() -> None:
    """Copy ~/.hive/queen/global_memory/ -> ~/.hive/memories/global/.

    Merge-safe and idempotent: files already present at the destination
    are never overwritten, so re-running (or migrating into a directory
    that already holds newer memories) cannot lose data. Copying each
    file is best-effort; individual failures are silently skipped.
    """
    old_global = HIVE_HOME / "queen" / "global_memory"
    if not old_global.exists():
        return
    new_global = MEMORIES_DIR / "global"
    # One unified loop replaces the original duplicated "merge into
    # existing dir" / "copy into fresh dir" branches: a fresh directory
    # is just a merge with zero pre-existing files.
    new_global.mkdir(parents=True, exist_ok=True)
    copied = 0
    for f in old_global.iterdir():
        if not f.is_file():
            continue
        target = new_global / f.name
        if target.exists():
            # Never clobber an existing memory file.
            continue
        try:
            shutil.copy2(f, target)
            copied += 1
        except OSError:
            pass
    if copied:
        logger.info("migrate_v2: copied %d memory file(s) to memories/global/", copied)
def _cleanup_old_queen_dir() -> None:
    """Remove ~/.hive/queen/ after all content has been migrated."""
    legacy_dir = HIVE_HOME / "queen"
    if not legacy_dir.exists():
        return
    # NOTE(review): the session/memory migrations skip items whose targets
    # already exist, and any such skipped originals are deleted here --
    # confirm that is acceptable before relying on ~/.hive/queen/ for
    # recovery.
    try:
        shutil.rmtree(legacy_dir)
    except OSError:
        # Best-effort: a locked or undeletable dir must not abort startup.
        logger.debug("migrate_v2: could not remove old queen dir", exc_info=True)
    else:
        logger.info("migrate_v2: removed old ~/.hive/queen/ directory")
+14 -7
View File
@@ -751,8 +751,11 @@ def _update_meta_json(session_manager, manager_session_id, updates: dict) -> Non
srv_session = session_manager.get_session(manager_session_id)
if not srv_session:
return
from framework.config import QUEENS_DIR
storage_sid = getattr(srv_session, "queen_resume_from", None) or srv_session.id
meta_path = Path.home() / ".hive" / "queen" / "session" / storage_sid / "meta.json"
queen_name = getattr(srv_session, "queen_name", "default")
meta_path = QUEENS_DIR / queen_name / "sessions" / storage_sid / "meta.json"
try:
existing = {}
if meta_path.exists():
@@ -1587,7 +1590,9 @@ def register_queen_lifecycle_tools(
# Worker not loaded yet — resolve from draft name
draft_name = draft.get("agent_name", "")
if draft_name:
candidate = Path("exports") / draft_name
from framework.config import COLONIES_DIR
candidate = COLONIES_DIR / draft_name
if candidate.is_dir():
save_path = candidate
_save_flowchart_file(
@@ -1902,7 +1907,9 @@ def register_queen_lifecycle_tools(
or phase_state.draft_graph.get("agent_name", "").strip()
)
if _agent_name:
_agent_folder = Path("exports") / _agent_name
from framework.config import COLONIES_DIR
_agent_folder = COLONIES_DIR / _agent_name
_agent_folder.mkdir(parents=True, exist_ok=True)
_save_flowchart_file(_agent_folder, original_copy, fmap)
phase_state.agent_path = str(_agent_folder)
@@ -1949,13 +1956,13 @@ def register_queen_lifecycle_tools(
"status": "confirmed",
"phase": "building",
"agent_name": _agent_name,
"agent_path": f"exports/{_agent_name}",
"agent_path": str(_agent_folder),
"planning_nodes_dissolved": dissolved_count,
"flowchart_map": fmap,
"message": (
"Design confirmed and directory created. "
+ ("; ".join(dissolution_parts) + ". " if dissolution_parts else "")
+ f"Now write the complete agent config to exports/{_agent_name}/agent.json "
+ f"Now write the complete agent config to {_agent_folder}/agent.json "
"using write_file(). Include all system prompts, tools, edges, and goal."
),
}
@@ -3166,7 +3173,7 @@ def register_queen_lifecycle_tools(
description=(
"Load a newly built agent as the worker in this session. "
"After building and validating an agent, call this with the agent's "
"path (e.g. 'exports/my_agent') to make it available immediately. "
"path (e.g. '~/.hive/colonies/my_agent') to make it available immediately. "
"The user will see the agent's graph and can interact with it."
),
parameters={
@@ -3174,7 +3181,7 @@ def register_queen_lifecycle_tools(
"properties": {
"agent_path": {
"type": "string",
"description": ("Path to the agent directory (e.g. 'exports/my_agent')"),
"description": ("Path to the agent directory (e.g. '~/.hive/colonies/my_agent')"),
},
},
"required": ["agent_path"],