feat: group tools by sessions

This commit is contained in:
Richard Tang
2026-04-20 18:20:10 -07:00
parent 2644ab953d
commit 4e9d9bf1ea
2 changed files with 94 additions and 49 deletions
@@ -594,6 +594,51 @@ describe("replayEventsToMessages", () => {
allDone: true,
});
});
it("uses execution id when resolving tool completions", () => {
const events = [
makeEvent({
type: "tool_call_started",
stream_id: "queen",
node_id: "queen",
execution_id: "exec-a",
data: { tool_name: "first_run_tool", tool_use_id: "shared-id" },
}),
makeEvent({
type: "tool_call_started",
stream_id: "queen",
node_id: "queen",
execution_id: "exec-b",
data: { tool_name: "second_run_tool", tool_use_id: "shared-id" },
}),
makeEvent({
type: "tool_call_completed",
stream_id: "queen",
node_id: "queen",
execution_id: "exec-a",
data: { tool_name: "first_run_tool", tool_use_id: "shared-id" },
}),
];
const restored = replayEventsToMessages(events, "queen-dm", "Alexandra");
const firstRunRow = restored.find(
(m) => m.id === "tool-pill-queen-exec-a-0",
);
const secondRunRow = restored.find(
(m) => m.id === "tool-pill-queen-exec-b-0",
);
expect(firstRunRow).toBeDefined();
expect(secondRunRow).toBeDefined();
expect(JSON.parse(firstRunRow!.content)).toEqual({
tools: [{ name: "first_run_tool", done: true }],
allDone: true,
});
expect(JSON.parse(secondRunRow!.content)).toEqual({
tools: [{ name: "second_run_tool", done: false }],
allDone: false,
});
});
});
// ---------------------------------------------------------------------------
+49 -49
View File
@@ -250,20 +250,22 @@ export function sseEventToChatMessage(
/**
* State maintained while replaying an event stream. Tracks per-stream turn
* counters, the set of active tool calls (so tool_status pill content
* reflects "tool A done, tool B running" correctly), and a tool_use_id →
* pill_msg_id map so deferred `tool_call_completed` events can find the
* pill they belong to after the turn counter moves on.
* counters, materialized tool rows, and a pending tool_use_id → row map so
* deferred `tool_call_completed` events can find the exact pill they belong
* to after the turn counter moves on.
*/
type ToolRowState = {
streamId: string;
executionId: string;
tools: Record<string, { name: string; done: boolean }>;
};
export interface ReplayState {
turnCounters: Record<string, number>;
activeToolCalls: Record<
string,
{ name: string; done: boolean; streamId: string }
>;
toolRows: Record<string, ToolRowState>;
toolUseToPill: Record<
string,
{ msgId: string; name: string; streamId: string }
{ msgId: string; toolKey: string; name: string }
>;
queenIterText: Record<string, Record<number, string>>;
}
@@ -271,24 +273,27 @@ export interface ReplayState {
export function newReplayState(): ReplayState {
return {
turnCounters: {},
activeToolCalls: {},
toolRows: {},
toolUseToPill: {},
queenIterText: {},
};
}
function clearToolStateForStream(state: ReplayState, streamId: string): void {
const activeToolCalls: typeof state.activeToolCalls = {};
for (const [toolUseId, tool] of Object.entries(state.activeToolCalls)) {
if (tool.streamId !== streamId) activeToolCalls[toolUseId] = tool;
}
state.activeToolCalls = activeToolCalls;
function toolLookupKey(
streamId: string,
executionId: string | null | undefined,
toolUseId: string,
): string {
return `${streamId}:${executionId || "exec"}:${toolUseId}`;
}
const toolUseToPill: typeof state.toolUseToPill = {};
for (const [toolUseId, pill] of Object.entries(state.toolUseToPill)) {
if (pill.streamId !== streamId) toolUseToPill[toolUseId] = pill;
}
state.toolUseToPill = toolUseToPill;
function toolRowContent(row: ToolRowState): string {
const tools = Object.values(row.tools).map((t) => ({
name: t.name,
done: t.done,
}));
const allDone = tools.length > 0 && tools.every((t) => t.done);
return JSON.stringify({ tools, allDone });
}
/**
@@ -334,14 +339,6 @@ export function replayEvent(
switch (event.type) {
case "execution_started":
state.turnCounters[turnKey] = currentTurn + 1;
// New executions reset their active tools so stale completed pills
// from a previous run cannot bleed into the next run.
clearToolStateForStream(state, streamId);
break;
case "node_loop_started":
// Queen-triggered scheduler runs emit node_loop_started rather than
// execution_started, so use it as an execution boundary too.
clearToolStateForStream(state, streamId);
break;
case "llm_turn_complete":
state.turnCounters[turnKey] = currentTurn + 1;
@@ -350,28 +347,31 @@ export function replayEvent(
if (!event.node_id) break;
const toolName = (event.data?.tool_name as string) || "unknown";
const toolUseId = (event.data?.tool_use_id as string) || "";
state.activeToolCalls[toolUseId] = {
const pillId = `tool-pill-${streamId}-${event.execution_id || "exec"}-${currentTurn}`;
const row =
state.toolRows[pillId] ||
(state.toolRows[pillId] = {
streamId,
executionId: event.execution_id || "exec",
tools: {},
});
const toolKey = toolUseId || `anonymous-${Object.keys(row.tools).length}`;
row.tools[toolKey] = {
name: toolName,
done: false,
streamId,
};
const pillId = `tool-pill-${streamId}-${event.execution_id || "exec"}-${currentTurn}`;
if (toolUseId) {
state.toolUseToPill[toolUseId] = {
state.toolUseToPill[toolLookupKey(streamId, event.execution_id, toolUseId)] = {
msgId: pillId,
toolKey,
name: toolName,
streamId,
};
}
const tools = Object.values(state.activeToolCalls)
.filter((t) => t.streamId === streamId)
.map((t) => ({ name: t.name, done: t.done }));
const allDone = tools.length > 0 && tools.every((t) => t.done);
out.push({
id: pillId,
agent: effectiveName || event.node_id || "Agent",
agentColor: "",
content: JSON.stringify({ tools, allDone }),
content: toolRowContent(row),
timestamp: "",
type: "tool_status",
role,
@@ -386,21 +386,21 @@ export function replayEvent(
case "tool_call_completed": {
if (!event.node_id) break;
const toolUseId = (event.data?.tool_use_id as string) || "";
const tracked = state.toolUseToPill[toolUseId];
if (toolUseId) delete state.toolUseToPill[toolUseId];
if (toolUseId && state.activeToolCalls[toolUseId]) {
state.activeToolCalls[toolUseId].done = true;
}
const lookupKey = toolLookupKey(streamId, event.execution_id, toolUseId);
const tracked = state.toolUseToPill[lookupKey];
if (toolUseId) delete state.toolUseToPill[lookupKey];
if (!tracked) break;
const tools = Object.values(state.activeToolCalls)
.filter((t) => t.streamId === streamId)
.map((t) => ({ name: t.name, done: t.done }));
const allDone = tools.length > 0 && tools.every((t) => t.done);
const row = state.toolRows[tracked.msgId];
if (!row) break;
row.tools[tracked.toolKey] = {
name: row.tools[tracked.toolKey]?.name || tracked.name,
done: true,
};
out.push({
id: tracked.msgId,
agent: effectiveName || event.node_id || "Agent",
agentColor: "",
content: JSON.stringify({ tools, allDone }),
content: toolRowContent(row),
timestamp: "",
type: "tool_status",
role,