micro-fix: remove duplicate _execute_subagent method in EventLoopNode
This commit is contained in:
@@ -4059,328 +4059,3 @@ class EventLoopNode(NodeProtocol):
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Subagent Execution
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
async def _execute_subagent(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
agent_id: str,
|
||||
task: str,
|
||||
*,
|
||||
accumulator: OutputAccumulator | None = None,
|
||||
) -> ToolResult:
|
||||
"""Execute a subagent and return the result as a ToolResult.
|
||||
|
||||
The subagent:
|
||||
- Gets a fresh conversation with just the task
|
||||
- Has read-only access to the parent's readable memory
|
||||
- Cannot delegate to its own subagents (prevents recursion)
|
||||
- Returns its output in structured JSON format
|
||||
|
||||
Args:
|
||||
ctx: Parent node's context (for memory, tools, LLM access).
|
||||
agent_id: The node ID of the subagent to invoke.
|
||||
task: The task description to give the subagent.
|
||||
accumulator: Parent's OutputAccumulator — provides outputs that
|
||||
have been set via ``set_output`` but not yet written to
|
||||
shared memory (which only happens after the node completes).
|
||||
|
||||
Returns:
|
||||
ToolResult with structured JSON output containing:
|
||||
- message: Human-readable summary
|
||||
- data: Subagent's output (free-form JSON)
|
||||
- metadata: Execution metadata (success, tokens, latency)
|
||||
"""
|
||||
from framework.graph.node import NodeContext, SharedMemory
|
||||
|
||||
# Log subagent invocation start
|
||||
logger.info(
|
||||
"\n" + "=" * 60 + "\n"
|
||||
"🤖 SUBAGENT INVOCATION\n"
|
||||
"=" * 60 + "\n"
|
||||
"Parent Node: %s\n"
|
||||
"Subagent ID: %s\n"
|
||||
"Task: %s\n" + "=" * 60,
|
||||
ctx.node_id,
|
||||
agent_id,
|
||||
task[:500] + "..." if len(task) > 500 else task,
|
||||
)
|
||||
|
||||
# 1. Validate agent exists in registry
|
||||
if agent_id not in ctx.node_registry:
|
||||
return ToolResult(
|
||||
tool_use_id="",
|
||||
content=json.dumps(
|
||||
{
|
||||
"message": f"Sub-agent '{agent_id}' not found in registry",
|
||||
"data": None,
|
||||
"metadata": {"agent_id": agent_id, "success": False, "error": "not_found"},
|
||||
}
|
||||
),
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
subagent_spec = ctx.node_registry[agent_id]
|
||||
|
||||
# 2. Create read-only memory snapshot
|
||||
# Start with everything the parent can read from shared memory.
|
||||
parent_data = ctx.memory.read_all()
|
||||
|
||||
# Merge in-flight outputs from the parent's accumulator.
|
||||
# set_output() writes to the accumulator but shared memory is only
|
||||
# updated after the parent node completes — so the subagent would
|
||||
# otherwise miss any keys the parent set before delegating.
|
||||
if accumulator:
|
||||
for key, value in accumulator.to_dict().items():
|
||||
if key not in parent_data:
|
||||
parent_data[key] = value
|
||||
|
||||
subagent_memory = SharedMemory()
|
||||
for key, value in parent_data.items():
|
||||
subagent_memory.write(key, value, validate=False)
|
||||
|
||||
# Allow reads for parent data AND the subagent's declared input_keys
|
||||
# (input_keys may reference keys that exist but weren't in read_all,
|
||||
# or keys that were just written by the accumulator).
|
||||
read_keys = set(parent_data.keys()) | set(subagent_spec.input_keys or [])
|
||||
scoped_memory = subagent_memory.with_permissions(
|
||||
read_keys=list(read_keys),
|
||||
write_keys=[], # Read-only!
|
||||
)
|
||||
|
||||
# 2b. Set up report callback (one-way channel to parent / event bus)
|
||||
subagent_reports: list[dict] = []
|
||||
|
||||
async def _report_callback(
|
||||
message: str,
|
||||
data: dict | None = None,
|
||||
*,
|
||||
wait_for_response: bool = False,
|
||||
) -> str | None:
|
||||
subagent_reports.append({"message": message, "data": data, "timestamp": time.time()})
|
||||
if self._event_bus:
|
||||
await self._event_bus.emit_subagent_report(
|
||||
stream_id=ctx.node_id,
|
||||
node_id=f"{ctx.node_id}:subagent:{agent_id}",
|
||||
subagent_id=agent_id,
|
||||
message=message,
|
||||
data=data,
|
||||
execution_id=ctx.execution_id,
|
||||
)
|
||||
|
||||
if not wait_for_response:
|
||||
return None
|
||||
|
||||
if not self._event_bus:
|
||||
logger.warning(
|
||||
"Subagent '%s' requested user response but no event_bus available",
|
||||
agent_id,
|
||||
)
|
||||
return None
|
||||
|
||||
# Create isolated receiver and register for input routing
|
||||
import uuid
|
||||
|
||||
escalation_id = f"{ctx.node_id}:escalation:{uuid.uuid4().hex[:8]}"
|
||||
receiver = _EscalationReceiver()
|
||||
registry = ctx.shared_node_registry
|
||||
|
||||
registry[escalation_id] = receiver
|
||||
try:
|
||||
# Stream message to user (parent's node_id so TUI shows parent talking)
|
||||
await self._event_bus.emit_client_output_delta(
|
||||
stream_id=ctx.node_id,
|
||||
node_id=ctx.node_id,
|
||||
content=message,
|
||||
snapshot=message,
|
||||
execution_id=ctx.execution_id,
|
||||
)
|
||||
# Request input (escalation_id for routing response back)
|
||||
await self._event_bus.emit_client_input_requested(
|
||||
stream_id=ctx.node_id,
|
||||
node_id=escalation_id,
|
||||
prompt=message,
|
||||
execution_id=ctx.execution_id,
|
||||
)
|
||||
# Block until user responds
|
||||
return await receiver.wait()
|
||||
finally:
|
||||
registry.pop(escalation_id, None)
|
||||
|
||||
# 3. Filter tools for subagent
|
||||
# Use the full tool catalog (ctx.all_tools) so subagents can access tools
|
||||
# that aren't in the parent node's filtered set (e.g. browser tools for a
|
||||
# GCU subagent when the parent only has web_scrape/save_data).
|
||||
# Falls back to ctx.available_tools if all_tools is empty (e.g. in tests).
|
||||
subagent_tool_names = set(subagent_spec.tools or [])
|
||||
tool_source = ctx.all_tools if ctx.all_tools else ctx.available_tools
|
||||
|
||||
subagent_tools = [
|
||||
t
|
||||
for t in tool_source
|
||||
if t.name in subagent_tool_names and t.name != "delegate_to_sub_agent"
|
||||
]
|
||||
|
||||
missing = subagent_tool_names - {t.name for t in subagent_tools}
|
||||
if missing:
|
||||
logger.warning(
|
||||
"Subagent '%s' requested tools not found in catalog: %s",
|
||||
agent_id,
|
||||
sorted(missing),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"📦 Subagent '%s' configuration:\n"
|
||||
" - System prompt: %s\n"
|
||||
" - Tools available (%d): %s\n"
|
||||
" - Memory keys inherited: %s",
|
||||
agent_id,
|
||||
(subagent_spec.system_prompt[:200] + "...")
|
||||
if subagent_spec.system_prompt and len(subagent_spec.system_prompt) > 200
|
||||
else subagent_spec.system_prompt,
|
||||
len(subagent_tools),
|
||||
[t.name for t in subagent_tools],
|
||||
list(parent_data.keys()),
|
||||
)
|
||||
|
||||
# 4. Build subagent context
|
||||
max_iter = min(self._config.max_iterations, 10)
|
||||
subagent_ctx = NodeContext(
|
||||
runtime=ctx.runtime,
|
||||
node_id=f"{ctx.node_id}:subagent:{agent_id}",
|
||||
node_spec=subagent_spec,
|
||||
memory=scoped_memory,
|
||||
input_data={"task": task, **parent_data},
|
||||
llm=ctx.llm,
|
||||
available_tools=subagent_tools,
|
||||
goal_context=(
|
||||
f"Your specific task: {task}\n\n"
|
||||
f"COMPLETION REQUIREMENTS:\n"
|
||||
f"When your task is done, you MUST call set_output() "
|
||||
f"for each required key: {subagent_spec.output_keys}\n"
|
||||
f"Alternatively, call report_to_parent(mark_complete=true) "
|
||||
f"with your findings in message/data.\n"
|
||||
f"You have a maximum of {max_iter} turns to complete this task."
|
||||
),
|
||||
goal=ctx.goal,
|
||||
max_tokens=ctx.max_tokens,
|
||||
runtime_logger=ctx.runtime_logger,
|
||||
is_subagent_mode=True, # Prevents nested delegation
|
||||
report_callback=_report_callback,
|
||||
node_registry={}, # Empty - no nested subagents
|
||||
shared_node_registry=ctx.shared_node_registry, # For escalation routing
|
||||
)
|
||||
|
||||
# 5. Create and execute subagent EventLoopNode
|
||||
# Derive a conversation store for the subagent from the parent's store.
|
||||
# Each invocation gets a unique path so that repeated delegate calls
|
||||
# (e.g. one per profile) don't restore a stale completed conversation.
|
||||
self._subagent_instance_counter.setdefault(agent_id, 0)
|
||||
self._subagent_instance_counter[agent_id] += 1
|
||||
subagent_instance = str(self._subagent_instance_counter[agent_id])
|
||||
|
||||
subagent_conv_store = None
|
||||
if self._conversation_store is not None:
|
||||
from framework.storage.conversation_store import FileConversationStore
|
||||
|
||||
parent_base = getattr(self._conversation_store, "_base", None)
|
||||
if parent_base is not None:
|
||||
# Store subagent conversations parallel to the parent node,
|
||||
# not nested inside it. e.g. conversations/{node}:subagent:{agent_id}:{instance}/
|
||||
conversations_dir = parent_base.parent # e.g. conversations/
|
||||
subagent_dir_name = f"{agent_id}-{subagent_instance}"
|
||||
subagent_store_path = conversations_dir / subagent_dir_name
|
||||
subagent_conv_store = FileConversationStore(base_path=subagent_store_path)
|
||||
|
||||
# Derive a subagent-scoped spillover dir so large tool results
|
||||
# (e.g. browser_snapshot) get written to disk instead of being
|
||||
# silently truncated. Each instance gets its own directory to
|
||||
# avoid file collisions between concurrent subagents.
|
||||
subagent_spillover = None
|
||||
if self._config.spillover_dir:
|
||||
subagent_spillover = str(
|
||||
Path(self._config.spillover_dir) / agent_id / subagent_instance
|
||||
)
|
||||
|
||||
subagent_node = EventLoopNode(
|
||||
event_bus=None, # Subagents don't emit events to parent's bus
|
||||
judge=SubagentJudge(task=task, max_iterations=max_iter),
|
||||
config=LoopConfig(
|
||||
max_iterations=max_iter, # Tighter budget
|
||||
max_tool_calls_per_turn=self._config.max_tool_calls_per_turn,
|
||||
tool_call_overflow_margin=self._config.tool_call_overflow_margin,
|
||||
max_history_tokens=self._config.max_history_tokens,
|
||||
stall_detection_threshold=self._config.stall_detection_threshold,
|
||||
max_tool_result_chars=self._config.max_tool_result_chars,
|
||||
spillover_dir=subagent_spillover,
|
||||
),
|
||||
tool_executor=self._tool_executor,
|
||||
conversation_store=subagent_conv_store,
|
||||
)
|
||||
|
||||
try:
|
||||
logger.info("🚀 Starting subagent '%s' execution...", agent_id)
|
||||
start_time = time.time()
|
||||
result = await subagent_node.execute(subagent_ctx)
|
||||
latency_ms = int((time.time() - start_time) * 1000)
|
||||
|
||||
logger.info(
|
||||
"\n" + "-" * 60 + "\n"
|
||||
"✅ SUBAGENT '%s' COMPLETED\n"
|
||||
"-" * 60 + "\n"
|
||||
"Success: %s\n"
|
||||
"Latency: %dms\n"
|
||||
"Tokens used: %s\n"
|
||||
"Output keys: %s\n" + "-" * 60,
|
||||
agent_id,
|
||||
result.success,
|
||||
latency_ms,
|
||||
result.tokens_used,
|
||||
list(result.output.keys()) if result.output else [],
|
||||
)
|
||||
|
||||
result_json = {
|
||||
"message": (
|
||||
f"Sub-agent '{agent_id}' completed successfully"
|
||||
if result.success
|
||||
else f"Sub-agent '{agent_id}' failed: {result.error}"
|
||||
),
|
||||
"data": result.output,
|
||||
"reports": subagent_reports if subagent_reports else None,
|
||||
"metadata": {
|
||||
"agent_id": agent_id,
|
||||
"success": result.success,
|
||||
"tokens_used": result.tokens_used,
|
||||
"latency_ms": latency_ms,
|
||||
"report_count": len(subagent_reports),
|
||||
},
|
||||
}
|
||||
|
||||
return ToolResult(
|
||||
tool_use_id="",
|
||||
content=json.dumps(result_json, indent=2, default=str),
|
||||
is_error=not result.success,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\nError: %s\n" + "!" * 60,
|
||||
agent_id,
|
||||
str(e),
|
||||
)
|
||||
result_json = {
|
||||
"message": f"Sub-agent '{agent_id}' raised exception: {e}",
|
||||
"data": None,
|
||||
"metadata": {
|
||||
"agent_id": agent_id,
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
},
|
||||
}
|
||||
return ToolResult(
|
||||
tool_use_id="",
|
||||
content=json.dumps(result_json, indent=2),
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user