From 579f1d75127c5e4c01949b34407a7d851c97d6fb Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Wed, 29 Apr 2026 17:33:34 -0700 Subject: [PATCH] feat(tasks): refactor task folder --- core/framework/agents/discovery.py | 2 +- core/framework/tasks/models.py | 14 +- core/framework/tasks/store.py | 763 +++++++++++++++-------- core/framework/tasks/tests/test_store.py | 113 +++- 4 files changed, 615 insertions(+), 277 deletions(-) diff --git a/core/framework/agents/discovery.py b/core/framework/agents/discovery.py index 806ba455..b73994fa 100644 --- a/core/framework/agents/discovery.py +++ b/core/framework/agents/discovery.py @@ -152,7 +152,7 @@ def _count_runs(agent_name: str) -> int: return len(run_ids) -_EXCLUDED_JSON_STEMS = {"agent", "flowchart", "triggers", "configuration", "metadata"} +_EXCLUDED_JSON_STEMS = {"agent", "flowchart", "triggers", "configuration", "metadata", "tasks"} def _is_colony_dir(path: Path) -> bool: diff --git a/core/framework/tasks/models.py b/core/framework/tasks/models.py index c8756b79..ec3fb594 100644 --- a/core/framework/tasks/models.py +++ b/core/framework/tasks/models.py @@ -49,7 +49,7 @@ class TaskRecord(BaseModel): class TaskListMeta(BaseModel): - """Per-list metadata stored in ``meta.json`` next to the task files.""" + """Per-list metadata. Embedded in ``TaskListDocument``.""" task_list_id: str role: TaskListRole @@ -59,6 +59,18 @@ class TaskListMeta(BaseModel): schema_version: int = 1 +class TaskListDocument(BaseModel): + """Whole task list as a single JSON document on disk. + + Lives at ``{task_list_path(list_id)}/tasks.json``; the list-lock + sentinel is its sibling ``tasks.json.lock``. + """ + + meta: TaskListMeta + highwatermark: int = 0 + tasks: list[TaskRecord] = Field(default_factory=list) + + # Tagged union for claim_task_with_busy_check. Used by run_parallel_workers # when stamping ``assigned_session`` on a colony template entry — the only # place a "claim" actually happens under the hive model. 
diff --git a/core/framework/tasks/store.py b/core/framework/tasks/store.py index 3ad745b5..31b0d3bb 100644 --- a/core/framework/tasks/store.py +++ b/core/framework/tasks/store.py @@ -2,29 +2,26 @@ Layout per list:: - {root}/{task_list_id}/ - meta.json -- TaskListMeta - tasks/ - 0001.json -- TaskRecord (zero-padded for ls-sort) - 0002.json - ... - .lock -- list-level lock - .highwatermark -- ID floor (deleted ids never reused) + {task_list_path}/tasks.json -- TaskListDocument (meta + hwm + tasks) + {task_list_path}/tasks.json.lock -- list-level lock sentinel -Two list-roots: +Where ``task_list_path`` is: - colony:{colony_id} -> ~/.hive/colonies/{colony_id}/tasks/ - session:{a}:{s} -> ~/.hive/agents/{a}/sessions/{s}/tasks/ + colony:{c} -> ~/.hive/colonies/{c}/ + session:{a}:{s} -> ~/.hive/agents/{a}/sessions/{s}/ + unscoped:{a} -> ~/.hive/unscoped/{a}/ + {malformed} -> ~/.hive/_misc/{slug}/ + +An older layout used the same root + a nested ``tasks/`` subdir holding +``meta.json``, ``.highwatermark``, ``.lock``, and ``NNNN.json`` per task. +That produced the ugly ``…/tasks/tasks/0001.json`` path. Migration is +lazy — the first lock-protected access on such a list folds the legacy +artifacts into ``tasks.json`` and unlinks them. All filesystem I/O is wrapped in ``asyncio.to_thread`` so the event loop -never blocks. Locks use a 30-retry / ~2.6s budget — comfortable headroom -for the only realistic write contender (colony template under concurrent +never blocks. Locks use a ~3s budget — comfortable headroom for the only +realistic write contender (colony template under concurrent ``colony_template_*`` and ``run_parallel_workers`` stamps). - -The "_unsafe" variants exist because filelock is **not re-entrant**: a -caller already holding a lock must NOT re-acquire it (would deadlock). -The unsafe path skips acquisition and is callable only from inside another -locked function. See ``claim_task_with_busy_check`` and ``delete_task``. 
""" from __future__ import annotations @@ -32,6 +29,8 @@ from __future__ import annotations import asyncio import logging import os +import shutil +import threading import time from collections.abc import Iterable from pathlib import Path @@ -46,6 +45,7 @@ from framework.tasks.models import ( ClaimNotFound, ClaimOk, ClaimResult, + TaskListDocument, TaskListMeta, TaskListRole, TaskRecord, @@ -57,6 +57,24 @@ logger = logging.getLogger(__name__) LOCK_TIMEOUT_SECONDS = 3.0 # ~30 retries × ~100ms +DOC_FILENAME = "tasks.json" +LOCK_FILENAME = "tasks.json.lock" # only colony lists (cross-process writers) + +# Per-list in-memory locks for single-process scopes (session/unscoped/_misc). +# Sessions have one owning agent, so only same-process concurrency matters +# (e.g. parallel tool use within a single turn) — no on-disk lock needed. +_INPROC_LOCKS: dict[str, threading.Lock] = {} +_INPROC_LOCKS_GUARD = threading.Lock() + + +def _get_inproc_lock(task_list_id: str) -> threading.Lock: + with _INPROC_LOCKS_GUARD: + lock = _INPROC_LOCKS.get(task_list_id) + if lock is None: + lock = threading.Lock() + _INPROC_LOCKS[task_list_id] = lock + return lock + class _Unset: """Sentinel for "owner argument not provided" — distinct from owner=None.""" @@ -72,27 +90,87 @@ def _hive_root() -> Path: return Path(os.environ.get("HIVE_HOME", str(Path.home() / ".hive"))) +def _find_queen_session_dir(session_id: str, *, hive_root: Path) -> Path | None: + """Return ``agents/queens/{queen}/sessions/{session_id}`` if one exists. + + Queens live under ``QUEENS_DIR = hive_root / "agents" / "queens"`` (see + ``framework.config``). The task system gets a generic ``agent_id == + "queen"`` in its ``task_list_id``, which would otherwise dead-end at + ``agents/queen/...``, decoupled from the real session folder. By + probing the canonical layout we keep the task doc beside conversations, + events, summary, and meta for the same session. 
+ """ + queens_dir = hive_root / "agents" / "queens" + if not queens_dir.exists(): + return None + try: + candidates = [d for d in queens_dir.iterdir() if d.is_dir()] + except OSError: + return None + for queen_dir in candidates: + candidate = queen_dir / "sessions" / session_id + if candidate.is_dir(): + return candidate + return None + + def task_list_path(task_list_id: str, *, hive_root: Path | None = None) -> Path: - """Resolve task_list_id -> on-disk root.""" + """Resolve task_list_id -> directory containing ``tasks.json``. + + Note: this returns the *parent* of the doc file, not the file itself. + For session/colony/unscoped lists, this is the agent or colony's home + dir; the task doc is one filename inside it. (The older layout had an + extra ``tasks/`` subdir under this path — see ``_legacy_root``.) + + For ``session:`` lists, the canonical queen session folder is preferred + when it exists on disk: the task doc lives next to the rest of that + session's data (conversations, events, summary). + """ root = hive_root or _hive_root() if task_list_id.startswith("colony:"): colony_id = task_list_id[len("colony:") :] - return root / "colonies" / colony_id / "tasks" + return root / "colonies" / colony_id if task_list_id.startswith("session:"): rest = task_list_id[len("session:") :] agent_id, _, session_id = rest.partition(":") if not session_id: raise ValueError(f"Malformed session task_list_id: {task_list_id!r}") - return root / "agents" / agent_id / "sessions" / session_id / "tasks" + canonical = _find_queen_session_dir(session_id, hive_root=root) + if canonical is not None: + return canonical + return root / "agents" / agent_id / "sessions" / session_id if task_list_id.startswith("unscoped:"): agent_id = task_list_id[len("unscoped:") :] - return root / "unscoped" / agent_id / "tasks" + return root / "unscoped" / agent_id # Last-ditch sanitization for HIVE_TASK_LIST_ID overrides — slugify the # whole thing so the test/dev path can't escape the hive root. 
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in task_list_id) return root / "_misc" / safe +def _legacy_root(task_list_id: str, *, hive_root: Path | None = None) -> Path: + """Where the older artifacts (meta.json, .highwatermark, tasks/NNNN.json) lived. + + Pinned to the *pre-canonical* layout — for queen session lists this is + ``agents/{agent_id}/sessions/{session_id}/tasks`` (i.e. the literal + ``agent_id`` folder, not the canonical ``agents/queens/{queen}/...`` + path). The lazy migration reads from here and writes the new doc to + wherever ``task_list_path`` resolves now. + """ + root = hive_root or _hive_root() + if task_list_id.startswith("colony:"): + return root / "colonies" / task_list_id[len("colony:") :] / "tasks" + if task_list_id.startswith("session:"): + rest = task_list_id[len("session:") :] + agent_id, _, session_id = rest.partition(":") + return root / "agents" / agent_id / "sessions" / session_id / "tasks" + if task_list_id.startswith("unscoped:"): + return root / "unscoped" / task_list_id[len("unscoped:") :] / "tasks" + # _misc fallback: legacy lived directly in the slug dir, same as the new parent. + safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in task_list_id) + return root / "_misc" / safe + + # --------------------------------------------------------------------------- # TaskStore — public façade # --------------------------------------------------------------------------- @@ -132,23 +210,16 @@ class TaskStore: ) async def list_exists(self, task_list_id: str) -> bool: - """A list exists if its meta.json OR any task file is on disk. + """A list exists if its doc is on disk OR a legacy artifact is. - meta.json is normally written by ``ensure_task_list``, but session - lists may be created lazily via the first ``task_create`` (see - ``_create_task_sync``) — in that case meta.json is backfilled the - first time the list is read. Until then, we still want to expose - the list's tasks via REST. 
+ The legacy fallback exists so that lists created under the older + layout and not yet migrated still surface to the REST layer. """ def _check() -> bool: - root = self._list_root(task_list_id) - if (root / "meta.json").exists(): + if self._doc_path(task_list_id).exists(): return True - tasks_dir = root / "tasks" - if tasks_dir.exists() and any(p.suffix == ".json" for p in tasks_dir.iterdir()): - return True - return False + return self._has_legacy_artifacts(task_list_id) return await asyncio.to_thread(_check) @@ -156,7 +227,7 @@ class TaskStore: return await asyncio.to_thread(self._read_meta_sync, task_list_id) async def reset_task_list(self, task_list_id: str) -> None: - """Delete all task files but preserve the high-water-mark. + """Delete all tasks but preserve the high-water-mark. Test helper. Never wired to runtime lifecycle. """ @@ -173,13 +244,9 @@ class TaskStore: Each spec is a dict with keys: subject (required), description, active_form, owner, metadata. Ids are assigned sequentially and - contiguously — if any task fails to write, an exception is raised - and the whole batch is rolled back (file unlinked, high-water-mark - kept at the prior value). - - Atomic-or-none semantics matter for the tool surface: a failed - partial batch would leave the LLM reasoning about cleanup, which - defeats the point of batching as a single decision. + contiguously; if any spec is malformed the whole batch is + rejected before any write. The doc model makes "atomic-or-none" + free — we mutate one in-memory document and write it once. 
""" return await asyncio.to_thread( self._create_tasks_batch_sync, task_list_id, specs @@ -274,30 +341,222 @@ class TaskStore: # Sync internals — all called via asyncio.to_thread # ===================================================================== - def _list_root(self, task_list_id: str) -> Path: + def _list_dir(self, task_list_id: str) -> Path: return task_list_path(task_list_id, hive_root=self._hive_root) - def _tasks_dir(self, task_list_id: str) -> Path: - return self._list_root(task_list_id) / "tasks" + def _doc_path(self, task_list_id: str) -> Path: + return self._list_dir(task_list_id) / DOC_FILENAME - def _list_lock(self, task_list_id: str) -> FileLock: - # FileLock targets a sentinel file; it tolerates the file being absent - # by creating it on first acquire. We use the .lock filename so it's - # visible alongside the other list files. - root = self._list_root(task_list_id) - root.mkdir(parents=True, exist_ok=True) - return FileLock(str(root / ".lock"), timeout=LOCK_TIMEOUT_SECONDS) + def _list_lock(self, task_list_id: str): + """Return a context manager that serialises writes to this list. - def _highwatermark_path(self, task_list_id: str) -> Path: - return self._list_root(task_list_id) / ".highwatermark" + Colony template lists need a cross-process ``FileLock`` because + ``run_parallel_workers`` spawns worker subprocesses that stamp + completion back onto the template. Session/unscoped/_misc lists + have a single owning agent — only same-process concurrency + matters (e.g. parallel tool use within one turn), so an + in-memory ``threading.Lock`` is enough and avoids the visible + ``tasks.json.lock`` sentinel beside session folders. 
+ """ + d = self._list_dir(task_list_id) + d.mkdir(parents=True, exist_ok=True) + if task_list_id.startswith("colony:"): + return FileLock(str(d / LOCK_FILENAME), timeout=LOCK_TIMEOUT_SECONDS) + return _get_inproc_lock(task_list_id) - def _meta_path(self, task_list_id: str) -> Path: - return self._list_root(task_list_id) / "meta.json" + def _legacy_dir(self, task_list_id: str) -> Path: + return _legacy_root(task_list_id, hive_root=self._hive_root) - def _task_path(self, task_list_id: str, task_id: int) -> Path: - return self._tasks_dir(task_list_id) / f"{task_id:04d}.json" + def _legacy_meta_path(self, task_list_id: str) -> Path: + return self._legacy_dir(task_list_id) / "meta.json" - # ----- meta --------------------------------------------------------- + def _legacy_hwm_path(self, task_list_id: str) -> Path: + return self._legacy_dir(task_list_id) / ".highwatermark" + + def _legacy_lock_path(self, task_list_id: str) -> Path: + return self._legacy_dir(task_list_id) / ".lock" + + def _legacy_tasks_dir(self, task_list_id: str) -> Path: + return self._legacy_dir(task_list_id) / "tasks" + + def _has_legacy_artifacts(self, task_list_id: str) -> bool: + if self._legacy_meta_path(task_list_id).exists(): + return True + td = self._legacy_tasks_dir(task_list_id) + if td.exists(): + try: + return any(p.suffix == ".json" for p in td.iterdir()) + except OSError: + return False + return False + + # ----- doc IO ------------------------------------------------------- + + def _read_doc_sync(self, task_list_id: str) -> TaskListDocument | None: + """Lock-free read for already-migrated lists; falls back to a + lock-protected migration if only legacy artifacts exist. + + Returns None if the list doesn't exist on disk in either form. 
+ """ + doc_path = self._doc_path(task_list_id) + if doc_path.exists(): + try: + return TaskListDocument.model_validate_json( + doc_path.read_text(encoding="utf-8") + ) + except Exception: + logger.warning("Corrupt tasks.json at %s", doc_path, exc_info=True) + # Fall through — legacy fallback may rescue us. + + if self._has_legacy_artifacts(task_list_id): + with self._list_lock(task_list_id): + # Re-check under lock: a parallel writer may have just + # finished migrating, in which case we read the new doc. + if doc_path.exists(): + try: + return TaskListDocument.model_validate_json( + doc_path.read_text(encoding="utf-8") + ) + except Exception: + logger.warning( + "Corrupt tasks.json at %s (post-lock)", + doc_path, + exc_info=True, + ) + doc = self._migrate_legacy_unsafe(task_list_id) + if doc is not None: + self._write_doc_unsafe(task_list_id, doc) + self._cleanup_legacy_unsafe(task_list_id) + return doc + return None + + def _read_doc_unsafe(self, task_list_id: str) -> TaskListDocument | None: + """Same as ``_read_doc_sync`` but assumes the list-lock is already + held — used by methods that are already inside ``with self._list_lock``. + Migration happens in-place without re-entering the lock. + """ + doc_path = self._doc_path(task_list_id) + if doc_path.exists(): + try: + return TaskListDocument.model_validate_json( + doc_path.read_text(encoding="utf-8") + ) + except Exception: + logger.warning("Corrupt tasks.json at %s", doc_path, exc_info=True) + if self._has_legacy_artifacts(task_list_id): + doc = self._migrate_legacy_unsafe(task_list_id) + if doc is not None: + self._write_doc_unsafe(task_list_id, doc) + self._cleanup_legacy_unsafe(task_list_id) + return doc + return None + + def _write_doc_unsafe(self, task_list_id: str, doc: TaskListDocument) -> None: + """Atomically rewrite the doc. 
Caller MUST hold the list-lock.""" + path = self._doc_path(task_list_id) + path.parent.mkdir(parents=True, exist_ok=True) + with atomic_write(path) as f: + f.write(doc.model_dump_json(indent=2)) + + # ----- migration ---------------------------------------------------- + + def _migrate_legacy_unsafe(self, task_list_id: str) -> TaskListDocument | None: + """Fold legacy artifacts into a TaskListDocument. Caller MUST hold lock.""" + meta = self._read_legacy_meta(task_list_id) + if meta is None: + inferred_role = ( + TaskListRole.TEMPLATE + if task_list_id.startswith("colony:") + else TaskListRole.SESSION + ) + meta = TaskListMeta(task_list_id=task_list_id, role=inferred_role) + + tasks: list[TaskRecord] = [] + td = self._legacy_tasks_dir(task_list_id) + if td.exists(): + for p in sorted(td.iterdir()): + if p.suffix != ".json": + continue + try: + tasks.append(TaskRecord.model_validate_json(p.read_text(encoding="utf-8"))) + except Exception: + logger.warning( + "Skipping corrupt legacy task file %s during migration", + p, + exc_info=True, + ) + tasks.sort(key=lambda r: r.id) + + hwm = self._read_legacy_hwm(task_list_id) + max_id = max((r.id for r in tasks), default=0) + hwm = max(hwm, max_id) + + if not tasks and hwm == 0 and not self._legacy_meta_path(task_list_id).exists(): + return None + + return TaskListDocument( + meta=meta, + highwatermark=hwm, + tasks=tasks, + ) + + def _read_legacy_meta(self, task_list_id: str) -> TaskListMeta | None: + path = self._legacy_meta_path(task_list_id) + if not path.exists(): + return None + try: + return TaskListMeta.model_validate_json(path.read_text(encoding="utf-8")) + except Exception: + logger.warning("Corrupt legacy meta.json at %s", path, exc_info=True) + return None + + def _read_legacy_hwm(self, task_list_id: str) -> int: + path = self._legacy_hwm_path(task_list_id) + if not path.exists(): + return 0 + try: + return int(path.read_text(encoding="utf-8").strip() or "0") + except (ValueError, OSError): + return 0 + + def 
_cleanup_legacy_unsafe(self, task_list_id: str) -> None: + """Remove the older layout's files. Caller MUST hold the list-lock. + + For session/colony/unscoped lists the legacy_dir is a dedicated + ``tasks/`` subdir, so we remove the whole tree. For the ``_misc`` + fallback the legacy_dir is the same as the new parent dir — we + delete only the specific legacy filenames so we don't clobber + the new ``tasks.json``. + """ + legacy = self._legacy_dir(task_list_id) + if not legacy.exists(): + return + + if legacy != self._list_dir(task_list_id): + try: + shutil.rmtree(legacy) + except OSError: + logger.warning("Failed to remove legacy task dir %s", legacy, exc_info=True) + return + + # _misc case: shared parent dir — surgical delete only. + for p in ( + self._legacy_meta_path(task_list_id), + self._legacy_hwm_path(task_list_id), + self._legacy_lock_path(task_list_id), + ): + try: + p.unlink(missing_ok=True) + except OSError: + logger.warning("Failed to remove %s", p, exc_info=True) + td = self._legacy_tasks_dir(task_list_id) + if td.exists(): + try: + shutil.rmtree(td) + except OSError: + logger.warning("Failed to remove legacy tasks subdir %s", td, exc_info=True) + + # ----- meta accessors over the doc ---------------------------------- def _ensure_task_list_sync( self, @@ -306,107 +565,47 @@ class TaskStore: creator_agent_id: str | None, session_id: str | None, ) -> TaskListMeta: - root = self._list_root(task_list_id) - root.mkdir(parents=True, exist_ok=True) - (root / "tasks").mkdir(exist_ok=True) - meta_path = self._meta_path(task_list_id) with self._list_lock(task_list_id): - if meta_path.exists(): - meta = self._read_meta_sync(task_list_id) - if meta is None: - # File existed but failed to parse — rewrite fresh. 
- meta = TaskListMeta( - task_list_id=task_list_id, - role=role, - creator_agent_id=creator_agent_id, - ) - if session_id and session_id not in meta.last_seen_session_ids: - meta.last_seen_session_ids.append(session_id) - # Cap at 10 to keep the audit trail bounded. - meta.last_seen_session_ids = meta.last_seen_session_ids[-10:] - self._write_meta_sync(task_list_id, meta) + doc = self._read_doc_unsafe(task_list_id) + if doc is None: + meta = TaskListMeta( + task_list_id=task_list_id, + role=role, + creator_agent_id=creator_agent_id, + last_seen_session_ids=[session_id] if session_id else [], + ) + doc = TaskListDocument(meta=meta) + self._write_doc_unsafe(task_list_id, doc) return meta - meta = TaskListMeta( - task_list_id=task_list_id, - role=role, - creator_agent_id=creator_agent_id, - last_seen_session_ids=[session_id] if session_id else [], - ) - self._write_meta_sync(task_list_id, meta) + + meta = doc.meta + if session_id and session_id not in meta.last_seen_session_ids: + meta.last_seen_session_ids.append(session_id) + # Cap at 10 to keep the audit trail bounded. 
+ meta.last_seen_session_ids = meta.last_seen_session_ids[-10:] + self._write_doc_unsafe(task_list_id, doc) return meta def _read_meta_sync(self, task_list_id: str) -> TaskListMeta | None: - path = self._meta_path(task_list_id) - if not path.exists(): - return None - try: - return TaskListMeta.model_validate_json(path.read_text(encoding="utf-8")) - except Exception: - logger.warning("Corrupt meta.json at %s", path, exc_info=True) - return None - - def _write_meta_sync(self, task_list_id: str, meta: TaskListMeta) -> None: - path = self._meta_path(task_list_id) - path.parent.mkdir(parents=True, exist_ok=True) - with atomic_write(path) as f: - f.write(meta.model_dump_json(indent=2)) + doc = self._read_doc_sync(task_list_id) + return doc.meta if doc is not None else None # ----- task IO ------------------------------------------------------ def _read_task_sync(self, task_list_id: str, task_id: int) -> TaskRecord | None: - path = self._task_path(task_list_id, task_id) - if not path.exists(): + doc = self._read_doc_sync(task_list_id) + if doc is None: return None - try: - return TaskRecord.model_validate_json(path.read_text(encoding="utf-8")) - except Exception: - logger.warning("Corrupt task file at %s", path, exc_info=True) - return None - - def _write_task_sync(self, task_list_id: str, record: TaskRecord) -> None: - path = self._task_path(task_list_id, record.id) - path.parent.mkdir(parents=True, exist_ok=True) - with atomic_write(path) as f: - f.write(record.model_dump_json(indent=2)) + for r in doc.tasks: + if r.id == task_id: + return r + return None def _list_tasks_sync(self, task_list_id: str) -> list[TaskRecord]: - d = self._tasks_dir(task_list_id) - if not d.exists(): + doc = self._read_doc_sync(task_list_id) + if doc is None: return [] - records: list[TaskRecord] = [] - for path in sorted(d.iterdir()): - if path.suffix != ".json": - continue - try: - records.append(TaskRecord.model_validate_json(path.read_text(encoding="utf-8"))) - except Exception: - 
logger.warning("Skipping corrupt task file %s", path, exc_info=True) - records.sort(key=lambda r: r.id) - return records - - # ----- highwatermark / id assignment -------------------------------- - - def _read_highwatermark_sync(self, task_list_id: str) -> int: - path = self._highwatermark_path(task_list_id) - if not path.exists(): - return 0 - try: - return int(path.read_text(encoding="utf-8").strip() or "0") - except (ValueError, OSError): - return 0 - - def _write_highwatermark_sync(self, task_list_id: str, value: int) -> None: - path = self._highwatermark_path(task_list_id) - path.parent.mkdir(parents=True, exist_ok=True) - with atomic_write(path) as f: - f.write(str(value)) - - def _next_id_sync(self, task_list_id: str) -> int: - """Compute next id under the assumption the list-lock is held.""" - existing = self._list_tasks_sync(task_list_id) - max_existing = max((r.id for r in existing), default=0) - floor = self._read_highwatermark_sync(task_list_id) - return max(max_existing, floor) + 1 + return sorted(doc.tasks, key=lambda r: r.id) # ----- create ------------------------------------------------------- @@ -420,20 +619,17 @@ class TaskStore: metadata: dict[str, Any], ) -> TaskRecord: with self._list_lock(task_list_id): - # Lazy-create meta.json on first task. Session lists are - # frequently created via the first task_create (no explicit - # ensure_task_list call); without this backfill the REST - # endpoint can't discover them. Role is inferred from prefix. 
- if not self._meta_path(task_list_id).exists(): - inferred_role = TaskListRole.TEMPLATE if task_list_id.startswith("colony:") else TaskListRole.SESSION - self._write_meta_sync( - task_list_id, - TaskListMeta( - task_list_id=task_list_id, - role=inferred_role, - ), + doc = self._read_doc_unsafe(task_list_id) + if doc is None: + inferred_role = ( + TaskListRole.TEMPLATE + if task_list_id.startswith("colony:") + else TaskListRole.SESSION ) - new_id = self._next_id_sync(task_list_id) + doc = TaskListDocument( + meta=TaskListMeta(task_list_id=task_list_id, role=inferred_role) + ) + new_id = self._next_id_for_doc(doc) now = time.time() record = TaskRecord( id=new_id, @@ -446,11 +642,10 @@ class TaskStore: created_at=now, updated_at=now, ) - self._write_task_sync(task_list_id, record) - # Bump high-water-mark eagerly so even a concurrent racer that - # somehow missed the listing snapshot can't pick the same id. - if new_id > self._read_highwatermark_sync(task_list_id): - self._write_highwatermark_sync(task_list_id, new_id) + doc.tasks.append(record) + if new_id > doc.highwatermark: + doc.highwatermark = new_id + self._write_doc_unsafe(task_list_id, doc) return record def _create_tasks_batch_sync( @@ -467,19 +662,18 @@ class TaskStore: raise ValueError(f"specs[{i}].subject must be a non-empty string") with self._list_lock(task_list_id): - # Same lazy meta backfill as _create_task_sync. 
- if not self._meta_path(task_list_id).exists(): + doc = self._read_doc_unsafe(task_list_id) + if doc is None: inferred_role = ( TaskListRole.TEMPLATE if task_list_id.startswith("colony:") else TaskListRole.SESSION ) - self._write_meta_sync( - task_list_id, - TaskListMeta(task_list_id=task_list_id, role=inferred_role), + doc = TaskListDocument( + meta=TaskListMeta(task_list_id=task_list_id, role=inferred_role) ) - base_id = self._next_id_sync(task_list_id) + base_id = self._next_id_for_doc(doc) now = time.time() records: list[TaskRecord] = [] for offset, spec in enumerate(specs): @@ -496,27 +690,20 @@ class TaskStore: ) records.append(rec) - # Write all task files; on any failure, unlink everything we - # wrote so far and re-raise. High-water-mark is bumped only - # after a successful full-batch write. - written: list[Path] = [] - try: - for rec in records: - self._write_task_sync(task_list_id, rec) - written.append(self._task_path(task_list_id, rec.id)) - except Exception: - for path in written: - try: - path.unlink(missing_ok=True) - except OSError: - logger.warning("Failed to roll back batch task at %s", path, exc_info=True) - raise - + doc.tasks.extend(records) highest = records[-1].id - if highest > self._read_highwatermark_sync(task_list_id): - self._write_highwatermark_sync(task_list_id, highest) + if highest > doc.highwatermark: + doc.highwatermark = highest + # Single write — atomic batch is free with the doc model. 
+ self._write_doc_unsafe(task_list_id, doc) return records + # ----- id assignment ------------------------------------------------ + + def _next_id_for_doc(self, doc: TaskListDocument) -> int: + max_existing = max((r.id for r in doc.tasks), default=0) + return max(max_existing, doc.highwatermark) + 1 + # ----- update ------------------------------------------------------- def _update_task_sync( @@ -533,25 +720,27 @@ class TaskStore: metadata_patch: dict[str, Any] | None, ) -> tuple[TaskRecord | None, list[str]]: with self._list_lock(task_list_id): - current = self._read_task_sync(task_list_id, task_id) - if current is None: + doc = self._read_doc_unsafe(task_list_id) + if doc is None: return None, [] - return self._update_task_unsafe( - task_list_id, - current, - subject=subject, - description=description, - active_form=active_form, - owner=owner, - status=status, - add_blocks=add_blocks, - add_blocked_by=add_blocked_by, - metadata_patch=metadata_patch, - ) + target = next((r for r in doc.tasks if r.id == task_id), None) + if target is None: + return None, [] + new, changed = self._update_task_in_doc(doc, target, subject=subject, + description=description, + active_form=active_form, + owner=owner, + status=status, + add_blocks=add_blocks, + add_blocked_by=add_blocked_by, + metadata_patch=metadata_patch) + if changed: + self._write_doc_unsafe(task_list_id, doc) + return new, changed - def _update_task_unsafe( + def _update_task_in_doc( self, - task_list_id: str, + doc: TaskListDocument, current: TaskRecord, *, subject: str | None = None, @@ -563,84 +752,89 @@ class TaskStore: add_blocked_by: list[int] | None = None, metadata_patch: dict[str, Any] | None = None, ) -> tuple[TaskRecord, list[str]]: - """Update without acquiring the list-lock. Caller MUST hold it.""" + """Mutate ``current`` in place inside ``doc`` and return (record, changed). + Bidirectional blocks/blocked_by also mutate the targets in ``doc``. 
+ """ changed: list[str] = [] - new = current.model_copy(deep=True) - if subject is not None and subject != new.subject: - new.subject = subject + if subject is not None and subject != current.subject: + current.subject = subject changed.append("subject") - if description is not None and description != new.description: - new.description = description + if description is not None and description != current.description: + current.description = description changed.append("description") - if active_form is not None and active_form != new.active_form: - new.active_form = active_form + if active_form is not None and active_form != current.active_form: + current.active_form = active_form changed.append("active_form") - if not isinstance(owner, _Unset) and owner != new.owner: - new.owner = owner + if not isinstance(owner, _Unset) and owner != current.owner: + current.owner = owner changed.append("owner") - if status is not None and status != new.status: - new.status = status + if status is not None and status != current.status: + current.status = status changed.append("status") + if add_blocks: for b in add_blocks: - if b not in new.blocks and b != new.id: - new.blocks.append(b) - if "blocks" not in changed: - changed.append("blocks") - # Maintain the bidirectional invariant by stamping - # blocked_by on the target as well. 
- target = self._read_task_sync(task_list_id, b) - if target and new.id not in target.blocked_by: - target.blocked_by.append(new.id) - target.updated_at = time.time() - self._write_task_sync(task_list_id, target) + if b in current.blocks or b == current.id: + continue + current.blocks.append(b) + if "blocks" not in changed: + changed.append("blocks") + target = next((r for r in doc.tasks if r.id == b), None) + if target is not None and current.id not in target.blocked_by: + target.blocked_by.append(current.id) + target.updated_at = time.time() + if add_blocked_by: for b in add_blocked_by: - if b not in new.blocked_by and b != new.id: - new.blocked_by.append(b) - if "blocked_by" not in changed: - changed.append("blocked_by") - target = self._read_task_sync(task_list_id, b) - if target and new.id not in target.blocks: - target.blocks.append(new.id) - target.updated_at = time.time() - self._write_task_sync(task_list_id, target) + if b in current.blocked_by or b == current.id: + continue + current.blocked_by.append(b) + if "blocked_by" not in changed: + changed.append("blocked_by") + target = next((r for r in doc.tasks if r.id == b), None) + if target is not None and current.id not in target.blocks: + target.blocks.append(current.id) + target.updated_at = time.time() + if metadata_patch is not None: - md = dict(new.metadata) + md = dict(current.metadata) for k, v in metadata_patch.items(): if v is None: md.pop(k, None) else: md[k] = v - if md != new.metadata: - new.metadata = md + if md != current.metadata: + current.metadata = md changed.append("metadata") if not changed: - return new, [] + return current, [] - new.updated_at = time.time() - self._write_task_sync(task_list_id, new) - return new, changed + current.updated_at = time.time() + return current, changed # ----- delete ------------------------------------------------------- def _delete_task_sync(self, task_list_id: str, task_id: int) -> tuple[bool, list[int]]: with self._list_lock(task_list_id): - path = 
self._task_path(task_list_id, task_id) - if not path.exists(): + doc = self._read_doc_unsafe(task_list_id) + if doc is None: return False, [] - # 1. Bump high-water-mark BEFORE unlinking so a crash mid-delete - # can't accidentally re-allocate the id. - current_floor = self._read_highwatermark_sync(task_list_id) - if task_id > current_floor: - self._write_highwatermark_sync(task_list_id, task_id) - # 2. Unlink the task itself. - path.unlink() + idx = next((i for i, r in enumerate(doc.tasks) if r.id == task_id), None) + if idx is None: + return False, [] + # 1. Bump high-water-mark BEFORE removing so a crash mid-write + # can't cause id reuse on the next create. (atomic_write + # guarantees we either commit the whole new state or none.) + if task_id > doc.highwatermark: + doc.highwatermark = task_id + # 2. Remove the task itself. + doc.tasks.pop(idx) # 3. Cascade: strip references from all other tasks. cascaded: list[int] = [] - for other in self._list_tasks_sync(task_list_id): + now = time.time() + for other in doc.tasks: touched = False if task_id in other.blocks: other.blocks = [b for b in other.blocks if b != task_id] @@ -649,31 +843,31 @@ class TaskStore: other.blocked_by = [b for b in other.blocked_by if b != task_id] touched = True if touched: - other.updated_at = time.time() - self._write_task_sync(task_list_id, other) + other.updated_at = now cascaded.append(other.id) + self._write_doc_unsafe(task_list_id, doc) return True, cascaded # ----- reset -------------------------------------------------------- def _reset_sync(self, task_list_id: str) -> None: with self._list_lock(task_list_id): - tasks = self._list_tasks_sync(task_list_id) - max_id = max((r.id for r in tasks), default=0) - floor = self._read_highwatermark_sync(task_list_id) - new_floor = max(max_id, floor) - self._write_highwatermark_sync(task_list_id, new_floor) - d = self._tasks_dir(task_list_id) - if d.exists(): - for p in d.iterdir(): - if p.suffix == ".json": - p.unlink() + doc = 
self._read_doc_unsafe(task_list_id)
+        if doc is None:
+            return
+        max_id = max((r.id for r in doc.tasks), default=0)
+        doc.highwatermark = max(doc.highwatermark, max_id)
+        doc.tasks = []
+        self._write_doc_unsafe(task_list_id, doc)
 
     # ----- claim --------------------------------------------------------
 
     def _claim_sync(self, task_list_id: str, task_id: int, claimant: str) -> ClaimResult:
         with self._list_lock(task_list_id):
-            current = self._read_task_sync(task_list_id, task_id)
+            doc = self._read_doc_unsafe(task_list_id)
+            if doc is None:
+                return ClaimNotFound(kind="not_found")
+            current = next((r for r in doc.tasks if r.id == task_id), None)
             if current is None:
                 return ClaimNotFound(kind="not_found")
             if current.status == TaskStatus.COMPLETED:
@@ -682,12 +876,13 @@ class TaskStore:
                 return ClaimAlreadyOwned(kind="already_owned", by=current.owner)
             unresolved_blockers: list[int] = []
             for b in current.blocked_by:
-                blocker = self._read_task_sync(task_list_id, b)
+                blocker = next((r for r in doc.tasks if r.id == b), None)
                 if blocker is not None and blocker.status != TaskStatus.COMPLETED:
                     unresolved_blockers.append(b)
             if unresolved_blockers:
                 return ClaimBlocked(kind="blocked", by=unresolved_blockers)
-            new, _ = self._update_task_unsafe(task_list_id, current, owner=claimant)
+            new, _ = self._update_task_in_doc(doc, current, owner=claimant)
+            self._write_doc_unsafe(task_list_id, doc)
             return ClaimOk(kind="ok", record=new)
 
 
@@ -713,10 +908,32 @@ def get_task_store() -> TaskStore:
 
 # Convenience for tests / utilities.
 def fingerprint_for_test(task_list_id: str, hive_root: Path) -> Iterable[Path]:
-    """Yield every file under a list root — used by tests to assert
+    """Return every task-list-related file — used by tests to assert
     byte-equivalence pre/post shutdown.
+
+    Includes the doc + lock and any legacy leftovers (so this still works
+    while a list is mid-migration).
""" - root = task_list_path(task_list_id, hive_root=hive_root) - if not root.exists(): + files: list[Path] = [] + base = task_list_path(task_list_id, hive_root=hive_root) + if not base.exists(): return [] - return sorted(root.rglob("*")) + doc = base / DOC_FILENAME + if doc.exists(): + files.append(doc) + lock = base / LOCK_FILENAME + if lock.exists(): + files.append(lock) + legacy = _legacy_root(task_list_id, hive_root=hive_root) + if legacy.exists() and legacy != base: + files.extend(sorted(legacy.rglob("*"))) + elif legacy.exists(): + # _misc fallback: include only legacy filenames + for name in ("meta.json", ".highwatermark", ".lock"): + p = legacy / name + if p.exists(): + files.append(p) + td = legacy / "tasks" + if td.exists(): + files.extend(sorted(td.rglob("*"))) + return sorted(files) diff --git a/core/framework/tasks/tests/test_store.py b/core/framework/tasks/tests/test_store.py index 655d13f2..8ab96319 100644 --- a/core/framework/tasks/tests/test_store.py +++ b/core/framework/tasks/tests/test_store.py @@ -7,6 +7,7 @@ primitives the rest of the system relies on. 
from __future__ import annotations import asyncio +import json from pathlib import Path import pytest @@ -263,11 +264,119 @@ async def test_ensure_task_list_caps_history(store: TaskStore, list_id: str) -> @pytest.mark.asyncio async def test_colony_path(store: TaskStore, tmp_path: Path) -> None: await store.ensure_task_list("colony:abc", role=TaskListRole.TEMPLATE) - assert (tmp_path / "colonies" / "abc" / "tasks" / "meta.json").exists() + assert (tmp_path / "colonies" / "abc" / "tasks.json").exists() @pytest.mark.asyncio async def test_session_path(store: TaskStore, tmp_path: Path) -> None: await store.ensure_task_list("session:agent_x:sess_y", role=TaskListRole.SESSION) - p = tmp_path / "agents" / "agent_x" / "sessions" / "sess_y" / "tasks" / "meta.json" + p = tmp_path / "agents" / "agent_x" / "sessions" / "sess_y" / "tasks.json" assert p.exists() + + +@pytest.mark.asyncio +async def test_canonical_queen_session_dir_wins(store: TaskStore, tmp_path: Path) -> None: + """When ``agents/queens/{name}/sessions/{sid}/`` exists on disk, the task + doc lands there — beside conversations/events/summary — instead of in + the orphaned ``agents/{agent_id}/sessions/{sid}/`` location. + """ + sid = "session_20260429_test" + canonical = tmp_path / "agents" / "queens" / "queen_growth" / "sessions" / sid + canonical.mkdir(parents=True) + # Pretend the rest of the session is here. 
+ (canonical / "events.jsonl").write_text("", encoding="utf-8") + + list_id = f"session:queen:{sid}" + await store.ensure_task_list(list_id, role=TaskListRole.SESSION) + rec = await store.create_task(list_id, subject="hello") + + assert (canonical / "tasks.json").exists() + assert not (tmp_path / "agents" / "queen" / "sessions" / sid / "tasks.json").exists() + fetched = await store.list_tasks(list_id) + assert [r.id for r in fetched] == [rec.id] + + +# --------------------------------------------------------------------------- +# Lazy migration from the older fan-out layout +# --------------------------------------------------------------------------- + + +def _seed_legacy_session(tmp_path: Path, agent: str, sess: str, n_tasks: int) -> Path: + """Hand-craft an older ``{root}/tasks/`` layout the way it used to live + on disk, so we can prove the lazy migration folds it correctly. + """ + legacy = tmp_path / "agents" / agent / "sessions" / sess / "tasks" + (legacy / "tasks").mkdir(parents=True) + list_id = f"session:{agent}:{sess}" + (legacy / "meta.json").write_text( + json.dumps( + { + "task_list_id": list_id, + "role": "session", + "creator_agent_id": None, + "created_at": 1000.0, + "last_seen_session_ids": ["s1"], + "schema_version": 1, + } + ), + encoding="utf-8", + ) + (legacy / ".highwatermark").write_text(str(n_tasks), encoding="utf-8") + (legacy / ".lock").write_text("", encoding="utf-8") + for i in range(1, n_tasks + 1): + (legacy / "tasks" / f"{i:04d}.json").write_text( + json.dumps( + { + "id": i, + "subject": f"legacy {i}", + "description": "", + "active_form": None, + "owner": None, + "status": "pending", + "blocks": [], + "blocked_by": [], + "metadata": {}, + "created_at": 1000.0 + i, + "updated_at": 1000.0 + i, + } + ), + encoding="utf-8", + ) + return legacy + + +@pytest.mark.asyncio +async def test_legacy_layout_migrates_on_first_read(store: TaskStore, tmp_path: Path) -> None: + legacy = _seed_legacy_session(tmp_path, "agent_z", "sess_z", 3) + 
list_id = "session:agent_z:sess_z" + # First read should fold the legacy fan-out into tasks.json. + records = await store.list_tasks(list_id) + assert [r.id for r in records] == [1, 2, 3] + assert [r.subject for r in records] == ["legacy 1", "legacy 2", "legacy 3"] + # New doc exists; the legacy dir is gone. + new_doc = tmp_path / "agents" / "agent_z" / "sessions" / "sess_z" / "tasks.json" + assert new_doc.exists() + assert not legacy.exists() + # Highwatermark is preserved — next id is 4, not 1. + new_rec = await store.create_task(list_id, subject="post-migration") + assert new_rec.id == 4 + + +@pytest.mark.asyncio +async def test_legacy_layout_migrates_on_first_write(store: TaskStore, tmp_path: Path) -> None: + _seed_legacy_session(tmp_path, "agent_w", "sess_w", 2) + list_id = "session:agent_w:sess_w" + # Update a legacy task — must trigger migration, then mutate. + new, changed = await store.update_task(list_id, 2, status=TaskStatus.IN_PROGRESS) + assert new is not None + assert changed == ["status"] + assert new.status == TaskStatus.IN_PROGRESS + # Doc reflects both legacy tasks. + listed = await store.list_tasks(list_id) + assert len(listed) == 2 + + +@pytest.mark.asyncio +async def test_legacy_list_exists(store: TaskStore, tmp_path: Path) -> None: + _seed_legacy_session(tmp_path, "agent_q", "sess_q", 1) + assert await store.list_exists("session:agent_q:sess_q")