feat: scheduler tools for incubating

This commit is contained in:
Richard Tang
2026-04-19 19:30:31 -07:00
parent 0907ff9cec
commit 67d55e6cce
3 changed files with 195 additions and 219 deletions
+41 -43
View File
@@ -63,12 +63,9 @@ _QUEEN_INCUBATING_TOOLS = [
"list_directory",
"search_files",
"run_command",
# Trigger management: phase-branched in queen_lifecycle_tools.py to
# buffer drafts onto phase_state.pending_triggers, then drained into
# the colony's triggers.json by create_colony post-fork.
"set_trigger",
"remove_trigger",
"list_triggers",
# Schedule lives on the colony, not on the queen session — pass it
# inline as create_colony(triggers=[...]) instead of staging through
# set_trigger here.
"create_colony",
"cancel_incubation",
]
@@ -91,10 +88,6 @@ _QUEEN_WORKING_TOOLS = [
"stop_worker",
# Fan out more tasks while workers are still running
"run_parallel_workers",
# Trigger management
"set_trigger",
"remove_trigger",
"list_triggers",
]
# Reviewing phase: workers have finished. Queen summarises results,
@@ -184,6 +177,14 @@ If you realise mid-incubation that the spec isn't ready (user changed \
their mind, you're missing more than a couple of details, the work \
turned out to be one-shot after all), call ``cancel_incubation`` \
no harm, you go back to INDEPENDENT and can retry later.
If the user explicitly asks for something UNRELATED to the current \
colony being drafted (a side question, a one-shot task, a different \
problem), don't try to handle it from this limited tool surface. Call \
``cancel_incubation`` first to switch back to INDEPENDENT where you \
have the full toolkit, handle their request there, and re-enter \
INCUBATING later via ``start_incubating_colony`` when they want to \
resume the colony spec.
"""
_queen_role_working = """\
@@ -201,9 +202,10 @@ What you DO in this phase:
- Intervene when a worker is clearly off course (inject_message) or \
needs to stop (stop_worker).
- Make SPEC-COMPATIBLE adjustments when the user asks fan out MORE \
of the same work (run_parallel_workers), or tweak the schedule \
(set_trigger / remove_trigger / list_triggers). These are tweaks to \
the spec the user already approved, not redesigns.
of the same work (run_parallel_workers). This is a tweak to the spec \
the user already approved, not a redesign. Scheduled / recurring \
work belongs to a colony; if the user wants to add or change a \
schedule, that's a new colony.
What you DO NOT do in this phase:
- Redesign the colony. If the user asks for something fundamentally \
@@ -269,24 +271,6 @@ work. Available:
details before you commit (e.g. peek at an existing skill in \
~/.hive/skills/, sanity-check an API URL).
## Schedule the colony (drafted now, written on commit)
- set_trigger(trigger_id, trigger_type, trigger_config, task) Stage a \
schedule for the colony. Validated immediately (cron syntax / interval \
shape) but NOT activated yet buffered onto the incubation context \
and written to ``triggers.json`` by create_colony so it auto-starts on \
first colony load. Repeated calls with the same trigger_id replace \
the prior draft.
- list_triggers() Inspect the buffered drafts.
- remove_trigger(trigger_id) Drop a draft from the buffer.
- ``trigger_type`` is "timer" (with ``trigger_config={"cron": "..."}`` \
or ``{"interval_minutes": N}``) or "webhook" (with \
``trigger_config={"path": "/hooks/..."}``).
- ``task`` describes what the worker should do when the trigger fires; \
this is REQUIRED a trigger with no task can't fire usefully. It's \
separate from create_colony's ``task`` argument because the trigger \
task is one-shot per fire, while create_colony's ``task`` is the \
worker's overall purpose.
## Approved → operational checklist (use your judgement, ask only what's missing)
The conversation that got you here probably did NOT cover all of:
- Concurrency: how many tasks should run in parallel? Single-fire?
@@ -306,11 +290,12 @@ for gaps that need a real answer; skip the rest.
## Commit
- create_colony(colony_name, task, skill_name, skill_description, \
skill_body, skill_files?, tasks?, concurrency_hint?) Fork this \
session into the colony. **Atomic call pass the skill INLINE.** Do \
NOT write SKILL.md with write_file beforehand; this tool materialises \
the folder for you and then forks. Reusing an existing skill_name \
within the colony replaces that skill with your latest content.
skill_body, skill_files?, tasks?, concurrency_hint?, triggers?) \
Fork this session into the colony. **Atomic call — pass the skill \
AND the schedule INLINE.** Do NOT write SKILL.md with write_file \
beforehand; this tool materialises the folder for you and then \
forks. Reusing an existing skill_name within the colony replaces \
that skill with your latest content.
- The ``task`` must be FULL and self-contained the worker has zero \
memory of THIS chat at run time.
- The ``skill_body`` must be FULL and self-contained capture the \
@@ -320,9 +305,18 @@ for gaps that need a real answer; skip the rest.
many worker processes typically run in parallel for this colony \
(e.g. 1 for "send digest", 5 for a fan-out). Baked into worker.json \
for the future colony queen to consult; not enforced.
- Any triggers you staged with ``set_trigger`` during this incubation \
are written to ``triggers.json`` as part of the commit and \
auto-start on first colony load.
- ``triggers`` (optional array) the colony's schedule, written \
inline to ``triggers.json`` and auto-started on first colony load. \
Pass this when the work is recurring / event-driven; omit for \
colonies the user will run by clicking start. Each entry: \
``{id, trigger_type, trigger_config, task}`` where trigger_type is \
"timer" (config ``{cron: "0 9 * * *"}`` or ``{interval_minutes: N}``) \
or "webhook" (config ``{path: "/hooks/..."}``). Each entry's \
``task`` is what the worker does when THAT trigger fires — separate \
from the colony-wide ``task`` argument, which is the worker's \
overall purpose. Validated up front — a bad cron, missing task, or \
malformed webhook path fails the call before anything is written, \
so you can retry with corrected input.
- After this returns, the chat is over: the session locks immediately \
and the user gets a "compact and start a new session with you" \
button. So make your call to create_colony the last thing you do \
@@ -334,6 +328,11 @@ for gaps that need a real answer; skip the rest.
changed their mind, you discovered the work is actually one-shot, \
more than a couple of details still need to be worked out). Returns \
you to INDEPENDENT with the full toolkit; no fork happens.
- Also call cancel_incubation() if the user explicitly pivots to \
something UNRELATED to this colony (side question, one-shot ask, \
different problem). You can't serve that from this narrow toolkit — \
drop back to INDEPENDENT, handle it, then re-enter incubation via \
start_incubating_colony when they're ready to resume the spec.
"""
_queen_tools_working = """
@@ -358,10 +357,9 @@ operational, not editorial.
- run_parallel_workers(tasks, timeout?) Fan out MORE of the same \
work. Use when the user wants additional units of an already-defined \
job, NOT for new scope. Each task string must be fully self-contained.
- set_trigger / remove_trigger / list_triggers Tweak the schedule \
the user already approved during incubation. Adding a follow-up \
trigger for the same colony is fine; redesigning the colony's \
purpose is not.
- Scheduled / recurring work belongs to a colony, not this session. \
If the user wants to add or change a schedule, that's a new colony \
born from a fresh chat via start_incubating_colony.
## Read-only inspection
- read_file, list_directory, search_files, run_command
+152 -176
View File
@@ -156,15 +156,6 @@ class QueenPhaseState:
# without having to remember it from the tool result.
incubating_colony_name: str | None = None
incubating_intended_purpose: str | None = None
# Trigger configs the queen drafted during incubation. set_trigger /
# remove_trigger / list_triggers buffer here when phase=="incubating"
# because the colony directory + worker runtime don't exist yet —
# there's nothing live to attach a timer/webhook to. ``create_colony``
# drains this list into ``{colony_dir}/triggers.json`` after the fork
# so the triggers auto-start the next time the colony is loaded.
# Cleared on switch_to_independent so failed/cancelled incubations
# don't leak into a future incubation attempt.
pending_triggers: list = field(default_factory=list) # list[dict]
# Default skill operational protocols — appended to every phase prompt
protocols_prompt: str = ""
@@ -310,13 +301,8 @@ class QueenPhaseState:
return
self.phase = "independent"
# Clear stale incubation context so a future incubation starts fresh.
# This includes the trigger buffer — a cancelled or successful
# incubation should not leak its drafted triggers into the next
# attempt (successful create_colony already drained them; cancel
# never committed them).
self.incubating_colony_name = None
self.incubating_intended_purpose = None
self.pending_triggers = []
tool_names = [t.name for t in self.independent_tools]
logger.info("Queen phase → independent (source=%s, tools: %s)", source, tool_names)
await self._emit_phase_event()
@@ -1434,6 +1420,80 @@ def register_queen_lifecycle_tools(
return target, None, replaced
def _validate_triggers(raw: Any) -> tuple[list[dict] | None, str | None]:
"""Validate and normalize the ``triggers`` argument for create_colony.
Mirrors the per-type validation that ``set_trigger`` applied when it
buffered drafts during incubation. Returns (normalized_list, error).
On success error is None. Empty / missing input yields ([], None).
"""
if raw is None:
return [], None
if not isinstance(raw, list):
return None, "triggers must be an array"
normalized: list[dict] = []
seen_ids: set[str] = set()
for idx, entry in enumerate(raw):
if not isinstance(entry, dict):
return None, f"triggers[{idx}] must be an object"
tid = (entry.get("id") or "").strip() if isinstance(entry.get("id"), str) else ""
if not tid:
return None, f"triggers[{idx}] missing non-empty 'id'"
if tid in seen_ids:
return None, f"triggers[{idx}] duplicate id '{tid}'"
seen_ids.add(tid)
t_type = entry.get("trigger_type")
if t_type not in ("timer", "webhook"):
return None, f"triggers[{idx}] trigger_type must be 'timer' or 'webhook' (got {t_type!r})"
t_config = entry.get("trigger_config") or {}
if not isinstance(t_config, dict):
return None, f"triggers[{idx}] trigger_config must be an object"
task_str = entry.get("task")
if not isinstance(task_str, str) or not task_str.strip():
return None, (
f"triggers[{idx}] ('{tid}') needs a non-empty 'task' "
"— what the worker should do when this trigger fires"
)
if t_type == "timer":
cron_expr = t_config.get("cron")
interval = t_config.get("interval_minutes")
if cron_expr:
try:
from croniter import croniter
if not croniter.is_valid(cron_expr):
return None, f"triggers[{idx}] ('{tid}') invalid cron expression: {cron_expr}"
except ImportError:
return None, (
f"triggers[{idx}] ('{tid}') croniter package not installed — "
"cannot validate cron expression."
)
elif interval is not None:
if not isinstance(interval, (int, float)) or interval <= 0:
return None, f"triggers[{idx}] ('{tid}') interval_minutes must be > 0, got {interval}"
else:
return None, (
f"triggers[{idx}] ('{tid}') timer trigger needs 'cron' or "
"'interval_minutes' in trigger_config."
)
else: # webhook
path = (t_config.get("path") or "").strip() if isinstance(t_config.get("path"), str) else ""
if not path or not path.startswith("/"):
return None, (
f"triggers[{idx}] ('{tid}') webhook trigger requires 'path' "
"starting with '/' in trigger_config (e.g. '/hooks/github')."
)
normalized.append(
{
"id": tid,
"trigger_type": t_type,
"trigger_config": t_config,
"task": task_str.strip(),
"name": entry.get("name") if isinstance(entry.get("name"), str) and entry.get("name").strip() else tid,
}
)
return normalized, None
async def create_colony(
*,
colony_name: str,
@@ -1444,6 +1504,7 @@ def register_queen_lifecycle_tools(
skill_files: list[dict] | None = None,
tasks: list[dict] | None = None,
concurrency_hint: int | None = None,
triggers: list[dict] | None = None,
) -> str:
"""Create a colony and materialize its skill folder in one atomic call.
@@ -1476,6 +1537,22 @@ def register_queen_lifecycle_tools(
{"error": ("colony_name must be lowercase alphanumeric with underscores (e.g. 'honeycomb_research').")}
)
# Validate triggers up front so a bad cron / webhook path fails fast,
# before we materialize the skill folder or fork the session.
validated_triggers, trig_err = _validate_triggers(triggers)
if trig_err is not None:
return json.dumps(
{
"error": trig_err,
"hint": (
"Each trigger needs id, trigger_type ('timer' or "
"'webhook'), trigger_config, and task. Timer: "
"{cron: '...'} or {interval_minutes: N}. Webhook: "
"{path: '/hooks/...'}."
),
}
)
# Pre-create the colony dir so the skill can be materialized
# INSIDE it (project scope, colony-local). fork_session_into_colony
# keys "is_new" off worker.json rather than the dir itself, so
@@ -1590,6 +1667,28 @@ def register_queen_lifecycle_tools(
exc_info=True,
)
# Write triggers.json from the validated arg so the colony's
# timers/webhooks auto-start when session_manager loads the colony.
# Runs regardless of phase — if a colony is re-created with the
# same name the triggers list is the authoritative new schedule.
if validated_triggers:
triggers_path = colony_dir / "triggers.json"
try:
triggers_path.write_text(
json.dumps(validated_triggers, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
logger.info(
"create_colony: wrote %d trigger(s) to %s",
len(validated_triggers),
triggers_path,
)
except OSError:
logger.warning(
"create_colony: failed to write triggers.json",
exc_info=True,
)
# When the queen forked from INCUBATING phase, the chat is over by
# design: the colony spec is committed and there's nothing left to
# discuss in this DM. Auto-lock the session immediately (same
@@ -1599,28 +1698,6 @@ def register_queen_lifecycle_tools(
# with the "compact and start a new session" UX.
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
# Drain buffered triggers into the colony's triggers.json so they
# auto-start when the colony is first loaded (session_manager
# reads triggers.json on colony load and starts each timer /
# webhook). Do this BEFORE switch_to_independent clears the
# buffer.
if phase_state.pending_triggers:
triggers_path = colony_dir / "triggers.json"
try:
triggers_path.write_text(
json.dumps(phase_state.pending_triggers, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
logger.info(
"create_colony: wrote %d buffered trigger(s) to %s",
len(phase_state.pending_triggers),
triggers_path,
)
except OSError:
logger.warning(
"create_colony: failed to write triggers.json from buffer",
exc_info=True,
)
try:
from framework.server.routes_execution import (
persist_colony_spawn_lock,
@@ -1867,6 +1944,42 @@ def register_queen_lifecycle_tools(
),
"minimum": 1,
},
"triggers": {
"type": "array",
"description": (
"Optional schedule for the colony — written to "
"{colony_dir}/triggers.json and auto-started on "
"first colony load. Use this when the user wants "
"the colony to fire on a cron, every N minutes, "
"or on an incoming webhook; omit for colonies "
"that run once when the user clicks start. Each "
"entry: id (unique string), trigger_type "
"('timer' or 'webhook'), trigger_config (timer: "
"{cron: '0 9 * * *'} or {interval_minutes: N}; "
"webhook: {path: '/hooks/...'}), task (what the "
"worker should do when this trigger fires — "
"required, separate from the colony-wide task "
"because a trigger's task is one-shot per fire). "
"Validated up front — a bad cron, missing task, "
"or malformed webhook path fails the call before "
"anything is written. Scheduling lives on the "
"colony, not on the queen session."
),
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"trigger_type": {
"type": "string",
"enum": ["timer", "webhook"],
},
"trigger_config": {"type": "object"},
"task": {"type": "string"},
"name": {"type": "string"},
},
"required": ["id", "trigger_type", "trigger_config", "task"],
},
},
},
"required": [
"colony_name",
@@ -4330,103 +4443,7 @@ def register_queen_lifecycle_tools(
trigger_config: dict | None = None,
task: str | None = None,
) -> str:
"""Activate a trigger so it fires periodically into the queen.
Phase-branched: in INCUBATING the trigger spec is buffered onto
``phase_state.pending_triggers`` because the colony directory +
worker runtime don't exist yet. ``create_colony`` drains the
buffer into ``{colony_dir}/triggers.json`` post-fork so the
triggers auto-start on first colony load. In WORKING / REVIEWING
the live behaviour (start timer/webhook + persist) runs as before.
"""
# ── Incubating branch: buffer instead of activate ────────────
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
t_type = trigger_type
t_config = trigger_config or {}
if not t_type:
return json.dumps(
{
"error": (
"trigger_type is required during incubation — "
"no existing trigger to inherit from yet."
)
}
)
if not task:
return json.dumps(
{
"error": (
f"Trigger '{trigger_id}' needs a task — "
"describe what the worker should do when this "
"trigger fires."
)
}
)
# Mirror the live-path validation so a bad cron / interval is
# caught at draft time, not on first colony load.
if t_type == "timer":
cron_expr = t_config.get("cron")
interval = t_config.get("interval_minutes")
if cron_expr:
try:
from croniter import croniter
if not croniter.is_valid(cron_expr):
return json.dumps({"error": f"Invalid cron expression: {cron_expr}"})
except ImportError:
return json.dumps(
{"error": "croniter package not installed — cannot validate cron expression."}
)
elif interval:
if not isinstance(interval, (int, float)) or interval <= 0:
return json.dumps({"error": f"interval_minutes must be > 0, got {interval}"})
else:
return json.dumps(
{"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."}
)
elif t_type == "webhook":
path = (t_config.get("path") or "").strip()
if not path or not path.startswith("/"):
return json.dumps(
{
"error": (
"Webhook trigger requires 'path' starting "
"with '/' in trigger_config (e.g. '/hooks/github')."
)
}
)
else:
return json.dumps({"error": f"Unsupported trigger type: {t_type}"})
# Replace any prior buffered entry with the same id so the
# queen can iteratively refine without duplicates.
phase_state.pending_triggers = [
t for t in phase_state.pending_triggers if t.get("id") != trigger_id
]
phase_state.pending_triggers.append(
{
"id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
"task": task,
"name": trigger_id,
}
)
return json.dumps(
{
"status": "buffered",
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
"note": (
"Buffered for incubation. Will be written to "
"triggers.json when create_colony commits the spec, "
"and auto-start on first colony load."
),
}
)
"""Activate a trigger so it fires periodically into the queen."""
if trigger_id in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is already active."})
@@ -4635,29 +4652,7 @@ def register_queen_lifecycle_tools(
# --- remove_trigger --------------------------------------------------------
async def remove_trigger(trigger_id: str) -> str:
"""Deactivate an active trigger.
Phase-branched: in INCUBATING, removes the buffered draft instead
of touching live runtime state.
"""
# ── Incubating branch: drop from buffer ──────────────────────
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
before = len(phase_state.pending_triggers)
phase_state.pending_triggers = [
t for t in phase_state.pending_triggers if t.get("id") != trigger_id
]
if len(phase_state.pending_triggers) == before:
return json.dumps(
{
"error": (
f"Trigger '{trigger_id}' was not in the "
"incubation buffer."
)
}
)
return json.dumps({"status": "removed_from_buffer", "trigger_id": trigger_id})
"""Deactivate an active trigger."""
if trigger_id not in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is not active."})
@@ -4723,26 +4718,7 @@ def register_queen_lifecycle_tools(
# --- list_triggers ---------------------------------------------------------
async def list_triggers() -> str:
"""List all available triggers and their status.
Phase-branched: in INCUBATING, returns the buffered drafts that
will be written to triggers.json when create_colony commits.
"""
# ── Incubating branch: return buffered drafts ────────────────
phase_state = getattr(session, "phase_state", None)
if phase_state is not None and phase_state.phase == "incubating":
return json.dumps(
{
"triggers": list(phase_state.pending_triggers),
"buffered": True,
"note": (
"These are draft triggers staged for the colony "
"being incubated. They are written to triggers.json "
"when create_colony commits the spec."
),
}
)
"""List all available triggers and their status."""
available = getattr(session, "available_triggers", {})
triggers = []
for tdef in available.values():
+2
View File
@@ -86,6 +86,7 @@ def patched_fork(monkeypatch):
colony_name: str,
task: str,
tasks: list[dict] | None = None,
concurrency_hint: int | None = None,
) -> dict:
calls.append(
{
@@ -93,6 +94,7 @@ def patched_fork(monkeypatch):
"colony_name": colony_name,
"task": task,
"tasks": tasks,
"concurrency_hint": concurrency_hint,
}
)
return {