Compare commits

...

10 Commits

Author SHA1 Message Date
bryan cba0ec110f fix: linter update 2026-03-08 19:37:57 -07:00
bryan 0256e0c944 Merge branch 'main' into feat/agent-trigger 2026-03-08 19:28:36 -07:00
bryan 4d9d0362a0 fixes to make the timer trigger properly 2026-03-08 18:44:42 -07:00
bryan f474d0bc8e Merge branch 'main' into feat/agent-trigger 2026-03-08 16:59:14 -07:00
bryan 6a0681b9aa feat: fixing phase 4, continuing to test 2026-03-08 16:52:00 -07:00
bryan c7e634851b feat: phase 4 of trigger plan 2026-03-06 19:21:32 -08:00
bryan cdb7155960 feat: phase 3 of trigger plan 2026-03-06 18:07:26 -08:00
bryan 3f7790c26a feat: phase 2 of trigger plan 2026-03-06 17:22:57 -08:00
bryan 5676b115f4 Merge branch 'feat/queen-responsibility' into feat/agent-trigger 2026-03-06 16:58:06 -08:00
bryan 61c59d57e8 feat: phase 1 of trigger plan 2026-03-06 15:11:36 -08:00
20 changed files with 1494 additions and 48 deletions
@@ -84,6 +84,10 @@ _QUEEN_STAGING_TOOLS = [
# Launch or go back
"run_agent_with_input",
"stop_worker_and_edit",
# Trigger management
"set_trigger",
"remove_trigger",
"list_triggers",
]
# Running phase: worker is executing — monitor and control.
@@ -99,10 +103,15 @@ _QUEEN_RUNNING_TOOLS = [
"stop_worker",
"stop_worker_and_edit",
"get_worker_status",
"run_agent_with_input",
"inject_worker_message",
# Monitoring
"get_worker_health_summary",
"notify_operator",
# Trigger management
"set_trigger",
"remove_trigger",
"list_triggers",
]
@@ -477,6 +486,9 @@ The agent is loaded and ready to run. You can inspect it and launch it:
- get_worker_status(focus?) Brief status. Drill in with focus: memory, tools, issues, progress
- run_agent_with_input(task) Start the worker and switch to RUNNING phase
- stop_worker_and_edit() Go back to BUILDING phase
- set_trigger(trigger_id, trigger_type?, trigger_config?) Activate a trigger (timer)
- remove_trigger(trigger_id) Deactivate a trigger
- list_triggers() List all triggers and their active/inactive status
You do NOT have write tools. If you need to modify the agent, \
call stop_worker_and_edit() to go back to BUILDING phase.
@@ -493,6 +505,9 @@ The worker is running. You have monitoring and lifecycle tools:
- notify_operator(ticket_id, analysis, urgency) Alert the user (use sparingly)
- stop_worker() Stop the worker and return to STAGING phase, then ask the user what to do next
- stop_worker_and_edit() Stop the worker and switch back to BUILDING phase
- set_trigger(trigger_id, trigger_type?, trigger_config?) Activate a trigger (timer)
- remove_trigger(trigger_id) Deactivate a trigger
- list_triggers() List all triggers and their active/inactive status
You do NOT have write tools or agent construction tools. \
If you need to modify the agent, call stop_worker_and_edit() to switch back \
@@ -643,6 +658,33 @@ When the user asks to change, modify, or update the loaded worker \
1. Call stop_worker_and_edit() this stops the worker and gives you \
coding tools (switches to BUILDING phase).
## Trigger Management
Use list_triggers() to see available triggers from the loaded worker.
Use set_trigger(trigger_id) to activate a timer. Once active, triggers \
fire periodically and inject [TRIGGER: ...] messages so you can decide \
whether to call run_agent_with_input(task).
### When the user says "Enable trigger <id>" (or clicks Enable in the UI):
1. Call get_worker_status(focus="memory") to check if the worker has \
saved configuration (rules, preferences, settings from a prior run).
2. If memory contains saved config: compose a task string from it \
(e.g. "Process inbox emails using saved rules") and call \
set_trigger(trigger_id, task="...") immediately. Tell the user the \
trigger is now active and what schedule it uses. Do NOT ask them to \
provide the task you derive it from memory.
3. If memory is empty (no prior run): tell the user the agent needs to \
run once first so its configuration can be saved. Offer to run it now. \
Once the worker finishes, enable the trigger.
4. If the user just provided config this session (rules/task context \
already in conversation): use that directly, no memory lookup needed. \
Enable the trigger immediately.
Never ask "what should the task be?" when enabling a trigger for an \
agent with a clear purpose. The task string is a brief description of \
what the worker does, derived from its saved state or your current context.
"""
# -- RUNNING phase behavior --
@@ -733,6 +775,21 @@ When the user asks to change, modify, or update the loaded worker \
1. Call stop_worker_and_edit() this stops the worker and gives you \
coding tools (switches to BUILDING phase).
## Trigger Handling
You will receive [TRIGGER: ...] messages when a scheduled timer fires. \
These are framework-level signals, not user messages.
Rules:
- Check get_worker_status() before calling run_agent_with_input(task). If the worker \
is already RUNNING, decide: skip this trigger, or note it for after completion.
- When multiple [TRIGGER] messages arrive at once, read them all before acting. \
Batch your response do not call run_agent_with_input() once per trigger.
- If a trigger fires but the task no longer makes sense (e.g., user changed \
config since last run), skip it and inform the user.
- Never disable a trigger without telling the user. Use remove_trigger() only \
when explicitly asked or when the trigger is clearly obsolete.
"""
# -- Backward-compatible composed versions (used by queen_node.system_prompt default) --
+9 -1
View File
@@ -322,7 +322,11 @@ class AsyncEntryPointSpec(BaseModel):
id: str = Field(description="Unique identifier for this entry point")
name: str = Field(description="Human-readable name")
entry_node: str = Field(description="Node ID to start execution from")
entry_node: str = Field(
default="",
description="Deprecated: Node ID to start execution from. "
"Triggers are graph-level; worker always enters at GraphSpec.entry_node.",
)
trigger_type: str = Field(
default="manual",
description="How this entry point is triggered: webhook, api, timer, event, manual",
@@ -331,6 +335,10 @@ class AsyncEntryPointSpec(BaseModel):
default_factory=dict,
description="Trigger-specific configuration (e.g., webhook URL, timer interval)",
)
task: str = Field(
default="",
description="Worker task string when this trigger fires autonomously",
)
isolation_level: str = Field(
default="shared", description="State isolation: isolated, shared, or synchronized"
)
+58 -3
View File
@@ -36,6 +36,21 @@ from framework.runtime.llm_debug_logger import log_llm_turn
logger = logging.getLogger(__name__)
@dataclass
class TriggerEvent:
"""A framework-level trigger signal (timer tick or webhook hit).
Triggers are queued separately from user messages / external events
and drained atomically so the LLM sees all pending triggers at once.
"""
trigger_type: str # "timer" | "webhook"
source_id: str # entry point ID or webhook route ID
payload: dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
# Pattern for detecting context-window-exceeded errors across LLM providers.
_CONTEXT_TOO_LARGE_RE = re.compile(
r"context.{0,20}(length|window|limit|size)|"
@@ -342,6 +357,7 @@ class EventLoopNode(NodeProtocol):
self._tool_executor = tool_executor
self._conversation_store = conversation_store
self._injection_queue: asyncio.Queue[tuple[str, bool]] = asyncio.Queue()
self._trigger_queue: asyncio.Queue[TriggerEvent] = asyncio.Queue()
# Client-facing input blocking state
self._input_ready = asyncio.Event()
self._awaiting_input = False
@@ -615,6 +631,8 @@ class EventLoopNode(NodeProtocol):
# 6b. Drain injection queue
await self._drain_injection_queue(conversation)
# 6b1. Drain trigger queue (framework-level signals)
await self._drain_trigger_queue(conversation)
# 6b2. Dynamic tool refresh (mode switching)
if ctx.dynamic_tools_provider is not None:
@@ -1692,6 +1710,15 @@ class EventLoopNode(NodeProtocol):
await self._injection_queue.put((content, is_client_input))
self._input_ready.set()
async def inject_trigger(self, trigger: TriggerEvent) -> None:
"""Inject a framework-level trigger into the running queen loop.
Triggers are queued separately from user messages and drained
atomically via _drain_trigger_queue().
"""
await self._trigger_queue.put(trigger)
self._input_ready.set()
def signal_shutdown(self) -> None:
"""Signal the node to exit its loop cleanly.
@@ -1737,9 +1764,9 @@ class EventLoopNode(NodeProtocol):
Returns True if input arrived, False if shutdown was signaled.
"""
# If messages arrived while the LLM was processing, skip blocking
# entirely — the next _drain_injection_queue() will pick them up.
if not self._injection_queue.empty():
# If messages or triggers arrived while the LLM was processing, skip
# blocking — the next drain pass will pick them up.
if not self._injection_queue.empty() or not self._trigger_queue.empty():
return True
# Clear BEFORE emitting so that synchronous handlers (e.g. the
@@ -3843,6 +3870,34 @@ class EventLoopNode(NodeProtocol):
break
return count
async def _drain_trigger_queue(self, conversation: NodeConversation) -> int:
"""Drain all pending trigger events as a single batched user message.
Multiple triggers are merged so the LLM sees them atomically and can
reason about all pending triggers before acting.
"""
triggers: list[TriggerEvent] = []
while not self._trigger_queue.empty():
try:
triggers.append(self._trigger_queue.get_nowait())
except asyncio.QueueEmpty:
break
if not triggers:
return 0
parts: list[str] = []
for t in triggers:
task = t.payload.get("task", "")
task_line = f"\nTask: {task}" if task else ""
payload_str = json.dumps(t.payload, default=str)
parts.append(f"[TRIGGER: {t.trigger_type}/{t.source_id}]{task_line}\n{payload_str}")
combined = "\n\n".join(parts)
logger.info("[drain] %d trigger(s): %s", len(triggers), combined[:200])
await conversation.add_user_message(combined)
return len(triggers)
async def _check_pause(
self,
ctx: NodeContext,
+7
View File
@@ -146,6 +146,13 @@ class EventType(StrEnum):
# Subagent reports (one-way progress updates from sub-agents)
SUBAGENT_REPORT = "subagent_report"
# Trigger lifecycle (queen-level triggers / heartbeats)
TRIGGER_AVAILABLE = "trigger_available"
TRIGGER_ACTIVATED = "trigger_activated"
TRIGGER_DEACTIVATED = "trigger_deactivated"
TRIGGER_FIRED = "trigger_fired"
TRIGGER_REMOVED = "trigger_removed"
@dataclass
class AgentEvent:
@@ -423,6 +423,30 @@ class ExecutionStream:
return True
return False
async def inject_trigger(
self,
node_id: str,
trigger: Any,
) -> bool:
"""Inject a trigger event into a running queen EventLoopNode.
Searches active executors for a node matching ``node_id`` and calls
its ``inject_trigger()`` method to wake the queen.
Args:
node_id: The queen EventLoopNode ID.
trigger: A ``TriggerEvent`` instance (typed as Any to avoid
circular imports with graph layer).
Returns True if the trigger was delivered, False otherwise.
"""
for executor in self._active_executors.values():
node = executor.node_registry.get(node_id)
if node is not None and hasattr(node, "inject_trigger"):
await node.inject_trigger(trigger)
return True
return False
async def execute(
self,
input_data: dict[str, Any],
+22
View File
@@ -0,0 +1,22 @@
"""Trigger definitions for queen-level heartbeats (timers, webhooks)."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class TriggerDefinition:
"""A registered trigger that can be activated on the queen runtime.
Trigger *definitions* come from the worker's ``async_entry_points``.
Activation state is per-session (persisted in ``SessionState.active_triggers``).
"""
id: str
trigger_type: str # "timer" | "webhook"
trigger_config: dict[str, Any] = field(default_factory=dict)
description: str = ""
task: str = ""
active: bool = False
+7
View File
@@ -141,6 +141,13 @@ class SessionState(BaseModel):
checkpoint_enabled: bool = False
latest_checkpoint_id: str | None = None
# Trigger activation state (IDs of triggers the queen/user turned on)
active_triggers: list[str] = Field(default_factory=list)
# Per-trigger task strings (user overrides, keyed by trigger ID)
trigger_tasks: dict[str, str] = Field(default_factory=dict)
# True after first successful worker execution (gates trigger delivery on restart)
worker_configured: bool = Field(default=False)
model_config = {"extra": "allow"}
@computed_field
+5
View File
@@ -40,6 +40,11 @@ DEFAULT_EVENT_TYPES = [
EventType.CREDENTIALS_REQUIRED,
EventType.SUBAGENT_REPORT,
EventType.QUEEN_PHASE_CHANGED,
EventType.TRIGGER_AVAILABLE,
EventType.TRIGGER_ACTIVATED,
EventType.TRIGGER_DEACTIVATED,
EventType.TRIGGER_FIRED,
EventType.TRIGGER_REMOVED,
]
# Keepalive interval in seconds
+2 -2
View File
@@ -357,8 +357,8 @@ async def handle_pause(request: web.Request) -> web.Response:
runtime.pause_timers()
# Switch to staging (agent still loaded, ready to re-run)
if session.mode_state is not None:
await session.mode_state.switch_to_staging(source="frontend")
if session.phase_state is not None:
await session.phase_state.switch_to_staging(source="frontend")
return web.json_response(
{
+15
View File
@@ -2,6 +2,7 @@
import json
import logging
import time
from aiohttp import web
@@ -116,6 +117,20 @@ async def handle_list_nodes(request: web.Request) -> web.Response:
}
for ep in reg.entry_points.values()
]
# Append hoisted triggers (stripped from worker runtime, stored on session)
for t in getattr(session, "available_triggers", {}).values():
entry = {
"id": t.id,
"name": t.description or t.id,
"entry_node": graph.entry_node,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
"task": t.task,
}
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
if mono is not None:
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
entry_points.append(entry)
return web.json_response(
{
"nodes": nodes,
+91 -15
View File
@@ -9,6 +9,7 @@ Session-primary routes:
- DELETE /api/sessions/{session_id}/worker unload worker from session
- GET /api/sessions/{session_id}/stats runtime statistics
- GET /api/sessions/{session_id}/entry-points list entry points
- PATCH /api/sessions/{session_id}/triggers/{id} update trigger task
- GET /api/sessions/{session_id}/graphs list graph IDs
- GET /api/sessions/{session_id}/queen-messages queen conversation history
@@ -228,6 +229,22 @@ async def handle_get_live_session(request: web.Request) -> web.Response:
}
for ep in rt.get_entry_points()
]
# Append hoisted triggers (stripped from worker runtime, stored on session)
runner = getattr(session, "runner", None)
graph_entry = runner.graph.entry_node if runner else ""
for t in getattr(session, "available_triggers", {}).values():
entry = {
"id": t.id,
"name": t.description or t.id,
"entry_node": graph_entry,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
"task": t.task,
}
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
if mono is not None:
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
data["entry_points"].append(entry)
data["graphs"] = session.worker_runtime.list_graphs()
return web.json_response(data)
@@ -351,23 +368,79 @@ async def handle_session_entry_points(request: web.Request) -> web.Response:
rt = session.worker_runtime
eps = rt.get_entry_points() if rt else []
entry_points = [
{
"id": ep.id,
"name": ep.name,
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"trigger_config": ep.trigger_config,
**(
{"next_fire_in": nf}
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
else {}
),
}
for ep in eps
]
# Append hoisted triggers (stripped from worker runtime, stored on session)
runner = getattr(session, "runner", None)
graph_entry = runner.graph.entry_node if runner else ""
for t in getattr(session, "available_triggers", {}).values():
entry = {
"id": t.id,
"name": t.description or t.id,
"entry_node": graph_entry,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
"task": t.task,
}
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
if mono is not None:
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
entry_points.append(entry)
return web.json_response({"entry_points": entry_points})
async def handle_update_trigger_task(request: web.Request) -> web.Response:
"""PATCH /api/sessions/{session_id}/triggers/{trigger_id} — update trigger task."""
session, err = resolve_session(request)
if err:
return err
trigger_id = request.match_info["trigger_id"]
available = getattr(session, "available_triggers", {})
tdef = available.get(trigger_id)
if tdef is None:
return web.json_response(
{"error": f"Trigger '{trigger_id}' not found"},
status=404,
)
try:
body = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON body"}, status=400)
task = body.get("task")
if task is None:
return web.json_response({"error": "Missing 'task' field"}, status=400)
if not isinstance(task, str):
return web.json_response({"error": "'task' must be a string"}, status=400)
tdef.task = task
# Persist if trigger is currently active
if trigger_id in getattr(session, "active_trigger_ids", set()):
from framework.tools.queen_lifecycle_tools import _persist_active_triggers
session_id = request.match_info["session_id"]
await _persist_active_triggers(session, session_id)
return web.json_response(
{
"entry_points": [
{
"id": ep.id,
"name": ep.name,
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"trigger_config": ep.trigger_config,
**(
{"next_fire_in": nf}
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
else {}
),
}
for ep in eps
]
"trigger_id": trigger_id,
"task": tdef.task,
}
)
@@ -783,6 +856,9 @@ def register_routes(app: web.Application) -> None:
# Session info
app.router.add_get("/api/sessions/{session_id}/stats", handle_session_stats)
app.router.add_get("/api/sessions/{session_id}/entry-points", handle_session_entry_points)
app.router.add_patch(
"/api/sessions/{session_id}/triggers/{trigger_id}", handle_update_trigger_task
)
app.router.add_get("/api/sessions/{session_id}/graphs", handle_session_graphs)
app.router.add_get("/api/sessions/{session_id}/queen-messages", handle_queen_messages)
+162 -2
View File
@@ -15,11 +15,13 @@ import json
import logging
import time
import uuid
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
from framework.runtime.triggers import TriggerDefinition
logger = logging.getLogger(__name__)
@@ -46,6 +48,19 @@ class Session:
judge_task: asyncio.Task | None = None
escalation_sub: str | None = None
worker_handoff_sub: str | None = None
# Trigger definitions hoisted from worker (available but inactive)
available_triggers: dict[str, TriggerDefinition] = field(default_factory=dict)
# Active trigger tracking (IDs currently firing + their asyncio tasks)
active_trigger_ids: set[str] = field(default_factory=set)
active_timer_tasks: dict[str, asyncio.Task] = field(default_factory=dict)
# Queen-owned webhook server (lazy singleton, created on first webhook trigger activation)
queen_webhook_server: Any = None
# EventBus subscription IDs for active webhook triggers (trigger_id -> sub_id)
active_webhook_subs: dict[str, str] = field(default_factory=dict)
# True after first successful worker execution (gates trigger delivery)
worker_configured: bool = False
# Monotonic timestamps for next trigger fire (mirrors AgentRuntime._timer_next_fire)
trigger_next_fire: dict[str, float] = field(default_factory=dict)
# Session directory resumption:
# When set, _start_queen writes queen conversations to this existing session's
# directory instead of creating a new one. This lets cold-restores accumulate
@@ -240,6 +255,31 @@ class SessionManager:
runtime = runner._agent_runtime
# Auto-hoist: extract timer/webhook triggers from worker graph
# and store as available (inactive) trigger definitions on the session.
# Strip them from the worker runtime so it doesn't start its own loops.
if runtime:
for ep in runner.graph.async_entry_points:
if ep.trigger_type in ("timer", "webhook"):
session.available_triggers[ep.id] = TriggerDefinition(
id=ep.id,
trigger_type=ep.trigger_type,
trigger_config=ep.trigger_config,
description=ep.name,
task=getattr(ep, "task", ""),
)
runtime.unregister_entry_point(ep.id)
logger.info(
"Hoisted trigger '%s' (%s) from worker to queen",
ep.id,
ep.trigger_type,
)
if session.available_triggers:
await self._emit_trigger_events(
session, "available", session.available_triggers
)
# Start runtime on event loop
if runtime and not runtime.is_running:
await runtime.start()
@@ -331,6 +371,45 @@ class SessionManager:
# await self._start_judge(session, session.runner._storage_path)
await self._notify_queen_worker_loaded(session)
# Restore previously active triggers from persisted session state
if session.available_triggers and session.worker_runtime:
try:
store = session.worker_runtime._session_store
state = await store.read_state(session_id)
if state and state.active_triggers:
from framework.tools.queen_lifecycle_tools import (
_start_trigger_timer,
_start_trigger_webhook,
)
saved_tasks = getattr(state, "trigger_tasks", {}) or {}
for tid in state.active_triggers:
tdef = session.available_triggers.get(tid)
if tdef:
# Restore user-configured task override
saved_task = saved_tasks.get(tid, "")
if saved_task:
tdef.task = saved_task
tdef.active = True
session.active_trigger_ids.add(tid)
if tdef.trigger_type == "timer":
await _start_trigger_timer(session, tid, tdef)
logger.info("Restored trigger timer '%s'", tid)
elif tdef.trigger_type == "webhook":
await _start_trigger_webhook(session, tid, tdef)
logger.info("Restored webhook trigger '%s'", tid)
else:
logger.warning(
"Saved trigger '%s' not found in worker entry points, skipping",
tid,
)
# Restore worker_configured flag
if state and getattr(state, "worker_configured", False):
session.worker_configured = True
except Exception as e:
logger.warning("Failed to restore active triggers: %s", e)
# Emit SSE event so the frontend can update UI
await self._emit_worker_loaded(session)
@@ -354,6 +433,26 @@ class SessionManager:
except Exception as e:
logger.error("Error cleaning up worker '%s': %s", session.worker_id, e)
# Cancel active trigger timers
for tid, task in session.active_timer_tasks.items():
task.cancel()
logger.info("Cancelled trigger timer '%s' on unload", tid)
session.active_timer_tasks.clear()
# Unsubscribe webhook handlers (server stays alive — queen-owned)
for sub_id in session.active_webhook_subs.values():
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.active_webhook_subs.clear()
session.active_trigger_ids.clear()
# Clean up hoisted triggers
if session.available_triggers:
await self._emit_trigger_events(session, "removed", session.available_triggers)
session.available_triggers.clear()
worker_id = session.worker_id
session.worker_id = None
session.worker_path = None
@@ -394,6 +493,25 @@ class SessionManager:
session.queen_task = None
session.queen_executor = None
# Cancel active trigger timers
for task in session.active_timer_tasks.values():
task.cancel()
session.active_timer_tasks.clear()
# Unsubscribe webhook handlers and stop queen webhook server
for sub_id in session.active_webhook_subs.values():
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.active_webhook_subs.clear()
if session.queen_webhook_server is not None:
try:
await session.queen_webhook_server.stop()
except Exception:
logger.error("Error stopping queen webhook server", exc_info=True)
session.queen_webhook_server = None
# Cleanup worker
if session.runner:
try:
@@ -720,6 +838,8 @@ class SessionManager:
if phase_state.phase == "running":
# Build termination notification for the queen
if event.type == _ET.EXECUTION_COMPLETED:
# Mark worker as configured after first successful run
session.worker_configured = True
output = event.data.get("output", {})
output_summary = ""
if output:
@@ -909,7 +1029,22 @@ class SessionManager:
return
profile = build_worker_profile(session.worker_runtime, agent_path=session.worker_path)
await node.inject_event(f"[SYSTEM] Worker loaded.{profile}")
# Append available trigger info so the queen knows what's schedulable
trigger_lines = ""
if session.available_triggers:
parts = []
for t in session.available_triggers.values():
cfg = t.trigger_config
detail = cfg.get("cron") or f"every {cfg.get('interval_minutes', '?')} min"
task_info = f' -> task: "{t.task}"' if t.task else " (no task configured)"
parts.append(f" - {t.id} ({t.trigger_type}: {detail}){task_info}")
trigger_lines = (
"\n\nAvailable triggers (inactive — use set_trigger to activate):\n"
+ "\n".join(parts)
)
await node.inject_event(f"[SYSTEM] Worker loaded.{profile}{trigger_lines}")
async def _emit_worker_loaded(self, session: Session) -> None:
"""Publish a WORKER_LOADED event so the frontend can update."""
@@ -944,6 +1079,31 @@ class SessionManager:
"Handle all tasks directly using your coding tools."
)
async def _emit_trigger_events(
self,
session: Session,
kind: str,
triggers: dict[str, TriggerDefinition],
) -> None:
"""Emit TRIGGER_AVAILABLE or TRIGGER_REMOVED events for each trigger."""
from framework.runtime.event_bus import AgentEvent, EventType
event_type = (
EventType.TRIGGER_AVAILABLE if kind == "available" else EventType.TRIGGER_REMOVED
)
for t in triggers.values():
await session.event_bus.publish(
AgentEvent(
type=event_type,
stream_id="queen",
data={
"trigger_id": t.id,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
},
)
)
async def revive_queen(self, session: Session, initial_prompt: str | None = None) -> None:
"""Revive a dead queen executor on an existing session.
+554 -1
View File
@@ -36,8 +36,9 @@ from __future__ import annotations
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -242,6 +243,247 @@ def build_worker_profile(runtime: AgentRuntime, agent_path: Path | str | None =
return "\n".join(lines)
async def _start_trigger_timer(session: Any, trigger_id: str, trigger_def: Any) -> None:
"""Start an asyncio timer task that fires triggers into the queen node.
The task is stored in ``session.active_timer_tasks[trigger_id]`` and can
be cancelled via ``task.cancel()``.
"""
from framework.graph.event_loop_node import TriggerEvent
tc = trigger_def.trigger_config
cron_expr = tc.get("cron")
interval_minutes = tc.get("interval_minutes")
async def _timer_loop():
try:
if cron_expr:
from croniter import croniter
# Wait for first fire
cron = croniter(cron_expr, datetime.now())
next_dt = cron.get_next(datetime)
sleep_secs = (next_dt - datetime.now()).total_seconds()
session.trigger_next_fire[trigger_id] = time.monotonic() + max(0, sleep_secs)
await asyncio.sleep(max(0, sleep_secs))
while True:
# Fire trigger
if getattr(session, "worker_runtime", None) is None:
logger.warning(
"Trigger '%s': worker not loaded, discarding tick", trigger_id
)
else:
queen_node = _get_queen_node(session)
if queen_node is not None:
trigger = TriggerEvent(
trigger_type="timer",
source_id=trigger_id,
payload={
"schedule": cron_expr,
"time": datetime.now(UTC).isoformat(),
"task": trigger_def.task,
},
timestamp=time.time(),
)
await session.event_bus.publish(
AgentEvent(
type=EventType.TRIGGER_FIRED,
stream_id="queen",
data={
"trigger_id": trigger_id,
"trigger_type": "timer",
"payload": trigger.payload,
},
)
)
await queen_node.inject_trigger(trigger)
logger.info("Trigger '%s' fired (cron: %s)", trigger_id, cron_expr)
else:
logger.warning(
"Trigger '%s': queen node not available, skipping tick", trigger_id
)
# Compute next fire
cron = croniter(cron_expr, datetime.now())
next_dt = cron.get_next(datetime)
sleep_secs = (next_dt - datetime.now()).total_seconds()
session.trigger_next_fire[trigger_id] = time.monotonic() + max(0, sleep_secs)
await asyncio.sleep(max(0, sleep_secs))
elif interval_minutes:
sleep_secs = interval_minutes * 60
session.trigger_next_fire[trigger_id] = time.monotonic() + sleep_secs
await asyncio.sleep(sleep_secs)
while True:
if getattr(session, "worker_runtime", None) is None:
logger.warning(
"Trigger '%s': worker not loaded, discarding tick", trigger_id
)
else:
queen_node = _get_queen_node(session)
if queen_node is not None:
trigger = TriggerEvent(
trigger_type="timer",
source_id=trigger_id,
payload={
"interval_minutes": interval_minutes,
"time": datetime.now(UTC).isoformat(),
"task": trigger_def.task,
},
timestamp=time.time(),
)
await session.event_bus.publish(
AgentEvent(
type=EventType.TRIGGER_FIRED,
stream_id="queen",
data={
"trigger_id": trigger_id,
"trigger_type": "timer",
"payload": trigger.payload,
},
)
)
await queen_node.inject_trigger(trigger)
logger.info(
"Trigger '%s' fired (interval: %dm)", trigger_id, interval_minutes
)
else:
logger.warning(
"Trigger '%s': queen node not available, skipping tick", trigger_id
)
session.trigger_next_fire[trigger_id] = time.monotonic() + sleep_secs
await asyncio.sleep(sleep_secs)
except asyncio.CancelledError:
session.trigger_next_fire.pop(trigger_id, None)
logger.info("Trigger timer '%s' cancelled", trigger_id)
except Exception:
logger.error("Trigger timer '%s' failed", trigger_id, exc_info=True)
task = asyncio.create_task(_timer_loop(), name=f"trigger-timer-{trigger_id}")
session.active_timer_tasks[trigger_id] = task
async def _start_trigger_webhook(session: Any, trigger_id: str, trigger_def: Any) -> None:
"""Subscribe to WEBHOOK_RECEIVED EventBus events for a webhook trigger.
Creates (or reuses) the queen-owned WebhookServer on the session,
registers the route, and subscribes via EventBus with filter_stream=trigger_id.
The subscription ID is stored in session.active_webhook_subs[trigger_id].
"""
from framework.graph.event_loop_node import TriggerEvent
from framework.runtime.webhook_server import WebhookRoute, WebhookServer, WebhookServerConfig
tc = trigger_def.trigger_config
path: str = tc["path"]
methods: list[str] = [m.upper() for m in tc.get("methods", ["POST"])]
secret: str | None = tc.get("secret")
# Queen server defaults to 8090 to avoid colliding with worker's 8080
port: int = int(tc.get("port", 8090))
# Lazy-create the queen webhook server (singleton per session)
if getattr(session, "queen_webhook_server", None) is None:
server = WebhookServer(session.event_bus, WebhookServerConfig(host="127.0.0.1", port=port))
session.queen_webhook_server = server
# Register route — source_id=trigger_id so filter_stream matches exactly
route = WebhookRoute(source_id=trigger_id, path=path, methods=methods, secret=secret)
session.queen_webhook_server.add_route(route)
if not session.queen_webhook_server.is_running:
await session.queen_webhook_server.start()
# Subscribe to WEBHOOK_RECEIVED filtered by trigger_id (== route.source_id)
async def _on_webhook(event: Any) -> None:
if getattr(session, "worker_runtime", None) is None:
logger.warning("Webhook trigger '%s': no worker loaded, discarding", trigger_id)
return
queen_node = _get_queen_node(session)
if queen_node is None:
logger.warning("Webhook trigger '%s': queen node not ready, skipping", trigger_id)
return
trigger = TriggerEvent(
trigger_type="webhook",
source_id=trigger_id,
payload={
"source": trigger_id,
"method": event.data.get("method", ""),
"path": path,
"payload": event.data.get("payload", {}),
"task": trigger_def.task,
},
timestamp=time.time(),
)
await session.event_bus.publish(
AgentEvent(
type=EventType.TRIGGER_FIRED,
stream_id="queen",
data={
"trigger_id": trigger_id,
"trigger_type": "webhook",
"payload": trigger.payload,
},
)
)
await queen_node.inject_trigger(trigger)
logger.info(
"Webhook trigger '%s' fired (%s %s)", trigger_id, event.data.get("method", ""), path
)
sub_id = session.event_bus.subscribe(
event_types=[EventType.WEBHOOK_RECEIVED],
handler=_on_webhook,
filter_stream=trigger_id,
)
if not hasattr(session, "active_webhook_subs"):
session.active_webhook_subs = {}
session.active_webhook_subs[trigger_id] = sub_id
def _get_queen_node(session: Any):
"""Get the queen EventLoopNode from the session's executor."""
executor = getattr(session, "queen_executor", None)
if executor is None:
return None
registry = getattr(executor, "node_registry", None)
if registry is None:
return None
return registry.get("queen")
async def _persist_active_triggers(session: Any, session_id: str | None) -> None:
"""Persist active trigger IDs to the worker's session state."""
runtime = getattr(session, "worker_runtime", None)
if runtime is None or session_id is None:
return
try:
store = runtime._session_store
state = await store.read_state(session_id)
if state is None:
from framework.schemas.session_state import SessionState, SessionTimestamps
now = datetime.now().isoformat()
state = SessionState(
session_id=session_id,
goal_id="",
timestamps=SessionTimestamps(started_at=now, updated_at=now),
)
state.active_triggers = list(session.active_trigger_ids)
available = getattr(session, "available_triggers", {})
state.trigger_tasks = {
tid: available[tid].task
for tid in session.active_trigger_ids
if tid in available and available[tid].task
}
if getattr(session, "worker_configured", False):
state.worker_configured = True
await store.write_state(session_id, state)
except Exception as e:
logger.warning("Failed to persist active triggers: %s", e)
def register_queen_lifecycle_tools(
registry: ToolRegistry,
session: Any = None,
@@ -1607,5 +1849,316 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- set_trigger -----------------------------------------------------------
async def set_trigger(
trigger_id: str,
trigger_type: str | None = None,
trigger_config: dict | None = None,
task: str | None = None,
) -> str:
"""Activate a trigger so it fires periodically into the queen."""
if trigger_id in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is already active."})
# Look up existing or create new
available = getattr(session, "available_triggers", {})
tdef = available.get(trigger_id)
if tdef is None:
if trigger_type and trigger_config:
from framework.runtime.triggers import TriggerDefinition
tdef = TriggerDefinition(
id=trigger_id,
trigger_type=trigger_type,
trigger_config=trigger_config,
)
available[trigger_id] = tdef
else:
return json.dumps(
{
"error": (
f"Trigger '{trigger_id}' not found. "
"Provide trigger_type and trigger_config to create a custom trigger."
)
}
)
# Apply task override if provided
if task:
tdef.task = task
# Task is mandatory before activation
if not tdef.task:
return json.dumps(
{
"error": f"Trigger '{trigger_id}' has no task configured. "
"Set a task describing what the worker should do when this trigger fires."
}
)
# Use provided overrides if given
t_type = trigger_type or tdef.trigger_type
t_config = trigger_config or tdef.trigger_config
if trigger_type:
tdef.trigger_type = t_type
if trigger_config:
tdef.trigger_config = t_config
# Validate and activate by type
if t_type == "webhook":
path = t_config.get("path", "").strip()
if not path or not path.startswith("/"):
return json.dumps(
{
"error": (
"Webhook trigger requires 'path' starting with '/'"
" in trigger_config (e.g. '/hooks/github')."
)
}
)
valid_methods = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
methods = t_config.get("methods", ["POST"])
invalid = [m.upper() for m in methods if m.upper() not in valid_methods]
if invalid:
return json.dumps(
{"error": f"Invalid HTTP methods: {invalid}. Valid: {sorted(valid_methods)}"}
)
try:
await _start_trigger_webhook(session, trigger_id, tdef)
except Exception as e:
return json.dumps({"error": f"Failed to start webhook trigger: {e}"})
tdef.active = True
session.active_trigger_ids.add(trigger_id)
await _persist_active_triggers(session, session_id)
bus = getattr(session, "event_bus", None)
if bus:
await bus.publish(
AgentEvent(
type=EventType.TRIGGER_ACTIVATED,
stream_id="queen",
data={
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
},
)
)
port = int(t_config.get("port", 8090))
return json.dumps(
{
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"webhook_url": f"http://127.0.0.1:{port}{path}",
}
)
if t_type != "timer":
return json.dumps({"error": f"Unsupported trigger type: {t_type}"})
cron_expr = t_config.get("cron")
interval = t_config.get("interval_minutes")
if cron_expr:
try:
from croniter import croniter
if not croniter.is_valid(cron_expr):
return json.dumps({"error": f"Invalid cron expression: {cron_expr}"})
except ImportError:
return json.dumps(
{"error": "croniter package not installed — cannot validate cron expression."}
)
elif interval:
if not isinstance(interval, (int, float)) or interval <= 0:
return json.dumps({"error": f"interval_minutes must be > 0, got {interval}"})
else:
return json.dumps(
{"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."}
)
# Start timer
try:
await _start_trigger_timer(session, trigger_id, tdef)
except Exception as e:
return json.dumps({"error": f"Failed to start trigger timer: {e}"})
tdef.active = True
session.active_trigger_ids.add(trigger_id)
# Persist
await _persist_active_triggers(session, session_id)
# Emit event
bus = getattr(session, "event_bus", None)
if bus:
await bus.publish(
AgentEvent(
type=EventType.TRIGGER_ACTIVATED,
stream_id="queen",
data={
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
},
)
)
return json.dumps(
{
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
}
)
_set_trigger_tool = Tool(
name="set_trigger",
description=(
"Activate a trigger (timer) so it fires periodically. "
"Use trigger_id of an available trigger, or provide trigger_type + trigger_config"
" to create a custom one. "
"A task must be configured before activation —"
" either pre-set on the trigger or provided here."
),
parameters={
"type": "object",
"properties": {
"trigger_id": {
"type": "string",
"description": (
"ID of the trigger to activate (from list_triggers) or a new custom ID"
),
},
"trigger_type": {
"type": "string",
"description": "Type of trigger ('timer'). Only needed for custom triggers.",
},
"trigger_config": {
"type": "object",
"description": (
"Config for the trigger."
" Timer: {cron: '*/5 * * * *'} or {interval_minutes: 5}."
" Only needed for custom triggers."
),
},
"task": {
"type": "string",
"description": (
"The task/instructions for the worker when this trigger fires"
" (e.g. 'Process inbox emails using saved rules')."
" Required if not already configured on the trigger."
),
},
},
"required": ["trigger_id"],
},
)
registry.register("set_trigger", _set_trigger_tool, lambda inputs: set_trigger(**inputs))
tools_registered += 1
# --- remove_trigger --------------------------------------------------------
async def remove_trigger(trigger_id: str) -> str:
"""Deactivate an active trigger."""
if trigger_id not in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is not active."})
# Cancel timer task (if timer trigger)
task = session.active_timer_tasks.pop(trigger_id, None)
if task and not task.done():
task.cancel()
getattr(session, "trigger_next_fire", {}).pop(trigger_id, None)
# Unsubscribe webhook handler (if webhook trigger)
webhook_subs = getattr(session, "active_webhook_subs", {})
if sub_id := webhook_subs.pop(trigger_id, None):
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.active_trigger_ids.discard(trigger_id)
# Mark inactive
available = getattr(session, "available_triggers", {})
tdef = available.get(trigger_id)
if tdef:
tdef.active = False
# Persist
await _persist_active_triggers(session, session_id)
# Emit event
bus = getattr(session, "event_bus", None)
if bus:
await bus.publish(
AgentEvent(
type=EventType.TRIGGER_DEACTIVATED,
stream_id="queen",
data={"trigger_id": trigger_id},
)
)
return json.dumps({"status": "deactivated", "trigger_id": trigger_id})
_remove_trigger_tool = Tool(
name="remove_trigger",
description=(
"Deactivate an active trigger."
" The trigger stops firing but remains available for re-activation."
),
parameters={
"type": "object",
"properties": {
"trigger_id": {
"type": "string",
"description": "ID of the trigger to deactivate",
},
},
"required": ["trigger_id"],
},
)
registry.register(
"remove_trigger", _remove_trigger_tool, lambda inputs: remove_trigger(**inputs)
)
tools_registered += 1
# --- list_triggers ---------------------------------------------------------
async def list_triggers() -> str:
"""List all available triggers and their status."""
available = getattr(session, "available_triggers", {})
triggers = []
for tdef in available.values():
triggers.append(
{
"id": tdef.id,
"trigger_type": tdef.trigger_type,
"trigger_config": tdef.trigger_config,
"description": tdef.description,
"task": tdef.task,
"active": tdef.active,
}
)
return json.dumps({"triggers": triggers})
_list_triggers_tool = Tool(
name="list_triggers",
description=(
"List all available triggers (from the loaded worker) and their active/inactive status."
),
parameters={
"type": "object",
"properties": {},
},
)
registry.register("list_triggers", _list_triggers_tool, lambda inputs: list_triggers())
tools_registered += 1
logger.info("Registered %d queen lifecycle tools", tools_registered)
return tools_registered
+5
View File
@@ -38,4 +38,9 @@ export const api = {
body: body ? JSON.stringify(body) : undefined,
}),
delete: <T>(path: string) => request<T>(path, { method: "DELETE" }),
patch: <T>(path: string, body?: unknown) =>
request<T>(path, {
method: "PATCH",
body: body ? JSON.stringify(body) : undefined,
}),
};
+6
View File
@@ -64,6 +64,12 @@ export const sessionsApi = {
`/sessions/${sessionId}/entry-points`,
),
updateTriggerTask: (sessionId: string, triggerId: string, task: string) =>
api.patch<{ trigger_id: string; task: string }>(
`/sessions/${sessionId}/triggers/${triggerId}`,
{ task },
),
graphs: (sessionId: string) =>
api.get<{ graphs: string[] }>(`/sessions/${sessionId}/graphs`),
+8 -1
View File
@@ -31,6 +31,8 @@ export interface EntryPoint {
entry_node: string;
trigger_type: string;
trigger_config?: Record<string, unknown>;
/** Worker task string when this trigger fires autonomously. */
task?: string;
/** Seconds until the next timer fire (only present for timer entry points). */
next_fire_in?: number;
}
@@ -276,7 +278,12 @@ export type EventTypeName =
| "worker_loaded"
| "credentials_required"
| "queen_phase_changed"
| "subagent_report";
| "subagent_report"
| "trigger_available"
| "trigger_activated"
| "trigger_deactivated"
| "trigger_fired"
| "trigger_removed";
export interface AgentEvent {
type: EventTypeName;
+31 -8
View File
@@ -131,6 +131,14 @@ const triggerColors = {
icon: "hsl(210,40%,55%)",
};
// Active trigger — brighter, more saturated blue
const activeTriggerColors = {
bg: "hsl(210,30%,18%)",
border: "hsl(210,50%,50%)",
text: "hsl(210,40%,75%)",
icon: "hsl(210,60%,65%)",
};
const triggerIcons: Record<string, string> = {
webhook: "\u26A1", // lightning bolt
timer: "\u23F1", // stopwatch
@@ -417,10 +425,12 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
const triggerAvailW = nodeW - 38;
const triggerDisplayLabel = truncateLabel(node.label, triggerAvailW, triggerFontSize);
const nextFireIn = node.triggerConfig?.next_fire_in as number | undefined;
const isActive = node.status === "running" || node.status === "complete";
const colors = isActive ? activeTriggerColors : triggerColors;
// Format countdown for display below node
let countdownLabel: string | null = null;
if (nextFireIn != null && nextFireIn > 0) {
if (isActive && nextFireIn != null && nextFireIn > 0) {
const h = Math.floor(nextFireIn / 3600);
const m = Math.floor((nextFireIn % 3600) / 60);
const s = Math.floor(nextFireIn % 60);
@@ -429,24 +439,28 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
: `next in ${m}m ${String(s).padStart(2, "0")}s`;
}
// Status label below countdown
const statusLabel = isActive ? "active" : "inactive";
const statusColor = isActive ? "hsl(140,40%,50%)" : "hsl(210,20%,40%)";
return (
<g key={node.id} onClick={() => onNodeClick?.(node)} style={{ cursor: onNodeClick ? "pointer" : "default" }}>
<title>{node.label}</title>
{/* Pill-shaped background with dashed border */}
{/* Pill-shaped background — solid border when active, dashed when inactive */}
<rect
x={pos.x} y={pos.y}
width={nodeW} height={NODE_H}
rx={NODE_H / 2}
fill={triggerColors.bg}
stroke={triggerColors.border}
strokeWidth={1}
strokeDasharray="4 2"
fill={colors.bg}
stroke={colors.border}
strokeWidth={isActive ? 1.5 : 1}
strokeDasharray={isActive ? undefined : "4 2"}
/>
{/* Trigger type icon */}
<text
x={pos.x + 18} y={pos.y + NODE_H / 2}
fill={triggerColors.icon} fontSize={13}
fill={colors.icon} fontSize={13}
textAnchor="middle" dominantBaseline="middle"
>
{icon}
@@ -455,7 +469,7 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
{/* Label */}
<text
x={pos.x + 32} y={pos.y + NODE_H / 2}
fill={triggerColors.text}
fill={colors.text}
fontSize={triggerFontSize}
fontWeight={500}
dominantBaseline="middle"
@@ -474,6 +488,15 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
{countdownLabel}
</text>
)}
{/* Status label */}
<text
x={pos.x + nodeW / 2} y={pos.y + NODE_H + (countdownLabel ? 25 : 13)}
fill={statusColor} fontSize={9}
textAnchor="middle" opacity={0.8}
>
{statusLabel}
</text>
</g>
);
};
+1
View File
@@ -51,6 +51,7 @@ export function topologyToGraphNodes(topology: GraphTopology): GraphNode[] {
triggerConfig: {
...ep.trigger_config,
...(ep.next_fire_in != null ? { next_fire_in: ep.next_fire_in } : {}),
...(ep.task ? { task: ep.task } : {}),
},
next: [ep.entry_node],
});
+169 -15
View File
@@ -478,6 +478,8 @@ export default function Workspace() {
const [credentialAgentPath, setCredentialAgentPath] = useState<string | null>(null);
const [dismissedBanner, setDismissedBanner] = useState<string | null>(null);
const [selectedNode, setSelectedNode] = useState<GraphNode | null>(null);
const [triggerTaskDraft, setTriggerTaskDraft] = useState("");
const [triggerTaskSaving, setTriggerTaskSaving] = useState(false);
const [newTabOpen, setNewTabOpen] = useState(false);
const newTabBtnRef = useRef<HTMLButtonElement>(null);
@@ -1072,19 +1074,32 @@ export default function Workspace() {
try {
const { entry_points } = await sessionsApi.entryPoints(state.sessionId);
const fireMap = new Map<string, number>();
const taskMap = new Map<string, string>();
for (const ep of entry_points) {
if (ep.next_fire_in != null) {
fireMap.set(`__trigger_${ep.id}`, ep.next_fire_in);
}
if (ep.task != null) {
taskMap.set(`__trigger_${ep.id}`, ep.task);
}
}
if (fireMap.size === 0) continue;
if (fireMap.size === 0 && taskMap.size === 0) continue;
setSessionsByAgent((prev) => {
const ss = prev[agentType];
if (!ss?.length) return prev;
const updated = ss[0].graphNodes.map((n) => {
if (n.nodeType !== "trigger") return n;
const nfi = fireMap.get(n.id);
if (nfi == null || n.nodeType !== "trigger") return n;
return { ...n, triggerConfig: { ...n.triggerConfig, next_fire_in: nfi } };
const task = taskMap.get(n.id);
if (nfi == null && task == null) return n;
return {
...n,
triggerConfig: {
...n.triggerConfig,
...(nfi != null ? { next_fire_in: nfi } : {}),
...(task != null ? { task } : {}),
},
};
});
// Skip update if nothing changed
if (updated.every((n, idx) => n === ss[0].graphNodes[idx])) return prev;
@@ -1842,6 +1857,54 @@ export default function Workspace() {
break;
}
case "trigger_activated": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
updateGraphNodeStatus(agentType, `__trigger_${triggerId}`, "running");
}
break;
}
case "trigger_deactivated": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
// Clear next_fire_in so countdown hides when inactive
setSessionsByAgent(prev => {
const sessions = prev[agentType] || [];
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
return {
...prev,
[agentType]: sessions.map(s => {
if (s.id !== activeId) return s;
return {
...s,
graphNodes: s.graphNodes.map(n => {
if (n.id !== `__trigger_${triggerId}`) return n;
const { next_fire_in: _, ...restConfig } = (n.triggerConfig || {}) as Record<string, unknown> & { next_fire_in?: unknown };
return { ...n, status: "pending" as const, triggerConfig: restConfig };
}),
};
}),
};
});
}
break;
}
case "trigger_fired": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
const nodeId = `__trigger_${triggerId}`;
updateGraphNodeStatus(agentType, nodeId, "complete");
setTimeout(() => updateGraphNodeStatus(agentType, nodeId, "running"), 1500);
}
break;
}
case "trigger_available":
case "trigger_removed":
break;
default:
// Fallback: ensure queenReady is set even for unexpected first events
if (shouldMarkQueenReady) updateAgentState(agentType, { queenReady: true });
@@ -1872,6 +1935,18 @@ export default function Workspace() {
? { nodes: activeSession.graphNodes, title: activeAgentState?.displayName || formatAgentDisplayName(baseAgentType(activeWorker)) }
: { nodes: [] as GraphNode[], title: "" };
// Keep selectedNode in sync with live graphNodes (trigger status updates via SSE)
const liveSelectedNode = selectedNode && currentGraph.nodes.find(n => n.id === selectedNode.id);
const resolvedSelectedNode = liveSelectedNode || selectedNode;
// Sync trigger task draft when selected trigger node changes
useEffect(() => {
if (resolvedSelectedNode?.nodeType === "trigger") {
const tc = resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined;
setTriggerTaskDraft((tc?.task as string) || "");
}
}, [resolvedSelectedNode?.id]);
// Build a flat list of all agent-type tabs for the tab bar
const agentTabs = Object.entries(sessionsByAgent)
.filter(([, sessions]) => sessions.length > 0)
@@ -2460,20 +2535,32 @@ export default function Workspace() {
/>
)}
</div>
{selectedNode && (
{resolvedSelectedNode && (
<div className="w-[480px] min-w-[400px] flex-shrink-0">
{selectedNode.nodeType === "trigger" ? (
{resolvedSelectedNode.nodeType === "trigger" ? (
<div className="flex flex-col h-full border-l border-border/40 bg-card/20 animate-in slide-in-from-right">
<div className="px-4 pt-4 pb-3 border-b border-border/30 flex items-start justify-between gap-2">
<div className="flex items-start gap-3 min-w-0">
<div className="w-8 h-8 rounded-lg flex items-center justify-center flex-shrink-0 mt-0.5 bg-[hsl(210,40%,55%)]/15 border border-[hsl(210,40%,55%)]/25">
<span className="text-sm" style={{ color: "hsl(210,40%,55%)" }}>
{{ "webhook": "\u26A1", "timer": "\u23F1", "api": "\u2192", "event": "\u223F" }[selectedNode.triggerType || ""] || "\u26A1"}
{{ "webhook": "\u26A1", "timer": "\u23F1", "api": "\u2192", "event": "\u223F" }[resolvedSelectedNode.triggerType || ""] || "\u26A1"}
</span>
</div>
<div className="min-w-0">
<h3 className="text-sm font-semibold text-foreground leading-tight">{selectedNode.label}</h3>
<p className="text-[11px] text-muted-foreground mt-0.5 capitalize">{selectedNode.triggerType} trigger</p>
<h3 className="text-sm font-semibold text-foreground leading-tight">{resolvedSelectedNode.label}</h3>
<p className="text-[11px] text-muted-foreground mt-0.5 capitalize flex items-center gap-1.5">
{resolvedSelectedNode.triggerType} trigger
<span className={`inline-block w-1.5 h-1.5 rounded-full ${
resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete"
? "bg-emerald-400" : "bg-muted-foreground/40"
}`} />
<span className={`text-[10px] ${
resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete"
? "text-emerald-400" : "text-muted-foreground/60"
}`}>
{resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete" ? "active" : "inactive"}
</span>
</p>
</div>
</div>
<button onClick={() => setSelectedNode(null)} className="p-1 rounded-md text-muted-foreground hover:text-foreground hover:bg-muted/50 transition-colors flex-shrink-0">
@@ -2482,7 +2569,7 @@ export default function Workspace() {
</div>
<div className="px-4 py-4 flex flex-col gap-3">
{(() => {
const tc = selectedNode.triggerConfig as Record<string, unknown> | undefined;
const tc = resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined;
const cron = tc?.cron as string | undefined;
const interval = tc?.interval_minutes as number | undefined;
const eventTypes = tc?.event_types as string[] | undefined;
@@ -2503,7 +2590,7 @@ export default function Workspace() {
) : null;
})()}
{(() => {
const nfi = (selectedNode.triggerConfig as Record<string, unknown> | undefined)?.next_fire_in as number | undefined;
const nfi = (resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined)?.next_fire_in as number | undefined;
return nfi != null ? (
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Next run</p>
@@ -2513,25 +2600,92 @@ export default function Workspace() {
</div>
) : null;
})()}
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Task</p>
<textarea
value={triggerTaskDraft}
onChange={(e) => setTriggerTaskDraft(e.target.value)}
placeholder="Describe what the worker should do when this trigger fires..."
className="w-full text-xs text-foreground/80 bg-muted/30 rounded-lg px-3 py-2 border border-border/20 resize-none min-h-[60px] font-mono focus:outline-none focus:border-primary/40"
rows={3}
/>
{(() => {
const currentTask = (resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined)?.task as string || "";
const hasChanged = triggerTaskDraft !== currentTask;
if (!hasChanged) return null;
return (
<button
disabled={triggerTaskSaving}
onClick={async () => {
const sessionId = activeAgentState?.sessionId;
const triggerId = resolvedSelectedNode.id.replace("__trigger_", "");
if (!sessionId) return;
setTriggerTaskSaving(true);
try {
await sessionsApi.updateTriggerTask(sessionId, triggerId, triggerTaskDraft);
} finally {
setTriggerTaskSaving(false);
}
}}
className="mt-1.5 w-full text-[11px] px-3 py-1.5 rounded-lg border border-primary/30 text-primary hover:bg-primary/10 transition-colors disabled:opacity-50"
>
{triggerTaskSaving ? "Saving..." : "Save Task"}
</button>
);
})()}
{!triggerTaskDraft && (
<p className="text-[10px] text-amber-400/80 mt-1">A task is required before enabling this trigger.</p>
)}
</div>
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Fires into</p>
<p className="text-xs text-foreground/80 font-mono bg-muted/30 rounded-lg px-3 py-2 border border-border/20">
{selectedNode.next?.[0]?.split("-").map(w => w.charAt(0).toUpperCase() + w.slice(1)).join(" ") || "—"}
{resolvedSelectedNode.next?.[0]?.split("-").map(w => w.charAt(0).toUpperCase() + w.slice(1)).join(" ") || "—"}
</p>
</div>
{activeAgentState?.queenPhase !== "building" && (() => {
const triggerIsActive = resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete";
const triggerId = resolvedSelectedNode.id.replace("__trigger_", "");
const taskMissing = !triggerTaskDraft;
return (
<div className="pt-1">
<button
disabled={!triggerIsActive && taskMissing}
onClick={async () => {
const sessionId = activeAgentState?.sessionId;
if (!sessionId) return;
const action = triggerIsActive ? "Disable" : "Enable";
await executionApi.chat(sessionId, `${action} trigger ${triggerId}`);
}}
className={`w-full text-xs px-3 py-2 rounded-lg border transition-colors ${
triggerIsActive
? "border-red-500/30 text-red-400 hover:bg-red-500/10"
: taskMissing
? "border-border/30 text-muted-foreground/40 cursor-not-allowed"
: "border-emerald-500/30 text-emerald-400 hover:bg-emerald-500/10"
}`}
>
{triggerIsActive ? "Disable Trigger" : "Enable Trigger"}
</button>
{!triggerIsActive && taskMissing && (
<p className="text-[10px] text-muted-foreground/50 mt-1 text-center">Configure a task first</p>
)}
</div>
);
})()}
</div>
</div>
) : (
<NodeDetailPanel
node={selectedNode}
nodeSpec={activeAgentState?.nodeSpecs.find(n => n.id === selectedNode.id) ?? null}
node={resolvedSelectedNode}
nodeSpec={activeAgentState?.nodeSpecs.find(n => n.id === resolvedSelectedNode.id) ?? null}
allNodeSpecs={activeAgentState?.nodeSpecs}
subagentReports={activeAgentState?.subagentReports}
sessionId={activeAgentState?.sessionId || undefined}
graphId={activeAgentState?.graphId || undefined}
workerSessionId={null}
nodeLogs={activeAgentState?.nodeLogs[selectedNode.id] || []}
actionPlan={activeAgentState?.nodeActionPlans[selectedNode.id]}
nodeLogs={activeAgentState?.nodeLogs[resolvedSelectedNode.id] || []}
actionPlan={activeAgentState?.nodeActionPlans[resolvedSelectedNode.id]}
onClose={() => setSelectedNode(null)}
/>
)}
+261
View File
@@ -0,0 +1,261 @@
"""Tests for queen-level trigger system.
Verifies that:
- Timer triggers fire inject_trigger() on the queen node
- Webhook triggers fire inject_trigger() via EventBus WEBHOOK_RECEIVED
- Queen node unavailable trigger skipped silently
- worker_runtime=None trigger discarded (gating)
- remove_trigger cleans up webhook subscription
- run_agent_with_input is in _QUEEN_RUNNING_TOOLS
- System prompts reference run_agent_with_input, not start_worker()
"""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from framework.runtime.event_bus import EventBus
from framework.runtime.triggers import TriggerDefinition
from framework.server.session_manager import Session
def _make_session(event_bus: EventBus, session_id: str = "session_trigger_test") -> Session:
return Session(id=session_id, event_bus=event_bus, llm=object(), loaded_at=0.0)
def _make_executor(queen_node) -> SimpleNamespace:
return SimpleNamespace(node_registry={"queen": queen_node})
@pytest.mark.asyncio
async def test_interval_timer_fires_inject_trigger_on_queen_node() -> None:
"""Timer with interval_minutes fires inject_trigger() on the queen node."""
from framework.graph.event_loop_node import TriggerEvent
from framework.tools.queen_lifecycle_tools import _start_trigger_timer
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object() # non-None → worker is loaded
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="test-timer",
trigger_type="timer",
trigger_config={"interval_minutes": 0.001}, # ~60ms
task="run it",
)
await _start_trigger_timer(session, "test-timer", tdef)
# Let the timer fire at least once
await asyncio.sleep(0.15)
# Cancel the background task
task = session.active_timer_tasks.get("test-timer")
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert queen_node.inject_trigger.await_count >= 1
# Inspect the TriggerEvent passed to inject_trigger
call_args = queen_node.inject_trigger.await_args_list[0]
trigger: TriggerEvent = call_args.args[0]
assert trigger.trigger_type == "timer"
assert trigger.source_id == "test-timer"
assert trigger.payload.get("task") == "run it"
@pytest.mark.asyncio
async def test_timer_skipped_when_queen_node_unavailable() -> None:
"""No inject_trigger call and no exception when queen executor is not set."""
from framework.tools.queen_lifecycle_tools import _start_trigger_timer
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object()
session.queen_executor = None # queen not ready
tdef = TriggerDefinition(
id="no-queen-timer",
trigger_type="timer",
trigger_config={"interval_minutes": 0.001},
task="should not fire",
)
await _start_trigger_timer(session, "no-queen-timer", tdef)
await asyncio.sleep(0.15)
task = session.active_timer_tasks.get("no-queen-timer")
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# No exception raised, nothing to assert beyond completion
@pytest.mark.asyncio
async def test_webhook_trigger_fires_inject_trigger() -> None:
"""WEBHOOK_RECEIVED on EventBus → inject_trigger() on the queen node."""
from framework.graph.event_loop_node import TriggerEvent
from framework.tools.queen_lifecycle_tools import _start_trigger_webhook
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object()
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="test-webhook",
trigger_type="webhook",
trigger_config={"path": "/hooks/test", "methods": ["POST"]},
task="process it",
)
# Patch WebhookServer to avoid binding a real port
mock_server = MagicMock()
mock_server.is_running = False
mock_server.add_route = MagicMock()
mock_server.start = AsyncMock()
with patch("framework.runtime.webhook_server.WebhookServer", return_value=mock_server):
with patch("framework.runtime.webhook_server.WebhookServerConfig"):
await _start_trigger_webhook(session, "test-webhook", tdef)
# Simulate an incoming webhook event on the EventBus
await bus.emit_webhook_received(
source_id="test-webhook",
path="/hooks/test",
method="POST",
headers={},
payload={"event": "push"},
)
await asyncio.sleep(0.05) # let handler run
assert queen_node.inject_trigger.await_count == 1
trigger: TriggerEvent = queen_node.inject_trigger.await_args_list[0].args[0]
assert trigger.trigger_type == "webhook"
assert trigger.source_id == "test-webhook"
assert trigger.payload["method"] == "POST"
assert trigger.payload["path"] == "/hooks/test"
assert trigger.payload["task"] == "process it"
assert trigger.payload["payload"] == {"event": "push"}
@pytest.mark.asyncio
async def test_webhook_trigger_discarded_when_no_worker() -> None:
"""inject_trigger is NOT called when no worker is loaded."""
from framework.tools.queen_lifecycle_tools import _start_trigger_webhook
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = None # no worker
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="no-worker-webhook",
trigger_type="webhook",
trigger_config={"path": "/hooks/noop", "methods": ["POST"]},
task="should not fire",
)
mock_server = MagicMock()
mock_server.is_running = False
mock_server.add_route = MagicMock()
mock_server.start = AsyncMock()
with patch("framework.runtime.webhook_server.WebhookServer", return_value=mock_server):
with patch("framework.runtime.webhook_server.WebhookServerConfig"):
await _start_trigger_webhook(session, "no-worker-webhook", tdef)
await bus.emit_webhook_received(
source_id="no-worker-webhook",
path="/hooks/noop",
method="POST",
headers={},
payload={},
)
await asyncio.sleep(0.05)
assert queen_node.inject_trigger.await_count == 0
@pytest.mark.asyncio
async def test_remove_trigger_cleans_up_webhook_subscription() -> None:
"""After remove_trigger(), WEBHOOK_RECEIVED no longer calls inject_trigger."""
from framework.tools.queen_lifecycle_tools import _start_trigger_webhook
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object()
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="removable-webhook",
trigger_type="webhook",
trigger_config={"path": "/hooks/removable", "methods": ["POST"]},
task="run it",
)
mock_server = MagicMock()
mock_server.is_running = False
mock_server.add_route = MagicMock()
mock_server.start = AsyncMock()
with patch("framework.runtime.webhook_server.WebhookServer", return_value=mock_server):
with patch("framework.runtime.webhook_server.WebhookServerConfig"):
await _start_trigger_webhook(session, "removable-webhook", tdef)
# Manually unsubscribe (mirrors what remove_trigger does)
sub_id = session.active_webhook_subs.pop("removable-webhook", None)
assert sub_id is not None
bus.unsubscribe(sub_id)
# Now fire — should NOT reach queen
await bus.emit_webhook_received(
source_id="removable-webhook",
path="/hooks/removable",
method="POST",
headers={},
payload={},
)
await asyncio.sleep(0.05)
assert queen_node.inject_trigger.await_count == 0
assert "removable-webhook" not in session.active_webhook_subs
def test_run_agent_with_input_in_running_tools() -> None:
"""run_agent_with_input must be available to the queen in RUNNING phase."""
from framework.agents.hive_coder.nodes import _QUEEN_RUNNING_TOOLS
assert "run_agent_with_input" in _QUEEN_RUNNING_TOOLS
def test_system_prompt_uses_correct_tool_name() -> None:
"""Trigger handling rules must reference run_agent_with_input, not start_worker()."""
from framework.agents.hive_coder.nodes import (
_queen_behavior_running,
_queen_behavior_staging,
)
assert "run_agent_with_input" in _queen_behavior_running
assert "start_worker()" not in _queen_behavior_running
assert "run_agent_with_input" in _queen_behavior_staging
assert "start_worker()" not in _queen_behavior_staging