feat: colony sqlite and skills

This commit is contained in:
Timothy
2026-04-15 15:28:37 -07:00
parent 252710fb41
commit 79c5d43006
12 changed files with 1478 additions and 16 deletions
+39 -2
View File
@@ -641,6 +641,43 @@ class ColonyRuntime:
spawn_tools = tools if tools is not None else self._tools
spawn_executor = tool_executor or self._tool_executor
# Colony progress tracker: when the caller supplied a db_path
# in input_data, this worker is part of a SQLite task queue
# and must see the hive.colony-progress-tracker skill body in
# its system prompt from turn 0. Rebuild the catalog with the
# skill pre-activated; falls back to the colony default when
# no db_path is present.
_spawn_catalog = self.skills_catalog_prompt
_spawn_skill_dirs = self.skill_dirs
if isinstance(input_data, dict) and input_data.get("db_path"):
try:
from framework.skills.config import SkillsConfig
from framework.skills.manager import SkillsManager, SkillsManagerConfig
_pre = SkillsManager(
SkillsManagerConfig(
skills_config=SkillsConfig.from_agent_vars(
skills=["hive.colony-progress-tracker"],
),
)
)
_pre.load()
_spawn_catalog = _pre.skills_catalog_prompt
_spawn_skill_dirs = list(_pre.allowlisted_dirs) if hasattr(_pre, "allowlisted_dirs") else self.skill_dirs
logger.info(
"spawn: pre-activated hive.colony-progress-tracker "
"(catalog %d%d chars) for worker with db_path=%s",
len(self.skills_catalog_prompt),
len(_spawn_catalog),
input_data.get("db_path"),
)
except Exception as exc:
logger.warning(
"spawn: failed to pre-activate colony-progress-tracker "
"skill, falling back to base catalog: %s",
exc,
)
# Resolve the SSE stream_id once. When the caller didn't supply
# one we use the per-worker fan-out tag (filtered out by the
# SSE handler). When the caller passed an explicit value we
@@ -697,9 +734,9 @@ class ColonyRuntime:
llm=self._llm,
available_tools=list(spawn_tools),
accounts_prompt=self._accounts_prompt,
skills_catalog_prompt=self.skills_catalog_prompt,
skills_catalog_prompt=_spawn_catalog,
protocols_prompt=self.protocols_prompt,
skill_dirs=self.skill_dirs,
skill_dirs=_spawn_skill_dirs,
execution_id=worker_id,
stream_id=explicit_stream_id or f"worker:{worker_id}",
)
+491
View File
@@ -0,0 +1,491 @@
"""Per-colony SQLite task queue + progress ledger.
Every colony gets its own ``progress.db`` under ``~/.hive/colonies/{name}/data/``.
The DB holds the colony's task queue plus per-task step and SOP checklist
rows. Workers claim tasks atomically, write progress as they execute, and
verify SOP gates before marking a task done. This gives cross-run memory
that the existing per-iteration stall detectors don't have.
The DB is driven by agents via the ``sqlite3`` CLI through
``execute_command_tool``. This module handles framework-side lifecycle:
creation, migration, queen-side bulk seeding, stale-claim reclamation.
Concurrency model:
- WAL mode on from day one so 100 concurrent workers don't serialize.
- Workers hold NO long-running connection — they ``sqlite3`` per call,
which naturally releases locks between LLM turns.
- Atomic claim via ``BEGIN IMMEDIATE; UPDATE tasks SET status='claimed'
WHERE id=(SELECT ... LIMIT 1)``. The subquery-form UPDATE runs inside
the immediate transaction so racers either win the row or find zero
affected rows.
- Stale-claim reclaimer runs on host startup: claims older than
``stale_after_minutes`` get returned to ``pending`` and the row's
``retry_count`` increments. When ``retry_count >= max_retries`` the
row is moved to ``failed`` instead.
All writes go through ``BEGIN IMMEDIATE`` so racing readers see
consistent snapshots.
"""
from __future__ import annotations
import json
import logging
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Current schema generation. ``ensure_progress_db`` compares this against
# "PRAGMA user_version" and applies ``_SCHEMA_V1`` when the file is older.
SCHEMA_VERSION = 1

# Schema v1, executed via ``executescript``. Every statement uses
# IF NOT EXISTS so re-running the script on an initialized DB is a no-op.
_SCHEMA_V1 = """
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
seq INTEGER,
priority INTEGER NOT NULL DEFAULT 0,
goal TEXT NOT NULL,
payload TEXT,
status TEXT NOT NULL DEFAULT 'pending',
worker_id TEXT,
claim_token TEXT,
claimed_at TEXT,
started_at TEXT,
completed_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
last_error TEXT,
parent_task_id TEXT REFERENCES tasks(id) ON DELETE SET NULL,
source TEXT
);
CREATE TABLE IF NOT EXISTS steps (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
seq INTEGER NOT NULL,
title TEXT NOT NULL,
detail TEXT,
status TEXT NOT NULL DEFAULT 'pending',
evidence TEXT,
worker_id TEXT,
started_at TEXT,
completed_at TEXT,
UNIQUE (task_id, seq)
);
CREATE TABLE IF NOT EXISTS sop_checklist (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
key TEXT NOT NULL,
description TEXT NOT NULL,
required INTEGER NOT NULL DEFAULT 1,
done_at TEXT,
done_by TEXT,
note TEXT,
UNIQUE (task_id, key)
);
CREATE TABLE IF NOT EXISTS colony_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_claimable
ON tasks(status, priority DESC, seq, created_at)
WHERE status = 'pending';
CREATE INDEX IF NOT EXISTS idx_steps_task_seq
ON steps(task_id, seq);
CREATE INDEX IF NOT EXISTS idx_sop_required_open
ON sop_checklist(task_id, required, done_at);
CREATE INDEX IF NOT EXISTS idx_tasks_status
ON tasks(status, updated_at);
"""

# Per-connection pragmas applied by ``_connect``. journal_mode=WAL is
# persistent in the DB file once set (see _connect's docstring); the
# remaining pragmas are connection-scoped and must be reissued each open.
_PRAGMAS: tuple[str, ...] = (
    "PRAGMA journal_mode = WAL;",
    "PRAGMA synchronous = NORMAL;",
    "PRAGMA foreign_keys = ON;",
    "PRAGMA busy_timeout = 5000;",
)
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def _new_id() -> str:
return str(uuid.uuid4())
def _connect(db_path: Path) -> sqlite3.Connection:
    """Open *db_path* with the standard per-connection pragmas applied.

    WAL mode persists in the file once enabled, so re-running that
    pragma on every open is cheap. The other pragmas are scoped to the
    connection and have to be reissued each time.
    """
    connection = sqlite3.connect(str(db_path), isolation_level=None, timeout=5.0)
    for statement in _PRAGMAS:
        connection.execute(statement)
    return connection
def ensure_progress_db(colony_dir: Path) -> Path:
    """Create or migrate ``{colony_dir}/data/progress.db``.

    Idempotent: safe to call on an already-initialized DB. Returns the
    absolute path to the DB file.

    Steps:
      1. Ensure ``data/`` subdir exists.
      2. Open the DB (creates the file if missing).
      3. Apply WAL + pragmas.
      4. Read ``PRAGMA user_version``; if < SCHEMA_VERSION, run the
         schema block and bump user_version.
      5. Reclaim any stale claims left from previous runs.
      6. Patch every ``*.json`` worker config in the colony dir to
         inject ``input_data.db_path`` and ``input_data.colony_id`` so
         pre-existing colonies (forked before this feature landed) get
         the tracker wiring on their next spawn.
    """
    storage_dir = Path(colony_dir) / "data"
    storage_dir.mkdir(parents=True, exist_ok=True)
    db_file = storage_dir / "progress.db"

    connection = _connect(db_file)
    try:
        (found_version,) = connection.execute("PRAGMA user_version").fetchone()
        if found_version < SCHEMA_VERSION:
            # First open (or pre-v1 file): lay down the full schema and
            # record the version both in user_version and colony_meta.
            connection.executescript(_SCHEMA_V1)
            connection.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")
            connection.execute(
                "INSERT OR REPLACE INTO colony_meta(key, value, updated_at) "
                "VALUES (?, ?, ?)",
                ("schema_version", str(SCHEMA_VERSION), _now_iso()),
            )
            logger.info(
                "progress_db: initialized schema v%d at %s", SCHEMA_VERSION, db_file
            )
        # Hand back any claims orphaned by workers that died mid-task.
        stale_count = _reclaim_stale_inner(connection, stale_after_minutes=15)
        if stale_count:
            logger.info(
                "progress_db: reclaimed %d stale claims at startup (%s)",
                stale_count,
                db_file,
            )
    finally:
        connection.close()

    absolute_path = db_file.resolve()
    _patch_worker_configs(Path(colony_dir), absolute_path)
    return absolute_path
def _patch_worker_configs(colony_dir: Path, db_path: Path) -> int:
"""Inject ``input_data.db_path`` + ``input_data.colony_id`` into
existing ``worker.json`` files in a colony directory.
Runs on every ``ensure_progress_db`` call so colonies that were
forked before this feature landed get their worker spawn messages
patched in place. Idempotent: if ``input_data`` already contains
the correct ``db_path``, the file is not rewritten.
Returns the number of files that were actually modified (0 on
the common case of already-patched colonies).
"""
colony_id = colony_dir.name
abs_db = str(db_path)
patched = 0
for worker_cfg in colony_dir.glob("*.json"):
# Only patch files that look like worker configs (have the
# worker_meta shape). ``metadata.json`` and ``triggers.json``
# are colony-level and must not be touched.
if worker_cfg.name in ("metadata.json", "triggers.json"):
continue
try:
data = json.loads(worker_cfg.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
continue
if not isinstance(data, dict) or "system_prompt" not in data:
# Not a worker config (lacks the worker_meta schema).
continue
input_data = data.get("input_data")
if not isinstance(input_data, dict):
input_data = {}
if (
input_data.get("db_path") == abs_db
and input_data.get("colony_id") == colony_id
):
continue # already patched
input_data["db_path"] = abs_db
input_data["colony_id"] = colony_id
data["input_data"] = input_data
try:
worker_cfg.write_text(
json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
)
patched += 1
except OSError as e:
logger.warning(
"progress_db: failed to patch worker config %s: %s", worker_cfg, e
)
if patched:
logger.info(
"progress_db: patched %d worker config(s) in colony '%s' with db_path",
patched,
colony_id,
)
return patched
def ensure_all_colony_dbs(colonies_root: Path | None = None) -> list[Path]:
"""Idempotently ensure every existing colony has a progress.db.
Called on framework host startup to backfill older colonies and
run the stale-claim reclaimer on all of them in one pass.
"""
if colonies_root is None:
colonies_root = Path.home() / ".hive" / "colonies"
if not colonies_root.is_dir():
return []
initialized: list[Path] = []
for entry in sorted(colonies_root.iterdir()):
if not entry.is_dir():
continue
try:
initialized.append(ensure_progress_db(entry))
except Exception as e:
logger.warning(
"progress_db: failed to ensure DB for colony '%s': %s", entry.name, e
)
return initialized
def seed_tasks(
    db_path: Path,
    tasks: list[dict[str, Any]],
    *,
    source: str = "queen_create",
) -> list[str]:
    """Bulk-insert tasks (with optional nested steps + sop_items).

    Each task dict accepts:
      - goal: str (required)
      - seq: int (optional ordering hint)
      - priority: int (default 0)
      - payload: dict | str | None (stored as JSON text)
      - max_retries: int (default 3)
      - parent_task_id: str | None
      - steps: list[{"title": str, "detail"?: str}] (optional)
      - sop_items: list[{"key": str, "description": str, "required"?: bool, "note"?: str}] (optional)

    All rows are inserted in a single BEGIN IMMEDIATE transaction so
    10k-row seeds finish in one disk flush. Returns the created task ids
    in the same order as input.

    Raises:
        ValueError: when any task/step/sop item is missing a required
            field; the transaction is rolled back so nothing is written.
    """
    if not tasks:
        return []
    created_ids: list[str] = []
    now = _now_iso()
    con = _connect(Path(db_path))
    try:
        con.execute("BEGIN IMMEDIATE")
        try:
            for idx, task in enumerate(tasks):
                created_ids.append(_insert_task_tree(con, idx, task, now, source))
            con.execute("COMMIT")
        except Exception:
            # ROLLBACK itself can raise (e.g. the error already aborted
            # the transaction); never let that mask the original failure.
            try:
                con.execute("ROLLBACK")
            except sqlite3.Error:
                pass
            raise
    finally:
        con.close()
    return created_ids


def _insert_task_tree(
    con: sqlite3.Connection,
    idx: int,
    task: dict[str, Any],
    now: str,
    source: str,
) -> str:
    """Insert one task row plus its nested steps/sop_items; return the task id.

    Runs inside the caller's open transaction; validation errors raise
    ValueError and leave the rollback to the caller.
    """
    goal = task.get("goal")
    if not goal:
        raise ValueError(f"task[{idx}] missing required 'goal' field")
    task_id = task.get("id") or _new_id()
    payload = task.get("payload")
    if payload is not None and not isinstance(payload, str):
        # Non-string payloads are serialized to JSON text for the TEXT column.
        payload = json.dumps(payload, ensure_ascii=False)
    con.execute(
        """
        INSERT INTO tasks (
            id, seq, priority, goal, payload, status,
            created_at, updated_at, max_retries, parent_task_id, source
        ) VALUES (?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?)
        """,
        (
            task_id,
            task.get("seq"),
            int(task.get("priority", 0)),
            goal,
            payload,
            now,
            now,
            int(task.get("max_retries", 3)),
            task.get("parent_task_id"),
            source,
        ),
    )
    for step_seq, step in enumerate(task.get("steps") or [], start=1):
        if not step.get("title"):
            raise ValueError(
                f"task[{idx}].steps[{step_seq - 1}] missing required 'title'"
            )
        con.execute(
            """
            INSERT INTO steps (id, task_id, seq, title, detail, status)
            VALUES (?, ?, ?, ?, ?, 'pending')
            """,
            (
                _new_id(),
                task_id,
                # An explicit per-step 'seq' wins over the positional index.
                step.get("seq", step_seq),
                step["title"],
                step.get("detail"),
            ),
        )
    for sop in task.get("sop_items") or []:
        key = sop.get("key")
        description = sop.get("description")
        if not key or not description:
            raise ValueError(
                f"task[{idx}].sop_items missing 'key' or 'description'"
            )
        con.execute(
            """
            INSERT INTO sop_checklist
                (id, task_id, key, description, required, note)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                _new_id(),
                task_id,
                key,
                description,
                1 if sop.get("required", True) else 0,
                sop.get("note"),
            ),
        )
    return task_id
def enqueue_task(
    db_path: Path,
    goal: str,
    *,
    steps: list[dict[str, Any]] | None = None,
    sop_items: list[dict[str, Any]] | None = None,
    payload: Any = None,
    priority: int = 0,
    parent_task_id: str | None = None,
    source: str = "enqueue_tool",
) -> str:
    """Append a single task to an existing queue. Thin wrapper over seed_tasks."""
    single = {
        "goal": goal,
        "steps": steps,
        "sop_items": sop_items,
        "payload": payload,
        "priority": priority,
        "parent_task_id": parent_task_id,
    }
    (task_id,) = seed_tasks(db_path, [single], source=source)
    return task_id
def _reclaim_stale_inner(
con: sqlite3.Connection, *, stale_after_minutes: int
) -> int:
"""Reclaim stale claims. Runs inside an existing open connection.
Two-step:
1. Tasks past max_retries go to 'failed' with last_error populated.
2. Remaining stale claims return to 'pending', retry_count++.
"""
cutoff_expr = f"datetime('now', '-{int(stale_after_minutes)} minutes')"
con.execute("BEGIN IMMEDIATE")
try:
con.execute(
f"""
UPDATE tasks
SET status = 'failed',
last_error = COALESCE(last_error, 'exceeded max_retries after stale claim'),
completed_at = datetime('now'),
updated_at = datetime('now')
WHERE status IN ('claimed', 'in_progress')
AND claimed_at IS NOT NULL
AND claimed_at < {cutoff_expr}
AND retry_count >= max_retries
"""
)
cur = con.execute(
f"""
UPDATE tasks
SET status = 'pending',
worker_id = NULL,
claim_token = NULL,
claimed_at = NULL,
started_at = NULL,
retry_count = retry_count + 1,
updated_at = datetime('now')
WHERE status IN ('claimed', 'in_progress')
AND claimed_at IS NOT NULL
AND claimed_at < {cutoff_expr}
AND retry_count < max_retries
"""
)
reclaimed = cur.rowcount or 0
con.execute("COMMIT")
return reclaimed
except Exception:
con.execute("ROLLBACK")
raise
def reclaim_stale(db_path: Path, stale_after_minutes: int = 15) -> int:
    """Public wrapper: open a fresh connection, reclaim, always close."""
    connection = _connect(Path(db_path))
    try:
        return _reclaim_stale_inner(
            connection, stale_after_minutes=stale_after_minutes
        )
    finally:
        connection.close()
# Public surface of this module; connection/patching/reclaim helpers
# prefixed with "_" stay internal.
__all__ = [
    "SCHEMA_VERSION",
    "ensure_progress_db",
    "ensure_all_colony_dbs",
    "seed_tasks",
    "enqueue_task",
    "reclaim_stale",
]
+12 -1
View File
@@ -1421,7 +1421,18 @@ class AgentLoader:
credential_store=credential_store,
)
runner._agent_default_skills = None
runner._agent_skills = None
# Colony workers attached to a SQLite task queue get the
# colony-progress-tracker skill pre-activated so its full
# claim / step / SOP-gate protocol lands in the system prompt
# on turn 0, bypassing the progressive-disclosure catalog
# lookup. Triggered by the presence of ``input_data.db_path``
# in worker.json (written by fork_session_into_colony and
# backfilled by ensure_progress_db for pre-existing colonies).
_preactivate: list[str] = []
_input_data = first_worker.get("input_data") or {}
if isinstance(_input_data, dict) and _input_data.get("db_path"):
_preactivate.append("hive.colony-progress-tracker")
runner._agent_skills = _preactivate or None
return runner
def register_tool(
+35 -1
View File
@@ -664,6 +664,7 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
body = await request.json()
colony_name = body.get("colony_name", "").strip()
task = body.get("task", "").strip()
tasks = body.get("tasks")
if not colony_name:
return web.json_response({"error": "colony_name is required"}, status=400)
@@ -681,6 +682,7 @@ async def handle_colony_spawn(request: web.Request) -> web.Response:
session=session,
colony_name=colony_name,
task=task,
tasks=tasks if isinstance(tasks, list) else None,
)
except Exception as e:
logger.exception("colony_spawn fork failed")
@@ -694,6 +696,7 @@ async def fork_session_into_colony(
session: Any,
colony_name: str,
task: str,
tasks: list[dict] | None = None,
) -> dict:
"""Fork a queen session into a colony directory.
@@ -710,8 +713,14 @@ async def fork_session_into_colony(
the colony resumes with the queen's entire conversation history.
3. Multiple independent sessions can be created against the same colony,
giving parallel execution capacity without separate worker configs.
4. Initializes (or ensures) ``data/progress.db`` the colony's SQLite
task queue + progress ledger. When *tasks* is provided, the queen-
authored task batch is seeded into the queue in one transaction.
The absolute DB path is threaded into the worker's ``input_data``
so spawned workers see it in their first user message.
Returns ``{"colony_path", "colony_name", "queen_session_id", "is_new"}``.
Returns ``{"colony_path", "colony_name", "queen_session_id", "is_new",
"db_path", "task_ids"}``.
"""
import asyncio
import json
@@ -721,6 +730,7 @@ async def fork_session_into_colony(
from framework.agent_loop.agent_loop import AgentLoop, LoopConfig
from framework.agent_loop.types import AgentContext, AgentSpec
from framework.host.progress_db import ensure_progress_db, seed_tasks
from framework.server.session_manager import _queen_session_dir
from framework.storage.conversation_store import FileConversationStore
@@ -732,6 +742,21 @@ async def fork_session_into_colony(
colony_dir.mkdir(parents=True, exist_ok=True)
(colony_dir / "data").mkdir(exist_ok=True)
# ── 0. Ensure the colony's progress DB exists and seed tasks ──
# Runs before worker.json is written so the DB path can be threaded
# into input_data. Idempotent on reruns of the same colony name.
db_path = await asyncio.to_thread(ensure_progress_db, colony_dir)
seeded_task_ids: list[str] = []
if tasks:
seeded_task_ids = await asyncio.to_thread(
seed_tasks, db_path, tasks, source="queen_create"
)
logger.info(
"progress_db: seeded %d task(s) into colony '%s'",
len(seeded_task_ids),
colony_name,
)
# Fixed worker name -- sessions are the unit of parallelism, not workers
worker_name = "worker"
@@ -797,6 +822,13 @@ async def fork_session_into_colony(
"name": worker_name,
"version": "1.0.0",
"description": f"Worker clone from queen session {session.id}",
# Colony progress tracker: worker sees these in its first user
# message via _format_spawn_task_message. The colony-progress-
# tracker default skill teaches the worker how to use them.
"input_data": {
"db_path": str(db_path),
"colony_id": colony_name,
},
"goal": {
"description": worker_task,
"success_criteria": [],
@@ -938,6 +970,8 @@ async def fork_session_into_colony(
"colony_name": colony_name,
"queen_session_id": colony_session_id,
"is_new": is_new,
"db_path": str(db_path),
"task_ids": seeded_task_ids,
}
+18
View File
@@ -141,6 +141,24 @@ class SessionManager:
except Exception:
logger.warning("v2 migration failed (non-fatal)", exc_info=True)
# Ensure every existing colony has an up-to-date progress.db
# (schema v1, WAL mode) and reclaim any stale claims left behind
# by crashed workers from the previous run. Idempotent and
# fast; runs synchronously because the event loop hasn't
# started yet at __init__ time.
from framework.host.progress_db import ensure_all_colony_dbs
try:
ensured = ensure_all_colony_dbs()
if ensured:
logger.info(
"progress_db: ensured %d colony DB(s) at startup", len(ensured)
)
except Exception:
logger.warning(
"progress_db: backfill at startup failed (non-fatal)", exc_info=True
)
def build_llm(self, model: str | None = None):
"""Construct an LLM provider using the server's configured defaults."""
from framework.config import RuntimeConfig, get_hive_config
@@ -0,0 +1,93 @@
---
name: hive.colony-progress-tracker
description: Claim tasks, record step progress, and verify SOP gates in the colony SQLite queue. Applies when your spawn message includes a db_path field.
metadata:
author: hive
type: default-skill
---
## Operational Protocol: Colony Progress Tracker
**Applies when** your spawn message has `db_path:` and `colony_id:` fields. The DB is your durable working memory — tells you what's done, what to skip, which SOP gates you owe.
Access via `execute_command_tool` running `sqlite3 "<db_path>" "..."`. Tables: `tasks` (queue), `steps` (per-task decomposition), `sop_checklist` (hard gates).
### Claim next task (ONLY correct pattern)
```bash
sqlite3 "<db_path>" <<'SQL'
UPDATE tasks SET status='claimed', worker_id='<worker-id>',
claim_token=lower(hex(randomblob(8))),
claimed_at=datetime('now'), updated_at=datetime('now')
WHERE id=(SELECT id FROM tasks WHERE status='pending'
ORDER BY priority DESC, seq, created_at LIMIT 1)
RETURNING id, goal, payload;
SQL
```
Empty output → queue drained, exit. Otherwise the returned `id` is yours. **Never SELECT-then-UPDATE** — races.
### Load the plan
```bash
sqlite3 "<db_path>" "SELECT seq, id, title, status FROM steps WHERE task_id='<task-id>' ORDER BY seq;"
sqlite3 "<db_path>" "SELECT key, description, required, done_at FROM sop_checklist WHERE task_id='<task-id>';"
```
**Skip any step where status='done'.** That's the point — don't redo completed work.
### Execute a step
Before tool calls:
```bash
sqlite3 "<db_path>" "UPDATE steps SET status='in_progress', worker_id='<worker-id>', started_at=datetime('now') WHERE id='<step-id>';"
```
After success (one-line evidence: path, URL, key result):
```bash
sqlite3 "<db_path>" "UPDATE steps SET status='done', evidence='<what you did>', completed_at=datetime('now') WHERE id='<step-id>';"
```
### MANDATORY: SOP gate check before marking task done
```bash
sqlite3 "<db_path>" "SELECT key, description FROM sop_checklist WHERE task_id='<task-id>' AND required=1 AND done_at IS NULL;"
```
- Empty → proceed to "Mark task done".
- Non-empty → each row is work you still owe. Do it, then check it off:
```bash
sqlite3 "<db_path>" "UPDATE sop_checklist SET done_at=datetime('now'), done_by='<worker-id>', note='<why>' WHERE task_id='<task-id>' AND key='<key>';"
```
**Never mark a task done while this SELECT returns rows.** This gate exists specifically to stop you from declaring success while skipping required steps.
### Mark task done / failed
```bash
# Success:
sqlite3 "<db_path>" "UPDATE tasks SET status='done', completed_at=datetime('now'), updated_at=datetime('now') WHERE id='<task-id>' AND worker_id='<worker-id>';"
# Unrecoverable failure:
sqlite3 "<db_path>" "UPDATE tasks SET status='failed', last_error='<one sentence>', completed_at=datetime('now'), updated_at=datetime('now') WHERE id='<task-id>' AND worker_id='<worker-id>';"
```
The `AND worker_id=?` guard means a reclaimed row won't accept your write — treat zero rows affected as "your claim was revoked, stop."
### Loop
After done/failed → claim the next task. Exit only when claim returns empty.
### Errors + debug
- **"database is locked"**: retry with 100ms → 1s backoff, max 5 attempts. `busy_timeout=5000` handles most contention silently.
- **Queue health**: `SELECT status, count(*) FROM tasks GROUP BY status;`
- **Your in-flight work**: `SELECT id, goal, status FROM tasks WHERE worker_id='<worker-id>';`
### Anti-patterns (will break the queue)
- Don't DDL (CREATE/ALTER/DROP).
- Don't DELETE — failed tasks stay as `failed` for audit.
- Don't skip the mandatory SOP gate check before marking done.
- Don't hold a task >15min without updates — the stale-claim reclaimer revokes your claim.
- Don't invent task IDs. Workers update existing rows; only the queen enqueues new ones.
+1
View File
@@ -77,6 +77,7 @@ SKILL_REGISTRY: dict[str, str] = {
"hive.quality-monitor": "quality-monitor",
"hive.error-recovery": "error-recovery",
"hive.task-decomposition": "task-decomposition",
"hive.colony-progress-tracker": "colony-progress-tracker",
"hive.writing-hive-skills": "writing-hive-skills",
}
+238 -2
View File
@@ -55,7 +55,6 @@ from framework.tools.flowchart_utils import (
)
if TYPE_CHECKING:
from framework.loader.tool_registry import ToolRegistry
from framework.host.colony_runtime import ColonyRuntime
from framework.host.event_bus import EventBus
from framework.loader.tool_registry import ToolRegistry
@@ -1430,6 +1429,7 @@ def register_queen_lifecycle_tools(
colony_name: str,
task: str,
skill_path: str,
tasks: list[dict] | None = None,
) -> str:
"""Create a colony after installing a pre-authored skill folder.
@@ -1439,6 +1439,13 @@ def register_queen_lifecycle_tools(
they're ready to start the worker — at that point the worker
reads the task from ``worker.json`` and the skill from
``~/.hive/skills/`` and starts informed.
When *tasks* is provided, each entry is seeded into the
colony's ``progress.db`` task queue in a single transaction.
Workers then claim rows from the queue using the
``hive.colony-progress-tracker`` default skill. Each task dict
accepts: ``goal`` (required), optional ``steps``,
``sop_items``, ``priority``, ``payload``, ``parent_task_id``.
"""
if session is None:
return json.dumps({"error": "No session bound to this tool registry."})
@@ -1498,6 +1505,7 @@ def register_queen_lifecycle_tools(
session=session,
colony_name=cn,
task=(task or "").strip(),
tasks=tasks if isinstance(tasks, list) else None,
)
except Exception as e:
logger.exception("create_colony: fork failed after installing skill")
@@ -1550,6 +1558,8 @@ def register_queen_lifecycle_tools(
"is_new": fork_result.get("is_new", True),
"skill_installed": str(installed_skill),
"skill_name": installed_skill.name if installed_skill else None,
"db_path": fork_result.get("db_path"),
"tasks_seeded": len(fork_result.get("task_ids") or []),
}
)
@@ -1648,6 +1658,57 @@ def register_queen_lifecycle_tools(
"protocol'."
),
},
"tasks": {
"type": "array",
"description": (
"Optional pre-seeded task queue for the colony. "
"When the colony is a fan-out of many similar "
"units of work (e.g. 'process record #1234', "
"'scrape profile X'), pass them here as an "
"array and workers will claim rows atomically "
"from the SQLite queue using the "
"hive.colony-progress-tracker skill. Each task "
"needs a 'goal' string; optionally include "
"'steps' (ordered subtasks), 'sop_items' "
"(required checklist gates), 'priority' "
"(higher runs first), and 'payload' "
"(task-specific parameters). Can be hundreds "
"or thousands of entries — the bulk insert "
"runs in a single transaction."
),
"items": {
"type": "object",
"properties": {
"goal": {"type": "string"},
"priority": {"type": "integer"},
"payload": {},
"steps": {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"detail": {"type": "string"},
},
"required": ["title"],
},
},
"sop_items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"key": {"type": "string"},
"description": {"type": "string"},
"required": {"type": "boolean"},
},
"required": ["key", "description"],
},
},
},
"required": ["goal"],
},
},
},
"required": ["colony_name", "task", "skill_path"],
},
@@ -1659,6 +1720,158 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- enqueue_task ------------------------------------------------------------
async def enqueue_task_tool(
*,
colony_name: str,
goal: str,
steps: list[dict] | None = None,
sop_items: list[dict] | None = None,
payload: Any = None,
priority: int = 0,
parent_task_id: str | None = None,
) -> str:
"""Append a single task to an existing colony's progress.db queue.
Use this when the colony is already created and more work
needs to be fanned out (webhook-driven, follow-up requests,
worker-generated subtasks). The colony's workers pick it up
on their next claim cycle.
"""
cn = (colony_name or "").strip()
if not _COLONY_NAME_RE.match(cn):
return json.dumps(
{"error": "colony_name must be lowercase alphanumeric with underscores"}
)
from pathlib import Path as _Path
from framework.host.progress_db import (
enqueue_task as _enqueue_task,
ensure_progress_db as _ensure_db,
)
colony_dir = _Path.home() / ".hive" / "colonies" / cn
if not colony_dir.is_dir():
return json.dumps({"error": f"colony '{cn}' not found"})
try:
db_path = await asyncio.to_thread(_ensure_db, colony_dir)
task_id = await asyncio.to_thread(
_enqueue_task,
db_path,
goal,
steps=steps,
sop_items=sop_items,
payload=payload,
priority=priority,
parent_task_id=parent_task_id,
)
except Exception as e:
logger.exception("enqueue_task: failed to insert row")
return json.dumps({"error": f"enqueue_task failed: {e}"})
return json.dumps(
{
"status": "enqueued",
"colony_name": cn,
"task_id": task_id,
"db_path": str(db_path),
}
)
_enqueue_task_tool = Tool(
name="enqueue_task",
description=(
"Append a single task to an existing colony's progress.db "
"queue. Use this after create_colony when more work needs "
"to be fanned out — e.g. a webhook fired, the user asked "
"for a follow-up run, or a worker spawned a subtask. The "
"colony's workers pick it up on their next claim cycle "
"(atomic UPDATE … WHERE status='pending'). For bulk "
"authoring at colony creation time, pass the 'tasks' "
"array to create_colony instead."
),
parameters={
"type": "object",
"properties": {
"colony_name": {
"type": "string",
"description": "Target colony name (lowercase + underscores).",
},
"goal": {
"type": "string",
"description": (
"Human-readable task description. Self-contained — "
"the worker has no context beyond this string plus "
"any steps/sop_items/payload you attach."
),
},
"steps": {
"type": "array",
"description": (
"Optional ordered subtasks the worker should "
"check off as it executes. Each step needs a "
"'title'; optional 'detail' for longer "
"instructions."
),
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"detail": {"type": "string"},
},
"required": ["title"],
},
},
"sop_items": {
"type": "array",
"description": (
"Optional hard-gate checklist items the worker "
"MUST address before marking the task done. "
"Each item needs a 'key' (slug) and "
"'description'; 'required' defaults to true."
),
"items": {
"type": "object",
"properties": {
"key": {"type": "string"},
"description": {"type": "string"},
"required": {"type": "boolean"},
},
"required": ["key", "description"],
},
},
"payload": {
"description": (
"Optional task-specific parameters. Stored as "
"JSON in the 'payload' column."
),
},
"priority": {
"type": "integer",
"description": "Higher values run first. Default 0.",
},
"parent_task_id": {
"type": "string",
"description": (
"Optional reference to an existing task this "
"one was spawned from (audit only; no blocking "
"dependency resolver today)."
),
},
},
"required": ["colony_name", "goal"],
},
)
registry.register(
"enqueue_task",
_enqueue_task_tool,
lambda inputs: enqueue_task_tool(**inputs),
)
tools_registered += 1
# --- switch_to_reviewing ----------------------------------------------------
async def switch_to_reviewing_tool() -> str:
@@ -4034,10 +4247,33 @@ def register_queen_lifecycle_tools(
dropped_count,
)
# Colony progress tracker wiring: if the loaded worker
# lives under ~/.hive/colonies/{name}/ and has a
# progress.db, inject db_path + colony_id into input_data
# so the spawned worker sees them in its first user
# message and can use the hive.colony-progress-tracker
# skill to claim tasks from the queue.
_spawn_input_data: dict[str, Any] = {"user_request": task}
_worker_path = getattr(session, "worker_path", None)
if _worker_path:
from pathlib import Path as _Path
_worker_path_p = _Path(_worker_path)
_progress_db = _worker_path_p / "data" / "progress.db"
if _progress_db.exists():
_spawn_input_data["db_path"] = str(_progress_db.resolve())
_spawn_input_data["colony_id"] = _worker_path_p.name
logger.info(
"run_agent_with_input: attached progress_db context "
"(colony_id=%s, db_path=%s)",
_worker_path_p.name,
_progress_db,
)
worker_ids = await colony.spawn(
task=task,
count=1,
input_data={"user_request": task},
input_data=_spawn_input_data,
agent_spec=spawn_spec,
tools=spawn_tools,
tool_executor=spawn_tool_executor,
+17 -2
View File
@@ -75,13 +75,28 @@ def patched_fork(monkeypatch):
"""Stub out fork_session_into_colony so we don't need a real queen."""
calls: list[dict] = []
async def _stub_fork(*, session: Any, colony_name: str, task: str) -> dict:
calls.append({"session": session, "colony_name": colony_name, "task": task})
async def _stub_fork(
*,
session: Any,
colony_name: str,
task: str,
tasks: list[dict] | None = None,
) -> dict:
calls.append(
{
"session": session,
"colony_name": colony_name,
"task": task,
"tasks": tasks,
}
)
return {
"colony_path": f"/tmp/fake_colonies/{colony_name}",
"colony_name": colony_name,
"queen_session_id": "session_fake_fork_id",
"is_new": True,
"db_path": f"/tmp/fake_colonies/{colony_name}/data/progress.db",
"task_ids": [],
}
monkeypatch.setattr(
+14 -8
View File
@@ -19,10 +19,10 @@ _DEFAULT_SKILLS_DIR = (
class TestDefaultSkillFiles:
"""Verify all 6 built-in SKILL.md files parse correctly."""
"""Verify all built-in SKILL.md files parse correctly."""
def test_all_six_skills_exist(self):
assert len(SKILL_REGISTRY) == 6
def test_all_skills_exist(self):
assert len(SKILL_REGISTRY) == 8
@pytest.mark.parametrize("skill_name,dir_name", list(SKILL_REGISTRY.items()))
def test_skill_parses(self, skill_name, dir_name):
@@ -37,7 +37,13 @@ class TestDefaultSkillFiles:
assert parsed.source_scope == "framework"
def test_combined_token_budget(self):
"""All default skill bodies combined should be under 2000 tokens (~8000 chars)."""
"""All default skill bodies combined should stay within the protocols budget.
Ceiling is 5000 tokens (~20000 chars): the prompt-injection path
appends every registered skill body to the system prompt, so
uncontrolled growth would balloon every LLM call. 5000 gives
headroom over today's ~3500 while still catching obvious bloat.
"""
total_chars = 0
for dir_name in SKILL_REGISTRY.values():
path = _DEFAULT_SKILLS_DIR / dir_name / "SKILL.md"
@@ -46,9 +52,9 @@ class TestDefaultSkillFiles:
total_chars += len(parsed.body)
approx_tokens = total_chars // 4
assert approx_tokens < 2000, (
assert approx_tokens < 5000, (
f"Combined default skill bodies are ~{approx_tokens} tokens "
f"({total_chars} chars), exceeding the 2000 token budget"
f"({total_chars} chars), exceeding the 5000 token budget"
)
def test_data_buffer_keys_all_prefixed(self):
@@ -62,7 +68,7 @@ class TestDefaultSkillManager:
manager = DefaultSkillManager()
manager.load()
assert len(manager.active_skill_names) == 6
assert len(manager.active_skill_names) == 8
for name in SKILL_REGISTRY:
assert name in manager.active_skill_names
@@ -101,7 +107,7 @@ class TestDefaultSkillManager:
manager.load()
assert "hive.quality-monitor" not in manager.active_skill_names
assert len(manager.active_skill_names) == 5
assert len(manager.active_skill_names) == len(SKILL_REGISTRY) - 1
def test_disable_all_via_convention(self):
config = SkillsConfig.from_agent_vars(default_skills={"_all": {"enabled": False}})
+505
View File
@@ -0,0 +1,505 @@
"""Tests for framework.host.progress_db — per-colony task queue."""
from __future__ import annotations
import sqlite3
import threading
import time
from pathlib import Path
import pytest
from framework.host.progress_db import (
SCHEMA_VERSION,
ensure_all_colony_dbs,
ensure_progress_db,
enqueue_task,
reclaim_stale,
seed_tasks,
)
# ----------------------------------------------------------------------
# Schema / init
# ----------------------------------------------------------------------
def test_ensure_progress_db_fresh(tmp_path: Path) -> None:
    """A fresh colony dir gets data/progress.db with WAL mode, the current schema version, and all declared tables/indexes."""
    db_path = ensure_progress_db(tmp_path / "c")
    assert db_path.exists()
    assert (db_path.name, db_path.parent.name) == ("progress.db", "data")
    con = sqlite3.connect(str(db_path))
    try:
        journal_mode = con.execute("PRAGMA journal_mode").fetchone()[0]
        assert journal_mode.lower() == "wal"
        assert con.execute("PRAGMA user_version").fetchone()[0] == SCHEMA_VERSION
        # Collect table and index names in one pass each.
        names = {
            kind: {
                row[0]
                for row in con.execute(
                    "SELECT name FROM sqlite_master WHERE type=?", (kind,)
                )
            }
            for kind in ("table", "index")
        }
        assert names["table"] >= {"tasks", "steps", "sop_checklist", "colony_meta"}
        # Named indexes we declared
        expected_indexes = {
            "idx_tasks_claimable",
            "idx_steps_task_seq",
            "idx_sop_required_open",
            "idx_tasks_status",
        }
        assert expected_indexes <= names["index"]
    finally:
        con.close()
def test_ensure_progress_db_idempotent(tmp_path: Path) -> None:
    """Calling ensure_progress_db twice yields the same path and an intact schema version."""
    colony_dir = tmp_path / "c"
    first = ensure_progress_db(colony_dir)
    second = ensure_progress_db(colony_dir)
    assert first == second
    con = sqlite3.connect(str(first))
    try:
        version = con.execute("PRAGMA user_version").fetchone()[0]
    finally:
        con.close()
    assert version == SCHEMA_VERSION
def test_ensure_all_colony_dbs_backfill(tmp_path: Path) -> None:
    """Every colony *directory* under the root gets a db; plain files are skipped."""
    colonies_root = tmp_path / "colonies"
    for colony_name in ("alpha", "beta"):
        (colonies_root / colony_name).mkdir(parents=True)
    (colonies_root / "gamma_not_dir").touch()  # a file, not a colony dir — must be ignored
    created = ensure_all_colony_dbs(colonies_root)
    # Each returned path is <root>/<colony>/data/progress.db, so the colony
    # name is two levels up from the db file.
    assert {db.parent.parent.name for db in created} == {"alpha", "beta"}
    assert all(db.exists() for db in created)
def test_ensure_all_colony_dbs_missing_root(tmp_path: Path) -> None:
    """A nonexistent colonies root yields an empty list rather than raising."""
    assert ensure_all_colony_dbs(tmp_path / "nonexistent") == []
# ----------------------------------------------------------------------
# Seeding / enqueue
# ----------------------------------------------------------------------
def test_seed_tasks_basic(tmp_path: Path) -> None:
    """seed_tasks persists goals, priority, payload JSON, child steps, and SOP rows."""
    db = ensure_progress_db(tmp_path / "c")
    batch = [
        {
            "goal": "task one",
            "priority": 5,
            "payload": {"url": "https://example.com"},
            "steps": [
                {"title": "open page"},
                {"title": "extract data", "detail": "selector .content"},
            ],
            "sop_items": [
                {"key": "captcha_handled", "description": "Verify no CAPTCHA blocks"},
                {"key": "soft_hint", "description": "optional", "required": False},
            ],
        },
        {"goal": "task two"},
    ]
    ids = seed_tasks(db, batch)
    assert len(ids) == 2
    con = sqlite3.connect(str(db))
    try:
        rows = con.execute(
            "SELECT id, goal, priority, status, source, payload FROM tasks ORDER BY goal"
        ).fetchall()
        assert len(rows) == 2
        _, goal, priority, status, source, payload = rows[0]
        assert goal == "task one"
        assert priority == 5
        assert status == "pending"
        assert source == "queen_create"
        assert '"url"' in payload  # payload is stored as JSON text
        n_steps = con.execute(
            "SELECT count(*) FROM steps WHERE task_id=?", (ids[0],)
        ).fetchone()[0]
        assert n_steps == 2
        sop_rows = con.execute(
            "SELECT key, required FROM sop_checklist WHERE task_id=? ORDER BY key",
            (ids[0],),
        ).fetchall()
        # required defaults to 1; an explicit False is stored as 0
        assert sop_rows == [("captcha_handled", 1), ("soft_hint", 0)]
    finally:
        con.close()
def test_seed_tasks_rejects_missing_goal(tmp_path: Path) -> None:
    """Every seeded task row must carry a goal; a goal-less row raises ValueError."""
    db = ensure_progress_db(tmp_path / "c")
    with pytest.raises(ValueError):
        seed_tasks(db, [{"priority": 1}])
def test_seed_tasks_empty_is_noop(tmp_path: Path) -> None:
    """Seeding an empty batch returns no ids."""
    assert seed_tasks(ensure_progress_db(tmp_path / "c"), []) == []
def test_seed_tasks_rollback_on_partial_failure(tmp_path: Path) -> None:
    """A bad row mid-batch must roll back the whole transaction."""
    db = ensure_progress_db(tmp_path / "c")
    bad_batch = [
        {"goal": "good one"},
        {"priority": 1},  # missing goal -> boom
        {"goal": "never inserted"},
    ]
    with pytest.raises(ValueError):
        seed_tasks(db, bad_batch)
    # Nothing from the batch — including the valid first row — may persist.
    con = sqlite3.connect(str(db))
    try:
        assert con.execute("SELECT count(*) FROM tasks").fetchone()[0] == 0
    finally:
        con.close()
def test_enqueue_task(tmp_path: Path) -> None:
    """enqueue_task appends one task (source='enqueue_tool') together with its steps and SOP items."""
    db = ensure_progress_db(tmp_path / "c")
    task_id = enqueue_task(
        db,
        "appended",
        steps=[{"title": "s1"}],
        sop_items=[{"key": "k", "description": "d"}],
        priority=3,
    )
    assert task_id
    con = sqlite3.connect(str(db))
    try:
        goal, priority, source = con.execute(
            "SELECT goal, priority, source FROM tasks WHERE id=?", (task_id,)
        ).fetchone()
        assert (goal, priority, source) == ("appended", 3, "enqueue_tool")
        step_total = con.execute(
            "SELECT count(*) FROM steps WHERE task_id=?", (task_id,)
        ).fetchone()[0]
        assert step_total == 1
    finally:
        con.close()
def test_seed_tasks_bulk_10k(tmp_path: Path) -> None:
    """10k rows in one transaction should finish under a second on local disk."""
    db = ensure_progress_db(tmp_path / "c")
    batch = [{"goal": f"task {i}", "seq": i} for i in range(10_000)]
    t0 = time.perf_counter()
    task_ids = seed_tasks(db, batch)
    elapsed = time.perf_counter() - t0
    assert len(task_ids) == 10_000
    # Generous ceiling — on CI with slow disk we've seen ~300ms.
    assert elapsed < 3.0, f"bulk seed too slow: {elapsed:.2f}s"
# ----------------------------------------------------------------------
# Atomic claim under concurrency
# ----------------------------------------------------------------------
# NOTE(review): reference copy of the claim statement that the
# hive.colony-progress-tracker skill teaches agents (BEGIN IMMEDIATE +
# subquery UPDATE). Currently unused by these tests — _claim_one below
# performs the equivalent claim via RETURNING — but kept here as
# documentation of the exact SQL workers are expected to run; confirm
# against the skill body before deleting.
_CLAIM_SQL = """
BEGIN IMMEDIATE;
UPDATE tasks
SET
status = 'claimed',
worker_id = ?,
claim_token = lower(hex(randomblob(8))),
claimed_at = datetime('now'),
updated_at = datetime('now')
WHERE id = (
SELECT id FROM tasks
WHERE status = 'pending'
ORDER BY priority DESC, seq, created_at
LIMIT 1
);
"""
def _claim_one(db_path: Path, worker_id: str) -> str | None:
"""Atomic single-shot claim using RETURNING (SQLite 3.35+).
The skill teaches agents the BEGIN IMMEDIATE + subquery UPDATE
pattern; for an in-process test helper we use RETURNING so the
claimed row id is returned from the same statement (no racing
follow-up SELECT). Functionally equivalent: both approaches rely
on the atomic subquery-UPDATE.
"""
con = sqlite3.connect(str(db_path), isolation_level=None, timeout=10.0)
con.execute("PRAGMA busy_timeout = 10000")
try:
cur = con.execute(
"""
UPDATE tasks
SET status = 'claimed',
worker_id = ?,
claim_token = lower(hex(randomblob(8))),
claimed_at = datetime('now'),
updated_at = datetime('now')
WHERE id = (
SELECT id FROM tasks
WHERE status = 'pending'
ORDER BY priority DESC, seq, created_at
LIMIT 1
)
RETURNING id
""",
(worker_id,),
)
row = cur.fetchone()
return row[0] if row else None
finally:
con.close()
def test_claim_atomicity_under_concurrency(tmp_path: Path) -> None:
    """20 threads racing to drain 100 tasks — each task claimed exactly once."""
    db = ensure_progress_db(tmp_path / "c")
    seed_tasks(db, [{"goal": f"task {i}", "seq": i} for i in range(100)])
    claims: list[tuple[str, str]] = []
    claims_lock = threading.Lock()

    def worker(worker_id: str) -> None:
        # Keep claiming until the queue is drained for this worker.
        while True:
            tid = _claim_one(db, worker_id)
            if tid is None:
                return
            with claims_lock:
                claims.append((worker_id, tid))

    threads = [threading.Thread(target=worker, args=(f"w{i}",)) for i in range(20)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=30)
        # BUGFIX: Thread.join(timeout=...) returns silently on timeout, so a
        # hung worker would previously let the assertions below run against a
        # partially drained queue and fail confusingly. Fail loudly instead.
        assert not t.is_alive(), "claim worker thread still running after 30s join timeout"

    task_ids = [tid for _, tid in claims]
    assert len(task_ids) == 100, f"expected 100 claims, got {len(task_ids)}"
    assert len(set(task_ids)) == 100, "duplicate claims detected"
    con = sqlite3.connect(str(db))
    try:
        remaining = con.execute(
            "SELECT count(*) FROM tasks WHERE status='pending'"
        ).fetchone()[0]
        assert remaining == 0
        claimed = con.execute(
            "SELECT count(*) FROM tasks WHERE status='claimed'"
        ).fetchone()[0]
        assert claimed == 100
    finally:
        con.close()
# ----------------------------------------------------------------------
# Stale-claim reclaimer
# ----------------------------------------------------------------------
def test_reclaim_stale_returns_to_pending(tmp_path: Path) -> None:
    """A claim older than the staleness window returns to pending with retry_count bumped."""
    db = ensure_progress_db(tmp_path / "c")
    [tid] = seed_tasks(db, [{"goal": "stuck"}])
    # Simulate a claim made 20 minutes ago.
    con = sqlite3.connect(str(db), isolation_level=None)
    try:
        con.execute(
            "UPDATE tasks SET status='claimed', worker_id='w1', "
            "claimed_at=datetime('now', '-20 minutes') WHERE id=?",
            (tid,),
        )
    finally:
        con.close()
    assert reclaim_stale(db, stale_after_minutes=15) == 1
    con = sqlite3.connect(str(db))
    try:
        status, worker, retries = con.execute(
            "SELECT status, worker_id, retry_count FROM tasks WHERE id=?", (tid,)
        ).fetchone()
    finally:
        con.close()
    assert (status, worker, retries) == ("pending", None, 1)
def test_reclaim_stale_fails_after_max_retries(tmp_path: Path) -> None:
    """A stale claim whose retry_count already hit max_retries is marked failed, not requeued."""
    db = ensure_progress_db(tmp_path / "c")
    [tid] = seed_tasks(db, [{"goal": "doomed", "max_retries": 2}])
    # Simulate a stale claim that has already burned all retries.
    con = sqlite3.connect(str(db), isolation_level=None)
    try:
        con.execute(
            "UPDATE tasks SET status='claimed', worker_id='w1', retry_count=2, "
            "claimed_at=datetime('now', '-20 minutes') WHERE id=?",
            (tid,),
        )
    finally:
        con.close()
    reclaim_stale(db, stale_after_minutes=15)
    con = sqlite3.connect(str(db))
    try:
        status, last_error = con.execute(
            "SELECT status, last_error FROM tasks WHERE id=?", (tid,)
        ).fetchone()
    finally:
        con.close()
    assert status == "failed"
    assert last_error is not None and "max_retries" in last_error
def test_reclaim_stale_ignores_fresh_claims(tmp_path: Path) -> None:
    """A claim made just now is inside the staleness window and must be left alone."""
    db = ensure_progress_db(tmp_path / "c")
    [tid] = seed_tasks(db, [{"goal": "working"}])
    con = sqlite3.connect(str(db), isolation_level=None)
    try:
        con.execute(
            "UPDATE tasks SET status='claimed', worker_id='w1', "
            "claimed_at=datetime('now') WHERE id=?",
            (tid,),
        )
    finally:
        con.close()
    assert reclaim_stale(db, stale_after_minutes=15) == 0
# ----------------------------------------------------------------------
# Foreign key cascade — see test_task_delete_cascades_to_steps_and_sop,
# defined below after the worker-config patching tests
# ----------------------------------------------------------------------
# ----------------------------------------------------------------------
# Worker config patching for pre-existing colonies
# ----------------------------------------------------------------------
def _write_worker_cfg(path: Path, *, with_input_data: dict | None = None) -> None:
"""Write a minimal worker.json that matches the shape ensure_progress_db patches."""
import json as _json
cfg = {
"name": "worker",
"system_prompt": "You are a worker.",
"goal": {"description": "do stuff", "success_criteria": [], "constraints": []},
"tools": [],
}
if with_input_data is not None:
cfg["input_data"] = with_input_data
path.write_text(_json.dumps(cfg, indent=2))
def test_ensure_progress_db_patches_existing_worker_json(tmp_path: Path) -> None:
    """Pre-existing worker.json without input_data gets db_path injected."""
    import json as _json

    colony = tmp_path / "legacy_colony"
    colony.mkdir()
    cfg_path = colony / "worker.json"
    _write_worker_cfg(cfg_path)
    # Before: no input_data
    assert "input_data" not in _json.loads(cfg_path.read_text())
    db = ensure_progress_db(colony)
    after = _json.loads(cfg_path.read_text())
    assert after["input_data"]["db_path"] == str(db)
    assert after["input_data"]["colony_id"] == "legacy_colony"
    # Other fields untouched
    assert after["system_prompt"] == "You are a worker."
    assert after["goal"]["description"] == "do stuff"
def test_ensure_progress_db_patch_is_idempotent(tmp_path: Path) -> None:
    """Second call must not rewrite the file (mtime unchanged)."""
    import time as _time

    colony = tmp_path / "idem"
    colony.mkdir()
    cfg_path = colony / "worker.json"
    _write_worker_cfg(cfg_path)
    ensure_progress_db(colony)
    first_mtime = cfg_path.stat().st_mtime
    _time.sleep(0.02)  # ensure any rewrite would bump mtime
    ensure_progress_db(colony)
    assert cfg_path.stat().st_mtime == first_mtime, (
        "second ensure_progress_db must not rewrite worker.json"
    )
def test_ensure_progress_db_preserves_existing_input_data_keys(tmp_path: Path) -> None:
    """Pre-existing input_data keys (other than db_path/colony_id) are preserved."""
    import json as _json

    colony = tmp_path / "preserved"
    colony.mkdir()
    _write_worker_cfg(
        colony / "worker.json",
        with_input_data={"custom_key": "hello", "db_path": "/stale/path.db"},
    )
    db = ensure_progress_db(colony)
    patched = _json.loads((colony / "worker.json").read_text())["input_data"]
    assert patched["custom_key"] == "hello"
    assert patched["db_path"] == str(db)  # stale path replaced by the real db
    assert patched["colony_id"] == "preserved"
def test_ensure_progress_db_skips_metadata_and_triggers(tmp_path: Path) -> None:
    """metadata.json and triggers.json are not worker configs — must not be touched."""
    import json as _json

    colony = tmp_path / "guarded"
    colony.mkdir()
    (colony / "metadata.json").write_text(_json.dumps({"colony_name": "guarded"}))
    (colony / "triggers.json").write_text(_json.dumps([{"id": "t1"}]))
    _write_worker_cfg(colony / "worker.json")
    ensure_progress_db(colony)
    reread = {
        name: _json.loads((colony / name).read_text())
        for name in ("metadata.json", "triggers.json", "worker.json")
    }
    assert "input_data" not in reread["metadata.json"]
    assert reread["triggers.json"] == [{"id": "t1"}]
    # Only the worker config receives the injected input_data.
    assert "input_data" in reread["worker.json"]
def test_task_delete_cascades_to_steps_and_sop(tmp_path: Path) -> None:
    """Deleting a task removes its steps and sop_checklist rows via FK cascade."""
    db = ensure_progress_db(tmp_path / "c")
    [tid] = seed_tasks(
        db,
        [
            {
                "goal": "cascade test",
                "steps": [{"title": "a"}, {"title": "b"}],
                "sop_items": [{"key": "k", "description": "d"}],
            }
        ],
    )
    con = sqlite3.connect(str(db), isolation_level=None)
    try:
        # FK enforcement is per-connection in SQLite; enable it before DELETE.
        con.execute("PRAGMA foreign_keys = ON")
        con.execute("DELETE FROM tasks WHERE id=?", (tid,))
        for child_table in ("steps", "sop_checklist"):
            orphan_count = con.execute(
                f"SELECT count(*) FROM {child_table} WHERE task_id=?", (tid,)
            ).fetchone()[0]
            assert orphan_count == 0
    finally:
        con.close()
+15
View File
@@ -0,0 +1,15 @@
import json

# Ledger of LinkedIn profiles we've already messaged, relative to the
# script's working directory.
LEDGER_PATH = 'data/linkedin_ledger.json'


def apply_default_variant(profiles, variant='Control'):
    """Backfill a 'variant' label onto profile records that predate A/B tagging.

    Mutates the records in place and returns the same list for convenience.
    Records that already carry a 'variant' key are left untouched.
    """
    for profile in profiles:
        profile.setdefault('variant', variant)  # Retroactively label our first runs
    return profiles


try:
    with open(LEDGER_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)
    data['messaged_profiles'] = apply_default_variant(data.get('messaged_profiles', []))
    # BUGFIX: rewrite the *whole* document rather than a fresh
    # {"messaged_profiles": ...} dict, so any other top-level ledger keys
    # survive the rewrite instead of being silently dropped.
    with open(LEDGER_PATH, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)
except Exception as e:
    # Best-effort maintenance script: report the problem and exit quietly.
    print(f"Error: {e}")