From 3c91119f67003cea10ea6b91adf24d63af6125f8 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Mon, 20 Apr 2026 10:49:37 -0700 Subject: [PATCH] feat: improvements for scheduler --- core/framework/server/routes_sessions.py | 90 +++++++++- core/framework/server/session_manager.py | 25 ++- core/framework/tools/queen_lifecycle_tools.py | 80 ++++++++- core/frontend/src/api/sessions.ts | 5 + core/frontend/src/api/types.ts | 6 + .../src/components/ColonyWorkersPanel.tsx | 162 +++++++++++++++++- core/frontend/src/pages/colony-chat.tsx | 30 +++- data/autonomous_agent_research_log.md | 139 +++++++++++++++ 8 files changed, 523 insertions(+), 14 deletions(-) create mode 100644 data/autonomous_agent_research_log.md diff --git a/core/framework/server/routes_sessions.py b/core/framework/server/routes_sessions.py index 9de65336..2070acdc 100644 --- a/core/framework/server/routes_sessions.py +++ b/core/framework/server/routes_sessions.py @@ -10,6 +10,7 @@ Session-primary routes: - GET /api/sessions/{session_id}/stats — runtime statistics - GET /api/sessions/{session_id}/entry-points — list entry points - PATCH /api/sessions/{session_id}/triggers/{id} — update trigger task +- POST /api/sessions/{session_id}/triggers/{id}/run — fire trigger once (manual) - GET /api/sessions/{session_id}/colonies — list colony IDs - GET /api/sessions/{session_id}/events/history — persisted eventbus log (for replay) @@ -247,7 +248,14 @@ async def handle_get_live_session(request: web.Request) -> web.Response: } mono = getattr(session, "trigger_next_fire", {}).get(t.id) if mono is not None: - entry["next_fire_in"] = max(0.0, mono - time.monotonic()) + remaining = max(0.0, mono - time.monotonic()) + entry["next_fire_in"] = remaining + entry["next_fire_at"] = int((time.time() + remaining) * 1000) + stats = getattr(session, "trigger_fire_stats", {}).get(t.id) + if stats: + entry["fire_count"] = stats.get("fire_count", 0) + if stats.get("last_fired_at") is not None: + entry["last_fired_at"] = stats["last_fired_at"] data["entry_points"].append(entry) data["colonies"] = session.colony_runtime.list_graphs() @@ -397,7 +405,14 @@ async def handle_session_entry_points(request: web.Request) -> web.Response: } mono = getattr(session, "trigger_next_fire", {}).get(t.id) if mono is not None: - entry["next_fire_in"] = max(0.0, mono - time.monotonic()) + remaining = max(0.0, mono - time.monotonic()) + entry["next_fire_in"] = remaining + entry["next_fire_at"] = int((time.time() + remaining) * 1000) + stats = getattr(session, "trigger_fire_stats", {}).get(t.id) + if stats: + entry["fire_count"] = stats.get("fire_count", 0) + if stats.get("last_fired_at") is not None: + entry["last_fired_at"] = stats["last_fired_at"] entry_points.append(entry) return web.json_response({"entry_points": entry_points}) @@ -548,6 +563,60 @@ async def handle_update_trigger_task(request: web.Request) -> web.Response: ) +async def handle_run_trigger(request: web.Request) -> web.Response: + """POST /api/sessions/{session_id}/triggers/{trigger_id}/run — fire the trigger once. + + Manual invocation for testing. Works whether the trigger is active or + inactive; does not change active state and does not reset the scheduled + next-fire time of an active timer. + """ + session, err = resolve_session(request) + if err: + return err + + trigger_id = request.match_info["trigger_id"] + tdef = getattr(session, "available_triggers", {}).get(trigger_id) + if tdef is None: + return web.json_response( + {"error": f"Trigger '{trigger_id}' not found"}, + status=404, + ) + + if getattr(session, "colony_runtime", None) is None: + return web.json_response({"error": "Colony not loaded"}, status=409) + + executor = getattr(session, "queen_executor", None) + queen_node = getattr(executor, "node_registry", {}).get("queen") if executor else None + if queen_node is None: + return web.json_response({"error": "Queen not ready"}, status=409) + + from framework.agent_loop.agent_loop import TriggerEvent + + try: + await queen_node.inject_trigger( + TriggerEvent( + trigger_type=tdef.trigger_type, + source_id=trigger_id, + payload={ + "task": tdef.task or "", + "trigger_config": tdef.trigger_config, + "forced": True, + }, + ) + ) + except Exception as exc: # noqa: BLE001 + return web.json_response( + {"error": f"Failed to fire trigger: {exc}"}, + status=500, + ) + + from framework.tools.queen_lifecycle_tools import _emit_trigger_fired + + await _emit_trigger_fired(session, trigger_id, tdef.trigger_type) + + return web.json_response({"status": "fired", "trigger_id": trigger_id}) + + async def handle_activate_trigger(request: web.Request) -> web.Response: """POST /api/sessions/{session_id}/triggers/{trigger_id}/activate — start a trigger.""" session, err = resolve_session(request) @@ -599,6 +668,17 @@ async def handle_activate_trigger(request: web.Request) -> web.Response: runner = getattr(session, "runner", None) colony_entry = runner.graph.entry_node if runner else None + config_out = dict(tdef.trigger_config) + mono = getattr(session, "trigger_next_fire", {}).get(trigger_id) + if mono is not None: + remaining = max(0.0, mono - time.monotonic()) + config_out["next_fire_in"] = remaining + config_out["next_fire_at"] = int((time.time() + remaining) * 1000) + stats = getattr(session, "trigger_fire_stats", {}).get(trigger_id) + if stats: + config_out["fire_count"] = stats.get("fire_count", 0) + if stats.get("last_fired_at") is not None: + config_out["last_fired_at"] = stats["last_fired_at"] await bus.publish( AgentEvent( type=EventType.TRIGGER_ACTIVATED, @@ -606,7 +686,7 @@ async def handle_activate_trigger(request: web.Request) -> web.Response: data={ "trigger_id": trigger_id, "trigger_type": tdef.trigger_type, - "trigger_config": tdef.trigger_config, + "trigger_config": config_out, "name": tdef.description or trigger_id, **({"entry_node": colony_entry} if colony_entry else {}), }, @@ -1022,6 +1102,10 @@ def register_routes(app: web.Application) -> None: "/api/sessions/{session_id}/triggers/{trigger_id}/deactivate", handle_deactivate_trigger, ) + app.router.add_post( + "/api/sessions/{session_id}/triggers/{trigger_id}/run", + handle_run_trigger, + ) app.router.add_get("/api/sessions/{session_id}/colonies", handle_session_colonies) app.router.add_get("/api/sessions/{session_id}/events/history", handle_session_events_history) diff --git a/core/framework/server/session_manager.py b/core/framework/server/session_manager.py index d89456db..d65fd533 100644 --- a/core/framework/server/session_manager.py +++ b/core/framework/server/session_manager.py @@ -93,6 +93,9 @@ class Session: worker_configured: bool = False # Monotonic timestamps for next trigger fire (mirrors AgentRuntime._timer_next_fire) trigger_next_fire: dict[str, float] = field(default_factory=dict) + # Per-trigger fire stats (session lifetime): {trigger_id: {"fire_count": int, "last_fired_at": epoch_ms}}. + # Reset on process restart — good enough as a "since this session started" counter. + trigger_fire_stats: dict[str, dict[str, Any]] = field(default_factory=dict) # Session directory resumption: # When set, _start_queen writes queen conversations to this existing session's # directory instead of creating a new one. This lets cold-restores accumulate @@ -1607,8 +1610,28 @@ class SessionManager: # Resolve entry node for trigger target runner = getattr(session, "runner", None) colony_entry = runner.graph.entry_node if runner else None + fire_times = getattr(session, "trigger_next_fire", {}) + fire_stats = getattr(session, "trigger_fire_stats", {}) + now_mono = time.monotonic() + now_wall = time.time() for t in triggers.values(): + # Merge ephemeral next-fire data + historical fire stats into + # trigger_config so the UI can render a live-ticking countdown + # and a "fired Nx · last 2m ago" badge. `next_fire_at` is epoch + # milliseconds (wall clock) — the frontend anchors its ticker + # on this. `next_fire_in` is kept for legacy consumers. + config_out = dict(t.trigger_config) + mono = fire_times.get(t.id) + if mono is not None: + remaining = max(0.0, mono - now_mono) + config_out["next_fire_in"] = remaining + config_out["next_fire_at"] = int((now_wall + remaining) * 1000) + stats = fire_stats.get(t.id) + if stats: + config_out["fire_count"] = stats.get("fire_count", 0) + if stats.get("last_fired_at") is not None: + config_out["last_fired_at"] = stats["last_fired_at"] await session.event_bus.publish( AgentEvent( type=event_type, @@ -1616,7 +1639,7 @@ class SessionManager: data={ "trigger_id": t.id, "trigger_type": t.trigger_type, - "trigger_config": t.trigger_config, + "trigger_config": config_out, "name": t.description or t.id, **({"entry_node": colony_entry} if colony_entry else {}), }, diff --git a/core/framework/tools/queen_lifecycle_tools.py b/core/framework/tools/queen_lifecycle_tools.py index 3bf08a21..53988485 100644 --- a/core/framework/tools/queen_lifecycle_tools.py +++ b/core/framework/tools/queen_lifecycle_tools.py @@ -468,6 +468,51 @@ async def _persist_active_triggers(session: Any, session_id: str) -> None: logger.warning("Failed to persist active triggers for session %s", session_id, exc_info=True) +async def _emit_trigger_fired(session: Any, trigger_id: str, trigger_type: str) -> None: + """Publish EventType.TRIGGER_FIRED and update per-session fire stats. + + Called by both the timer loop and the webhook handler right after + ``queen_node.inject_trigger(...)``. The event carries refreshed + ``next_fire_at``/``next_fire_in`` so the UI can re-anchor its + countdown without polling, plus ``fire_count``/``last_fired_at`` for + the "fired Nx · last 2m ago" badge. + """ + now_wall = time.time() + stats_map = getattr(session, "trigger_fire_stats", None) + fire_count: int | None = None + last_fired_at: int = int(now_wall * 1000) + if stats_map is not None: + s = stats_map.setdefault(trigger_id, {"fire_count": 0, "last_fired_at": None}) + s["fire_count"] = int(s.get("fire_count", 0)) + 1 + s["last_fired_at"] = last_fired_at + fire_count = s["fire_count"] + + bus = getattr(session, "event_bus", None) + if bus is None: + return + + from framework.host.event_bus import AgentEvent, EventType + + data: dict[str, Any] = { + "trigger_id": trigger_id, + "trigger_type": trigger_type, + "last_fired_at": last_fired_at, + } + if fire_count is not None: + data["fire_count"] = fire_count + + mono = getattr(session, "trigger_next_fire", {}).get(trigger_id) + if mono is not None: + remaining = max(0.0, mono - time.monotonic()) + data["next_fire_in"] = remaining + data["next_fire_at"] = int((now_wall + remaining) * 1000) + + try: + await bus.publish(AgentEvent(type=EventType.TRIGGER_FIRED, stream_id="queen", data=data)) + except Exception: + logger.warning("Failed to publish TRIGGER_FIRED for '%s'", trigger_id, exc_info=True) + + async def _start_trigger_timer(session: Any, trigger_id: str, tdef: Any) -> None: """Start an asyncio background task that fires the trigger on a timer.""" from framework.agent_loop.agent_loop import TriggerEvent @@ -475,6 +520,24 @@ async def _start_trigger_timer(session: Any, trigger_id: str, tdef: Any) -> None cron_expr = tdef.trigger_config.get("cron") interval_minutes = tdef.trigger_config.get("interval_minutes") + # Seed the first-fire time up front so introspection (and the UI + # countdown) have a value immediately on activation instead of only + # after the first tick. Cron uses croniter's next match; interval + # uses interval_minutes. Both use monotonic, matching route readers. + fire_times = getattr(session, "trigger_next_fire", None) + if fire_times is not None: + if cron_expr: + try: + from croniter import croniter as _croniter_seed + + _first = _croniter_seed(cron_expr, datetime.now(tz=UTC)).get_next(datetime) + _first_delay = max(0.0, (_first - datetime.now(tz=UTC)).total_seconds()) + except Exception: + _first_delay = 60.0 + else: + _first_delay = float(interval_minutes) * 60 if interval_minutes else 60.0 + fire_times[trigger_id] = time.monotonic() + _first_delay + async def _timer_loop() -> None: if cron_expr: from croniter import croniter @@ -491,10 +554,21 @@ async def _start_trigger_timer(session: Any, trigger_id: str, tdef: Any) -> None else: await asyncio.sleep(float(interval_minutes) * 60) - # Record next fire time for introspection (monotonic, matches routes) + # Record the *subsequent* next-fire time for introspection. + # For cron we peek one step further; for interval we add + # another interval. Matches routes' monotonic clock. fire_times = getattr(session, "trigger_next_fire", None) if fire_times is not None: - _next_delay = float(interval_minutes) * 60 if interval_minutes else 60 + if cron_expr: + try: + _peek = croniter(cron_expr, datetime.now(tz=UTC)).get_next(datetime) + _next_delay = max( + 0.0, (_peek - datetime.now(tz=UTC)).total_seconds() + ) + except Exception: + _next_delay = 60.0 + else: + _next_delay = float(interval_minutes) * 60 if interval_minutes else 60.0 fire_times[trigger_id] = time.monotonic() + _next_delay # Gate on a graph being loaded @@ -518,6 +592,7 @@ async def _start_trigger_timer(session: Any, trigger_id: str, tdef: Any) -> None }, ) await queen_node.inject_trigger(event) + await _emit_trigger_fired(session, trigger_id, "timer") except asyncio.CancelledError: raise except Exception: @@ -567,6 +642,7 @@ async def _start_trigger_webhook(session: Any, trigger_id: str, tdef: Any) -> No }, ) await queen_node.inject_trigger(trigger_event) + await _emit_trigger_fired(session, trigger_id, "webhook") sub_id = bus.subscribe( event_types=[EventType.WEBHOOK_RECEIVED], diff --git a/core/frontend/src/api/sessions.ts b/core/frontend/src/api/sessions.ts index 20d07c4f..07ddb168 100644 --- a/core/frontend/src/api/sessions.ts +++ b/core/frontend/src/api/sessions.ts @@ -84,6 +84,11 @@ export const sessionsApi = { `/sessions/${sessionId}/triggers/${triggerId}/deactivate`, ), + runTrigger: (sessionId: string, triggerId: string) => + api.post<{ status: string; trigger_id: string }>( + `/sessions/${sessionId}/triggers/${triggerId}/run`, + ), + colonies: (sessionId: string) => api.get<{ colonies: string[] }>(`/sessions/${sessionId}/colonies`), diff --git a/core/frontend/src/api/types.ts b/core/frontend/src/api/types.ts index 2be35190..de89ddbf 100644 --- a/core/frontend/src/api/types.ts +++ b/core/frontend/src/api/types.ts @@ -62,6 +62,12 @@ export interface EntryPoint { task?: string; /** Seconds until the next timer fire (only present for timer entry points). */ next_fire_in?: number; + /** Absolute wall-clock time of the next timer fire (epoch ms). */ + next_fire_at?: number; + /** Number of times this trigger has fired during the session's lifetime. */ + fire_count?: number; + /** Wall-clock time of the most recent fire (epoch ms). */ + last_fired_at?: number; } export interface WorkerEntry { diff --git a/core/frontend/src/components/ColonyWorkersPanel.tsx b/core/frontend/src/components/ColonyWorkersPanel.tsx index 606e920c..e11daff8 100644 --- a/core/frontend/src/components/ColonyWorkersPanel.tsx +++ b/core/frontend/src/components/ColonyWorkersPanel.tsx @@ -818,11 +818,64 @@ function countdownLabel(nextFireIn: number | undefined): string | null { : `next in ${m}m ${String(s).padStart(2, "0")}s`; } +/** Tick a live countdown against the server-provided absolute `next_fire_at` + * (epoch ms). Falls back to converting `next_fire_in` (seconds delta) if + * the absolute form is absent. Rolls forward by interval_minutes when + * zero is crossed so the UI keeps counting between server pushes. */ +function useLiveCountdown( + nextFireAt: number | undefined, + nextFireIn: number | undefined, + isActive: boolean, + intervalMinutes: number | undefined, +): { remainingSec: number | null; firesAtMs: number | null } { + const [firesAtMs, setFiresAtMs] = useState(null); + const [remainingSec, setRemainingSec] = useState(null); + + useEffect(() => { + if (typeof nextFireAt === "number" && nextFireAt > 0) { + setFiresAtMs(nextFireAt); + } else if (typeof nextFireIn === "number" && nextFireIn >= 0) { + setFiresAtMs(Date.now() + nextFireIn * 1000); + } else { + setFiresAtMs(null); + } + }, [nextFireAt, nextFireIn]); + + useEffect(() => { + if (!isActive || firesAtMs == null) { + setRemainingSec(null); + return; + } + const tick = () => { + const diff = (firesAtMs - Date.now()) / 1000; + if (diff > 0) { + setRemainingSec(diff); + } else if (intervalMinutes) { + setFiresAtMs((prev) => (prev != null ? prev + intervalMinutes * 60 * 1000 : prev)); + } else { + setRemainingSec(0); + } + }; + tick(); + const id = window.setInterval(tick, 1000); + return () => window.clearInterval(id); + }, [firesAtMs, isActive, intervalMinutes]); + + return { remainingSec, firesAtMs }; +} + function TriggerCard({ trigger, onClick }: { trigger: GraphNode; onClick: () => void }) { const isActive = triggerIsActive(trigger); const schedule = scheduleLabel(trigger.triggerConfig); const nextFireIn = trigger.triggerConfig?.next_fire_in as number | undefined; - const countdown = isActive ? countdownLabel(nextFireIn) : null; + const nextFireAt = trigger.triggerConfig?.next_fire_at as number | undefined; + const interval = trigger.triggerConfig?.interval_minutes as number | undefined; + const fireCount = trigger.triggerConfig?.fire_count as number | undefined; + const lastFiredAt = trigger.triggerConfig?.last_fired_at as number | undefined; + const { remainingSec } = useLiveCountdown(nextFireAt, nextFireIn, isActive, interval); + const now = useNow(1000); + const countdown = isActive && remainingSec != null ? countdownLabel(remainingSec) : null; + const agoLabel = lastFiredAt ? formatAgo(lastFiredAt, now) : null; return ( ); } @@ -867,6 +927,31 @@ function formatCountdown(seconds: number): string { return `${s}s`; } +/** Human-readable "X ago" for a wall-clock epoch ms. */ +function formatAgo(epochMs: number, nowMs: number): string { + const diff = Math.max(0, Math.floor((nowMs - epochMs) / 1000)); + if (diff < 5) return "just now"; + if (diff < 60) return `${diff}s ago`; + const m = Math.floor(diff / 60); + if (m < 60) return `${m}m ago`; + const h = Math.floor(m / 60); + if (h < 24) return `${h}h ago`; + const d = Math.floor(h / 24); + return `${d}d ago`; +} + +/** Reactive Date.now() that re-renders on an interval. 1s default keeps + * countdowns smooth; consumers that only need "ago" can pass a coarser + * interval. */ +function useNow(intervalMs = 1000): number { + const [now, setNow] = useState(() => Date.now()); + useEffect(() => { + const id = window.setInterval(() => setNow(Date.now()), intervalMs); + return () => window.clearInterval(id); + }, [intervalMs]); + return now; +} + function TriggerDetail({ sessionId, trigger, @@ -877,14 +962,23 @@ function TriggerDetail({ onBack: () => void; }) { const [busy, setBusy] = useState(false); + const [runBusy, setRunBusy] = useState(false); + const [runNotice, setRunNotice] = useState(null); const [error, setError] = useState(null); const isActive = triggerIsActive(trigger); const config = (trigger.triggerConfig || {}) as Record; const cron = config.cron as string | undefined; const interval = config.interval_minutes as number | undefined; const nextFireIn = config.next_fire_in as number | undefined; + const nextFireAt = config.next_fire_at as number | undefined; + const fireCount = config.fire_count as number | undefined; + const lastFiredAt = config.last_fired_at as number | undefined; const triggerId = trigger.id.replace(/^__trigger_/, ""); + const { remainingSec, firesAtMs } = useLiveCountdown(nextFireAt, nextFireIn, isActive, interval); + const now = useNow(1000); + const lastFiredAgo = lastFiredAt ? formatAgo(lastFiredAt, now) : null; + const handleToggle = async () => { if (!sessionId || busy) return; setBusy(true); @@ -904,6 +998,23 @@ function TriggerDetail({ } }; + const handleForceRun = async () => { + if (!sessionId || runBusy) return; + setRunBusy(true); + setError(null); + setRunNotice(null); + try { + await sessionsApi.runTrigger(sessionId, triggerId); + setRunNotice("Trigger fired"); + // Clear the notice after a few seconds so it doesn't linger. + setTimeout(() => setRunNotice(null), 3000); + } catch (err) { + setError(err instanceof Error ? err.message : String(err)); + } finally { + setRunBusy(false); + } + }; + const schedule = cron ? cronToLabel(cron) : interval != null @@ -914,7 +1025,12 @@ function TriggerDetail({ // Hide UI-synthesised fields so the user sees only real operator config. const displayEntries = Object.entries(config).filter( - ([k]) => k !== "next_fire_in" && k !== "entry_node", + ([k]) => + k !== "next_fire_in" && + k !== "next_fire_at" && + k !== "fire_count" && + k !== "last_fired_at" && + k !== "entry_node", ); return ( @@ -967,12 +1083,33 @@ function TriggerDetail({ )} - {isActive && nextFireIn != null && nextFireIn > 0 && ( + {isActive && remainingSec != null && remainingSec > 0 && (
-

in {formatCountdown(nextFireIn)}

+

in {formatCountdown(remainingSec)}

+ {firesAtMs != null && ( +

+ at {new Date(firesAtMs).toLocaleTimeString()} +

+ )}
)} + {(fireCount != null && fireCount > 0) || lastFiredAgo ? ( +
+
+ {lastFiredAgo ?? "—"} + {fireCount != null && fireCount > 0 && ( + fired {fireCount}× + )} +
+ {lastFiredAt && ( +

+ at {new Date(lastFiredAt).toLocaleTimeString()} +

+ )} +
+ ) : null} + {displayEntries.length > 0 && (
@@ -995,6 +1132,23 @@ function TriggerDetail({ {error && (

{error}

)} + {runNotice && ( +

{runNotice}

+ )} +