Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cba0ec110f | |||
| 0256e0c944 | |||
| 4d9d0362a0 | |||
| f474d0bc8e | |||
| 6a0681b9aa | |||
| c7e634851b | |||
| cdb7155960 | |||
| 3f7790c26a | |||
| 5676b115f4 | |||
| 61c59d57e8 |
@@ -84,6 +84,10 @@ _QUEEN_STAGING_TOOLS = [
|
||||
# Launch or go back
|
||||
"run_agent_with_input",
|
||||
"stop_worker_and_edit",
|
||||
# Trigger management
|
||||
"set_trigger",
|
||||
"remove_trigger",
|
||||
"list_triggers",
|
||||
]
|
||||
|
||||
# Running phase: worker is executing — monitor and control.
|
||||
@@ -99,10 +103,15 @@ _QUEEN_RUNNING_TOOLS = [
|
||||
"stop_worker",
|
||||
"stop_worker_and_edit",
|
||||
"get_worker_status",
|
||||
"run_agent_with_input",
|
||||
"inject_worker_message",
|
||||
# Monitoring
|
||||
"get_worker_health_summary",
|
||||
"notify_operator",
|
||||
# Trigger management
|
||||
"set_trigger",
|
||||
"remove_trigger",
|
||||
"list_triggers",
|
||||
]
|
||||
|
||||
|
||||
@@ -477,6 +486,9 @@ The agent is loaded and ready to run. You can inspect it and launch it:
|
||||
- get_worker_status(focus?) — Brief status. Drill in with focus: memory, tools, issues, progress
|
||||
- run_agent_with_input(task) — Start the worker and switch to RUNNING phase
|
||||
- stop_worker_and_edit() — Go back to BUILDING phase
|
||||
- set_trigger(trigger_id, trigger_type?, trigger_config?) — Activate a trigger (timer)
|
||||
- remove_trigger(trigger_id) — Deactivate a trigger
|
||||
- list_triggers() — List all triggers and their active/inactive status
|
||||
|
||||
You do NOT have write tools. If you need to modify the agent, \
|
||||
call stop_worker_and_edit() to go back to BUILDING phase.
|
||||
@@ -493,6 +505,9 @@ The worker is running. You have monitoring and lifecycle tools:
|
||||
- notify_operator(ticket_id, analysis, urgency) — Alert the user (use sparingly)
|
||||
- stop_worker() — Stop the worker and return to STAGING phase, then ask the user what to do next
|
||||
- stop_worker_and_edit() — Stop the worker and switch back to BUILDING phase
|
||||
- set_trigger(trigger_id, trigger_type?, trigger_config?) — Activate a trigger (timer)
|
||||
- remove_trigger(trigger_id) — Deactivate a trigger
|
||||
- list_triggers() — List all triggers and their active/inactive status
|
||||
|
||||
You do NOT have write tools or agent construction tools. \
|
||||
If you need to modify the agent, call stop_worker_and_edit() to switch back \
|
||||
@@ -643,6 +658,33 @@ When the user asks to change, modify, or update the loaded worker \
|
||||
|
||||
1. Call stop_worker_and_edit() — this stops the worker and gives you \
|
||||
coding tools (switches to BUILDING phase).
|
||||
|
||||
## Trigger Management
|
||||
|
||||
Use list_triggers() to see available triggers from the loaded worker.
|
||||
Use set_trigger(trigger_id) to activate a timer. Once active, triggers \
|
||||
fire periodically and inject [TRIGGER: ...] messages so you can decide \
|
||||
whether to call run_agent_with_input(task).
|
||||
|
||||
### When the user says "Enable trigger <id>" (or clicks Enable in the UI):
|
||||
|
||||
1. Call get_worker_status(focus="memory") to check if the worker has \
|
||||
saved configuration (rules, preferences, settings from a prior run).
|
||||
2. If memory contains saved config: compose a task string from it \
|
||||
(e.g. "Process inbox emails using saved rules") and call \
|
||||
set_trigger(trigger_id, task="...") immediately. Tell the user the \
|
||||
trigger is now active and what schedule it uses. Do NOT ask them to \
|
||||
provide the task — you derive it from memory.
|
||||
3. If memory is empty (no prior run): tell the user the agent needs to \
|
||||
run once first so its configuration can be saved. Offer to run it now. \
|
||||
Once the worker finishes, enable the trigger.
|
||||
4. If the user just provided config this session (rules/task context \
|
||||
already in conversation): use that directly, no memory lookup needed. \
|
||||
Enable the trigger immediately.
|
||||
|
||||
Never ask "what should the task be?" when enabling a trigger for an \
|
||||
agent with a clear purpose. The task string is a brief description of \
|
||||
what the worker does, derived from its saved state or your current context.
|
||||
"""
|
||||
|
||||
# -- RUNNING phase behavior --
|
||||
@@ -733,6 +775,21 @@ When the user asks to change, modify, or update the loaded worker \
|
||||
|
||||
1. Call stop_worker_and_edit() — this stops the worker and gives you \
|
||||
coding tools (switches to BUILDING phase).
|
||||
|
||||
## Trigger Handling
|
||||
|
||||
You will receive [TRIGGER: ...] messages when a scheduled timer fires. \
|
||||
These are framework-level signals, not user messages.
|
||||
|
||||
Rules:
|
||||
- Check get_worker_status() before calling run_agent_with_input(task). If the worker \
|
||||
is already RUNNING, decide: skip this trigger, or note it for after completion.
|
||||
- When multiple [TRIGGER] messages arrive at once, read them all before acting. \
|
||||
Batch your response — do not call run_agent_with_input() once per trigger.
|
||||
- If a trigger fires but the task no longer makes sense (e.g., user changed \
|
||||
config since last run), skip it and inform the user.
|
||||
- Never disable a trigger without telling the user. Use remove_trigger() only \
|
||||
when explicitly asked or when the trigger is clearly obsolete.
|
||||
"""
|
||||
|
||||
# -- Backward-compatible composed versions (used by queen_node.system_prompt default) --
|
||||
|
||||
@@ -322,7 +322,11 @@ class AsyncEntryPointSpec(BaseModel):
|
||||
|
||||
id: str = Field(description="Unique identifier for this entry point")
|
||||
name: str = Field(description="Human-readable name")
|
||||
entry_node: str = Field(description="Node ID to start execution from")
|
||||
entry_node: str = Field(
|
||||
default="",
|
||||
description="Deprecated: Node ID to start execution from. "
|
||||
"Triggers are graph-level; worker always enters at GraphSpec.entry_node.",
|
||||
)
|
||||
trigger_type: str = Field(
|
||||
default="manual",
|
||||
description="How this entry point is triggered: webhook, api, timer, event, manual",
|
||||
@@ -331,6 +335,10 @@ class AsyncEntryPointSpec(BaseModel):
|
||||
default_factory=dict,
|
||||
description="Trigger-specific configuration (e.g., webhook URL, timer interval)",
|
||||
)
|
||||
task: str = Field(
|
||||
default="",
|
||||
description="Worker task string when this trigger fires autonomously",
|
||||
)
|
||||
isolation_level: str = Field(
|
||||
default="shared", description="State isolation: isolated, shared, or synchronized"
|
||||
)
|
||||
|
||||
@@ -36,6 +36,21 @@ from framework.runtime.llm_debug_logger import log_llm_turn
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TriggerEvent:
|
||||
"""A framework-level trigger signal (timer tick or webhook hit).
|
||||
|
||||
Triggers are queued separately from user messages / external events
|
||||
and drained atomically so the LLM sees all pending triggers at once.
|
||||
"""
|
||||
|
||||
trigger_type: str # "timer" | "webhook"
|
||||
source_id: str # entry point ID or webhook route ID
|
||||
payload: dict[str, Any] = field(default_factory=dict)
|
||||
timestamp: float = field(default_factory=time.time)
|
||||
|
||||
|
||||
# Pattern for detecting context-window-exceeded errors across LLM providers.
|
||||
_CONTEXT_TOO_LARGE_RE = re.compile(
|
||||
r"context.{0,20}(length|window|limit|size)|"
|
||||
@@ -342,6 +357,7 @@ class EventLoopNode(NodeProtocol):
|
||||
self._tool_executor = tool_executor
|
||||
self._conversation_store = conversation_store
|
||||
self._injection_queue: asyncio.Queue[tuple[str, bool]] = asyncio.Queue()
|
||||
self._trigger_queue: asyncio.Queue[TriggerEvent] = asyncio.Queue()
|
||||
# Client-facing input blocking state
|
||||
self._input_ready = asyncio.Event()
|
||||
self._awaiting_input = False
|
||||
@@ -615,6 +631,8 @@ class EventLoopNode(NodeProtocol):
|
||||
|
||||
# 6b. Drain injection queue
|
||||
await self._drain_injection_queue(conversation)
|
||||
# 6b1. Drain trigger queue (framework-level signals)
|
||||
await self._drain_trigger_queue(conversation)
|
||||
|
||||
# 6b2. Dynamic tool refresh (mode switching)
|
||||
if ctx.dynamic_tools_provider is not None:
|
||||
@@ -1692,6 +1710,15 @@ class EventLoopNode(NodeProtocol):
|
||||
await self._injection_queue.put((content, is_client_input))
|
||||
self._input_ready.set()
|
||||
|
||||
async def inject_trigger(self, trigger: TriggerEvent) -> None:
|
||||
"""Inject a framework-level trigger into the running queen loop.
|
||||
|
||||
Triggers are queued separately from user messages and drained
|
||||
atomically via _drain_trigger_queue().
|
||||
"""
|
||||
await self._trigger_queue.put(trigger)
|
||||
self._input_ready.set()
|
||||
|
||||
def signal_shutdown(self) -> None:
|
||||
"""Signal the node to exit its loop cleanly.
|
||||
|
||||
@@ -1737,9 +1764,9 @@ class EventLoopNode(NodeProtocol):
|
||||
|
||||
Returns True if input arrived, False if shutdown was signaled.
|
||||
"""
|
||||
# If messages arrived while the LLM was processing, skip blocking
|
||||
# entirely — the next _drain_injection_queue() will pick them up.
|
||||
if not self._injection_queue.empty():
|
||||
# If messages or triggers arrived while the LLM was processing, skip
|
||||
# blocking — the next drain pass will pick them up.
|
||||
if not self._injection_queue.empty() or not self._trigger_queue.empty():
|
||||
return True
|
||||
|
||||
# Clear BEFORE emitting so that synchronous handlers (e.g. the
|
||||
@@ -3843,6 +3870,34 @@ class EventLoopNode(NodeProtocol):
|
||||
break
|
||||
return count
|
||||
|
||||
async def _drain_trigger_queue(self, conversation: NodeConversation) -> int:
|
||||
"""Drain all pending trigger events as a single batched user message.
|
||||
|
||||
Multiple triggers are merged so the LLM sees them atomically and can
|
||||
reason about all pending triggers before acting.
|
||||
"""
|
||||
triggers: list[TriggerEvent] = []
|
||||
while not self._trigger_queue.empty():
|
||||
try:
|
||||
triggers.append(self._trigger_queue.get_nowait())
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
|
||||
if not triggers:
|
||||
return 0
|
||||
|
||||
parts: list[str] = []
|
||||
for t in triggers:
|
||||
task = t.payload.get("task", "")
|
||||
task_line = f"\nTask: {task}" if task else ""
|
||||
payload_str = json.dumps(t.payload, default=str)
|
||||
parts.append(f"[TRIGGER: {t.trigger_type}/{t.source_id}]{task_line}\n{payload_str}")
|
||||
|
||||
combined = "\n\n".join(parts)
|
||||
logger.info("[drain] %d trigger(s): %s", len(triggers), combined[:200])
|
||||
await conversation.add_user_message(combined)
|
||||
return len(triggers)
|
||||
|
||||
async def _check_pause(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
|
||||
@@ -146,6 +146,13 @@ class EventType(StrEnum):
|
||||
# Subagent reports (one-way progress updates from sub-agents)
|
||||
SUBAGENT_REPORT = "subagent_report"
|
||||
|
||||
# Trigger lifecycle (queen-level triggers / heartbeats)
|
||||
TRIGGER_AVAILABLE = "trigger_available"
|
||||
TRIGGER_ACTIVATED = "trigger_activated"
|
||||
TRIGGER_DEACTIVATED = "trigger_deactivated"
|
||||
TRIGGER_FIRED = "trigger_fired"
|
||||
TRIGGER_REMOVED = "trigger_removed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentEvent:
|
||||
|
||||
@@ -423,6 +423,30 @@ class ExecutionStream:
|
||||
return True
|
||||
return False
|
||||
|
||||
async def inject_trigger(
|
||||
self,
|
||||
node_id: str,
|
||||
trigger: Any,
|
||||
) -> bool:
|
||||
"""Inject a trigger event into a running queen EventLoopNode.
|
||||
|
||||
Searches active executors for a node matching ``node_id`` and calls
|
||||
its ``inject_trigger()`` method to wake the queen.
|
||||
|
||||
Args:
|
||||
node_id: The queen EventLoopNode ID.
|
||||
trigger: A ``TriggerEvent`` instance (typed as Any to avoid
|
||||
circular imports with graph layer).
|
||||
|
||||
Returns True if the trigger was delivered, False otherwise.
|
||||
"""
|
||||
for executor in self._active_executors.values():
|
||||
node = executor.node_registry.get(node_id)
|
||||
if node is not None and hasattr(node, "inject_trigger"):
|
||||
await node.inject_trigger(trigger)
|
||||
return True
|
||||
return False
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
input_data: dict[str, Any],
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
"""Trigger definitions for queen-level heartbeats (timers, webhooks)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class TriggerDefinition:
|
||||
"""A registered trigger that can be activated on the queen runtime.
|
||||
|
||||
Trigger *definitions* come from the worker's ``async_entry_points``.
|
||||
Activation state is per-session (persisted in ``SessionState.active_triggers``).
|
||||
"""
|
||||
|
||||
id: str
|
||||
trigger_type: str # "timer" | "webhook"
|
||||
trigger_config: dict[str, Any] = field(default_factory=dict)
|
||||
description: str = ""
|
||||
task: str = ""
|
||||
active: bool = False
|
||||
@@ -141,6 +141,13 @@ class SessionState(BaseModel):
|
||||
checkpoint_enabled: bool = False
|
||||
latest_checkpoint_id: str | None = None
|
||||
|
||||
# Trigger activation state (IDs of triggers the queen/user turned on)
|
||||
active_triggers: list[str] = Field(default_factory=list)
|
||||
# Per-trigger task strings (user overrides, keyed by trigger ID)
|
||||
trigger_tasks: dict[str, str] = Field(default_factory=dict)
|
||||
# True after first successful worker execution (gates trigger delivery on restart)
|
||||
worker_configured: bool = Field(default=False)
|
||||
|
||||
model_config = {"extra": "allow"}
|
||||
|
||||
@computed_field
|
||||
|
||||
@@ -40,6 +40,11 @@ DEFAULT_EVENT_TYPES = [
|
||||
EventType.CREDENTIALS_REQUIRED,
|
||||
EventType.SUBAGENT_REPORT,
|
||||
EventType.QUEEN_PHASE_CHANGED,
|
||||
EventType.TRIGGER_AVAILABLE,
|
||||
EventType.TRIGGER_ACTIVATED,
|
||||
EventType.TRIGGER_DEACTIVATED,
|
||||
EventType.TRIGGER_FIRED,
|
||||
EventType.TRIGGER_REMOVED,
|
||||
]
|
||||
|
||||
# Keepalive interval in seconds
|
||||
|
||||
@@ -357,8 +357,8 @@ async def handle_pause(request: web.Request) -> web.Response:
|
||||
runtime.pause_timers()
|
||||
|
||||
# Switch to staging (agent still loaded, ready to re-run)
|
||||
if session.mode_state is not None:
|
||||
await session.mode_state.switch_to_staging(source="frontend")
|
||||
if session.phase_state is not None:
|
||||
await session.phase_state.switch_to_staging(source="frontend")
|
||||
|
||||
return web.json_response(
|
||||
{
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
@@ -116,6 +117,20 @@ async def handle_list_nodes(request: web.Request) -> web.Response:
|
||||
}
|
||||
for ep in reg.entry_points.values()
|
||||
]
|
||||
# Append hoisted triggers (stripped from worker runtime, stored on session)
|
||||
for t in getattr(session, "available_triggers", {}).values():
|
||||
entry = {
|
||||
"id": t.id,
|
||||
"name": t.description or t.id,
|
||||
"entry_node": graph.entry_node,
|
||||
"trigger_type": t.trigger_type,
|
||||
"trigger_config": t.trigger_config,
|
||||
"task": t.task,
|
||||
}
|
||||
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
|
||||
if mono is not None:
|
||||
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
|
||||
entry_points.append(entry)
|
||||
return web.json_response(
|
||||
{
|
||||
"nodes": nodes,
|
||||
|
||||
@@ -9,6 +9,7 @@ Session-primary routes:
|
||||
- DELETE /api/sessions/{session_id}/worker — unload worker from session
|
||||
- GET /api/sessions/{session_id}/stats — runtime statistics
|
||||
- GET /api/sessions/{session_id}/entry-points — list entry points
|
||||
- PATCH /api/sessions/{session_id}/triggers/{id} — update trigger task
|
||||
- GET /api/sessions/{session_id}/graphs — list graph IDs
|
||||
- GET /api/sessions/{session_id}/queen-messages — queen conversation history
|
||||
|
||||
@@ -228,6 +229,22 @@ async def handle_get_live_session(request: web.Request) -> web.Response:
|
||||
}
|
||||
for ep in rt.get_entry_points()
|
||||
]
|
||||
# Append hoisted triggers (stripped from worker runtime, stored on session)
|
||||
runner = getattr(session, "runner", None)
|
||||
graph_entry = runner.graph.entry_node if runner else ""
|
||||
for t in getattr(session, "available_triggers", {}).values():
|
||||
entry = {
|
||||
"id": t.id,
|
||||
"name": t.description or t.id,
|
||||
"entry_node": graph_entry,
|
||||
"trigger_type": t.trigger_type,
|
||||
"trigger_config": t.trigger_config,
|
||||
"task": t.task,
|
||||
}
|
||||
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
|
||||
if mono is not None:
|
||||
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
|
||||
data["entry_points"].append(entry)
|
||||
data["graphs"] = session.worker_runtime.list_graphs()
|
||||
|
||||
return web.json_response(data)
|
||||
@@ -351,23 +368,79 @@ async def handle_session_entry_points(request: web.Request) -> web.Response:
|
||||
|
||||
rt = session.worker_runtime
|
||||
eps = rt.get_entry_points() if rt else []
|
||||
entry_points = [
|
||||
{
|
||||
"id": ep.id,
|
||||
"name": ep.name,
|
||||
"entry_node": ep.entry_node,
|
||||
"trigger_type": ep.trigger_type,
|
||||
"trigger_config": ep.trigger_config,
|
||||
**(
|
||||
{"next_fire_in": nf}
|
||||
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
|
||||
else {}
|
||||
),
|
||||
}
|
||||
for ep in eps
|
||||
]
|
||||
# Append hoisted triggers (stripped from worker runtime, stored on session)
|
||||
runner = getattr(session, "runner", None)
|
||||
graph_entry = runner.graph.entry_node if runner else ""
|
||||
for t in getattr(session, "available_triggers", {}).values():
|
||||
entry = {
|
||||
"id": t.id,
|
||||
"name": t.description or t.id,
|
||||
"entry_node": graph_entry,
|
||||
"trigger_type": t.trigger_type,
|
||||
"trigger_config": t.trigger_config,
|
||||
"task": t.task,
|
||||
}
|
||||
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
|
||||
if mono is not None:
|
||||
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
|
||||
entry_points.append(entry)
|
||||
return web.json_response({"entry_points": entry_points})
|
||||
|
||||
|
||||
async def handle_update_trigger_task(request: web.Request) -> web.Response:
|
||||
"""PATCH /api/sessions/{session_id}/triggers/{trigger_id} — update trigger task."""
|
||||
session, err = resolve_session(request)
|
||||
if err:
|
||||
return err
|
||||
|
||||
trigger_id = request.match_info["trigger_id"]
|
||||
available = getattr(session, "available_triggers", {})
|
||||
tdef = available.get(trigger_id)
|
||||
if tdef is None:
|
||||
return web.json_response(
|
||||
{"error": f"Trigger '{trigger_id}' not found"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return web.json_response({"error": "Invalid JSON body"}, status=400)
|
||||
|
||||
task = body.get("task")
|
||||
if task is None:
|
||||
return web.json_response({"error": "Missing 'task' field"}, status=400)
|
||||
if not isinstance(task, str):
|
||||
return web.json_response({"error": "'task' must be a string"}, status=400)
|
||||
|
||||
tdef.task = task
|
||||
|
||||
# Persist if trigger is currently active
|
||||
if trigger_id in getattr(session, "active_trigger_ids", set()):
|
||||
from framework.tools.queen_lifecycle_tools import _persist_active_triggers
|
||||
|
||||
session_id = request.match_info["session_id"]
|
||||
await _persist_active_triggers(session, session_id)
|
||||
|
||||
return web.json_response(
|
||||
{
|
||||
"entry_points": [
|
||||
{
|
||||
"id": ep.id,
|
||||
"name": ep.name,
|
||||
"entry_node": ep.entry_node,
|
||||
"trigger_type": ep.trigger_type,
|
||||
"trigger_config": ep.trigger_config,
|
||||
**(
|
||||
{"next_fire_in": nf}
|
||||
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
|
||||
else {}
|
||||
),
|
||||
}
|
||||
for ep in eps
|
||||
]
|
||||
"trigger_id": trigger_id,
|
||||
"task": tdef.task,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -783,6 +856,9 @@ def register_routes(app: web.Application) -> None:
|
||||
# Session info
|
||||
app.router.add_get("/api/sessions/{session_id}/stats", handle_session_stats)
|
||||
app.router.add_get("/api/sessions/{session_id}/entry-points", handle_session_entry_points)
|
||||
app.router.add_patch(
|
||||
"/api/sessions/{session_id}/triggers/{trigger_id}", handle_update_trigger_task
|
||||
)
|
||||
app.router.add_get("/api/sessions/{session_id}/graphs", handle_session_graphs)
|
||||
app.router.add_get("/api/sessions/{session_id}/queen-messages", handle_queen_messages)
|
||||
|
||||
|
||||
@@ -15,11 +15,13 @@ import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from framework.runtime.triggers import TriggerDefinition
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -46,6 +48,19 @@ class Session:
|
||||
judge_task: asyncio.Task | None = None
|
||||
escalation_sub: str | None = None
|
||||
worker_handoff_sub: str | None = None
|
||||
# Trigger definitions hoisted from worker (available but inactive)
|
||||
available_triggers: dict[str, TriggerDefinition] = field(default_factory=dict)
|
||||
# Active trigger tracking (IDs currently firing + their asyncio tasks)
|
||||
active_trigger_ids: set[str] = field(default_factory=set)
|
||||
active_timer_tasks: dict[str, asyncio.Task] = field(default_factory=dict)
|
||||
# Queen-owned webhook server (lazy singleton, created on first webhook trigger activation)
|
||||
queen_webhook_server: Any = None
|
||||
# EventBus subscription IDs for active webhook triggers (trigger_id -> sub_id)
|
||||
active_webhook_subs: dict[str, str] = field(default_factory=dict)
|
||||
# True after first successful worker execution (gates trigger delivery)
|
||||
worker_configured: bool = False
|
||||
# Monotonic timestamps for next trigger fire (mirrors AgentRuntime._timer_next_fire)
|
||||
trigger_next_fire: dict[str, float] = field(default_factory=dict)
|
||||
# Session directory resumption:
|
||||
# When set, _start_queen writes queen conversations to this existing session's
|
||||
# directory instead of creating a new one. This lets cold-restores accumulate
|
||||
@@ -240,6 +255,31 @@ class SessionManager:
|
||||
|
||||
runtime = runner._agent_runtime
|
||||
|
||||
# Auto-hoist: extract timer/webhook triggers from worker graph
|
||||
# and store as available (inactive) trigger definitions on the session.
|
||||
# Strip them from the worker runtime so it doesn't start its own loops.
|
||||
if runtime:
|
||||
for ep in runner.graph.async_entry_points:
|
||||
if ep.trigger_type in ("timer", "webhook"):
|
||||
session.available_triggers[ep.id] = TriggerDefinition(
|
||||
id=ep.id,
|
||||
trigger_type=ep.trigger_type,
|
||||
trigger_config=ep.trigger_config,
|
||||
description=ep.name,
|
||||
task=getattr(ep, "task", ""),
|
||||
)
|
||||
runtime.unregister_entry_point(ep.id)
|
||||
logger.info(
|
||||
"Hoisted trigger '%s' (%s) from worker to queen",
|
||||
ep.id,
|
||||
ep.trigger_type,
|
||||
)
|
||||
|
||||
if session.available_triggers:
|
||||
await self._emit_trigger_events(
|
||||
session, "available", session.available_triggers
|
||||
)
|
||||
|
||||
# Start runtime on event loop
|
||||
if runtime and not runtime.is_running:
|
||||
await runtime.start()
|
||||
@@ -331,6 +371,45 @@ class SessionManager:
|
||||
# await self._start_judge(session, session.runner._storage_path)
|
||||
await self._notify_queen_worker_loaded(session)
|
||||
|
||||
# Restore previously active triggers from persisted session state
|
||||
if session.available_triggers and session.worker_runtime:
|
||||
try:
|
||||
store = session.worker_runtime._session_store
|
||||
state = await store.read_state(session_id)
|
||||
if state and state.active_triggers:
|
||||
from framework.tools.queen_lifecycle_tools import (
|
||||
_start_trigger_timer,
|
||||
_start_trigger_webhook,
|
||||
)
|
||||
|
||||
saved_tasks = getattr(state, "trigger_tasks", {}) or {}
|
||||
for tid in state.active_triggers:
|
||||
tdef = session.available_triggers.get(tid)
|
||||
if tdef:
|
||||
# Restore user-configured task override
|
||||
saved_task = saved_tasks.get(tid, "")
|
||||
if saved_task:
|
||||
tdef.task = saved_task
|
||||
tdef.active = True
|
||||
session.active_trigger_ids.add(tid)
|
||||
if tdef.trigger_type == "timer":
|
||||
await _start_trigger_timer(session, tid, tdef)
|
||||
logger.info("Restored trigger timer '%s'", tid)
|
||||
elif tdef.trigger_type == "webhook":
|
||||
await _start_trigger_webhook(session, tid, tdef)
|
||||
logger.info("Restored webhook trigger '%s'", tid)
|
||||
else:
|
||||
logger.warning(
|
||||
"Saved trigger '%s' not found in worker entry points, skipping",
|
||||
tid,
|
||||
)
|
||||
|
||||
# Restore worker_configured flag
|
||||
if state and getattr(state, "worker_configured", False):
|
||||
session.worker_configured = True
|
||||
except Exception as e:
|
||||
logger.warning("Failed to restore active triggers: %s", e)
|
||||
|
||||
# Emit SSE event so the frontend can update UI
|
||||
await self._emit_worker_loaded(session)
|
||||
|
||||
@@ -354,6 +433,26 @@ class SessionManager:
|
||||
except Exception as e:
|
||||
logger.error("Error cleaning up worker '%s': %s", session.worker_id, e)
|
||||
|
||||
# Cancel active trigger timers
|
||||
for tid, task in session.active_timer_tasks.items():
|
||||
task.cancel()
|
||||
logger.info("Cancelled trigger timer '%s' on unload", tid)
|
||||
session.active_timer_tasks.clear()
|
||||
|
||||
# Unsubscribe webhook handlers (server stays alive — queen-owned)
|
||||
for sub_id in session.active_webhook_subs.values():
|
||||
try:
|
||||
session.event_bus.unsubscribe(sub_id)
|
||||
except Exception:
|
||||
pass
|
||||
session.active_webhook_subs.clear()
|
||||
session.active_trigger_ids.clear()
|
||||
|
||||
# Clean up hoisted triggers
|
||||
if session.available_triggers:
|
||||
await self._emit_trigger_events(session, "removed", session.available_triggers)
|
||||
session.available_triggers.clear()
|
||||
|
||||
worker_id = session.worker_id
|
||||
session.worker_id = None
|
||||
session.worker_path = None
|
||||
@@ -394,6 +493,25 @@ class SessionManager:
|
||||
session.queen_task = None
|
||||
session.queen_executor = None
|
||||
|
||||
# Cancel active trigger timers
|
||||
for task in session.active_timer_tasks.values():
|
||||
task.cancel()
|
||||
session.active_timer_tasks.clear()
|
||||
|
||||
# Unsubscribe webhook handlers and stop queen webhook server
|
||||
for sub_id in session.active_webhook_subs.values():
|
||||
try:
|
||||
session.event_bus.unsubscribe(sub_id)
|
||||
except Exception:
|
||||
pass
|
||||
session.active_webhook_subs.clear()
|
||||
if session.queen_webhook_server is not None:
|
||||
try:
|
||||
await session.queen_webhook_server.stop()
|
||||
except Exception:
|
||||
logger.error("Error stopping queen webhook server", exc_info=True)
|
||||
session.queen_webhook_server = None
|
||||
|
||||
# Cleanup worker
|
||||
if session.runner:
|
||||
try:
|
||||
@@ -720,6 +838,8 @@ class SessionManager:
|
||||
if phase_state.phase == "running":
|
||||
# Build termination notification for the queen
|
||||
if event.type == _ET.EXECUTION_COMPLETED:
|
||||
# Mark worker as configured after first successful run
|
||||
session.worker_configured = True
|
||||
output = event.data.get("output", {})
|
||||
output_summary = ""
|
||||
if output:
|
||||
@@ -909,7 +1029,22 @@ class SessionManager:
|
||||
return
|
||||
|
||||
profile = build_worker_profile(session.worker_runtime, agent_path=session.worker_path)
|
||||
await node.inject_event(f"[SYSTEM] Worker loaded.{profile}")
|
||||
|
||||
# Append available trigger info so the queen knows what's schedulable
|
||||
trigger_lines = ""
|
||||
if session.available_triggers:
|
||||
parts = []
|
||||
for t in session.available_triggers.values():
|
||||
cfg = t.trigger_config
|
||||
detail = cfg.get("cron") or f"every {cfg.get('interval_minutes', '?')} min"
|
||||
task_info = f' -> task: "{t.task}"' if t.task else " (no task configured)"
|
||||
parts.append(f" - {t.id} ({t.trigger_type}: {detail}){task_info}")
|
||||
trigger_lines = (
|
||||
"\n\nAvailable triggers (inactive — use set_trigger to activate):\n"
|
||||
+ "\n".join(parts)
|
||||
)
|
||||
|
||||
await node.inject_event(f"[SYSTEM] Worker loaded.{profile}{trigger_lines}")
|
||||
|
||||
async def _emit_worker_loaded(self, session: Session) -> None:
|
||||
"""Publish a WORKER_LOADED event so the frontend can update."""
|
||||
@@ -944,6 +1079,31 @@ class SessionManager:
|
||||
"Handle all tasks directly using your coding tools."
|
||||
)
|
||||
|
||||
async def _emit_trigger_events(
|
||||
self,
|
||||
session: Session,
|
||||
kind: str,
|
||||
triggers: dict[str, TriggerDefinition],
|
||||
) -> None:
|
||||
"""Emit TRIGGER_AVAILABLE or TRIGGER_REMOVED events for each trigger."""
|
||||
from framework.runtime.event_bus import AgentEvent, EventType
|
||||
|
||||
event_type = (
|
||||
EventType.TRIGGER_AVAILABLE if kind == "available" else EventType.TRIGGER_REMOVED
|
||||
)
|
||||
for t in triggers.values():
|
||||
await session.event_bus.publish(
|
||||
AgentEvent(
|
||||
type=event_type,
|
||||
stream_id="queen",
|
||||
data={
|
||||
"trigger_id": t.id,
|
||||
"trigger_type": t.trigger_type,
|
||||
"trigger_config": t.trigger_config,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
async def revive_queen(self, session: Session, initial_prompt: str | None = None) -> None:
|
||||
"""Revive a dead queen executor on an existing session.
|
||||
|
||||
|
||||
@@ -36,8 +36,9 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
@@ -242,6 +243,247 @@ def build_worker_profile(runtime: AgentRuntime, agent_path: Path | str | None =
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
async def _start_trigger_timer(session: Any, trigger_id: str, trigger_def: Any) -> None:
|
||||
"""Start an asyncio timer task that fires triggers into the queen node.
|
||||
|
||||
The task is stored in ``session.active_timer_tasks[trigger_id]`` and can
|
||||
be cancelled via ``task.cancel()``.
|
||||
"""
|
||||
from framework.graph.event_loop_node import TriggerEvent
|
||||
|
||||
tc = trigger_def.trigger_config
|
||||
cron_expr = tc.get("cron")
|
||||
interval_minutes = tc.get("interval_minutes")
|
||||
|
||||
async def _timer_loop():
|
||||
try:
|
||||
if cron_expr:
|
||||
from croniter import croniter
|
||||
|
||||
# Wait for first fire
|
||||
cron = croniter(cron_expr, datetime.now())
|
||||
next_dt = cron.get_next(datetime)
|
||||
sleep_secs = (next_dt - datetime.now()).total_seconds()
|
||||
session.trigger_next_fire[trigger_id] = time.monotonic() + max(0, sleep_secs)
|
||||
await asyncio.sleep(max(0, sleep_secs))
|
||||
|
||||
while True:
|
||||
# Fire trigger
|
||||
if getattr(session, "worker_runtime", None) is None:
|
||||
logger.warning(
|
||||
"Trigger '%s': worker not loaded, discarding tick", trigger_id
|
||||
)
|
||||
else:
|
||||
queen_node = _get_queen_node(session)
|
||||
if queen_node is not None:
|
||||
trigger = TriggerEvent(
|
||||
trigger_type="timer",
|
||||
source_id=trigger_id,
|
||||
payload={
|
||||
"schedule": cron_expr,
|
||||
"time": datetime.now(UTC).isoformat(),
|
||||
"task": trigger_def.task,
|
||||
},
|
||||
timestamp=time.time(),
|
||||
)
|
||||
await session.event_bus.publish(
|
||||
AgentEvent(
|
||||
type=EventType.TRIGGER_FIRED,
|
||||
stream_id="queen",
|
||||
data={
|
||||
"trigger_id": trigger_id,
|
||||
"trigger_type": "timer",
|
||||
"payload": trigger.payload,
|
||||
},
|
||||
)
|
||||
)
|
||||
await queen_node.inject_trigger(trigger)
|
||||
logger.info("Trigger '%s' fired (cron: %s)", trigger_id, cron_expr)
|
||||
else:
|
||||
logger.warning(
|
||||
"Trigger '%s': queen node not available, skipping tick", trigger_id
|
||||
)
|
||||
|
||||
# Compute next fire
|
||||
cron = croniter(cron_expr, datetime.now())
|
||||
next_dt = cron.get_next(datetime)
|
||||
sleep_secs = (next_dt - datetime.now()).total_seconds()
|
||||
session.trigger_next_fire[trigger_id] = time.monotonic() + max(0, sleep_secs)
|
||||
await asyncio.sleep(max(0, sleep_secs))
|
||||
|
||||
elif interval_minutes:
|
||||
sleep_secs = interval_minutes * 60
|
||||
session.trigger_next_fire[trigger_id] = time.monotonic() + sleep_secs
|
||||
await asyncio.sleep(sleep_secs)
|
||||
|
||||
while True:
|
||||
if getattr(session, "worker_runtime", None) is None:
|
||||
logger.warning(
|
||||
"Trigger '%s': worker not loaded, discarding tick", trigger_id
|
||||
)
|
||||
else:
|
||||
queen_node = _get_queen_node(session)
|
||||
if queen_node is not None:
|
||||
trigger = TriggerEvent(
|
||||
trigger_type="timer",
|
||||
source_id=trigger_id,
|
||||
payload={
|
||||
"interval_minutes": interval_minutes,
|
||||
"time": datetime.now(UTC).isoformat(),
|
||||
"task": trigger_def.task,
|
||||
},
|
||||
timestamp=time.time(),
|
||||
)
|
||||
await session.event_bus.publish(
|
||||
AgentEvent(
|
||||
type=EventType.TRIGGER_FIRED,
|
||||
stream_id="queen",
|
||||
data={
|
||||
"trigger_id": trigger_id,
|
||||
"trigger_type": "timer",
|
||||
"payload": trigger.payload,
|
||||
},
|
||||
)
|
||||
)
|
||||
await queen_node.inject_trigger(trigger)
|
||||
logger.info(
|
||||
"Trigger '%s' fired (interval: %dm)", trigger_id, interval_minutes
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Trigger '%s': queen node not available, skipping tick", trigger_id
|
||||
)
|
||||
|
||||
session.trigger_next_fire[trigger_id] = time.monotonic() + sleep_secs
|
||||
await asyncio.sleep(sleep_secs)
|
||||
except asyncio.CancelledError:
|
||||
session.trigger_next_fire.pop(trigger_id, None)
|
||||
logger.info("Trigger timer '%s' cancelled", trigger_id)
|
||||
except Exception:
|
||||
logger.error("Trigger timer '%s' failed", trigger_id, exc_info=True)
|
||||
|
||||
task = asyncio.create_task(_timer_loop(), name=f"trigger-timer-{trigger_id}")
|
||||
session.active_timer_tasks[trigger_id] = task
|
||||
|
||||
|
||||
async def _start_trigger_webhook(session: Any, trigger_id: str, trigger_def: Any) -> None:
|
||||
"""Subscribe to WEBHOOK_RECEIVED EventBus events for a webhook trigger.
|
||||
|
||||
Creates (or reuses) the queen-owned WebhookServer on the session,
|
||||
registers the route, and subscribes via EventBus with filter_stream=trigger_id.
|
||||
The subscription ID is stored in session.active_webhook_subs[trigger_id].
|
||||
"""
|
||||
from framework.graph.event_loop_node import TriggerEvent
|
||||
from framework.runtime.webhook_server import WebhookRoute, WebhookServer, WebhookServerConfig
|
||||
|
||||
tc = trigger_def.trigger_config
|
||||
path: str = tc["path"]
|
||||
methods: list[str] = [m.upper() for m in tc.get("methods", ["POST"])]
|
||||
secret: str | None = tc.get("secret")
|
||||
# Queen server defaults to 8090 to avoid colliding with worker's 8080
|
||||
port: int = int(tc.get("port", 8090))
|
||||
|
||||
# Lazy-create the queen webhook server (singleton per session)
|
||||
if getattr(session, "queen_webhook_server", None) is None:
|
||||
server = WebhookServer(session.event_bus, WebhookServerConfig(host="127.0.0.1", port=port))
|
||||
session.queen_webhook_server = server
|
||||
|
||||
# Register route — source_id=trigger_id so filter_stream matches exactly
|
||||
route = WebhookRoute(source_id=trigger_id, path=path, methods=methods, secret=secret)
|
||||
session.queen_webhook_server.add_route(route)
|
||||
|
||||
if not session.queen_webhook_server.is_running:
|
||||
await session.queen_webhook_server.start()
|
||||
|
||||
# Subscribe to WEBHOOK_RECEIVED filtered by trigger_id (== route.source_id)
|
||||
async def _on_webhook(event: Any) -> None:
|
||||
if getattr(session, "worker_runtime", None) is None:
|
||||
logger.warning("Webhook trigger '%s': no worker loaded, discarding", trigger_id)
|
||||
return
|
||||
queen_node = _get_queen_node(session)
|
||||
if queen_node is None:
|
||||
logger.warning("Webhook trigger '%s': queen node not ready, skipping", trigger_id)
|
||||
return
|
||||
trigger = TriggerEvent(
|
||||
trigger_type="webhook",
|
||||
source_id=trigger_id,
|
||||
payload={
|
||||
"source": trigger_id,
|
||||
"method": event.data.get("method", ""),
|
||||
"path": path,
|
||||
"payload": event.data.get("payload", {}),
|
||||
"task": trigger_def.task,
|
||||
},
|
||||
timestamp=time.time(),
|
||||
)
|
||||
await session.event_bus.publish(
|
||||
AgentEvent(
|
||||
type=EventType.TRIGGER_FIRED,
|
||||
stream_id="queen",
|
||||
data={
|
||||
"trigger_id": trigger_id,
|
||||
"trigger_type": "webhook",
|
||||
"payload": trigger.payload,
|
||||
},
|
||||
)
|
||||
)
|
||||
await queen_node.inject_trigger(trigger)
|
||||
logger.info(
|
||||
"Webhook trigger '%s' fired (%s %s)", trigger_id, event.data.get("method", ""), path
|
||||
)
|
||||
|
||||
sub_id = session.event_bus.subscribe(
|
||||
event_types=[EventType.WEBHOOK_RECEIVED],
|
||||
handler=_on_webhook,
|
||||
filter_stream=trigger_id,
|
||||
)
|
||||
if not hasattr(session, "active_webhook_subs"):
|
||||
session.active_webhook_subs = {}
|
||||
session.active_webhook_subs[trigger_id] = sub_id
|
||||
|
||||
|
||||
def _get_queen_node(session: Any):
|
||||
"""Get the queen EventLoopNode from the session's executor."""
|
||||
executor = getattr(session, "queen_executor", None)
|
||||
if executor is None:
|
||||
return None
|
||||
registry = getattr(executor, "node_registry", None)
|
||||
if registry is None:
|
||||
return None
|
||||
return registry.get("queen")
|
||||
|
||||
|
||||
async def _persist_active_triggers(session: Any, session_id: str | None) -> None:
|
||||
"""Persist active trigger IDs to the worker's session state."""
|
||||
runtime = getattr(session, "worker_runtime", None)
|
||||
if runtime is None or session_id is None:
|
||||
return
|
||||
try:
|
||||
store = runtime._session_store
|
||||
state = await store.read_state(session_id)
|
||||
if state is None:
|
||||
from framework.schemas.session_state import SessionState, SessionTimestamps
|
||||
|
||||
now = datetime.now().isoformat()
|
||||
state = SessionState(
|
||||
session_id=session_id,
|
||||
goal_id="",
|
||||
timestamps=SessionTimestamps(started_at=now, updated_at=now),
|
||||
)
|
||||
state.active_triggers = list(session.active_trigger_ids)
|
||||
available = getattr(session, "available_triggers", {})
|
||||
state.trigger_tasks = {
|
||||
tid: available[tid].task
|
||||
for tid in session.active_trigger_ids
|
||||
if tid in available and available[tid].task
|
||||
}
|
||||
if getattr(session, "worker_configured", False):
|
||||
state.worker_configured = True
|
||||
await store.write_state(session_id, state)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to persist active triggers: %s", e)
|
||||
|
||||
|
||||
def register_queen_lifecycle_tools(
|
||||
registry: ToolRegistry,
|
||||
session: Any = None,
|
||||
@@ -1607,5 +1849,316 @@ def register_queen_lifecycle_tools(
|
||||
)
|
||||
tools_registered += 1
|
||||
|
||||
# --- set_trigger -----------------------------------------------------------
|
||||
|
||||
async def set_trigger(
|
||||
trigger_id: str,
|
||||
trigger_type: str | None = None,
|
||||
trigger_config: dict | None = None,
|
||||
task: str | None = None,
|
||||
) -> str:
|
||||
"""Activate a trigger so it fires periodically into the queen."""
|
||||
if trigger_id in getattr(session, "active_trigger_ids", set()):
|
||||
return json.dumps({"error": f"Trigger '{trigger_id}' is already active."})
|
||||
|
||||
# Look up existing or create new
|
||||
available = getattr(session, "available_triggers", {})
|
||||
tdef = available.get(trigger_id)
|
||||
|
||||
if tdef is None:
|
||||
if trigger_type and trigger_config:
|
||||
from framework.runtime.triggers import TriggerDefinition
|
||||
|
||||
tdef = TriggerDefinition(
|
||||
id=trigger_id,
|
||||
trigger_type=trigger_type,
|
||||
trigger_config=trigger_config,
|
||||
)
|
||||
available[trigger_id] = tdef
|
||||
else:
|
||||
return json.dumps(
|
||||
{
|
||||
"error": (
|
||||
f"Trigger '{trigger_id}' not found. "
|
||||
"Provide trigger_type and trigger_config to create a custom trigger."
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
# Apply task override if provided
|
||||
if task:
|
||||
tdef.task = task
|
||||
|
||||
# Task is mandatory before activation
|
||||
if not tdef.task:
|
||||
return json.dumps(
|
||||
{
|
||||
"error": f"Trigger '{trigger_id}' has no task configured. "
|
||||
"Set a task describing what the worker should do when this trigger fires."
|
||||
}
|
||||
)
|
||||
|
||||
# Use provided overrides if given
|
||||
t_type = trigger_type or tdef.trigger_type
|
||||
t_config = trigger_config or tdef.trigger_config
|
||||
if trigger_type:
|
||||
tdef.trigger_type = t_type
|
||||
if trigger_config:
|
||||
tdef.trigger_config = t_config
|
||||
|
||||
# Validate and activate by type
|
||||
if t_type == "webhook":
|
||||
path = t_config.get("path", "").strip()
|
||||
if not path or not path.startswith("/"):
|
||||
return json.dumps(
|
||||
{
|
||||
"error": (
|
||||
"Webhook trigger requires 'path' starting with '/'"
|
||||
" in trigger_config (e.g. '/hooks/github')."
|
||||
)
|
||||
}
|
||||
)
|
||||
valid_methods = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
|
||||
methods = t_config.get("methods", ["POST"])
|
||||
invalid = [m.upper() for m in methods if m.upper() not in valid_methods]
|
||||
if invalid:
|
||||
return json.dumps(
|
||||
{"error": f"Invalid HTTP methods: {invalid}. Valid: {sorted(valid_methods)}"}
|
||||
)
|
||||
|
||||
try:
|
||||
await _start_trigger_webhook(session, trigger_id, tdef)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": f"Failed to start webhook trigger: {e}"})
|
||||
|
||||
tdef.active = True
|
||||
session.active_trigger_ids.add(trigger_id)
|
||||
await _persist_active_triggers(session, session_id)
|
||||
bus = getattr(session, "event_bus", None)
|
||||
if bus:
|
||||
await bus.publish(
|
||||
AgentEvent(
|
||||
type=EventType.TRIGGER_ACTIVATED,
|
||||
stream_id="queen",
|
||||
data={
|
||||
"trigger_id": trigger_id,
|
||||
"trigger_type": t_type,
|
||||
"trigger_config": t_config,
|
||||
},
|
||||
)
|
||||
)
|
||||
port = int(t_config.get("port", 8090))
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "activated",
|
||||
"trigger_id": trigger_id,
|
||||
"trigger_type": t_type,
|
||||
"webhook_url": f"http://127.0.0.1:{port}{path}",
|
||||
}
|
||||
)
|
||||
|
||||
if t_type != "timer":
|
||||
return json.dumps({"error": f"Unsupported trigger type: {t_type}"})
|
||||
|
||||
cron_expr = t_config.get("cron")
|
||||
interval = t_config.get("interval_minutes")
|
||||
if cron_expr:
|
||||
try:
|
||||
from croniter import croniter
|
||||
|
||||
if not croniter.is_valid(cron_expr):
|
||||
return json.dumps({"error": f"Invalid cron expression: {cron_expr}"})
|
||||
except ImportError:
|
||||
return json.dumps(
|
||||
{"error": "croniter package not installed — cannot validate cron expression."}
|
||||
)
|
||||
elif interval:
|
||||
if not isinstance(interval, (int, float)) or interval <= 0:
|
||||
return json.dumps({"error": f"interval_minutes must be > 0, got {interval}"})
|
||||
else:
|
||||
return json.dumps(
|
||||
{"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."}
|
||||
)
|
||||
|
||||
# Start timer
|
||||
try:
|
||||
await _start_trigger_timer(session, trigger_id, tdef)
|
||||
except Exception as e:
|
||||
return json.dumps({"error": f"Failed to start trigger timer: {e}"})
|
||||
|
||||
tdef.active = True
|
||||
session.active_trigger_ids.add(trigger_id)
|
||||
|
||||
# Persist
|
||||
await _persist_active_triggers(session, session_id)
|
||||
|
||||
# Emit event
|
||||
bus = getattr(session, "event_bus", None)
|
||||
if bus:
|
||||
await bus.publish(
|
||||
AgentEvent(
|
||||
type=EventType.TRIGGER_ACTIVATED,
|
||||
stream_id="queen",
|
||||
data={
|
||||
"trigger_id": trigger_id,
|
||||
"trigger_type": t_type,
|
||||
"trigger_config": t_config,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"status": "activated",
|
||||
"trigger_id": trigger_id,
|
||||
"trigger_type": t_type,
|
||||
"trigger_config": t_config,
|
||||
}
|
||||
)
|
||||
|
||||
_set_trigger_tool = Tool(
|
||||
name="set_trigger",
|
||||
description=(
|
||||
"Activate a trigger (timer) so it fires periodically. "
|
||||
"Use trigger_id of an available trigger, or provide trigger_type + trigger_config"
|
||||
" to create a custom one. "
|
||||
"A task must be configured before activation —"
|
||||
" either pre-set on the trigger or provided here."
|
||||
),
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"trigger_id": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"ID of the trigger to activate (from list_triggers) or a new custom ID"
|
||||
),
|
||||
},
|
||||
"trigger_type": {
|
||||
"type": "string",
|
||||
"description": "Type of trigger ('timer'). Only needed for custom triggers.",
|
||||
},
|
||||
"trigger_config": {
|
||||
"type": "object",
|
||||
"description": (
|
||||
"Config for the trigger."
|
||||
" Timer: {cron: '*/5 * * * *'} or {interval_minutes: 5}."
|
||||
" Only needed for custom triggers."
|
||||
),
|
||||
},
|
||||
"task": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"The task/instructions for the worker when this trigger fires"
|
||||
" (e.g. 'Process inbox emails using saved rules')."
|
||||
" Required if not already configured on the trigger."
|
||||
),
|
||||
},
|
||||
},
|
||||
"required": ["trigger_id"],
|
||||
},
|
||||
)
|
||||
registry.register("set_trigger", _set_trigger_tool, lambda inputs: set_trigger(**inputs))
|
||||
tools_registered += 1
|
||||
|
||||
# --- remove_trigger --------------------------------------------------------
|
||||
|
||||
async def remove_trigger(trigger_id: str) -> str:
|
||||
"""Deactivate an active trigger."""
|
||||
if trigger_id not in getattr(session, "active_trigger_ids", set()):
|
||||
return json.dumps({"error": f"Trigger '{trigger_id}' is not active."})
|
||||
|
||||
# Cancel timer task (if timer trigger)
|
||||
task = session.active_timer_tasks.pop(trigger_id, None)
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
getattr(session, "trigger_next_fire", {}).pop(trigger_id, None)
|
||||
|
||||
# Unsubscribe webhook handler (if webhook trigger)
|
||||
webhook_subs = getattr(session, "active_webhook_subs", {})
|
||||
if sub_id := webhook_subs.pop(trigger_id, None):
|
||||
try:
|
||||
session.event_bus.unsubscribe(sub_id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
session.active_trigger_ids.discard(trigger_id)
|
||||
|
||||
# Mark inactive
|
||||
available = getattr(session, "available_triggers", {})
|
||||
tdef = available.get(trigger_id)
|
||||
if tdef:
|
||||
tdef.active = False
|
||||
|
||||
# Persist
|
||||
await _persist_active_triggers(session, session_id)
|
||||
|
||||
# Emit event
|
||||
bus = getattr(session, "event_bus", None)
|
||||
if bus:
|
||||
await bus.publish(
|
||||
AgentEvent(
|
||||
type=EventType.TRIGGER_DEACTIVATED,
|
||||
stream_id="queen",
|
||||
data={"trigger_id": trigger_id},
|
||||
)
|
||||
)
|
||||
|
||||
return json.dumps({"status": "deactivated", "trigger_id": trigger_id})
|
||||
|
||||
_remove_trigger_tool = Tool(
|
||||
name="remove_trigger",
|
||||
description=(
|
||||
"Deactivate an active trigger."
|
||||
" The trigger stops firing but remains available for re-activation."
|
||||
),
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"trigger_id": {
|
||||
"type": "string",
|
||||
"description": "ID of the trigger to deactivate",
|
||||
},
|
||||
},
|
||||
"required": ["trigger_id"],
|
||||
},
|
||||
)
|
||||
registry.register(
|
||||
"remove_trigger", _remove_trigger_tool, lambda inputs: remove_trigger(**inputs)
|
||||
)
|
||||
tools_registered += 1
|
||||
|
||||
# --- list_triggers ---------------------------------------------------------
|
||||
|
||||
async def list_triggers() -> str:
|
||||
"""List all available triggers and their status."""
|
||||
available = getattr(session, "available_triggers", {})
|
||||
triggers = []
|
||||
for tdef in available.values():
|
||||
triggers.append(
|
||||
{
|
||||
"id": tdef.id,
|
||||
"trigger_type": tdef.trigger_type,
|
||||
"trigger_config": tdef.trigger_config,
|
||||
"description": tdef.description,
|
||||
"task": tdef.task,
|
||||
"active": tdef.active,
|
||||
}
|
||||
)
|
||||
return json.dumps({"triggers": triggers})
|
||||
|
||||
_list_triggers_tool = Tool(
|
||||
name="list_triggers",
|
||||
description=(
|
||||
"List all available triggers (from the loaded worker) and their active/inactive status."
|
||||
),
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
},
|
||||
)
|
||||
registry.register("list_triggers", _list_triggers_tool, lambda inputs: list_triggers())
|
||||
tools_registered += 1
|
||||
|
||||
logger.info("Registered %d queen lifecycle tools", tools_registered)
|
||||
return tools_registered
|
||||
|
||||
@@ -38,4 +38,9 @@ export const api = {
|
||||
body: body ? JSON.stringify(body) : undefined,
|
||||
}),
|
||||
delete: <T>(path: string) => request<T>(path, { method: "DELETE" }),
|
||||
patch: <T>(path: string, body?: unknown) =>
|
||||
request<T>(path, {
|
||||
method: "PATCH",
|
||||
body: body ? JSON.stringify(body) : undefined,
|
||||
}),
|
||||
};
|
||||
|
||||
@@ -64,6 +64,12 @@ export const sessionsApi = {
|
||||
`/sessions/${sessionId}/entry-points`,
|
||||
),
|
||||
|
||||
updateTriggerTask: (sessionId: string, triggerId: string, task: string) =>
|
||||
api.patch<{ trigger_id: string; task: string }>(
|
||||
`/sessions/${sessionId}/triggers/${triggerId}`,
|
||||
{ task },
|
||||
),
|
||||
|
||||
graphs: (sessionId: string) =>
|
||||
api.get<{ graphs: string[] }>(`/sessions/${sessionId}/graphs`),
|
||||
|
||||
|
||||
@@ -31,6 +31,8 @@ export interface EntryPoint {
|
||||
entry_node: string;
|
||||
trigger_type: string;
|
||||
trigger_config?: Record<string, unknown>;
|
||||
/** Worker task string when this trigger fires autonomously. */
|
||||
task?: string;
|
||||
/** Seconds until the next timer fire (only present for timer entry points). */
|
||||
next_fire_in?: number;
|
||||
}
|
||||
@@ -276,7 +278,12 @@ export type EventTypeName =
|
||||
| "worker_loaded"
|
||||
| "credentials_required"
|
||||
| "queen_phase_changed"
|
||||
| "subagent_report";
|
||||
| "subagent_report"
|
||||
| "trigger_available"
|
||||
| "trigger_activated"
|
||||
| "trigger_deactivated"
|
||||
| "trigger_fired"
|
||||
| "trigger_removed";
|
||||
|
||||
export interface AgentEvent {
|
||||
type: EventTypeName;
|
||||
|
||||
@@ -131,6 +131,14 @@ const triggerColors = {
|
||||
icon: "hsl(210,40%,55%)",
|
||||
};
|
||||
|
||||
// Active trigger — brighter, more saturated blue
|
||||
const activeTriggerColors = {
|
||||
bg: "hsl(210,30%,18%)",
|
||||
border: "hsl(210,50%,50%)",
|
||||
text: "hsl(210,40%,75%)",
|
||||
icon: "hsl(210,60%,65%)",
|
||||
};
|
||||
|
||||
const triggerIcons: Record<string, string> = {
|
||||
webhook: "\u26A1", // lightning bolt
|
||||
timer: "\u23F1", // stopwatch
|
||||
@@ -417,10 +425,12 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
|
||||
const triggerAvailW = nodeW - 38;
|
||||
const triggerDisplayLabel = truncateLabel(node.label, triggerAvailW, triggerFontSize);
|
||||
const nextFireIn = node.triggerConfig?.next_fire_in as number | undefined;
|
||||
const isActive = node.status === "running" || node.status === "complete";
|
||||
const colors = isActive ? activeTriggerColors : triggerColors;
|
||||
|
||||
// Format countdown for display below node
|
||||
let countdownLabel: string | null = null;
|
||||
if (nextFireIn != null && nextFireIn > 0) {
|
||||
if (isActive && nextFireIn != null && nextFireIn > 0) {
|
||||
const h = Math.floor(nextFireIn / 3600);
|
||||
const m = Math.floor((nextFireIn % 3600) / 60);
|
||||
const s = Math.floor(nextFireIn % 60);
|
||||
@@ -429,24 +439,28 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
|
||||
: `next in ${m}m ${String(s).padStart(2, "0")}s`;
|
||||
}
|
||||
|
||||
// Status label below countdown
|
||||
const statusLabel = isActive ? "active" : "inactive";
|
||||
const statusColor = isActive ? "hsl(140,40%,50%)" : "hsl(210,20%,40%)";
|
||||
|
||||
return (
|
||||
<g key={node.id} onClick={() => onNodeClick?.(node)} style={{ cursor: onNodeClick ? "pointer" : "default" }}>
|
||||
<title>{node.label}</title>
|
||||
{/* Pill-shaped background with dashed border */}
|
||||
{/* Pill-shaped background — solid border when active, dashed when inactive */}
|
||||
<rect
|
||||
x={pos.x} y={pos.y}
|
||||
width={nodeW} height={NODE_H}
|
||||
rx={NODE_H / 2}
|
||||
fill={triggerColors.bg}
|
||||
stroke={triggerColors.border}
|
||||
strokeWidth={1}
|
||||
strokeDasharray="4 2"
|
||||
fill={colors.bg}
|
||||
stroke={colors.border}
|
||||
strokeWidth={isActive ? 1.5 : 1}
|
||||
strokeDasharray={isActive ? undefined : "4 2"}
|
||||
/>
|
||||
|
||||
{/* Trigger type icon */}
|
||||
<text
|
||||
x={pos.x + 18} y={pos.y + NODE_H / 2}
|
||||
fill={triggerColors.icon} fontSize={13}
|
||||
fill={colors.icon} fontSize={13}
|
||||
textAnchor="middle" dominantBaseline="middle"
|
||||
>
|
||||
{icon}
|
||||
@@ -455,7 +469,7 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
|
||||
{/* Label */}
|
||||
<text
|
||||
x={pos.x + 32} y={pos.y + NODE_H / 2}
|
||||
fill={triggerColors.text}
|
||||
fill={colors.text}
|
||||
fontSize={triggerFontSize}
|
||||
fontWeight={500}
|
||||
dominantBaseline="middle"
|
||||
@@ -474,6 +488,15 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
|
||||
{countdownLabel}
|
||||
</text>
|
||||
)}
|
||||
|
||||
{/* Status label */}
|
||||
<text
|
||||
x={pos.x + nodeW / 2} y={pos.y + NODE_H + (countdownLabel ? 25 : 13)}
|
||||
fill={statusColor} fontSize={9}
|
||||
textAnchor="middle" opacity={0.8}
|
||||
>
|
||||
{statusLabel}
|
||||
</text>
|
||||
</g>
|
||||
);
|
||||
};
|
||||
|
||||
@@ -51,6 +51,7 @@ export function topologyToGraphNodes(topology: GraphTopology): GraphNode[] {
|
||||
triggerConfig: {
|
||||
...ep.trigger_config,
|
||||
...(ep.next_fire_in != null ? { next_fire_in: ep.next_fire_in } : {}),
|
||||
...(ep.task ? { task: ep.task } : {}),
|
||||
},
|
||||
next: [ep.entry_node],
|
||||
});
|
||||
|
||||
@@ -478,6 +478,8 @@ export default function Workspace() {
|
||||
const [credentialAgentPath, setCredentialAgentPath] = useState<string | null>(null);
|
||||
const [dismissedBanner, setDismissedBanner] = useState<string | null>(null);
|
||||
const [selectedNode, setSelectedNode] = useState<GraphNode | null>(null);
|
||||
const [triggerTaskDraft, setTriggerTaskDraft] = useState("");
|
||||
const [triggerTaskSaving, setTriggerTaskSaving] = useState(false);
|
||||
const [newTabOpen, setNewTabOpen] = useState(false);
|
||||
const newTabBtnRef = useRef<HTMLButtonElement>(null);
|
||||
|
||||
@@ -1072,19 +1074,32 @@ export default function Workspace() {
|
||||
try {
|
||||
const { entry_points } = await sessionsApi.entryPoints(state.sessionId);
|
||||
const fireMap = new Map<string, number>();
|
||||
const taskMap = new Map<string, string>();
|
||||
for (const ep of entry_points) {
|
||||
if (ep.next_fire_in != null) {
|
||||
fireMap.set(`__trigger_${ep.id}`, ep.next_fire_in);
|
||||
}
|
||||
if (ep.task != null) {
|
||||
taskMap.set(`__trigger_${ep.id}`, ep.task);
|
||||
}
|
||||
}
|
||||
if (fireMap.size === 0) continue;
|
||||
if (fireMap.size === 0 && taskMap.size === 0) continue;
|
||||
setSessionsByAgent((prev) => {
|
||||
const ss = prev[agentType];
|
||||
if (!ss?.length) return prev;
|
||||
const updated = ss[0].graphNodes.map((n) => {
|
||||
if (n.nodeType !== "trigger") return n;
|
||||
const nfi = fireMap.get(n.id);
|
||||
if (nfi == null || n.nodeType !== "trigger") return n;
|
||||
return { ...n, triggerConfig: { ...n.triggerConfig, next_fire_in: nfi } };
|
||||
const task = taskMap.get(n.id);
|
||||
if (nfi == null && task == null) return n;
|
||||
return {
|
||||
...n,
|
||||
triggerConfig: {
|
||||
...n.triggerConfig,
|
||||
...(nfi != null ? { next_fire_in: nfi } : {}),
|
||||
...(task != null ? { task } : {}),
|
||||
},
|
||||
};
|
||||
});
|
||||
// Skip update if nothing changed
|
||||
if (updated.every((n, idx) => n === ss[0].graphNodes[idx])) return prev;
|
||||
@@ -1842,6 +1857,54 @@ export default function Workspace() {
|
||||
break;
|
||||
}
|
||||
|
||||
case "trigger_activated": {
|
||||
const triggerId = event.data?.trigger_id as string;
|
||||
if (triggerId) {
|
||||
updateGraphNodeStatus(agentType, `__trigger_${triggerId}`, "running");
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "trigger_deactivated": {
|
||||
const triggerId = event.data?.trigger_id as string;
|
||||
if (triggerId) {
|
||||
// Clear next_fire_in so countdown hides when inactive
|
||||
setSessionsByAgent(prev => {
|
||||
const sessions = prev[agentType] || [];
|
||||
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
|
||||
return {
|
||||
...prev,
|
||||
[agentType]: sessions.map(s => {
|
||||
if (s.id !== activeId) return s;
|
||||
return {
|
||||
...s,
|
||||
graphNodes: s.graphNodes.map(n => {
|
||||
if (n.id !== `__trigger_${triggerId}`) return n;
|
||||
const { next_fire_in: _, ...restConfig } = (n.triggerConfig || {}) as Record<string, unknown> & { next_fire_in?: unknown };
|
||||
return { ...n, status: "pending" as const, triggerConfig: restConfig };
|
||||
}),
|
||||
};
|
||||
}),
|
||||
};
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "trigger_fired": {
|
||||
const triggerId = event.data?.trigger_id as string;
|
||||
if (triggerId) {
|
||||
const nodeId = `__trigger_${triggerId}`;
|
||||
updateGraphNodeStatus(agentType, nodeId, "complete");
|
||||
setTimeout(() => updateGraphNodeStatus(agentType, nodeId, "running"), 1500);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "trigger_available":
|
||||
case "trigger_removed":
|
||||
break;
|
||||
|
||||
default:
|
||||
// Fallback: ensure queenReady is set even for unexpected first events
|
||||
if (shouldMarkQueenReady) updateAgentState(agentType, { queenReady: true });
|
||||
@@ -1872,6 +1935,18 @@ export default function Workspace() {
|
||||
? { nodes: activeSession.graphNodes, title: activeAgentState?.displayName || formatAgentDisplayName(baseAgentType(activeWorker)) }
|
||||
: { nodes: [] as GraphNode[], title: "" };
|
||||
|
||||
// Keep selectedNode in sync with live graphNodes (trigger status updates via SSE)
|
||||
const liveSelectedNode = selectedNode && currentGraph.nodes.find(n => n.id === selectedNode.id);
|
||||
const resolvedSelectedNode = liveSelectedNode || selectedNode;
|
||||
|
||||
// Sync trigger task draft when selected trigger node changes
|
||||
useEffect(() => {
|
||||
if (resolvedSelectedNode?.nodeType === "trigger") {
|
||||
const tc = resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined;
|
||||
setTriggerTaskDraft((tc?.task as string) || "");
|
||||
}
|
||||
}, [resolvedSelectedNode?.id]);
|
||||
|
||||
// Build a flat list of all agent-type tabs for the tab bar
|
||||
const agentTabs = Object.entries(sessionsByAgent)
|
||||
.filter(([, sessions]) => sessions.length > 0)
|
||||
@@ -2460,20 +2535,32 @@ export default function Workspace() {
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
{selectedNode && (
|
||||
{resolvedSelectedNode && (
|
||||
<div className="w-[480px] min-w-[400px] flex-shrink-0">
|
||||
{selectedNode.nodeType === "trigger" ? (
|
||||
{resolvedSelectedNode.nodeType === "trigger" ? (
|
||||
<div className="flex flex-col h-full border-l border-border/40 bg-card/20 animate-in slide-in-from-right">
|
||||
<div className="px-4 pt-4 pb-3 border-b border-border/30 flex items-start justify-between gap-2">
|
||||
<div className="flex items-start gap-3 min-w-0">
|
||||
<div className="w-8 h-8 rounded-lg flex items-center justify-center flex-shrink-0 mt-0.5 bg-[hsl(210,40%,55%)]/15 border border-[hsl(210,40%,55%)]/25">
|
||||
<span className="text-sm" style={{ color: "hsl(210,40%,55%)" }}>
|
||||
{{ "webhook": "\u26A1", "timer": "\u23F1", "api": "\u2192", "event": "\u223F" }[selectedNode.triggerType || ""] || "\u26A1"}
|
||||
{{ "webhook": "\u26A1", "timer": "\u23F1", "api": "\u2192", "event": "\u223F" }[resolvedSelectedNode.triggerType || ""] || "\u26A1"}
|
||||
</span>
|
||||
</div>
|
||||
<div className="min-w-0">
|
||||
<h3 className="text-sm font-semibold text-foreground leading-tight">{selectedNode.label}</h3>
|
||||
<p className="text-[11px] text-muted-foreground mt-0.5 capitalize">{selectedNode.triggerType} trigger</p>
|
||||
<h3 className="text-sm font-semibold text-foreground leading-tight">{resolvedSelectedNode.label}</h3>
|
||||
<p className="text-[11px] text-muted-foreground mt-0.5 capitalize flex items-center gap-1.5">
|
||||
{resolvedSelectedNode.triggerType} trigger
|
||||
<span className={`inline-block w-1.5 h-1.5 rounded-full ${
|
||||
resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete"
|
||||
? "bg-emerald-400" : "bg-muted-foreground/40"
|
||||
}`} />
|
||||
<span className={`text-[10px] ${
|
||||
resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete"
|
||||
? "text-emerald-400" : "text-muted-foreground/60"
|
||||
}`}>
|
||||
{resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete" ? "active" : "inactive"}
|
||||
</span>
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
<button onClick={() => setSelectedNode(null)} className="p-1 rounded-md text-muted-foreground hover:text-foreground hover:bg-muted/50 transition-colors flex-shrink-0">
|
||||
@@ -2482,7 +2569,7 @@ export default function Workspace() {
|
||||
</div>
|
||||
<div className="px-4 py-4 flex flex-col gap-3">
|
||||
{(() => {
|
||||
const tc = selectedNode.triggerConfig as Record<string, unknown> | undefined;
|
||||
const tc = resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined;
|
||||
const cron = tc?.cron as string | undefined;
|
||||
const interval = tc?.interval_minutes as number | undefined;
|
||||
const eventTypes = tc?.event_types as string[] | undefined;
|
||||
@@ -2503,7 +2590,7 @@ export default function Workspace() {
|
||||
) : null;
|
||||
})()}
|
||||
{(() => {
|
||||
const nfi = (selectedNode.triggerConfig as Record<string, unknown> | undefined)?.next_fire_in as number | undefined;
|
||||
const nfi = (resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined)?.next_fire_in as number | undefined;
|
||||
return nfi != null ? (
|
||||
<div>
|
||||
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Next run</p>
|
||||
@@ -2513,25 +2600,92 @@ export default function Workspace() {
|
||||
</div>
|
||||
) : null;
|
||||
})()}
|
||||
<div>
|
||||
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Task</p>
|
||||
<textarea
|
||||
value={triggerTaskDraft}
|
||||
onChange={(e) => setTriggerTaskDraft(e.target.value)}
|
||||
placeholder="Describe what the worker should do when this trigger fires..."
|
||||
className="w-full text-xs text-foreground/80 bg-muted/30 rounded-lg px-3 py-2 border border-border/20 resize-none min-h-[60px] font-mono focus:outline-none focus:border-primary/40"
|
||||
rows={3}
|
||||
/>
|
||||
{(() => {
|
||||
const currentTask = (resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined)?.task as string || "";
|
||||
const hasChanged = triggerTaskDraft !== currentTask;
|
||||
if (!hasChanged) return null;
|
||||
return (
|
||||
<button
|
||||
disabled={triggerTaskSaving}
|
||||
onClick={async () => {
|
||||
const sessionId = activeAgentState?.sessionId;
|
||||
const triggerId = resolvedSelectedNode.id.replace("__trigger_", "");
|
||||
if (!sessionId) return;
|
||||
setTriggerTaskSaving(true);
|
||||
try {
|
||||
await sessionsApi.updateTriggerTask(sessionId, triggerId, triggerTaskDraft);
|
||||
} finally {
|
||||
setTriggerTaskSaving(false);
|
||||
}
|
||||
}}
|
||||
className="mt-1.5 w-full text-[11px] px-3 py-1.5 rounded-lg border border-primary/30 text-primary hover:bg-primary/10 transition-colors disabled:opacity-50"
|
||||
>
|
||||
{triggerTaskSaving ? "Saving..." : "Save Task"}
|
||||
</button>
|
||||
);
|
||||
})()}
|
||||
{!triggerTaskDraft && (
|
||||
<p className="text-[10px] text-amber-400/80 mt-1">A task is required before enabling this trigger.</p>
|
||||
)}
|
||||
</div>
|
||||
<div>
|
||||
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Fires into</p>
|
||||
<p className="text-xs text-foreground/80 font-mono bg-muted/30 rounded-lg px-3 py-2 border border-border/20">
|
||||
{selectedNode.next?.[0]?.split("-").map(w => w.charAt(0).toUpperCase() + w.slice(1)).join(" ") || "—"}
|
||||
{resolvedSelectedNode.next?.[0]?.split("-").map(w => w.charAt(0).toUpperCase() + w.slice(1)).join(" ") || "—"}
|
||||
</p>
|
||||
</div>
|
||||
{activeAgentState?.queenPhase !== "building" && (() => {
|
||||
const triggerIsActive = resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete";
|
||||
const triggerId = resolvedSelectedNode.id.replace("__trigger_", "");
|
||||
const taskMissing = !triggerTaskDraft;
|
||||
return (
|
||||
<div className="pt-1">
|
||||
<button
|
||||
disabled={!triggerIsActive && taskMissing}
|
||||
onClick={async () => {
|
||||
const sessionId = activeAgentState?.sessionId;
|
||||
if (!sessionId) return;
|
||||
const action = triggerIsActive ? "Disable" : "Enable";
|
||||
await executionApi.chat(sessionId, `${action} trigger ${triggerId}`);
|
||||
}}
|
||||
className={`w-full text-xs px-3 py-2 rounded-lg border transition-colors ${
|
||||
triggerIsActive
|
||||
? "border-red-500/30 text-red-400 hover:bg-red-500/10"
|
||||
: taskMissing
|
||||
? "border-border/30 text-muted-foreground/40 cursor-not-allowed"
|
||||
: "border-emerald-500/30 text-emerald-400 hover:bg-emerald-500/10"
|
||||
}`}
|
||||
>
|
||||
{triggerIsActive ? "Disable Trigger" : "Enable Trigger"}
|
||||
</button>
|
||||
{!triggerIsActive && taskMissing && (
|
||||
<p className="text-[10px] text-muted-foreground/50 mt-1 text-center">Configure a task first</p>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
})()}
|
||||
</div>
|
||||
</div>
|
||||
) : (
|
||||
<NodeDetailPanel
|
||||
node={selectedNode}
|
||||
nodeSpec={activeAgentState?.nodeSpecs.find(n => n.id === selectedNode.id) ?? null}
|
||||
node={resolvedSelectedNode}
|
||||
nodeSpec={activeAgentState?.nodeSpecs.find(n => n.id === resolvedSelectedNode.id) ?? null}
|
||||
allNodeSpecs={activeAgentState?.nodeSpecs}
|
||||
subagentReports={activeAgentState?.subagentReports}
|
||||
sessionId={activeAgentState?.sessionId || undefined}
|
||||
graphId={activeAgentState?.graphId || undefined}
|
||||
workerSessionId={null}
|
||||
nodeLogs={activeAgentState?.nodeLogs[selectedNode.id] || []}
|
||||
actionPlan={activeAgentState?.nodeActionPlans[selectedNode.id]}
|
||||
nodeLogs={activeAgentState?.nodeLogs[resolvedSelectedNode.id] || []}
|
||||
actionPlan={activeAgentState?.nodeActionPlans[resolvedSelectedNode.id]}
|
||||
onClose={() => setSelectedNode(null)}
|
||||
/>
|
||||
)}
|
||||
|
||||
@@ -0,0 +1,261 @@
|
||||
"""Tests for queen-level trigger system.
|
||||
|
||||
Verifies that:
|
||||
- Timer triggers fire inject_trigger() on the queen node
|
||||
- Webhook triggers fire inject_trigger() via EventBus WEBHOOK_RECEIVED
|
||||
- Queen node unavailable → trigger skipped silently
|
||||
- worker_runtime=None → trigger discarded (gating)
|
||||
- remove_trigger cleans up webhook subscription
|
||||
- run_agent_with_input is in _QUEEN_RUNNING_TOOLS
|
||||
- System prompts reference run_agent_with_input, not start_worker()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from framework.runtime.event_bus import EventBus
|
||||
from framework.runtime.triggers import TriggerDefinition
|
||||
from framework.server.session_manager import Session
|
||||
|
||||
|
||||
def _make_session(event_bus: EventBus, session_id: str = "session_trigger_test") -> Session:
    """Create a bare Session wired to *event_bus*, suitable for trigger tests."""
    session = Session(id=session_id, event_bus=event_bus, llm=object(), loaded_at=0.0)
    return session
|
||||
|
||||
|
||||
def _make_executor(queen_node) -> SimpleNamespace:
|
||||
return SimpleNamespace(node_registry={"queen": queen_node})
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_interval_timer_fires_inject_trigger_on_queen_node() -> None:
    """Timer with interval_minutes fires inject_trigger() on the queen node."""
    from framework.graph.event_loop_node import TriggerEvent
    from framework.tools.queen_lifecycle_tools import _start_trigger_timer

    event_bus = EventBus()
    session = _make_session(event_bus)
    # Any non-None worker_runtime marks the worker as loaded (trigger gating).
    session.worker_runtime = object()

    queen = SimpleNamespace(inject_trigger=AsyncMock())
    session.queen_executor = _make_executor(queen)

    definition = TriggerDefinition(
        id="test-timer",
        trigger_type="timer",
        trigger_config={"interval_minutes": 0.001},  # 0.001 min ≈ 60ms per fire
        task="run it",
    )

    await _start_trigger_timer(session, "test-timer", definition)

    # Give the timer enough wall-clock time to fire at least once.
    await asyncio.sleep(0.15)

    # Tear down the background timer task before asserting.
    timer_task = session.active_timer_tasks.get("test-timer")
    if timer_task:
        timer_task.cancel()
        try:
            await timer_task
        except asyncio.CancelledError:
            pass

    assert queen.inject_trigger.await_count >= 1

    # The first delivery must be a timer TriggerEvent carrying the configured task.
    first_call = queen.inject_trigger.await_args_list[0]
    fired: TriggerEvent = first_call.args[0]
    assert fired.trigger_type == "timer"
    assert fired.source_id == "test-timer"
    assert fired.payload.get("task") == "run it"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_timer_skipped_when_queen_node_unavailable() -> None:
    """No inject_trigger call and no exception when queen executor is not set.

    NOTE(review): with queen_executor=None there is no mock available, so the
    "no inject_trigger call" half of the claim cannot be asserted here — the
    test only proves the timer loop survives a missing executor without
    raising. Consider asserting on a log record or counter if one exists.
    """
    from framework.tools.queen_lifecycle_tools import _start_trigger_timer

    bus = EventBus()
    session = _make_session(bus)
    # Worker is loaded, so gating passes; only the queen executor is absent.
    session.worker_runtime = object()
    session.queen_executor = None  # queen not ready

    tdef = TriggerDefinition(
        id="no-queen-timer",
        trigger_type="timer",
        trigger_config={"interval_minutes": 0.001},
        task="should not fire",
    )

    await _start_trigger_timer(session, "no-queen-timer", tdef)
    # Sleep long enough for the ~60ms interval to elapse at least once.
    await asyncio.sleep(0.15)

    # Clean up the background timer task if one was registered.
    task = session.active_timer_tasks.get("no-queen-timer")
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    # No exception raised, nothing to assert beyond completion
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_webhook_trigger_fires_inject_trigger() -> None:
    """WEBHOOK_RECEIVED on EventBus → inject_trigger() on the queen node."""
    from framework.graph.event_loop_node import TriggerEvent
    from framework.tools.queen_lifecycle_tools import _start_trigger_webhook

    event_bus = EventBus()
    session = _make_session(event_bus)
    session.worker_runtime = object()

    queen = SimpleNamespace(inject_trigger=AsyncMock())
    session.queen_executor = _make_executor(queen)

    definition = TriggerDefinition(
        id="test-webhook",
        trigger_type="webhook",
        trigger_config={"path": "/hooks/test", "methods": ["POST"]},
        task="process it",
    )

    # Stub out the webhook server so no real port is bound.
    fake_server = MagicMock()
    fake_server.is_running = False
    fake_server.add_route = MagicMock()
    fake_server.start = AsyncMock()
    with patch(
        "framework.runtime.webhook_server.WebhookServer", return_value=fake_server
    ), patch("framework.runtime.webhook_server.WebhookServerConfig"):
        await _start_trigger_webhook(session, "test-webhook", definition)

    # Simulate an incoming webhook event on the EventBus.
    await event_bus.emit_webhook_received(
        source_id="test-webhook",
        path="/hooks/test",
        method="POST",
        headers={},
        payload={"event": "push"},
    )
    await asyncio.sleep(0.05)  # let handler run

    assert queen.inject_trigger.await_count == 1
    # The delivered TriggerEvent must carry the request metadata plus the task.
    fired: TriggerEvent = queen.inject_trigger.await_args_list[0].args[0]
    assert fired.trigger_type == "webhook"
    assert fired.source_id == "test-webhook"
    assert fired.payload["method"] == "POST"
    assert fired.payload["path"] == "/hooks/test"
    assert fired.payload["task"] == "process it"
    assert fired.payload["payload"] == {"event": "push"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_webhook_trigger_discarded_when_no_worker() -> None:
    """inject_trigger is NOT called when no worker is loaded."""
    from framework.tools.queen_lifecycle_tools import _start_trigger_webhook

    event_bus = EventBus()
    session = _make_session(event_bus)
    # worker_runtime=None means no worker is loaded: gating must drop the event.
    session.worker_runtime = None

    queen = SimpleNamespace(inject_trigger=AsyncMock())
    session.queen_executor = _make_executor(queen)

    definition = TriggerDefinition(
        id="no-worker-webhook",
        trigger_type="webhook",
        trigger_config={"path": "/hooks/noop", "methods": ["POST"]},
        task="should not fire",
    )

    # Stub the webhook server so no real port is bound.
    fake_server = MagicMock()
    fake_server.is_running = False
    fake_server.add_route = MagicMock()
    fake_server.start = AsyncMock()
    with patch(
        "framework.runtime.webhook_server.WebhookServer", return_value=fake_server
    ), patch("framework.runtime.webhook_server.WebhookServerConfig"):
        await _start_trigger_webhook(session, "no-worker-webhook", definition)

    await event_bus.emit_webhook_received(
        source_id="no-worker-webhook",
        path="/hooks/noop",
        method="POST",
        headers={},
        payload={},
    )
    await asyncio.sleep(0.05)

    # The gate must have discarded the event before it reached the queen.
    assert queen.inject_trigger.await_count == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_remove_trigger_cleans_up_webhook_subscription() -> None:
    """After remove_trigger(), WEBHOOK_RECEIVED no longer calls inject_trigger.

    NOTE(review): this test reproduces remove_trigger's cleanup steps by hand
    (pop the subscription id, then unsubscribe) rather than invoking the
    remove_trigger tool itself, so a regression inside remove_trigger would
    not be caught here — confirm whether calling the real tool is feasible.
    """
    from framework.tools.queen_lifecycle_tools import _start_trigger_webhook

    bus = EventBus()
    session = _make_session(bus)
    # Non-None worker_runtime → worker is loaded, so delivery would normally occur.
    session.worker_runtime = object()

    queen_node = SimpleNamespace(inject_trigger=AsyncMock())
    session.queen_executor = _make_executor(queen_node)

    tdef = TriggerDefinition(
        id="removable-webhook",
        trigger_type="webhook",
        trigger_config={"path": "/hooks/removable", "methods": ["POST"]},
        task="run it",
    )

    # Stubbed server: no real port is bound during registration.
    mock_server = MagicMock()
    mock_server.is_running = False
    mock_server.add_route = MagicMock()
    mock_server.start = AsyncMock()
    with patch("framework.runtime.webhook_server.WebhookServer", return_value=mock_server):
        with patch("framework.runtime.webhook_server.WebhookServerConfig"):
            await _start_trigger_webhook(session, "removable-webhook", tdef)

    # Manually unsubscribe (mirrors what remove_trigger does)
    sub_id = session.active_webhook_subs.pop("removable-webhook", None)
    assert sub_id is not None
    bus.unsubscribe(sub_id)

    # Now fire — should NOT reach queen
    await bus.emit_webhook_received(
        source_id="removable-webhook",
        path="/hooks/removable",
        method="POST",
        headers={},
        payload={},
    )
    await asyncio.sleep(0.05)

    # Unsubscribed handler never ran, and the bookkeeping entry is gone.
    assert queen_node.inject_trigger.await_count == 0
    assert "removable-webhook" not in session.active_webhook_subs
|
||||
|
||||
|
||||
def test_run_agent_with_input_in_running_tools() -> None:
    """run_agent_with_input must be available to the queen in RUNNING phase."""
    from framework.agents.hive_coder.nodes import _QUEEN_RUNNING_TOOLS

    running_tools = set(_QUEEN_RUNNING_TOOLS)
    assert "run_agent_with_input" in running_tools
|
||||
|
||||
|
||||
def test_system_prompt_uses_correct_tool_name() -> None:
    """Trigger handling rules must reference run_agent_with_input, not start_worker()."""
    from framework.agents.hive_coder.nodes import (
        _queen_behavior_running,
        _queen_behavior_staging,
    )

    # Both phase prompts share the same naming requirement.
    for prompt in (_queen_behavior_running, _queen_behavior_staging):
        assert "run_agent_with_input" in prompt
        assert "start_worker()" not in prompt
|
||||
Reference in New Issue
Block a user