feat: compaction debugger
This commit is contained in:
@@ -4248,13 +4248,66 @@ class EventLoopNode(NodeProtocol):
|
||||
"re-doing work.\n"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_message_inventory(
|
||||
conversation: NodeConversation,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Build a per-message size inventory for debug logging."""
|
||||
inventory: list[dict[str, Any]] = []
|
||||
for m in conversation.messages:
|
||||
content_chars = len(m.content)
|
||||
tc_chars = 0
|
||||
tool_name = None
|
||||
if m.tool_calls:
|
||||
for tc in m.tool_calls:
|
||||
args = tc.get("function", {}).get("arguments", "")
|
||||
tc_chars += len(args) if isinstance(args, str) else len(
|
||||
json.dumps(args)
|
||||
)
|
||||
names = [
|
||||
tc.get("function", {}).get("name", "?")
|
||||
for tc in m.tool_calls
|
||||
]
|
||||
tool_name = ", ".join(names)
|
||||
elif m.role == "tool" and m.tool_use_id:
|
||||
for prev in conversation.messages:
|
||||
if prev.tool_calls:
|
||||
for tc in prev.tool_calls:
|
||||
if tc.get("id") == m.tool_use_id:
|
||||
tool_name = tc.get("function", {}).get(
|
||||
"name", "?"
|
||||
)
|
||||
break
|
||||
if tool_name:
|
||||
break
|
||||
entry: dict[str, Any] = {
|
||||
"seq": m.seq,
|
||||
"role": m.role,
|
||||
"content_chars": content_chars,
|
||||
}
|
||||
if tc_chars:
|
||||
entry["tool_call_args_chars"] = tc_chars
|
||||
if tool_name:
|
||||
entry["tool"] = tool_name
|
||||
if m.is_error:
|
||||
entry["is_error"] = True
|
||||
if m.phase_id:
|
||||
entry["phase"] = m.phase_id
|
||||
if content_chars > 2000:
|
||||
entry["preview"] = m.content[:200] + "…"
|
||||
inventory.append(entry)
|
||||
return inventory
|
||||
|
||||
async def _log_compaction(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
conversation: NodeConversation,
|
||||
ratio_before: float,
|
||||
pre_inventory: list[dict[str, Any]] | None = None,
|
||||
) -> None:
|
||||
"""Log compaction result to runtime logger and event bus."""
|
||||
"""Log compaction result to runtime logger, event bus, and debug file."""
|
||||
import os as _os
|
||||
|
||||
ratio_after = conversation.usage_ratio()
|
||||
before_pct = round(ratio_before * 100)
|
||||
after_pct = round(ratio_after * 100)
|
||||
@@ -4300,6 +4353,91 @@ class EventLoopNode(NodeProtocol):
|
||||
)
|
||||
)
|
||||
|
||||
# Write detailed debug log to ~/.hive/compaction_log/ when enabled
|
||||
if _os.environ.get("HIVE_COMPACTION_DEBUG"):
|
||||
self._write_compaction_debug_log(
|
||||
ctx, before_pct, after_pct, level, pre_inventory
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _write_compaction_debug_log(
|
||||
ctx: NodeContext,
|
||||
before_pct: int,
|
||||
after_pct: int,
|
||||
level: str,
|
||||
inventory: list[dict[str, Any]] | None,
|
||||
) -> None:
|
||||
"""Write detailed compaction analysis to ~/.hive/compaction_log/."""
|
||||
log_dir = Path.home() / ".hive" / "compaction_log"
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
ts = datetime.now(UTC).strftime("%Y%m%dT%H%M%S_%f")
|
||||
node_label = ctx.node_id.replace("/", "_")
|
||||
log_path = log_dir / f"{ts}_{node_label}.md"
|
||||
|
||||
lines: list[str] = [
|
||||
f"# Compaction Debug — {ctx.node_id}",
|
||||
f"**Time:** {datetime.now(UTC).isoformat()}",
|
||||
f"**Node:** {ctx.node_spec.name} (`{ctx.node_id}`)",
|
||||
]
|
||||
if ctx.stream_id:
|
||||
lines.append(f"**Stream:** {ctx.stream_id}")
|
||||
lines.append(f"**Level:** {level}")
|
||||
lines.append(f"**Usage:** {before_pct}% → {after_pct}%")
|
||||
lines.append("")
|
||||
|
||||
if inventory:
|
||||
total_chars = sum(
|
||||
e.get("content_chars", 0) + e.get("tool_call_args_chars", 0)
|
||||
for e in inventory
|
||||
)
|
||||
lines.append(
|
||||
f"## Pre-Compaction Message Inventory "
|
||||
f"({len(inventory)} messages, {total_chars:,} total chars)"
|
||||
)
|
||||
lines.append("")
|
||||
ranked = sorted(
|
||||
inventory,
|
||||
key=lambda e: e.get("content_chars", 0)
|
||||
+ e.get("tool_call_args_chars", 0),
|
||||
reverse=True,
|
||||
)
|
||||
lines.append("| # | seq | role | tool | chars | % of total | flags |")
|
||||
lines.append("|---|-----|------|------|------:|------------|-------|")
|
||||
for i, entry in enumerate(ranked, 1):
|
||||
chars = entry.get("content_chars", 0) + entry.get(
|
||||
"tool_call_args_chars", 0
|
||||
)
|
||||
pct = (chars / total_chars * 100) if total_chars else 0
|
||||
tool = entry.get("tool", "")
|
||||
flags = []
|
||||
if entry.get("is_error"):
|
||||
flags.append("error")
|
||||
if entry.get("phase"):
|
||||
flags.append(f"phase={entry['phase']}")
|
||||
lines.append(
|
||||
f"| {i} | {entry['seq']} | {entry['role']} | {tool} "
|
||||
f"| {chars:,} | {pct:.1f}% | {', '.join(flags)} |"
|
||||
)
|
||||
|
||||
large = [e for e in ranked if e.get("preview")]
|
||||
if large:
|
||||
lines.append("")
|
||||
lines.append("### Large message previews")
|
||||
for entry in large:
|
||||
lines.append(
|
||||
f"\n**seq={entry['seq']}** "
|
||||
f"({entry['role']}, {entry.get('tool', '')}):"
|
||||
)
|
||||
lines.append(f"```\n{entry['preview']}\n```")
|
||||
lines.append("")
|
||||
|
||||
try:
|
||||
log_path.write_text("\n".join(lines), encoding="utf-8")
|
||||
logger.debug("Compaction debug log written to %s", log_path)
|
||||
except OSError:
|
||||
logger.debug("Failed to write compaction debug log to %s", log_path)
|
||||
|
||||
def _build_emergency_summary(
|
||||
self,
|
||||
ctx: NodeContext,
|
||||
|
||||
Reference in New Issue
Block a user