feat(tasks): refactor task folder

This commit is contained in:
Richard Tang
2026-04-29 17:33:34 -07:00
parent e80d275321
commit 579f1d7512
4 changed files with 615 additions and 277 deletions
+1 -1
View File
@@ -152,7 +152,7 @@ def _count_runs(agent_name: str) -> int:
return len(run_ids)
_EXCLUDED_JSON_STEMS = {"agent", "flowchart", "triggers", "configuration", "metadata"}
_EXCLUDED_JSON_STEMS = {"agent", "flowchart", "triggers", "configuration", "metadata", "tasks"}
def _is_colony_dir(path: Path) -> bool:
+13 -1
View File
@@ -49,7 +49,7 @@ class TaskRecord(BaseModel):
class TaskListMeta(BaseModel):
"""Per-list metadata stored in ``meta.json`` next to the task files."""
"""Per-list metadata. Embedded in ``TaskListDocument``."""
task_list_id: str
role: TaskListRole
@@ -59,6 +59,18 @@ class TaskListMeta(BaseModel):
schema_version: int = 1
class TaskListDocument(BaseModel):
"""Whole task list as a single JSON document on disk.
Lives at ``{task_list_path(list_id)}/tasks.json``; the list-lock
sentinel is its sibling ``tasks.json.lock``.
"""
meta: TaskListMeta
highwatermark: int = 0
tasks: list[TaskRecord] = Field(default_factory=list)
# Tagged union for claim_task_with_busy_check. Used by run_parallel_workers
# when stamping ``assigned_session`` on a colony template entry — the only
# place a "claim" actually happens under the hive model.
+490 -273
View File
@@ -2,29 +2,26 @@
Layout per list::
{root}/{task_list_id}/
meta.json -- TaskListMeta
tasks/
0001.json -- TaskRecord (zero-padded for ls-sort)
0002.json
...
.lock -- list-level lock
.highwatermark -- ID floor (deleted ids never reused)
{task_list_path}/tasks.json -- TaskListDocument (meta + hwm + tasks)
{task_list_path}/tasks.json.lock -- list-level lock sentinel
Two list-roots:
Where ``task_list_path`` is:
colony:{colony_id} -> ~/.hive/colonies/{colony_id}/tasks/
session:{a}:{s} -> ~/.hive/agents/{a}/sessions/{s}/tasks/
colony:{c} -> ~/.hive/colonies/{c}/
session:{a}:{s} -> ~/.hive/agents/{a}/sessions/{s}/
unscoped:{a} -> ~/.hive/unscoped/{a}/
{malformed} -> ~/.hive/_misc/{slug}/
An older layout used the same root + a nested ``tasks/`` subdir holding
``meta.json``, ``.highwatermark``, ``.lock``, and ``NNNN.json`` per task.
That produced the ugly ``/tasks/tasks/0001.json`` path. Migration is
lazy — the first lock-protected access on such a list folds the legacy
artifacts into ``tasks.json`` and unlinks them.
All filesystem I/O is wrapped in ``asyncio.to_thread`` so the event loop
never blocks. Locks use a 30-retry / ~2.6s budget comfortable headroom
for the only realistic write contender (colony template under concurrent
never blocks. Locks use a ~3s budget — comfortable headroom for the only
realistic write contender (colony template under concurrent
``colony_template_*`` and ``run_parallel_workers`` stamps).
The "_unsafe" variants exist because filelock is **not re-entrant**: a
caller already holding a lock must NOT re-acquire it (would deadlock).
The unsafe path skips acquisition and is callable only from inside another
locked function. See ``claim_task_with_busy_check`` and ``delete_task``.
"""
from __future__ import annotations
@@ -32,6 +29,8 @@ from __future__ import annotations
import asyncio
import logging
import os
import shutil
import threading
import time
from collections.abc import Iterable
from pathlib import Path
@@ -46,6 +45,7 @@ from framework.tasks.models import (
ClaimNotFound,
ClaimOk,
ClaimResult,
TaskListDocument,
TaskListMeta,
TaskListRole,
TaskRecord,
@@ -57,6 +57,24 @@ logger = logging.getLogger(__name__)
LOCK_TIMEOUT_SECONDS = 3.0 # ~30 retries × ~100ms
DOC_FILENAME = "tasks.json"
LOCK_FILENAME = "tasks.json.lock" # only colony lists (cross-process writers)
# Per-list in-memory locks for single-process scopes (session/unscoped/_misc).
# Sessions have one owning agent, so only same-process concurrency matters
# (e.g. parallel tool use within a single turn) — no on-disk lock needed.
_INPROC_LOCKS: dict[str, threading.Lock] = {}
_INPROC_LOCKS_GUARD = threading.Lock()
def _get_inproc_lock(task_list_id: str) -> threading.Lock:
    """Return the per-list in-process lock, creating it on first use.

    The registry access runs under ``_INPROC_LOCKS_GUARD`` so two threads
    racing on the same list id cannot each end up holding a different
    lock object for that list.
    """
    with _INPROC_LOCKS_GUARD:
        # Get-or-create as a single expression; the guard makes it safe.
        return _INPROC_LOCKS.setdefault(task_list_id, threading.Lock())
class _Unset:
"""Sentinel for "owner argument not provided" — distinct from owner=None."""
@@ -72,27 +90,87 @@ def _hive_root() -> Path:
return Path(os.environ.get("HIVE_HOME", str(Path.home() / ".hive")))
def _find_queen_session_dir(session_id: str, *, hive_root: Path) -> Path | None:
"""Return ``agents/queens/{queen}/sessions/{session_id}`` if one exists.
Queens live under ``QUEENS_DIR = hive_root / "agents" / "queens"`` (see
``framework.config``). The task system gets a generic ``agent_id ==
"queen"`` in its ``task_list_id``, which would otherwise dead-end at
``agents/queen/...``, decoupled from the real session folder. By
probing the canonical layout we keep the task doc beside conversations,
events, summary, and meta for the same session.
"""
queens_dir = hive_root / "agents" / "queens"
if not queens_dir.exists():
return None
try:
candidates = [d for d in queens_dir.iterdir() if d.is_dir()]
except OSError:
return None
for queen_dir in candidates:
candidate = queen_dir / "sessions" / session_id
if candidate.is_dir():
return candidate
return None
def task_list_path(task_list_id: str, *, hive_root: Path | None = None) -> Path:
    """Resolve task_list_id -> directory containing ``tasks.json``.

    Note: this returns the *parent* of the doc file, not the file itself.
    For session/colony/unscoped lists, this is the agent or colony's home
    dir; the task doc is one filename inside it. (The older layout had an
    extra ``tasks/`` subdir under this path — see ``_legacy_root``.)

    For ``session:`` lists, the canonical queen session folder is preferred
    when it exists on disk: the task doc lives next to the rest of that
    session's data (conversations, events, summary).

    Raises:
        ValueError: for a ``session:`` id missing its session component.
    """
    # Fix: the pre-refactor "…/tasks" return lines were still present ahead
    # of the new returns, making the new paths unreachable. Stale lines removed.
    root = hive_root or _hive_root()
    if task_list_id.startswith("colony:"):
        colony_id = task_list_id[len("colony:") :]
        return root / "colonies" / colony_id
    if task_list_id.startswith("session:"):
        rest = task_list_id[len("session:") :]
        agent_id, _, session_id = rest.partition(":")
        if not session_id:
            raise ValueError(f"Malformed session task_list_id: {task_list_id!r}")
        canonical = _find_queen_session_dir(session_id, hive_root=root)
        if canonical is not None:
            return canonical
        return root / "agents" / agent_id / "sessions" / session_id
    if task_list_id.startswith("unscoped:"):
        agent_id = task_list_id[len("unscoped:") :]
        return root / "unscoped" / agent_id
    # Last-ditch sanitization for HIVE_TASK_LIST_ID overrides — slugify the
    # whole thing so the test/dev path can't escape the hive root.
    safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in task_list_id)
    return root / "_misc" / safe
def _legacy_root(task_list_id: str, *, hive_root: Path | None = None) -> Path:
"""Where the older artifacts (meta.json, .highwatermark, tasks/NNNN.json) lived.
Pinned to the *pre-canonical* layout for queen session lists this is
``agents/{agent_id}/sessions/{session_id}/tasks`` (i.e. the literal
``agent_id`` folder, not the canonical ``agents/queens/{queen}/...``
path). The lazy migration reads from here and writes the new doc to
wherever ``task_list_path`` resolves now.
"""
root = hive_root or _hive_root()
if task_list_id.startswith("colony:"):
return root / "colonies" / task_list_id[len("colony:") :] / "tasks"
if task_list_id.startswith("session:"):
rest = task_list_id[len("session:") :]
agent_id, _, session_id = rest.partition(":")
return root / "agents" / agent_id / "sessions" / session_id / "tasks"
if task_list_id.startswith("unscoped:"):
return root / "unscoped" / task_list_id[len("unscoped:") :] / "tasks"
# _misc fallback: legacy lived directly in the slug dir, same as the new parent.
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in task_list_id)
return root / "_misc" / safe
# ---------------------------------------------------------------------------
# TaskStore — public façade
# ---------------------------------------------------------------------------
@@ -132,23 +210,16 @@ class TaskStore:
)
async def list_exists(self, task_list_id: str) -> bool:
"""A list exists if its meta.json OR any task file is on disk.
"""A list exists if its doc is on disk OR a legacy artifact is.
meta.json is normally written by ``ensure_task_list``, but session
lists may be created lazily via the first ``task_create`` (see
``_create_task_sync``) — in that case meta.json is backfilled the
first time the list is read. Until then, we still want to expose
the list's tasks via REST.
The legacy fallback exists so that lists created under the older
layout and not yet migrated still surface to the REST layer.
"""
def _check() -> bool:
root = self._list_root(task_list_id)
if (root / "meta.json").exists():
if self._doc_path(task_list_id).exists():
return True
tasks_dir = root / "tasks"
if tasks_dir.exists() and any(p.suffix == ".json" for p in tasks_dir.iterdir()):
return True
return False
return self._has_legacy_artifacts(task_list_id)
return await asyncio.to_thread(_check)
@@ -156,7 +227,7 @@ class TaskStore:
return await asyncio.to_thread(self._read_meta_sync, task_list_id)
async def reset_task_list(self, task_list_id: str) -> None:
"""Delete all task files but preserve the high-water-mark.
"""Delete all tasks but preserve the high-water-mark.
Test helper. Never wired to runtime lifecycle.
"""
@@ -173,13 +244,9 @@ class TaskStore:
Each spec is a dict with keys: subject (required), description,
active_form, owner, metadata. Ids are assigned sequentially and
contiguously if any task fails to write, an exception is raised
and the whole batch is rolled back (file unlinked, high-water-mark
kept at the prior value).
Atomic-or-none semantics matter for the tool surface: a failed
partial batch would leave the LLM reasoning about cleanup, which
defeats the point of batching as a single decision.
contiguously; if any spec is malformed the whole batch is
rejected before any write. The doc model makes "atomic-or-none"
free — we mutate one in-memory document and write it once.
"""
return await asyncio.to_thread(
self._create_tasks_batch_sync, task_list_id, specs
@@ -274,30 +341,222 @@ class TaskStore:
# Sync internals — all called via asyncio.to_thread
# =====================================================================
def _list_root(self, task_list_id: str) -> Path:
def _list_dir(self, task_list_id: str) -> Path:
    # Directory that holds this list's tasks.json; delegates the id->path
    # mapping to the module-level task_list_path.
    return task_list_path(task_list_id, hive_root=self._hive_root)
def _tasks_dir(self, task_list_id: str) -> Path:
return self._list_root(task_list_id) / "tasks"
def _doc_path(self, task_list_id: str) -> Path:
    # Single-document layout: one tasks.json file per list directory.
    return self._list_dir(task_list_id) / DOC_FILENAME
def _list_lock(self, task_list_id: str) -> FileLock:
# FileLock targets a sentinel file; it tolerates the file being absent
# by creating it on first acquire. We use the .lock filename so it's
# visible alongside the other list files.
root = self._list_root(task_list_id)
root.mkdir(parents=True, exist_ok=True)
return FileLock(str(root / ".lock"), timeout=LOCK_TIMEOUT_SECONDS)
def _list_lock(self, task_list_id: str):
"""Return a context manager that serialises writes to this list.
def _highwatermark_path(self, task_list_id: str) -> Path:
return self._list_root(task_list_id) / ".highwatermark"
Colony template lists need a cross-process ``FileLock`` because
``run_parallel_workers`` spawns worker subprocesses that stamp
completion back onto the template. Session/unscoped/_misc lists
have a single owning agent — only same-process concurrency
matters (e.g. parallel tool use within one turn), so an
in-memory ``threading.Lock`` is enough and avoids the visible
``tasks.json.lock`` sentinel beside session folders.
"""
d = self._list_dir(task_list_id)
d.mkdir(parents=True, exist_ok=True)
if task_list_id.startswith("colony:"):
return FileLock(str(d / LOCK_FILENAME), timeout=LOCK_TIMEOUT_SECONDS)
return _get_inproc_lock(task_list_id)
def _meta_path(self, task_list_id: str) -> Path:
return self._list_root(task_list_id) / "meta.json"
def _legacy_dir(self, task_list_id: str) -> Path:
    # Root of the pre-doc (legacy) layout for this list; mapping lives in
    # the module-level _legacy_root helper.
    return _legacy_root(task_list_id, hive_root=self._hive_root)
def _task_path(self, task_list_id: str, task_id: int) -> Path:
return self._tasks_dir(task_list_id) / f"{task_id:04d}.json"
def _legacy_meta_path(self, task_list_id: str) -> Path:
    # Legacy per-list metadata file.
    return self._legacy_dir(task_list_id) / "meta.json"

def _legacy_hwm_path(self, task_list_id: str) -> Path:
    # Legacy id floor file (deleted ids never reused).
    return self._legacy_dir(task_list_id) / ".highwatermark"

def _legacy_lock_path(self, task_list_id: str) -> Path:
    # Legacy list-level lock sentinel.
    return self._legacy_dir(task_list_id) / ".lock"

def _legacy_tasks_dir(self, task_list_id: str) -> Path:
    # Legacy one-file-per-task directory (NNNN.json).
    return self._legacy_dir(task_list_id) / "tasks"
def _has_legacy_artifacts(self, task_list_id: str) -> bool:
    """True when a pre-doc artifact (legacy meta.json or any task file) is on disk."""
    if self._legacy_meta_path(task_list_id).exists():
        return True
    legacy_tasks = self._legacy_tasks_dir(task_list_id)
    if not legacy_tasks.exists():
        return False
    try:
        entries = list(legacy_tasks.iterdir())
    except OSError:
        return False
    return any(entry.suffix == ".json" for entry in entries)
# ----- doc IO -------------------------------------------------------
def _read_doc_sync(self, task_list_id: str) -> TaskListDocument | None:
    """Lock-free read for already-migrated lists; falls back to a
    lock-protected migration if only legacy artifacts exist.

    Returns None if the list doesn't exist on disk in either form.
    The happy path (tasks.json present and valid) takes no lock at all;
    the lock is acquired only when legacy artifacts must be folded in.
    """
    doc_path = self._doc_path(task_list_id)
    if doc_path.exists():
        try:
            return TaskListDocument.model_validate_json(
                doc_path.read_text(encoding="utf-8")
            )
        except Exception:
            logger.warning("Corrupt tasks.json at %s", doc_path, exc_info=True)
            # Fall through — legacy fallback may rescue us.
    if self._has_legacy_artifacts(task_list_id):
        with self._list_lock(task_list_id):
            # Re-check under lock: a parallel writer may have just
            # finished migrating, in which case we read the new doc.
            if doc_path.exists():
                try:
                    return TaskListDocument.model_validate_json(
                        doc_path.read_text(encoding="utf-8")
                    )
                except Exception:
                    logger.warning(
                        "Corrupt tasks.json at %s (post-lock)",
                        doc_path,
                        exc_info=True,
                    )
            # Migrate while still holding the lock; a successful migration
            # both writes the new doc (overwriting a corrupt one, if any)
            # and removes the legacy files.
            doc = self._migrate_legacy_unsafe(task_list_id)
            if doc is not None:
                self._write_doc_unsafe(task_list_id, doc)
                self._cleanup_legacy_unsafe(task_list_id)
            return doc
    return None
def _read_doc_unsafe(self, task_list_id: str) -> TaskListDocument | None:
    """Read the doc assuming the caller already holds the list-lock.

    Mirrors ``_read_doc_sync`` minus the locking: a corrupt or missing
    ``tasks.json`` falls back to an in-place legacy migration without
    re-entering the (non-re-entrant) lock.
    """
    path = self._doc_path(task_list_id)
    if path.exists():
        try:
            raw = path.read_text(encoding="utf-8")
            return TaskListDocument.model_validate_json(raw)
        except Exception:
            logger.warning("Corrupt tasks.json at %s", path, exc_info=True)
    if not self._has_legacy_artifacts(task_list_id):
        return None
    migrated = self._migrate_legacy_unsafe(task_list_id)
    if migrated is not None:
        self._write_doc_unsafe(task_list_id, migrated)
        self._cleanup_legacy_unsafe(task_list_id)
    return migrated
def _write_doc_unsafe(self, task_list_id: str, doc: TaskListDocument) -> None:
    """Atomically rewrite ``tasks.json``. Caller MUST hold the list-lock."""
    target = self._doc_path(task_list_id)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = doc.model_dump_json(indent=2)
    with atomic_write(target) as fh:
        fh.write(payload)
# ----- migration ----------------------------------------------------
def _migrate_legacy_unsafe(self, task_list_id: str) -> TaskListDocument | None:
    """Fold legacy artifacts into a TaskListDocument. Caller MUST hold lock.

    Returns None when there is nothing to migrate: no task files, a zero
    high-water-mark, and no legacy meta.json on disk.
    """
    meta = self._read_legacy_meta(task_list_id)
    if meta is None:
        # No readable legacy meta — infer the role from the id prefix,
        # the same inference the lazy-create paths use.
        role = (
            TaskListRole.TEMPLATE
            if task_list_id.startswith("colony:")
            else TaskListRole.SESSION
        )
        meta = TaskListMeta(task_list_id=task_list_id, role=role)
    records: list[TaskRecord] = []
    legacy_tasks = self._legacy_tasks_dir(task_list_id)
    if legacy_tasks.exists():
        for entry in sorted(legacy_tasks.iterdir()):
            if entry.suffix != ".json":
                continue
            try:
                records.append(
                    TaskRecord.model_validate_json(entry.read_text(encoding="utf-8"))
                )
            except Exception:
                logger.warning(
                    "Skipping corrupt legacy task file %s during migration",
                    entry,
                    exc_info=True,
                )
    records.sort(key=lambda rec: rec.id)
    # The floor is whichever is higher: the stored legacy hwm or the
    # largest surviving task id.
    highwatermark = max(
        self._read_legacy_hwm(task_list_id),
        max((rec.id for rec in records), default=0),
    )
    if not records and highwatermark == 0 and not self._legacy_meta_path(task_list_id).exists():
        return None
    return TaskListDocument(meta=meta, highwatermark=highwatermark, tasks=records)
def _read_legacy_meta(self, task_list_id: str) -> TaskListMeta | None:
    """Parse the legacy ``meta.json``; None when absent or unparseable."""
    meta_path = self._legacy_meta_path(task_list_id)
    if not meta_path.exists():
        return None
    try:
        raw = meta_path.read_text(encoding="utf-8")
        return TaskListMeta.model_validate_json(raw)
    except Exception:
        logger.warning("Corrupt legacy meta.json at %s", meta_path, exc_info=True)
        return None
def _read_legacy_hwm(self, task_list_id: str) -> int:
    """Parse the legacy ``.highwatermark``; 0 when absent, blank, or garbled."""
    hwm_path = self._legacy_hwm_path(task_list_id)
    if not hwm_path.exists():
        return 0
    try:
        text = hwm_path.read_text(encoding="utf-8").strip()
        return int(text) if text else 0
    except (ValueError, OSError):
        return 0
def _cleanup_legacy_unsafe(self, task_list_id: str) -> None:
    """Remove the older layout's files. Caller MUST hold the list-lock.

    For session/colony/unscoped lists the legacy_dir is a dedicated
    ``tasks/`` subdir, so we remove the whole tree. For the ``_misc``
    fallback the legacy_dir is the same as the new parent dir — we
    delete only the specific legacy filenames so we don't clobber
    the new ``tasks.json``.
    """
    legacy = self._legacy_dir(task_list_id)
    if not legacy.exists():
        return
    if legacy != self._list_dir(task_list_id):
        # Dedicated legacy subdir — safe to drop wholesale. Failures are
        # logged, not raised: cleanup is best-effort (the caller has
        # already written the migrated doc by this point).
        try:
            shutil.rmtree(legacy)
        except OSError:
            logger.warning("Failed to remove legacy task dir %s", legacy, exc_info=True)
        return
    # _misc case: shared parent dir — surgical delete only.
    for p in (
        self._legacy_meta_path(task_list_id),
        self._legacy_hwm_path(task_list_id),
        self._legacy_lock_path(task_list_id),
    ):
        try:
            p.unlink(missing_ok=True)
        except OSError:
            logger.warning("Failed to remove %s", p, exc_info=True)
    td = self._legacy_tasks_dir(task_list_id)
    if td.exists():
        try:
            shutil.rmtree(td)
        except OSError:
            logger.warning("Failed to remove legacy tasks subdir %s", td, exc_info=True)
# ----- meta accessors over the doc ----------------------------------
def _ensure_task_list_sync(
self,
@@ -306,107 +565,47 @@ class TaskStore:
creator_agent_id: str | None,
session_id: str | None,
) -> TaskListMeta:
root = self._list_root(task_list_id)
root.mkdir(parents=True, exist_ok=True)
(root / "tasks").mkdir(exist_ok=True)
meta_path = self._meta_path(task_list_id)
with self._list_lock(task_list_id):
if meta_path.exists():
meta = self._read_meta_sync(task_list_id)
if meta is None:
# File existed but failed to parse — rewrite fresh.
meta = TaskListMeta(
task_list_id=task_list_id,
role=role,
creator_agent_id=creator_agent_id,
)
if session_id and session_id not in meta.last_seen_session_ids:
meta.last_seen_session_ids.append(session_id)
# Cap at 10 to keep the audit trail bounded.
meta.last_seen_session_ids = meta.last_seen_session_ids[-10:]
self._write_meta_sync(task_list_id, meta)
doc = self._read_doc_unsafe(task_list_id)
if doc is None:
meta = TaskListMeta(
task_list_id=task_list_id,
role=role,
creator_agent_id=creator_agent_id,
last_seen_session_ids=[session_id] if session_id else [],
)
doc = TaskListDocument(meta=meta)
self._write_doc_unsafe(task_list_id, doc)
return meta
meta = TaskListMeta(
task_list_id=task_list_id,
role=role,
creator_agent_id=creator_agent_id,
last_seen_session_ids=[session_id] if session_id else [],
)
self._write_meta_sync(task_list_id, meta)
meta = doc.meta
if session_id and session_id not in meta.last_seen_session_ids:
meta.last_seen_session_ids.append(session_id)
# Cap at 10 to keep the audit trail bounded.
meta.last_seen_session_ids = meta.last_seen_session_ids[-10:]
self._write_doc_unsafe(task_list_id, doc)
return meta
def _read_meta_sync(self, task_list_id: str) -> TaskListMeta | None:
path = self._meta_path(task_list_id)
if not path.exists():
return None
try:
return TaskListMeta.model_validate_json(path.read_text(encoding="utf-8"))
except Exception:
logger.warning("Corrupt meta.json at %s", path, exc_info=True)
return None
def _write_meta_sync(self, task_list_id: str, meta: TaskListMeta) -> None:
path = self._meta_path(task_list_id)
path.parent.mkdir(parents=True, exist_ok=True)
with atomic_write(path) as f:
f.write(meta.model_dump_json(indent=2))
doc = self._read_doc_sync(task_list_id)
return doc.meta if doc is not None else None
# ----- task IO ------------------------------------------------------
def _read_task_sync(self, task_list_id: str, task_id: int) -> TaskRecord | None:
path = self._task_path(task_list_id, task_id)
if not path.exists():
doc = self._read_doc_sync(task_list_id)
if doc is None:
return None
try:
return TaskRecord.model_validate_json(path.read_text(encoding="utf-8"))
except Exception:
logger.warning("Corrupt task file at %s", path, exc_info=True)
return None
def _write_task_sync(self, task_list_id: str, record: TaskRecord) -> None:
path = self._task_path(task_list_id, record.id)
path.parent.mkdir(parents=True, exist_ok=True)
with atomic_write(path) as f:
f.write(record.model_dump_json(indent=2))
for r in doc.tasks:
if r.id == task_id:
return r
return None
def _list_tasks_sync(self, task_list_id: str) -> list[TaskRecord]:
d = self._tasks_dir(task_list_id)
if not d.exists():
doc = self._read_doc_sync(task_list_id)
if doc is None:
return []
records: list[TaskRecord] = []
for path in sorted(d.iterdir()):
if path.suffix != ".json":
continue
try:
records.append(TaskRecord.model_validate_json(path.read_text(encoding="utf-8")))
except Exception:
logger.warning("Skipping corrupt task file %s", path, exc_info=True)
records.sort(key=lambda r: r.id)
return records
# ----- highwatermark / id assignment --------------------------------
def _read_highwatermark_sync(self, task_list_id: str) -> int:
path = self._highwatermark_path(task_list_id)
if not path.exists():
return 0
try:
return int(path.read_text(encoding="utf-8").strip() or "0")
except (ValueError, OSError):
return 0
def _write_highwatermark_sync(self, task_list_id: str, value: int) -> None:
path = self._highwatermark_path(task_list_id)
path.parent.mkdir(parents=True, exist_ok=True)
with atomic_write(path) as f:
f.write(str(value))
def _next_id_sync(self, task_list_id: str) -> int:
"""Compute next id under the assumption the list-lock is held."""
existing = self._list_tasks_sync(task_list_id)
max_existing = max((r.id for r in existing), default=0)
floor = self._read_highwatermark_sync(task_list_id)
return max(max_existing, floor) + 1
return sorted(doc.tasks, key=lambda r: r.id)
# ----- create -------------------------------------------------------
@@ -420,20 +619,17 @@ class TaskStore:
metadata: dict[str, Any],
) -> TaskRecord:
with self._list_lock(task_list_id):
# Lazy-create meta.json on first task. Session lists are
# frequently created via the first task_create (no explicit
# ensure_task_list call); without this backfill the REST
# endpoint can't discover them. Role is inferred from prefix.
if not self._meta_path(task_list_id).exists():
inferred_role = TaskListRole.TEMPLATE if task_list_id.startswith("colony:") else TaskListRole.SESSION
self._write_meta_sync(
task_list_id,
TaskListMeta(
task_list_id=task_list_id,
role=inferred_role,
),
doc = self._read_doc_unsafe(task_list_id)
if doc is None:
inferred_role = (
TaskListRole.TEMPLATE
if task_list_id.startswith("colony:")
else TaskListRole.SESSION
)
new_id = self._next_id_sync(task_list_id)
doc = TaskListDocument(
meta=TaskListMeta(task_list_id=task_list_id, role=inferred_role)
)
new_id = self._next_id_for_doc(doc)
now = time.time()
record = TaskRecord(
id=new_id,
@@ -446,11 +642,10 @@ class TaskStore:
created_at=now,
updated_at=now,
)
self._write_task_sync(task_list_id, record)
# Bump high-water-mark eagerly so even a concurrent racer that
# somehow missed the listing snapshot can't pick the same id.
if new_id > self._read_highwatermark_sync(task_list_id):
self._write_highwatermark_sync(task_list_id, new_id)
doc.tasks.append(record)
if new_id > doc.highwatermark:
doc.highwatermark = new_id
self._write_doc_unsafe(task_list_id, doc)
return record
def _create_tasks_batch_sync(
@@ -467,19 +662,18 @@ class TaskStore:
raise ValueError(f"specs[{i}].subject must be a non-empty string")
with self._list_lock(task_list_id):
# Same lazy meta backfill as _create_task_sync.
if not self._meta_path(task_list_id).exists():
doc = self._read_doc_unsafe(task_list_id)
if doc is None:
inferred_role = (
TaskListRole.TEMPLATE
if task_list_id.startswith("colony:")
else TaskListRole.SESSION
)
self._write_meta_sync(
task_list_id,
TaskListMeta(task_list_id=task_list_id, role=inferred_role),
doc = TaskListDocument(
meta=TaskListMeta(task_list_id=task_list_id, role=inferred_role)
)
base_id = self._next_id_sync(task_list_id)
base_id = self._next_id_for_doc(doc)
now = time.time()
records: list[TaskRecord] = []
for offset, spec in enumerate(specs):
@@ -496,27 +690,20 @@ class TaskStore:
)
records.append(rec)
# Write all task files; on any failure, unlink everything we
# wrote so far and re-raise. High-water-mark is bumped only
# after a successful full-batch write.
written: list[Path] = []
try:
for rec in records:
self._write_task_sync(task_list_id, rec)
written.append(self._task_path(task_list_id, rec.id))
except Exception:
for path in written:
try:
path.unlink(missing_ok=True)
except OSError:
logger.warning("Failed to roll back batch task at %s", path, exc_info=True)
raise
doc.tasks.extend(records)
highest = records[-1].id
if highest > self._read_highwatermark_sync(task_list_id):
self._write_highwatermark_sync(task_list_id, highest)
if highest > doc.highwatermark:
doc.highwatermark = highest
# Single write — atomic batch is free with the doc model.
self._write_doc_unsafe(task_list_id, doc)
return records
# ----- id assignment ------------------------------------------------
def _next_id_for_doc(self, doc: TaskListDocument) -> int:
max_existing = max((r.id for r in doc.tasks), default=0)
return max(max_existing, doc.highwatermark) + 1
# ----- update -------------------------------------------------------
def _update_task_sync(
@@ -533,25 +720,27 @@ class TaskStore:
metadata_patch: dict[str, Any] | None,
) -> tuple[TaskRecord | None, list[str]]:
with self._list_lock(task_list_id):
current = self._read_task_sync(task_list_id, task_id)
if current is None:
doc = self._read_doc_unsafe(task_list_id)
if doc is None:
return None, []
return self._update_task_unsafe(
task_list_id,
current,
subject=subject,
description=description,
active_form=active_form,
owner=owner,
status=status,
add_blocks=add_blocks,
add_blocked_by=add_blocked_by,
metadata_patch=metadata_patch,
)
target = next((r for r in doc.tasks if r.id == task_id), None)
if target is None:
return None, []
new, changed = self._update_task_in_doc(doc, target, subject=subject,
description=description,
active_form=active_form,
owner=owner,
status=status,
add_blocks=add_blocks,
add_blocked_by=add_blocked_by,
metadata_patch=metadata_patch)
if changed:
self._write_doc_unsafe(task_list_id, doc)
return new, changed
def _update_task_unsafe(
def _update_task_in_doc(
self,
task_list_id: str,
doc: TaskListDocument,
current: TaskRecord,
*,
subject: str | None = None,
@@ -563,84 +752,89 @@ class TaskStore:
add_blocked_by: list[int] | None = None,
metadata_patch: dict[str, Any] | None = None,
) -> tuple[TaskRecord, list[str]]:
"""Update without acquiring the list-lock. Caller MUST hold it."""
"""Mutate ``current`` in place inside ``doc`` and return (record, changed).
Bidirectional blocks/blocked_by also mutate the targets in ``doc``.
"""
changed: list[str] = []
new = current.model_copy(deep=True)
if subject is not None and subject != new.subject:
new.subject = subject
if subject is not None and subject != current.subject:
current.subject = subject
changed.append("subject")
if description is not None and description != new.description:
new.description = description
if description is not None and description != current.description:
current.description = description
changed.append("description")
if active_form is not None and active_form != new.active_form:
new.active_form = active_form
if active_form is not None and active_form != current.active_form:
current.active_form = active_form
changed.append("active_form")
if not isinstance(owner, _Unset) and owner != new.owner:
new.owner = owner
if not isinstance(owner, _Unset) and owner != current.owner:
current.owner = owner
changed.append("owner")
if status is not None and status != new.status:
new.status = status
if status is not None and status != current.status:
current.status = status
changed.append("status")
if add_blocks:
for b in add_blocks:
if b not in new.blocks and b != new.id:
new.blocks.append(b)
if "blocks" not in changed:
changed.append("blocks")
# Maintain the bidirectional invariant by stamping
# blocked_by on the target as well.
target = self._read_task_sync(task_list_id, b)
if target and new.id not in target.blocked_by:
target.blocked_by.append(new.id)
target.updated_at = time.time()
self._write_task_sync(task_list_id, target)
if b in current.blocks or b == current.id:
continue
current.blocks.append(b)
if "blocks" not in changed:
changed.append("blocks")
target = next((r for r in doc.tasks if r.id == b), None)
if target is not None and current.id not in target.blocked_by:
target.blocked_by.append(current.id)
target.updated_at = time.time()
if add_blocked_by:
for b in add_blocked_by:
if b not in new.blocked_by and b != new.id:
new.blocked_by.append(b)
if "blocked_by" not in changed:
changed.append("blocked_by")
target = self._read_task_sync(task_list_id, b)
if target and new.id not in target.blocks:
target.blocks.append(new.id)
target.updated_at = time.time()
self._write_task_sync(task_list_id, target)
if b in current.blocked_by or b == current.id:
continue
current.blocked_by.append(b)
if "blocked_by" not in changed:
changed.append("blocked_by")
target = next((r for r in doc.tasks if r.id == b), None)
if target is not None and current.id not in target.blocks:
target.blocks.append(current.id)
target.updated_at = time.time()
if metadata_patch is not None:
md = dict(new.metadata)
md = dict(current.metadata)
for k, v in metadata_patch.items():
if v is None:
md.pop(k, None)
else:
md[k] = v
if md != new.metadata:
new.metadata = md
if md != current.metadata:
current.metadata = md
changed.append("metadata")
if not changed:
return new, []
return current, []
new.updated_at = time.time()
self._write_task_sync(task_list_id, new)
return new, changed
current.updated_at = time.time()
return current, changed
# ----- delete -------------------------------------------------------
def _delete_task_sync(self, task_list_id: str, task_id: int) -> tuple[bool, list[int]]:
    """Delete *task_id* from the list document, cascading reference cleanup.

    Returns ``(deleted, cascaded_ids)`` where *cascaded_ids* are the ids
    of tasks whose ``blocks``/``blocked_by`` lists referenced the deleted
    task and were rewritten.  ``(False, [])`` when the list document or
    the task itself is missing.
    """
    with self._list_lock(task_list_id):
        doc = self._read_doc_unsafe(task_list_id)
        if doc is None:
            return False, []
        idx = next((i for i, r in enumerate(doc.tasks) if r.id == task_id), None)
        if idx is None:
            return False, []
        # 1. Bump high-water-mark BEFORE removing so a crash mid-write
        #    can't cause id reuse on the next create. (atomic_write
        #    guarantees we either commit the whole new state or none.)
        if task_id > doc.highwatermark:
            doc.highwatermark = task_id
        # 2. Remove the task itself.
        doc.tasks.pop(idx)
        # 3. Cascade: strip references from all other tasks.
        cascaded: list[int] = []
        now = time.time()
        for other in doc.tasks:
            touched = False
            if task_id in other.blocks:
                other.blocks = [b for b in other.blocks if b != task_id]
                touched = True
            if task_id in other.blocked_by:
                other.blocked_by = [b for b in other.blocked_by if b != task_id]
                touched = True
            if touched:
                other.updated_at = now
                cascaded.append(other.id)
        # Single atomic commit of the whole mutated document.
        self._write_doc_unsafe(task_list_id, doc)
        return True, cascaded
# ----- reset --------------------------------------------------------
def _reset_sync(self, task_list_id: str) -> None:
    """Wipe every task from the list while preserving the id floor.

    The high-water-mark is raised to the largest id currently present so
    that ids of the wiped tasks are never reused by a later create.
    No-op when the list document does not exist.
    """
    with self._list_lock(task_list_id):
        doc = self._read_doc_unsafe(task_list_id)
        if doc is None:
            return
        # Raise the floor first: deleted ids must never be re-allocated.
        max_id = max((r.id for r in doc.tasks), default=0)
        doc.highwatermark = max(doc.highwatermark, max_id)
        doc.tasks = []
        self._write_doc_unsafe(task_list_id, doc)
# ----- claim --------------------------------------------------------
# NOTE(review): this span is commit-diff residue — the deleted per-task-file
# reads sit next to their doc-based replacements, and the "@@" hunk header
# below elides whatever branch sat between the COMPLETED status check and the
# ClaimAlreadyOwned return.  The hidden lines cannot be recovered from this
# view; restore the clean post-refactor text from version control before
# editing.  All lines are preserved byte-for-byte here.
def _claim_sync(self, task_list_id: str, task_id: int, claimant: str) -> ClaimResult:
with self._list_lock(task_list_id):
current = self._read_task_sync(task_list_id, task_id)
doc = self._read_doc_unsafe(task_list_id)
if doc is None:
return ClaimNotFound(kind="not_found")
current = next((r for r in doc.tasks if r.id == task_id), None)
if current is None:
return ClaimNotFound(kind="not_found")
if current.status == TaskStatus.COMPLETED:
@@ -682,12 +876,13 @@ class TaskStore:
return ClaimAlreadyOwned(kind="already_owned", by=current.owner)
unresolved_blockers: list[int] = []
for b in current.blocked_by:
blocker = self._read_task_sync(task_list_id, b)
blocker = next((r for r in doc.tasks if r.id == b), None)
if blocker is not None and blocker.status != TaskStatus.COMPLETED:
unresolved_blockers.append(b)
if unresolved_blockers:
return ClaimBlocked(kind="blocked", by=unresolved_blockers)
new, _ = self._update_task_unsafe(task_list_id, current, owner=claimant)
new, _ = self._update_task_in_doc(doc, current, owner=claimant)
self._write_doc_unsafe(task_list_id, doc)
return ClaimOk(kind="ok", record=new)
@@ -713,10 +908,32 @@ def get_task_store() -> TaskStore:
# Convenience for tests / utilities.
def fingerprint_for_test(task_list_id: str, hive_root: Path) -> Iterable[Path]:
    """Yield every task-list-related file — used by tests to assert
    byte-equivalence pre/post shutdown.

    Includes the doc + lock and any legacy leftovers (so this still works
    while a list is mid-migration).
    """
    files: list[Path] = []
    base = task_list_path(task_list_id, hive_root=hive_root)
    if not base.exists():
        return []
    # Current single-document layout: tasks.json plus its lock sentinel.
    doc = base / DOC_FILENAME
    if doc.exists():
        files.append(doc)
    lock = base / LOCK_FILENAME
    if lock.exists():
        files.append(lock)
    # Legacy fan-out layout, if it still exists mid-migration.
    legacy = _legacy_root(task_list_id, hive_root=hive_root)
    if legacy.exists() and legacy != base:
        files.extend(sorted(legacy.rglob("*")))
    elif legacy.exists():
        # _misc fallback: legacy root coincides with base, so include only
        # the known legacy filenames rather than re-walking everything.
        for name in ("meta.json", ".highwatermark", ".lock"):
            p = legacy / name
            if p.exists():
                files.append(p)
        td = legacy / "tasks"
        if td.exists():
            files.extend(sorted(td.rglob("*")))
    return sorted(files)
+111 -2
View File
@@ -7,6 +7,7 @@ primitives the rest of the system relies on.
from __future__ import annotations
import asyncio
import json
from pathlib import Path
import pytest
@@ -263,11 +264,119 @@ async def test_ensure_task_list_caps_history(store: TaskStore, list_id: str) ->
@pytest.mark.asyncio
async def test_colony_path(store: TaskStore, tmp_path: Path) -> None:
    """A colony-scoped list's document lives at ``colonies/{id}/tasks.json``."""
    await store.ensure_task_list("colony:abc", role=TaskListRole.TEMPLATE)
    assert (tmp_path / "colonies" / "abc" / "tasks.json").exists()
@pytest.mark.asyncio
async def test_session_path(store: TaskStore, tmp_path: Path) -> None:
    """A session-scoped list's document lives beside the session dir."""
    await store.ensure_task_list("session:agent_x:sess_y", role=TaskListRole.SESSION)
    p = tmp_path / "agents" / "agent_x" / "sessions" / "sess_y" / "tasks.json"
    assert p.exists()
@pytest.mark.asyncio
async def test_canonical_queen_session_dir_wins(store: TaskStore, tmp_path: Path) -> None:
    """The task doc must land in ``agents/queens/{name}/sessions/{sid}/``
    (beside the session's other artifacts) whenever that canonical dir
    already exists — never in the orphaned ``agents/{agent_id}/sessions/{sid}/``
    location.
    """
    session_id = "session_20260429_test"
    queen_dir = tmp_path / "agents" / "queens" / "queen_growth" / "sessions" / session_id
    queen_dir.mkdir(parents=True)
    # Make the dir look like a live session by dropping one artifact into it.
    (queen_dir / "events.jsonl").write_text("", encoding="utf-8")
    list_id = f"session:queen:{session_id}"
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    created = await store.create_task(list_id, subject="hello")
    orphan_doc = tmp_path / "agents" / "queen" / "sessions" / session_id / "tasks.json"
    assert (queen_dir / "tasks.json").exists()
    assert not orphan_doc.exists()
    listed = await store.list_tasks(list_id)
    assert [r.id for r in listed] == [created.id]
# ---------------------------------------------------------------------------
# Lazy migration from the older fan-out layout
# ---------------------------------------------------------------------------
def _seed_legacy_session(tmp_path: Path, agent: str, sess: str, n_tasks: int) -> Path:
"""Hand-craft an older ``{root}/tasks/`` layout the way it used to live
on disk, so we can prove the lazy migration folds it correctly.
"""
legacy = tmp_path / "agents" / agent / "sessions" / sess / "tasks"
(legacy / "tasks").mkdir(parents=True)
list_id = f"session:{agent}:{sess}"
(legacy / "meta.json").write_text(
json.dumps(
{
"task_list_id": list_id,
"role": "session",
"creator_agent_id": None,
"created_at": 1000.0,
"last_seen_session_ids": ["s1"],
"schema_version": 1,
}
),
encoding="utf-8",
)
(legacy / ".highwatermark").write_text(str(n_tasks), encoding="utf-8")
(legacy / ".lock").write_text("", encoding="utf-8")
for i in range(1, n_tasks + 1):
(legacy / "tasks" / f"{i:04d}.json").write_text(
json.dumps(
{
"id": i,
"subject": f"legacy {i}",
"description": "",
"active_form": None,
"owner": None,
"status": "pending",
"blocks": [],
"blocked_by": [],
"metadata": {},
"created_at": 1000.0 + i,
"updated_at": 1000.0 + i,
}
),
encoding="utf-8",
)
return legacy
@pytest.mark.asyncio
async def test_legacy_layout_migrates_on_first_read(store: TaskStore, tmp_path: Path) -> None:
    """A plain read folds the legacy fan-out into ``tasks.json``, removes
    the old directory, and keeps the id floor intact.
    """
    legacy_dir = _seed_legacy_session(tmp_path, "agent_z", "sess_z", 3)
    list_id = "session:agent_z:sess_z"
    fetched = await store.list_tasks(list_id)
    assert [t.id for t in fetched] == [1, 2, 3]
    assert [t.subject for t in fetched] == ["legacy 1", "legacy 2", "legacy 3"]
    migrated_doc = tmp_path / "agents" / "agent_z" / "sessions" / "sess_z" / "tasks.json"
    assert migrated_doc.exists()
    assert not legacy_dir.exists()
    # Migrated ids are never reused: the next create must get id 4.
    created = await store.create_task(list_id, subject="post-migration")
    assert created.id == 4
@pytest.mark.asyncio
async def test_legacy_layout_migrates_on_first_write(store: TaskStore, tmp_path: Path) -> None:
    """Mutating a legacy task first migrates the list, then applies the edit."""
    _seed_legacy_session(tmp_path, "agent_w", "sess_w", 2)
    list_id = "session:agent_w:sess_w"
    updated, changed_fields = await store.update_task(list_id, 2, status=TaskStatus.IN_PROGRESS)
    assert updated is not None
    assert changed_fields == ["status"]
    assert updated.status == TaskStatus.IN_PROGRESS
    # Both legacy records survived the fold into the single document.
    remaining = await store.list_tasks(list_id)
    assert len(remaining) == 2
@pytest.mark.asyncio
async def test_legacy_list_exists(store: TaskStore, tmp_path: Path) -> None:
    """A list seeded only in the legacy layout still reports as existing."""
    _seed_legacy_session(tmp_path, "agent_q", "sess_q", 1)
    list_id = "session:agent_q:sess_q"
    assert await store.list_exists(list_id)