Merge branch 'refactor/worker-message' into feature/colony-sqlite

This commit is contained in:
Timothy
2026-04-16 11:26:46 -07:00
7 changed files with 419 additions and 61 deletions
+71 -5
View File
@@ -686,6 +686,10 @@ async def handle_session_colonies(request: web.Request) -> web.Response:
return web.json_response({"colonies": colonies})
_EVENTS_HISTORY_DEFAULT_LIMIT = 2000
_EVENTS_HISTORY_MAX_LIMIT = 10000
async def handle_session_events_history(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/events/history — persisted eventbus log.
@@ -693,17 +697,58 @@ async def handle_session_events_history(request: web.Request) -> web.Response:
both live sessions and cold (post-server-restart) sessions. The frontend
replays these events through ``sseEventToChatMessage`` to fully reconstruct
the UI state on resume.
Query params:
limit: maximum number of events to return (default 2000, max 10000).
The TAIL of the file is returned i.e. the most recent N events.
Older events are dropped and ``truncated`` is set to True.
Response shape::
{
"events": [...], # up to ``limit`` events, oldest-first
"session_id": "...",
"total": 12345, # total events in the file
"returned": 2000, # len(events)
"truncated": true, # total > returned
"limit": 2000, # the effective limit used
}
``events.jsonl`` is append-only chronological, so "last N lines" == "most
recent N events". Long-running colonies have produced files with 50k+
events; before this cap, restoring on page-mount shipped the whole thing
down the wire and blocked the UI for seconds.
"""
session_id = request.match_info["session_id"]
try:
limit = int(request.query.get("limit", str(_EVENTS_HISTORY_DEFAULT_LIMIT)))
except ValueError:
limit = _EVENTS_HISTORY_DEFAULT_LIMIT
limit = max(1, min(limit, _EVENTS_HISTORY_MAX_LIMIT))
from framework.server.session_manager import _find_queen_session_dir
queen_dir = _find_queen_session_dir(session_id)
events_path = queen_dir / "events.jsonl"
if not events_path.exists():
return web.json_response({"events": [], "session_id": session_id})
return web.json_response(
{
"events": [],
"session_id": session_id,
"total": 0,
"returned": 0,
"truncated": False,
"limit": limit,
}
)
events: list[dict] = []
# Tail the file using a bounded deque — O(limit) memory regardless
# of file size. No need to materialize the whole list only to slice it.
from collections import deque
tail: deque[dict] = deque(maxlen=limit)
total = 0
try:
with open(events_path, encoding="utf-8") as f:
for line in f:
@@ -711,13 +756,34 @@ async def handle_session_events_history(request: web.Request) -> web.Response:
if not line:
continue
try:
events.append(json.loads(line))
evt = json.loads(line)
except json.JSONDecodeError:
continue
total += 1
tail.append(evt)
except OSError:
return web.json_response({"events": [], "session_id": session_id})
return web.json_response(
{
"events": [],
"session_id": session_id,
"total": 0,
"returned": 0,
"truncated": False,
"limit": limit,
}
)
return web.json_response({"events": events, "session_id": session_id})
events = list(tail)
return web.json_response(
{
"events": events,
"session_id": session_id,
"total": total,
"returned": len(events),
"truncated": total > len(events),
"limit": limit,
}
)
async def handle_session_history(request: web.Request) -> web.Response:
+19 -3
View File
@@ -87,9 +87,25 @@ export const sessionsApi = {
colonies: (sessionId: string) =>
api.get<{ colonies: string[] }>(`/sessions/${sessionId}/colonies`),
/** Get persisted eventbus log for a session (works for cold sessions — used for full UI replay). */
eventsHistory: (sessionId: string) =>
api.get<{ events: AgentEvent[]; session_id: string }>(`/sessions/${sessionId}/events/history`),
/** Get persisted eventbus log for a session (works for cold sessions — used for full UI replay).
*
* Returns the TAIL of the event log. Default limit 2000 (server
* clamps to [1, 10000]); older events get dropped and
* ``truncated: true`` is set so the UI can show an indicator.
*/
eventsHistory: (sessionId: string, limit?: number) =>
api.get<{
events: AgentEvent[];
session_id: string;
total: number;
returned: number;
truncated: boolean;
limit: number;
}>(
`/sessions/${sessionId}/events/history${
limit ? `?limit=${limit}` : ""
}`,
),
/** Open the session's data folder in the OS file manager. */
revealFolder: (sessionId: string) =>
+2 -2
View File
@@ -128,14 +128,14 @@ const TOOL_HEX = [
"#e5a820", // sunflower
];
function toolHex(name: string): string {
export function toolHex(name: string): string {
let hash = 0;
for (let i = 0; i < name.length; i++)
hash = (hash * 31 + name.charCodeAt(i)) | 0;
return TOOL_HEX[Math.abs(hash) % TOOL_HEX.length];
}
function ToolActivityRow({ content }: { content: string }) {
export function ToolActivityRow({ content }: { content: string }) {
let tools: { name: string; done: boolean }[] = [];
try {
const parsed = JSON.parse(content);
@@ -1,6 +1,7 @@
import { memo, useState, useRef, useEffect } from "react";
import { ChevronDown, ChevronUp, Cpu } from "lucide-react";
import type { ChatMessage } from "@/components/ChatPanel";
import { ToolActivityRow } from "@/components/ChatPanel";
import MarkdownContent from "@/components/MarkdownContent";
const workerColor = "hsl(220,60%,55%)";
@@ -197,41 +198,77 @@ const WorkerRunBubble = memo(
</div>
)}
{/* Expanded: full scrollable message stream */}
{/* Expanded: chronological stream with tool bursts
coalesced into a single ToolActivityRow each.
Consecutive tool_status messages (no text between)
collapse to the LATEST snapshot — each snapshot is
cumulative within its turn, so the latest one tells
the whole story for that burst. Text messages break
the burst and render as markdown. */}
{expanded && (
<div
ref={bodyRef}
className="max-h-[400px] overflow-y-auto px-4 py-3 space-y-2"
className="max-h-[400px] overflow-y-auto px-4 py-3 space-y-3"
>
{group.messages.map((m, i) => {
if (m.type === "tool_status") {
const tools = parseToolStatus(m.content);
if (tools.length === 0) return null;
{(() => {
type RenderRow =
| { kind: "tools"; content: string; key: string }
| { kind: "text"; msg: ChatMessage; key: string };
const rows: RenderRow[] = [];
let pendingTool: { content: string; id: string } | null = null;
const flushTool = () => {
if (pendingTool) {
rows.push({
kind: "tools",
content: pendingTool.content,
key: `tools-${pendingTool.id}`,
});
pendingTool = null;
}
};
for (let i = 0; i < group.messages.length; i++) {
const m = group.messages[i];
if (m.type === "tool_status") {
// Overwrite — latest snapshot in the burst wins
pendingTool = {
content: m.content,
id: m.id || `ts-${i}`,
};
continue;
}
if (m.content?.trim()) {
flushTool();
rows.push({
kind: "text",
msg: m,
key: m.id || `txt-${i}`,
});
}
}
flushTool();
return rows.map((row) => {
if (row.kind === "tools") {
// ToolActivityRow groups by tool name (×N), shows
// running pills (spinner) before done pills (check),
// and uses the per-tool color hash that matches
// the rest of the chat.
return (
<div key={row.key} className="-ml-10">
<ToolActivityRow content={row.content} />
</div>
);
}
return (
<div key={m.id || i} className="flex flex-wrap gap-1.5">
{tools.map((t, ti) => (
<span
key={ti}
className={`inline-flex items-center gap-1 text-[10px] px-1.5 py-0.5 rounded-md border ${
t.done
? "bg-green-50 border-green-200 text-green-700 dark:bg-green-900/20 dark:border-green-800 dark:text-green-400"
: "bg-blue-50 border-blue-200 text-blue-700 dark:bg-blue-900/20 dark:border-blue-800 dark:text-blue-400"
}`}
>
<span>{t.done ? "\u2713" : "\u25cf"}</span>
<span className="font-mono">{t.name}</span>
</span>
))}
<div
key={row.key}
className="text-sm leading-relaxed"
>
<MarkdownContent content={row.msg.content} />
</div>
);
}
if (!m.content?.trim()) return null;
return (
<div key={m.id || i} className="text-sm leading-relaxed">
<MarkdownContent content={m.content} />
</div>
);
})}
});
})()}
</div>
)}
</div>
+180
View File
@@ -194,6 +194,186 @@ export function sseEventToChatMessage(
}
}
// ---------------------------------------------------------------------------
// Stateful event replay — produces tool_status pills + regular messages
// ---------------------------------------------------------------------------
/**
* State maintained while replaying an event stream. Tracks per-stream turn
* counters, the set of active tool calls (so tool_status pill content
* reflects "tool A done, tool B running" correctly), and a tool_use_id →
* pill_msg_id map so deferred `tool_call_completed` events can find the
* pill they belong to after the turn counter moves on.
*/
export interface ReplayState {
turnCounters: Record<string, number>;
activeToolCalls: Record<
string,
{ name: string; done: boolean; streamId: string }
>;
toolUseToPill: Record<string, { msgId: string; name: string }>;
}
export function newReplayState(): ReplayState {
return { turnCounters: {}, activeToolCalls: {}, toolUseToPill: {} };
}
/**
* Process a single event and emit zero or more ChatMessage upserts.
*
* Why this exists: `sseEventToChatMessage` is stateless — one event in, at
* most one message out. But the chat's tool_status pill is a SYNTHESIZED
* message: each tool_call_started adds to an accumulating pill, and each
* tool_call_completed flips one of its tools from running to done. Live
* SSE handlers in colony-chat and queen-dm already do this synthesis
* against React refs. Cold-restore from events.jsonl used to skip
* tool_call_* events entirely, so refreshed sessions looked completely
* different from live ones — no tool activity visible, just prose.
*
* This function centralizes the synthesis so cold-restore and live paths
* can use the exact same state machine. The caller treats the returned
* messages as upserts (by id) — a later event in the same replay may
* emit the same pill id with updated content, which should REPLACE the
* earlier row in the caller's message list.
*/
export function replayEvent(
state: ReplayState,
event: AgentEvent,
thread: string,
agentDisplayName: string | undefined,
): ChatMessage[] {
const streamId = event.stream_id;
const isQueen = streamId === "queen";
const role: "queen" | "worker" = isQueen ? "queen" : "worker";
const turnKey = streamId;
const currentTurn = state.turnCounters[turnKey] ?? 0;
const eventCreatedAt = event.timestamp
? new Date(event.timestamp).getTime()
: Date.now();
const out: ChatMessage[] = [];
// Update state machine BEFORE the generic converter runs so the
// regular message emitted for this event sees the post-update
// counter (matches live handler ordering at colony-chat.tsx:525).
switch (event.type) {
case "execution_started":
state.turnCounters[turnKey] = currentTurn + 1;
// New execution for a worker resets its active tools, mirroring
// the live handler's setAgentState at colony-chat.tsx:566.
if (!isQueen) {
const keepActive: typeof state.activeToolCalls = {};
for (const [k, v] of Object.entries(state.activeToolCalls)) {
if (v.streamId !== streamId) keepActive[k] = v;
}
state.activeToolCalls = keepActive;
}
break;
case "llm_turn_complete":
state.turnCounters[turnKey] = currentTurn + 1;
break;
case "tool_call_started": {
if (!event.node_id) break;
const toolName = (event.data?.tool_name as string) || "unknown";
const toolUseId = (event.data?.tool_use_id as string) || "";
state.activeToolCalls[toolUseId] = {
name: toolName,
done: false,
streamId,
};
const pillId = `tool-pill-${streamId}-${event.execution_id || "exec"}-${currentTurn}`;
if (toolUseId) {
state.toolUseToPill[toolUseId] = { msgId: pillId, name: toolName };
}
const tools = Object.values(state.activeToolCalls)
.filter((t) => t.streamId === streamId)
.map((t) => ({ name: t.name, done: t.done }));
const allDone = tools.length > 0 && tools.every((t) => t.done);
out.push({
id: pillId,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: JSON.stringify({ tools, allDone }),
timestamp: "",
type: "tool_status",
role,
thread,
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
});
break;
}
case "tool_call_completed": {
if (!event.node_id) break;
const toolUseId = (event.data?.tool_use_id as string) || "";
const tracked = state.toolUseToPill[toolUseId];
if (toolUseId) delete state.toolUseToPill[toolUseId];
if (toolUseId && state.activeToolCalls[toolUseId]) {
state.activeToolCalls[toolUseId].done = true;
}
if (!tracked) break;
const tools = Object.values(state.activeToolCalls)
.filter((t) => t.streamId === streamId)
.map((t) => ({ name: t.name, done: t.done }));
const allDone = tools.length > 0 && tools.every((t) => t.done);
// Re-emit the SAME pill id with updated content. Caller upserts
// by id, so this replaces the row from tool_call_started.
out.push({
id: tracked.msgId,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: JSON.stringify({ tools, allDone }),
timestamp: "",
type: "tool_status",
role,
thread,
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
});
break;
}
}
// Regular stateless conversion (prose, user input, system notes).
const msg = sseEventToChatMessage(
event,
thread,
agentDisplayName,
state.turnCounters[turnKey] ?? 0,
);
if (msg) {
if (isQueen) msg.role = "queen";
out.push(msg);
}
return out;
}
/**
* Replay an entire event array and return a deduplicated, chronologically
* sorted ChatMessage list. Used by cold-restore paths so refreshed
* sessions match the live stream exactly.
*/
export function replayEventsToMessages(
events: AgentEvent[],
thread: string,
agentDisplayName: string | undefined,
): ChatMessage[] {
const state = newReplayState();
// Upsert by id — later emissions for the same pill replace earlier ones.
const byId = new Map<string, ChatMessage>();
for (const evt of events) {
for (const m of replayEvent(state, evt, thread, agentDisplayName)) {
byId.set(m.id, m);
}
}
return Array.from(byId.values()).sort(
(a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0),
);
}
type QueenPhase = "planning" | "building" | "staging" | "running" | "independent";
const VALID_PHASES = new Set<string>(["planning", "building", "staging", "running", "independent"]);
+53 -14
View File
@@ -13,7 +13,11 @@ import { executionApi } from "@/api/execution";
import { sessionsApi } from "@/api/sessions";
import { useMultiSSE } from "@/hooks/use-sse";
import type { LiveSession, AgentEvent } from "@/api/types";
import { sseEventToChatMessage, formatAgentDisplayName } from "@/lib/chat-helpers";
import {
sseEventToChatMessage,
formatAgentDisplayName,
replayEventsToMessages,
} from "@/lib/chat-helpers";
import { cronToLabel } from "@/lib/graphUtils";
import { ApiError } from "@/api/client";
import { useColony } from "@/context/ColonyContext";
@@ -41,6 +45,8 @@ function truncate(s: string, max: number): string {
type SessionRestoreResult = {
messages: ChatMessage[];
restoredPhase: "planning" | "building" | "staging" | "running" | "independent" | null;
truncated: boolean;
droppedCount: number;
};
async function restoreSessionMessages(
@@ -49,34 +55,67 @@ async function restoreSessionMessages(
agentDisplayName: string,
): Promise<SessionRestoreResult> {
try {
const { events } = await sessionsApi.eventsHistory(sessionId);
const { events, truncated, total, returned } =
await sessionsApi.eventsHistory(sessionId);
if (events.length > 0) {
const messages: ChatMessage[] = [];
// Walk events twice:
// 1. Extract the trailing queen phase (unchanged logic).
// 2. Run the full state-machine replay so tool_status pills
// are synthesized just like the live SSE handler does.
// Without (2), refreshed sessions showed zero tool activity
// because tool_call_started/completed events are ignored by
// the stateless converter.
let runningPhase: ChatMessage["phase"] = undefined;
for (const evt of events) {
const p =
evt.type === "queen_phase_changed"
? (evt.data?.phase as string)
: evt.type === "node_loop_iteration"
? (evt.data?.phase as string | undefined)
: undefined;
? (evt.data?.phase as string | undefined)
: undefined;
if (p && ["planning", "building", "staging", "running"].includes(p)) {
runningPhase = p as ChatMessage["phase"];
}
const msg = sseEventToChatMessage(evt, thread, agentDisplayName);
if (!msg) continue;
if (evt.stream_id === "queen") {
msg.role = "queen";
msg.phase = runningPhase;
}
messages.push(msg);
}
return { messages, restoredPhase: runningPhase ?? null };
const messages = replayEventsToMessages(events, thread, agentDisplayName);
// Stamp the latest phase on every queen message so the UI's
// phase-badge rendering matches what the live path would have
// displayed at the time of the refresh.
if (runningPhase) {
for (const m of messages) {
if (m.role === "queen") m.phase = runningPhase;
}
}
// Prepend a run_divider banner when the server truncated older
// events so the user knows how many are hidden.
const droppedCount = Math.max(0, total - returned);
if (truncated && droppedCount > 0) {
const firstTs = events[0]?.timestamp;
const bannerCreatedAt = firstTs ? new Date(firstTs).getTime() - 1 : 0;
messages.unshift({
id: `restore-truncated-${sessionId}`,
agent: "System",
agentColor: "",
type: "run_divider",
content: `${droppedCount.toLocaleString()} older event${droppedCount === 1 ? "" : "s"} not shown (showing last ${returned.toLocaleString()})`,
timestamp: firstTs ?? new Date().toISOString(),
thread,
createdAt: bannerCreatedAt,
});
}
return {
messages,
restoredPhase: runningPhase ?? null,
truncated,
droppedCount,
};
}
} catch {
// Event log not available
}
return { messages: [], restoredPhase: null };
return { messages: [], restoredPhase: null, truncated: false, droppedCount: 0 };
}
// ── Agent backend state ──────────────────────────────────────────────────────
+29 -9
View File
@@ -11,7 +11,10 @@ import { sessionsApi } from "@/api/sessions";
import { queensApi } from "@/api/queens";
import { useMultiSSE } from "@/hooks/use-sse";
import type { AgentEvent, HistorySession } from "@/api/types";
import { sseEventToChatMessage } from "@/lib/chat-helpers";
import {
sseEventToChatMessage,
replayEventsToMessages,
} from "@/lib/chat-helpers";
import { useColony } from "@/context/ColonyContext";
import { useHeaderActions } from "@/context/HeaderActionsContext";
import { getQueenForAgent, slugToColonyId } from "@/lib/colony-registry";
@@ -90,17 +93,34 @@ export default function QueenDM() {
const restoreMessages = useCallback(
async (sid: string, cancelled: () => boolean) => {
try {
const { events } = await sessionsApi.eventsHistory(sid);
const { events, truncated, total, returned } =
await sessionsApi.eventsHistory(sid);
if (cancelled()) return;
const restored: ChatMessage[] = [];
for (const evt of events) {
const msg = sseEventToChatMessage(evt, "queen-dm", queenName);
if (!msg) continue;
if (evt.stream_id === "queen") msg.role = "queen";
restored.push(msg);
// Use the stateful replay so tool_status pills are synthesized
// the same way the live SSE handler does — without this the
// refreshed queen DM shows zero tool activity.
const restored = replayEventsToMessages(events, "queen-dm", queenName);
// Show a banner if the server truncated older events.
const droppedCount = Math.max(0, total - returned);
if (truncated && droppedCount > 0) {
const firstTs = events[0]?.timestamp;
const bannerCreatedAt = firstTs
? new Date(firstTs).getTime() - 1
: 0;
restored.unshift({
id: `restore-truncated-${sid}`,
agent: "System",
agentColor: "",
type: "run_divider",
content: `${droppedCount.toLocaleString()} older event${droppedCount === 1 ? "" : "s"} not shown (showing last ${returned.toLocaleString()})`,
timestamp: firstTs ?? new Date().toISOString(),
thread: "queen-dm",
createdAt: bannerCreatedAt,
});
}
if (restored.length > 0 && !cancelled()) {
restored.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
setMessages(restored);
// Only clear typing if the history contains a completed execution;
// during bootstrap the queen is still processing.