Merge pull request #6159 from aden-hive/fix/subagent-reply-stall

fix: route subagent report_to_parent escalations to queen instead of user
This commit is contained in:
RichardTang-Aden
2026-03-10 18:24:33 -07:00
committed by GitHub
3 changed files with 32 additions and 14 deletions
+10 -13
View File
@@ -73,6 +73,7 @@ class _EscalationReceiver:
def __init__(self) -> None:
self._event = asyncio.Event()
self._response: str | None = None
self._awaiting_input = True # So inject_worker_message() can prefer us
async def inject_event(self, content: str, *, is_client_input: bool = False) -> None:
"""Called by ExecutionStream.inject_input() when the user responds."""
@@ -4323,22 +4324,18 @@ class EventLoopNode(NodeProtocol):
registry[escalation_id] = receiver
try:
# Stream message to user (parent's node_id so TUI shows parent talking)
await self._event_bus.emit_client_output_delta(
stream_id=ctx.node_id,
node_id=ctx.node_id,
content=message,
snapshot=message,
execution_id=ctx.execution_id,
)
# Request input (escalation_id for routing response back)
await self._event_bus.emit_client_input_requested(
stream_id=ctx.node_id,
# Escalate to the queen instead of asking the user directly.
# The queen handles the request and injects the response via
# inject_worker_message(), which finds this receiver through
# its _awaiting_input flag.
await self._event_bus.emit_escalation_requested(
stream_id=ctx.stream_id or ctx.node_id,
node_id=escalation_id,
prompt=message,
reason=f"Subagent report (wait_for_response) from {agent_id}",
context=message,
execution_id=ctx.execution_id,
)
# Block until user responds
# Block until queen responds
return await receiver.wait()
finally:
registry.pop(escalation_id, None)
+5
View File
@@ -1531,6 +1531,11 @@ class AgentRuntime:
for executor in stream._active_executors.values():
for node_id, node in executor.node_registry.items():
if getattr(node, "_awaiting_input", False):
# Skip escalation receivers — those are handled
# by the queen via inject_worker_message(), not
# by the user directly.
if ":escalation:" in node_id:
continue
return node_id, graph_id
return None, None
+17 -1
View File
@@ -1452,7 +1452,23 @@ def register_queen_lifecycle_tools(
if reg is None:
return json.dumps({"error": "Worker graph not found"})
# Find an active node that can accept injected input
# Prefer nodes that are actively waiting (e.g. escalation receivers
# blocked on queen guidance) over the main event-loop node.
for stream in reg.streams.values():
waiting = stream.get_waiting_nodes()
if waiting:
target_node_id = waiting[0]["node_id"]
ok = await stream.inject_input(target_node_id, content, is_client_input=True)
if ok:
return json.dumps(
{
"status": "delivered",
"node_id": target_node_id,
"content_preview": content[:100],
}
)
# Fallback: inject into any injectable node
for stream in reg.streams.values():
injectable = stream.get_injectable_nodes()
if injectable: