fix: worker message display

This commit is contained in:
Timothy
2026-04-16 11:20:17 -07:00
parent 1ee0d5a2e8
commit c4ee12532f
6 changed files with 206 additions and 42 deletions
+71 -5
View File
@@ -708,6 +708,10 @@ async def handle_session_colonies(request: web.Request) -> web.Response:
return web.json_response({"colonies": colonies})
_EVENTS_HISTORY_DEFAULT_LIMIT = 2000
_EVENTS_HISTORY_MAX_LIMIT = 10000
async def handle_session_events_history(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/events/history — persisted eventbus log.
@@ -715,17 +719,58 @@ async def handle_session_events_history(request: web.Request) -> web.Response:
both live sessions and cold (post-server-restart) sessions. The frontend
replays these events through ``sseEventToChatMessage`` to fully reconstruct
the UI state on resume.
Query params:
limit: maximum number of events to return (default 2000, max 10000).
The TAIL of the file is returned i.e. the most recent N events.
Older events are dropped and ``truncated`` is set to True.
Response shape::
{
"events": [...], # up to ``limit`` events, oldest-first
"session_id": "...",
"total": 12345, # total events in the file
"returned": 2000, # len(events)
"truncated": true, # total > returned
"limit": 2000, # the effective limit used
}
``events.jsonl`` is append-only chronological, so "last N lines" == "most
recent N events". Long-running colonies have produced files with 50k+
events; before this cap, restoring on page-mount shipped the whole thing
down the wire and blocked the UI for seconds.
"""
session_id = request.match_info["session_id"]
try:
limit = int(request.query.get("limit", str(_EVENTS_HISTORY_DEFAULT_LIMIT)))
except ValueError:
limit = _EVENTS_HISTORY_DEFAULT_LIMIT
limit = max(1, min(limit, _EVENTS_HISTORY_MAX_LIMIT))
from framework.server.session_manager import _find_queen_session_dir
queen_dir = _find_queen_session_dir(session_id)
events_path = queen_dir / "events.jsonl"
if not events_path.exists():
return web.json_response({"events": [], "session_id": session_id})
return web.json_response(
{
"events": [],
"session_id": session_id,
"total": 0,
"returned": 0,
"truncated": False,
"limit": limit,
}
)
events: list[dict] = []
# Tail the file using a bounded deque — O(limit) memory regardless
# of file size. No need to materialize the whole list only to slice it.
from collections import deque
tail: deque[dict] = deque(maxlen=limit)
total = 0
try:
with open(events_path, encoding="utf-8") as f:
for line in f:
@@ -733,13 +778,34 @@ async def handle_session_events_history(request: web.Request) -> web.Response:
if not line:
continue
try:
events.append(json.loads(line))
evt = json.loads(line)
except json.JSONDecodeError:
continue
total += 1
tail.append(evt)
except OSError:
return web.json_response({"events": [], "session_id": session_id})
return web.json_response(
{
"events": [],
"session_id": session_id,
"total": 0,
"returned": 0,
"truncated": False,
"limit": limit,
}
)
return web.json_response({"events": events, "session_id": session_id})
events = list(tail)
return web.json_response(
{
"events": events,
"session_id": session_id,
"total": total,
"returned": len(events),
"truncated": total > len(events),
"limit": limit,
}
)
async def handle_session_history(request: web.Request) -> web.Response:
+19 -3
View File
@@ -87,9 +87,25 @@ export const sessionsApi = {
colonies: (sessionId: string) =>
api.get<{ colonies: string[] }>(`/sessions/${sessionId}/colonies`),
/** Get persisted eventbus log for a session (works for cold sessions — used for full UI replay). */
eventsHistory: (sessionId: string) =>
api.get<{ events: AgentEvent[]; session_id: string }>(`/sessions/${sessionId}/events/history`),
/** Get persisted eventbus log for a session (works for cold sessions — used for full UI replay).
*
* Returns the TAIL of the event log. Default limit 2000 (server
* clamps to [1, 10000]); older events get dropped and
* ``truncated: true`` is set so the UI can show an indicator.
*/
eventsHistory: (sessionId: string, limit?: number) =>
api.get<{
events: AgentEvent[];
session_id: string;
total: number;
returned: number;
truncated: boolean;
limit: number;
}>(
`/sessions/${sessionId}/events/history${
limit ? `?limit=${limit}` : ""
}`,
),
/** Open the session's data folder in the OS file manager. */
revealFolder: (sessionId: string) =>
+2 -2
View File
@@ -122,14 +122,14 @@ const TOOL_HEX = [
"#e5a820", // sunflower
];
function toolHex(name: string): string {
export function toolHex(name: string): string {
let hash = 0;
for (let i = 0; i < name.length; i++)
hash = (hash * 31 + name.charCodeAt(i)) | 0;
return TOOL_HEX[Math.abs(hash) % TOOL_HEX.length];
}
function ToolActivityRow({ content }: { content: string }) {
export function ToolActivityRow({ content }: { content: string }) {
let tools: { name: string; done: boolean }[] = [];
try {
const parsed = JSON.parse(content);
@@ -1,6 +1,7 @@
import { memo, useState, useRef, useEffect } from "react";
import { ChevronDown, ChevronUp, Cpu } from "lucide-react";
import type { ChatMessage } from "@/components/ChatPanel";
import { ToolActivityRow } from "@/components/ChatPanel";
import MarkdownContent from "@/components/MarkdownContent";
const workerColor = "hsl(220,60%,55%)";
@@ -197,41 +198,77 @@ const WorkerRunBubble = memo(
</div>
)}
{/* Expanded: full scrollable message stream */}
{/* Expanded: chronological stream with tool bursts
coalesced into a single ToolActivityRow each.
Consecutive tool_status messages (no text between)
collapse to the LATEST snapshot — each snapshot is
cumulative within its turn, so the latest one tells
the whole story for that burst. Text messages break
the burst and render as markdown. */}
{expanded && (
<div
ref={bodyRef}
className="max-h-[400px] overflow-y-auto px-4 py-3 space-y-2"
className="max-h-[400px] overflow-y-auto px-4 py-3 space-y-3"
>
{group.messages.map((m, i) => {
if (m.type === "tool_status") {
const tools = parseToolStatus(m.content);
if (tools.length === 0) return null;
{(() => {
type RenderRow =
| { kind: "tools"; content: string; key: string }
| { kind: "text"; msg: ChatMessage; key: string };
const rows: RenderRow[] = [];
let pendingTool: { content: string; id: string } | null = null;
const flushTool = () => {
if (pendingTool) {
rows.push({
kind: "tools",
content: pendingTool.content,
key: `tools-${pendingTool.id}`,
});
pendingTool = null;
}
};
for (let i = 0; i < group.messages.length; i++) {
const m = group.messages[i];
if (m.type === "tool_status") {
// Overwrite — latest snapshot in the burst wins
pendingTool = {
content: m.content,
id: m.id || `ts-${i}`,
};
continue;
}
if (m.content?.trim()) {
flushTool();
rows.push({
kind: "text",
msg: m,
key: m.id || `txt-${i}`,
});
}
}
flushTool();
return rows.map((row) => {
if (row.kind === "tools") {
// ToolActivityRow groups by tool name (×N), shows
// running pills (spinner) before done pills (check),
// and uses the per-tool color hash that matches
// the rest of the chat.
return (
<div key={row.key} className="-ml-10">
<ToolActivityRow content={row.content} />
</div>
);
}
return (
<div key={m.id || i} className="flex flex-wrap gap-1.5">
{tools.map((t, ti) => (
<span
key={ti}
className={`inline-flex items-center gap-1 text-[10px] px-1.5 py-0.5 rounded-md border ${
t.done
? "bg-green-50 border-green-200 text-green-700 dark:bg-green-900/20 dark:border-green-800 dark:text-green-400"
: "bg-blue-50 border-blue-200 text-blue-700 dark:bg-blue-900/20 dark:border-blue-800 dark:text-blue-400"
}`}
>
<span>{t.done ? "\u2713" : "\u25cf"}</span>
<span className="font-mono">{t.name}</span>
</span>
))}
<div
key={row.key}
className="text-sm leading-relaxed"
>
<MarkdownContent content={row.msg.content} />
</div>
);
}
if (!m.content?.trim()) return null;
return (
<div key={m.id || i} className="text-sm leading-relaxed">
<MarkdownContent content={m.content} />
</div>
);
})}
});
})()}
</div>
)}
</div>
+29 -3
View File
@@ -41,6 +41,8 @@ function truncate(s: string, max: number): string {
type SessionRestoreResult = {
messages: ChatMessage[];
restoredPhase: "planning" | "building" | "staging" | "running" | "independent" | null;
truncated: boolean;
droppedCount: number;
};
async function restoreSessionMessages(
@@ -49,7 +51,8 @@ async function restoreSessionMessages(
agentDisplayName: string,
): Promise<SessionRestoreResult> {
try {
const { events } = await sessionsApi.eventsHistory(sessionId);
const { events, truncated, total, returned } =
await sessionsApi.eventsHistory(sessionId);
if (events.length > 0) {
const messages: ChatMessage[] = [];
let runningPhase: ChatMessage["phase"] = undefined;
@@ -71,12 +74,35 @@ async function restoreSessionMessages(
}
messages.push(msg);
}
return { messages, restoredPhase: runningPhase ?? null };
// Prepend a run_divider banner so the user knows how many older
// events were dropped. Keeps the chronology readable without
// injecting a load-more button (we can add pagination later).
const droppedCount = Math.max(0, total - returned);
if (truncated && droppedCount > 0) {
const firstTs = events[0]?.timestamp;
const bannerCreatedAt = firstTs ? new Date(firstTs).getTime() - 1 : 0;
messages.unshift({
id: `restore-truncated-${sessionId}`,
agent: "System",
agentColor: "",
type: "run_divider",
content: `${droppedCount.toLocaleString()} older event${droppedCount === 1 ? "" : "s"} not shown (showing last ${returned.toLocaleString()})`,
timestamp: firstTs ?? new Date().toISOString(),
thread,
createdAt: bannerCreatedAt,
});
}
return {
messages,
restoredPhase: runningPhase ?? null,
truncated,
droppedCount,
};
}
} catch {
// Event log not available
}
return { messages: [], restoredPhase: null };
return { messages: [], restoredPhase: null, truncated: false, droppedCount: 0 };
}
// ── Agent backend state ──────────────────────────────────────────────────────
+20 -1
View File
@@ -90,7 +90,8 @@ export default function QueenDM() {
const restoreMessages = useCallback(
async (sid: string, cancelled: () => boolean) => {
try {
const { events } = await sessionsApi.eventsHistory(sid);
const { events, truncated, total, returned } =
await sessionsApi.eventsHistory(sid);
if (cancelled()) return;
const restored: ChatMessage[] = [];
for (const evt of events) {
@@ -99,6 +100,24 @@ export default function QueenDM() {
if (evt.stream_id === "queen") msg.role = "queen";
restored.push(msg);
}
// Show a banner if the server truncated older events.
const droppedCount = Math.max(0, total - returned);
if (truncated && droppedCount > 0) {
const firstTs = events[0]?.timestamp;
const bannerCreatedAt = firstTs
? new Date(firstTs).getTime() - 1
: 0;
restored.unshift({
id: `restore-truncated-${sid}`,
agent: "System",
agentColor: "",
type: "run_divider",
content: `${droppedCount.toLocaleString()} older event${droppedCount === 1 ? "" : "s"} not shown (showing last ${returned.toLocaleString()})`,
timestamp: firstTs ?? new Date().toISOString(),
thread: "queen-dm",
createdAt: bannerCreatedAt,
});
}
if (restored.length > 0 && !cancelled()) {
restored.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
setMessages(restored);