feat): fix structural blockers preventing the queen from using task_* , also enhanced the hook

This commit is contained in:
Richard Tang
2026-04-26 19:15:23 -07:00
parent 0af597881f
commit 833aa4bc7a
3 changed files with 104 additions and 35 deletions
+28 -14
View File
@@ -239,6 +239,12 @@ for a single-paragraph summary.
_queen_tools_independent = """
# Tools (INDEPENDENT mode)
## Planning — use FIRST for multi-step work
- task_create / task_update / task_list / task_get When a request \
has 3+ atomic steps, your FIRST tool call is `task_create` (one task \
per step) BEFORE you touch any other tool. See "Independent execution" \
for the per-step flow and granularity rule.
## File I/O (coder-tools MCP)
- read_file, write_file, edit_file, hashline_edit, list_directory, \
search_files, run_command, undo_changes
@@ -406,26 +412,34 @@ asks for specifics. Do not invent a new pass unless the user asks for one.
_queen_behavior_independent = """
## Independent execution
You are the agent. Do one real inline instance before any scaling \
open the browser, call the real API, write to the real file. If the \
action is irreversible or touches shared systems, show and confirm \
before executing. Report concrete evidence (actual output, what \
worked / failed) after the run. Scale order once inline succeeds: \
repeat inline (10 items) `run_parallel_workers` (batch, results \
now) `create_colony` (recurring / background). Conceptual or \
strategic questions: answer directly, skip execution.
You are the agent. **For multi-step work (3+ atomic actions): your FIRST \
tool call is `task_create`** one task per atomic action, before you \
touch any other tool. Then work the list one task at a time:
## Tracking multi-step work with task_*
Break down and manage your work with `task_create`. **Mark each task \
`completed` as soon as you are done with it. Do not batch up multiple \
tasks before marking them completed** the user's right-rail panel \
treats `completed` transitions as your progress heartbeat.
1. `task_update` in_progress before you start the step.
2. Do one real inline instance open the browser, call the real API, \
write to the real file. If the action is irreversible or touches \
shared systems, show and confirm before executing. Report concrete \
evidence (actual output, what worked / failed) after the run.
3. `task_update` completed THE MOMENT it's done. **Never batch up \
multiple completions to flush at the end.** `completed` transitions \
are the user's progress heartbeat in the right-rail panel — without \
them, the panel shows a hung spinner no matter how much real work \
you got done.
**Granularity: one task per atomic action, not one umbrella per project.** \
Replying to 5 posts is 5 tasks, not 1. Crawling 3 sites is 3 tasks. \
An umbrella task that stays `in_progress` for the whole run looks \
identical to the user as "the queen is stuck".
Once one task succeeds inline, scale order for the rest of that task's \
work: repeat inline (10 items) `run_parallel_workers` (batch, \
results now) `create_colony` (recurring / background).
For conceptual or strategic questions, single-tool-call work, \
greetings, or chat: answer directly in prose. Skip `task_*`, skip the \
planning ceremony the bar is "real multi-step work the user benefits \
from seeing tracked", not "anything you reply to".
"""
_queen_behavior_always = """
+42
View File
@@ -273,6 +273,48 @@ async def test_hook_blocks_task_completed(
ToolRegistry.reset_execution_context(token)
@pytest.mark.asyncio
async def test_hook_blocks_task_completed_never_writes(
registry_with_session_tools: ToolRegistry,
store: TaskStore,
) -> None:
"""Veto-before-write: when the task_completed hook blocks, the COMPLETED
status must NEVER touch disk `updated_at` should equal the value from
the prior in_progress write, not be bumped by a transient COMPLETED
write + rollback."""
from framework.tasks.models import TaskStatus
reg = registry_with_session_tools
list_id = "session:agent_a:sess_1"
register_hook(HOOK_TASK_COMPLETED, lambda ctx: (_ for _ in ()).throw(BlockingHookError("nope")))
token = _set_ctx(agent_id="agent_a", task_list_id=list_id)
try:
await _invoke(reg, "task_create", subject="x")
await _invoke(reg, "task_update", id=1, status="in_progress")
# Snapshot updated_at after the in_progress write — this is the
# value that should persist if veto-before-write is honored.
before = await store.get_task(list_id, 1)
assert before is not None
ts_before = before.updated_at
# Vetoed completion attempt.
result = await _invoke(reg, "task_update", id=1, status="completed")
body = json.loads(result.content)
assert body["success"] is False
# On-disk record must be byte-identical to the pre-vet snapshot —
# no transient COMPLETED write, no rollback updated_at bump.
after = await store.get_task(list_id, 1)
assert after is not None
assert after.status == TaskStatus.IN_PROGRESS
assert after.updated_at == ts_before, (
"veto-before-write violated: updated_at changed, indicating a "
"transient write happened"
)
finally:
ToolRegistry.reset_execution_context(token)
# ---------------------------------------------------------------------------
# Colony template tools
# ---------------------------------------------------------------------------
+34 -21
View File
@@ -266,6 +266,40 @@ def _make_update_executor(store: TaskStore):
):
owner_in = agent_id
# task_completed hook — fires BEFORE the write (Claude Code's
# veto-before-write semantics). If the hook blocks, nothing
# touches disk and no SSE event fires. The hook receives a
# preview record with the intended new status so it can inspect
# what's about to land.
if status_enum == TaskStatus.COMPLETED:
current = await store.get_task(list_id, task_id)
if current is None:
return {
"success": False,
"task_list_id": list_id,
"task_id": task_id,
"message": f"Task #{task_id} not found.",
}
if current.status != TaskStatus.COMPLETED:
preview = current.model_copy(update={"status": TaskStatus.COMPLETED})
try:
await run_task_hooks(
HOOK_TASK_COMPLETED,
task_list_id=list_id,
task=preview,
agent_id=agent_id,
)
except BlockingHookError as exc:
logger.warning("task_completed hook blocked #%s: %s", task_id, exc)
return {
"success": False,
"task_list_id": list_id,
"task_id": task_id,
"message": f"Hook blocked completion of #{task_id}: {exc}",
"task": _serialize_task(current),
}
# Hook passed (or wasn't applicable) — proceed with the write.
new, fields = await store.update_task(
list_id,
task_id,
@@ -287,27 +321,6 @@ def _make_update_executor(store: TaskStore):
"message": f"Task #{task_id} not found.",
}
# task_completed hooks fire on transition to completed; can block.
if status_enum == TaskStatus.COMPLETED and "status" in fields:
try:
await run_task_hooks(
HOOK_TASK_COMPLETED,
task_list_id=list_id,
task=new,
agent_id=agent_id,
)
except BlockingHookError as exc:
# Roll back the status transition and surface the error.
logger.warning("task_completed hook blocked #%s: %s", new.id, exc)
rb_new, _ = await store.update_task(list_id, task_id, status=TaskStatus.IN_PROGRESS)
return {
"success": False,
"task_list_id": list_id,
"task_id": task_id,
"message": f"Hook blocked completion of #{task_id}: {exc}",
"task": _serialize_task(rb_new) if rb_new else None,
}
if fields:
await emit_task_updated(task_list_id=list_id, record=new, fields=fields)