feat(tasks): file-backed task system with colony template + UI

This commit is contained in:
Richard Tang
2026-04-26 18:49:45 -07:00
parent 6fae1f04c8
commit 0af597881f
38 changed files with 4570 additions and 6 deletions
+87
View File
@@ -421,6 +421,14 @@ class AgentLoop(AgentProtocol):
# dashboards can build aggregates over many runs.
self._counters: dict[str, int] = {}
# Task-system reminder state (see framework/tasks/reminders.py).
# Bumped each iteration; reset whenever a task op tool was called
# in the iteration that just completed; nudges the agent via the
# injection queue when it's been silent on tasks for too long.
from framework.tasks.reminders import ReminderState as _RS
self._task_reminder_state: _RS = _RS()
def _bump(self, key: str, by: int = 1) -> None:
"""Increment a reliability counter (creates the key on first use)."""
self._counters[key] = self._counters.get(key, 0) + by
@@ -991,6 +999,17 @@ class AgentLoop(AgentProtocol):
)
total_input_tokens += turn_tokens.get("input", 0)
total_output_tokens += turn_tokens.get("output", 0)
# Task-system reminder: if the model has been silent on
# task ops for too long but still has open tasks, drop
# a steering reminder onto the injection queue. Drained
# at the next iteration's 6b so it lands as the next
# user turn via the normal injection path. Best-effort
# — never raises.
try:
await self._maybe_inject_task_reminder(ctx, logged_tool_calls)
except Exception:
logger.debug("task reminder check failed", exc_info=True)
await self._publish_llm_turn_complete(
stream_id,
node_id,
@@ -4204,6 +4223,74 @@ class AgentLoop(AgentProtocol):
execution_id=execution_id,
)
async def _maybe_inject_task_reminder(
    self,
    ctx: AgentContext,
    logged_tool_calls: list[dict[str, Any]] | None,
) -> None:
    """Layer 3 task-system steering — periodic reminder injection.

    Called once per iteration after the LLM turn completes. If the
    model has been silent on task ops for a while AND there are open
    tasks on its session list, queue a system-style reminder onto
    the injection queue so the next iteration drains it as a user
    turn. Idempotent / safe to call every iteration — all gating
    happens internally (``ReminderState.should_remind``).

    ``logged_tool_calls`` is a list of dicts with at least a "name"
    key, as accumulated by ``_run_single_turn``. Names like
    ``task_create``, ``task_update``, ``colony_template_*`` reset
    the counter (see ``framework.tasks.reminders.TASK_OP_TOOL_NAMES``).
    """
    from framework.tasks import get_task_store
    from framework.tasks.models import TaskStatus
    from framework.tasks.reminders import build_reminder, saw_task_op

    state = self._task_reminder_state

    # 1. Update counters based on this turn's tool calls.
    names: list[str] = []
    for call in logged_tool_calls or []:
        try:
            name = call.get("name") or call.get("tool_name")
            if name:
                names.append(name)
        except (AttributeError, TypeError):
            # Malformed entry (not a dict) — skip rather than fail the loop.
            continue
    # Bump the iteration counter first, THEN reset on a task op, so an
    # iteration that did call a task op ends with turns_since_task_op == 0.
    # (Reset-then-bump left it at 1, off by one versus the documented
    # "reset whenever a task op tool was called" contract.)
    state.on_iteration()
    if saw_task_op(names):
        state.on_task_op()

    # 2. Resolve the agent's task list. Skip if context isn't wired yet.
    list_id = getattr(ctx, "task_list_id", None)
    if not list_id:
        return

    # 3. Read the open-task snapshot. Best-effort — storage errors mean
    # "no reminder this turn", never a loop failure.
    try:
        store = get_task_store()
        records = await store.list_tasks(list_id)
    except Exception:
        return
    open_tasks = [r for r in records if r.status != TaskStatus.COMPLETED]
    if not state.should_remind(bool(open_tasks)):
        return
    body = build_reminder(records)
    if not body:
        return

    # 4. Enqueue. Drained at the next iteration's 6b drain step and
    # rendered as a user turn (with the "[External event]" prefix).
    await self._injection_queue.put((body, False, None))
    state.on_reminder_sent()
    logger.info(
        "[task-reminder] queued nudge for %s (open=%d, silent_turns=%d)",
        list_id,
        len(open_tasks),
        state.turns_since_task_op,
    )
    self._bump("task_reminders_sent")
async def _run_hooks(
self,
event: str,
+13
View File
@@ -180,6 +180,19 @@ class AgentContext:
stream_id: str = ""
# ----- Task system fields (see framework/tasks) -------------------
# task_list_id: this agent's own session-scoped list, e.g.
# session:{agent_id}:{session_id}. Set by the runner / ColonyRuntime
# before the loop starts; immutable after first task_create.
task_list_id: str | None = None
# colony_id: set on the queen of a colony AND on every spawned worker
# so workers can render the "picked up" chip and the queen can address
# her colony template via colony_template_* tools.
colony_id: str | None = None
# picked_up_from: for workers, the (colony_task_list_id, template_task_id)
# pair their session was spawned for. None for the queen and queen-DM.
picked_up_from: tuple[str, int] | None = None
dynamic_tools_provider: Any = None
dynamic_prompt_provider: Any = None
# Optional Callable[[], str]: when set alongside ``dynamic_prompt_provider``,
@@ -224,6 +224,11 @@ user decide next steps. Read generated files or worker reports with \
read_file when the user asks for specifics. If the user wants \
another pass, kick it off with run_parallel_workers; otherwise stay \
conversational.
If the review itself is multi-step (e.g. "verify each worker's output, \
then draft a summary, then propose next steps"), you may use \
`task_create` / `task_update` to keep yourself organised. Skip them \
for a single-paragraph summary.
"""
@@ -409,6 +414,18 @@ worked / failed) after the run. Scale order once inline succeeds: \
repeat inline (<10 items) → `run_parallel_workers` (batch, results \
now) → `create_colony` (recurring / background). Conceptual or \
strategic questions: answer directly, skip execution.
## Tracking multi-step work with task_*
Break down and manage your work with `task_create`. **Mark each task \
`completed` as soon as you are done with it. Do not batch up multiple \
tasks before marking them completed** — the user's right-rail panel \
treats `completed` transitions as your progress heartbeat.
**Granularity: one task per atomic action, not one umbrella per project.** \
Replying to 5 posts is 5 tasks, not 1. Crawling 3 sites is 3 tasks. \
An umbrella task that stays `in_progress` for the whole run looks \
identical to the user as "the queen is stuck".
"""
_queen_behavior_always = """
+67
View File
@@ -236,6 +236,28 @@ class ColonyRuntime:
self.batch_init_nudge: str | None = self._skills_manager.batch_init_nudge
self._colony_id: str = colony_id or "primary"
# Ensure the colony task template exists. Idempotent — if the
# colony was created previously, this is a no-op (it just stamps
# last_seen_session_ids if a session id is provided later).
try:
import asyncio as _asyncio
from framework.tasks import TaskListRole, get_task_store
from framework.tasks.scoping import colony_task_list_id
_store = get_task_store()
_list_id = colony_task_list_id(self._colony_id)
try:
# Best-effort: schedule on the running loop, or do it inline
# if no loop is yet running (e.g. during construction).
_loop = _asyncio.get_running_loop()
_loop.create_task(_store.ensure_task_list(_list_id, role=TaskListRole.TEMPLATE))
except RuntimeError:
_asyncio.run(_store.ensure_task_list(_list_id, role=TaskListRole.TEMPLATE))
except Exception:
logger.debug("Failed to ensure colony task template", exc_info=True)
self._accounts_prompt = accounts_prompt
self._accounts_data = accounts_data
self._tool_provider_map = tool_provider_map
@@ -253,6 +275,16 @@ class ColonyRuntime:
self._event_bus = event_bus or EventBus(max_history=self._config.max_history)
self._scoped_event_bus = StreamEventBus(self._event_bus, self._colony_id)
# Make the event bus visible to the task-system event emitters so
# task lifecycle events fan out to the same bus the rest of the
# system uses. Idempotent — last writer wins.
try:
from framework.tasks.events import set_default_event_bus
set_default_event_bus(self._event_bus)
except Exception:
logger.debug("Failed to register default task event bus", exc_info=True)
self._llm = llm
self._tools = tools or []
self._tool_executor = tool_executor
@@ -387,6 +419,19 @@ class ColonyRuntime:
def _apply_pipeline_results(self) -> None:
for stage in self._pipeline.stages:
if stage.tool_registry is not None:
# Register task tools on the same registry every worker
# pulls from. Done here (not at worker spawn) so the
# colony's `_tools` snapshot includes them.
try:
from framework.tasks.tools import register_task_tools
register_task_tools(stage.tool_registry)
except Exception:
logger.warning(
"Failed to register task tools on pipeline registry",
exc_info=True,
)
tools = list(stage.tool_registry.get_tools().values())
if tools:
self._tools = tools
@@ -909,6 +954,25 @@ class ColonyRuntime:
# free-variable capture here.
_provider = None if _db_path_pre_activated else (lambda mgr=self._skills_manager: mgr.skills_catalog_prompt)
# Task-system fields. Each worker owns its session task list;
# picked_up_from records the colony template entry it was
# spawned for, when applicable.
from framework.tasks.scoping import (
colony_task_list_id as _colony_list_id,
session_task_list_id as _session_list_id,
)
_worker_list_id = _session_list_id(worker_id, worker_id)
_picked_up = None
_template_id = (
input_data.get("__template_task_id") if isinstance(input_data, dict) else None
)
if _template_id is not None:
try:
_picked_up = (_colony_list_id(self._colony_id), int(_template_id))
except (TypeError, ValueError):
_picked_up = None
agent_context = AgentContext(
runtime=self._make_runtime_adapter(worker_id),
agent_id=worker_id,
@@ -925,6 +989,9 @@ class ColonyRuntime:
dynamic_skills_catalog_provider=_provider,
execution_id=worker_id,
stream_id=explicit_stream_id or f"worker:{worker_id}",
task_list_id=_worker_list_id,
colony_id=self._colony_id,
picked_up_from=_picked_up,
)
worker = Worker(
+8
View File
@@ -165,6 +165,14 @@ class EventType(StrEnum):
TRIGGER_REMOVED = "trigger_removed"
TRIGGER_UPDATED = "trigger_updated"
# Task system lifecycle (per-list diffs streamed to the UI)
TASK_CREATED = "task_created"
TASK_UPDATED = "task_updated"
TASK_DELETED = "task_deleted"
TASK_LIST_RESET = "task_list_reset"
TASK_LIST_REATTACH_MISMATCH = "task_list_reattach_mismatch"
COLONY_TEMPLATE_ASSIGNMENT = "colony_template_assignment"
@dataclass
class AgentEvent:
+15 -2
View File
@@ -154,11 +154,24 @@ class Worker:
# value without affecting the queen's ongoing calls.
try:
from framework.loader.tool_registry import ToolRegistry
from framework.tasks.scoping import session_task_list_id
ToolRegistry.set_execution_context(profile=self.id)
ctx = self._context
agent_id = getattr(ctx, "agent_id", None) or self.id
list_id = (
getattr(ctx, "task_list_id", None)
or session_task_list_id(agent_id, self.id)
)
ToolRegistry.set_execution_context(
profile=self.id,
agent_id=agent_id,
task_list_id=list_id,
colony_id=getattr(ctx, "colony_id", None),
picked_up_from=getattr(ctx, "picked_up_from", None),
)
except Exception:
logger.debug(
"Worker %s: failed to scope browser profile",
"Worker %s: failed to scope execution context",
self.id,
exc_info=True,
)
+11
View File
@@ -155,6 +155,17 @@ class SessionState(BaseModel):
# True after first successful worker execution (gates trigger delivery on restart)
worker_configured: bool = Field(default=False)
# Task-system fields (see framework/tasks).
# task_list_id: this session's own task list id (populated on first
# task_create; immutable thereafter). Used for resume reattachment —
# if it differs from resolve_task_list_id(ctx) on resume, a
# TASK_LIST_REATTACH_MISMATCH event is emitted and a fresh list is
# created at the resolved id (the orphan stays on disk).
task_list_id: str | None = None
# picked_up_from: for worker sessions, the (colony_task_list_id,
# template_task_id) pair this session was spawned for.
picked_up_from: list[Any] | None = None
model_config = {"extra": "allow"}
@property
+2
View File
@@ -358,6 +358,7 @@ def create_app(model: str | None = None) -> web.Application:
from framework.server.routes_queens import register_routes as register_queen_routes
from framework.server.routes_sessions import register_routes as register_session_routes
from framework.server.routes_skills import register_routes as register_skills_routes
from framework.server.routes_tasks import register_routes as register_task_routes
from framework.server.routes_workers import register_routes as register_worker_routes
register_config_routes(app)
@@ -375,6 +376,7 @@ def create_app(model: str | None = None) -> web.Application:
register_colony_worker_routes(app)
register_prompt_routes(app)
register_skills_routes(app)
register_task_routes(app)
# Static file serving — Option C production mode
# If frontend/dist/ exists, serve built frontend files on /
+29 -2
View File
@@ -488,6 +488,21 @@ async def create_queen(
phase_state=phase_state,
)
# ---- Task system tools --------------------------------------------
# Every queen gets the four session task tools. Queens-of-colony
# additionally get the colony_template_* tools (gated by colony_id).
from framework.tasks.tools import (
register_colony_template_tools,
register_task_tools,
)
register_task_tools(queen_registry)
_colony_id_for_queen = getattr(session, "colony_id", None) or getattr(
getattr(session, "colony_runtime", None), "_colony_id", None
)
if _colony_id_for_queen:
register_colony_template_tools(queen_registry, colony_id=_colony_id_for_queen)
# ---- Colony runtime check (only when worker is loaded) ----------------
if session.colony_runtime:
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
@@ -919,10 +934,22 @@ async def create_queen(
# token stays local to this task.
try:
from framework.loader.tool_registry import ToolRegistry
from framework.tasks.scoping import session_task_list_id
ToolRegistry.set_execution_context(profile=session.id)
queen_agent_id = getattr(session, "agent_id", None) or "queen"
queen_list_id = session_task_list_id(queen_agent_id, session.id)
colony_id = (
getattr(session, "colony_id", None)
or getattr(getattr(session, "colony_runtime", None), "_colony_id", None)
)
ToolRegistry.set_execution_context(
profile=session.id,
agent_id=queen_agent_id,
task_list_id=queen_list_id,
colony_id=colony_id,
)
except Exception:
logger.debug("Queen: failed to set browser profile for session %s", session.id, exc_info=True)
logger.debug("Queen: failed to set execution context for session %s", session.id, exc_info=True)
try:
lc = _queen_loop_config
queen_loop_config = LoopConfig(
+114
View File
@@ -0,0 +1,114 @@
"""REST routes for task lists.
GET /api/tasks/{task_list_id} -- snapshot of one list
GET /api/colonies/{colony_id}/task_lists -- helper for colony view
GET /api/sessions/{session_id}/task_list_id -- helper for session view
The task_list_id segment uses URL-encoded colons (``colony%3Aabc`` /
``session%3Aagent%3Asess``); aiohttp decodes them automatically.
"""
from __future__ import annotations
import logging
from aiohttp import web
from framework.tasks import get_task_store
from framework.tasks.scoping import (
colony_task_list_id,
session_task_list_id,
)
logger = logging.getLogger(__name__)
async def handle_get_task_list(request: web.Request) -> web.Response:
    """Snapshot one task list: meta plus all task records, as JSON."""
    list_id = request.match_info.get("task_list_id", "")
    if not list_id:
        return web.json_response({"error": "task_list_id required"}, status=400)
    store = get_task_store()
    if not await store.list_exists(list_id):
        return web.json_response(
            {"error": f"Task list {list_id!r} not found", "task_list_id": list_id, "tasks": []},
            status=404,
        )
    meta = await store.get_meta(list_id)
    records = await store.list_tasks(list_id)

    def _task_payload(r):
        # Flatten one TaskRecord into the wire shape the UI expects.
        return {
            "id": r.id,
            "subject": r.subject,
            "description": r.description,
            "active_form": r.active_form,
            "owner": r.owner,
            "status": r.status.value,
            "blocks": list(r.blocks),
            "blocked_by": list(r.blocked_by),
            "metadata": dict(r.metadata),
            "created_at": r.created_at,
            "updated_at": r.updated_at,
        }

    payload = {
        "task_list_id": list_id,
        "role": meta.role.value if meta else "session",
        "meta": meta.model_dump(mode="json") if meta else None,
        "tasks": [_task_payload(r) for r in records],
    }
    return web.json_response(payload)
async def handle_get_colony_task_lists(request: web.Request) -> web.Response:
    """Return template_task_list_id and queen_session_task_list_id for a colony."""
    colony_id = request.match_info.get("colony_id", "")
    if not colony_id:
        return web.json_response({"error": "colony_id required"}, status=400)
    # Queen's session list — the queen-of-colony's session_id == the
    # browser-facing colony session id. The frontend already knows that
    # value; we surface what we have on disk for completeness.
    queen_session_id = request.query.get("queen_session_id")
    queen_list_id = None
    if queen_session_id:
        queen_list_id = session_task_list_id("queen", queen_session_id)
    return web.json_response(
        {
            "template_task_list_id": colony_task_list_id(colony_id),
            "queen_session_task_list_id": queen_list_id,
        }
    )
async def handle_get_session_task_list_id(request: web.Request) -> web.Response:
    """Return task_list_id and picked_up_from for a session.

    The session_id is the queen's session id or a worker's session id;
    both follow the same path. The agent_id is read from the request query
    (passed by the frontend, which already knows which agent the session
    belongs to).
    """
    session_id = request.match_info.get("session_id", "")
    if not session_id:
        return web.json_response({"error": "session_id required"}, status=400)
    agent_id = request.query.get("agent_id", "queen")
    candidate = session_task_list_id(agent_id, session_id)
    exists = await get_task_store().list_exists(candidate)
    # NOTE(review): picked_up_from is a hard-coded placeholder here —
    # session state is not consulted by this route, so it is always null.
    return web.json_response(
        {
            "task_list_id": candidate if exists else None,
            "picked_up_from": None,
        }
    )
def register_routes(app: web.Application) -> None:
    # Wire the three read-only task routes. Colons inside {task_list_id}
    # arrive URL-encoded (%3A); aiohttp decodes them before match_info.
    app.router.add_get("/api/tasks/{task_list_id}", handle_get_task_list)
    app.router.add_get("/api/colonies/{colony_id}/task_lists", handle_get_colony_task_lists)
    app.router.add_get("/api/sessions/{session_id}/task_list_id", handle_get_session_task_list_id)
+44
View File
@@ -0,0 +1,44 @@
"""File-backed, lock-coordinated task tracker for the hive agent loop.
See temp/tasks-system-implementation-plan.md for the design. Two list types:
colony:{colony_id} -- the queen's spawn-plan template
session:{agent_id}:{sess_id} -- per-session working list
Each agent operates on its own session list via the four task tools
(`task_create`, `task_update`, `task_list`, `task_get`). The colony
template is addressed only by the queen's `colony_template_*` tools and by
the UI/event surface.
"""
from framework.tasks.models import (
ClaimResult,
TaskListMeta,
TaskListRole,
TaskRecord,
TaskStatus,
)
from framework.tasks.scoping import (
colony_task_list_id,
parse_task_list_id,
resolve_task_list_id,
session_task_list_id,
)
from framework.tasks.store import (
TaskStore,
get_task_store,
)
__all__ = [
"ClaimResult",
"TaskListMeta",
"TaskListRole",
"TaskRecord",
"TaskStatus",
"TaskStore",
"colony_task_list_id",
"get_task_store",
"parse_task_list_id",
"resolve_task_list_id",
"session_task_list_id",
]
+158
View File
@@ -0,0 +1,158 @@
"""Bridge from the task store to the EventBus.
The store is intentionally event-free — it's pure storage. The tool
executors (and run_parallel_workers, and any future colony_template_*
caller) are responsible for emitting the lifecycle events to the bus
after successful mutations.
Events are scoped to a stream_id pulled from the execution context if
available; otherwise they fan out at the global ``primary`` stream so the
UI's broad subscriptions still see them.
"""
from __future__ import annotations
import logging
from typing import Any
from framework.host.event_bus import AgentEvent, EventBus, EventType
from framework.tasks.models import TaskRecord
logger = logging.getLogger(__name__)
# Process-global default — set by the runner / orchestrator at bringup.
_DEFAULT_BUS: EventBus | None = None
def set_default_event_bus(bus: EventBus | None) -> None:
    """Install (or clear, with ``None``) the process-global fallback bus."""
    global _DEFAULT_BUS
    _DEFAULT_BUS = bus


def _get_bus(bus: EventBus | None = None) -> EventBus | None:
    """Prefer the explicit ``bus`` argument, else the process default."""
    return bus or _DEFAULT_BUS
def _serialize_record(rec: TaskRecord) -> dict[str, Any]:
return {
"id": rec.id,
"subject": rec.subject,
"description": rec.description,
"active_form": rec.active_form,
"owner": rec.owner,
"status": rec.status.value,
"blocks": list(rec.blocks),
"blocked_by": list(rec.blocked_by),
"metadata": dict(rec.metadata),
"created_at": rec.created_at,
"updated_at": rec.updated_at,
}
async def emit_task_created(
    *,
    task_list_id: str,
    record: TaskRecord,
    stream_id: str = "primary",
    bus: EventBus | None = None,
) -> None:
    """Publish a TASK_CREATED event; no-op without a bus, never raises."""
    target = _get_bus(bus)
    if target is None:
        return
    try:
        event = AgentEvent(
            type=EventType.TASK_CREATED,
            stream_id=stream_id,
            data={
                "task_list_id": task_list_id,
                "task": _serialize_record(record),
            },
        )
        await target.publish(event)
    except Exception:
        logger.debug("emit_task_created failed", exc_info=True)
async def emit_task_updated(
    *,
    task_list_id: str,
    record: TaskRecord,
    fields: list[str],
    stream_id: str = "primary",
    bus: EventBus | None = None,
) -> None:
    """Publish a TASK_UPDATED diff event; skipped when ``fields`` is empty."""
    target = _get_bus(bus)
    if target is None or not fields:
        return
    try:
        event = AgentEvent(
            type=EventType.TASK_UPDATED,
            stream_id=stream_id,
            data={
                "task_list_id": task_list_id,
                "task_id": record.id,
                "after": _serialize_record(record),
                "fields": fields,
            },
        )
        await target.publish(event)
    except Exception:
        logger.debug("emit_task_updated failed", exc_info=True)
async def emit_task_deleted(
    *,
    task_list_id: str,
    task_id: int,
    cascade: list[int],
    stream_id: str = "primary",
    bus: EventBus | None = None,
) -> None:
    """Publish a TASK_DELETED event (``cascade`` lists co-deleted ids)."""
    target = _get_bus(bus)
    if target is None:
        return
    try:
        event = AgentEvent(
            type=EventType.TASK_DELETED,
            stream_id=stream_id,
            data={
                "task_list_id": task_list_id,
                "task_id": task_id,
                "cascade": cascade,
            },
        )
        await target.publish(event)
    except Exception:
        logger.debug("emit_task_deleted failed", exc_info=True)
async def emit_colony_template_assignment(
    *,
    colony_id: str,
    task_id: int,
    assigned_session: str | None,
    assigned_worker_id: str | None,
    stream_id: str = "primary",
    bus: EventBus | None = None,
) -> None:
    """Publish a COLONY_TEMPLATE_ASSIGNMENT event; no-op without a bus."""
    target = _get_bus(bus)
    if target is None:
        return
    try:
        event = AgentEvent(
            type=EventType.COLONY_TEMPLATE_ASSIGNMENT,
            stream_id=stream_id,
            data={
                "colony_id": colony_id,
                "task_id": task_id,
                "assigned_session": assigned_session,
                "assigned_worker_id": assigned_worker_id,
            },
        )
        await target.publish(event)
    except Exception:
        logger.debug("emit_colony_template_assignment failed", exc_info=True)
+103
View File
@@ -0,0 +1,103 @@
"""Task lifecycle hooks.
Two events:
* ``task_created`` -- fires after the task file is written but before the
tool returns. Hooks may raise ``BlockingHookError``
to abort creation; the wrapper deletes the just-
created task and returns an error tool_result.
* ``task_completed`` -- fires when ``task_update`` transitions a task to
``completed``. A blocking error rolls the status
back to ``in_progress`` and surfaces the error.
Hooks are registered on a process-global registry so callers (test
fixtures, integrations) can install them without threading through the
agent loop. They run in registration order; any hook may abort by raising
``BlockingHookError``.
"""
from __future__ import annotations
import inspect
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
HOOK_TASK_CREATED = "task_created"
HOOK_TASK_COMPLETED = "task_completed"
class BlockingHookError(Exception):
    """Raised by a hook to veto the surrounding tool operation.

    The tool wrapper catches this and rolls back the mutation (see the
    module docstring for the per-event rollback semantics).
    """
@dataclass
class TaskHookContext:
    """Facts handed to every hook invocation for one task event."""

    event: str  # HOOK_TASK_CREATED or HOOK_TASK_COMPLETED
    task_list_id: str
    task: Any  # TaskRecord (avoid import cycle)
    agent_id: str | None = None
    metadata: dict[str, Any] | None = None
HookFn = Callable[[TaskHookContext], Any | Awaitable[Any]]
_HOOK_REGISTRY: dict[str, list[HookFn]] = {
HOOK_TASK_CREATED: [],
HOOK_TASK_COMPLETED: [],
}
def register_hook(event: str, fn: HookFn) -> None:
    """Append ``fn`` to the hooks for ``event``; unknown events raise ValueError."""
    try:
        _HOOK_REGISTRY[event].append(fn)
    except KeyError:
        raise ValueError(f"Unknown hook event: {event!r}") from None
def clear_hooks(event: str | None = None) -> None:
    """Test helper. Clear all hooks (or just one event's).

    Raises ``ValueError`` for an unknown event name, matching
    ``register_hook`` — previously an unknown name was silently
    ignored, which hid typos in test fixtures.
    """
    if event is None:
        for hooks in _HOOK_REGISTRY.values():
            hooks.clear()
        return
    if event not in _HOOK_REGISTRY:
        raise ValueError(f"Unknown hook event: {event!r}")
    _HOOK_REGISTRY[event].clear()
async def run_task_hooks(
    event: str,
    *,
    task_list_id: str,
    task: Any,
    agent_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Run all hooks registered for ``event``.

    Re-raises ``BlockingHookError`` from any hook; the caller is responsible
    for rolling back the operation. Any other exception is logged and
    swallowed so a broken observer cannot abort the tool call.
    """
    registered = tuple(_HOOK_REGISTRY.get(event, ()))
    if not registered:
        return
    ctx = TaskHookContext(
        event=event,
        task_list_id=task_list_id,
        task=task,
        agent_id=agent_id,
        metadata=metadata,
    )
    for fn in registered:
        try:
            outcome = fn(ctx)
            # Hooks may be sync or async; await only when needed.
            if inspect.isawaitable(outcome):
                await outcome
        except BlockingHookError:
            raise
        except Exception:
            # Non-blocking exceptions are logged but do not abort the operation.
            logger.exception("Non-blocking hook failed for %s", event)
+93
View File
@@ -0,0 +1,93 @@
"""Data models for the task tracker.
The schema follows the UI-facing task-record shape with one notable
difference: ids are integers (Python is cleaner that way) and rendered
as ``#N`` only in user-facing strings.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from enum import StrEnum
from typing import Any, Literal
from pydantic import BaseModel, Field
class TaskStatus(StrEnum):
    """Task lifecycle states; values are the wire/UI strings."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
class TaskListRole(StrEnum):
    """Distinguishes a colony template from a session-scoped working list.

    Used for sanity-checking which write paths are allowed (e.g. the four
    session tools must never touch a ``template`` list). Persisted as the
    ``role`` field of ``TaskListMeta`` (meta.json).
    """

    TEMPLATE = "template"  # colony:{colony_id}
    SESSION = "session"  # session:{agent_id}:{session_id}
class TaskRecord(BaseModel):
    """One unit of work tracked by an agent."""

    id: int  # monotonic, never reused — see store.py
    subject: str
    description: str = ""
    active_form: str | None = None  # present-continuous label, surfaces in UI
    owner: str | None = None  # agent_id of the owning agent
    status: TaskStatus = TaskStatus.PENDING
    # Dependency edges, by task id.
    blocks: list[int] = Field(default_factory=list)
    blocked_by: list[int] = Field(default_factory=list)
    metadata: dict[str, Any] = Field(default_factory=dict)
    # Unix timestamps (seconds since epoch).
    created_at: float = Field(default_factory=time.time)
    updated_at: float = Field(default_factory=time.time)
class TaskListMeta(BaseModel):
    """Per-list metadata stored in ``meta.json`` next to the task files."""

    task_list_id: str
    role: TaskListRole
    creator_agent_id: str | None = None
    created_at: float = Field(default_factory=time.time)  # unix seconds
    # Session ids observed against this list — NOTE(review): presumably
    # stamped by ensure_task_list when a session id is provided; confirm
    # in store.py.
    last_seen_session_ids: list[str] = Field(default_factory=list)
    schema_version: int = 1  # bump on breaking changes to this file format
# Tagged union for claim_task_with_busy_check. Used by run_parallel_workers
# when stamping ``assigned_session`` on a colony template entry — the only
# place a "claim" actually happens under the hive model. Callers branch on
# the ``kind`` literal (discriminated union).
@dataclass
class ClaimOk:
    kind: Literal["ok"]
    record: TaskRecord  # the claimed task


@dataclass
class ClaimNotFound:
    kind: Literal["not_found"]


@dataclass
class ClaimAlreadyOwned:
    kind: Literal["already_owned"]
    by: str  # NOTE(review): presumably the owning session id — confirm in store.py


@dataclass
class ClaimAlreadyCompleted:
    kind: Literal["already_completed"]


@dataclass
class ClaimBlocked:
    kind: Literal["blocked"]
    by: list[int]  # ids of the blocking tasks


ClaimResult = ClaimOk | ClaimNotFound | ClaimAlreadyOwned | ClaimAlreadyCompleted | ClaimBlocked
+107
View File
@@ -0,0 +1,107 @@
"""Periodic task-reminder injection.
After enough silent turns since the last task tool call, inject a
reminder summarizing the current open tasks. Catches the failure mode
where the agent has silently absorbed multiple finished steps into one
in_progress task and stopped using the task tools.
The reminder counter lives on the AgentLoop instance; this module owns
the policy (threshold, cooldown, message text) and the integration
helper. Wiring lives in :mod:`framework.tasks.integrations.agent_loop`.
"""
from __future__ import annotations
import logging
import os
from collections.abc import Iterable
from dataclasses import dataclass
from framework.tasks.models import TaskRecord, TaskStatus
logger = logging.getLogger(__name__)
REMINDER_THRESHOLD_TURNS = int(os.environ.get("HIVE_TASK_REMINDER_TURNS", "8"))
REMINDER_COOLDOWN_TURNS = int(os.environ.get("HIVE_TASK_REMINDER_COOLDOWN", "8"))
# Names that count as "task ops" — calling any of these resets the silence
# counter. Keep narrow: only mutating ops re-establish discipline. task_list
# / task_get are read-only and shouldn't reset the counter (the agent could
# read forever without making progress).
TASK_OP_TOOL_NAMES: frozenset[str] = frozenset(
{
"task_create",
"task_update",
"colony_template_add",
"colony_template_update",
"colony_template_remove",
}
)
@dataclass
class ReminderState:
    """Per-loop counter — caller bumps it each iteration."""

    # Iterations since the last mutating task op (see TASK_OP_TOOL_NAMES).
    turns_since_task_op: int = 0
    # Iterations since the last reminder was queued (cooldown window).
    turns_since_last_reminder: int = 0

    def on_iteration(self) -> None:
        """Advance both counters; call exactly once per loop iteration."""
        self.turns_since_task_op += 1
        self.turns_since_last_reminder += 1

    def on_task_op(self) -> None:
        """Reset the silence counter after a mutating task tool call."""
        self.turns_since_task_op = 0

    def on_reminder_sent(self) -> None:
        """Restart the cooldown after a reminder is queued."""
        self.turns_since_last_reminder = 0

    def should_remind(self, has_open_tasks: bool) -> bool:
        """True when open tasks exist and both thresholds are satisfied."""
        return (
            has_open_tasks
            and self.turns_since_task_op >= REMINDER_THRESHOLD_TURNS
            and self.turns_since_last_reminder >= REMINDER_COOLDOWN_TURNS
        )
def saw_task_op(tool_names: Iterable[str]) -> bool:
    """True if any of the names is a counter-resetting task op."""
    return not TASK_OP_TOOL_NAMES.isdisjoint(tool_names)
def build_reminder(records: list[TaskRecord]) -> str:
    """Compose the reminder body — pending/in-progress focus.

    Returns "" when nothing is open (caller skips the injection).
    """
    outstanding = [r for r in records if r.status != TaskStatus.COMPLETED]
    if not outstanding:
        return ""
    active = [r for r in outstanding if r.status == TaskStatus.IN_PROGRESS]

    parts: list[str] = [
        "[task_reminder] The task tools haven't been used in several "
        "turns. If you're working on tasks that would benefit from "
        "tracked progress:",
        " - Mark the in_progress task `completed` THE MOMENT it's done — "
        "before starting the next step. Don't batch completions.",
        " - If you've finished work that wasn't on the list, add a "
        "task_create + task_update completed pair so the panel reflects it.",
        " - If you're umbrella-tracking ('reply to all posts' as one "
        "task), break it into one task per atomic action.",
    ]
    if active:
        sample = ", ".join(f'#{r.id} "{r.subject}"' for r in active[:5])
        parts.append(
            " - Currently in_progress (consider whether they're really "
            "still active): " + sample
        )
    parts.append("")
    parts.append("Open tasks:")
    # Cap the listing at 10 entries to keep the injected turn small.
    for r in outstanding[:10]:
        parts.append(f" #{r.id} [{r.status.value}] {r.subject}")
    if len(outstanding) > 10:
        parts.append(f" ... and {len(outstanding) - 10} more")
    parts.append(
        "\nOnly act on this if relevant to the current work. NEVER mention "
        "this reminder to the user."
    )
    return "\n".join(parts)
+83
View File
@@ -0,0 +1,83 @@
"""Task list id resolution.
Under the corrected model (see plan §5):
- Every agent session owns one task list: ``session:{agent_id}:{session_id}``
- The colony has a separate template list: ``colony:{colony_id}``
``resolve_task_list_id(ctx)`` returns the agent's OWN session list id —
what the four task tools write to. The colony template is addressed via
the dedicated ``colony_template_*`` tools and the UI; never via the four
session tools.
"""
from __future__ import annotations
import logging
import os
from typing import Any
logger = logging.getLogger(__name__)
def session_task_list_id(agent_id: str, session_id: str) -> str:
    """Id of the per-session working list for this agent/session pair."""
    return ":".join(("session", agent_id, session_id))


def colony_task_list_id(colony_id: str) -> str:
    """Id of the colony's spawn-plan template list."""
    return ":".join(("colony", colony_id))
def parse_task_list_id(task_list_id: str) -> dict[str, str]:
    """Decode a task_list_id into its component parts.

    Returns a dict with at least ``kind`` ("session" / "colony" / "unscoped"
    / "raw"), and the relevant ids when applicable.
    """
    prefix, sep, rest = task_list_id.partition(":")
    if sep:
        if prefix == "session":
            agent_id, _, session_id = rest.partition(":")
            return {"kind": "session", "agent_id": agent_id, "session_id": session_id}
        if prefix == "colony":
            return {"kind": "colony", "colony_id": rest}
        if prefix == "unscoped":
            return {"kind": "unscoped", "agent_id": rest}
    # No recognised scheme — hand the caller the raw value.
    return {"kind": "raw", "value": task_list_id}
def resolve_task_list_id(ctx: Any) -> str:
    """Return the agent's own session-scoped task list id.

    Resolution priority:
    1. ``HIVE_TASK_LIST_ID`` env var (test/CLI override)
    2. ``ctx.task_list_id`` if already populated by the runner
    3. ``session:{ctx.agent_id}:{ctx.run_id or ctx.execution_id}``
    4. ``unscoped:{ctx.agent_id}`` sentinel (should not happen in prod)
    """
    env_override = os.environ.get("HIVE_TASK_LIST_ID")
    if env_override:
        return env_override
    preset = getattr(ctx, "task_list_id", None)
    if preset:
        return preset
    agent_id = getattr(ctx, "agent_id", None) or ""
    # First truthy id wins, in the same priority order as before.
    session_id = ""
    for attr in ("run_id", "execution_id", "stream_id"):
        candidate = getattr(ctx, attr, None)
        if candidate:
            session_id = candidate
            break
    if agent_id and session_id:
        # Same format session_task_list_id() produces.
        return f"session:{agent_id}:{session_id}"
    fallback = f"unscoped:{agent_id or 'unknown'}"
    logger.warning(
        "resolve_task_list_id falling back to %s — agent_id=%r session_id=%r",
        fallback,
        agent_id,
        session_id,
    )
    return fallback
+642
View File
@@ -0,0 +1,642 @@
"""File-backed task store with filelock-based coordination.
Layout per list::
{root}/{task_list_id}/
meta.json -- TaskListMeta
tasks/
0001.json -- TaskRecord (zero-padded for ls-sort)
0002.json
...
.lock -- list-level lock
.highwatermark -- ID floor (deleted ids never reused)
Two list-roots:
colony:{colony_id} -> ~/.hive/colonies/{colony_id}/tasks/
session:{a}:{s} -> ~/.hive/agents/{a}/sessions/{s}/tasks/
All filesystem I/O is wrapped in ``asyncio.to_thread`` so the event loop
never blocks. Locks use a 30-retry / ~2.6s budget — comfortable headroom
for the only realistic write contender (colony template under concurrent
``colony_template_*`` and ``run_parallel_workers`` stamps).
The "_unsafe" variants exist because filelock is **not re-entrant**: a
caller already holding a lock must NOT re-acquire it (would deadlock).
The unsafe path skips acquisition and is callable only from inside another
locked function. See ``claim_task_with_busy_check`` and ``delete_task``.
"""
from __future__ import annotations
import asyncio
import logging
import os
import time
from collections.abc import Iterable
from pathlib import Path
from typing import Any
from filelock import FileLock
from framework.tasks.models import (
ClaimAlreadyCompleted,
ClaimAlreadyOwned,
ClaimBlocked,
ClaimNotFound,
ClaimOk,
ClaimResult,
TaskListMeta,
TaskListRole,
TaskRecord,
TaskStatus,
)
from framework.utils.io import atomic_write
logger = logging.getLogger(__name__)
# Upper bound on waiting for a list-level FileLock before it raises.
LOCK_TIMEOUT_SECONDS = 3.0  # ~30 retries × ~100ms
class _Unset:
    """Sentinel for "owner argument not provided" — distinct from owner=None."""
    # No instance state: the sentinel is only identity/isinstance-checked,
    # so an empty __slots__ keeps it as light as possible.
    __slots__ = ()
# Module-level singleton used as the default for ``owner`` parameters.
_UNSET_SENTINEL: _Unset = _Unset()
def _hive_root() -> Path:
"""Location of the hive data dir; honors HIVE_HOME for tests."""
return Path(os.environ.get("HIVE_HOME", str(Path.home() / ".hive")))
def task_list_path(task_list_id: str, *, hive_root: Path | None = None) -> Path:
"""Resolve task_list_id -> on-disk root."""
root = hive_root or _hive_root()
if task_list_id.startswith("colony:"):
colony_id = task_list_id[len("colony:") :]
return root / "colonies" / colony_id / "tasks"
if task_list_id.startswith("session:"):
rest = task_list_id[len("session:") :]
agent_id, _, session_id = rest.partition(":")
if not session_id:
raise ValueError(f"Malformed session task_list_id: {task_list_id!r}")
return root / "agents" / agent_id / "sessions" / session_id / "tasks"
if task_list_id.startswith("unscoped:"):
agent_id = task_list_id[len("unscoped:") :]
return root / "unscoped" / agent_id / "tasks"
# Last-ditch sanitization for HIVE_TASK_LIST_ID overrides — slugify the
# whole thing so the test/dev path can't escape the hive root.
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in task_list_id)
return root / "_misc" / safe
# ---------------------------------------------------------------------------
# TaskStore — public façade
# ---------------------------------------------------------------------------
class TaskStore:
    """Async wrapper around the on-disk store.
    A single TaskStore is fine to share across the process; locking is
    file-based, so even multiple processes are safe.
    """
    def __init__(self, *, hive_root: Path | None = None) -> None:
        # None -> resolve via module-level task_list_path/_hive_root
        # (honors HIVE_HOME) on every path computation.
        self._hive_root = hive_root
    # ----- list-level ---------------------------------------------------
    async def ensure_task_list(
        self,
        task_list_id: str,
        *,
        role: TaskListRole,
        creator_agent_id: str | None = None,
        session_id: str | None = None,
    ) -> TaskListMeta:
        """Create a list if absent; if present, append session_id to last_seen.
        Idempotent: callers (ColonyRuntime bringup, lazy session creation)
        can call this every time.
        """
        return await asyncio.to_thread(
            self._ensure_task_list_sync,
            task_list_id,
            role,
            creator_agent_id,
            session_id,
        )
    async def list_exists(self, task_list_id: str) -> bool:
        """A list exists if its meta.json OR any task file is on disk.
        meta.json is normally written by ``ensure_task_list``, but session
        lists may be created lazily via the first ``task_create`` (see
        ``_create_task_sync``); in that case meta.json is backfilled the
        first time the list is read. Until then, we still want to expose
        the list's tasks via REST.
        """
        def _check() -> bool:
            root = self._list_root(task_list_id)
            if (root / "meta.json").exists():
                return True
            tasks_dir = root / "tasks"
            if tasks_dir.exists() and any(
                p.suffix == ".json" for p in tasks_dir.iterdir()
            ):
                return True
            return False
        return await asyncio.to_thread(_check)
    async def get_meta(self, task_list_id: str) -> TaskListMeta | None:
        """Read the list's meta.json; None when absent or unparsable."""
        return await asyncio.to_thread(self._read_meta_sync, task_list_id)
    async def reset_task_list(self, task_list_id: str) -> None:
        """Delete all task files but preserve the high-water-mark.
        Test helper. Never wired to runtime lifecycle.
        """
        await asyncio.to_thread(self._reset_sync, task_list_id)
    # ----- task CRUD ----------------------------------------------------
    async def create_task(
        self,
        task_list_id: str,
        *,
        subject: str,
        description: str = "",
        active_form: str | None = None,
        owner: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> TaskRecord:
        """Create a new PENDING task with the next free id.
        The list (and its meta.json) is lazily created if needed — see
        ``_create_task_sync``.
        """
        return await asyncio.to_thread(
            self._create_task_sync,
            task_list_id,
            subject,
            description,
            active_form,
            owner,
            metadata or {},
        )
    async def get_task(self, task_list_id: str, task_id: int) -> TaskRecord | None:
        """Read one task by id; None when absent or unparsable."""
        return await asyncio.to_thread(self._read_task_sync, task_list_id, task_id)
    async def list_tasks(
        self,
        task_list_id: str,
        *,
        include_internal: bool = False,
    ) -> list[TaskRecord]:
        """Return all tasks for the list, sorted by id.
        Tasks whose ``metadata["_internal"]`` is truthy are filtered out
        unless ``include_internal`` is set.
        """
        records = await asyncio.to_thread(self._list_tasks_sync, task_list_id)
        if include_internal:
            return records
        return [r for r in records if not r.metadata.get("_internal")]
    async def update_task(
        self,
        task_list_id: str,
        task_id: int,
        *,
        subject: str | None = None,
        description: str | None = None,
        active_form: str | None = None,
        owner: str | None | _Unset = _UNSET_SENTINEL,
        status: TaskStatus | None = None,
        add_blocks: list[int] | None = None,
        add_blocked_by: list[int] | None = None,
        metadata_patch: dict[str, Any] | None = None,
    ) -> tuple[TaskRecord | None, list[str]]:
        """Update a task; returns (new_record, fields_changed) or (None, [])."""
        # owner defaults to the _UNSET_SENTINEL so callers can distinguish
        # "don't touch owner" from "clear owner" (owner=None).
        return await asyncio.to_thread(
            self._update_task_sync,
            task_list_id,
            task_id,
            subject,
            description,
            active_form,
            owner,
            status,
            add_blocks,
            add_blocked_by,
            metadata_patch,
        )
    async def delete_task(self, task_list_id: str, task_id: int) -> tuple[bool, list[int]]:
        """Delete a task; returns (was_deleted, cascaded_ids).
        ``cascaded_ids`` are the ids of other tasks whose blocks/blocked_by
        referenced the deleted id and were stripped.
        """
        return await asyncio.to_thread(self._delete_task_sync, task_list_id, task_id)
    async def claim_task_with_busy_check(
        self,
        task_list_id: str,
        task_id: int,
        claimant: str,
    ) -> ClaimResult:
        """Atomic claim under list-lock.
        Used internally by ``run_parallel_workers`` when stamping
        ``metadata.assigned_session`` on colony template entries not
        exposed to LLMs as a worker-facing claim race.
        """
        return await asyncio.to_thread(self._claim_sync, task_list_id, task_id, claimant)
    # =====================================================================
    # Sync internals — all called via asyncio.to_thread
    # =====================================================================
    def _list_root(self, task_list_id: str) -> Path:
        # Resolution goes through the module-level task_list_path so the
        # hive_root test override applies uniformly.
        return task_list_path(task_list_id, hive_root=self._hive_root)
    def _tasks_dir(self, task_list_id: str) -> Path:
        # Per-task JSON files live under {list_root}/tasks/.
        return self._list_root(task_list_id) / "tasks"
    def _list_lock(self, task_list_id: str) -> FileLock:
        # FileLock targets a sentinel file; it tolerates the file being absent
        # by creating it on first acquire. We use the .lock filename so it's
        # visible alongside the other list files.
        root = self._list_root(task_list_id)
        root.mkdir(parents=True, exist_ok=True)
        return FileLock(str(root / ".lock"), timeout=LOCK_TIMEOUT_SECONDS)
    def _highwatermark_path(self, task_list_id: str) -> Path:
        # Plain-text integer file holding the id floor (see module docstring).
        return self._list_root(task_list_id) / ".highwatermark"
    def _meta_path(self, task_list_id: str) -> Path:
        return self._list_root(task_list_id) / "meta.json"
    def _task_path(self, task_list_id: str, task_id: int) -> Path:
        # Zero-padded to 4 digits so a plain `ls` sorts by id.
        return self._tasks_dir(task_list_id) / f"{task_id:04d}.json"
    # ----- meta ---------------------------------------------------------
    def _ensure_task_list_sync(
        self,
        task_list_id: str,
        role: TaskListRole,
        creator_agent_id: str | None,
        session_id: str | None,
    ) -> TaskListMeta:
        """Sync body of ensure_task_list; serialized via the list lock."""
        root = self._list_root(task_list_id)
        root.mkdir(parents=True, exist_ok=True)
        (root / "tasks").mkdir(exist_ok=True)
        meta_path = self._meta_path(task_list_id)
        with self._list_lock(task_list_id):
            if meta_path.exists():
                meta = self._read_meta_sync(task_list_id)
                if meta is None:
                    # File existed but failed to parse — rewrite fresh.
                    meta = TaskListMeta(
                        task_list_id=task_list_id,
                        role=role,
                        creator_agent_id=creator_agent_id,
                    )
                if session_id and session_id not in meta.last_seen_session_ids:
                    meta.last_seen_session_ids.append(session_id)
                # Cap at 10 to keep the audit trail bounded.
                meta.last_seen_session_ids = meta.last_seen_session_ids[-10:]
                self._write_meta_sync(task_list_id, meta)
                return meta
            meta = TaskListMeta(
                task_list_id=task_list_id,
                role=role,
                creator_agent_id=creator_agent_id,
                last_seen_session_ids=[session_id] if session_id else [],
            )
            self._write_meta_sync(task_list_id, meta)
            return meta
    def _read_meta_sync(self, task_list_id: str) -> TaskListMeta | None:
        """Parse meta.json; corrupt files are logged and treated as absent."""
        path = self._meta_path(task_list_id)
        if not path.exists():
            return None
        try:
            return TaskListMeta.model_validate_json(path.read_text(encoding="utf-8"))
        except Exception:
            logger.warning("Corrupt meta.json at %s", path, exc_info=True)
            return None
    def _write_meta_sync(self, task_list_id: str, meta: TaskListMeta) -> None:
        # atomic_write keeps concurrent readers from seeing a torn file.
        path = self._meta_path(task_list_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        with atomic_write(path) as f:
            f.write(meta.model_dump_json(indent=2))
    # ----- task IO ------------------------------------------------------
    def _read_task_sync(self, task_list_id: str, task_id: int) -> TaskRecord | None:
        """Parse one task file; corrupt files are logged and treated as absent."""
        path = self._task_path(task_list_id, task_id)
        if not path.exists():
            return None
        try:
            return TaskRecord.model_validate_json(path.read_text(encoding="utf-8"))
        except Exception:
            logger.warning("Corrupt task file at %s", path, exc_info=True)
            return None
    def _write_task_sync(self, task_list_id: str, record: TaskRecord) -> None:
        # atomic_write keeps concurrent readers from seeing a torn file.
        path = self._task_path(task_list_id, record.id)
        path.parent.mkdir(parents=True, exist_ok=True)
        with atomic_write(path) as f:
            f.write(record.model_dump_json(indent=2))
    def _list_tasks_sync(self, task_list_id: str) -> list[TaskRecord]:
        """Load every parsable task file, sorted by numeric id."""
        d = self._tasks_dir(task_list_id)
        if not d.exists():
            return []
        records: list[TaskRecord] = []
        for path in sorted(d.iterdir()):
            if path.suffix != ".json":
                continue
            try:
                records.append(TaskRecord.model_validate_json(path.read_text(encoding="utf-8")))
            except Exception:
                logger.warning("Skipping corrupt task file %s", path, exc_info=True)
        # Filename order is already id order for 4-digit ids, but sort by the
        # numeric id anyway so ids past 9999 can't interleave incorrectly.
        records.sort(key=lambda r: r.id)
        return records
    # ----- highwatermark / id assignment --------------------------------
    def _read_highwatermark_sync(self, task_list_id: str) -> int:
        """Current id floor; 0 when the file is absent or unreadable."""
        path = self._highwatermark_path(task_list_id)
        if not path.exists():
            return 0
        try:
            return int(path.read_text(encoding="utf-8").strip() or "0")
        except (ValueError, OSError):
            return 0
    def _write_highwatermark_sync(self, task_list_id: str, value: int) -> None:
        path = self._highwatermark_path(task_list_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        with atomic_write(path) as f:
            f.write(str(value))
    def _next_id_sync(self, task_list_id: str) -> int:
        """Compute next id under the assumption the list-lock is held."""
        # Deleted ids never come back: the floor only ever moves up.
        existing = self._list_tasks_sync(task_list_id)
        max_existing = max((r.id for r in existing), default=0)
        floor = self._read_highwatermark_sync(task_list_id)
        return max(max_existing, floor) + 1
    # ----- create -------------------------------------------------------
    def _create_task_sync(
        self,
        task_list_id: str,
        subject: str,
        description: str,
        active_form: str | None,
        owner: str | None,
        metadata: dict[str, Any],
    ) -> TaskRecord:
        """Sync body of create_task; id assignment + write under the lock."""
        with self._list_lock(task_list_id):
            # Lazy-create meta.json on first task. Session lists are
            # frequently created via the first task_create (no explicit
            # ensure_task_list call); without this backfill the REST
            # endpoint can't discover them. Role is inferred from prefix.
            if not self._meta_path(task_list_id).exists():
                inferred_role = (
                    TaskListRole.TEMPLATE
                    if task_list_id.startswith("colony:")
                    else TaskListRole.SESSION
                )
                self._write_meta_sync(
                    task_list_id,
                    TaskListMeta(
                        task_list_id=task_list_id,
                        role=inferred_role,
                    ),
                )
            new_id = self._next_id_sync(task_list_id)
            now = time.time()
            record = TaskRecord(
                id=new_id,
                subject=subject,
                description=description,
                active_form=active_form,
                owner=owner,
                status=TaskStatus.PENDING,
                metadata=metadata,
                created_at=now,
                updated_at=now,
            )
            self._write_task_sync(task_list_id, record)
            # Bump high-water-mark eagerly so even a concurrent racer that
            # somehow missed the listing snapshot can't pick the same id.
            if new_id > self._read_highwatermark_sync(task_list_id):
                self._write_highwatermark_sync(task_list_id, new_id)
            return record
    # ----- update -------------------------------------------------------
    def _update_task_sync(
        self,
        task_list_id: str,
        task_id: int,
        subject: str | None,
        description: str | None,
        active_form: str | None,
        owner: str | None | _Unset,
        status: TaskStatus | None,
        add_blocks: list[int] | None,
        add_blocked_by: list[int] | None,
        metadata_patch: dict[str, Any] | None,
    ) -> tuple[TaskRecord | None, list[str]]:
        """Sync body of update_task: acquire the lock, then delegate to the
        non-locking _update_task_unsafe (filelock is not re-entrant)."""
        with self._list_lock(task_list_id):
            current = self._read_task_sync(task_list_id, task_id)
            if current is None:
                return None, []
            return self._update_task_unsafe(
                task_list_id,
                current,
                subject=subject,
                description=description,
                active_form=active_form,
                owner=owner,
                status=status,
                add_blocks=add_blocks,
                add_blocked_by=add_blocked_by,
                metadata_patch=metadata_patch,
            )
    def _update_task_unsafe(
        self,
        task_list_id: str,
        current: TaskRecord,
        *,
        subject: str | None = None,
        description: str | None = None,
        active_form: str | None = None,
        owner: str | None | _Unset = _UNSET_SENTINEL,
        status: TaskStatus | None = None,
        add_blocks: list[int] | None = None,
        add_blocked_by: list[int] | None = None,
        metadata_patch: dict[str, Any] | None = None,
    ) -> tuple[TaskRecord, list[str]]:
        """Update without acquiring the list-lock. Caller MUST hold it."""
        changed: list[str] = []
        # Work on a deep copy so ``current`` stays untouched if nothing changes.
        new = current.model_copy(deep=True)
        if subject is not None and subject != new.subject:
            new.subject = subject
            changed.append("subject")
        if description is not None and description != new.description:
            new.description = description
            changed.append("description")
        if active_form is not None and active_form != new.active_form:
            new.active_form = active_form
            changed.append("active_form")
        # owner=None clears ownership; only the sentinel means "leave as-is".
        if not isinstance(owner, _Unset) and owner != new.owner:
            new.owner = owner
            changed.append("owner")
        if status is not None and status != new.status:
            new.status = status
            changed.append("status")
        if add_blocks:
            for b in add_blocks:
                # Ignore duplicates and self-references.
                if b not in new.blocks and b != new.id:
                    new.blocks.append(b)
                    if "blocks" not in changed:
                        changed.append("blocks")
                    # Maintain the bidirectional invariant by stamping
                    # blocked_by on the target as well.
                    target = self._read_task_sync(task_list_id, b)
                    if target and new.id not in target.blocked_by:
                        target.blocked_by.append(new.id)
                        target.updated_at = time.time()
                        self._write_task_sync(task_list_id, target)
        if add_blocked_by:
            for b in add_blocked_by:
                if b not in new.blocked_by and b != new.id:
                    new.blocked_by.append(b)
                    if "blocked_by" not in changed:
                        changed.append("blocked_by")
                    # Mirror edge: the blocker's ``blocks`` must list us.
                    target = self._read_task_sync(task_list_id, b)
                    if target and new.id not in target.blocks:
                        target.blocks.append(new.id)
                        target.updated_at = time.time()
                        self._write_task_sync(task_list_id, target)
        if metadata_patch is not None:
            # Shallow merge; a None value deletes the key.
            md = dict(new.metadata)
            for k, v in metadata_patch.items():
                if v is None:
                    md.pop(k, None)
                else:
                    md[k] = v
            if md != new.metadata:
                new.metadata = md
                changed.append("metadata")
        if not changed:
            return new, []
        new.updated_at = time.time()
        self._write_task_sync(task_list_id, new)
        return new, changed
    # ----- delete -------------------------------------------------------
    def _delete_task_sync(self, task_list_id: str, task_id: int) -> tuple[bool, list[int]]:
        """Sync body of delete_task; unlink + cascade under the lock."""
        with self._list_lock(task_list_id):
            path = self._task_path(task_list_id, task_id)
            if not path.exists():
                return False, []
            # 1. Bump high-water-mark BEFORE unlinking so a crash mid-delete
            #    can't accidentally re-allocate the id.
            current_floor = self._read_highwatermark_sync(task_list_id)
            if task_id > current_floor:
                self._write_highwatermark_sync(task_list_id, task_id)
            # 2. Unlink the task itself.
            path.unlink()
            # 3. Cascade: strip references from all other tasks.
            cascaded: list[int] = []
            for other in self._list_tasks_sync(task_list_id):
                touched = False
                if task_id in other.blocks:
                    other.blocks = [b for b in other.blocks if b != task_id]
                    touched = True
                if task_id in other.blocked_by:
                    other.blocked_by = [b for b in other.blocked_by if b != task_id]
                    touched = True
                if touched:
                    other.updated_at = time.time()
                    self._write_task_sync(task_list_id, other)
                    cascaded.append(other.id)
            return True, cascaded
    # ----- reset --------------------------------------------------------
    def _reset_sync(self, task_list_id: str) -> None:
        """Sync body of reset_task_list: wipe tasks, keep the id floor."""
        with self._list_lock(task_list_id):
            # Raise the floor to the current max id first so recreated lists
            # never reuse the ids of the tasks being removed.
            tasks = self._list_tasks_sync(task_list_id)
            max_id = max((r.id for r in tasks), default=0)
            floor = self._read_highwatermark_sync(task_list_id)
            new_floor = max(max_id, floor)
            self._write_highwatermark_sync(task_list_id, new_floor)
            d = self._tasks_dir(task_list_id)
            if d.exists():
                for p in d.iterdir():
                    if p.suffix == ".json":
                        p.unlink()
    # ----- claim --------------------------------------------------------
    def _claim_sync(self, task_list_id: str, task_id: int, claimant: str) -> ClaimResult:
        """Sync body of claim_task_with_busy_check.

        All busy/blocked checks and the ownership write happen under one
        lock acquisition, so two claimants can't both win. Uses the
        _unsafe update variant because filelock is not re-entrant.
        """
        with self._list_lock(task_list_id):
            current = self._read_task_sync(task_list_id, task_id)
            if current is None:
                return ClaimNotFound(kind="not_found")
            if current.status == TaskStatus.COMPLETED:
                return ClaimAlreadyCompleted(kind="already_completed")
            # Re-claiming your own task is allowed (idempotent).
            if current.owner is not None and current.owner != claimant:
                return ClaimAlreadyOwned(kind="already_owned", by=current.owner)
            unresolved_blockers: list[int] = []
            for b in current.blocked_by:
                blocker = self._read_task_sync(task_list_id, b)
                # Dangling blocker ids (already deleted) don't block.
                if blocker is not None and blocker.status != TaskStatus.COMPLETED:
                    unresolved_blockers.append(b)
            if unresolved_blockers:
                return ClaimBlocked(kind="blocked", by=unresolved_blockers)
            new, _ = self._update_task_unsafe(task_list_id, current, owner=claimant)
            return ClaimOk(kind="ok", record=new)
# ---------------------------------------------------------------------------
# Process-wide singleton (small, stateless wrapper)
# ---------------------------------------------------------------------------
_default_store: TaskStore | None = None
def get_task_store() -> TaskStore:
    """Return the lazily-created process-wide default TaskStore.

    HIVE_HOME is resolved at first call. Tests should construct a
    TaskStore directly with hive_root=tmp_path rather than relying on
    the singleton.
    """
    global _default_store
    store = _default_store
    if store is None:
        store = TaskStore()
        _default_store = store
    return store
# Convenience for tests / utilities.
def fingerprint_for_test(task_list_id: str, hive_root: Path) -> Iterable[Path]:
    """Return every file under a list root, sorted — used by tests to
    assert byte-equivalence pre/post shutdown.
    """
    root = task_list_path(task_list_id, hive_root=hive_root)
    return sorted(root.rglob("*")) if root.exists() else []
+284
View File
@@ -0,0 +1,284 @@
"""End-to-end tests:
- Session task tools fire EventBus events
- REST routes return correct snapshots
- run_parallel_workers-style flow stamps assigned_session
- Durability: store survives a process boundary (subprocess)
"""
from __future__ import annotations
import asyncio
import os
import subprocess
import sys
from pathlib import Path
import pytest
import pytest_asyncio
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from framework.host.event_bus import AgentEvent, EventBus, EventType
from framework.llm.provider import ToolUse
from framework.loader.tool_registry import ToolRegistry
from framework.tasks import TaskListRole, TaskStore
from framework.tasks.events import set_default_event_bus
from framework.tasks.hooks import clear_hooks
from framework.tasks.tools import register_colony_template_tools, register_task_tools
@pytest.fixture(autouse=True)
def _reset_hooks() -> None:
    # Hooks are process-global; clear them both before AND after every test
    # so one test's registrations can never leak into another.
    clear_hooks()
    yield
    clear_hooks()
@pytest.fixture
def store(tmp_path: Path) -> TaskStore:
    # Store rooted at the test's tmp dir — fully isolated from ~/.hive.
    return TaskStore(hive_root=tmp_path)
@pytest.fixture
def registry(store: TaskStore) -> ToolRegistry:
    # Registry with both the four session task tools and the colony
    # template tools (bound to colony "abc") mounted on the shared store.
    reg = ToolRegistry()
    register_task_tools(reg, store=store)
    register_colony_template_tools(reg, colony_id="abc", store=store)
    return reg
async def _invoke(registry: ToolRegistry, name: str, **inputs):
    """Execute a registered tool by name, awaiting async executors."""
    outcome = registry.get_executor()(ToolUse(id=f"call_{name}", name=name, input=inputs))
    return await outcome if asyncio.iscoroutine(outcome) else outcome
# ---------------------------------------------------------------------------
# EventBus integration
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_task_created_emits_event(registry: ToolRegistry) -> None:
    """task_create must publish exactly one TASK_CREATED event on the bus."""
    bus = EventBus()
    set_default_event_bus(bus)
    try:
        received: list[AgentEvent] = []
        async def handler(ev: AgentEvent) -> None:
            received.append(ev)
        bus.subscribe([EventType.TASK_CREATED], handler)
        token = ToolRegistry.set_execution_context(
            agent_id="alice", task_list_id="session:alice:s1"
        )
        try:
            await _invoke(registry, "task_create", subject="hello")
        finally:
            ToolRegistry.reset_execution_context(token)
        # Allow the publish to fan out.
        await asyncio.sleep(0.05)
        assert len(received) == 1
        assert received[0].type == EventType.TASK_CREATED
        assert received[0].data["task"]["subject"] == "hello"
    finally:
        # Fix: previously the default bus was only reset when every assertion
        # passed — a failure leaked this test's bus into subsequent tests.
        set_default_event_bus(None)
@pytest.mark.asyncio
async def test_task_updated_emits_event(registry: ToolRegistry) -> None:
    """task_update must publish a TASK_UPDATED event on the bus."""
    bus = EventBus()
    set_default_event_bus(bus)
    try:
        received: list[AgentEvent] = []
        async def handler(ev: AgentEvent) -> None:
            received.append(ev)
        bus.subscribe([EventType.TASK_UPDATED], handler)
        token = ToolRegistry.set_execution_context(
            agent_id="alice", task_list_id="session:alice:s1"
        )
        try:
            await _invoke(registry, "task_create", subject="x")
            await _invoke(registry, "task_update", id=1, status="in_progress")
        finally:
            ToolRegistry.reset_execution_context(token)
        # Allow the publish to fan out.
        await asyncio.sleep(0.05)
        assert len(received) >= 1
        assert received[0].type == EventType.TASK_UPDATED
    finally:
        # Fix: previously the default bus was only reset when every assertion
        # passed — a failure leaked this test's bus into subsequent tests.
        set_default_event_bus(None)
# ---------------------------------------------------------------------------
# REST routes integration
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture
async def http_client(tmp_path: Path) -> TestClient:
    """Spin up a stripped-down aiohttp app exposing only the task routes."""
    # Point the default TaskStore at the tmp_path so routes see our test data.
    prior_home = os.environ.get("HIVE_HOME")
    os.environ["HIVE_HOME"] = str(tmp_path)
    # Force a fresh singleton.
    import framework.tasks.store as _store_mod
    _store_mod._default_store = None
    from framework.server.routes_tasks import register_routes
    app = web.Application()
    register_routes(app)
    server = TestServer(app)
    client = TestClient(server)
    await client.start_server()
    try:
        yield client
    finally:
        await client.close()
        # Fix: previously HIVE_HOME and the store singleton were left mutated
        # after the fixture, silently pointing later tests (or the dev's real
        # ~/.hive) at this test's tmp dir. Restore both on teardown.
        _store_mod._default_store = None
        if prior_home is None:
            os.environ.pop("HIVE_HOME", None)
        else:
            os.environ["HIVE_HOME"] = prior_home
@pytest.mark.asyncio
async def test_rest_get_task_list_404(http_client: TestClient) -> None:
    """An unknown list id yields 404 with the id echoed in the body."""
    resp = await http_client.get("/api/tasks/session:nope:nope")
    assert resp.status == 404
    payload = await resp.json()
    assert payload["task_list_id"] == "session:nope:nope"
@pytest.mark.asyncio
async def test_rest_get_task_list_after_create(http_client: TestClient) -> None:
    """A list created via the store is visible through the REST snapshot."""
    # Create a list + task via the store directly so we don't have to mount
    # the tools just for this test.
    from framework.tasks import get_task_store
    store = get_task_store()
    await store.ensure_task_list("session:alice:s1", role=TaskListRole.SESSION)
    await store.create_task("session:alice:s1", subject="abc")
    resp = await http_client.get("/api/tasks/session:alice:s1")
    assert resp.status == 200
    snapshot = await resp.json()
    assert snapshot["task_list_id"] == "session:alice:s1"
    assert snapshot["role"] == "session"
    tasks = snapshot["tasks"]
    assert len(tasks) == 1
    assert tasks[0]["subject"] == "abc"
@pytest.mark.asyncio
async def test_rest_colony_lists(http_client: TestClient) -> None:
    """The colony endpoint reports both the template and queen-session ids."""
    url = "/api/colonies/test_colony/task_lists?queen_session_id=sess123"
    resp = await http_client.get(url)
    assert resp.status == 200
    payload = await resp.json()
    assert payload["template_task_list_id"] == "colony:test_colony"
    assert payload["queen_session_task_list_id"] == "session:queen:sess123"
# ---------------------------------------------------------------------------
# Cross-process durability — write in subprocess A, read in subprocess B.
# Demonstrates the "task survives runtime restart" guarantee.
# ---------------------------------------------------------------------------
def test_durability_across_subprocesses(tmp_path: Path) -> None:
    """Write a task in subprocess A, read it back in subprocess B.

    Proves the store's state lives entirely on disk (no in-process cache
    is required for correctness across a process restart).
    """
    env = dict(os.environ)
    # Both subprocesses resolve their store root from HIVE_HOME.
    env["HIVE_HOME"] = str(tmp_path)
    env["PYTHONUNBUFFERED"] = "1"
    write_script = """
import asyncio
from framework.tasks import TaskStore, TaskListRole
async def main():
    s = TaskStore()
    await s.ensure_task_list('session:a:b', role=TaskListRole.SESSION)
    rec = await s.create_task('session:a:b', subject='persisted')
    print(rec.id)
asyncio.run(main())
"""
    out = subprocess.run(
        [sys.executable, "-c", write_script],
        env=env,
        check=True,
        capture_output=True,
        text=True,
    )
    # First task on a fresh list always gets id 1.
    written_id = int(out.stdout.strip())
    assert written_id == 1
    read_script = """
import asyncio
from framework.tasks import TaskStore
async def main():
    s = TaskStore()
    rs = await s.list_tasks('session:a:b')
    print(len(rs), rs[0].subject if rs else '')
asyncio.run(main())
"""
    out2 = subprocess.run(
        [sys.executable, "-c", read_script],
        env=env,
        check=True,
        capture_output=True,
        text=True,
    )
    # Second process (fresh interpreter) sees the same record.
    count, subject = out2.stdout.strip().split(" ", 1)
    assert count == "1"
    assert subject == "persisted"
# ---------------------------------------------------------------------------
# "run_parallel_workers" style flow at the storage level.
# Validates plan-and-spawn pattern: queen publishes templates, then stamps
# assigned_session per spawned worker.
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_template_assignment_flow(store: TaskStore) -> None:
    """Plan-and-spawn at the storage level: queen publishes template
    entries, then stamps assigned_session per spawned worker."""
    template_id = "colony:swarm"
    await store.ensure_task_list(template_id, role=TaskListRole.TEMPLATE)
    entries = [
        await store.create_task(template_id, subject="crawl A"),
        await store.create_task(template_id, subject="crawl B"),
    ]
    # Simulate run_parallel_workers stamping after spawn.
    stamps = [
        {"assigned_session": "session:w1:w1", "assigned_worker_id": "w1"},
        {"assigned_session": "session:w2:w2", "assigned_worker_id": "w2"},
    ]
    for entry, patch in zip(entries, stamps):
        await store.update_task(template_id, entry.id, metadata_patch=patch)
    stamped = await store.list_tasks(template_id)
    assert all(t.metadata.get("assigned_worker_id") for t in stamped)
# ---------------------------------------------------------------------------
# Reset preserves byte-equivalence semantics (durability under graceful op)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_graceful_no_op_preserves_files(store: TaskStore, tmp_path: Path) -> None:
    """The store has no shutdown hook — touching it never deletes files."""
    list_id = "session:a:b"
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    rec = await store.create_task(list_id, subject="x")
    before = {p.name: p.read_bytes() for p in sorted(tmp_path.rglob("*.json"))}
    # Simulate "agent loop teardown" — should be a no-op.
    # (No method to call — the absence of teardown hooks IS the test.)
    after = sorted(tmp_path.rglob("*.json"))
    assert {p.name for p in after} == set(before)
    for p in after:
        assert p.read_bytes() == before[p.name]
    assert rec.id == 1
@@ -0,0 +1,193 @@
"""Integration tests that wire multiple subsystems together.
Verifies the plan-and-spawn pattern end-to-end:
- Queen authors colony template entries (via colony_template_add)
- "spawn" stamps assigned_session metadata + emits the right event
- Workers operate on their own session list (no fall-through)
"""
from __future__ import annotations
import asyncio
import json
from pathlib import Path
import pytest
from framework.host.event_bus import AgentEvent, EventBus, EventType
from framework.llm.provider import ToolUse
from framework.loader.tool_registry import ToolRegistry
from framework.tasks import TaskListRole, TaskStore
from framework.tasks.events import (
emit_colony_template_assignment,
set_default_event_bus,
)
from framework.tasks.hooks import clear_hooks
from framework.tasks.scoping import (
colony_task_list_id,
session_task_list_id,
)
from framework.tasks.tools import register_colony_template_tools, register_task_tools
@pytest.fixture(autouse=True)
def _reset_hooks() -> None:
    # Hooks are process-global; clear them both before AND after every test
    # so one test's registrations can never leak into another.
    clear_hooks()
    yield
    clear_hooks()
async def _invoke(reg: ToolRegistry, name: str, **inputs):
    """Run a registered tool through the registry executor; await if async."""
    call = ToolUse(id=f"call_{name}", name=name, input=inputs)
    result = reg.get_executor()(call)
    if asyncio.iscoroutine(result):
        return await result
    return result
@pytest.mark.asyncio
async def test_queen_plans_workers_pick_up(tmp_path: Path) -> None:
    """Queen authors a 3-step plan; we simulate spawning 3 workers, each
    associated with one template entry. Each worker writes to its own
    session list. The colony template gets stamped with assigned_session.
    """
    bus = EventBus()
    set_default_event_bus(bus)
    try:
        received: list[AgentEvent] = []
        async def handler(ev: AgentEvent) -> None:
            received.append(ev)
        bus.subscribe(
            [
                EventType.TASK_CREATED,
                EventType.TASK_UPDATED,
                EventType.COLONY_TEMPLATE_ASSIGNMENT,
            ],
            handler,
        )
        store = TaskStore(hive_root=tmp_path)
        queen_reg = ToolRegistry()
        register_task_tools(queen_reg, store=store)
        register_colony_template_tools(queen_reg, colony_id="alpha", store=store)
        # 1. Queen authors the plan.
        qtoken = ToolRegistry.set_execution_context(
            agent_id="queen",
            task_list_id=session_task_list_id("queen", "qsess"),
            colony_id="alpha",
        )
        try:
            for subject in ("crawl A", "crawl B", "crawl C"):
                r = await _invoke(queen_reg, "colony_template_add", subject=subject)
                assert json.loads(r.content)["success"] is True
            # Verify the colony template now has 3 entries.
            list_result = await _invoke(queen_reg, "colony_template_list")
            body = json.loads(list_result.content)
            assert body["count"] == 3
            template_entries = body["tasks"]
        finally:
            ToolRegistry.reset_execution_context(qtoken)
        template_list_id = colony_task_list_id("alpha")
        # 2. Simulate spawning a worker per template entry: stamp the
        #    assigned_session and emit the assignment event.
        worker_ids = ["w1", "w2", "w3"]
        for entry, wid in zip(template_entries, worker_ids, strict=True):
            await store.update_task(
                template_list_id,
                entry["id"],
                metadata_patch={
                    "assigned_session": session_task_list_id(wid, wid),
                    "assigned_worker_id": wid,
                },
            )
            await emit_colony_template_assignment(
                colony_id="alpha",
                task_id=entry["id"],
                assigned_session=session_task_list_id(wid, wid),
                assigned_worker_id=wid,
            )
        # 3. Each worker operates on its OWN session list.
        for wid in worker_ids:
            worker_reg = ToolRegistry()
            register_task_tools(worker_reg, store=store)
            wtoken = ToolRegistry.set_execution_context(
                agent_id=wid, task_list_id=session_task_list_id(wid, wid)
            )
            try:
                await _invoke(worker_reg, "task_create", subject=f"setup for {wid}")
                await _invoke(worker_reg, "task_update", id=1, status="in_progress")
            finally:
                ToolRegistry.reset_execution_context(wtoken)
        # 4. Verify the colony template entries are stamped + workers have
        #    their own private lists.
        template_after = await store.list_tasks(template_list_id)
        assert all(
            t.metadata.get("assigned_worker_id") in {"w1", "w2", "w3"}
            for t in template_after
        )
        for wid in worker_ids:
            worker_tasks = await store.list_tasks(session_task_list_id(wid, wid))
            assert len(worker_tasks) == 1
            assert worker_tasks[0].owner == wid  # auto-stamped on in_progress
            assert worker_tasks[0].subject == f"setup for {wid}"
        # 5. Confirm the assignment events fired.
        await asyncio.sleep(0.05)
        assignments = [
            e for e in received if e.type == EventType.COLONY_TEMPLATE_ASSIGNMENT
        ]
        assert len(assignments) == 3
    finally:
        # Fix: previously set_default_event_bus(None) only ran when every
        # assertion passed — a failure anywhere leaked the global bus into
        # subsequent tests. Reset unconditionally.
        set_default_event_bus(None)
@pytest.mark.asyncio
async def test_session_tools_never_touch_template(tmp_path: Path) -> None:
    """The four session tools must write only to the session list.

    Even with colony_id present in the execution context, task_create must
    land on the session list — never on the colony template.
    """
    store = TaskStore(hive_root=tmp_path)
    registry = ToolRegistry()
    register_task_tools(registry, store=store)
    alice_list = session_task_list_id("alice", "sess1")
    ctx_token = ToolRegistry.set_execution_context(
        agent_id="alice",
        task_list_id=alice_list,
        colony_id="alpha",  # present, but must be ignored by session tools
    )
    try:
        await _invoke(registry, "task_create", subject="my work")
    finally:
        ToolRegistry.reset_execution_context(ctx_token)
    # The task landed on the session list...
    assert len(await store.list_tasks(alice_list)) == 1
    # ...and never leaked into the colony template.
    assert not await store.list_exists(colony_task_list_id("alpha"))
@pytest.mark.asyncio
async def test_resume_persisted_handle(tmp_path: Path) -> None:
    """A session list written by one store instance survives a 'resume':
    a fresh TaskStore over the same hive_root reads the same records."""
    list_id = session_task_list_id("alice", "sess_persistent")
    writer = TaskStore(hive_root=tmp_path)
    await writer.ensure_task_list(list_id, role=TaskListRole.SESSION)
    for subject in ("a", "b"):
        await writer.create_task(list_id, subject=subject)
    # Simulate a fresh process: new store, same hive_root, same list id.
    reader = TaskStore(hive_root=tmp_path)
    assert [t.subject for t in await reader.list_tasks(list_id)] == ["a", "b"]
@@ -0,0 +1,121 @@
"""Tests for the periodic task-reminder logic.
The reminder state is a small counter machine; the policy is:
- Bump on each iteration
- Reset to zero on any task op tool call (task_create / task_update /
colony_template_*)
- When ``turns_since_task_op >= REMINDER_THRESHOLD_TURNS`` AND
``turns_since_last_reminder >= REMINDER_COOLDOWN_TURNS`` AND there
are open tasks, fire a reminder
The build_reminder helper composes the message body checked for the
key behavioral nudges (granularity + completion discipline).
"""
from __future__ import annotations
from pathlib import Path
import pytest
from framework.tasks import TaskListRole, TaskStore
from framework.tasks.models import TaskStatus
from framework.tasks.reminders import (
REMINDER_COOLDOWN_TURNS,
REMINDER_THRESHOLD_TURNS,
ReminderState,
build_reminder,
saw_task_op,
)
def test_state_bumps_each_iteration() -> None:
    """Both counters advance by one per iteration."""
    state = ReminderState()
    for _ in range(2):
        state.on_iteration()
    assert state.turns_since_task_op == 2
    assert state.turns_since_last_reminder == 2
def test_state_resets_on_task_op() -> None:
    """A task op zeroes the silence counter but not the reminder cooldown."""
    state = ReminderState()
    for _ in range(5):
        state.on_iteration()
    state.on_task_op()
    assert state.turns_since_task_op == 0
    # Reminder cooldown is independent — it tracks reminders, not ops.
    assert state.turns_since_last_reminder == 5
def test_should_remind_below_threshold() -> None:
    """One turn short of the silence threshold -> no reminder yet."""
    state = ReminderState()
    state.turns_since_task_op = REMINDER_THRESHOLD_TURNS - 1
    state.turns_since_last_reminder = REMINDER_COOLDOWN_TURNS
    assert not state.should_remind(has_open_tasks=True)


def test_should_remind_no_tasks() -> None:
    """Without open tasks there is nothing to nag about."""
    state = ReminderState()
    state.turns_since_task_op = REMINDER_THRESHOLD_TURNS + 5
    state.turns_since_last_reminder = REMINDER_COOLDOWN_TURNS + 5
    assert not state.should_remind(has_open_tasks=False)


def test_should_remind_at_threshold() -> None:
    """Exactly at threshold, cooldown satisfied, open tasks -> remind."""
    state = ReminderState()
    state.turns_since_task_op = REMINDER_THRESHOLD_TURNS
    state.turns_since_last_reminder = REMINDER_COOLDOWN_TURNS
    assert state.should_remind(has_open_tasks=True)


def test_cooldown_blocks_back_to_back() -> None:
    """Sending a reminder starts the cooldown, suppressing the next one."""
    state = ReminderState()
    state.turns_since_task_op = REMINDER_THRESHOLD_TURNS + 5
    state.on_reminder_sent()
    assert not state.should_remind(has_open_tasks=True)
def test_saw_task_op_recognizes_mutating_tools() -> None:
    """Only mutating task tools count as a task op."""
    for tool_names in (["task_create"], ["read_file", "task_update"], ["colony_template_add"]):
        assert saw_task_op(tool_names)
    # Reads do NOT reset the counter — important: model could read forever
    # without making progress.
    for tool_names in (["task_list", "task_get"], []):
        assert not saw_task_op(tool_names)
@pytest.mark.asyncio
async def test_build_reminder_includes_open_tasks(tmp_path: Path) -> None:
    """The reminder body names every open task plus the behavioral nudges."""
    store = TaskStore(hive_root=tmp_path)
    list_id = "session:a:b"
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    subjects = ("step 1", "step 2", "step 3")
    created = [await store.create_task(list_id, subject=s) for s in subjects]
    # Mark #2 in_progress so the reminder mentions it.
    await store.update_task(list_id, created[1].id, status=TaskStatus.IN_PROGRESS)
    body = build_reminder(await store.list_tasks(list_id))
    assert "task_reminder" in body
    for subject in subjects:
        assert subject in body
    lowered = body.lower()
    # Granularity nudge present.
    assert "umbrella" in lowered or "atomic" in lowered
    # Completion-discipline nudge present.
    assert "completed" in lowered
    # Anti-nag boilerplate remains present.
    assert "NEVER mention this reminder to the user" in body
@pytest.mark.asyncio
async def test_build_reminder_empty_when_no_open(tmp_path: Path) -> None:
    """No open work -> no reminder text at all."""
    store = TaskStore(hive_root=tmp_path)
    list_id = "session:a:b"
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    finished = await store.create_task(list_id, subject="done already")
    await store.update_task(list_id, finished.id, status=TaskStatus.COMPLETED)
    assert build_reminder(await store.list_tasks(list_id)) == ""
@@ -0,0 +1,65 @@
"""Tests for resolve_task_list_id."""
from __future__ import annotations
from dataclasses import dataclass
import pytest
from framework.tasks.scoping import (
colony_task_list_id,
parse_task_list_id,
resolve_task_list_id,
session_task_list_id,
)
@dataclass
class FakeCtx:
    """Minimal fake of the agent context handed to resolve_task_list_id();
    defaults model the "unset" runtime values."""
    agent_id: str = ""
    run_id: str = ""
    execution_id: str = ""
    stream_id: str = ""
    task_list_id: str | None = None
def test_session_helper() -> None:
    """session_task_list_id joins agent + session under the session: prefix."""
    assert session_task_list_id("a", "b") == "session:a:b"


def test_colony_helper() -> None:
    """colony_task_list_id prefixes the colony id."""
    assert colony_task_list_id("c") == "colony:c"


def test_parse_session() -> None:
    """Parsing a session id recovers kind, agent and session."""
    expected = {"kind": "session", "agent_id": "agent", "session_id": "sess"}
    assert parse_task_list_id("session:agent:sess") == expected


def test_parse_colony() -> None:
    """Parsing a colony id recovers kind and colony id."""
    expected = {"kind": "colony", "colony_id": "abc"}
    assert parse_task_list_id("colony:abc") == expected
def test_resolve_uses_existing(monkeypatch: pytest.MonkeyPatch) -> None:
    """An explicit ctx.task_list_id wins when no env override is set."""
    monkeypatch.delenv("HIVE_TASK_LIST_ID", raising=False)
    ctx = FakeCtx(agent_id="x", run_id="r1", task_list_id="session:x:r1")
    assert resolve_task_list_id(ctx) == "session:x:r1"


def test_resolve_env_override(monkeypatch: pytest.MonkeyPatch) -> None:
    """HIVE_TASK_LIST_ID overrides anything on the context."""
    monkeypatch.setenv("HIVE_TASK_LIST_ID", "forced")
    assert resolve_task_list_id(FakeCtx(agent_id="x", run_id="r1")) == "forced"


def test_resolve_synthesizes_session(monkeypatch: pytest.MonkeyPatch) -> None:
    """agent_id + run_id synthesize a session-scoped list id."""
    monkeypatch.delenv("HIVE_TASK_LIST_ID", raising=False)
    ctx = FakeCtx(agent_id="alice", run_id="r123")
    assert resolve_task_list_id(ctx) == "session:alice:r123"


def test_resolve_falls_back_to_unscoped(monkeypatch: pytest.MonkeyPatch) -> None:
    """Without a run_id the resolver degrades to an unscoped: id."""
    monkeypatch.delenv("HIVE_TASK_LIST_ID", raising=False)
    assert resolve_task_list_id(FakeCtx(agent_id="alice")).startswith("unscoped:")
+277
View File
@@ -0,0 +1,277 @@
"""Tests for the file-backed task store.
Concurrency / id-monotonicity / cascade / claim / reset — the engineering
primitives the rest of the system relies on.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
import pytest
from framework.tasks import TaskListRole, TaskStatus, TaskStore
from framework.tasks.models import ClaimAlreadyOwned, ClaimBlocked, ClaimNotFound, ClaimOk
@pytest.fixture
def store(tmp_path: Path) -> TaskStore:
    # Fresh file-backed store rooted in this test's private tmp dir.
    return TaskStore(hive_root=tmp_path)
@pytest.fixture
def list_id() -> str:
    # Canonical session-scoped list id shared by most tests below.
    return "session:test_agent:test_session"
# ---------------------------------------------------------------------------
# Basic CRUD
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_and_get(store: TaskStore, list_id: str) -> None:
    """A freshly created task gets id 1 and comes back pending."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    created = await store.create_task(list_id, subject="hi")
    assert created.id == 1
    fetched = await store.get_task(list_id, 1)
    assert fetched is not None
    assert fetched.subject == "hi"
    assert fetched.status == TaskStatus.PENDING


@pytest.mark.asyncio
async def test_get_missing_returns_none(store: TaskStore, list_id: str) -> None:
    """Fetching an unknown id yields None, not an exception."""
    assert await store.get_task(list_id, 999) is None


@pytest.mark.asyncio
async def test_list_ascending(store: TaskStore, list_id: str) -> None:
    """list_tasks returns records in ascending id order."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    for subject in ("a", "b", "c"):
        await store.create_task(list_id, subject=subject)
    assert [t.id for t in await store.list_tasks(list_id)] == [1, 2, 3]


@pytest.mark.asyncio
async def test_list_filters_internal(store: TaskStore, list_id: str) -> None:
    """_internal-flagged tasks are hidden unless include_internal=True."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    await store.create_task(list_id, subject="visible")
    await store.create_task(list_id, subject="hidden", metadata={"_internal": True})
    assert len(await store.list_tasks(list_id)) == 1
    assert len(await store.list_tasks(list_id, include_internal=True)) == 2
# ---------------------------------------------------------------------------
# Concurrent creation: two parallel calls -> N and N+1
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_concurrent_create_distinct_ids(store: TaskStore, list_id: str) -> None:
    """20 parallel creates yield exactly ids 1..20 — no dupes, no gaps."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    records = await asyncio.gather(
        *(store.create_task(list_id, subject=f"t{i}") for i in range(20))
    )
    assert sorted(r.id for r in records) == list(range(1, 21))
# ---------------------------------------------------------------------------
# Update + change detection
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_update_returns_changed_fields(store: TaskStore, list_id: str) -> None:
    """Only genuinely-changed fields are reported by update_task."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    created = await store.create_task(list_id, subject="orig")
    updated, changed = await store.update_task(
        list_id, created.id, subject="orig", status=TaskStatus.IN_PROGRESS
    )
    assert changed == ["status"]  # subject unchanged shouldn't appear
    assert updated.status == TaskStatus.IN_PROGRESS


@pytest.mark.asyncio
async def test_update_missing_returns_none(store: TaskStore, list_id: str) -> None:
    """Updating an unknown id yields (None, []) rather than raising."""
    updated, changed = await store.update_task(list_id, 42, subject="x")
    assert updated is None
    assert changed == []


@pytest.mark.asyncio
async def test_metadata_patch_merges_and_deletes(store: TaskStore, list_id: str) -> None:
    """metadata_patch merges new values and drops keys patched to None."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    created = await store.create_task(list_id, subject="x", metadata={"a": 1, "b": 2})
    updated, _ = await store.update_task(
        list_id, created.id, metadata_patch={"a": 10, "b": None}
    )
    assert updated.metadata == {"a": 10}
# ---------------------------------------------------------------------------
# Bidirectional blocks
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_blocks_bidirectional(store: TaskStore, list_id: str) -> None:
    """Adding a 'blocks' edge mirrors a 'blocked_by' edge on the target."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    first = await store.create_task(list_id, subject="a")
    second = await store.create_task(list_id, subject="b")
    updated_first, _ = await store.update_task(list_id, first.id, add_blocks=[second.id])
    assert second.id in updated_first.blocks
    assert first.id in (await store.get_task(list_id, second.id)).blocked_by


@pytest.mark.asyncio
async def test_blocked_by_bidirectional(store: TaskStore, list_id: str) -> None:
    """Adding a 'blocked_by' edge mirrors a 'blocks' edge on the source."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    first = await store.create_task(list_id, subject="a")
    second = await store.create_task(list_id, subject="b")
    updated_second, _ = await store.update_task(
        list_id, second.id, add_blocked_by=[first.id]
    )
    assert first.id in updated_second.blocked_by
    assert second.id in (await store.get_task(list_id, first.id)).blocks
# ---------------------------------------------------------------------------
# Delete: highwatermark + cascade
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_delete_increments_highwatermark(store: TaskStore, list_id: str) -> None:
    """Deleted ids are burned: the next create advances past them."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    await store.create_task(list_id, subject="a")
    victim = await store.create_task(list_id, subject="b")
    deleted, _ = await store.delete_task(list_id, victim.id)
    assert deleted
    replacement = await store.create_task(list_id, subject="c")
    assert replacement.id == victim.id + 1, "deleted ids must never be reused"


@pytest.mark.asyncio
async def test_delete_cascades_blocks(store: TaskStore, list_id: str) -> None:
    """Deleting a task scrubs it from both sides of every block edge."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    blocker = await store.create_task(list_id, subject="a")
    victim = await store.create_task(list_id, subject="b")
    dependent = await store.create_task(list_id, subject="c")
    await store.update_task(list_id, blocker.id, add_blocks=[victim.id])
    await store.update_task(list_id, dependent.id, add_blocked_by=[victim.id])
    _, cascade = await store.delete_task(list_id, victim.id)
    assert sorted(cascade) == sorted([blocker.id, dependent.id])
    assert victim.id not in (await store.get_task(list_id, blocker.id)).blocks
    assert victim.id not in (await store.get_task(list_id, dependent.id)).blocked_by


@pytest.mark.asyncio
async def test_delete_missing_returns_false(store: TaskStore, list_id: str) -> None:
    """Deleting an unknown id reports (False, []) without raising."""
    deleted, cascade = await store.delete_task(list_id, 42)
    assert not deleted
    assert cascade == []
# ---------------------------------------------------------------------------
# Reset preserves high-water-mark
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reset_preserves_floor(store: TaskStore, list_id: str) -> None:
    """reset_task_list clears records but keeps the id high-water-mark."""
    await store.ensure_task_list(list_id, role=TaskListRole.SESSION)
    for _ in range(5):
        await store.create_task(list_id, subject="x")
    await store.reset_task_list(list_id)
    post_reset = await store.create_task(list_id, subject="post-reset")
    assert post_reset.id == 6
# ---------------------------------------------------------------------------
# Claim semantics (used by run_parallel_workers)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_claim_ok(store: TaskStore, list_id: str) -> None:
    """An unowned, unblocked task is claimable and stamped with the owner."""
    await store.ensure_task_list(list_id, role=TaskListRole.TEMPLATE)
    target = await store.create_task(list_id, subject="x")
    outcome = await store.claim_task_with_busy_check(list_id, target.id, "agent_a")
    assert isinstance(outcome, ClaimOk)
    assert outcome.record.owner == "agent_a"


@pytest.mark.asyncio
async def test_claim_already_owned(store: TaskStore, list_id: str) -> None:
    """Claiming a task someone else owns reports the current owner."""
    await store.ensure_task_list(list_id, role=TaskListRole.TEMPLATE)
    target = await store.create_task(list_id, subject="x", owner="agent_a")
    outcome = await store.claim_task_with_busy_check(list_id, target.id, "agent_b")
    assert isinstance(outcome, ClaimAlreadyOwned)
    assert outcome.by == "agent_a"


@pytest.mark.asyncio
async def test_claim_not_found(store: TaskStore, list_id: str) -> None:
    """Claiming a nonexistent task yields ClaimNotFound."""
    outcome = await store.claim_task_with_busy_check(list_id, 999, "agent_a")
    assert isinstance(outcome, ClaimNotFound)


@pytest.mark.asyncio
async def test_claim_blocked(store: TaskStore, list_id: str) -> None:
    """A task with an incomplete prerequisite cannot be claimed."""
    await store.ensure_task_list(list_id, role=TaskListRole.TEMPLATE)
    prereq = await store.create_task(list_id, subject="prereq")
    dependent = await store.create_task(list_id, subject="dep")
    await store.update_task(list_id, dependent.id, add_blocked_by=[prereq.id])
    # prereq is still pending -> dependent is blocked.
    outcome = await store.claim_task_with_busy_check(list_id, dependent.id, "agent_a")
    assert isinstance(outcome, ClaimBlocked)
    assert prereq.id in outcome.by
# ---------------------------------------------------------------------------
# Meta lifecycle: ensure_task_list is idempotent and tracks last_seen
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ensure_task_list_idempotent(store: TaskStore, list_id: str) -> None:
    """Re-ensuring a list reuses the dir and accumulates session history."""
    first = await store.ensure_task_list(list_id, role=TaskListRole.SESSION, session_id="s1")
    second = await store.ensure_task_list(list_id, role=TaskListRole.SESSION, session_id="s2")
    assert first.created_at == second.created_at  # same dir
    assert "s1" in second.last_seen_session_ids
    assert "s2" in second.last_seen_session_ids


@pytest.mark.asyncio
async def test_ensure_task_list_caps_history(store: TaskStore, list_id: str) -> None:
    """Session history keeps only the 10 most recent ids."""
    for i in range(15):
        await store.ensure_task_list(
            list_id, role=TaskListRole.SESSION, session_id=f"s{i}"
        )
    meta = await store.get_meta(list_id)
    assert len(meta.last_seen_session_ids) == 10
    assert "s14" in meta.last_seen_session_ids
    assert "s4" not in meta.last_seen_session_ids
# ---------------------------------------------------------------------------
# Path resolution sanity
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_colony_path(store: TaskStore, tmp_path: Path) -> None:
    """Colony lists live under colonies/<id>/tasks/."""
    await store.ensure_task_list("colony:abc", role=TaskListRole.TEMPLATE)
    expected = tmp_path / "colonies" / "abc" / "tasks" / "meta.json"
    assert expected.exists()


@pytest.mark.asyncio
async def test_session_path(store: TaskStore, tmp_path: Path) -> None:
    """Session lists live under agents/<agent>/sessions/<sess>/tasks/."""
    await store.ensure_task_list("session:agent_x:sess_y", role=TaskListRole.SESSION)
    expected = (
        tmp_path / "agents" / "agent_x" / "sessions" / "sess_y" / "tasks" / "meta.json"
    )
    assert expected.exists()
+324
View File
@@ -0,0 +1,324 @@
"""End-to-end tool tests via ToolRegistry.get_executor()."""
from __future__ import annotations
import asyncio
import json
from pathlib import Path
import pytest
from framework.llm.provider import ToolUse
from framework.loader.tool_registry import ToolRegistry
from framework.tasks import TaskStore
from framework.tasks.hooks import (
HOOK_TASK_COMPLETED,
HOOK_TASK_CREATED,
BlockingHookError,
clear_hooks,
register_hook,
)
from framework.tasks.tools import register_colony_template_tools, register_task_tools
@pytest.fixture(autouse=True)
def _reset_hooks():
    """Clear task-hook registrations before and after every test.

    ANNOTATION FIX: the original declared ``-> None``, but a yield-style
    fixture is a generator function, so that annotation was wrong; it is
    dropped rather than guessing at the project's preferred generator alias.
    """
    clear_hooks()
    yield
    clear_hooks()
@pytest.fixture
def store(tmp_path: Path) -> TaskStore:
    # Fresh file-backed store rooted in this test's private tmp dir.
    return TaskStore(hive_root=tmp_path)
@pytest.fixture
def registry_with_session_tools(store: TaskStore) -> ToolRegistry:
    # Registry exposing only the four session task tools (no colony tools).
    reg = ToolRegistry()
    register_task_tools(reg, store=store)
    return reg
async def _invoke(registry: ToolRegistry, name: str, **inputs):
    """Invoke a tool via the registry's executor protocol, awaiting if async."""
    outcome = registry.get_executor()(
        ToolUse(id=f"call_{name}", name=name, input=inputs)
    )
    return await outcome if asyncio.iscoroutine(outcome) else outcome


def _set_ctx(*, agent_id: str, task_list_id: str, **extra):
    """Install a tool execution context; returns the token for reset."""
    return ToolRegistry.set_execution_context(
        agent_id=agent_id, task_list_id=task_list_id, **extra
    )
# ---------------------------------------------------------------------------
# Session tools — happy paths
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_then_list(registry_with_session_tools: ToolRegistry) -> None:
    """task_create returns id 1 and task_list shows the new record."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        created = await _invoke(reg, "task_create", subject="Plan retrieval")
        assert created.is_error is False
        created_body = json.loads(created.content)
        assert created_body["success"] is True
        assert created_body["task_id"] == 1
        listed = json.loads((await _invoke(reg, "task_list")).content)
        assert listed["count"] == 1
        assert listed["tasks"][0]["subject"] == "Plan retrieval"
    finally:
        ToolRegistry.reset_execution_context(token)


@pytest.mark.asyncio
async def test_update_in_progress_auto_owner(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """Moving a task to in_progress auto-stamps the caller as owner."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        await _invoke(reg, "task_create", subject="x")
        body = json.loads(
            (await _invoke(reg, "task_update", id=1, status="in_progress")).content
        )
        assert body["success"] is True
        assert body["task"]["status"] == "in_progress"
        assert body["task"]["owner"] == "agent_a"  # auto-filled
    finally:
        ToolRegistry.reset_execution_context(token)
@pytest.mark.asyncio
async def test_update_status_deleted(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """status='deleted' removes the task from subsequent listings."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        await _invoke(reg, "task_create", subject="x")
        body = json.loads(
            (await _invoke(reg, "task_update", id=1, status="deleted")).content
        )
        assert body["success"] is True
        assert body["deleted"] is True
        # Subsequent list sees nothing.
        assert json.loads((await _invoke(reg, "task_list")).content)["count"] == 0
    finally:
        ToolRegistry.reset_execution_context(token)


@pytest.mark.asyncio
async def test_get_returns_full_record(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """task_get exposes the full record, including the description body."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        await _invoke(reg, "task_create", subject="x", description="full body")
        body = json.loads((await _invoke(reg, "task_get", id=1)).content)
        assert body["task"]["description"] == "full body"
    finally:
        ToolRegistry.reset_execution_context(token)
# ---------------------------------------------------------------------------
# Task-not-found is non-error (so sibling tool cancellation doesn't cascade)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_task_not_found_is_not_error(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """Unknown-id updates report failure in-band, not as a tool error."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        outcome = await _invoke(reg, "task_update", id=42, subject="ghost")
        # is_error must be False so the streaming executor doesn't cascade-cancel.
        assert outcome.is_error is False
        assert json.loads(outcome.content)["success"] is False
    finally:
        ToolRegistry.reset_execution_context(token)
# ---------------------------------------------------------------------------
# Completion-suffix steering: completed tasks nudge toward the next pending
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_completion_suffix_points_to_next_pending(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """Completing a task should nudge toward the lowest-id pending task."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        for subject in ("step 1", "step 2", "step 3"):
            await _invoke(reg, "task_create", subject=subject)
        await _invoke(reg, "task_update", id=1, status="in_progress")
        body = json.loads(
            (await _invoke(reg, "task_update", id=1, status="completed")).content
        )
        assert body["success"] is True
        assert "Next pending: #2" in body["message"]
        assert "step 2" in body["message"]
    finally:
        ToolRegistry.reset_execution_context(token)


@pytest.mark.asyncio
async def test_completion_suffix_signals_all_done(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """Completing the last open task announces that everything is done."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        await _invoke(reg, "task_create", subject="only step")
        await _invoke(reg, "task_update", id=1, status="in_progress")
        body = json.loads(
            (await _invoke(reg, "task_update", id=1, status="completed")).content
        )
        assert "All tasks complete" in body["message"]
    finally:
        ToolRegistry.reset_execution_context(token)


@pytest.mark.asyncio
async def test_completion_suffix_skips_blocked_pending(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """A blocked pending task must not be suggested as next — the suffix
    falls through to "all done" or notes in-progress siblings instead."""
    reg = registry_with_session_tools
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        await _invoke(reg, "task_create", subject="prereq")
        await _invoke(reg, "task_create", subject="blocked dep")
        # #2 is blocked by #1.
        await _invoke(reg, "task_update", id=2, add_blocked_by=[1])
        await _invoke(reg, "task_update", id=1, status="in_progress")
        # Don't actually complete #1 — instead add an unrelated done.
        await _invoke(reg, "task_create", subject="extra step")
        await _invoke(reg, "task_update", id=3, status="in_progress")
        body = json.loads(
            (await _invoke(reg, "task_update", id=3, status="completed")).content
        )
        # #2 is still blocked by uncompleted #1, so the suffix shouldn't
        # surface it. #1 is in_progress, so the suffix highlights that.
        assert "Still in progress: #1" in body["message"]
    finally:
        ToolRegistry.reset_execution_context(token)
@pytest.mark.asyncio
async def test_hook_blocks_task_created(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """A blocking task_created hook aborts creation and rolls it back."""
    reg = registry_with_session_tools

    def blocker(ctx) -> None:
        raise BlockingHookError("test policy")

    register_hook(HOOK_TASK_CREATED, blocker)
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        body = json.loads(
            (await _invoke(reg, "task_create", subject="will be aborted")).content
        )
        assert body["success"] is False
        # The task must have been rolled back.
        assert json.loads((await _invoke(reg, "task_list")).content)["count"] == 0
    finally:
        ToolRegistry.reset_execution_context(token)
@pytest.mark.asyncio
async def test_hook_blocks_task_completed(
    registry_with_session_tools: ToolRegistry,
) -> None:
    """A blocking task_completed hook rolls the status back to in_progress."""
    reg = registry_with_session_tools

    # IDIOM FIX: the original registered the hook as
    # `lambda ctx: (_ for _ in ()).throw(BlockingHookError("nope"))` — a
    # generator-throw trick to raise from a lambda. A plain function that
    # raises is equivalent and readable.
    def blocker(ctx) -> None:
        raise BlockingHookError("nope")

    register_hook(HOOK_TASK_COMPLETED, blocker)
    token = _set_ctx(agent_id="agent_a", task_list_id="session:agent_a:sess_1")
    try:
        await _invoke(reg, "task_create", subject="x")
        await _invoke(reg, "task_update", id=1, status="in_progress")
        result = await _invoke(reg, "task_update", id=1, status="completed")
        body = json.loads(result.content)
        assert body["success"] is False
        # Status rolled back to in_progress, not stuck on completed.
        body2 = json.loads((await _invoke(reg, "task_get", id=1)).content)
        assert body2["task"]["status"] == "in_progress"
    finally:
        ToolRegistry.reset_execution_context(token)
# ---------------------------------------------------------------------------
# Colony template tools
# ---------------------------------------------------------------------------
@pytest.fixture
def queen_registry(store: TaskStore) -> ToolRegistry:
    # Queen-side registry: the four session tools PLUS the colony template
    # tools gated to colony "abc".
    reg = ToolRegistry()
    register_task_tools(reg, store=store)
    register_colony_template_tools(reg, colony_id="abc", store=store)
    return reg
@pytest.mark.asyncio
async def test_colony_template_add_and_list(queen_registry: ToolRegistry) -> None:
    """Template adds land on the colony template, never the session list."""
    reg = queen_registry
    token = _set_ctx(
        agent_id="queen", task_list_id="session:queen:sess_1", colony_id="abc"
    )
    try:
        for subject in ("crawl", "parse"):
            await _invoke(reg, "colony_template_add", subject=subject)
        template = json.loads((await _invoke(reg, "colony_template_list")).content)
        assert template["count"] == 2
        # The session task list should be empty — colony tools don't write there.
        session = json.loads((await _invoke(reg, "task_list")).content)
        assert session["count"] == 0
    finally:
        ToolRegistry.reset_execution_context(token)
@pytest.mark.asyncio
async def test_colony_template_remove(queen_registry: ToolRegistry) -> None:
    """Removal succeeds and the freed id is never reused (high-water-mark)."""
    reg = queen_registry
    token = _set_ctx(
        agent_id="queen", task_list_id="session:queen:sess_1", colony_id="abc"
    )
    try:
        for subject in ("a", "b"):
            await _invoke(reg, "colony_template_add", subject=subject)
        removed = json.loads(
            (await _invoke(reg, "colony_template_remove", id=2)).content
        )
        assert removed["success"] is True
        # Next add gets id 3 (highwatermark preserved)
        added = json.loads(
            (await _invoke(reg, "colony_template_add", subject="c")).content
        )
        assert added["task_id"] == 3
    finally:
        ToolRegistry.reset_execution_context(token)
+11
View File
@@ -0,0 +1,11 @@
"""Task tools — the four session-list tools and the queen-only colony template tools."""
from framework.tasks.tools.register import (
register_colony_template_tools,
register_task_tools,
)
# Public surface of the tools package: just the two registration entry points.
__all__ = [
    "register_colony_template_tools",
    "register_task_tools",
]
+39
View File
@@ -0,0 +1,39 @@
"""Context resolution for task-tool executors.
Tool executors run synchronously inside ``ToolRegistry.get_executor()``;
they need the calling agent's id and task_list_id to know which list to
write to. We pull both from contextvars set by the runner /
ColonyRuntime / orchestrator before each agent's iteration.
"""
from __future__ import annotations
from typing import Any
from framework.loader.tool_registry import _execution_context
def current_context() -> dict[str, Any]:
    """Snapshot of the tool-execution contextvar (empty dict when unset)."""
    ctx = _execution_context.get()
    return dict(ctx) if ctx else {}


def current_agent_id() -> str | None:
    """Id of the calling agent, if the runner installed one."""
    return current_context().get("agent_id")


def current_task_list_id() -> str | None:
    """Task list the caller should write to, if set."""
    return current_context().get("task_list_id")


def current_colony_id() -> str | None:
    """Colony the caller belongs to, if set."""
    return current_context().get("colony_id")
def current_picked_up_from() -> tuple[str, int] | None:
    """If this session was spawned for a colony template entry, return it
    as a normalized ``(str, int)`` pair; otherwise None.

    ROBUSTNESS: also accept a 2-element list — context payloads that
    round-trip through JSON arrive as lists, which the original tuple-only
    isinstance check silently dropped.
    """
    raw = current_context().get("picked_up_from")
    if not raw:
        return None
    if isinstance(raw, (tuple, list)) and len(raw) == 2:
        return str(raw[0]), int(raw[1])
    return None
+238
View File
@@ -0,0 +1,238 @@
"""Queen-only colony template tools.
These tools manipulate a colony's task template — the queen's spawn plan.
They are gated to the queen of a colony at registration time
(``register_colony_template_tools(colony_id=...)``).
Workers never see these tools. The four session tools (`task_create`,
`task_update`, `task_list`, `task_get`) operate exclusively on the
caller's session list — never the colony template.
"""
from __future__ import annotations
import logging
from typing import Any
from framework.llm.provider import Tool
from framework.tasks.events import (
emit_task_created,
emit_task_deleted,
emit_task_updated,
)
from framework.tasks.models import TaskRecord, TaskStatus
from framework.tasks.scoping import colony_task_list_id
from framework.tasks.store import _UNSET_SENTINEL, TaskStore, get_task_store
from framework.tasks.tools.session_tools import _serialize_task
logger = logging.getLogger(__name__)
def _add_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {
"subject": {"type": "string"},
"description": {"type": "string"},
"active_form": {"type": "string"},
"metadata": {"type": "object"},
},
"required": ["subject"],
}
def _update_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {
"id": {"type": "integer"},
"subject": {"type": "string"},
"description": {"type": "string"},
"active_form": {"type": "string"},
"owner": {"type": ["string", "null"]},
"status": {
"type": "string",
"enum": ["pending", "in_progress", "completed"],
},
"metadata_patch": {"type": "object"},
},
"required": ["id"],
}
def _remove_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {"id": {"type": "integer"}},
"required": ["id"],
}
def _list_schema() -> dict[str, Any]:
return {"type": "object", "properties": {}}
_ADD_DESC = (
"Append a task to your colony's spawn-plan template. Templates are read "
"by `run_parallel_workers` and the UI; workers do not pull from the "
"template after spawn. Use this to plan colony work before spawning."
)
_UPDATE_DESC = (
"Update a template entry on your colony's spawn-plan template (e.g., "
"stamp completion when a worker reports back, adjust subject/description). "
"Only the queen can call this."
)
_REMOVE_DESC = (
"Remove a template entry from your colony's spawn-plan template. The "
"id is reserved (high-water-mark preserved) — never reused."
)
_LIST_DESC = (
"List all entries on your colony's spawn-plan template. Each entry "
"includes any `metadata.assigned_session` stamp that ties the entry to "
"a spawned worker."
)
def _make_add_executor(store: TaskStore, list_id: str):
    """Build the colony_template_add executor bound to one template list.

    Args:
        store: Backing TaskStore the entry is written to.
        list_id: Colony template task-list id the executor always targets.
    """
    async def execute(inputs: dict) -> dict[str, Any]:
        # Create the entry first, then emit the event so subscribers (UI)
        # see the record only after it is persisted.
        rec: TaskRecord = await store.create_task(
            list_id,
            subject=inputs["subject"],
            description=inputs.get("description", ""),
            active_form=inputs.get("active_form"),
            metadata=inputs.get("metadata") or {},
        )
        await emit_task_created(task_list_id=list_id, record=rec)
        return {
            "success": True,
            "task_list_id": list_id,
            "task_id": rec.id,
            "message": f"Template entry #{rec.id} added: {rec.subject}",
            "task": _serialize_task(rec),
        }
    return execute
def _make_update_executor(store: TaskStore, list_id: str):
    """Build the colony_template_update executor bound to one template list."""
    async def execute(inputs: dict) -> dict[str, Any]:
        task_id = int(inputs["id"])
        status_in = inputs.get("status")
        # Only coerce to the enum when a status was actually supplied.
        status_enum = TaskStatus(status_in) if status_in else None
        # _UNSET_SENTINEL distinguishes "owner omitted" (leave untouched)
        # from an explicit owner=null (clear ownership).
        owner_in = inputs.get("owner", _UNSET_SENTINEL)
        new, fields = await store.update_task(
            list_id,
            task_id,
            subject=inputs.get("subject"),
            description=inputs.get("description"),
            active_form=inputs.get("active_form"),
            owner=owner_in,
            status=status_enum,
            metadata_patch=inputs.get("metadata_patch"),
        )
        if new is None:
            # Unknown id — report a soft failure rather than raising.
            return {
                "success": False,
                "task_list_id": list_id,
                "task_id": task_id,
                "message": f"Template entry #{task_id} not found.",
            }
        if fields:
            # Emit only when something actually changed.
            await emit_task_updated(task_list_id=list_id, record=new, fields=fields)
        return {
            "success": True,
            "task_list_id": list_id,
            "task_id": task_id,
            "fields": fields,
            "message": f"Template entry #{task_id} updated. Fields: {', '.join(fields) or '(none)'}.",
            "task": _serialize_task(new),
        }
    return execute
def _make_remove_executor(store: TaskStore, list_id: str):
    """Build the colony_template_remove executor bound to one template list."""
    async def execute(inputs: dict) -> dict[str, Any]:
        task_id = int(inputs["id"])
        # NOTE(review): `cascade` presumably lists additional task ids removed
        # alongside this one — confirm against TaskStore.delete_task.
        deleted, cascade = await store.delete_task(list_id, task_id)
        if not deleted:
            # Unknown id — soft failure rather than an exception.
            return {
                "success": False,
                "task_list_id": list_id,
                "task_id": task_id,
                "message": f"Template entry #{task_id} not found.",
            }
        await emit_task_deleted(task_list_id=list_id, task_id=task_id, cascade=cascade)
        return {
            "success": True,
            "task_list_id": list_id,
            "task_id": task_id,
            "deleted": True,
            "cascade": cascade,
            "message": f"Template entry #{task_id} removed.",
        }
    return execute
def _make_list_executor(store: TaskStore, list_id: str):
    """Build the colony_template_list executor bound to one template list."""

    async def execute(inputs: dict) -> dict[str, Any]:
        entries = await store.list_tasks(list_id)
        serialized = [_serialize_task(entry) for entry in entries]
        return {
            "success": True,
            "task_list_id": list_id,
            "count": len(serialized),
            "tasks": serialized,
        }

    return execute
def build_colony_template_tools(
    *,
    colony_id: str,
    store: TaskStore | None = None,
) -> list[tuple[Tool, Any]]:
    """Build (Tool, executor) pairs for the queen-only colony_template_* tools.

    Args:
        colony_id: Colony whose template list the tools are bound to.
        store: Optional TaskStore override (defaults to the singleton).
    """
    task_store = store or get_task_store()
    list_id = colony_task_list_id(colony_id)
    # (name, description, schema, concurrency_safe, executor factory)
    specs = [
        ("colony_template_add", _ADD_DESC, _add_schema(), False, _make_add_executor),
        ("colony_template_update", _UPDATE_DESC, _update_schema(), False, _make_update_executor),
        ("colony_template_remove", _REMOVE_DESC, _remove_schema(), False, _make_remove_executor),
        ("colony_template_list", _LIST_DESC, _list_schema(), True, _make_list_executor),
    ]
    return [
        (
            Tool(
                name=name,
                description=desc,
                parameters=params,
                concurrency_safe=safe,
            ),
            factory(task_store, list_id),
        )
        for name, desc, params, safe, factory in specs
    ]
+74
View File
@@ -0,0 +1,74 @@
"""Wire task tools into a ToolRegistry.
The four session task tools are registered for every agent that gets a
ToolRegistry. The colony template tools are queen-only and registered
separately by ``register_colony_template_tools``.
"""
from __future__ import annotations
import logging
from typing import Any
from framework.loader.tool_registry import ToolRegistry
from framework.tasks.store import TaskStore
logger = logging.getLogger(__name__)
def _wrap_async_executor(async_executor):
"""Adapt an async executor to ToolRegistry's sync executor protocol.
ToolRegistry's executor expects ``Callable[[dict], Any]`` where Any may
be a coroutine; the registry awaits it. We just pass the coroutine
through.
"""
def executor(inputs: dict) -> Any:
return async_executor(inputs)
return executor
def register_task_tools(
    registry: ToolRegistry,
    *,
    store: TaskStore | None = None,
) -> None:
    """Register the four session task tools on ``registry``.

    Idempotent: re-registering overwrites the previous executor (which is
    fine — they all share the same TaskStore singleton anyway).

    Args:
        registry: Target registry; each tool is registered under its name.
        store: Optional TaskStore override (defaults to the singleton).
    """
    from framework.tasks.tools.session_tools import build_session_tools
    pairs = build_session_tools(store=store)
    for tool, async_executor in pairs:
        registry.register(tool.name, tool, _wrap_async_executor(async_executor))
    # Concurrency safety is carried on each Tool object itself
    # (``tool.concurrency_safe``). ToolRegistry.CONCURRENCY_SAFE_TOOLS is a
    # class-level frozenset we cannot (and need not) mutate, so the parallel
    # batch dispatcher is expected to read the per-Tool flag. (The previous
    # version had a dead ``if ...: pass`` branch here — removed.)
    logger.debug("Registered task tools on %s", registry)
def register_colony_template_tools(
    registry: ToolRegistry,
    *,
    colony_id: str,
    store: TaskStore | None = None,
) -> None:
    """Register the queen-only colony_template_* tools on ``registry``.

    Should only be called for the queen of a colony — workers and queen-DM
    do not get these tools.

    Args:
        registry: Target registry; each tool is registered under its name.
        colony_id: Colony whose template list the tools operate on.
        store: Optional TaskStore override (defaults to the singleton).
    """
    from framework.tasks.tools.colony_tools import build_colony_template_tools
    pairs = build_colony_template_tools(colony_id=colony_id, store=store)
    for tool, async_executor in pairs:
        registry.register(tool.name, tool, _wrap_async_executor(async_executor))
    logger.debug("Registered colony_template_* tools (colony_id=%s)", colony_id)
+465
View File
@@ -0,0 +1,465 @@
"""The four session task tools: task_create, task_update, task_list, task_get.
All four operate on the calling agent's OWN session list. They never touch
the colony template — the queen has separate ``colony_template_*`` tools
for that (see ``colony_tools.py``).
Concurrency safety:
task_list, task_get -> concurrency_safe=True (pure reads)
task_create, task_update -> concurrency_safe=False (writes serialize)
"""
from __future__ import annotations
import logging
from typing import Any
from framework.llm.provider import Tool
from framework.tasks.events import (
emit_task_created,
emit_task_deleted,
emit_task_updated,
)
from framework.tasks.hooks import (
HOOK_TASK_COMPLETED,
HOOK_TASK_CREATED,
BlockingHookError,
run_task_hooks,
)
from framework.tasks.models import TaskRecord, TaskStatus
from framework.tasks.store import (
_UNSET_SENTINEL as _UNSET, # re-export for clarity
TaskStore,
get_task_store,
)
from framework.tasks.tools._context import (
current_agent_id,
current_task_list_id,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Schemas (Anthropic-style JSONSchema)
# ---------------------------------------------------------------------------
_TASK_STATUS_VALUES = ["pending", "in_progress", "completed", "deleted"]
def _create_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {
"subject": {
"type": "string",
"description": "Imperative title (e.g., 'Crawl target URLs').",
},
"description": {
"type": "string",
"description": "Brief description of what to do.",
},
"active_form": {
"type": "string",
"description": "Present-continuous label shown while in_progress (e.g., 'Crawling target URLs').",
},
"metadata": {
"type": "object",
"description": "Arbitrary key/value metadata. Use _internal=true to hide from task_list.",
},
},
"required": ["subject"],
}
def _update_schema() -> dict[str, Any]:
    """JSONSchema for task_update inputs (id required; all else optional)."""
    props: dict[str, Any] = {
        "id": {"type": "integer", "description": "Task id (the #N from task_list)."},
        "subject": {"type": "string"},
        "description": {"type": "string"},
        "active_form": {"type": "string"},
        "owner": {
            "type": ["string", "null"],
            "description": "Agent id of the owner. Null clears ownership.",
        },
        # 'deleted' is accepted here but synthetic — the executor turns it
        # into a real delete rather than a stored status.
        "status": {"type": "string", "enum": _TASK_STATUS_VALUES},
        "add_blocks": {
            "type": "array",
            "items": {"type": "integer"},
            "description": "Add task ids that this task blocks (bidirectional).",
        },
        "add_blocked_by": {
            "type": "array",
            "items": {"type": "integer"},
            "description": "Add task ids that block this task (bidirectional).",
        },
        "metadata_patch": {
            "type": "object",
            "description": "Merge into metadata. Null values delete keys.",
        },
    }
    return {"type": "object", "properties": props, "required": ["id"]}
def _list_schema() -> dict[str, Any]:
return {"type": "object", "properties": {}}
def _get_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {"id": {"type": "integer"}},
"required": ["id"],
}
# ---------------------------------------------------------------------------
# Tool descriptions
# ---------------------------------------------------------------------------
_CREATE_DESC = (
"Create a task on your own session task list to break down and track "
"multi-step work. Use when you have 3+ distinct steps, non-trivial "
"planning, or the user explicitly asks for tracked progress. Capture "
"tasks IMMEDIATELY after receiving instructions — don't narrate intent. "
"DO NOT use this for: a single trivial task, purely conversational "
"replies, greetings, or work that fits in one tool call. The user "
"sees this list live in the right rail.\n\n"
"Fields:\n"
"- subject: short imperative title (e.g. 'Crawl target URLs').\n"
"- description: optional, slightly longer 'what to do' note.\n"
"- active_form: present-continuous label shown while in_progress (e.g. "
"'Crawling target URLs'). If omitted, the spinner shows the subject.\n"
"- metadata: optional KV. Set _internal=true to hide from task_list."
)
_UPDATE_DESC = (
"Update a task on your own session task list. Workflow:\n"
"- Mark a task `in_progress` BEFORE you start working on it.\n"
"- Mark it `completed` AS SOON as you finish it — never batch up "
" multiple completions to flush at the end.\n"
"- Set status='deleted' to drop a task that's no longer relevant.\n\n"
"ONLY mark `completed` when the task is FULLY done. If you hit errors, "
"blockers, or partial state, keep it `in_progress` and create a new "
"task describing what's blocking. Never mark completed with caveats; "
"if it's not done, it's not done.\n\n"
"Setting status='in_progress' without owner auto-fills your agent_id."
)
_LIST_DESC = (
"Show your session task list, sorted by id ascending. Internal tasks "
"(metadata._internal=true) and resolved blockers are filtered out. "
"**Prefer working on tasks in id order** (lowest first) — earlier "
"tasks usually set up context for later ones."
)
_GET_DESC = (
"Read the full record of one task (description, metadata, timestamps) "
"from your own session task list. Use this to refresh your view of a "
"task before updating it if you're not sure of current fields."
)
# ---------------------------------------------------------------------------
# Executors
# ---------------------------------------------------------------------------
def _resolve_list_id() -> str | None:
    """Resolve the caller's session task_list_id from the execution context.

    Returns None when the runner did not stamp one for this agent.
    """
    list_id = current_task_list_id()
    return list_id
def _serialize_task(t: TaskRecord) -> dict[str, Any]:
return {
"id": t.id,
"subject": t.subject,
"description": t.description,
"active_form": t.active_form,
"owner": t.owner,
"status": t.status.value,
"blocks": list(t.blocks),
"blocked_by": list(t.blocked_by),
"metadata": dict(t.metadata),
"created_at": t.created_at,
"updated_at": t.updated_at,
}
def _make_create_executor(store: TaskStore):
    """Build the task_create executor (writes to the caller's session list).

    The list id and agent id are resolved from execution-context vars at
    call time, not at build time, so one executor serves every caller.
    """
    async def execute(inputs: dict) -> dict[str, Any]:
        list_id = _resolve_list_id()
        if not list_id:
            # No execution context stamped for this agent — soft failure.
            return {"success": False, "error": "No task_list_id resolved for this agent."}
        agent_id = current_agent_id() or ""
        kwargs = {
            "subject": inputs["subject"],
            "description": inputs.get("description", ""),
            "active_form": inputs.get("active_form"),
            "metadata": inputs.get("metadata") or {},
        }
        rec = await store.create_task(list_id, **kwargs)
        # task_created hooks may block creation -> rollback by deleting.
        try:
            await run_task_hooks(
                HOOK_TASK_CREATED,
                task_list_id=list_id,
                task=rec,
                agent_id=agent_id,
            )
        except BlockingHookError as exc:
            logger.warning("task_created hook blocked task #%s: %s", rec.id, exc)
            await store.delete_task(list_id, rec.id)
            return {"success": False, "error": f"Hook blocked task creation: {exc}"}
        # Emit the SSE event only after the hooks have approved the record.
        await emit_task_created(task_list_id=list_id, record=rec)
        return {
            "success": True,
            "task_list_id": list_id,
            "task_id": rec.id,
            "message": f"Task #{rec.id} created successfully: {rec.subject}",
            "task": _serialize_task(rec),
        }
    return execute
def _make_update_executor(store: TaskStore):
    """Build the task_update executor (writes to the caller's session list).

    One tool call covers four concerns:
      1. ``status='deleted'`` is synthetic — routed to a real delete
         (with cascade) instead of a stored status;
      2. auto-ownership — ``in_progress`` with no explicit owner stamps
         the calling agent's id;
      3. completion hooks — ``task_completed`` hooks may veto; the status
         transition is rolled back to ``in_progress`` when they do;
      4. steering — on completion, the result message gains a focused
         "what's next" nudge (still-in-progress tasks, the next unblocked
         pending task, or "all done").
    """
    async def execute(inputs: dict) -> dict[str, Any]:
        list_id = _resolve_list_id()
        if not list_id:
            return {"success": False, "error": "No task_list_id resolved for this agent."}
        agent_id = current_agent_id() or ""
        task_id = int(inputs["id"])
        status_in = inputs.get("status")
        # 'deleted' is a synthetic status — handle it as a separate path.
        if status_in == "deleted":
            deleted, cascade = await store.delete_task(list_id, task_id)
            if not deleted:
                return {
                    "success": False,
                    "task_list_id": list_id,
                    "task_id": task_id,
                    "message": f"Task #{task_id} not found (already deleted?)",
                }
            await emit_task_deleted(task_list_id=list_id, task_id=task_id, cascade=cascade)
            return {
                "success": True,
                "task_list_id": list_id,
                "task_id": task_id,
                "deleted": True,
                "cascade": cascade,
                "message": f"Task #{task_id} deleted.",
            }
        # Auto-owner on in_progress. _OwnerSentinel marks "owner omitted" so
        # an explicit owner=null (clear ownership) stays distinguishable.
        owner_in = inputs.get("owner", _OwnerSentinel)
        status_enum = TaskStatus(status_in) if status_in else None
        if (
            status_enum == TaskStatus.IN_PROGRESS
            and owner_in is _OwnerSentinel
            and agent_id
        ):
            owner_in = agent_id
        new, fields = await store.update_task(
            list_id,
            task_id,
            subject=inputs.get("subject"),
            description=inputs.get("description"),
            active_form=inputs.get("active_form"),
            owner=owner_in if owner_in is not _OwnerSentinel else _UNSET,
            status=status_enum,
            add_blocks=inputs.get("add_blocks"),
            add_blocked_by=inputs.get("add_blocked_by"),
            metadata_patch=inputs.get("metadata_patch"),
        )
        if new is None:
            # "Task not found" is not an error — keep is_error=False semantics.
            return {
                "success": False,
                "task_list_id": list_id,
                "task_id": task_id,
                "message": f"Task #{task_id} not found.",
            }
        # task_completed hooks fire on transition to completed; can block.
        if status_enum == TaskStatus.COMPLETED and "status" in fields:
            try:
                await run_task_hooks(
                    HOOK_TASK_COMPLETED,
                    task_list_id=list_id,
                    task=new,
                    agent_id=agent_id,
                )
            except BlockingHookError as exc:
                # Roll back the status transition and surface the error.
                logger.warning("task_completed hook blocked #%s: %s", new.id, exc)
                rb_new, _ = await store.update_task(list_id, task_id, status=TaskStatus.IN_PROGRESS)
                return {
                    "success": False,
                    "task_list_id": list_id,
                    "task_id": task_id,
                    "message": f"Hook blocked completion of #{task_id}: {exc}",
                    "task": _serialize_task(rb_new) if rb_new else None,
                }
        if fields:
            await emit_task_updated(task_list_id=list_id, record=new, fields=fields)
        # Layer 4: tool-result steering. When a task just completed,
        # peek at remaining work and append a focused next-step nudge.
        # For hive's solo (non-claim) model, point at the lowest-id
        # pending task or signal "all done".
        message = f"Task #{task_id} updated. Fields changed: {', '.join(fields) or '(none)'}."
        if status_enum == TaskStatus.COMPLETED and "status" in fields:
            others = await store.list_tasks(list_id)
            completed_ids = {r.id for r in others if r.status == TaskStatus.COMPLETED}
            next_pending = next(
                (
                    r
                    for r in others
                    if r.status == TaskStatus.PENDING
                    # Unblocked == every blocker is already completed.
                    and all(b in completed_ids for b in r.blocked_by)
                ),
                None,
            )
            in_progress = [r for r in others if r.status == TaskStatus.IN_PROGRESS]
            if in_progress:
                names = ", ".join(f"#{r.id}" for r in in_progress[:3])
                message += f" Still in progress: {names}."
            elif next_pending is not None:
                # Bug fix: the id and quoted subject previously ran together
                # ('#7"Subject"') — keep a separator space between them.
                message += (
                    f' Next pending: #{next_pending.id} "{next_pending.subject}". '
                    f"Mark it in_progress before starting."
                )
            else:
                message += (
                    " All tasks complete. Wrap up: report results to the user "
                    "and stop."
                )
        return {
            "success": True,
            "task_list_id": list_id,
            "task_id": task_id,
            "fields": fields,
            "message": message,
            "task": _serialize_task(new),
        }
    return execute
def _make_list_executor(store: TaskStore):
    """Build the task_list executor (reads the caller's session list).

    Output filtering matches what the tool descriptions promise:
      - tasks with ``metadata._internal=true`` are hidden entirely
        (_CREATE_DESC / _LIST_DESC advertise this, but the original
        implementation never applied it);
      - blockers that are already completed are dropped from the
        rendered ``blocked_by`` annotations.
    """
    async def execute(inputs: dict) -> dict[str, Any]:
        list_id = _resolve_list_id()
        if not list_id:
            return {"success": False, "error": "No task_list_id resolved for this agent."}
        records = await store.list_tasks(list_id)
        # Resolve blockers against ALL records (including hidden internal
        # ones) so a completed internal task still unblocks its dependents.
        completed_ids = {r.id for r in records if r.status == TaskStatus.COMPLETED}
        # Hide metadata._internal=true tasks, as the tool descriptions promise.
        visible = [r for r in records if not r.metadata.get("_internal")]
        rendered: list[str] = []
        for r in visible:
            unresolved_blockers = [b for b in r.blocked_by if b not in completed_ids]
            line_parts = [f"#{r.id}", f"[{r.status.value}]", r.subject]
            if r.owner:
                line_parts.append(f"({r.owner})")
            if unresolved_blockers:
                line_parts.append(
                    f"[blocked by {', '.join(f'#{b}' for b in unresolved_blockers)}]"
                )
            rendered.append(" ".join(line_parts))
        return {
            "success": True,
            "task_list_id": list_id,
            "count": len(visible),
            "lines": rendered,
            "tasks": [_serialize_task(r) for r in visible],
        }
    return execute
def _make_get_executor(store: TaskStore):
    """Build the task_get executor (reads one task from the session list)."""

    async def execute(inputs: dict) -> dict[str, Any]:
        list_id = _resolve_list_id()
        if not list_id:
            return {"success": False, "error": "No task_list_id resolved for this agent."}
        wanted = int(inputs["id"])
        record = await store.get_task(list_id, wanted)
        if record is not None:
            return {
                "success": True,
                "task_list_id": list_id,
                "task_id": wanted,
                "task": _serialize_task(record),
            }
        # Unknown id — soft failure, mirroring the other executors.
        return {
            "success": False,
            "task_list_id": list_id,
            "task_id": wanted,
            "message": f"Task #{wanted} not found.",
        }

    return execute
# Sentinel so the update executor can distinguish "owner key not provided"
# (leave ownership untouched) from an explicit owner=null (clear ownership).
# The class object itself is the sentinel value — it is never instantiated.
class _OwnerSentinel:  # noqa: N801 — internal sentinel class
    pass
# ---------------------------------------------------------------------------
# Public registration
# ---------------------------------------------------------------------------
def build_session_tools(
    store: TaskStore | None = None,
) -> list[tuple[Tool, Any]]:
    """Build (Tool, executor) pairs for the four session task tools.

    Args:
        store: Optional TaskStore override (defaults to the singleton).
    """
    backing = store or get_task_store()
    # (name, description, schema, concurrency_safe, executor factory).
    # Reads are concurrency-safe; writes serialize.
    specs = [
        ("task_create", _CREATE_DESC, _create_schema(), False, _make_create_executor),
        ("task_update", _UPDATE_DESC, _update_schema(), False, _make_update_executor),
        ("task_list", _LIST_DESC, _list_schema(), True, _make_list_executor),
        ("task_get", _GET_DESC, _get_schema(), True, _make_get_executor),
    ]
    pairs: list[tuple[Tool, Any]] = []
    for name, desc, params, safe, factory in specs:
        tool = Tool(
            name=name,
            description=desc,
            parameters=params,
            concurrency_safe=safe,
        )
        pairs.append((tool, factory(backing)))
    return pairs
@@ -1328,6 +1328,35 @@ def register_queen_lifecycle_tools(
_pinned,
)
# Publish a colony template entry per task BEFORE spawning so
# the entries' template ids can be threaded into the spawn data
# (workers' ctx.picked_up_from references them). This mirrors the
# plan §5d "auto-populated by run_parallel_workers" behavior.
_template_ids: list[int | None] = [None] * len(normalised)
try:
from framework.tasks import TaskListRole, get_task_store
from framework.tasks.scoping import colony_task_list_id
_task_store = get_task_store()
_template_list_id = colony_task_list_id(_colony_id or "primary")
await _task_store.ensure_task_list(_template_list_id, role=TaskListRole.TEMPLATE)
for i, spec in enumerate(normalised):
rec = await _task_store.create_task(
_template_list_id,
subject=spec["task"][:200],
description=spec["task"],
)
_template_ids[i] = rec.id
# Thread the template id into the worker's spawn data so
# ColonyRuntime.spawn populates ctx.picked_up_from correctly.
spec["data"] = dict(spec.get("data") or {})
spec["data"]["__template_task_id"] = rec.id
except Exception:
logger.warning(
"run_parallel_workers: colony template publish failed (non-fatal)",
exc_info=True,
)
try:
worker_ids = await colony.spawn_batch(
normalised,
@@ -1336,6 +1365,33 @@ def register_queen_lifecycle_tools(
except Exception as e:
return json.dumps({"error": f"spawn_batch failed: {e}"})
# Stamp `assigned_session` on each template entry post-spawn so the
# UI's colony-overview panel can render the assigned-session chip.
try:
from framework.tasks.events import emit_colony_template_assignment
from framework.tasks.scoping import session_task_list_id
for tid, wid in zip(_template_ids, worker_ids, strict=False):
if tid is None:
continue
_assigned = session_task_list_id(wid, wid)
await _task_store.update_task(
_template_list_id,
tid,
metadata_patch={
"assigned_session": _assigned,
"assigned_worker_id": wid,
},
)
await emit_colony_template_assignment(
colony_id=_colony_id or "primary",
task_id=tid,
assigned_session=_assigned,
assigned_worker_id=wid,
)
except Exception:
logger.debug("run_parallel_workers: failed to stamp template assignments", exc_info=True)
# Phase transition — workers are now live, queen is in "working"
# phase. Worker-finish auto-transitions back to "reviewing" once
# every worker has reported (see queen_orchestrator._on_worker_report).
+151
View File
@@ -0,0 +1,151 @@
/**
* REST + types for the task system.
*
* Two list types:
* colony:{colony_id} — colony template (queen's spawn plan)
* session:{agent_id}:{sess_id} — per-session working list
*
* Each agent operates on its OWN session list via the four task tools;
* the colony template is queen-owned and read by the UI.
*/
import { api, ApiError } from "./client";
/** Task lifecycle states surfaced to the UI. */
export type TaskStatus = "pending" | "in_progress" | "completed";
/** Kind of list: a colony spawn-plan template or a per-session working list. */
export type TaskListRole = "template" | "session";
/** One task row, mirroring the server-side task serialization. */
export interface TaskRecord {
  id: number;
  subject: string;
  description: string;
  /** Present-continuous label shown while in_progress; falls back to subject. */
  active_form: string | null;
  /** Agent id of the current owner, or null when unowned. */
  owner: string | null;
  status: TaskStatus;
  /** Ids of tasks this task blocks (bidirectional with blocked_by). */
  blocks: number[];
  /** Ids of tasks that block this task. */
  blocked_by: number[];
  metadata: Record<string, unknown>;
  /** Unix timestamps in seconds (multiply by 1000 for Date). */
  created_at: number;
  updated_at: number;
}
/** Full snapshot of one list as returned by GET /tasks/{task_list_id}. */
export interface TaskListSnapshot {
  task_list_id: string;
  role: TaskListRole;
  /** List-level metadata; null when the server has none for this list. */
  meta: {
    task_list_id: string;
    role: TaskListRole;
    creator_agent_id: string | null;
    created_at: number;
    last_seen_session_ids: string[];
    schema_version: number;
  } | null;
  tasks: TaskRecord[];
}
/** Resolved list ids for one colony: template + the queen's session list. */
export interface ColonyTaskLists {
  template_task_list_id: string;
  queen_session_task_list_id: string | null;
}
/** Session-level task info: list id plus an optional template back-reference. */
export interface SessionTaskListInfo {
  task_list_id: string | null;
  /** Set when the session was spawned for a colony template entry. */
  picked_up_from: { colony_id: string; task_id: number } | null;
}
export const tasksApi = {
  /**
   * Snapshot of one task list, identified by its full task_list_id.
   *
   * Resolves to `null` when the list does not exist on disk yet (HTTP 404) —
   * e.g. a fresh session where no agent has called `task_create`. Callers
   * should treat `null` as "hide the panel", not as an error.
   */
  async getList(taskListId: string): Promise<TaskListSnapshot | null> {
    try {
      const snapshot = await api.get<TaskListSnapshot>(
        `/tasks/${encodeURIComponent(taskListId)}`,
      );
      return snapshot;
    } catch (error) {
      const isMissing = error instanceof ApiError && error.status === 404;
      if (isMissing) return null;
      throw error;
    }
  },
  /** Helper: resolve template + queen-session list ids for a colony. */
  async getColonyLists(
    colonyId: string,
    queenSessionId?: string,
  ): Promise<ColonyTaskLists> {
    let query = "";
    if (queenSessionId) {
      query = `?queen_session_id=${encodeURIComponent(queenSessionId)}`;
    }
    return api.get<ColonyTaskLists>(
      `/colonies/${encodeURIComponent(colonyId)}/task_lists${query}`,
    );
  },
  /** Helper: resolve task_list_id + picked_up_from for a session. */
  async getSessionInfo(
    sessionId: string,
    agentId: string = "queen",
  ): Promise<SessionTaskListInfo> {
    const path = `/sessions/${encodeURIComponent(sessionId)}/task_list_id?agent_id=${encodeURIComponent(agentId)}`;
    return api.get<SessionTaskListInfo>(path);
  },
};
// ---------------------------------------------------------------------------
// SSE event payload shapes
// ---------------------------------------------------------------------------
/** Payload for the `task_created` SSE event. */
export interface TaskCreatedEvent {
  task_list_id: string;
  task: TaskRecord;
}
/** Payload for `task_updated`: the post-update record plus changed field names. */
export interface TaskUpdatedEvent {
  task_list_id: string;
  task_id: number;
  after: TaskRecord;
  fields: string[];
}
/** Payload for `task_deleted`; `cascade` lists additional ids affected by the delete. */
export interface TaskDeletedEvent {
  task_list_id: string;
  task_id: number;
  cascade: number[];
}
/** Payload for `colony_template_assignment`: ties a template entry to a spawned worker. */
export interface ColonyTemplateAssignmentEvent {
  colony_id: string;
  task_id: number;
  assigned_session: string | null;
  assigned_worker_id: string | null;
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Parse a task_list_id into structured parts (mirrors server-side scoping). */
export function parseTaskListId(taskListId: string): {
kind: "colony" | "session" | "raw";
colony_id?: string;
agent_id?: string;
session_id?: string;
raw?: string;
} {
if (taskListId.startsWith("colony:")) {
return { kind: "colony", colony_id: taskListId.slice("colony:".length) };
}
if (taskListId.startsWith("session:")) {
const rest = taskListId.slice("session:".length);
const idx = rest.indexOf(":");
return idx > 0
? { kind: "session", agent_id: rest.slice(0, idx), session_id: rest.slice(idx + 1) }
: { kind: "raw", raw: taskListId };
}
return { kind: "raw", raw: taskListId };
}
/** Build a colony-template list id (mirrors server-side scoping). */
export function colonyTaskListId(colonyId: string): string {
  return "colony:" + colonyId;
}
/** Build a per-session list id (mirrors server-side scoping). */
export function sessionTaskListId(agentId: string, sessionId: string): string {
  return ["session", agentId, sessionId].join(":");
}
+7 -1
View File
@@ -287,7 +287,13 @@ export type EventTypeName =
| "trigger_fired"
| "trigger_removed"
| "trigger_updated"
| "queen_identity_selected";
| "queen_identity_selected"
| "task_created"
| "task_updated"
| "task_deleted"
| "task_list_reset"
| "task_list_reattach_mismatch"
| "colony_template_assignment";
export interface AgentEvent {
type: EventTypeName;
@@ -35,6 +35,10 @@ import { sessionsApi } from "@/api/sessions";
import { cronToLabel } from "@/lib/graphUtils";
import type { GraphNode } from "@/components/graph-types";
import { useColonyWorkers } from "@/context/ColonyWorkersContext";
import TaskListPanel from "@/components/TaskListPanel";
// Re-export so the WorkerDetail block can use it without forward decl.
const TaskListPanelLazy = TaskListPanel;
import { DataGrid, type SortDir } from "@/components/data-grid";
interface ColonyWorkersPanelProps {
@@ -1607,6 +1611,11 @@ function WorkerDetail({
)}
</div>
{/* Worker session task list — embedded panel, not a separate rail. */}
<div className="mb-3">
<WorkerTaskList workerId={workerId} colonyName={colonyName} />
</div>
{isHistorical ? (
<HistoricalWorkerPlaceholder workerId={workerId} />
) : (
@@ -1616,6 +1625,28 @@ function WorkerDetail({
);
}
function WorkerTaskList({
  workerId,
  colonyName: _colonyName,
}: {
  workerId: string;
  colonyName: string | null;
}) {
  // Workers' task_list_id is session:{worker_id}:{worker_id} (the worker's
  // session_id == its worker_id under ColonyRuntime.spawn). The SSE
  // events for it ride on the colony's bus, which we subscribe to via
  // the queen's session id (already streaming in this view).
  const { sessionId } = useColonyWorkers();
  // NOTE(review): sessionId can be null before the colony context resolves;
  // we pass "" then — confirm TaskListPanel tolerates an empty session id.
  return (
    <TaskListPanelLazy
      taskListId={`session:${workerId}:${workerId}`}
      sessionId={sessionId ?? ""}
      title="Worker session"
      variant="embedded"
    />
  );
}
function LiveWorkerProgress({
colonyName,
workerId,
+102
View File
@@ -0,0 +1,102 @@
import { useState } from "react";
import { Check, Circle, Hourglass, Loader2 } from "lucide-react";
import type { TaskRecord, TaskStatus } from "@/api/tasks";
interface TaskItemProps {
task: TaskRecord;
unresolvedBlockers: number[];
onJumpToBlocker?: (id: number) => void;
}
// Status -> glyph for the collapsed row. A pending task with unresolved
// blockers is rendered with an Hourglass by TaskItem instead of this map.
const STATUS_ICON: Record<TaskStatus, JSX.Element> = {
  in_progress: (
    <Loader2 className="h-3.5 w-3.5 animate-spin text-amber-500" aria-label="in progress" />
  ),
  pending: <Circle className="h-3.5 w-3.5 text-muted-foreground" aria-label="pending" />,
  completed: <Check className="h-3.5 w-3.5 text-emerald-600" aria-label="completed" />,
};
/**
 * Render the time since `ts` (unix seconds) as a compact label:
 * "Xs", "Xm Ys", or "Xh Ym". Future timestamps clamp to "0s".
 */
function elapsedSince(ts: number): string {
  const seconds = Math.max(0, Date.now() / 1000 - ts);
  if (seconds < 60) {
    return `${Math.floor(seconds)}s`;
  }
  if (seconds < 3600) {
    const minutes = Math.floor(seconds / 60);
    const rem = Math.floor(seconds % 60);
    return `${minutes}m ${rem}s`;
  }
  const hours = Math.floor(seconds / 3600);
  const minutes = Math.floor((seconds % 3600) / 60);
  return `${hours}h ${minutes}m`;
}
// One task row: status glyph, subject (or active_form while in_progress),
// owner/elapsed/blocker chips, and a click-to-expand detail section.
export default function TaskItem({ task, unresolvedBlockers, onJumpToBlocker }: TaskItemProps) {
  // Expanded state reveals description / metadata / updated-at under the row.
  const [expanded, setExpanded] = useState(false);
  // A pending task with unresolved blockers renders as "waiting", not "pending".
  const isBlocked = task.status === "pending" && unresolvedBlockers.length > 0;
  // Elapsed time is measured from updated_at — assumes updated_at was stamped
  // at the in_progress transition (TODO confirm against the store).
  const elapsed = task.status === "in_progress" ? elapsedSince(task.updated_at) : null;
  return (
    <li className="group">
      <button
        type="button"
        onClick={() => setExpanded((v) => !v)}
        className="w-full text-left flex items-start gap-2 px-2 py-1.5 rounded hover:bg-muted/50 focus:bg-muted/60 focus:outline-none"
      >
        <span className="mt-0.5 flex-shrink-0">
          {isBlocked ? (
            <Hourglass
              className="h-3.5 w-3.5 text-muted-foreground/70"
              aria-label="waiting on dependency"
            />
          ) : (
            STATUS_ICON[task.status]
          )}
        </span>
        <span className="flex-1 min-w-0">
          <span className="text-sm flex items-baseline gap-1.5">
            <span className="text-muted-foreground tabular-nums">#{task.id}</span>
            <span className="truncate">
              {task.status === "in_progress" && task.active_form
                ? task.active_form
                : task.subject}
            </span>
          </span>
          <span className="flex items-center gap-2 text-xs text-muted-foreground mt-0.5">
            {task.owner ? (
              <span className="rounded bg-muted px-1.5 py-0.5">{task.owner.slice(0, 12)}</span>
            ) : null}
            {elapsed ? <span>{elapsed}</span> : null}
            {unresolvedBlockers.length > 0 ? (
              <span>
                blocked by{" "}
                {unresolvedBlockers.map((b, idx) => (
                  <span key={b}>
                    <button
                      type="button"
                      className="text-foreground/70 hover:underline"
                      onClick={(e) => {
                        // Don't toggle the row's expand state when jumping.
                        e.stopPropagation();
                        onJumpToBlocker?.(b);
                      }}
                    >
                      #{b}
                    </button>
                    {idx < unresolvedBlockers.length - 1 ? ", " : ""}
                  </span>
                ))}
              </span>
            ) : null}
          </span>
        </span>
      </button>
      {expanded ? (
        <div className="ml-7 mb-2 text-xs text-muted-foreground space-y-1">
          {task.description ? <p className="whitespace-pre-wrap">{task.description}</p> : null}
          {task.metadata && Object.keys(task.metadata).length > 0 ? (
            <pre className="text-[10px] bg-muted/40 rounded p-2 overflow-x-auto">
              {JSON.stringify(task.metadata, null, 2)}
            </pre>
          ) : null}
          <p className="text-[10px]">
            updated {new Date(task.updated_at * 1000).toLocaleString()}
          </p>
        </div>
      ) : null}
    </li>
  );
}
@@ -0,0 +1,274 @@
/**
* Task list panel — renders one task list (queen-DM session, colony
* template, or worker session). Variants:
*
* variant="rail" -> right-rail panel with header & close button
* variant="embedded" -> inline (e.g., inside WorkerDetail)
*/
import { useRef, useState } from "react";
import { ChevronDown, ChevronRight, X } from "lucide-react";
import {
TaskListProvider,
useTaskList,
bucketTasks,
unresolvedBlockers,
} from "@/context/TaskListContext";
import TaskItem from "@/components/TaskItem";
import type { TaskRecord } from "@/api/tasks";
interface TaskListPanelProps {
  /** Id of the task list to render (session list or colony template). */
  taskListId: string;
  /** SSE source session; when omitted the panel shows the snapshot only. */
  sessionId?: string;
  /** Override the default header label. */
  title?: string;
  /** "rail" (default) = right-rail panel; "embedded" = inline section. */
  variant?: "rail" | "embedded";
  /** When provided, an X close button is rendered in the header. */
  onClose?: () => void;
}
/**
 * Public entry point: mounts a TaskListProvider for the requested list so
 * the inner panel can read live snapshot + SSE state via useTaskList().
 */
export default function TaskListPanel(props: TaskListPanelProps) {
  const { taskListId, sessionId } = props;
  return (
    <TaskListProvider taskListId={taskListId} sessionId={sessionId}>
      <TaskListPanelInner {...props} />
    </TaskListProvider>
  );
}
/**
 * Panel body: buckets the list into Active / Pending / Completed
 * sections, supports jump-to-blocker scrolling, and hides itself in the
 * rail variant while the list doesn't exist yet.
 *
 * Fix: the useState/useRef calls previously sat *below* the conditional
 * `return null`, which violates the Rules of Hooks — once a hidden rail
 * list came into existence, the hook count changed between renders and
 * React would throw. All hooks now run unconditionally before any
 * early return.
 */
function TaskListPanelInner({ title, variant = "rail", onClose }: TaskListPanelProps) {
  const { tasks, loading, error, role, exists } = useTaskList();

  // Hooks first — never after a conditional return.
  const [activeOpen, setActiveOpen] = useState(true);
  const [pendingOpen, setPendingOpen] = useState(true);
  const [completedOpen, setCompletedOpen] = useState(false);
  // task id -> rendered <li>, populated by RefItem for jump-to-blocker.
  const itemRefs = useRef(new Map<number, HTMLLIElement>());

  const buckets = bucketTasks(tasks);

  // Don't render anything when the list doesn't exist yet AND we're in
  // the rail variant (queen-DM session that hasn't created any task).
  // The embedded variant always shows so the section in WorkerDetail/
  // colony overview keeps a stable layout.
  if (!loading && !exists && variant === "rail") return null;

  // Scroll the blocker's row into view and briefly flash a highlight ring.
  const handleJumpToBlocker = (id: number) => {
    const node = itemRefs.current.get(id);
    if (!node) return;
    node.scrollIntoView({ behavior: "smooth", block: "center" });
    node.classList.add("ring-2", "ring-primary/40");
    setTimeout(() => node.classList.remove("ring-2", "ring-primary/40"), 1500);
  };

  // "Colony plan" for template lists, plain "Tasks" otherwise (the
  // original session/fallback branches both produced "Tasks").
  const headerLabel = title ?? (role === "template" ? "Colony plan" : "Tasks");

  const inProgressCount = buckets.active.length;
  const totalVisible = buckets.visible.length;

  return (
    <aside
      className={
        variant === "rail"
          ? "w-[320px] flex-shrink-0 border-l border-border bg-background flex flex-col h-full overflow-hidden"
          : "w-full border border-border rounded-md bg-background flex flex-col"
      }
    >
      <div className="flex items-center justify-between px-3 py-2 border-b border-border">
        <h2 className="text-sm font-semibold flex items-center gap-2">
          <span>{headerLabel}</span>
          {/* "in-progress / total visible" counter. */}
          <span className="text-xs text-muted-foreground tabular-nums">
            {inProgressCount}/{totalVisible}
          </span>
        </h2>
        {onClose ? (
          <button
            type="button"
            onClick={onClose}
            className="text-muted-foreground hover:text-foreground"
            aria-label="Close"
          >
            <X className="h-4 w-4" />
          </button>
        ) : null}
      </div>
      <div className="flex-1 overflow-y-auto p-2">
        {loading ? (
          <p className="text-xs text-muted-foreground p-2">Loading</p>
        ) : error ? (
          <p className="text-xs text-destructive p-2">Error: {error}</p>
        ) : totalVisible === 0 ? (
          <p className="text-xs text-muted-foreground p-2">
            {role === "template"
              ? "No template entries yet. The queen will populate this when planning a fan-out."
              : "No tasks yet. The agent will create them as it plans."}
          </p>
        ) : (
          <>
            <Section
              label="Active"
              count={buckets.active.length}
              open={activeOpen}
              onToggle={() => setActiveOpen((v) => !v)}
            >
              {buckets.active.map((t) => (
                <RefItem
                  key={t.id}
                  task={t}
                  itemRefs={itemRefs}
                  unresolved={unresolvedBlockers(t, buckets.completedIds)}
                  onJumpToBlocker={handleJumpToBlocker}
                />
              ))}
            </Section>
            <Section
              label="Pending"
              count={buckets.pending.length}
              open={pendingOpen}
              onToggle={() => setPendingOpen((v) => !v)}
            >
              {buckets.pending.map((t) => (
                <RefItem
                  key={t.id}
                  task={t}
                  itemRefs={itemRefs}
                  unresolved={unresolvedBlockers(t, buckets.completedIds)}
                  onJumpToBlocker={handleJumpToBlocker}
                />
              ))}
            </Section>
            <Section
              label="Completed"
              count={buckets.completed.length}
              open={completedOpen}
              onToggle={() => setCompletedOpen((v) => !v)}
            >
              {/* Completed tasks never show blockers — deps are resolved by definition. */}
              {buckets.completed.map((t) => (
                <RefItem
                  key={t.id}
                  task={t}
                  itemRefs={itemRefs}
                  unresolved={[]}
                  onJumpToBlocker={handleJumpToBlocker}
                />
              ))}
            </Section>
          </>
        )}
      </div>
    </aside>
  );
}
/**
 * Collapsible bucket: a "Label (count)" toggle button followed by the
 * bucket's task list while open. Empty buckets render nothing.
 */
function Section({
  label,
  count,
  open,
  onToggle,
  children,
}: {
  label: string;
  count: number;
  open: boolean;
  onToggle: () => void;
  children: React.ReactNode;
}) {
  if (count === 0) return null;
  const Chevron = open ? ChevronDown : ChevronRight;
  return (
    <div className="mb-2">
      <button
        type="button"
        onClick={onToggle}
        className="flex items-center gap-1 text-xs font-medium text-muted-foreground px-2 py-1 hover:text-foreground"
      >
        <Chevron className="h-3 w-3" />
        <span>{label}</span>
        <span className="tabular-nums">({count})</span>
      </button>
      {open && <ul className="space-y-0.5">{children}</ul>}
    </div>
  );
}
/**
 * Registers the rendered row in the shared id -> element map so blocker
 * links can scroll/flash the target, then delegates to TaskItem.
 *
 * NOTE(review): TaskItem's own root element appears to be an <li> too,
 * which would nest li > li here — confirm and flatten one of the two.
 */
function RefItem({
  task,
  itemRefs,
  unresolved,
  onJumpToBlocker,
}: {
  task: TaskRecord;
  itemRefs: React.MutableRefObject<Map<number, HTMLLIElement>>;
  unresolved: number[];
  onJumpToBlocker: (id: number) => void;
}) {
  // Register on mount, deregister on unmount (React calls with null).
  const register = (el: HTMLLIElement | null) => {
    const registry = itemRefs.current;
    if (el === null) {
      registry.delete(task.id);
    } else {
      registry.set(task.id, el);
    }
  };
  return (
    <li ref={register} className="rounded transition-shadow">
      <TaskItem
        task={task}
        unresolvedBlockers={unresolved}
        onJumpToBlocker={onJumpToBlocker}
      />
    </li>
  );
}
// ---------------------------------------------------------------------------
// Stacked variant: two TaskListPanels (colony template + queen session).
// Used in the colony chat right rail.
// ---------------------------------------------------------------------------
interface TaskListPanelStackedProps {
  /** Task-list id of the colony template (the "Colony plan" list). */
  templateTaskListId: string;
  /** Queen session list id; null hides the lower panel until it exists. */
  queenSessionTaskListId: string | null;
  /** SSE source session used for live diffs on both stacked lists. */
  sessionId: string;
  /** When provided, an X close button is rendered in the shared header. */
  onClose?: () => void;
}
/**
 * Stacked variant for the colony chat right rail: the colony template
 * list ("Colony plan") on top and, once it exists, the queen's session
 * list ("Queen's notes") below, sharing one header and close button.
 */
export function TaskListPanelStacked({
  templateTaskListId,
  queenSessionTaskListId,
  sessionId,
  onClose,
}: TaskListPanelStackedProps) {
  const closeButton = onClose ? (
    <button
      type="button"
      onClick={onClose}
      className="text-muted-foreground hover:text-foreground"
      aria-label="Close"
    >
      <X className="h-4 w-4" />
    </button>
  ) : null;

  return (
    <aside className="w-[320px] flex-shrink-0 border-l border-border bg-background flex flex-col h-full overflow-hidden">
      <div className="flex items-center justify-between px-3 py-2 border-b border-border">
        <h2 className="text-sm font-semibold">Tasks</h2>
        {closeButton}
      </div>
      <div className="flex-1 min-h-0 flex flex-col overflow-hidden">
        <div className="flex-1 min-h-0 overflow-hidden border-b border-border">
          <TaskListPanel
            taskListId={templateTaskListId}
            sessionId={sessionId}
            title="Colony plan"
            variant="embedded"
          />
        </div>
        {queenSessionTaskListId ? (
          <div className="flex-1 min-h-0 overflow-hidden">
            <TaskListPanel
              taskListId={queenSessionTaskListId}
              sessionId={sessionId}
              title="Queen's notes"
              variant="embedded"
            />
          </div>
        ) : null}
      </div>
    </aside>
  );
}
@@ -0,0 +1,213 @@
/**
* Per-list live task state. Mounts a single list (snapshot + SSE diffs).
*
* Stack two of these for the colony-overview view (template + queen
* session). Mount one for the queen-DM and worker-detail views.
*/
import {
createContext,
useContext,
useEffect,
useMemo,
useReducer,
useRef,
type ReactNode,
} from "react";
import {
tasksApi,
type TaskRecord,
type TaskListRole,
type TaskCreatedEvent,
type TaskUpdatedEvent,
type TaskDeletedEvent,
} from "@/api/tasks";
import { useSSE } from "@/hooks/use-sse";
import type { AgentEvent } from "@/api/types";
/**
 * Live view-state for one task list, driven by an initial snapshot fetch
 * plus incremental SSE diffs (see the reducer below).
 */
interface TaskListState {
  /** Id of the mirrored list (seeded from the provider's prop on init). */
  taskListId: string;
  /** Role reported by the snapshot endpoint; "unknown" until loaded. */
  role: TaskListRole | "unknown";
  /** Current tasks; the CREATED case keeps them sorted by ascending id. */
  tasks: TaskRecord[];
  /** True while the snapshot fetch is in flight. */
  loading: boolean;
  /** Human-readable snapshot-fetch error, or null. */
  error: string | null;
  /** False until the list exists on disk. Sessions that haven't created
   * any task yet return 404 from the snapshot endpoint; the panel
   * should hide rather than render an error. Becomes true on first
   * successful snapshot or on the first task_created event. */
  exists: boolean;
}
/** Reducer actions: fetch lifecycle (LOADING / NOT_FOUND / ERROR /
 * SNAPSHOT) plus incremental SSE diffs (CREATED / UPDATED / DELETED). */
type Action =
  | { type: "SNAPSHOT"; tasks: TaskRecord[]; role: TaskListRole }
  | { type: "LOADING" }
  | { type: "NOT_FOUND" }
  | { type: "ERROR"; error: string }
  | { type: "CREATED"; task: TaskRecord }
  | { type: "UPDATED"; task: TaskRecord }
  | { type: "DELETED"; taskId: number; cascade: number[] };
function reducer(state: TaskListState, action: Action): TaskListState {
switch (action.type) {
case "LOADING":
return { ...state, loading: true, error: null };
case "NOT_FOUND":
return { ...state, loading: false, error: null, exists: false, tasks: [] };
case "ERROR":
return { ...state, loading: false, error: action.error };
case "SNAPSHOT":
return {
...state,
tasks: action.tasks,
role: action.role,
loading: false,
error: null,
exists: true,
};
case "CREATED": {
// First task_created event for a previously-empty session marks
// the list as existing — the panel will reveal itself live.
if (state.tasks.some((t) => t.id === action.task.id)) {
return { ...state, exists: true };
}
const next = [...state.tasks, action.task].sort((a, b) => a.id - b.id);
return { ...state, tasks: next, exists: true };
}
case "UPDATED": {
const next = state.tasks.map((t) => (t.id === action.task.id ? action.task : t));
return { ...state, tasks: next, exists: true };
}
case "DELETED": {
const surviving = state.tasks
.filter((t) => t.id !== action.taskId)
.map((t) => {
if (action.cascade.includes(t.id)) {
return {
...t,
blocks: t.blocks.filter((b) => b !== action.taskId),
blocked_by: t.blocked_by.filter((b) => b !== action.taskId),
};
}
return t;
});
return { ...state, tasks: surviving };
}
default:
return state;
}
}
// Pre-mount defaults; taskListId is overridden by the provider on init.
const initial: TaskListState = {
  taskListId: "",
  role: "unknown",
  tasks: [],
  loading: false,
  error: null,
  exists: false,
};
// Undefined outside a provider — useTaskList throws in that case.
const TaskListContext = createContext<TaskListState | undefined>(undefined);
interface TaskListProviderProps {
  /** Id of the task list to mirror (snapshot + live diffs). */
  taskListId: string;
  // SSE source — the queen session id is a reasonable default; events for
  // a list are published on the colony's bus. If `sessionId` is missing,
  // the panel renders the snapshot but doesn't subscribe to live diffs.
  sessionId?: string;
  children: ReactNode;
}
// Event types the SSE subscription filters on; diffs for other lists are
// discarded by the task_list_id check in the provider's onEvent handler.
const TASK_EVENT_TYPES = [
  "task_created",
  "task_updated",
  "task_deleted",
  "task_list_reset",
] as const;
/**
 * Mounts live state for a single task list: one snapshot fetch per
 * taskListId, plus SSE diffs (filtered by task_list_id) dispatched into
 * the reducer. Children read the state via useTaskList().
 *
 * Fix: the task_list_reset branch read `state.role` straight from the
 * render closure while the task_list_id filter already goes through
 * taskListIdRef to dodge stale closures — `role` now gets the same
 * ref-mirror treatment so a reset always sees the latest role.
 */
export function TaskListProvider({ taskListId, sessionId, children }: TaskListProviderProps) {
  const [state, dispatch] = useReducer(reducer, { ...initial, taskListId });

  // Mirror fast-changing values into refs so the onEvent callback —
  // which useSSE may hold across renders — never reads a stale closure.
  const taskListIdRef = useRef(taskListId);
  taskListIdRef.current = taskListId;
  const roleRef = useRef(state.role);
  roleRef.current = state.role;

  // NOTE(review): state.taskListId is seeded once from the initial prop
  // and never updated if the prop later changes; consumers should rely
  // on the prop — TODO confirm nothing reads state.taskListId.

  // Snapshot fetch — re-run when taskListId changes.
  useEffect(() => {
    if (!taskListId) return;
    let cancelled = false;
    dispatch({ type: "LOADING" });
    tasksApi
      .getList(taskListId)
      .then((snap) => {
        if (cancelled) return;
        if (snap === null) {
          // Not yet on disk — the panel hides until the first task_created
          // event arrives via SSE (see CREATED case in the reducer).
          dispatch({ type: "NOT_FOUND" });
        } else {
          dispatch({ type: "SNAPSHOT", tasks: snap.tasks, role: snap.role });
        }
      })
      .catch((err) => {
        if (cancelled) return;
        dispatch({ type: "ERROR", error: String(err?.message ?? err) });
      });
    return () => {
      cancelled = true;
    };
  }, [taskListId]);

  // Subscribe to SSE diffs scoped to this list_id.
  useSSE({
    sessionId: sessionId ?? "",
    eventTypes: TASK_EVENT_TYPES as unknown as AgentEvent["type"][],
    enabled: Boolean(sessionId),
    onEvent: (ev) => {
      const data = ev.data ?? {};
      // Events for other lists share the same bus — drop them here.
      if (data.task_list_id !== taskListIdRef.current) return;
      switch (ev.type) {
        case "task_created":
          dispatch({ type: "CREATED", task: (data as unknown as TaskCreatedEvent).task });
          return;
        case "task_updated":
          dispatch({ type: "UPDATED", task: (data as unknown as TaskUpdatedEvent).after });
          return;
        case "task_deleted": {
          const d = data as unknown as TaskDeletedEvent;
          dispatch({ type: "DELETED", taskId: d.task_id, cascade: d.cascade ?? [] });
          return;
        }
        case "task_list_reset": {
          // Read the role through the ref (not the captured state) so a
          // reset arriving after a role change uses the current value.
          const role = roleRef.current === "unknown" ? "session" : roleRef.current;
          dispatch({ type: "SNAPSHOT", tasks: [], role });
          return;
        }
      }
    },
  });

  return <TaskListContext.Provider value={state}>{children}</TaskListContext.Provider>;
}
/**
 * Read the nearest TaskListProvider's state. Throws when rendered
 * outside a provider so misuse fails loudly instead of silently.
 */
export function useTaskList(): TaskListState {
  const value = useContext(TaskListContext);
  if (value === undefined) {
    throw new Error("useTaskList must be used inside <TaskListProvider>");
  }
  return value;
}
// Helpers for components that want pre-bucketed views.
/**
 * Partition a task list into the buckets the panel renders.
 *
 * Tasks whose metadata carries `_internal: true` are hidden from every
 * visible bucket; `completedIds` is built from the full list so blocker
 * resolution still works against hidden tasks.
 *
 * Fix: metadata can be null/undefined (TaskItem guards `task.metadata &&`
 * before rendering it) — the previous direct `._internal` access threw
 * on such tasks; it is now optional-chained.
 */
export function bucketTasks(tasks: TaskRecord[]) {
  const completedIds = new Set(tasks.filter((t) => t.status === "completed").map((t) => t.id));
  const visible = tasks.filter(
    (t) => !(t.metadata as { _internal?: boolean } | null | undefined)?._internal,
  );
  const active = visible.filter((t) => t.status === "in_progress");
  const pending = visible.filter((t) => t.status === "pending");
  const completed = visible.filter((t) => t.status === "completed");
  return { active, pending, completed, completedIds, visible };
}
/**
 * Ids in `task.blocked_by` whose blocker task is not yet completed.
 */
export function unresolvedBlockers(task: TaskRecord, completedIds: Set<number>): number[] {
  const open: number[] = [];
  for (const blockerId of task.blocked_by) {
    if (!completedIds.has(blockerId)) open.push(blockerId);
  }
  return open;
}
/** localStorage key under which a panel persists per-list UI state. */
export const TASK_LIST_PANEL_LOCALSTORAGE_KEY = (taskListId: string) =>
  "taskListPanel." + taskListId;
/** Memoized bucketTasks — recomputes only when the tasks array identity changes. */
export const useMemoizedBuckets = (tasks: TaskRecord[]) => {
  return useMemo(() => bucketTasks(tasks), [tasks]);
};
+41 -1
View File
@@ -4,6 +4,8 @@ import Sidebar from "@/components/Sidebar";
import AppHeader from "@/components/AppHeader";
import QueenProfilePanel from "@/components/QueenProfilePanel";
import ColonyWorkersPanel from "@/components/ColonyWorkersPanel";
import TaskListPanel, { TaskListPanelStacked } from "@/components/TaskListPanel";
import { sessionTaskListId, colonyTaskListId } from "@/api/tasks";
import { ColonyProvider, useColony } from "@/context/ColonyContext";
import { HeaderActionsProvider } from "@/context/HeaderActionsContext";
import { QueenProfileProvider } from "@/context/QueenProfileContext";
@@ -64,7 +66,44 @@ function LayoutShell({
}) {
const { sessionId, colonyName, dismissed, toggleColonyWorkers } =
useColonyWorkers();
const showWorkersPanel = Boolean(sessionId && !dismissed);
// Workers panel is colony-only — queen-DM may publish a sessionId for
// the tasks panel below, but we don't want the workers panel showing
// up there (no workers exist).
const showWorkersPanel = Boolean(sessionId && colonyName && !dismissed);
const location = useLocation();
const [taskPanelDismissed, setTaskPanelDismissed] = useState(false);
// Determine which task panel to show based on the current route.
// queen-DM (/queen/...) -> single TaskListPanel for queen session
// colony chat (/colony/{name}) -> stacked (template + queen session)
// anywhere else -> hidden
const isColony = location.pathname.startsWith("/colony/");
const isQueenDm = location.pathname.startsWith("/queen/");
const showTasksPanel = !taskPanelDismissed && Boolean(sessionId) && (isQueenDm || isColony);
let tasksPanel: ReactNode = null;
if (showTasksPanel && sessionId) {
if (isColony) {
const colonyId = colonyName ?? location.pathname.replace("/colony/", "");
tasksPanel = (
<TaskListPanelStacked
templateTaskListId={colonyTaskListId(colonyId)}
queenSessionTaskListId={sessionTaskListId("queen", sessionId)}
sessionId={sessionId}
onClose={() => setTaskPanelDismissed(true)}
/>
);
} else {
tasksPanel = (
<TaskListPanel
taskListId={sessionTaskListId("queen", sessionId)}
sessionId={sessionId}
variant="rail"
onClose={() => setTaskPanelDismissed(true)}
/>
);
}
}
return (
<div className="flex h-screen bg-background overflow-hidden">
@@ -89,6 +128,7 @@ function LayoutShell({
onClose={toggleColonyWorkers}
/>
)}
{tasksPanel}
</div>
</div>
</div>
+11
View File
@@ -18,6 +18,7 @@ import {
replayEventsToMessages,
} from "@/lib/chat-helpers";
import { useColony } from "@/context/ColonyContext";
import { useColonyWorkers } from "@/context/ColonyWorkersContext";
import { useHeaderActions } from "@/context/HeaderActionsContext";
import { getQueenForAgent, slugToColonyId } from "@/lib/colony-registry";
@@ -119,6 +120,16 @@ export default function QueenDM() {
"independent" | "incubating" | "working" | "reviewing"
>("independent");
// Publish the active session id into the shared workers/tasks context
// so AppLayout's right-rail TaskListPanel can attach to it. The colony
// workers panel itself stays hidden in queen-DM because we don't set
// colonyName (AppLayout requires both — see LayoutShell).
const { setSessionId: setCtxSessionId } = useColonyWorkers();
useEffect(() => {
setCtxSessionId(sessionId ?? null);
return () => setCtxSessionId(null);
}, [sessionId, setCtxSessionId]);
const resetViewState = useCallback(() => {
setSessionId(null);
setMessages([]);