From 4e9d9bf1ea74f8b01fe6babfb87f4631b54371e2 Mon Sep 17 00:00:00 2001 From: Richard Tang Date: Mon, 20 Apr 2026 18:20:10 -0700 Subject: [PATCH] feat: group tools by sessions --- core/frontend/src/lib/chat-helpers.test.ts | 45 ++++++++++ core/frontend/src/lib/chat-helpers.ts | 98 +++++++++++----------- 2 files changed, 94 insertions(+), 49 deletions(-) diff --git a/core/frontend/src/lib/chat-helpers.test.ts b/core/frontend/src/lib/chat-helpers.test.ts index 0e742e24..6374a7eb 100644 --- a/core/frontend/src/lib/chat-helpers.test.ts +++ b/core/frontend/src/lib/chat-helpers.test.ts @@ -594,6 +594,51 @@ describe("replayEventsToMessages", () => { allDone: true, }); }); + + it("uses execution id when resolving tool completions", () => { + const events = [ + makeEvent({ + type: "tool_call_started", + stream_id: "queen", + node_id: "queen", + execution_id: "exec-a", + data: { tool_name: "first_run_tool", tool_use_id: "shared-id" }, + }), + makeEvent({ + type: "tool_call_started", + stream_id: "queen", + node_id: "queen", + execution_id: "exec-b", + data: { tool_name: "second_run_tool", tool_use_id: "shared-id" }, + }), + makeEvent({ + type: "tool_call_completed", + stream_id: "queen", + node_id: "queen", + execution_id: "exec-a", + data: { tool_name: "first_run_tool", tool_use_id: "shared-id" }, + }), + ]; + + const restored = replayEventsToMessages(events, "queen-dm", "Alexandra"); + const firstRunRow = restored.find( + (m) => m.id === "tool-pill-queen-exec-a-0", + ); + const secondRunRow = restored.find( + (m) => m.id === "tool-pill-queen-exec-b-0", + ); + + expect(firstRunRow).toBeDefined(); + expect(secondRunRow).toBeDefined(); + expect(JSON.parse(firstRunRow!.content)).toEqual({ + tools: [{ name: "first_run_tool", done: true }], + allDone: true, + }); + expect(JSON.parse(secondRunRow!.content)).toEqual({ + tools: [{ name: "second_run_tool", done: false }], + allDone: false, + }); + }); }); // --------------------------------------------------------------------------- diff --git a/core/frontend/src/lib/chat-helpers.ts b/core/frontend/src/lib/chat-helpers.ts index 25b337e5..cc0ee9d7 100644 --- a/core/frontend/src/lib/chat-helpers.ts +++ b/core/frontend/src/lib/chat-helpers.ts @@ -250,20 +250,22 @@ export function sseEventToChatMessage( /** * State maintained while replaying an event stream. Tracks per-stream turn - * counters, the set of active tool calls (so tool_status pill content - * reflects "tool A done, tool B running" correctly), and a tool_use_id → - * pill_msg_id map so deferred `tool_call_completed` events can find the - * pill they belong to after the turn counter moves on. + * counters, materialized tool rows, and a pending tool_use_id → row map so + * deferred `tool_call_completed` events can find the exact pill they belong + * to after the turn counter moves on. */ +type ToolRowState = { + streamId: string; + executionId: string; + tools: Record; +}; + export interface ReplayState { turnCounters: Record; - activeToolCalls: Record< - string, - { name: string; done: boolean; streamId: string } - >; + toolRows: Record; toolUseToPill: Record< string, - { msgId: string; name: string; streamId: string } + { msgId: string; toolKey: string; name: string } >; queenIterText: Record>; } @@ -271,24 +273,27 @@ export interface ReplayState { export function newReplayState(): ReplayState { return { turnCounters: {}, - activeToolCalls: {}, + toolRows: {}, toolUseToPill: {}, queenIterText: {}, }; } -function clearToolStateForStream(state: ReplayState, streamId: string): void { - const activeToolCalls: typeof state.activeToolCalls = {}; - for (const [toolUseId, tool] of Object.entries(state.activeToolCalls)) { - if (tool.streamId !== streamId) activeToolCalls[toolUseId] = tool; - } - state.activeToolCalls = activeToolCalls; +function toolLookupKey( + streamId: string, + executionId: string | null | undefined, + toolUseId: string, +): string { + return `${streamId}:${executionId || "exec"}:${toolUseId}`; +} - const toolUseToPill: typeof state.toolUseToPill = {}; - for (const [toolUseId, pill] of Object.entries(state.toolUseToPill)) { - if (pill.streamId !== streamId) toolUseToPill[toolUseId] = pill; - } - state.toolUseToPill = toolUseToPill; +function toolRowContent(row: ToolRowState): string { + const tools = Object.values(row.tools).map((t) => ({ + name: t.name, + done: t.done, + })); + const allDone = tools.length > 0 && tools.every((t) => t.done); + return JSON.stringify({ tools, allDone }); } /** @@ -334,14 +339,6 @@ export function replayEvent( switch (event.type) { case "execution_started": state.turnCounters[turnKey] = currentTurn + 1; - // New executions reset their active tools so stale completed pills - // from a previous run cannot bleed into the next run. - clearToolStateForStream(state, streamId); - break; - case "node_loop_started": - // Queen-triggered scheduler runs emit node_loop_started rather than - // execution_started, so use it as an execution boundary too. - clearToolStateForStream(state, streamId); break; case "llm_turn_complete": state.turnCounters[turnKey] = currentTurn + 1; @@ -350,28 +347,31 @@ export function replayEvent( if (!event.node_id) break; const toolName = (event.data?.tool_name as string) || "unknown"; const toolUseId = (event.data?.tool_use_id as string) || ""; - state.activeToolCalls[toolUseId] = { + const pillId = `tool-pill-${streamId}-${event.execution_id || "exec"}-${currentTurn}`; + const row = + state.toolRows[pillId] || + (state.toolRows[pillId] = { + streamId, + executionId: event.execution_id || "exec", + tools: {}, + }); + const toolKey = toolUseId || `anonymous-${Object.keys(row.tools).length}`; + row.tools[toolKey] = { name: toolName, done: false, - streamId, }; - const pillId = `tool-pill-${streamId}-${event.execution_id || "exec"}-${currentTurn}`; if (toolUseId) { - state.toolUseToPill[toolUseId] = { + state.toolUseToPill[toolLookupKey(streamId, event.execution_id, toolUseId)] = { msgId: pillId, + toolKey, name: toolName, - streamId, }; } - const tools = Object.values(state.activeToolCalls) - .filter((t) => t.streamId === streamId) - .map((t) => ({ name: t.name, done: t.done })); - const allDone = tools.length > 0 && tools.every((t) => t.done); out.push({ id: pillId, agent: effectiveName || event.node_id || "Agent", agentColor: "", - content: JSON.stringify({ tools, allDone }), + content: toolRowContent(row), timestamp: "", type: "tool_status", role, @@ -386,21 +386,21 @@ export function replayEvent( case "tool_call_completed": { if (!event.node_id) break; const toolUseId = (event.data?.tool_use_id as string) || ""; - const tracked = state.toolUseToPill[toolUseId]; - if (toolUseId) delete state.toolUseToPill[toolUseId]; - if (toolUseId && state.activeToolCalls[toolUseId]) { - state.activeToolCalls[toolUseId].done = true; - } + const lookupKey = toolLookupKey(streamId, event.execution_id, toolUseId); + const tracked = state.toolUseToPill[lookupKey]; + if (toolUseId) delete state.toolUseToPill[lookupKey]; if (!tracked) break; - const tools = Object.values(state.activeToolCalls) - .filter((t) => t.streamId === streamId) - .map((t) => ({ name: t.name, done: t.done })); - const allDone = tools.length > 0 && tools.every((t) => t.done); + const row = state.toolRows[tracked.msgId]; + if (!row) break; + row.tools[tracked.toolKey] = { + name: row.tools[tracked.toolKey]?.name || tracked.name, + done: true, + }; out.push({ id: tracked.msgId, agent: effectiveName || event.node_id || "Agent", agentColor: "", - content: JSON.stringify({ tools, allDone }), + content: toolRowContent(row), timestamp: "", type: "tool_status", role,