diff --git a/core/framework/host/colony_runtime.py b/core/framework/host/colony_runtime.py index 1fb6296d..030c2ce4 100644 --- a/core/framework/host/colony_runtime.py +++ b/core/framework/host/colony_runtime.py @@ -964,9 +964,7 @@ class ColonyRuntime: _worker_list_id = _session_list_id(worker_id, worker_id) _picked_up = None - _template_id = ( - input_data.get("__template_task_id") if isinstance(input_data, dict) else None - ) + _template_id = input_data.get("__template_task_id") if isinstance(input_data, dict) else None if _template_id is not None: try: _picked_up = (_colony_list_id(self._colony_id), int(_template_id)) diff --git a/core/framework/host/worker.py b/core/framework/host/worker.py index 64e48ab6..dddc8eca 100644 --- a/core/framework/host/worker.py +++ b/core/framework/host/worker.py @@ -158,10 +158,7 @@ class Worker: ctx = self._context agent_id = getattr(ctx, "agent_id", None) or self.id - list_id = ( - getattr(ctx, "task_list_id", None) - or session_task_list_id(agent_id, self.id) - ) + list_id = getattr(ctx, "task_list_id", None) or session_task_list_id(agent_id, self.id) ToolRegistry.set_execution_context( profile=self.id, agent_id=agent_id, diff --git a/core/framework/server/queen_orchestrator.py b/core/framework/server/queen_orchestrator.py index 52e52602..feec93a8 100644 --- a/core/framework/server/queen_orchestrator.py +++ b/core/framework/server/queen_orchestrator.py @@ -938,9 +938,8 @@ async def create_queen( queen_agent_id = getattr(session, "agent_id", None) or "queen" queen_list_id = session_task_list_id(queen_agent_id, session.id) - colony_id = ( - getattr(session, "colony_id", None) - or getattr(getattr(session, "colony_runtime", None), "_colony_id", None) + colony_id = getattr(session, "colony_id", None) or getattr( + getattr(session, "colony_runtime", None), "_colony_id", None ) ToolRegistry.set_execution_context( profile=session.id, diff --git a/core/framework/server/routes_tasks.py b/core/framework/server/routes_tasks.py index 85d90b71..c9d1aed6 100644 --- a/core/framework/server/routes_tasks.py +++ b/core/framework/server/routes_tasks.py @@ -73,9 +73,7 @@ async def handle_get_colony_task_lists(request: web.Request) -> web.Response: # browser-facing colony session id. The frontend already knows that # value; we surface what we have on disk for completeness. queen_session_id = request.query.get("queen_session_id") - queen_list_id = ( - session_task_list_id("queen", queen_session_id) if queen_session_id else None - ) + queen_list_id = session_task_list_id("queen", queen_session_id) if queen_session_id else None return web.json_response( { "template_task_list_id": template_id, diff --git a/core/framework/tasks/reminders.py b/core/framework/tasks/reminders.py index 261ca1af..f497d9e6 100644 --- a/core/framework/tasks/reminders.py +++ b/core/framework/tasks/reminders.py @@ -86,22 +86,17 @@ def build_reminder(records: list[TaskRecord]) -> str: "before starting the next step. Don't batch completions.", " - If you've finished work that wasn't on the list, add a " "task_create + task_update completed pair so the panel reflects it.", - " - If you're umbrella-tracking ('reply to all posts' as one " - "task), break it into one task per atomic action.", + " - If you're umbrella-tracking ('reply to all posts' as one task), break it into one task per atomic action.", ] if in_progress: bullets.append( " - Currently in_progress (consider whether they're really " - "still active): " - + ", ".join(f'#{r.id} "{r.subject}"' for r in in_progress[:5]) + "still active): " + ", ".join(f'#{r.id} "{r.subject}"' for r in in_progress[:5]) ) listing = ["", "Open tasks:"] for r in open_[:10]: listing.append(f" #{r.id} [{r.status.value}] {r.subject}") if len(open_) > 10: listing.append(f" ... and {len(open_) - 10} more") - listing.append( - "\nOnly act on this if relevant to the current work. NEVER mention " - "this reminder to the user." - ) + listing.append("\nOnly act on this if relevant to the current work. NEVER mention this reminder to the user.") return "\n".join([head, *bullets, *listing]) diff --git a/core/framework/tasks/scoping.py b/core/framework/tasks/scoping.py index ebe7d60c..b4ce82b5 100644 --- a/core/framework/tasks/scoping.py +++ b/core/framework/tasks/scoping.py @@ -65,10 +65,7 @@ def resolve_task_list_id(ctx: Any) -> str: agent_id = getattr(ctx, "agent_id", None) or "" session_id = ( - getattr(ctx, "run_id", None) - or getattr(ctx, "execution_id", None) - or getattr(ctx, "stream_id", None) - or "" + getattr(ctx, "run_id", None) or getattr(ctx, "execution_id", None) or getattr(ctx, "stream_id", None) or "" ) if agent_id and session_id: return session_task_list_id(agent_id, session_id) diff --git a/core/framework/tasks/store.py b/core/framework/tasks/store.py index 6c571c75..46de2181 100644 --- a/core/framework/tasks/store.py +++ b/core/framework/tasks/store.py @@ -140,14 +140,13 @@ class TaskStore: first time the list is read. Until then, we still want to expose the list's tasks via REST. """ + def _check() -> bool: root = self._list_root(task_list_id) if (root / "meta.json").exists(): return True tasks_dir = root / "tasks" - if tasks_dir.exists() and any( - p.suffix == ".json" for p in tasks_dir.iterdir() - ): + if tasks_dir.exists() and any(p.suffix == ".json" for p in tasks_dir.iterdir()): return True return False @@ -405,11 +404,7 @@ class TaskStore: # ensure_task_list call); without this backfill the REST # endpoint can't discover them. Role is inferred from prefix. if not self._meta_path(task_list_id).exists(): - inferred_role = ( - TaskListRole.TEMPLATE - if task_list_id.startswith("colony:") - else TaskListRole.SESSION - ) + inferred_role = TaskListRole.TEMPLATE if task_list_id.startswith("colony:") else TaskListRole.SESSION self._write_meta_sync( task_list_id, TaskListMeta( diff --git a/core/framework/tasks/tests/test_e2e.py b/core/framework/tasks/tests/test_e2e.py index 3ce328df..c48d9316 100644 --- a/core/framework/tasks/tests/test_e2e.py +++ b/core/framework/tasks/tests/test_e2e.py @@ -1,9 +1,9 @@ """End-to-end tests: - - Session task tools fire EventBus events - - REST routes return correct snapshots - - run_parallel_workers-style flow stamps assigned_session - - Durability: store survives a process boundary (subprocess) +- Session task tools fire EventBus events +- REST routes return correct snapshots +- run_parallel_workers-style flow stamps assigned_session +- Durability: store survives a process boundary (subprocess) """ from __future__ import annotations @@ -72,9 +72,7 @@ async def test_task_created_emits_event(registry: ToolRegistry) -> None: bus.subscribe([EventType.TASK_CREATED], handler) - token = ToolRegistry.set_execution_context( - agent_id="alice", task_list_id="session:alice:s1" - ) + token = ToolRegistry.set_execution_context(agent_id="alice", task_list_id="session:alice:s1") try: await _invoke(registry, "task_create", subject="hello") finally: @@ -99,9 +97,7 @@ async def test_task_updated_emits_event(registry: ToolRegistry) -> None: bus.subscribe([EventType.TASK_UPDATED], handler) - token = ToolRegistry.set_execution_context( - agent_id="alice", task_list_id="session:alice:s1" - ) + token = ToolRegistry.set_execution_context(agent_id="alice", task_list_id="session:alice:s1") try: await _invoke(registry, "task_create", subject="x") await _invoke(registry, "task_update", id=1, status="in_progress") diff --git a/core/framework/tasks/tests/test_integration.py b/core/framework/tasks/tests/test_integration.py index 01833ebd..92f2322e 100644 --- a/core/framework/tasks/tests/test_integration.py +++ b/core/framework/tasks/tests/test_integration.py @@ -116,9 +116,7 @@ async def test_queen_plans_workers_pick_up(tmp_path: Path) -> None: for wid in worker_ids: worker_reg = ToolRegistry() register_task_tools(worker_reg, store=store) - wtoken = ToolRegistry.set_execution_context( - agent_id=wid, task_list_id=session_task_list_id(wid, wid) - ) + wtoken = ToolRegistry.set_execution_context(agent_id=wid, task_list_id=session_task_list_id(wid, wid)) try: await _invoke(worker_reg, "task_create", subject=f"setup for {wid}") await _invoke(worker_reg, "task_update", id=1, status="in_progress") @@ -128,10 +126,7 @@ async def test_queen_plans_workers_pick_up(tmp_path: Path) -> None: # 4. Verify the colony template entries are stamped + workers have # their own private lists. template_after = await store.list_tasks(template_list_id) - assert all( - t.metadata.get("assigned_worker_id") in {"w1", "w2", "w3"} - for t in template_after - ) + assert all(t.metadata.get("assigned_worker_id") in {"w1", "w2", "w3"} for t in template_after) for wid in worker_ids: worker_tasks = await store.list_tasks(session_task_list_id(wid, wid)) diff --git a/core/framework/tasks/tests/test_store.py b/core/framework/tasks/tests/test_store.py index d2043773..655d13f2 100644 --- a/core/framework/tasks/tests/test_store.py +++ b/core/framework/tasks/tests/test_store.py @@ -75,9 +75,7 @@ async def test_list_filters_internal(store: TaskStore, list_id: str) -> None: @pytest.mark.asyncio async def test_concurrent_create_distinct_ids(store: TaskStore, list_id: str) -> None: await store.ensure_task_list(list_id, role=TaskListRole.SESSION) - results = await asyncio.gather( - *(store.create_task(list_id, subject=f"t{i}") for i in range(20)) - ) + results = await asyncio.gather(*(store.create_task(list_id, subject=f"t{i}") for i in range(20))) ids = sorted(r.id for r in results) assert ids == list(range(1, 21)) @@ -91,9 +89,7 @@ async def test_concurrent_create_distinct_ids(store: TaskStore, list_id: str) -> async def test_update_returns_changed_fields(store: TaskStore, list_id: str) -> None: await store.ensure_task_list(list_id, role=TaskListRole.SESSION) rec = await store.create_task(list_id, subject="orig") - new, fields = await store.update_task( - list_id, rec.id, subject="orig", status=TaskStatus.IN_PROGRESS - ) + new, fields = await store.update_task(list_id, rec.id, subject="orig", status=TaskStatus.IN_PROGRESS) assert fields == ["status"] # subject unchanged shouldn't appear assert new.status == TaskStatus.IN_PROGRESS diff --git a/core/framework/tasks/tests/test_tools.py b/core/framework/tasks/tests/test_tools.py index fd4ff410..a8cce8bd 100644 --- a/core/framework/tasks/tests/test_tools.py +++ b/core/framework/tasks/tests/test_tools.py @@ -50,9 +50,7 @@ async def _invoke(registry: ToolRegistry, name: str, **inputs): def _set_ctx(*, agent_id: str, task_list_id: str, **extra): - return ToolRegistry.set_execution_context( - agent_id=agent_id, task_list_id=task_list_id, **extra - ) + return ToolRegistry.set_execution_context(agent_id=agent_id, task_list_id=task_list_id, **extra) # --------------------------------------------------------------------------- @@ -308,8 +306,7 @@ async def test_hook_blocks_task_completed_never_writes( assert after is not None assert after.status == TaskStatus.IN_PROGRESS assert after.updated_at == ts_before, ( - "veto-before-write violated: updated_at changed, indicating a " - "transient write happened" + "veto-before-write violated: updated_at changed, indicating a transient write happened" ) finally: ToolRegistry.reset_execution_context(token) @@ -349,9 +346,7 @@ async def test_colony_template_add_and_list(queen_registry: ToolRegistry) -> Non @pytest.mark.asyncio async def test_colony_template_remove(queen_registry: ToolRegistry) -> None: reg = queen_registry - token = _set_ctx( - agent_id="queen", task_list_id="session:queen:sess_1", colony_id="abc" - ) + token = _set_ctx(agent_id="queen", task_list_id="session:queen:sess_1", colony_id="abc") try: await _invoke(reg, "colony_template_add", subject="a") await _invoke(reg, "colony_template_add", subject="b") diff --git a/core/framework/tasks/tools/session_tools.py b/core/framework/tasks/tools/session_tools.py index fa92fbf7..58c879fa 100644 --- a/core/framework/tasks/tools/session_tools.py +++ b/core/framework/tasks/tools/session_tools.py @@ -259,11 +259,7 @@ def _make_update_executor(store: TaskStore): # Auto-owner on in_progress. owner_in = inputs.get("owner", _OwnerSentinel) status_enum = TaskStatus(status_in) if status_in else None - if ( - status_enum == TaskStatus.IN_PROGRESS - and owner_in is _OwnerSentinel - and agent_id - ): + if status_enum == TaskStatus.IN_PROGRESS and owner_in is _OwnerSentinel and agent_id: owner_in = agent_id # task_completed hook — fires BEFORE the write (Claude Code's @@ -336,8 +332,7 @@ def _make_update_executor(store: TaskStore): ( r for r in others - if r.status == TaskStatus.PENDING - and not [b for b in r.blocked_by if b not in completed_ids] + if r.status == TaskStatus.PENDING and not [b for b in r.blocked_by if b not in completed_ids] ), None, ) @@ -351,10 +346,7 @@ def _make_update_executor(store: TaskStore): f"Mark it in_progress before starting." ) else: - message += ( - " All tasks complete. Wrap up: report results to the user " - "and stop." - ) + message += " All tasks complete. Wrap up: report results to the user and stop." return { "success": True, @@ -384,9 +376,7 @@ def _make_list_executor(store: TaskStore): if r.owner: line_parts.append(f"({r.owner})") if unresolved_blockers: - line_parts.append( - f"[blocked by {', '.join(f'#{b}' for b in unresolved_blockers)}]" - ) + line_parts.append(f"[blocked by {', '.join(f'#{b}' for b in unresolved_blockers)}]") rendered.append(" ".join(line_parts)) return { "success": True,