Merge remote-tracking branch 'origin/feat/question-widget' into queen-mode-separation

Richard Tang
2026-03-03 20:09:10 -08:00
18 changed files with 2097 additions and 461 deletions
@@ -651,21 +651,36 @@ stop_worker() to return to STAGING mode.
_queen_behavior = """
# Behavior
## CRITICAL RULE — ask_user tool
Every response that ends with a question, a prompt, or expects user \
input MUST finish with a call to ask_user(prompt, options). This is \
NON-NEGOTIABLE. The system CANNOT detect that you are waiting for \
input unless you call ask_user. You MUST call ask_user as the LAST \
action in your response.
NEVER end a response with a question in text without calling ask_user. \
NEVER rely on the user seeing your text and replying; call ask_user.
Always provide 2-4 short options that cover the most likely answers. \
The user can always type a custom response.
Examples:
- ask_user("What do you need?",
["Build a new agent", "Run the loaded worker", "Help with code"])
- ask_user("Which pattern?",
["Simple 2-node", "Rich with feedback", "Custom"])
- ask_user("Ready to proceed?",
["Yes, go ahead", "Let me change something"])
## Greeting and identity
When the user greets you ("hi", "hello") or asks what you can do / \
what you are, respond concisely. DO NOT list internal processes \
(validation steps, AgentRunner.load, tool discovery). Focus on \
user-facing capabilities:
1. Direct capabilities: file operations, shell commands, coding, \
agent building & debugging.
2. Delegation: describe what the loaded worker does in one sentence \
(read the Worker Profile at the end of this prompt). If no worker \
is loaded, say so.
3. End with a short prompt: "What do you need?"
Keep it under 10 lines. No bullet-point dumps of every tool you have.
When the user greets you or asks what you can do, respond concisely \
(under 10 lines). DO NOT list internal processes. Focus on:
1. Direct capabilities: coding, agent building & debugging.
2. What the loaded worker does (one sentence from Worker Profile). \
If no worker is loaded, say so.
3. THEN call ask_user to prompt them; do NOT just write text.
## Direct coding
You can do any coding task directly: reading files, writing code, running
@@ -715,24 +730,37 @@ explain the problem clearly and help fix it. For credential errors, \
guide the user to set up the missing credentials. For structural \
issues, offer to fix the agent graph directly.
## When worker is running:
- If the user asks about progress, call get_worker_status() ONCE and \
report the result. Do NOT poll in a loop.
- NEVER call get_worker_status() repeatedly without user input in between. \
The worker will surface results through client-facing nodes. You do not \
need to monitor it. One check per user request is enough.
- If the user has a concern or instruction for the worker, call \
inject_worker_message(content) to relay it.
- You can still do coding tasks directly while the worker runs.
- If an escalation ticket arrives from the judge, assess severity:
- Low/transient: acknowledge silently, do not disturb the user.
- High/critical: notify the user with a brief analysis and suggested action.
- After starting the worker or checking its status, WAIT for the user's \
next message. Do not take autonomous actions unless the user asks.
## When worker is running — GO SILENT
## When worker asks user a question:
- The system will route the user's response directly to the worker. \
You do not need to relay it. The user will come back to you after responding.
Once you call start_worker(), your job is DONE. Do NOT call ask_user, \
do NOT call get_worker_status(), do NOT emit any text. Just stop. \
The worker owns the conversation now; it has its own client-facing \
nodes that talk to the user directly.
**After start_worker, your ENTIRE response should be ONE short \
confirmation sentence with NO tool calls.** Example: \
"Started the vulnerability assessment." That's it. No ask_user, \
no get_worker_status, no follow-up questions.
You only wake up again when:
- The user explicitly addresses you (not answering a worker question)
- A worker question is forwarded to you for relay
- An escalation ticket arrives from the judge
- The worker finishes
If the user explicitly asks about progress, call get_worker_status() \
ONCE and report. Do NOT poll or check proactively.
For escalation tickets: if low/transient, acknowledge silently. \
If high/critical, notify the user with a brief analysis.
## When the worker asks the user a question:
- The user's answer is routed to you with context: \
[Worker asked: "...", Options: ...] User answered: "...".
- If the user is answering the worker's question normally, relay it \
using inject_worker_message(answer_text). Then go silent again.
- If the user is rejecting the approach, asking to stop, or giving \
you an instruction, handle it yourself; do NOT relay.
## Showing or describing the loaded worker
+208 -38
@@ -152,6 +152,74 @@ def _compact_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]
return compact
def extract_tool_call_history(messages: list[Message], max_entries: int = 30) -> str:
"""Build a compact tool call history from a list of messages.
Used in compaction summaries to prevent the LLM from re-calling
tools it already called. Extracts tool call details, files saved,
outputs set, and errors encountered.
"""
tool_calls_detail: dict[str, list[str]] = {}
files_saved: list[str] = []
outputs_set: list[str] = []
errors: list[str] = []
def _summarize_input(name: str, args: dict) -> str:
if name == "web_search":
return args.get("query", "")
if name == "web_scrape":
return args.get("url", "")
if name in ("load_data", "save_data"):
return args.get("filename", "")
return ""
for msg in messages:
if msg.role == "assistant" and msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
name = func.get("name", "unknown")
try:
args = json.loads(func.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
args = {}
summary = _summarize_input(name, args)
tool_calls_detail.setdefault(name, []).append(summary)
if name == "save_data" and args.get("filename"):
files_saved.append(args["filename"])
if name == "set_output" and args.get("key"):
outputs_set.append(args["key"])
if msg.role == "tool" and msg.is_error:
preview = msg.content[:120].replace("\n", " ")
errors.append(preview)
parts: list[str] = []
if tool_calls_detail:
lines: list[str] = []
for name, inputs in list(tool_calls_detail.items())[:max_entries]:
count = len(inputs)
non_empty = [s for s in inputs if s]
if non_empty:
detail_lines = [f" - {s[:120]}" for s in non_empty[:8]]
lines.append(f" {name} ({count}x):\n" + "\n".join(detail_lines))
else:
lines.append(f" {name} ({count}x)")
parts.append("TOOLS ALREADY CALLED:\n" + "\n".join(lines))
if files_saved:
unique = list(dict.fromkeys(files_saved))
parts.append("FILES SAVED: " + ", ".join(unique))
if outputs_set:
unique = list(dict.fromkeys(outputs_set))
parts.append("OUTPUTS SET: " + ", ".join(unique))
if errors:
parts.append(
"ERRORS (do NOT retry these):\n" + "\n".join(f" - {e}" for e in errors[:10])
)
return "\n\n".join(parts)
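As a rough sketch of how this helper behaves, the snippet below builds two invented tool calls and prints the resulting summary. The tool names and arguments are hypothetical; the Message kwargs mirror those used elsewhere in this diff and assume the remaining fields take their defaults.

# Hypothetical usage sketch (invented tool names and arguments).
msgs = [
    Message(seq=1, role="assistant", content="", tool_calls=[{
        "id": "tc_1",
        "function": {"name": "web_search",
                     "arguments": json.dumps({"query": "agent context compaction"})},
    }]),
    Message(seq=2, role="tool", content="10 results found", is_error=False),
    Message(seq=3, role="assistant", content="", tool_calls=[{
        "id": "tc_2",
        "function": {"name": "save_data",
                     "arguments": json.dumps({"filename": "findings.json"})},
    }]),
]
print(extract_tool_call_history(msgs))
# Expected shape (abridged):
#   TOOLS ALREADY CALLED:
#     web_search (1x):
#       - agent context compaction
#     save_data (1x):
#       - findings.json
#
#   FILES SAVED: findings.json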
# ---------------------------------------------------------------------------
# ConversationStore protocol (Phase 2)
# ---------------------------------------------------------------------------
@@ -373,9 +441,36 @@ class NodeConversation:
def _repair_orphaned_tool_calls(
msgs: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Ensure every tool_call has a matching tool-result message."""
"""Ensure tool_call / tool_result pairs are consistent.
1. **Orphaned tool results** (tool_result with no preceding tool_use)
are dropped. This happens when compaction removes an assistant
message but leaves its tool-result messages behind.
2. **Orphaned tool calls** (tool_use with no following tool_result)
get a synthetic error result appended. This happens when a loop
is cancelled mid-tool-execution.
"""
# Pass 1: collect all tool_call IDs from assistant messages so we
# can identify orphaned tool-result messages.
all_tool_call_ids: set[str] = set()
for m in msgs:
if m.get("role") == "assistant":
for tc in m.get("tool_calls") or []:
tc_id = tc.get("id")
if tc_id:
all_tool_call_ids.add(tc_id)
# Pass 2: build repaired list — drop orphaned tool results, patch
# missing tool results.
repaired: list[dict[str, Any]] = []
for i, m in enumerate(msgs):
# Drop tool-result messages whose tool_call_id has no matching
# tool_use in any assistant message (orphaned by compaction).
if m.get("role") == "tool":
tid = m.get("tool_call_id")
if tid and tid not in all_tool_call_ids:
continue # skip orphaned result
repaired.append(m)
tool_calls = m.get("tool_calls")
if m.get("role") != "assistant" or not tool_calls:
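The rest of this hunk is truncated, but the first repair case can be illustrated with a tiny invented example. The dict shapes follow the pass-1/pass-2 code above, and the helper is assumed to be callable as shown.

# Hypothetical illustration of case 1: the assistant message that issued
# "tc_old" was removed by compaction, leaving its result orphaned.
msgs = [
    {"role": "tool", "tool_call_id": "tc_old", "content": "stale result"},
    {"role": "user", "content": "continue"},
]
repaired = _repair_orphaned_tool_calls(msgs)
# The orphaned tool result is dropped; the user message is kept as-is.
assert repaired == [{"role": "user", "content": "continue"}]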
@@ -653,6 +748,7 @@ class NodeConversation:
spillover_dir: str,
keep_recent: int = 4,
phase_graduated: bool = False,
aggressive: bool = False,
) -> None:
"""Structure-preserving compaction: save freeform text to file, keep tool messages.
@@ -662,6 +758,11 @@ class NodeConversation:
after pruning. Only freeform text exchanges (user messages,
text-only assistant messages) are saved to a file and removed.
When *aggressive* is True, non-essential tool call pairs are also
collapsed into a compact summary instead of being kept individually.
Only ``set_output`` calls and error results are preserved; all other
old tool pairs are replaced by a tool-call history summary.
The result: the agent retains exact knowledge of what tools it called,
where each result is stored, and can load the conversation text if
needed. No LLM summary call. No heuristics. Nothing lost.
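A minimal sketch of the escalation pattern the executor uses with this flag later in this diff; `conversation` stands in for a NodeConversation instance, names and thresholds mirror that call site, and an async context is assumed.

pre = conversation.usage_ratio()
await conversation.compact_preserving_structure(
    spillover_dir="data", keep_recent=4, phase_graduated=True,
)
# If the structural pass barely helped, retry aggressively: collapse
# non-essential tool pairs into a tool-call history summary.
if conversation.usage_ratio() >= 0.9 * pre:
    await conversation.compact_preserving_structure(
        spillover_dir="data", keep_recent=4, phase_graduated=True, aggressive=True,
    )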
@@ -693,35 +794,92 @@ class NodeConversation:
# Classify old messages: structural (keep) vs freeform (save to file)
kept_structural: list[Message] = []
freeform_lines: list[str] = []
collapsed_msgs: list[Message] = []
for msg in old_messages:
if msg.role == "tool":
# Tool results — already pruned to ~30 tokens (file reference).
# Keep in conversation.
kept_structural.append(msg)
elif msg.role == "assistant" and msg.tool_calls:
# Assistant message with tool_calls — keep the tool_calls
# with truncated arguments, clear the freeform text content.
compact_tcs = _compact_tool_calls(msg.tool_calls)
kept_structural.append(
Message(
seq=msg.seq,
role=msg.role,
content="",
tool_calls=compact_tcs,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
if aggressive:
# Aggressive: only keep set_output tool pairs and error results.
# Everything else is collapsed into a tool-call history summary.
# We need to track tool_call IDs to pair assistant messages with
# their tool results.
protected_tc_ids: set[str] = set()
collapsible_tc_ids: set[str] = set()
# First pass: classify assistant messages
for msg in old_messages:
if msg.role != "assistant" or not msg.tool_calls:
continue
has_protected = any(
tc.get("function", {}).get("name") == "set_output"
for tc in msg.tool_calls
)
else:
# Freeform text (user messages, text-only assistant messages)
# — save to file and remove from conversation.
role_label = msg.role
text = msg.content
if len(text) > 2000:
text = text[:2000] + "…"
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
tc_ids = {tc.get("id", "") for tc in msg.tool_calls}
if has_protected:
protected_tc_ids |= tc_ids
else:
collapsible_tc_ids |= tc_ids
# Second pass: classify all messages
for msg in old_messages:
if msg.role == "tool":
tc_id = msg.tool_use_id or ""
if tc_id in protected_tc_ids:
kept_structural.append(msg)
elif msg.is_error:
# Error results are always protected
kept_structural.append(msg)
# Protect the parent assistant message too
protected_tc_ids.add(tc_id)
else:
collapsed_msgs.append(msg)
elif msg.role == "assistant" and msg.tool_calls:
tc_ids = {tc.get("id", "") for tc in msg.tool_calls}
if tc_ids & protected_tc_ids:
# Has at least one protected tool call — keep entire msg
compact_tcs = _compact_tool_calls(msg.tool_calls)
kept_structural.append(
Message(
seq=msg.seq,
role=msg.role,
content="",
tool_calls=compact_tcs,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
)
else:
collapsed_msgs.append(msg)
else:
# Freeform text — save to file
role_label = msg.role
text = msg.content
if len(text) > 2000:
text = text[:2000] + "…"
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
else:
# Standard mode: keep all tool call pairs as structural
for msg in old_messages:
if msg.role == "tool":
kept_structural.append(msg)
elif msg.role == "assistant" and msg.tool_calls:
compact_tcs = _compact_tool_calls(msg.tool_calls)
kept_structural.append(
Message(
seq=msg.seq,
role=msg.role,
content="",
tool_calls=compact_tcs,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
)
else:
role_label = msg.role
text = msg.content
if len(text) > 2000:
text = text[:2000] + "…"
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
# Write freeform text to a numbered conversation file
spill_path = Path(spillover_dir)
@@ -741,13 +899,25 @@ class NodeConversation:
conv_filename = ""
# Build reference message
ref_parts: list[str] = []
if conv_filename:
ref_content = (
ref_parts.append(
f"[Previous conversation saved to '{conv_filename}'. "
f"Use load_data('{conv_filename}') to review if needed.]"
)
else:
ref_content = "[Previous freeform messages compacted.]"
elif not collapsed_msgs:
ref_parts.append("[Previous freeform messages compacted.]")
# Aggressive: add collapsed tool-call history to the reference
if collapsed_msgs:
tool_history = extract_tool_call_history(collapsed_msgs)
if tool_history:
ref_parts.append(tool_history)
elif not ref_parts:
ref_parts.append("[Previous tool calls compacted.]")
ref_content = "\n\n".join(ref_parts)
# Use a seq just before the first kept message
recent_messages = list(self._messages[split:])
if kept_structural:
@@ -760,15 +930,15 @@ class NodeConversation:
ref_msg = Message(seq=ref_seq, role="user", content=ref_content)
# Persist: delete old messages from store, write reference + kept structural
# Persist: delete old messages from store, write reference + kept structural.
# In aggressive mode, collapsed messages may be interspersed with kept
# messages, so we delete everything before the recent boundary and
# rewrite only what we want to keep.
if self._store:
first_kept_seq = (
kept_structural[0].seq
if kept_structural
else (recent_messages[0].seq if recent_messages else self._next_seq)
recent_boundary = (
recent_messages[0].seq if recent_messages else self._next_seq
)
# Delete everything before the first structural message we're keeping
await self._store.delete_parts_before(first_kept_seq)
await self._store.delete_parts_before(recent_boundary)
# Write the reference message
await self._store.write_part(ref_msg.seq, ref_msg.to_storage_dict())
# Write kept structural messages (they may have been modified)
File diff suppressed because it is too large.
+154 -22
@@ -289,6 +289,114 @@ class GraphExecutor:
return errors
# Max chars of formatted messages before proactively splitting for LLM.
_PHASE_LLM_CHAR_LIMIT = 240_000
_PHASE_LLM_MAX_DEPTH = 10
async def _phase_llm_compact(
self,
conversation: Any,
next_spec: NodeSpec,
messages: list,
_depth: int = 0,
) -> str:
"""Summarise messages for phase-boundary compaction.
Uses the same recursive binary-search splitting as EventLoopNode.
"""
from framework.graph.conversation import extract_tool_call_history
from framework.graph.event_loop_node import _is_context_too_large_error
if _depth > self._PHASE_LLM_MAX_DEPTH:
raise RuntimeError("Phase LLM compaction recursion limit")
# Format messages
lines: list[str] = []
for m in messages:
if m.role == "tool":
c = m.content[:500] + ("..." if len(m.content) > 500 else "")
lines.append(f"[tool result]: {c}")
elif m.role == "assistant" and m.tool_calls:
names = [
tc.get("function", {}).get("name", "?")
for tc in m.tool_calls
]
lines.append(f"[assistant (calls: {', '.join(names)})]: "
f"{m.content[:200] if m.content else ''}")
else:
lines.append(f"[{m.role}]: {m.content}")
formatted = "\n\n".join(lines)
# Proactive split
if len(formatted) > self._PHASE_LLM_CHAR_LIMIT and len(messages) > 1:
summary = await self._phase_llm_compact_split(
conversation, next_spec, messages, _depth,
)
else:
max_tokens = getattr(conversation, "_max_history_tokens", 32000)
target_tokens = max_tokens // 2
target_chars = target_tokens * 4
prompt = (
"You are compacting an AI agent's conversation history "
"at a phase boundary.\n\n"
f"NEXT PHASE: {next_spec.name}\n"
)
if next_spec.description:
prompt += f"NEXT PHASE PURPOSE: {next_spec.description}\n"
prompt += (
f"\nCONVERSATION MESSAGES:\n{formatted}\n\n"
"INSTRUCTIONS:\n"
f"Write a summary of approximately {target_chars} characters "
f"(~{target_tokens} tokens).\n"
"Preserve user-stated rules, constraints, and preferences "
"verbatim. Preserve key decisions and results from earlier "
"phases. Preserve context needed for the next phase.\n"
)
summary_budget = max(1024, max_tokens // 2)
try:
response = await self._llm.acomplete(
messages=[{"role": "user", "content": prompt}],
system=(
"You are a conversation compactor. Write a detailed "
"summary preserving context for the next phase."
),
max_tokens=summary_budget,
)
summary = response.content
except Exception as e:
if _is_context_too_large_error(e) and len(messages) > 1:
summary = await self._phase_llm_compact_split(
conversation, next_spec, messages, _depth,
)
else:
raise
# Append tool history at top level only
if _depth == 0:
tool_history = extract_tool_call_history(messages)
if tool_history and "TOOLS ALREADY CALLED" not in summary:
summary += "\n\n" + tool_history
return summary
async def _phase_llm_compact_split(
self,
conversation: Any,
next_spec: NodeSpec,
messages: list,
_depth: int,
) -> str:
"""Split messages in half and summarise each half."""
mid = max(1, len(messages) // 2)
s1 = await self._phase_llm_compact(
conversation, next_spec, messages[:mid], _depth + 1,
)
s2 = await self._phase_llm_compact(
conversation, next_spec, messages[mid:], _depth + 1,
)
return s1 + "\n\n" + s2
async def execute(
self,
graph: GraphSpec,
@@ -1294,9 +1402,7 @@ class GraphExecutor:
# Set current phase for phase-aware compaction
continuous_conversation.set_current_phase(next_spec.id)
# Opportunistic compaction at transition:
# 1. Prune old tool results (free, no LLM call)
# 2. If still over 80%, do a phase-graduated compact
# Phase-boundary compaction (same flow as EventLoopNode._compact)
if continuous_conversation.usage_ratio() > 0.5:
await continuous_conversation.prune_old_tool_results(
protect_tokens=2000,
@@ -1308,40 +1414,66 @@ class GraphExecutor:
_phase_ratio * 100,
)
_data_dir = (
str(self._storage_path / "data") if self._storage_path else None
str(self._storage_path / "data")
if self._storage_path
else None
)
# Step 1: Structural compaction (>=80%)
if _data_dir:
_pre = continuous_conversation.usage_ratio()
await continuous_conversation.compact_preserving_structure(
spillover_dir=_data_dir,
keep_recent=4,
phase_graduated=True,
)
# Circuit breaker: if still over budget, fall back
_post_ratio = continuous_conversation.usage_ratio()
if _post_ratio >= 0.9 * _phase_ratio:
self.logger.warning(
" Structure-preserving compaction ineffective "
"(%.0f%% -> %.0f%%), falling back to summary",
_phase_ratio * 100,
_post_ratio * 100,
)
summary = (
f"Summary of earlier phases (before {next_spec.name}). "
"See transition markers for phase details."
)
await continuous_conversation.compact(
summary,
if continuous_conversation.usage_ratio() >= 0.9 * _pre:
await continuous_conversation.compact_preserving_structure(
spillover_dir=_data_dir,
keep_recent=4,
phase_graduated=True,
aggressive=True,
)
else:
# Step 2: LLM compaction (>95%)
if (
continuous_conversation.usage_ratio() > 0.95
and self._llm is not None
):
self.logger.info(
" LLM phase-boundary compaction "
"(%.0f%% usage)",
continuous_conversation.usage_ratio() * 100,
)
try:
_llm_summary = await self._phase_llm_compact(
continuous_conversation,
next_spec,
list(continuous_conversation.messages),
)
await continuous_conversation.compact(
_llm_summary,
keep_recent=2,
phase_graduated=True,
)
except Exception as e:
self.logger.warning(
" Phase LLM compaction failed: %s", e,
)
# Step 3: Emergency (only if still over budget)
if continuous_conversation.needs_compaction():
self.logger.warning(
" Emergency phase compaction (%.0f%%)",
continuous_conversation.usage_ratio() * 100,
)
summary = (
f"Summary of earlier phases (before {next_spec.name}). "
f"Summary of earlier phases "
f"(before {next_spec.name}). "
"See transition markers for phase details."
)
await continuous_conversation.compact(
summary,
keep_recent=4,
keep_recent=1,
phase_graduated=True,
)
+11 -2
@@ -718,15 +718,24 @@ class EventBus:
node_id: str,
prompt: str = "",
execution_id: str | None = None,
options: list[str] | None = None,
) -> None:
"""Emit client input requested event (client_facing=True nodes)."""
"""Emit client input requested event (client_facing=True nodes).
Args:
options: Optional predefined choices for the user (1-3 items).
The frontend appends an "Other" free-text option automatically.
"""
data: dict[str, Any] = {"prompt": prompt}
if options:
data["options"] = options
await self.publish(
AgentEvent(
type=EventType.CLIENT_INPUT_REQUESTED,
stream_id=stream_id,
node_id=node_id,
execution_id=execution_id,
data={"prompt": prompt},
data=data,
)
)
+9 -2
@@ -511,9 +511,11 @@ class ExecutionStream:
logger.debug(f"Queued execution {execution_id} for stream {self.stream_id}")
return execution_id
# Errors that indicate a fundamental configuration or environment problem.
# Resurrecting after these is pointless — the same error will recur.
# Errors that indicate resurrection won't help — the same error will recur.
# Includes both configuration/environment errors and deterministic node
# failures where the conversation/state hasn't changed.
_FATAL_ERROR_PATTERNS: tuple[str, ...] = (
# Configuration / environment
"credential",
"authentication",
"unauthorized",
@@ -525,6 +527,11 @@ class ExecutionStream:
"permission denied",
"invalid api",
"configuration error",
# Deterministic node failures — resurrecting at the same node with
# the same conversation produces the same result.
"node stalled",
"ghost empty stream",
"max iterations",
)
@classmethod
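The classmethod that consults this tuple sits outside the hunk; the sketch below is only an assumption about how such pattern lists are typically matched (a case-insensitive substring check), not the repository's actual implementation.

def _looks_fatal(error_text: str) -> bool:
    # Hypothetical helper: treat an error as fatal (skip resurrection)
    # if any known pattern occurs in the lowercased message.
    lowered = error_text.lower()
    return any(p in lowered for p in ExecutionStream._FATAL_ERROR_PATTERNS)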
+23
@@ -132,6 +132,29 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
"SSE connected: session='%s', sub_id='%s', types=%d", session.id, sub_id, len(event_types)
)
# Replay buffered events that were published before this SSE connected.
# The EventBus keeps a history ring-buffer; we replay the subset that
# produces visible chat messages so the frontend never misses early
# queen output. Lifecycle events are NOT replayed to avoid duplicate
# state transitions (turn counter increments, etc.).
_REPLAY_TYPES = {
EventType.CLIENT_OUTPUT_DELTA.value,
EventType.EXECUTION_STARTED.value,
EventType.CLIENT_INPUT_REQUESTED.value,
}
event_type_values = {et.value for et in event_types}
replay_types = _REPLAY_TYPES & event_type_values
replayed = 0
for past_event in event_bus._event_history:
if past_event.type.value in replay_types:
try:
queue.put_nowait(past_event.to_dict())
replayed += 1
except asyncio.QueueFull:
break
if replayed:
logger.info("SSE replayed %d buffered events for session='%s'", replayed, session.id)
event_count = 0
close_reason = "unknown"
try:
+30
@@ -134,6 +134,35 @@ async def handle_chat(request: web.Request) -> web.Response:
return web.json_response({"error": "Queen not available"}, status=503)
async def handle_queen_context(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/queen-context — queue context for the queen.
Unlike /chat, this does NOT trigger an LLM response. The message is
queued in the queen's injection queue and will be drained on her next
natural iteration (prefixed with [External event]:).
Body: {"message": "..."}
"""
session, err = resolve_session(request)
if err:
return err
body = await request.json()
message = body.get("message", "")
if not message:
return web.json_response({"error": "message is required"}, status=400)
queen_executor = session.queen_executor
if queen_executor is not None:
node = queen_executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(message, is_client_input=False)
return web.json_response({"status": "queued", "delivered": True})
return web.json_response({"error": "Queen not available"}, status=503)
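For reference, a minimal client sketch against this endpoint; the host, port, and session id are placeholders, and the expected response mirrors the handler above.

import requests

session_id = "sess-123"  # placeholder
resp = requests.post(
    f"http://localhost:8080/api/sessions/{session_id}/queen-context",
    json={"message": "User selected 'Simple 2-node' for the worker's question"},
)
print(resp.json())  # e.g. {"status": "queued", "delivered": True}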
async def handle_worker_input(request: web.Request) -> web.Response:
"""POST /api/sessions/{session_id}/worker-input — send input to waiting worker node.
@@ -385,6 +414,7 @@ def register_routes(app: web.Application) -> None:
app.router.add_post("/api/sessions/{session_id}/trigger", handle_trigger)
app.router.add_post("/api/sessions/{session_id}/inject", handle_inject)
app.router.add_post("/api/sessions/{session_id}/chat", handle_chat)
app.router.add_post("/api/sessions/{session_id}/queen-context", handle_queen_context)
app.router.add_post("/api/sessions/{session_id}/worker-input", handle_worker_input)
app.router.add_post("/api/sessions/{session_id}/pause", handle_stop)
app.router.add_post("/api/sessions/{session_id}/resume", handle_resume)
@@ -838,7 +838,7 @@ def register_queen_lifecycle_tools(
injectable = stream.get_injectable_nodes()
if injectable:
target_node_id = injectable[0]["node_id"]
ok = await stream.inject_input(target_node_id, content)
ok = await stream.inject_input(target_node_id, content, is_client_input=True)
if ok:
return json.dumps(
{
+4
@@ -37,6 +37,10 @@ export const executionApi = {
chat: (sessionId: string, message: string) =>
api.post<ChatResult>(`/sessions/${sessionId}/chat`, { message }),
/** Queue context for the queen without triggering an LLM response. */
queenContext: (sessionId: string, message: string) =>
api.post<ChatResult>(`/sessions/${sessionId}/queen-context`, { message }),
workerInput: (sessionId: string, message: string) =>
api.post<ChatResult>(`/sessions/${sessionId}/worker-input`, { message }),
+116 -66
@@ -1,6 +1,7 @@
import { memo, useState, useRef, useEffect } from "react";
import { Send, Square, Crown, Cpu, Check, Loader2, Reply } from "lucide-react";
import { Send, Square, Crown, Cpu, Check, Loader2 } from "lucide-react";
import MarkdownContent from "@/components/MarkdownContent";
import QuestionWidget from "@/components/QuestionWidget";
export interface ChatMessage {
id: string;
@@ -20,15 +21,23 @@ interface ChatPanelProps {
messages: ChatMessage[];
onSend: (message: string, thread: string) => void;
isWaiting?: boolean;
/** When true, a worker is thinking (not yet streaming) */
isWorkerWaiting?: boolean;
/** When true, the queen is busy (typing or streaming) — shows the stop button */
isBusy?: boolean;
activeThread: string;
/** When true, the worker is waiting for user input — shows inline reply box */
workerAwaitingInput?: boolean;
/** When true, the input is disabled (e.g. during loading) */
disabled?: boolean;
/** Called when user clicks the stop button to cancel the queen's current turn */
onCancel?: () => void;
/** Called when user submits a reply to the worker's input request */
onWorkerReply?: (message: string) => void;
/** Pending question from ask_user — replaces textarea when present */
pendingQuestion?: string | null;
/** Options for the pending question */
pendingOptions?: string[] | null;
/** Called when user submits an answer to the pending question */
onQuestionSubmit?: (answer: string, isOther: boolean) => void;
/** Called when user dismisses the pending question without answering */
onQuestionDismiss?: () => void;
/** Queen operating mode — shown as a tag on queen messages */
queenMode?: "building" | "staging" | "running";
}
@@ -287,10 +296,12 @@ const MessageBubble = memo(function MessageBubble({ msg, queenMode }: { msg: Cha
);
}, (prev, next) => prev.msg.id === next.msg.id && prev.msg.content === next.msg.content && prev.queenMode === next.queenMode);
export default function ChatPanel({ messages, onSend, isWaiting, activeThread, workerAwaitingInput, disabled, onCancel, onWorkerReply, queenMode }: ChatPanelProps) {
export default function ChatPanel({ messages, onSend, isWaiting, isWorkerWaiting, isBusy, activeThread, disabled, onCancel, pendingQuestion, pendingOptions, onQuestionSubmit, onQuestionDismiss, queenMode }: ChatPanelProps) {
const [input, setInput] = useState("");
const [readMap, setReadMap] = useState<Record<string, number>>({});
const bottomRef = useRef<HTMLDivElement>(null);
const scrollRef = useRef<HTMLDivElement>(null);
const stickToBottom = useRef(true);
const textareaRef = useRef<HTMLTextAreaElement>(null);
const threadMessages = messages.filter((m) => {
@@ -307,10 +318,24 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, w
// Suppress unused var
void readMap;
const lastMsg = threadMessages[threadMessages.length - 1];
// Autoscroll: only when user is already near the bottom
const handleScroll = () => {
const el = scrollRef.current;
if (!el) return;
const distFromBottom = el.scrollHeight - el.scrollTop - el.clientHeight;
stickToBottom.current = distFromBottom < 80;
};
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [threadMessages.length, lastMsg?.content, workerAwaitingInput]);
if (stickToBottom.current) {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}
}, [threadMessages, pendingQuestion, isWaiting, isWorkerWaiting]);
// Always start pinned to bottom when switching threads
useEffect(() => {
stickToBottom.current = true;
}, [activeThread]);
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
@@ -320,17 +345,6 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, w
if (textareaRef.current) textareaRef.current.style.height = "auto";
};
// Find the last worker message to attach the inline reply box below.
// For explicit ask_user, this will be the worker_input_request message.
// For auto-block, this will be the last client_output_delta streamed message.
const lastWorkerMsgIdx = workerAwaitingInput
? threadMessages.reduce(
(last, m, i) =>
m.role === "worker" && m.type !== "tool_status" && m.type !== "system" ? i : last,
-1,
)
: -1;
return (
<div className="flex flex-col h-full min-w-0">
{/* Compact sub-header */}
@@ -339,8 +353,8 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, w
</div>
{/* Messages */}
<div className="flex-1 overflow-auto px-5 py-4 space-y-3">
{threadMessages.map((msg, idx) => (
<div ref={scrollRef} onScroll={handleScroll} className="flex-1 overflow-auto px-5 py-4 space-y-3">
{threadMessages.map((msg) => (
<div key={msg.id}>
<MessageBubble msg={msg} queenMode={queenMode} />
{idx === lastWorkerMsgIdx && onWorkerReply && (
@@ -351,8 +365,35 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, w
{isWaiting && (
<div className="flex gap-3">
<div className="w-7 h-7 rounded-xl bg-muted flex items-center justify-center">
<Cpu className="w-3.5 h-3.5 text-muted-foreground" />
<div
className="flex-shrink-0 w-9 h-9 rounded-xl flex items-center justify-center"
style={{
backgroundColor: `${queenColor}18`,
border: `1.5px solid ${queenColor}35`,
boxShadow: `0 0 12px ${queenColor}20`,
}}
>
<Crown className="w-4 h-4" style={{ color: queenColor }} />
</div>
<div className="border border-primary/20 bg-primary/5 rounded-2xl rounded-tl-md px-4 py-3">
<div className="flex gap-1.5">
<span className="w-1.5 h-1.5 rounded-full bg-muted-foreground animate-bounce" style={{ animationDelay: "0ms" }} />
<span className="w-1.5 h-1.5 rounded-full bg-muted-foreground animate-bounce" style={{ animationDelay: "150ms" }} />
<span className="w-1.5 h-1.5 rounded-full bg-muted-foreground animate-bounce" style={{ animationDelay: "300ms" }} />
</div>
</div>
</div>
)}
{isWorkerWaiting && !isWaiting && (
<div className="flex gap-3">
<div
className="flex-shrink-0 w-7 h-7 rounded-xl flex items-center justify-center"
style={{
backgroundColor: `${workerColor}18`,
border: `1.5px solid ${workerColor}35`,
}}
>
<Cpu className="w-3.5 h-3.5" style={{ color: workerColor }} />
</div>
<div className="bg-muted/60 rounded-2xl rounded-tl-md px-4 py-3">
<div className="flex gap-1.5">
@@ -366,48 +407,57 @@ export default function ChatPanel({ messages, onSend, isWaiting, activeThread, w
<div ref={bottomRef} />
</div>
{/* Input — always connected to Queen */}
<form onSubmit={handleSubmit} className="p-4 border-t border-border">
<div className="flex items-center gap-3 bg-muted/40 rounded-xl px-4 py-2.5 border border-border focus-within:border-primary/40 transition-colors">
<textarea
ref={textareaRef}
rows={1}
value={input}
onChange={(e) => {
setInput(e.target.value);
const ta = e.target;
ta.style.height = "auto";
ta.style.height = `${Math.min(ta.scrollHeight, 160)}px`;
}}
onKeyDown={(e) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
handleSubmit(e);
}
}}
placeholder={disabled ? "Connecting to agent..." : "Message Queen Bee..."}
disabled={disabled}
className="flex-1 bg-transparent text-sm text-foreground outline-none placeholder:text-muted-foreground disabled:opacity-50 disabled:cursor-not-allowed resize-none overflow-y-auto"
/>
{isWaiting && onCancel ? (
<button
type="button"
onClick={onCancel}
className="p-2 rounded-lg bg-destructive text-destructive-foreground hover:opacity-90 transition-opacity"
>
<Square className="w-4 h-4" />
</button>
) : (
<button
type="submit"
disabled={!input.trim() || disabled}
className="p-2 rounded-lg bg-primary text-primary-foreground disabled:opacity-30 hover:opacity-90 transition-opacity"
>
<Send className="w-4 h-4" />
</button>
)}
</div>
</form>
{/* Input area — question widget replaces textarea when a question is pending */}
{pendingQuestion && pendingOptions && onQuestionSubmit ? (
<QuestionWidget
question={pendingQuestion}
options={pendingOptions}
onSubmit={onQuestionSubmit}
onDismiss={onQuestionDismiss}
/>
) : (
<form onSubmit={handleSubmit} className="p-4">
<div className="flex items-center gap-3 bg-muted/40 rounded-xl px-4 py-2.5 border border-border focus-within:border-primary/40 transition-colors">
<textarea
ref={textareaRef}
rows={1}
value={input}
onChange={(e) => {
setInput(e.target.value);
const ta = e.target;
ta.style.height = "auto";
ta.style.height = `${Math.min(ta.scrollHeight, 160)}px`;
}}
onKeyDown={(e) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
handleSubmit(e);
}
}}
placeholder={disabled ? "Connecting to agent..." : "Message Queen Bee..."}
disabled={disabled}
className="flex-1 bg-transparent text-sm text-foreground outline-none placeholder:text-muted-foreground disabled:opacity-50 disabled:cursor-not-allowed resize-none overflow-y-auto"
/>
{isBusy && onCancel ? (
<button
type="button"
onClick={onCancel}
className="p-2 rounded-lg bg-amber-500/15 text-amber-400 border border-amber-500/40 hover:bg-amber-500/25 transition-colors"
>
<Square className="w-4 h-4" />
</button>
) : (
<button
type="submit"
disabled={!input.trim() || disabled}
className="p-2 rounded-lg bg-primary text-primary-foreground disabled:opacity-30 hover:opacity-90 transition-opacity"
>
<Send className="w-4 h-4" />
</button>
)}
</div>
</form>
)}
</div>
);
}
@@ -0,0 +1,142 @@
import { useState, useRef, useEffect, useCallback } from "react";
import { Send, MessageCircleQuestion, X } from "lucide-react";
export interface QuestionWidgetProps {
/** The question text shown to the user */
question: string;
/** 1-3 predefined options. The UI appends an "Other" free-text option. */
options: string[];
/** Called with the selected option label or custom text, and whether "Other" was chosen */
onSubmit: (answer: string, isOther: boolean) => void;
/** Called when user dismisses the question without answering */
onDismiss?: () => void;
}
export default function QuestionWidget({ question, options, onSubmit, onDismiss }: QuestionWidgetProps) {
const [selected, setSelected] = useState<number | null>(null);
const [customText, setCustomText] = useState("");
const [submitted, setSubmitted] = useState(false);
const inputRef = useRef<HTMLInputElement>(null);
const containerRef = useRef<HTMLDivElement>(null);
// "Other" is always the last option index
const otherIndex = options.length;
const isOtherSelected = selected === otherIndex;
// Focus the text input when "Other" is selected
useEffect(() => {
if (isOtherSelected) {
inputRef.current?.focus();
}
}, [isOtherSelected]);
const canSubmit = selected !== null && (!isOtherSelected || customText.trim().length > 0);
const handleSubmit = useCallback(() => {
if (!canSubmit || submitted) return;
setSubmitted(true);
if (isOtherSelected) {
onSubmit(customText.trim(), true);
} else {
onSubmit(options[selected!], false);
}
}, [canSubmit, submitted, isOtherSelected, customText, options, selected, onSubmit]);
// Keyboard: Enter to submit, number keys to select (only when text input is not focused)
useEffect(() => {
const handleKeyDown = (e: KeyboardEvent) => {
if (submitted) return;
const inTextInput = e.target === inputRef.current;
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
handleSubmit();
return;
}
// Number keys 1-4 select options — skip when typing in the "Other" field
if (!inTextInput) {
const num = parseInt(e.key, 10);
if (num >= 1 && num <= options.length + 1) {
e.preventDefault();
setSelected(num - 1);
}
}
};
window.addEventListener("keydown", handleKeyDown);
return () => window.removeEventListener("keydown", handleKeyDown);
}, [handleSubmit, submitted, options.length]);
if (submitted) return null;
return (
<div ref={containerRef} className="p-4">
<div className="bg-card border border-border rounded-xl shadow-sm overflow-hidden">
{/* Header / Question */}
<div className="px-5 pt-4 pb-3 flex items-start gap-3">
<div className="w-7 h-7 rounded-lg bg-primary/10 border border-primary/20 flex items-center justify-center flex-shrink-0 mt-0.5">
<MessageCircleQuestion className="w-3.5 h-3.5 text-primary" />
</div>
<p className="text-sm font-medium text-foreground leading-relaxed flex-1">{question}</p>
{onDismiss && (
<button
onClick={onDismiss}
className="p-1 rounded-md text-muted-foreground hover:text-foreground hover:bg-muted/60 transition-colors flex-shrink-0"
>
<X className="w-4 h-4" />
</button>
)}
</div>
{/* Options */}
<div className="px-5 pb-3 space-y-1.5">
{options.map((option, idx) => (
<button
key={idx}
onClick={() => setSelected(idx)}
className={`w-full text-left px-4 py-2.5 rounded-lg border text-sm transition-colors ${
selected === idx
? "border-primary bg-primary/10 text-foreground"
: "border-border/60 bg-muted/20 text-foreground hover:border-primary/40 hover:bg-muted/40"
}`}
>
<span className="text-xs text-muted-foreground mr-2">{idx + 1}.</span>
{option}
</button>
))}
{/* "Other" — inline text input that auto-selects on focus */}
<input
ref={inputRef}
type="text"
value={customText}
onFocus={() => setSelected(otherIndex)}
onChange={(e) => {
setSelected(otherIndex);
setCustomText(e.target.value);
}}
placeholder="Type a custom response..."
className={`w-full px-4 py-2.5 rounded-lg border border-dashed text-sm transition-colors bg-transparent placeholder:text-muted-foreground focus:outline-none ${
isOtherSelected
? "border-primary bg-primary/10 text-foreground"
: "border-border text-muted-foreground hover:border-primary/40"
}`}
/>
</div>
{/* Submit */}
<div className="px-5 pb-4">
<button
onClick={handleSubmit}
disabled={!canSubmit}
className="w-full flex items-center justify-center gap-2 py-2.5 rounded-lg text-sm font-medium bg-primary text-primary-foreground hover:bg-primary/90 disabled:opacity-30 disabled:cursor-not-allowed transition-colors"
>
<Send className="w-3.5 h-3.5" />
Submit
</button>
</div>
</div>
</div>
);
}
+9
@@ -167,3 +167,12 @@
.animate-in.slide-in-from-right {
animation: slide-in-from-right 0.2s ease-out;
}
/* Slide-up animation for question widget */
@keyframes slide-in-from-bottom {
from { transform: translateY(16px); opacity: 0; }
to { transform: translateY(0); opacity: 1; }
}
.animate-in.slide-in-from-bottom {
animation: slide-in-from-bottom 0.25s ease-out;
}
+189 -19
@@ -8,6 +8,7 @@ import TopBar from "@/components/TopBar";
import { TAB_STORAGE_KEY, loadPersistedTabs, savePersistedTabs, type PersistedTabState } from "@/lib/tab-persistence";
import NodeDetailPanel from "@/components/NodeDetailPanel";
import CredentialsModal, { type Credential, createFreshCredentials, cloneCredentials, allRequiredCredentialsMet, clearCredentialCache } from "@/components/CredentialsModal";
import { agentsApi } from "@/api/agents";
import { executionApi } from "@/api/execution";
import { graphsApi } from "@/api/graphs";
@@ -249,8 +250,18 @@ interface AgentBackendState {
subagentReports: { subagent_id: string; message: string; data?: Record<string, unknown>; timestamp: string }[];
isTyping: boolean;
isStreaming: boolean;
/** True only when the queen's LLM is actively processing (not worker) */
queenIsTyping: boolean;
/** True only when a worker's LLM is actively processing (not queen) */
workerIsTyping: boolean;
llmSnapshots: Record<string, string>;
activeToolCalls: Record<string, { name: string; done: boolean; streamId: string }>;
/** Structured question text from ask_user with options */
pendingQuestion: string | null;
/** Predefined choices from ask_user (1-3 items); UI appends "Other" */
pendingOptions: string[] | null;
/** Whether the pending question came from queen or worker */
pendingQuestionSource: "queen" | "worker" | null;
}
function defaultAgentState(): AgentBackendState {
@@ -274,8 +285,13 @@ function defaultAgentState(): AgentBackendState {
subagentReports: [],
isTyping: false,
isStreaming: false,
queenIsTyping: false,
workerIsTyping: false,
llmSnapshots: {},
activeToolCalls: {},
pendingQuestion: null,
pendingOptions: null,
pendingQuestionSource: null,
};
}
@@ -355,8 +371,14 @@ export default function Workspace() {
if (persisted) {
const restored = { ...persisted.activeSessionByAgent };
const urlSessions = sessionsByAgent[initialAgent];
if (urlSessions?.length && !restored[initialAgent]) {
restored[initialAgent] = urlSessions[0].id;
if (urlSessions?.length) {
// When a prompt was submitted from home, activate the newly created
// session (last in array) instead of the previously active one.
if (initialPrompt && hasExplicitAgent) {
restored[initialAgent] = urlSessions[urlSessions.length - 1].id;
} else if (!restored[initialAgent]) {
restored[initialAgent] = urlSessions[0].id;
}
}
return restored;
}
@@ -635,7 +657,11 @@ export default function Workspace() {
const result = await sessionsApi.get(existingSessionId);
if (result.loading) continue;
return result as LiveSession;
} catch {
} catch (pollErr) {
// 404 = agent failed to load and was cleaned up — stop immediately
if (pollErr instanceof ApiError && pollErr.status === 404) {
throw new Error("Agent failed to load");
}
if (i === maxAttempts - 1) throw loadErr;
}
}
@@ -930,7 +956,7 @@ export default function Workspace() {
} catch {
// Best-effort — queen may have already finished
}
updateAgentState(activeWorker, { isTyping: false, isStreaming: false });
updateAgentState(activeWorker, { isTyping: false, isStreaming: false, queenIsTyping: false, workerIsTyping: false });
}, [agentStates, activeWorker, updateAgentState]);
// --- Node log helper (writes into agentStates) ---
@@ -1013,7 +1039,7 @@ export default function Workspace() {
case "execution_started":
if (isQueen) {
turnCounterRef.current[turnKey] = currentTurn + 1;
updateAgentState(agentType, { isTyping: true });
updateAgentState(agentType, { isTyping: true, queenIsTyping: true });
} else {
// Warn if prior LLM snapshots are being dropped (edge case: execution_completed never arrived)
const priorSnapshots = agentStates[agentType]?.llmSnapshots || {};
@@ -1024,6 +1050,7 @@ export default function Workspace() {
updateAgentState(agentType, {
isTyping: true,
isStreaming: false,
workerIsTyping: true,
awaitingInput: false,
workerRunState: "running",
currentExecutionId: event.execution_id || agentStates[agentType]?.currentExecutionId || null,
@@ -1031,6 +1058,9 @@ export default function Workspace() {
subagentReports: [],
llmSnapshots: {},
activeToolCalls: {},
pendingQuestion: null,
pendingOptions: null,
pendingQuestionSource: null,
});
markAllNodesAs(agentType, ["running", "looping", "complete", "error"], "pending");
}
@@ -1038,7 +1068,7 @@ export default function Workspace() {
case "execution_completed":
if (isQueen) {
updateAgentState(agentType, { isTyping: false });
updateAgentState(agentType, { isTyping: false, queenIsTyping: false });
} else {
// Flush any remaining LLM snapshots before clearing state
const completedSnapshots = agentStates[agentType]?.llmSnapshots || {};
@@ -1050,11 +1080,15 @@ export default function Workspace() {
updateAgentState(agentType, {
isTyping: false,
isStreaming: false,
workerIsTyping: false,
awaitingInput: false,
workerInputMessageId: null,
workerRunState: "idle",
currentExecutionId: null,
llmSnapshots: {},
pendingQuestion: null,
pendingOptions: null,
pendingQuestionSource: null,
});
markAllNodesAs(agentType, ["running", "looping"], "complete");
@@ -1079,7 +1113,7 @@ export default function Workspace() {
// Mark streaming when LLM text is actively arriving
if (event.type === "llm_text_delta" || event.type === "client_output_delta") {
updateAgentState(agentType, { isStreaming: true });
updateAgentState(agentType, { isStreaming: true, ...(isQueen ? {} : { workerIsTyping: false }) });
}
if (event.type === "llm_text_delta" && !isQueen && event.node_id) {
@@ -1101,8 +1135,41 @@ export default function Workspace() {
if (event.type === "client_input_requested") {
console.log('[CLIENT_INPUT_REQ] stream_id:', streamId, 'isQueen:', isQueen, 'node_id:', event.node_id, 'prompt:', (event.data?.prompt as string)?.slice(0, 80), 'agentType:', agentType);
const rawOptions = event.data?.options;
const options = Array.isArray(rawOptions) ? (rawOptions as string[]) : null;
if (isQueen) {
updateAgentState(agentType, { awaitingInput: true, isTyping: false, isStreaming: false, queenBuilding: false });
const prompt = (event.data?.prompt as string) || "";
const isAutoBlock = !prompt && !options;
// Queen auto-block (empty prompt, no options) should not
// overwrite a pending worker question — the worker's
// QuestionWidget must stay visible. Use the updater form
// to read the latest state and avoid stale-closure races
// when worker and queen events arrive in the same batch.
setAgentStates(prev => {
const cur = prev[agentType] || defaultAgentState();
const workerQuestionActive = cur.pendingQuestionSource === "worker";
if (isAutoBlock && workerQuestionActive) {
return { ...prev, [agentType]: {
...cur,
awaitingInput: true,
isTyping: false,
isStreaming: false,
queenIsTyping: false,
queenBuilding: false,
}};
}
return { ...prev, [agentType]: {
...cur,
awaitingInput: true,
isTyping: false,
isStreaming: false,
queenIsTyping: false,
queenBuilding: false,
pendingQuestion: prompt || null,
pendingOptions: options,
pendingQuestionSource: "queen",
}};
});
} else {
// Worker input request.
// If the prompt is non-empty (explicit ask_user), create a visible
@@ -1130,18 +1197,22 @@ export default function Workspace() {
awaitingInput: true,
isTyping: false,
isStreaming: false,
queenIsTyping: false,
pendingQuestion: prompt || null,
pendingOptions: options,
pendingQuestionSource: options ? "worker" : null,
});
}
}
if (event.type === "execution_paused") {
updateAgentState(agentType, { isTyping: false, isStreaming: false, awaitingInput: false, workerInputMessageId: null });
updateAgentState(agentType, { isTyping: false, isStreaming: false, queenIsTyping: false, workerIsTyping: false, awaitingInput: false, workerInputMessageId: null, pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
if (!isQueen) {
updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null });
markAllNodesAs(agentType, ["running", "looping"], "pending");
}
}
if (event.type === "execution_failed") {
updateAgentState(agentType, { isTyping: false, isStreaming: false, awaitingInput: false, workerInputMessageId: null });
updateAgentState(agentType, { isTyping: false, isStreaming: false, queenIsTyping: false, workerIsTyping: false, awaitingInput: false, workerInputMessageId: null, pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
if (!isQueen) {
updateAgentState(agentType, { workerRunState: "idle", currentExecutionId: null });
if (event.node_id) {
@@ -1173,7 +1244,11 @@ export default function Workspace() {
case "node_loop_iteration":
turnCounterRef.current[turnKey] = currentTurn + 1;
updateAgentState(agentType, { isStreaming: false, activeToolCalls: {}, awaitingInput: false });
if (isQueen) {
updateAgentState(agentType, { isStreaming: false, activeToolCalls: {}, awaitingInput: false, pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
} else {
updateAgentState(agentType, { isStreaming: false, workerIsTyping: true, activeToolCalls: {}, awaitingInput: false, pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
}
if (!isQueen && event.node_id) {
const pendingText = agentStates[agentType]?.llmSnapshots[event.node_id];
if (pendingText?.trim()) {
@@ -1577,6 +1652,11 @@ export default function Workspace() {
return;
}
// If queen has a pending question widget, dismiss it when user types directly
if (agentStates[activeWorker]?.pendingQuestionSource === "queen") {
updateAgentState(activeWorker, { pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
}
const userMsg: ChatMessage = {
id: makeId(), agent: "You", agentColor: "",
content: text, timestamp: "", type: "user", thread, createdAt: Date.now(),
@@ -1587,7 +1667,7 @@ export default function Workspace() {
s.id === activeSession.id ? { ...s, messages: [...s.messages, userMsg] } : s
),
}));
updateAgentState(activeWorker, { isTyping: true });
updateAgentState(activeWorker, { isTyping: true, queenIsTyping: true });
if (state?.sessionId && state?.ready) {
executionApi.chat(state.sessionId, text).catch((err: unknown) => {
@@ -1603,7 +1683,7 @@ export default function Workspace() {
s.id === activeSession.id ? { ...s, messages: [...s.messages, errorChatMsg] } : s
),
}));
updateAgentState(activeWorker, { isTyping: false, isStreaming: false });
updateAgentState(activeWorker, { isTyping: false, isStreaming: false, queenIsTyping: false });
});
} else {
const errorMsg: ChatMessage = {
@@ -1640,7 +1720,7 @@ export default function Workspace() {
}));
// Clear awaiting state optimistically
updateAgentState(activeWorker, { awaitingInput: false, workerInputMessageId: null, isTyping: true });
updateAgentState(activeWorker, { awaitingInput: false, workerInputMessageId: null, isTyping: true, pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
executionApi.workerInput(state.sessionId, text).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
@@ -1659,6 +1739,90 @@ export default function Workspace() {
});
}, [activeWorker, activeSession, agentStates, updateAgentState]);
// --- handleWorkerQuestionAnswer: route predefined answers directly to the worker, "Other" through the queen ---
const handleWorkerQuestionAnswer = useCallback((answer: string, isOther: boolean) => {
if (!activeSession) return;
const state = agentStates[activeWorker];
const question = state?.pendingQuestion || "";
const opts = state?.pendingOptions;
if (isOther) {
// "Other" free-text → route through queen for evaluation
updateAgentState(activeWorker, { pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
if (question && opts && state?.sessionId && state?.ready) {
const formatted = `[Worker asked: "${question}" | Options: ${opts.join(", ")}]\nUser answered: "${answer}"`;
const userMsg: ChatMessage = {
id: makeId(), agent: "You", agentColor: "",
content: answer, timestamp: "", type: "user", thread: activeWorker, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
[activeWorker]: prev[activeWorker].map(s =>
s.id === activeSession.id ? { ...s, messages: [...s.messages, userMsg] } : s
),
}));
updateAgentState(activeWorker, { isTyping: true, queenIsTyping: true });
executionApi.chat(state.sessionId, formatted).catch((err: unknown) => {
const errMsg = err instanceof Error ? err.message : String(err);
const errorChatMsg: ChatMessage = {
id: makeId(), agent: "System", agentColor: "",
content: `Failed to send message: ${errMsg}`,
timestamp: "", type: "system", thread: activeWorker, createdAt: Date.now(),
};
setSessionsByAgent(prev => ({
...prev,
[activeWorker]: prev[activeWorker].map(s =>
s.id === activeSession.id ? { ...s, messages: [...s.messages, errorChatMsg] } : s
),
}));
updateAgentState(activeWorker, { isTyping: false, isStreaming: false, queenIsTyping: false });
});
} else {
handleSend(answer, activeWorker);
}
} else {
// Predefined option → send directly to worker
handleWorkerReply(answer);
// Queue context for queen (fire-and-forget, no LLM response triggered)
if (question && state?.sessionId && state?.ready) {
const notification = `[Worker asked: "${question}" | User selected: "${answer}"]`;
executionApi.queenContext(state.sessionId, notification).catch(() => {});
}
}
}, [activeWorker, activeSession, agentStates, handleWorkerReply, handleSend, updateAgentState, setSessionsByAgent]);
// --- handleQueenQuestionAnswer: submit queen's own question answer via /chat ---
// The queen asked the question herself, so she already has context — just send the raw answer.
const handleQueenQuestionAnswer = useCallback((answer: string, _isOther: boolean) => {
updateAgentState(activeWorker, { pendingQuestion: null, pendingOptions: null, pendingQuestionSource: null });
handleSend(answer, activeWorker);
}, [activeWorker, handleSend, updateAgentState]);
// --- handleQuestionDismiss: user closed the question widget without answering ---
// Injects a dismiss signal so the blocked node can continue.
const handleQuestionDismiss = useCallback(() => {
const state = agentStates[activeWorker];
if (!state?.sessionId) return;
const source = state.pendingQuestionSource;
const question = state.pendingQuestion || "";
// Clear UI state immediately
updateAgentState(activeWorker, {
pendingQuestion: null,
pendingOptions: null,
pendingQuestionSource: null,
awaitingInput: false,
});
// Unblock the waiting node with a dismiss signal
const dismissMsg = `[User dismissed the question: "${question}"]`;
if (source === "worker") {
executionApi.workerInput(state.sessionId, dismissMsg).catch(() => {});
} else {
executionApi.chat(state.sessionId, dismissMsg).catch(() => {});
}
}, [agentStates, activeWorker, updateAgentState]);
const handleLoadAgent = useCallback(async (agentPath: string) => {
const state = agentStates[activeWorker];
if (!state?.sessionId) return;
@@ -1873,17 +2037,23 @@ export default function Workspace() {
messages={activeSession.messages}
onSend={handleSend}
onCancel={handleCancelQueen}
onWorkerReply={handleWorkerReply}
activeThread={activeWorker}
isWaiting={(activeAgentState?.isTyping && !activeAgentState?.isStreaming) ?? false}
workerAwaitingInput={
(activeAgentState?.awaitingInput && activeAgentState?.workerRunState === "running") ?? false
}
isWaiting={(activeAgentState?.queenIsTyping && !activeAgentState?.isStreaming) ?? false}
isWorkerWaiting={(activeAgentState?.workerIsTyping && !activeAgentState?.isStreaming) ?? false}
isBusy={activeAgentState?.queenIsTyping ?? false}
disabled={
(activeAgentState?.loading ?? true) ||
!(activeAgentState?.queenReady)
}
queenMode={activeAgentState?.queenMode ?? "building"}
pendingQuestion={activeAgentState?.awaitingInput ? activeAgentState.pendingQuestion : null}
pendingOptions={activeAgentState?.awaitingInput ? activeAgentState.pendingOptions : null}
onQuestionSubmit={
activeAgentState?.pendingQuestionSource === "queen"
? handleQueenQuestionAnswer
: handleWorkerQuestionAnswer
}
onQuestionDismiss={handleQuestionDismiss}
/>
)}
</div>
+73 -3
@@ -578,7 +578,11 @@ class TestClientFacingBlocking:
"""signal_shutdown should unblock a waiting client_facing node."""
llm = MockStreamingLLM(
scenarios=[
tool_call_scenario("ask_user", {"question": "Waiting..."}, tool_use_id="ask_1"),
tool_call_scenario(
"ask_user",
{"question": "Waiting...", "options": ["Continue", "Stop"]},
tool_use_id="ask_1",
),
]
)
bus = EventBus()
@@ -600,7 +604,11 @@ class TestClientFacingBlocking:
"""CLIENT_INPUT_REQUESTED should be published when ask_user blocks."""
llm = MockStreamingLLM(
scenarios=[
tool_call_scenario("ask_user", {"question": "Hello!"}, tool_use_id="ask_1"),
tool_call_scenario(
"ask_user",
{"question": "Hello!", "options": ["Yes", "No"]},
tool_use_id="ask_1",
),
]
)
bus = EventBus()
@@ -796,7 +804,7 @@ class TestClientFacingExpectingWork:
async def user_then_shutdown():
await asyncio.sleep(0.05)
await node.inject_event("furwise.app")
await node.inject_event("furwise.app", is_client_input=True)
# Node should auto-block on "Monitoring..." text.
# Give it time to reach the block, then shut down.
await asyncio.sleep(0.1)
@@ -2027,3 +2035,65 @@ class TestExecutionId:
node_spec=node_spec, memory=SharedMemory(), goal=goal, input_data={}
)
assert ctx.execution_id == ""
# ---------------------------------------------------------------------------
# Subagent memory snapshot includes accumulator outputs
# ---------------------------------------------------------------------------
class TestSubagentAccumulatorMemory:
"""Verify that subagent memory construction merges accumulator outputs
and includes the subagent's input_keys in read permissions."""
def test_accumulator_values_merged_into_parent_data(self):
"""Keys from OutputAccumulator should appear in subagent memory."""
# Simulate what _execute_subagent does internally:
# parent shared memory has user_request but NOT tweet_content
parent_memory = SharedMemory()
parent_memory.write("user_request", "post a joke")
parent_data = parent_memory.read_all() # {"user_request": "post a joke"}
# Accumulator has tweet_content (set via set_output before delegation)
acc = OutputAccumulator(values={"tweet_content": "Hello world!"})
# Merge accumulator outputs (the fix)
for key, value in acc.to_dict().items():
if key not in parent_data:
parent_data[key] = value
# Build subagent memory
subagent_memory = SharedMemory()
for key, value in parent_data.items():
subagent_memory.write(key, value, validate=False)
subagent_input_keys = ["tweet_content"]
read_keys = set(parent_data.keys()) | set(subagent_input_keys)
scoped = subagent_memory.with_permissions(
read_keys=list(read_keys), write_keys=[]
)
# This would have raised PermissionError before the fix
assert scoped.read("tweet_content") == "Hello world!"
assert scoped.read("user_request") == "post a joke"
def test_input_keys_allowed_even_if_not_in_data(self):
"""Subagent input_keys should be in read permissions even if the
key doesn't exist in memory (returns None instead of PermissionError)."""
parent_memory = SharedMemory()
parent_memory.write("user_request", "hi")
parent_data = parent_memory.read_all()
subagent_memory = SharedMemory()
for key, value in parent_data.items():
subagent_memory.write(key, value, validate=False)
# input_keys includes "tweet_content" which isn't in parent_data
read_keys = set(parent_data.keys()) | {"tweet_content"}
scoped = subagent_memory.with_permissions(
read_keys=list(read_keys), write_keys=[]
)
# Should return None (not raise PermissionError)
assert scoped.read("tweet_content") is None
assert scoped.read("user_request") == "hi"
+564 -1
@@ -2,11 +2,12 @@
from __future__ import annotations
import json
from typing import Any
import pytest
from framework.graph.conversation import Message, NodeConversation
from framework.graph.conversation import Message, NodeConversation, extract_tool_call_history
from framework.storage.conversation_store import FileConversationStore
# ---------------------------------------------------------------------------
@@ -930,3 +931,565 @@ class TestConversationIntegration:
assert restored.next_seq == 4
assert restored.messages[0].content == "new msg"
assert restored.messages[0].seq == 2
# ---------------------------------------------------------------------------
# Helpers for aggressive compaction tests
# ---------------------------------------------------------------------------
def _make_tool_call(call_id: str, name: str, args: dict) -> dict:
return {
"id": call_id,
"type": "function",
"function": {"name": name, "arguments": json.dumps(args)},
}
async def _build_tool_heavy_conversation(
store: MockConversationStore | None = None,
) -> NodeConversation:
"""Build a conversation with many tool call pairs.
Layout: user msg, then 5x (assistant with append_data tool_call + tool result),
then 1x (assistant with set_output tool_call + tool result), then user msg + assistant msg.
"""
conv = NodeConversation(store=store)
await conv.add_user_message("Process the data") # seq 0
for i in range(5):
args = {"filename": "output.html", "content": "x" * 500}
tc = [_make_tool_call(f"call_{i}", "append_data", args)]
conv._messages.append(Message(
seq=conv._next_seq, role="assistant",
content=f"Appending part {i}", tool_calls=tc,
))
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
conv._messages.append(Message(
seq=conv._next_seq, role="tool",
content='{"success": true}', tool_use_id=f"call_{i}",
))
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
# set_output call — must be protected
so_tc = [_make_tool_call("call_so", "set_output", {"key": "result", "value": "done"})]
conv._messages.append(
Message(seq=conv._next_seq, role="assistant", content="Setting output", tool_calls=so_tc)
)
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
conv._messages.append(Message(
seq=conv._next_seq, role="tool",
content="Output 'result' set successfully.",
tool_use_id="call_so",
))
if store:
await store.write_part(conv._next_seq, conv._messages[-1].to_storage_dict())
conv._next_seq += 1
# Recent messages
await conv.add_user_message("Continue")
await conv.add_assistant_message("Working on it")
return conv
# ---------------------------------------------------------------------------
# Tests: aggressive structural compaction
# ---------------------------------------------------------------------------
class TestAggressiveStructuralCompaction:
@pytest.mark.asyncio
async def test_aggressive_collapses_tool_pairs(self, tmp_path):
"""Aggressive mode should collapse non-essential tool pairs into a summary."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# The 5 append_data pairs (10 msgs) + 1 user msg should be collapsed.
# Remaining: ref_msg + set_output pair (2 msgs) + 2 recent = 5
assert conv.message_count == 5
assert conv.messages[0].role == "user" # ref message
assert "TOOLS ALREADY CALLED" in conv.messages[0].content
assert "append_data (5x)" in conv.messages[0].content
# set_output pair should be preserved
assert conv.messages[1].role == "assistant"
assert conv.messages[1].tool_calls is not None
assert conv.messages[1].tool_calls[0]["function"]["name"] == "set_output"
assert conv.messages[2].role == "tool"
# Recent messages intact
assert conv.messages[3].content == "Continue"
assert conv.messages[4].content == "Working on it"
@pytest.mark.asyncio
async def test_aggressive_preserves_set_output(self, tmp_path):
"""set_output tool calls are always protected in aggressive mode."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# Find all tool calls in remaining messages
tool_names = []
for msg in conv.messages:
if msg.tool_calls:
for tc in msg.tool_calls:
tool_names.append(tc["function"]["name"])
assert "set_output" in tool_names
# append_data should NOT be in remaining messages (collapsed)
assert "append_data" not in tool_names
@pytest.mark.asyncio
async def test_aggressive_preserves_errors(self, tmp_path):
"""Error tool results are always protected in aggressive mode."""
conv = NodeConversation()
await conv.add_user_message("Start")
# Regular tool call
tc1 = [_make_tool_call("call_ok", "web_search", {"query": "test"})]
conv._messages.append(
Message(seq=conv._next_seq, role="assistant", content="", tool_calls=tc1)
)
conv._next_seq += 1
conv._messages.append(
Message(seq=conv._next_seq, role="tool", content="results", tool_use_id="call_ok")
)
conv._next_seq += 1
# Error tool call
tc2 = [_make_tool_call("call_err", "web_scrape", {"url": "http://broken.com"})]
conv._messages.append(
Message(seq=conv._next_seq, role="assistant", content="", tool_calls=tc2)
)
conv._next_seq += 1
conv._messages.append(
Message(
seq=conv._next_seq, role="tool", content="Connection timeout",
tool_use_id="call_err", is_error=True,
)
)
conv._next_seq += 1
await conv.add_user_message("Next")
await conv.add_assistant_message("OK")
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# Error pair should be preserved
error_msgs = [m for m in conv.messages if m.role == "tool" and m.is_error]
assert len(error_msgs) == 1
assert error_msgs[0].content == "Connection timeout"
@pytest.mark.asyncio
async def test_standard_mode_keeps_all_tool_pairs(self, tmp_path):
"""Non-aggressive mode should keep all tool pairs (existing behavior)."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=False,
)
# All 6 tool pairs (12 msgs) should be kept as structural.
# Removed: 1 user msg (freeform). Remaining: ref + 12 structural + 2 recent = 15
assert conv.message_count == 15
@pytest.mark.asyncio
async def test_two_pass_sequence(self, tmp_path):
"""Standard pass then aggressive pass produces valid result."""
conv = await _build_tool_heavy_conversation()
spill = str(tmp_path)
# Pass 1: standard
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2,
)
after_standard = conv.message_count
assert after_standard == 15 # all structural kept
# Pass 2: aggressive
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
after_aggressive = conv.message_count
assert after_aggressive < after_standard
# ref + set_output pair + 2 recent = 5
assert after_aggressive == 5
@pytest.mark.asyncio
async def test_aggressive_persists_correctly(self, tmp_path):
"""Aggressive compaction correctly updates the store."""
store = MockConversationStore()
conv = await _build_tool_heavy_conversation(store=store)
spill = str(tmp_path)
await conv.compact_preserving_structure(
spillover_dir=spill, keep_recent=2, aggressive=True,
)
# Verify store state matches in-memory state
parts = await store.read_parts()
assert len(parts) == conv.message_count
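# The class above pins down what aggressive compaction must keep. A rough sketch of
# the selection pass, assuming compact_preserving_structure partitions the older
# messages roughly like this (the helper name and PROTECTED_TOOLS set are illustrative,
# not the real code):
PROTECTED_TOOLS = {"set_output"}

def _select_aggressive_sketch(messages, keep_recent):
    recent = messages[-keep_recent:]
    older = messages[:-keep_recent]
    # Pass 1: find call ids that must survive (protected tools, or calls whose result errored).
    protected_ids = set()
    for msg in older:
        for tc in (msg.tool_calls or []):
            if tc["function"]["name"] in PROTECTED_TOOLS:
                protected_ids.add(tc["id"])
        if msg.role == "tool" and msg.is_error:
            protected_ids.add(msg.tool_use_id)
    # Pass 2: keep both halves of protected pairs, collapse everything else.
    kept, collapsed = [], []
    for msg in older:
        call_ids = {tc["id"] for tc in (msg.tool_calls or [])}
        if call_ids & protected_ids or (msg.role == "tool" and msg.tool_use_id in protected_ids):
            kept.append(msg)
        else:
            collapsed.append(msg)
    # Collapsed messages are replaced by one "TOOLS ALREADY CALLED ..." reference
    # message (built with extract_tool_call_history) prepended as role="user".
    return kept, collapsed, recent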
class TestExtractToolCallHistory:
def test_basic_extraction(self):
msgs = [
Message(seq=0, role="assistant", content="", tool_calls=[
_make_tool_call("c1", "web_search", {"query": "python async"}),
]),
Message(seq=1, role="tool", content="results", tool_use_id="c1"),
Message(seq=2, role="assistant", content="", tool_calls=[
_make_tool_call("c2", "save_data", {"filename": "output.txt", "content": "data"}),
]),
Message(seq=3, role="tool", content="saved", tool_use_id="c2"),
]
result = extract_tool_call_history(msgs)
assert "web_search (1x)" in result
assert "save_data (1x)" in result
assert "FILES SAVED: output.txt" in result
def test_errors_included(self):
msgs = [
Message(
seq=0, role="tool", content="Connection refused",
is_error=True, tool_use_id="c1",
),
]
result = extract_tool_call_history(msgs)
assert "ERRORS" in result
assert "Connection refused" in result
def test_empty_messages(self):
assert extract_tool_call_history([]) == ""
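# A minimal sketch of the summary string these assertions describe; the exact
# formatting produced by extract_tool_call_history in framework.graph.conversation
# may differ (header text, separators, and filename detection are assumptions here).
def extract_tool_call_history_sketch(messages):
    counts, files, errors = {}, [], []
    for msg in messages:
        for tc in (msg.tool_calls or []):
            name = tc["function"]["name"]
            counts[name] = counts.get(name, 0) + 1
            args = json.loads(tc["function"].get("arguments") or "{}")
            if "filename" in args:
                files.append(args["filename"])
        if msg.role == "tool" and msg.is_error:
            errors.append(msg.content)
    if not counts and not errors:
        return ""
    lines = ["TOOLS ALREADY CALLED:"]
    lines += [f"- {name} ({count}x)" for name, count in counts.items()]
    if files:
        lines.append("FILES SAVED: " + ", ".join(files))
    if errors:
        lines.append("ERRORS: " + "; ".join(errors))
    return "\n".join(lines)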
# ---------------------------------------------------------------------------
# Tests for _is_context_too_large_error
# ---------------------------------------------------------------------------
class TestIsContextTooLargeError:
def test_context_window_class_name(self):
from framework.graph.event_loop_node import _is_context_too_large_error
class ContextWindowExceededError(Exception):
pass
assert _is_context_too_large_error(ContextWindowExceededError("x"))
def test_openai_context_length(self):
from framework.graph.event_loop_node import _is_context_too_large_error
err = RuntimeError(
"This model's maximum context length is 128000 tokens"
)
assert _is_context_too_large_error(err)
def test_anthropic_too_long(self):
from framework.graph.event_loop_node import _is_context_too_large_error
err = RuntimeError("prompt is too long: 150000 tokens > 100000")
assert _is_context_too_large_error(err)
def test_generic_exceeds_limit(self):
from framework.graph.event_loop_node import _is_context_too_large_error
err = ValueError("Request exceeds token limit")
assert _is_context_too_large_error(err)
def test_unrelated_error(self):
from framework.graph.event_loop_node import _is_context_too_large_error
assert not _is_context_too_large_error(ValueError("connection refused"))
assert not _is_context_too_large_error(RuntimeError("timeout"))
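# A plausible shape for _is_context_too_large_error, inferred only from the cases
# above; the real helper in framework.graph.event_loop_node may match different
# exception types or phrases.
def _is_context_too_large_error_sketch(err: Exception) -> bool:
    # Provider-specific exception classes (e.g. ContextWindowExceededError).
    if "contextwindow" in type(err).__name__.lower():
        return True
    # Provider-agnostic message patterns (OpenAI, Anthropic, generic).
    text = str(err).lower()
    return any(
        phrase in text
        for phrase in ("maximum context length", "prompt is too long", "exceeds token limit")
    )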
# ---------------------------------------------------------------------------
# Tests for _format_messages_for_summary
# ---------------------------------------------------------------------------
class TestFormatMessagesForSummary:
def test_user_assistant_messages(self):
from framework.graph.event_loop_node import EventLoopNode
msgs = [
Message(seq=0, role="user", content="Hello world"),
Message(seq=1, role="assistant", content="Hi there"),
]
result = EventLoopNode._format_messages_for_summary(msgs)
assert "[user]: Hello world" in result
assert "[assistant]: Hi there" in result
def test_tool_result_truncated(self):
from framework.graph.event_loop_node import EventLoopNode
msgs = [
Message(seq=0, role="tool", content="x" * 1000, tool_use_id="c1"),
]
result = EventLoopNode._format_messages_for_summary(msgs)
assert "[tool result]:" in result
assert "..." in result
# Should be truncated to 500 + "..."
assert len(result) < 600
def test_assistant_with_tool_calls(self):
from framework.graph.event_loop_node import EventLoopNode
tc = [_make_tool_call("c1", "web_search", {"query": "test"})]
msgs = [
Message(seq=0, role="assistant", content="Searching", tool_calls=tc),
]
result = EventLoopNode._format_messages_for_summary(msgs)
assert "web_search" in result
assert "[assistant (calls:" in result
# ---------------------------------------------------------------------------
# Tests for _llm_compact (recursive binary-search)
# ---------------------------------------------------------------------------
class TestLlmCompact:
"""Test the recursive LLM compaction with mock LLM."""
def _make_node(self):
"""Create a minimal EventLoopNode for testing."""
from framework.graph.event_loop_node import EventLoopNode, LoopConfig
config = LoopConfig(max_history_tokens=32000)
node = EventLoopNode.__new__(EventLoopNode)
node._config = config
node._event_bus = None
node._judge = None
node._approval_callback = None
node._tool_executor = None
node._adaptive_learner = None
# Class-level constants such as _LLM_COMPACT_CHAR_LIMIT are inherited from the class.
return node
def _make_ctx(self, llm_responses=None, llm_error=None):
"""Create a mock NodeContext with controllable LLM."""
from unittest.mock import AsyncMock, MagicMock
from framework.graph.node import NodeSpec
spec = NodeSpec(
id="test",
name="Test Node",
description="A test node",
node_type="event_loop",
input_keys=[],
output_keys=["result"],
)
ctx = MagicMock()
ctx.node_spec = spec
ctx.node_id = "test"
ctx.stream_id = "test"
ctx.continuous_mode = False
ctx.runtime_logger = None
mock_llm = AsyncMock()
if llm_error:
mock_llm.acomplete.side_effect = llm_error
elif llm_responses:
responses = []
for text in llm_responses:
resp = MagicMock()
resp.content = text
responses.append(resp)
mock_llm.acomplete.side_effect = responses
else:
resp = MagicMock()
resp.content = "Summary of conversation."
mock_llm.acomplete.return_value = resp
ctx.llm = mock_llm
return ctx
@pytest.mark.asyncio
async def test_single_call_success(self):
node = self._make_node()
ctx = self._make_ctx()
msgs = [
Message(seq=0, role="user", content="Do something"),
Message(seq=1, role="assistant", content="Done"),
]
result = await node._llm_compact(ctx, msgs, None)
assert "Summary of conversation." in result
ctx.llm.acomplete.assert_called_once()
@pytest.mark.asyncio
async def test_context_too_large_triggers_split(self):
"""When LLM raises context error, should split and retry."""
from unittest.mock import MagicMock
node = self._make_node()
call_count = 0
async def mock_acomplete(**kwargs):
nonlocal call_count
call_count += 1
# First call with full messages → fail
# Subsequent calls with smaller chunks → succeed
if call_count == 1:
raise RuntimeError(
"This model's maximum context length is 128000 tokens"
)
resp = MagicMock()
resp.content = f"Summary part {call_count}"
return resp
ctx = self._make_ctx()
ctx.llm.acomplete = mock_acomplete
msgs = [
Message(seq=i, role="user", content=f"Message {i}")
for i in range(10)
]
result = await node._llm_compact(ctx, msgs, None)
# Should have split and produced two summaries
assert "Summary part" in result
assert call_count >= 3 # 1 failure + 2 successful halves
@pytest.mark.asyncio
async def test_non_context_error_propagates(self):
"""Non-context errors should propagate, not trigger splitting."""
node = self._make_node()
ctx = self._make_ctx(llm_error=ValueError("API key invalid"))
msgs = [
Message(seq=0, role="user", content="Hello"),
Message(seq=1, role="assistant", content="Hi"),
]
with pytest.raises(ValueError, match="API key invalid"):
await node._llm_compact(ctx, msgs, None)
@pytest.mark.asyncio
async def test_proactive_split_for_large_input(self):
"""Messages exceeding char limit should be split proactively."""
node = self._make_node()
# Lower the limit for testing
node._LLM_COMPACT_CHAR_LIMIT = 100
ctx = self._make_ctx(
llm_responses=["Part 1 summary", "Part 2 summary"],
)
msgs = [
Message(seq=0, role="user", content="x" * 80),
Message(seq=1, role="user", content="y" * 80),
]
result = await node._llm_compact(ctx, msgs, None)
assert "Part 1 summary" in result
assert "Part 2 summary" in result
# LLM should have been called twice (no failure, proactive split)
assert ctx.llm.acomplete.call_count == 2
@pytest.mark.asyncio
async def test_tool_history_appended_at_top_level(self):
"""Tool history should only be appended at depth 0."""
node = self._make_node()
ctx = self._make_ctx()
tc = [_make_tool_call("c1", "web_search", {"query": "test"})]
msgs = [
Message(seq=0, role="assistant", content="", tool_calls=tc),
Message(seq=1, role="tool", content="results", tool_use_id="c1"),
]
result = await node._llm_compact(ctx, msgs, None)
assert "TOOLS ALREADY CALLED" in result
assert "web_search" in result
# ---------------------------------------------------------------------------
# Orphaned tool result repair
# ---------------------------------------------------------------------------
class TestRepairOrphanedToolCalls:
"""Test _repair_orphaned_tool_calls handles both directions."""
def test_orphaned_tool_result_dropped(self):
"""Tool result with no matching tool_use should be dropped."""
msgs = [
# tool result with no preceding assistant tool_use
{"role": "tool", "tool_call_id": "orphan_1", "content": "stale result"},
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
]
repaired = NodeConversation._repair_orphaned_tool_calls(msgs)
assert len(repaired) == 2
assert repaired[0]["role"] == "user"
assert repaired[1]["role"] == "assistant"
def test_valid_tool_pair_preserved(self):
"""Tool result with matching tool_use should be kept."""
msgs = [
{"role": "user", "content": "search"},
{
"role": "assistant",
"content": "",
"tool_calls": [{"id": "tc_1", "function": {"name": "search", "arguments": "{}"}}],
},
{"role": "tool", "tool_call_id": "tc_1", "content": "results"},
]
repaired = NodeConversation._repair_orphaned_tool_calls(msgs)
assert len(repaired) == 3
assert repaired[2]["tool_call_id"] == "tc_1"
def test_orphaned_tool_use_gets_stub(self):
"""Tool use with no following tool result gets a synthetic error stub."""
msgs = [
{"role": "user", "content": "search"},
{
"role": "assistant",
"content": "",
"tool_calls": [{"id": "tc_1", "function": {"name": "search", "arguments": "{}"}}],
},
# No tool result follows
{"role": "user", "content": "what happened?"},
]
repaired = NodeConversation._repair_orphaned_tool_calls(msgs)
# Should insert a synthetic tool result between assistant and user
assert len(repaired) == 4
assert repaired[2]["role"] == "tool"
assert repaired[2]["tool_call_id"] == "tc_1"
assert "interrupted" in repaired[2]["content"].lower()
def test_mixed_orphans(self):
"""Both orphaned results and orphaned calls handled together."""
msgs = [
# Orphaned result (no matching tool_use)
{"role": "tool", "tool_call_id": "gone_1", "content": "old result"},
{"role": "user", "content": "try again"},
{
"role": "assistant",
"content": "",
"tool_calls": [{"id": "tc_2", "function": {"name": "fetch", "arguments": "{}"}}],
},
# Missing result for tc_2
{"role": "user", "content": "done?"},
]
repaired = NodeConversation._repair_orphaned_tool_calls(msgs)
# orphaned result dropped, stub added for tc_2
roles = [m["role"] for m in repaired]
assert roles == ["user", "assistant", "tool", "user"]
assert repaired[2]["tool_call_id"] == "tc_2"
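# A sketch of the repair pass these four tests describe: drop tool results whose call
# id was never issued, and insert a synthetic result after any tool call that never
# received one. The stub wording is an assumption beyond the "interrupted" substring
# the tests require; in the framework this is a staticmethod on NodeConversation.
def _repair_orphaned_tool_calls_sketch(messages):
    issued = {tc["id"] for m in messages for tc in (m.get("tool_calls") or [])}
    repaired, pending = [], []

    def _flush_stubs():
        for call_id in pending:
            repaired.append({
                "role": "tool",
                "tool_call_id": call_id,
                "content": "Tool call was interrupted before a result was recorded.",
            })
        pending.clear()

    for msg in messages:
        if msg.get("role") == "tool":
            if msg.get("tool_call_id") not in issued:
                continue  # orphaned result: no matching tool_use, drop it
            if msg.get("tool_call_id") in pending:
                pending.remove(msg["tool_call_id"])
            repaired.append(msg)
            continue
        _flush_stubs()  # close out any unanswered tool calls before other roles
        pending.extend(tc["id"] for tc in (msg.get("tool_calls") or []))
        repaired.append(msg)
    _flush_stubs()
    return repaired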
@@ -90,7 +90,7 @@ edges = [
source="confirm-draft",
target="intake",
condition=EdgeCondition.CONDITIONAL,
condition_expr="batch_complete == True and send_started == True and send_count >= 1 and sent_message_ids is not None and len(sent_message_ids) >= 1",
condition_expr="batch_complete == True",
priority=1,
),
]
@@ -83,8 +83,8 @@ confirm_draft_node = NodeSpec(
client_facing=True,
max_node_visits=0,
input_keys=["email_list", "filter_criteria"],
output_keys=["batch_complete", "restart", "send_started", "send_count", "sent_message_ids", "send_failures"],
nullable_output_keys=["batch_complete", "restart", "send_started", "send_count", "sent_message_ids", "send_failures"],
output_keys=["batch_complete", "restart"],
nullable_output_keys=["batch_complete", "restart"],
success_criteria="User confirmed recipients and personalized replies sent for each.",
system_prompt="""\
You are a Gmail reply assistant. Present emails for confirmation, then send personalized replies.
@@ -99,22 +99,14 @@ You are a Gmail reply assistant. Present emails for confirmation, then send pers
**STEP 2 Handle user response:**
If user CONFIRMS (says yes, go ahead, sounds good, etc.):
1. Immediately call set_output("send_started", True) before any send tools.
2. For EACH email in email_list, call gmail_reply_email with:
For EACH email in email_list:
1. Read the subject and snippet
2. Use tone_guidance from filter_criteria + any user-specified preferences
3. Call gmail_reply_email with:
- message_id: the email's message_id
- html: personalized 2-4 sentence reply based on email context, using tone_guidance from filter_criteria and any new user preferences.
3. Track send results during this run:
- send_count: number of successful gmail_reply_email calls
- sent_message_ids: list of message_ids successfully replied to
- send_failures: list of {"message_id": "...", "error": "..."} for failed sends
4. REQUIRED completion gate:
- You MUST NOT set batch_complete=True unless send_started is True AND send_count >= 1 AND sent_message_ids is non-empty.
- If no sends succeeded, do NOT set batch_complete=True. Instead explain what failed and ask user whether to retry or restart.
5. After successful sends, call set_output in a separate turn:
- set_output("send_count", <int>)
- set_output("sent_message_ids", <list>)
- set_output("send_failures", <list>)
- set_output("batch_complete", True)
- html: personalized 2-4 sentence reply based on email context
(The tool automatically handles recipient, subject, and threading)
4. After all replies sent, call: set_output("batch_complete", True)
If user wants to CHANGE LOGIC/FILTER (says change filter, different criteria, not these emails, wrong emails, etc.):
1. Acknowledge their request