Merge branch 'feature/tool-call-revamp' into feat/open-hive

Timothy
2026-02-27 18:45:35 -08:00
22 changed files with 2071 additions and 177 deletions
@@ -195,7 +195,7 @@ class DeepResearchAgent:
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
@@ -71,6 +71,12 @@ Important:
- Track which URL each finding comes from (you'll need citations later)
- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)
Context management:
- Your tool results are automatically saved to files. After compaction, the file \
references remain in the conversation; use load_data() to recover any content you need.
- Use append_data('research_notes.md', ...) to maintain a running log of key findings \
as you go. This survives compaction and helps the report node produce a detailed report.
When done, use set_output (one key at a time, separate turns):
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
@@ -161,6 +167,9 @@ Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective; present multiple viewpoints where sources disagree
- Answer the original research questions from the brief
- If findings appear incomplete or summarized, call list_data_files() and load_data() \
to access the detailed source material from the research phase. The research node's \
tool results and research_notes.md contain the full data.
Save the HTML:
save_data(filename="report.html", data="<html>...</html>")
+1 -1
@@ -1768,7 +1768,7 @@ async def _run_pipeline(websocket, initial_message: str):
judge=judge,
config=LoopConfig(
max_iterations=30,
max_tool_calls_per_turn=15,
max_tool_calls_per_turn=30,
max_history_tokens=64000,
max_tool_result_chars=8_000,
spillover_dir=str(_DATA_DIR),
+2 -2
@@ -751,7 +751,7 @@ async def _run_pipeline(websocket, topic: str):
judge=None, # implicit judge: accept when output_keys filled
config=LoopConfig(
max_iterations=20,
max_tool_calls_per_turn=10,
max_tool_calls_per_turn=30,
max_history_tokens=32_000,
),
conversation_store=store_a,
@@ -849,7 +849,7 @@ async def _run_pipeline(websocket, topic: str):
judge=None, # implicit judge
config=LoopConfig(
max_iterations=10,
max_tool_calls_per_turn=5,
max_tool_calls_per_turn=30,
max_history_tokens=32_000,
),
conversation_store=store_b,
+1 -1
@@ -1257,7 +1257,7 @@ async def _run_org_pipeline(websocket, topic: str):
judge=judge,
config=LoopConfig(
max_iterations=30,
max_tool_calls_per_turn=25,
max_tool_calls_per_turn=30,
max_history_tokens=32_000,
),
conversation_store=store,
@@ -453,7 +453,7 @@ identity_prompt = (
)
loop_config = {
"max_iterations": 50,
"max_tool_calls_per_turn": 10,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
}
@@ -539,7 +539,7 @@ class CredentialTesterAgent:
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 50,
"max_tool_calls_per_turn": 10,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
+3 -3
@@ -127,7 +127,7 @@ identity_prompt = (
)
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
}
@@ -160,8 +160,8 @@ queen_graph = GraphSpec(
edges=[],
conversation_mode="continuous",
loop_config={
"max_iterations": 200,
"max_tool_calls_per_turn": 10,
"max_iterations": 999_999,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
)
@@ -351,7 +351,7 @@ value. These DO NOT EXIST.
```python
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
}
```
+179 -7
@@ -5,6 +5,7 @@ from __future__ import annotations
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, Protocol, runtime_checkable
@@ -90,15 +91,46 @@ class Message:
def _extract_spillover_filename(content: str) -> str | None:
"""Extract spillover filename from a truncated tool result.
"""Extract spillover filename from a tool result annotation.
Matches the pattern produced by EventLoopNode._truncate_tool_result():
"saved to 'tool_github_list_stargazers_abc123.txt'"
Matches patterns produced by EventLoopNode._truncate_tool_result():
- Large result: "saved to 'web_search_1.txt'"
- Small result: "[Saved to 'web_search_1.txt']"
"""
match = re.search(r"saved to '([^']+)'", content)
match = re.search(r"[Ss]aved to '([^']+)'", content)
return match.group(1) if match else None
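A quick sanity check of the widened pattern (hypothetical inputs; the helper and regex are the ones defined above):

```python
# Both annotation variants produced by _truncate_tool_result() should match.
assert _extract_spillover_filename("saved to 'web_search_1.txt'") == "web_search_1.txt"
assert _extract_spillover_filename("[Saved to 'web_search_1.txt']") == "web_search_1.txt"
assert _extract_spillover_filename("no annotation here") is None
```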
_TC_ARG_LIMIT = 200 # max chars per tool_call argument after compaction
def _compact_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Truncate tool_call arguments to save context tokens during compaction.
Preserves ``id``, ``type``, and ``function.name`` exactly. Truncates
``function.arguments`` (a JSON string) to at most ``_TC_ARG_LIMIT`` chars
so that large payloads (e.g. set_output with full findings) don't survive
compaction and defeat the purpose of context reduction.
"""
compact = []
for tc in tool_calls:
func = tc.get("function", {})
args = func.get("arguments", "")
if len(args) > _TC_ARG_LIMIT:
args = args[:_TC_ARG_LIMIT] + "…[truncated]"
compact.append(
{
"id": tc.get("id", ""),
"type": tc.get("type", "function"),
"function": {
"name": func.get("name", ""),
"arguments": args,
},
}
)
return compact
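A minimal before/after sketch of the truncation, assuming the 200-char `_TC_ARG_LIMIT` above (the payload is made up):

```python
tc = {
    "id": "call_1",
    "type": "function",
    "function": {
        "name": "set_output",
        # Hypothetical oversized arguments payload (~530 chars)
        "arguments": '{"key": "findings", "value": "' + "x" * 500 + '"}',
    },
}
compacted = _compact_tool_calls([tc])[0]
assert compacted["function"]["name"] == "set_output"  # name preserved exactly
assert compacted["function"]["arguments"].endswith("…[truncated]")
assert len(compacted["function"]["arguments"]) == 200 + len("…[truncated]")
```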
# ---------------------------------------------------------------------------
# ConversationStore protocol (Phase 2)
# ---------------------------------------------------------------------------
@@ -353,12 +385,20 @@ class NodeConversation:
"""Best available token estimate.
Uses actual API input token count when available (set via
:meth:`update_token_count`), otherwise falls back to the rough
``total_chars / 4`` heuristic.
:meth:`update_token_count`), otherwise falls back to a
``total_chars / 4`` heuristic that includes both message content
AND tool_call argument sizes.
"""
if self._last_api_input_tokens is not None:
return self._last_api_input_tokens
total_chars = sum(len(m.content) for m in self._messages)
total_chars = 0
for m in self._messages:
total_chars += len(m.content)
if m.tool_calls:
for tc in m.tool_calls:
func = tc.get("function", {})
total_chars += len(func.get("arguments", ""))
total_chars += len(func.get("name", ""))
return total_chars // 4
def update_token_count(self, actual_input_tokens: int) -> None:
@@ -587,6 +627,138 @@ class NodeConversation:
self._messages = [summary_msg] + recent_messages
self._last_api_input_tokens = None # reset; next LLM call will recalibrate
async def compact_preserving_structure(
self,
spillover_dir: str,
keep_recent: int = 4,
phase_graduated: bool = False,
) -> None:
"""Structure-preserving compaction: save freeform text to file, keep tool messages.
Unlike ``compact()`` which replaces ALL old messages with a single LLM
summary, this method preserves the tool call structure (assistant
messages with tool_calls + tool result messages) that are already tiny
after pruning. Only freeform text exchanges (user messages,
text-only assistant messages) are saved to a file and removed.
The result: the agent retains exact knowledge of what tools it called,
where each result is stored, and can load the conversation text if
needed. No LLM summary call. No heuristics. Nothing lost.
"""
if not self._messages:
return
total = len(self._messages)
# Determine split point (same logic as compact)
if phase_graduated and self._current_phase:
split = self._find_phase_graduated_split()
else:
split = None
if split is None:
keep_recent = max(0, min(keep_recent, total - 1))
split = total - keep_recent if keep_recent > 0 else total
# Advance split past orphaned tool results at the boundary
while split < total and self._messages[split].role == "tool":
split += 1
if split == 0:
return
old_messages = self._messages[:split]
# Classify old messages: structural (keep) vs freeform (save to file)
kept_structural: list[Message] = []
freeform_lines: list[str] = []
for msg in old_messages:
if msg.role == "tool":
# Tool results — already pruned to ~30 tokens (file reference).
# Keep in conversation.
kept_structural.append(msg)
elif msg.role == "assistant" and msg.tool_calls:
# Assistant message with tool_calls — keep the tool_calls
# with truncated arguments, clear the freeform text content.
compact_tcs = _compact_tool_calls(msg.tool_calls)
kept_structural.append(
Message(
seq=msg.seq,
role=msg.role,
content="",
tool_calls=compact_tcs,
is_error=msg.is_error,
phase_id=msg.phase_id,
is_transition_marker=msg.is_transition_marker,
)
)
else:
# Freeform text (user messages, text-only assistant messages)
# — save to file and remove from conversation.
role_label = msg.role
text = msg.content
if len(text) > 2000:
text = text[:2000] + "…[truncated]"
freeform_lines.append(f"[{role_label}] (seq={msg.seq}): {text}")
# Write freeform text to a numbered conversation file
spill_path = Path(spillover_dir)
spill_path.mkdir(parents=True, exist_ok=True)
# Find next conversation file number
existing = sorted(spill_path.glob("conversation_*.md"))
next_n = len(existing) + 1
conv_filename = f"conversation_{next_n}.md"
if freeform_lines:
header = f"## Compacted conversation (messages 1-{split})\n\n"
conv_text = header + "\n\n".join(freeform_lines)
(spill_path / conv_filename).write_text(conv_text, encoding="utf-8")
else:
# Nothing to save — skip file creation
conv_filename = ""
# Build reference message
if conv_filename:
ref_content = (
f"[Previous conversation saved to '{conv_filename}'. "
f"Use load_data('{conv_filename}') to review if needed.]"
)
else:
ref_content = "[Previous freeform messages compacted.]"
# Use a seq just before the first kept message
recent_messages = list(self._messages[split:])
if kept_structural:
ref_seq = kept_structural[0].seq - 1
elif recent_messages:
ref_seq = recent_messages[0].seq - 1
else:
ref_seq = self._next_seq
self._next_seq += 1
ref_msg = Message(seq=ref_seq, role="user", content=ref_content)
# Persist: delete old messages from store, write reference + kept structural
if self._store:
first_kept_seq = (
kept_structural[0].seq
if kept_structural
else (recent_messages[0].seq if recent_messages else self._next_seq)
)
# Delete everything before the first structural message we're keeping
await self._store.delete_parts_before(first_kept_seq)
# Write the reference message
await self._store.write_part(ref_msg.seq, ref_msg.to_storage_dict())
# Write kept structural messages (they may have been modified)
for msg in kept_structural:
await self._store.write_part(msg.seq, msg.to_storage_dict())
await self._store.write_cursor({"next_seq": self._next_seq})
# Reassemble: reference + kept structural (in original order) + recent
self._messages = [ref_msg] + kept_structural + recent_messages
self._last_api_input_tokens = None
def _find_phase_graduated_split(self) -> int | None:
"""Find split point that preserves current + previous phase.
+333 -117
@@ -14,6 +14,7 @@ from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
@@ -74,7 +75,7 @@ class LoopConfig:
"""Configuration for the event loop."""
max_iterations: int = 50
max_tool_calls_per_turn: int = 10
max_tool_calls_per_turn: int = 30
judge_every_n_turns: int = 1
stall_detection_threshold: int = 3
max_history_tokens: int = 32_000
@@ -91,7 +92,7 @@ class LoopConfig:
# written to a file and the truncated message includes the filename so
# the agent can retrieve it with load_data(). If *spillover_dir* is
# ``None`` the result is simply truncated with an explanatory note.
max_tool_result_chars: int = 3_000
max_tool_result_chars: int = 30_000
spillover_dir: str | None = None # Path string; created on first use
# --- Stream retry (transient error recovery within EventLoopNode) ---
@@ -107,6 +108,14 @@ class LoopConfig:
# N consecutive turns. For client-facing nodes, blocks for user input.
# For non-client-facing nodes, injects a warning into the conversation.
tool_doom_loop_threshold: int = 3
# --- Client-facing auto-block grace period ---
# When a client-facing node produces text-only turns (no tools, no
# set_output), the judge is skipped for this many consecutive auto-block
# turns. After the grace period, the judge runs to apply RETRY pressure
# on models stuck in a clarification loop. Explicit ask_user() calls
# always skip the judge regardless of this setting.
cf_grace_turns: int = 1
tool_doom_loop_enabled: bool = True
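A hedged configuration sketch showing the new `cf_grace_turns` knob next to the bumped defaults (values are illustrative, not recommendations):

```python
config = LoopConfig(
    max_iterations=50,
    max_tool_calls_per_turn=30,    # new default
    max_history_tokens=32_000,
    max_tool_result_chars=30_000,  # new default; results spill to file beyond this
    spillover_dir="./data",        # hypothetical path
    cf_grace_turns=2,              # tolerate two text-only turns before judging
)
```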
@@ -216,6 +225,8 @@ class EventLoopNode(NodeProtocol):
self._stream_task: asyncio.Task | None = None
# Track which nodes already have an action plan emitted (skip on revisit)
self._action_plan_emitted: set[str] = set()
# Monotonic counter for spillover file naming (web_search_1.txt, etc.)
self._spill_counter: int = 0
def validate_input(self, ctx: NodeContext) -> list[str]:
"""Validate hard requirements only.
@@ -245,6 +256,10 @@ class EventLoopNode(NodeProtocol):
# Verdict counters for runtime logging
_accept_count = _retry_count = _escalate_count = _continue_count = 0
# Client-facing auto-block grace: consecutive text-only turns without
# any real tool call or set_output. Resets on progress.
_cf_text_only_streak = 0
# 1. Guard: LLM required
if ctx.llm is None:
error_msg = "LLM provider not available"
@@ -365,6 +380,9 @@ class EventLoopNode(NodeProtocol):
if initial_message:
await conversation.add_user_message(initial_message)
# 2b. Restore spill counter from existing files (resume safety)
self._restore_spill_counter()
# 3. Build tool list: node tools + synthetic set_output + ask_user tools
tools = list(ctx.available_tools)
set_output_tool = self._build_set_output_tool(ctx.node_spec.output_keys)
@@ -403,10 +421,6 @@ class EventLoopNode(NodeProtocol):
recent_responses: list[str] = _restored_recent_responses
recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
# 5b. Client-facing state: after user responds, expect the LLM to
# work (call tools) rather than auto-blocking again on text-only.
_cf_expecting_work = False
# 6. Main loop
for iteration in range(start_iteration, self._config.max_iterations):
iter_start = time.time()
@@ -537,7 +551,36 @@ class EventLoopNode(NodeProtocol):
await asyncio.sleep(delay)
continue # retry same iteration
# Non-transient or retries exhausted — existing crash handler
# Non-transient or retries exhausted.
# For client-facing nodes, surface the error and wait
# for user input instead of killing the loop. The user
# can retry or adjust the request.
if ctx.node_spec.client_facing:
error_msg = f"LLM call failed: {e}"
logger.error(
"[%s] iter=%d: %s — waiting for user input",
node_id,
iteration,
error_msg,
)
if self._event_bus:
await self._event_bus.emit_node_retry(
stream_id=stream_id,
node_id=node_id,
retry_count=_stream_retry_count,
max_retries=self._config.max_stream_retries,
error=str(e)[:500],
execution_id=execution_id,
)
# Inject the error as an assistant message so the
# user sees it, then block for their next message.
await conversation.add_assistant_message(
f"[Error: {error_msg}. Please try again.]"
)
await self._await_user_input(ctx, prompt="")
break # exit retry loop, continue outer iteration
# Non-client-facing: crash as before
import traceback
iter_latency_ms = int((time.time() - iter_start) * 1000)
@@ -594,6 +637,10 @@ class EventLoopNode(NodeProtocol):
if conversation.needs_compaction():
await self._compact_tiered(ctx, conversation, accumulator)
# Reset auto-block grace streak when real work happens
if real_tool_results or outputs_set:
_cf_text_only_streak = 0
# 6e'''. Empty response guard — if the LLM returned nothing
# (no text, no real tools, no set_output) and all required
# outputs are already set, accept immediately. This prevents
@@ -732,25 +779,17 @@ class EventLoopNode(NodeProtocol):
recent_tool_fingerprints=recent_tool_fingerprints,
)
# 6h. Client-facing state transition: tool calls mean the LLM
# acted on user input, so the next text-only turn is a new
# presentation (auto-block is appropriate again).
if real_tool_results or outputs_set:
_cf_expecting_work = False
# 6h'. Client-facing input blocking
#
# Two triggers:
# (a) Explicit ask_user() — always blocks, then falls through
# to judge evaluation (6i).
# (a) Explicit ask_user() — blocks, then skips judge (6i).
# The LLM intentionally asked a question; judging before the
# user answers would inject confusing "missing outputs"
# feedback.
# (b) Auto-block — a text-only turn (no real tools, no
# set_output) from a client-facing node is addressed to the
# user. Block for their response, then *skip* judge so the
# next LLM turn can process the reply without confusing
# "missing outputs" feedback.
# However, if the user already provided input and the LLM
# responds with text-only instead of calling tools, fall
# through to judge so weak models get RETRY feedback.
# set_output) from a client-facing node. Blocks for the
# user's response, then falls through to judge so models
# stuck in a clarification loop get RETRY feedback.
#
# Turns that include tool calls or set_output are *work*, not
# conversation — they flow through without blocking.
@@ -762,19 +801,10 @@ class EventLoopNode(NodeProtocol):
_cf_block = True
_cf_prompt = ask_user_prompt
elif assistant_text and not real_tool_results and not outputs_set:
_missing = self._get_missing_output_keys(
accumulator,
ctx.node_spec.output_keys,
ctx.node_spec.nullable_output_keys,
)
if _cf_expecting_work and _missing:
# User already responded and required outputs are
# still missing — LLM should be working, not
# talking. Fall through to judge (6i).
pass
else:
_cf_block = True
_cf_auto = True
# Text-only response from client-facing node — this is
# addressed to the user. Always block for their reply.
_cf_block = True
_cf_auto = True
if _cf_block:
if self._shutdown:
@@ -831,8 +861,6 @@ class EventLoopNode(NodeProtocol):
ctx, prompt=_cf_prompt, skip_emit=user_input_requested
)
logger.info("[%s] iter=%d: unblocked, got_input=%s", node_id, iteration, got_input)
if got_input:
_cf_expecting_work = True
if not got_input:
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
@@ -879,33 +907,73 @@ class EventLoopNode(NodeProtocol):
recent_responses.clear()
# Skip judge after blocking for user input — both auto-block
# and explicit ask_user. The user's message sits in the
# injection queue and won't be drained until step 6b of the
# next iteration. If we let the judge fire now it sees
# "missing outputs" and injects RETRY feedback *before* the
# user's answer, confusing the LLM.
# _continue_count += 1
# if ctx.runtime_logger:
# iter_latency_ms = int((time.time() - iter_start) * 1000)
# verdict_fb = (
# "Auto-blocked for user input (pre-interaction)"
# if _cf_auto
# else "Blocked for ask_user input (skip judge)"
# )
# ctx.runtime_logger.log_step(
# node_id=node_id,
# node_type="event_loop",
# step_index=iteration,
# verdict="CONTINUE",
# verdict_feedback=verdict_fb,
# tool_calls=logged_tool_calls,
# llm_text=assistant_text,
# input_tokens=turn_tokens.get("input", 0),
# output_tokens=turn_tokens.get("output", 0),
# latency_ms=iter_latency_ms,
# )
# continue
# -- Judge-skip decision after client-facing blocking --
#
# Explicit ask_user: skip judge while the agent is still
# gathering information from the user. BUT if all required
# outputs have already been set, don't skip — fall through to
# the judge so it can accept the completed node.
#
# Auto-block (text-only, no tools): skip judge within a
# grace period of N consecutive text-only turns. Normal
# conversations are 1-3 exchanges before set_output.
# After the grace period, fall through to judge so models
# stuck in a clarification loop get RETRY pressure.
if not _cf_auto:
# Explicit ask_user: skip judge only if outputs are incomplete
_missing = (
self._get_missing_output_keys(
accumulator,
ctx.node_spec.output_keys,
ctx.node_spec.nullable_output_keys,
)
if accumulator is not None
else True
)
_outputs_complete = not _missing
if not _outputs_complete:
_cf_text_only_streak = 0
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback="Blocked for ask_user input (skip judge)",
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
continue
# All outputs set — fall through to judge for acceptance
# Auto-block: apply grace period
_cf_text_only_streak += 1
if _cf_text_only_streak <= self._config.cf_grace_turns:
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback=(
f"Auto-block grace ({_cf_text_only_streak}"
f"/{self._config.cf_grace_turns})"
),
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
continue
# Beyond grace period — fall through to judge (6i)
# 6i. Judge evaluation
should_judge = (
@@ -981,7 +1049,6 @@ class EventLoopNode(NodeProtocol):
)
await conversation.add_user_message(hint)
# Gap D: log ACCEPT-with-missing-keys as RETRY
_cf_expecting_work = True
_retry_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
@@ -1091,7 +1158,6 @@ class EventLoopNode(NodeProtocol):
)
elif verdict.action == "RETRY":
_cf_expecting_work = True
_retry_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
@@ -1386,10 +1452,15 @@ class EventLoopNode(NodeProtocol):
}
for tc in tool_calls
]
await conversation.add_assistant_message(
content=accumulated_text,
tool_calls=tc_dicts,
)
# Skip storing empty turns — no content, no tool calls.
# An empty assistant message (e.g. Codex returning nothing after
# a tool result) confuses some models on the next turn and causes
# cascading empty-stream failures.
if accumulated_text or tc_dicts:
await conversation.add_assistant_message(
content=accumulated_text,
tool_calls=tc_dicts,
)
# If no tool calls, turn is complete
if not tool_calls:
@@ -1461,6 +1532,7 @@ class EventLoopNode(NodeProtocol):
pass
key = tc.tool_input.get("key", "")
await accumulator.set(key, value)
self._record_learning(key, value)
outputs_set_this_turn.append(key)
await self._publish_output_key_set(stream_id, node_id, key, execution_id)
logged_tool_calls.append(
@@ -1889,8 +1961,19 @@ class EventLoopNode(NodeProtocol):
# Client-facing nodes with no output keys are meant for
# continuous interaction — they should not auto-accept.
# Only exit via shutdown, max_iterations, or max_node_visits.
# Inject tool-use pressure so models stuck in a
# "narrate-instead-of-act" loop get corrective feedback.
if not output_keys and ctx.node_spec.client_facing:
return JudgeVerdict(action="RETRY", feedback="")
return JudgeVerdict(
action="RETRY",
feedback=(
"STOP describing what you will do. "
"You have FULL access to all tools — file creation, "
"shell commands, MCP tools — and you CAN call them "
"directly in your response. Respond ONLY with tool "
"calls, no prose. Execute the task now."
),
)
# Level 2: conversation-aware quality check (if success_criteria set)
if ctx.node_spec.success_criteria and ctx.llm:
@@ -2164,27 +2247,102 @@ class EventLoopNode(NodeProtocol):
result = await result
return result
def _record_learning(self, key: str, value: Any) -> None:
"""Append a set_output value to adapt.md as a learning entry.
Called at set_output time the moment knowledge is produced so that
adapt.md accumulates the agent's outputs across the session. Since
adapt.md is injected into the system prompt, these persist through
any compaction.
"""
if not self._config.spillover_dir:
return
try:
adapt_path = Path(self._config.spillover_dir) / "adapt.md"
content = adapt_path.read_text(encoding="utf-8") if adapt_path.exists() else ""
if "## Outputs" not in content:
content += "\n\n## Outputs\n"
# Truncate long values for memory (full value is in shared memory)
v_str = str(value)
if len(v_str) > 500:
v_str = v_str[:500] + "…[truncated]"
entry = f"- {key}: {v_str}\n"
# Replace existing entry for same key (update, not duplicate)
lines = content.splitlines(keepends=True)
replaced = False
for i, line in enumerate(lines):
if line.startswith(f"- {key}:"):
lines[i] = entry
replaced = True
break
if replaced:
content = "".join(lines)
else:
content += entry
adapt_path.write_text(content, encoding="utf-8")
except Exception as e:
logger.warning("Failed to record learning for key=%s: %s", key, e)
def _next_spill_filename(self, tool_name: str) -> str:
"""Return a short, monotonic filename for a tool result spill."""
self._spill_counter += 1
# Shorten common tool name prefixes to save tokens
short = tool_name.removeprefix("tool_").removeprefix("mcp_")
return f"{short}_{self._spill_counter}.txt"
def _restore_spill_counter(self) -> None:
"""Scan spillover_dir for existing spill files and restore the counter."""
spill_dir = self._config.spillover_dir
if not spill_dir:
return
spill_path = Path(spill_dir)
if not spill_path.is_dir():
return
max_n = 0
for f in spill_path.iterdir():
if not f.is_file():
continue
m = re.search(r"_(\d+)\.txt$", f.name)
if m:
max_n = max(max_n, int(m.group(1)))
if max_n > self._spill_counter:
self._spill_counter = max_n
logger.info("Restored spill counter to %d from existing files", max_n)
def _truncate_tool_result(
self,
result: ToolResult,
tool_name: str,
) -> ToolResult:
"""Truncate a large tool result to keep the conversation context small.
"""Persist tool result to file and optionally truncate for context.
If *spillover_dir* is configured and the result exceeds
*max_tool_result_chars*, the full content is written to a file and
the in-context result is replaced with a preview + filename reference.
Without *spillover_dir*, large results are truncated with a note.
When *spillover_dir* is configured, EVERY non-error tool result is
saved to a file (short filename like ``web_search_1.txt``). A
``[Saved to '...']`` annotation is appended so the reference
survives pruning and compaction.
Small results (and errors) pass through unchanged.
- Small results (≤ limit): full content kept + file annotation
- Large results (> limit): preview + file reference
- Errors: pass through unchanged
- load_data results: truncate with pagination hint (no re-spill)
"""
limit = self._config.max_tool_result_chars
if limit <= 0 or result.is_error or len(result.content) <= limit:
# Errors always pass through unchanged
if result.is_error:
return result
# load_data is the designated mechanism for reading spilled files.
# Don't re-spill (circular), but DO truncate with a pagination hint.
# load_data reads FROM spilled files — never re-spill (circular).
# Just truncate with a pagination hint if the result is too large.
if tool_name == "load_data":
if limit <= 0 or len(result.content) <= limit:
return result # Small load_data result — pass through as-is
# Large load_data result — truncate with pagination hint
preview_chars = max(limit - 300, limit // 2)
preview = result.content[:preview_chars]
truncated = (
@@ -2206,21 +2364,14 @@ class EventLoopNode(NodeProtocol):
is_error=False,
)
# Determine a preview size — leave room for the metadata wrapper
preview_chars = max(limit - 300, limit // 2)
preview = result.content[:preview_chars]
spill_dir = self._config.spillover_dir
if spill_dir:
spill_path = Path(spill_dir)
spill_path.mkdir(parents=True, exist_ok=True)
# Use tool_use_id for uniqueness, sanitise for filesystem
safe_id = result.tool_use_id.replace("/", "_")[:60]
filename = f"tool_{tool_name}_{safe_id}.txt"
filename = self._next_spill_filename(tool_name)
# Pretty-print JSON content so load_data's line-based
# pagination works correctly. Compact JSON (no newlines)
# would produce a single line that defeats pagination.
# pagination works correctly.
write_content = result.content
try:
parsed = json.loads(result.content)
@@ -2230,20 +2381,46 @@ class EventLoopNode(NodeProtocol):
(spill_path / filename).write_text(write_content, encoding="utf-8")
truncated = (
f"[Result from {tool_name}: {len(result.content)} chars — "
f"too large for context, saved to '{filename}'. "
f"Use load_data(filename='{filename}') "
f"to read the full result.]\n\n"
f"Preview:\n{preview}"
if limit > 0 and len(result.content) > limit:
# Large result: preview + file reference
preview_chars = max(limit - 300, limit // 2)
preview = result.content[:preview_chars]
content = (
f"[Result from {tool_name}: {len(result.content)} chars — "
f"too large for context, saved to '{filename}'. "
f"Use load_data(filename='{filename}') "
f"to read the full result.]\n\n"
f"Preview:\n{preview}"
)
logger.info(
"Tool result spilled to file: %s (%d chars → %s)",
tool_name,
len(result.content),
filename,
)
else:
# Small result: keep full content + annotation
content = (
f"{result.content}\n\n"
f"[Saved to '{filename}']"
)
logger.info(
"Tool result saved to file: %s (%d chars → %s)",
tool_name,
len(result.content),
filename,
)
return ToolResult(
tool_use_id=result.tool_use_id,
content=content,
is_error=False,
)
logger.info(
"Tool result spilled to file: %s (%d chars → %s)",
tool_name,
len(result.content),
filename,
)
else:
# No spillover_dir — truncate in-place if needed
if limit > 0 and len(result.content) > limit:
preview_chars = max(limit - 300, limit // 2)
preview = result.content[:preview_chars]
truncated = (
f"[Result from {tool_name}: {len(result.content)} chars — "
f"truncated to fit context budget. Only the first "
@@ -2255,12 +2432,13 @@ class EventLoopNode(NodeProtocol):
len(result.content),
len(truncated),
)
return ToolResult(
tool_use_id=result.tool_use_id,
content=truncated,
is_error=False,
)
return ToolResult(
tool_use_id=result.tool_use_id,
content=truncated,
is_error=False,
)
return result
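For reference, a sketch of the three shapes the method now returns when `spillover_dir` is set (strings abridged; the exact wording is in the code above, and the filename is illustrative):

```python
# error result             -> passed through unchanged
# small result (<= limit)  -> "<full content>\n\n[Saved to 'web_search_1.txt']"
# large result (> limit)   -> "[Result from web_search: 45000 chars ... saved to
#                              'web_search_1.txt'. Use load_data(...)]\n\nPreview:\n..."
```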
async def _compact_tiered(
self,
@@ -2329,18 +2507,46 @@ class EventLoopNode(NodeProtocol):
if ratio >= 1.2:
level = "emergency"
keep = 1
logger.warning("Emergency compaction triggered (usage %.0f%%)", ratio * 100)
summary = self._build_emergency_summary(ctx, accumulator, conversation)
await conversation.compact(summary, keep_recent=1, phase_graduated=_phase_grad)
elif ratio >= 1.0:
level = "aggressive"
keep = 2
logger.info("Aggressive compaction triggered (usage %.0f%%)", ratio * 100)
summary = await self._generate_compaction_summary(ctx, conversation)
await conversation.compact(summary, keep_recent=2, phase_graduated=_phase_grad)
else:
level = "normal"
summary = await self._generate_compaction_summary(ctx, conversation)
await conversation.compact(summary, keep_recent=4, phase_graduated=_phase_grad)
keep = 4
spill_dir = self._config.spillover_dir
if spill_dir:
# Structure-preserving: save freeform text to file, keep tool messages
await conversation.compact_preserving_structure(
spillover_dir=spill_dir,
keep_recent=keep,
phase_graduated=_phase_grad,
)
# Circuit breaker: if structure-preserving compaction barely helped
# (still over budget), fall back to destructive compact() which
# replaces everything with a summary.
mid_ratio = conversation.usage_ratio()
if mid_ratio >= 0.9 * ratio:
logger.warning(
"Structure-preserving compaction ineffective "
"(%.0f%% -> %.0f%%), falling back to summary compaction",
ratio * 100,
mid_ratio * 100,
)
summary = self._build_emergency_summary(ctx, accumulator, conversation)
await conversation.compact(
summary, keep_recent=keep, phase_graduated=_phase_grad
)
else:
# Fallback: LLM-based summary (no spillover dir available)
if level == "emergency":
summary = self._build_emergency_summary(ctx, accumulator, conversation)
else:
summary = await self._generate_compaction_summary(ctx, conversation)
await conversation.compact(summary, keep_recent=keep, phase_graduated=_phase_grad)
new_ratio = conversation.usage_ratio()
logger.info(
@@ -2502,13 +2708,23 @@ class EventLoopNode(NodeProtocol):
if adapt_text:
parts.append(f"AGENT MEMORY (adapt.md):\n{adapt_text}")
files = sorted(
all_files = sorted(
f.name for f in data_dir.iterdir() if f.is_file() and f.name != "adapt.md"
)
if files:
file_list = "\n".join(f" - {f}" for f in files[:30])
# Separate conversation history files from regular data files
conv_files = [f for f in all_files if re.match(r"conversation_\d+\.md$", f)]
data_files = [f for f in all_files if f not in conv_files]
if conv_files:
conv_list = "\n".join(f" - {f}" for f in conv_files)
parts.append(
"CONVERSATION HISTORY (freeform messages saved during compaction — "
"use load_data to review earlier dialogue):\n" + conv_list
)
if data_files:
file_list = "\n".join(f" - {f}" for f in data_files[:30])
parts.append("DATA FILES (use load_data to read):\n" + file_list)
else:
if not all_files:
parts.append(
"NOTE: Large tool results may have been saved to files. "
"Use list_data_files() to check."
+42 -11
@@ -1288,19 +1288,50 @@ class GraphExecutor:
protect_tokens=2000,
)
if continuous_conversation.needs_compaction():
_phase_ratio = continuous_conversation.usage_ratio()
self.logger.info(
" Phase-boundary compaction (%.0f%% usage)",
continuous_conversation.usage_ratio() * 100,
_phase_ratio * 100,
)
summary = (
f"Summary of earlier phases (before {next_spec.name}). "
"See transition markers for phase details."
)
await continuous_conversation.compact(
summary,
keep_recent=4,
phase_graduated=True,
_data_dir = (
str(self._storage_path / "data")
if self._storage_path
else None
)
if _data_dir:
await continuous_conversation.compact_preserving_structure(
spillover_dir=_data_dir,
keep_recent=4,
phase_graduated=True,
)
# Circuit breaker: if still over budget, fall back
_post_ratio = continuous_conversation.usage_ratio()
if _post_ratio >= 0.9 * _phase_ratio:
self.logger.warning(
" Structure-preserving compaction ineffective "
"(%.0f%% -> %.0f%%), falling back to summary",
_phase_ratio * 100,
_post_ratio * 100,
)
summary = (
f"Summary of earlier phases (before {next_spec.name}). "
"See transition markers for phase details."
)
await continuous_conversation.compact(
summary,
keep_recent=4,
phase_graduated=True,
)
else:
summary = (
f"Summary of earlier phases (before {next_spec.name}). "
"See transition markers for phase details."
)
await continuous_conversation.compact(
summary,
keep_recent=4,
phase_graduated=True,
)
# Update input_data for next node
input_data = result.output
@@ -1686,11 +1717,11 @@ class GraphExecutor:
judge=None, # implicit judge: accept when output_keys are filled
config=LoopConfig(
max_iterations=lc.get("max_iterations", default_max_iter),
max_tool_calls_per_turn=lc.get("max_tool_calls_per_turn", 10),
max_tool_calls_per_turn=lc.get("max_tool_calls_per_turn", 30),
tool_call_overflow_margin=lc.get("tool_call_overflow_margin", 0.5),
stall_detection_threshold=lc.get("stall_detection_threshold", 3),
max_history_tokens=lc.get("max_history_tokens", 32000),
max_tool_result_chars=lc.get("max_tool_result_chars", 3_000),
max_tool_result_chars=lc.get("max_tool_result_chars", 30_000),
spillover_dir=spillover,
),
tool_executor=self.tool_executor,
+50 -23
@@ -118,6 +118,11 @@ RATE_LIMIT_MAX_RETRIES = 10
RATE_LIMIT_BACKOFF_BASE = 2 # seconds
RATE_LIMIT_MAX_DELAY = 120 # seconds - cap to prevent absurd waits
# Empty-stream retries use a short fixed delay, not the rate-limit backoff.
# Conversation-structure issues are deterministic — long waits don't help.
EMPTY_STREAM_MAX_RETRIES = 3
EMPTY_STREAM_RETRY_DELAY = 1.0 # seconds
# Directory for dumping failed requests
FAILED_REQUESTS_DIR = Path.home() / ".hive" / "failed_requests"
@@ -770,6 +775,20 @@ class LiteLLMProvider(LLMProvider):
else:
full_messages.insert(0, {"role": "system", "content": json_instruction.strip()})
# Remove ghost empty assistant messages (content="" and no tool_calls).
# These arise when a model returns an empty stream after a tool result
# (an "expected" no-op turn). Keeping them in history confuses some
# models (notably Codex/gpt-5.3) and causes cascading empty streams.
full_messages = [
m
for m in full_messages
if not (
m.get("role") == "assistant"
and not m.get("content")
and not m.get("tool_calls")
)
]
kwargs: dict[str, Any] = {
"model": self.model,
"messages": full_messages,
@@ -899,7 +918,7 @@ class LiteLLMProvider(LLMProvider):
# (If text deltas were yielded above, has_content is True
# and we skip the retry path — nothing was yielded in vain.)
has_content = accumulated_text or tool_calls_acc
if not has_content and attempt < RATE_LIMIT_MAX_RETRIES:
if not has_content:
# If the conversation ends with an assistant or tool
# message, an empty stream is expected — the LLM has
# nothing new to say. Don't burn retries on this;
@@ -912,8 +931,12 @@ class LiteLLMProvider(LLMProvider):
None,
)
if last_role in ("assistant", "tool"):
logger.debug(
"[stream] Empty response after %s message — expected, not retrying.",
logger.warning(
"[stream] %s returned empty stream after %s message "
"(no text, no tool calls). Treating as a no-op turn. "
"If this repeats, the agent may be stuck — check for "
"ghost empty assistant messages in conversation history.",
self.model,
last_role,
)
for event in tail_events:
@@ -937,26 +960,30 @@ class LiteLLMProvider(LLMProvider):
yield event
return
wait = _compute_retry_delay(attempt)
token_count, token_method = _estimate_tokens(
self.model,
full_messages,
)
dump_path = _dump_failed_request(
model=self.model,
kwargs=kwargs,
error_type="empty_stream",
attempt=attempt,
)
logger.warning(
f"[stream-retry] {self.model} returned empty stream — "
f"~{token_count} tokens ({token_method}). "
f"Request dumped to: {dump_path}. "
f"Retrying in {wait}s "
f"(attempt {attempt + 1}/{RATE_LIMIT_MAX_RETRIES})"
)
await asyncio.sleep(wait)
continue
# Empty stream after a user message — use short fixed
# retries, not the rate-limit backoff. This is likely
# a deterministic conversation-structure issue, so long
# exponential waits don't help.
if attempt < EMPTY_STREAM_MAX_RETRIES:
token_count, token_method = _estimate_tokens(
self.model,
full_messages,
)
dump_path = _dump_failed_request(
model=self.model,
kwargs=kwargs,
error_type="empty_stream",
attempt=attempt,
)
logger.warning(
f"[stream-retry] {self.model} returned empty stream — "
f"~{token_count} tokens ({token_method}). "
f"Request dumped to: {dump_path}. "
f"Retrying in {EMPTY_STREAM_RETRY_DELAY}s "
f"(attempt {attempt + 1}/{EMPTY_STREAM_MAX_RETRIES})"
)
await asyncio.sleep(EMPTY_STREAM_RETRY_DELAY)
continue
# Success (or final attempt) — flush remaining events.
for event in tail_events:
+1 -1
@@ -1946,7 +1946,7 @@ def get_session_status() -> str:
@mcp.tool()
def configure_loop(
max_iterations: Annotated[int, "Maximum loop iterations per node execution (default 50)"] = 50,
max_tool_calls_per_turn: Annotated[int, "Maximum tool calls per LLM turn (default 10)"] = 10,
max_tool_calls_per_turn: Annotated[int, "Maximum tool calls per LLM turn (default 30)"] = 30,
stall_detection_threshold: Annotated[
int, "Consecutive identical responses before stall detection triggers (default 3)"
] = 3,
+9 -1
@@ -435,7 +435,15 @@ class ToolRegistry:
filtered_context = {
k: v for k, v in base_context.items() if k in tool_params
}
merged_inputs = {**filtered_context, **inputs}
# Strip context params from LLM inputs — the framework
# values are authoritative (prevents the LLM from passing
# e.g. data_dir="/data" and overriding the real path).
clean_inputs = {
k: v
for k, v in inputs.items()
if k not in registry_ref.CONTEXT_PARAMS
}
merged_inputs = {**clean_inputs, **filtered_context}
result = client_ref.call_tool(tool_name, merged_inputs)
# MCP tools return content array, extract the result
if isinstance(result, list) and len(result) > 0:
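A small illustration of the new precedence, with hypothetical values: framework-injected context parameters now override anything the LLM passed for the same keys.

```python
llm_inputs = {"query": "stargazers", "data_dir": "/data"}      # LLM-supplied
filtered_context = {"data_dir": "/sessions/abc/data"}          # framework-injected
clean_inputs = {k: v for k, v in llm_inputs.items() if k not in {"data_dir"}}
merged_inputs = {**clean_inputs, **filtered_context}
assert merged_inputs == {"query": "stargazers", "data_dir": "/sessions/abc/data"}
```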
+7
@@ -771,9 +771,16 @@ export default function Workspace() {
}
}, [updateAgentState]);
// Track which sessions already have an in-flight or completed graph fetch
// to prevent the flood of duplicate API calls. agentStates changes on every
// SSE event (text delta, tool_call, etc.) which re-triggers this effect
// before the first response has returned.
const fetchedGraphSessionsRef = useRef<Set<string>>(new Set());
useEffect(() => {
for (const [agentType, state] of Object.entries(agentStates)) {
if (!state.sessionId || !state.ready || state.nodeSpecs.length > 0 || state.graphId) continue;
if (fetchedGraphSessionsRef.current.has(state.sessionId)) continue;
fetchedGraphSessionsRef.current.add(state.sessionId);
fetchGraphForAgent(agentType, state.sessionId);
}
}, [agentStates, fetchGraphForAgent]);
File diff suppressed because it is too large.
@@ -214,7 +214,7 @@ class CompetitiveIntelAgent:
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
)
@@ -195,7 +195,7 @@ class DeepResearchAgent:
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
)
@@ -71,6 +71,12 @@ Important:
- Track which URL each finding comes from (you'll need citations later)
- Call set_output for each key in a SEPARATE turn (not in the same turn as other tool calls)
Context management:
- Your tool results are automatically saved to files. After compaction, the file \
references remain in the conversation; use load_data() to recover any content you need.
- Use append_data('research_notes.md', ...) to maintain a running log of key findings \
as you go. This survives compaction and helps the report node produce a detailed report.
When done, use set_output (one key at a time, separate turns):
- set_output("findings", "Structured summary: key findings with source URLs for each claim. \
Include themes, contradictions, and confidence levels.")
@@ -246,8 +252,11 @@ report covers. Ask if they have questions.
- Every factual claim MUST cite its source with [n] notation
- Answer the original research questions from the brief
- If an append_data call fails with a truncation error, break it into smaller chunks
- If findings appear incomplete or summarized, call list_data_files() and load_data() \
to access the detailed source material from the research phase. The research node's \
tool results and research_notes.md contain the full data.
""",
tools=["save_data", "append_data", "serve_file_to_user"],
tools=["save_data", "append_data", "serve_file_to_user", "load_data", "list_data_files"],
)
__all__ = [
+1 -1
@@ -179,7 +179,7 @@ class JobHunterAgent:
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
@@ -152,7 +152,7 @@ class TechNewsReporterAgent:
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 50,
"max_tool_calls_per_turn": 10,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
)
@@ -210,7 +210,7 @@ class VulnerabilityResearcherAgent:
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
conversation_mode="continuous",