refactor: align worker display

This commit is contained in:
Timothy
2026-04-16 11:26:32 -07:00
parent c4ee12532f
commit 4aa5fd7a90
3 changed files with 217 additions and 23 deletions
+180
View File
@@ -149,6 +149,186 @@ export function sseEventToChatMessage(
}
}
// ---------------------------------------------------------------------------
// Stateful event replay — produces tool_status pills + regular messages
// ---------------------------------------------------------------------------
/**
* State maintained while replaying an event stream. Tracks per-stream turn
* counters, the set of active tool calls (so tool_status pill content
* reflects "tool A done, tool B running" correctly), and a tool_use_id →
* pill_msg_id map so deferred `tool_call_completed` events can find the
* pill they belong to after the turn counter moves on.
*/
export interface ReplayState {
turnCounters: Record<string, number>;
activeToolCalls: Record<
string,
{ name: string; done: boolean; streamId: string }
>;
toolUseToPill: Record<string, { msgId: string; name: string }>;
}
export function newReplayState(): ReplayState {
return { turnCounters: {}, activeToolCalls: {}, toolUseToPill: {} };
}
/**
* Process a single event and emit zero or more ChatMessage upserts.
*
* Why this exists: `sseEventToChatMessage` is stateless — one event in, at
* most one message out. But the chat's tool_status pill is a SYNTHESIZED
* message: each tool_call_started adds to an accumulating pill, and each
* tool_call_completed flips one of its tools from running to done. Live
* SSE handlers in colony-chat and queen-dm already do this synthesis
* against React refs. Cold-restore from events.jsonl used to skip
* tool_call_* events entirely, so refreshed sessions looked completely
* different from live ones — no tool activity visible, just prose.
*
* This function centralizes the synthesis so cold-restore and live paths
* can use the exact same state machine. The caller treats the returned
* messages as upserts (by id) — a later event in the same replay may
* emit the same pill id with updated content, which should REPLACE the
* earlier row in the caller's message list.
*/
export function replayEvent(
state: ReplayState,
event: AgentEvent,
thread: string,
agentDisplayName: string | undefined,
): ChatMessage[] {
const streamId = event.stream_id;
const isQueen = streamId === "queen";
const role: "queen" | "worker" = isQueen ? "queen" : "worker";
const turnKey = streamId;
const currentTurn = state.turnCounters[turnKey] ?? 0;
const eventCreatedAt = event.timestamp
? new Date(event.timestamp).getTime()
: Date.now();
const out: ChatMessage[] = [];
// Update state machine BEFORE the generic converter runs so the
// regular message emitted for this event sees the post-update
// counter (matches live handler ordering at colony-chat.tsx:525).
switch (event.type) {
case "execution_started":
state.turnCounters[turnKey] = currentTurn + 1;
// New execution for a worker resets its active tools, mirroring
// the live handler's setAgentState at colony-chat.tsx:566.
if (!isQueen) {
const keepActive: typeof state.activeToolCalls = {};
for (const [k, v] of Object.entries(state.activeToolCalls)) {
if (v.streamId !== streamId) keepActive[k] = v;
}
state.activeToolCalls = keepActive;
}
break;
case "llm_turn_complete":
state.turnCounters[turnKey] = currentTurn + 1;
break;
case "tool_call_started": {
if (!event.node_id) break;
const toolName = (event.data?.tool_name as string) || "unknown";
const toolUseId = (event.data?.tool_use_id as string) || "";
state.activeToolCalls[toolUseId] = {
name: toolName,
done: false,
streamId,
};
const pillId = `tool-pill-${streamId}-${event.execution_id || "exec"}-${currentTurn}`;
if (toolUseId) {
state.toolUseToPill[toolUseId] = { msgId: pillId, name: toolName };
}
const tools = Object.values(state.activeToolCalls)
.filter((t) => t.streamId === streamId)
.map((t) => ({ name: t.name, done: t.done }));
const allDone = tools.length > 0 && tools.every((t) => t.done);
out.push({
id: pillId,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: JSON.stringify({ tools, allDone }),
timestamp: "",
type: "tool_status",
role,
thread,
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
});
break;
}
case "tool_call_completed": {
if (!event.node_id) break;
const toolUseId = (event.data?.tool_use_id as string) || "";
const tracked = state.toolUseToPill[toolUseId];
if (toolUseId) delete state.toolUseToPill[toolUseId];
if (toolUseId && state.activeToolCalls[toolUseId]) {
state.activeToolCalls[toolUseId].done = true;
}
if (!tracked) break;
const tools = Object.values(state.activeToolCalls)
.filter((t) => t.streamId === streamId)
.map((t) => ({ name: t.name, done: t.done }));
const allDone = tools.length > 0 && tools.every((t) => t.done);
// Re-emit the SAME pill id with updated content. Caller upserts
// by id, so this replaces the row from tool_call_started.
out.push({
id: tracked.msgId,
agent: agentDisplayName || event.node_id || "Agent",
agentColor: "",
content: JSON.stringify({ tools, allDone }),
timestamp: "",
type: "tool_status",
role,
thread,
createdAt: eventCreatedAt,
nodeId: event.node_id || undefined,
executionId: event.execution_id || undefined,
});
break;
}
}
// Regular stateless conversion (prose, user input, system notes).
const msg = sseEventToChatMessage(
event,
thread,
agentDisplayName,
state.turnCounters[turnKey] ?? 0,
);
if (msg) {
if (isQueen) msg.role = "queen";
out.push(msg);
}
return out;
}
/**
* Replay an entire event array and return a deduplicated, chronologically
* sorted ChatMessage list. Used by cold-restore paths so refreshed
* sessions match the live stream exactly.
*/
export function replayEventsToMessages(
events: AgentEvent[],
thread: string,
agentDisplayName: string | undefined,
): ChatMessage[] {
const state = newReplayState();
// Upsert by id — later emissions for the same pill replace earlier ones.
const byId = new Map<string, ChatMessage>();
for (const evt of events) {
for (const m of replayEvent(state, evt, thread, agentDisplayName)) {
byId.set(m.id, m);
}
}
return Array.from(byId.values()).sort(
(a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0),
);
}
type QueenPhase = "planning" | "building" | "staging" | "running" | "independent";
const VALID_PHASES = new Set<string>(["planning", "building", "staging", "running", "independent"]);
+24 -11
View File
@@ -13,7 +13,11 @@ import { executionApi } from "@/api/execution";
import { sessionsApi } from "@/api/sessions";
import { useMultiSSE } from "@/hooks/use-sse";
import type { LiveSession, AgentEvent } from "@/api/types";
import { sseEventToChatMessage, formatAgentDisplayName } from "@/lib/chat-helpers";
import {
sseEventToChatMessage,
formatAgentDisplayName,
replayEventsToMessages,
} from "@/lib/chat-helpers";
import { cronToLabel } from "@/lib/graphUtils";
import { ApiError } from "@/api/client";
import { useColony } from "@/context/ColonyContext";
@@ -54,7 +58,13 @@ async function restoreSessionMessages(
const { events, truncated, total, returned } =
await sessionsApi.eventsHistory(sessionId);
if (events.length > 0) {
const messages: ChatMessage[] = [];
// Walk events twice:
// 1. Extract the trailing queen phase (unchanged logic).
// 2. Run the full state-machine replay so tool_status pills
// are synthesized just like the live SSE handler does.
// Without (2), refreshed sessions showed zero tool activity
// because tool_call_started/completed events are ignored by
// the stateless converter.
let runningPhase: ChatMessage["phase"] = undefined;
for (const evt of events) {
const p =
@@ -66,17 +76,20 @@ async function restoreSessionMessages(
if (p && ["planning", "building", "staging", "running"].includes(p)) {
runningPhase = p as ChatMessage["phase"];
}
const msg = sseEventToChatMessage(evt, thread, agentDisplayName);
if (!msg) continue;
if (evt.stream_id === "queen") {
msg.role = "queen";
msg.phase = runningPhase;
}
messages.push(msg);
const messages = replayEventsToMessages(events, thread, agentDisplayName);
// Stamp the latest phase on every queen message so the UI's
// phase-badge rendering matches what the live path would have
// displayed at the time of the refresh.
if (runningPhase) {
for (const m of messages) {
if (m.role === "queen") m.phase = runningPhase;
}
// Prepend a run_divider banner so the user knows how many older
// events were dropped. Keeps the chronology readable without
// injecting a load-more button (we can add pagination later).
}
// Prepend a run_divider banner when the server truncated older
// events so the user knows how many are hidden.
const droppedCount = Math.max(0, total - returned);
if (truncated && droppedCount > 0) {
const firstTs = events[0]?.timestamp;
+10 -9
View File
@@ -11,7 +11,10 @@ import { sessionsApi } from "@/api/sessions";
import { queensApi } from "@/api/queens";
import { useMultiSSE } from "@/hooks/use-sse";
import type { AgentEvent, HistorySession } from "@/api/types";
import { sseEventToChatMessage } from "@/lib/chat-helpers";
import {
sseEventToChatMessage,
replayEventsToMessages,
} from "@/lib/chat-helpers";
import { useColony } from "@/context/ColonyContext";
import { useHeaderActions } from "@/context/HeaderActionsContext";
import { getQueenForAgent, slugToColonyId } from "@/lib/colony-registry";
@@ -93,13 +96,12 @@ export default function QueenDM() {
const { events, truncated, total, returned } =
await sessionsApi.eventsHistory(sid);
if (cancelled()) return;
const restored: ChatMessage[] = [];
for (const evt of events) {
const msg = sseEventToChatMessage(evt, "queen-dm", queenName);
if (!msg) continue;
if (evt.stream_id === "queen") msg.role = "queen";
restored.push(msg);
}
// Use the stateful replay so tool_status pills are synthesized
// the same way the live SSE handler does — without this the
// refreshed queen DM shows zero tool activity.
const restored = replayEventsToMessages(events, "queen-dm", queenName);
// Show a banner if the server truncated older events.
const droppedCount = Math.max(0, total - returned);
if (truncated && droppedCount > 0) {
@@ -119,7 +121,6 @@ export default function QueenDM() {
});
}
if (restored.length > 0 && !cancelled()) {
restored.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
setMessages(restored);
// Only clear typing if the history contains a completed execution;
// during bootstrap the queen is still processing.