diff --git a/core/framework/tracker/llm_debug_logger.py b/core/framework/tracker/llm_debug_logger.py index 4ca58974..02786166 100644 --- a/core/framework/tracker/llm_debug_logger.py +++ b/core/framework/tracker/llm_debug_logger.py @@ -1,11 +1,22 @@ """Write every LLM turn to ~/.hive/llm_logs/.jsonl for replay/debugging. -Each line is a JSON object with the full LLM turn: the request payload -(system prompt + messages), assistant text, tool calls, tool results, and -token counts. The file is opened lazily on first call and flushed after every -write. Errors are silently swallowed — this must never break the agent. +Two record kinds, distinguished by ``_kind``: + +* ``session_header`` — emitted on the first turn of an ``execution_id`` and + any time its ``system_prompt`` or ``tools`` change. Carries those large + fields once instead of per-turn. +* ``turn`` — one per LLM call. Carries per-turn outputs plus a + content-addressed message delta: ``message_hashes`` is the full ordered + message sequence for this turn, ``new_messages`` is hash → body for + messages we haven't emitted before for this ``execution_id``. The reader + reassembles full ``messages`` by accumulating ``new_messages`` across + prior turn records. Content-addressed (not positional) because the agent + prunes messages mid-session — a tail-delta would be wrong. + +Errors are silently swallowed — this must never break the agent. """ +import hashlib import json import logging import os @@ -28,6 +39,12 @@ def _llm_debug_dir() -> Path: _log_file: IO[str] | None = None _log_ready = False # lazy init guard +# Per-execution_id delta state. Reset implicitly on process restart — a fresh +# log file has no prior context, so re-emitting the header on first turn is +# correct. +_session_header_hash: dict[str, str] = {} +_session_seen_msgs: dict[str, set[str]] = {} + def _open_log() -> IO[str] | None: """Open the JSONL log file for this process.""" @@ -61,6 +78,17 @@ def _serialize_tools(tools: Any) -> list[dict[str, Any]]: return out +def _content_hash(payload: Any) -> str: + raw = json.dumps(payload, default=str, sort_keys=True, ensure_ascii=False) + return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16] + + +def _write_line(record: dict[str, Any]) -> None: + assert _log_file is not None + _log_file.write(json.dumps(record, default=str) + "\n") + _log_file.flush() + + def log_llm_turn( *, node_id: str, @@ -75,7 +103,7 @@ def log_llm_turn( token_counts: dict[str, Any], tools: list[Any] | None = None, ) -> None: - """Write one JSONL line capturing a complete LLM turn. + """Write JSONL records capturing one LLM turn (header + turn delta). Never raises. """ @@ -89,23 +117,57 @@ def log_llm_turn( _log_ready = True if _log_file is None: return - record = { - # UTC + offset matches tool_call start_timestamp (agent_loop.py) - # so the viewer can render every event in one consistent local zone. - "timestamp": datetime.now(UTC).isoformat(), - "node_id": node_id, - "stream_id": stream_id, - "execution_id": execution_id, - "iteration": iteration, - "system_prompt": system_prompt, - "tools": _serialize_tools(tools), - "messages": messages, - "assistant_text": assistant_text, - "tool_calls": tool_calls, - "tool_results": tool_results, - "token_counts": token_counts, - } - _log_file.write(json.dumps(record, default=str) + "\n") - _log_file.flush() + + # UTC + offset matches tool_call start_timestamp (agent_loop.py) + # so the viewer can render every event in one consistent local zone. 
+ timestamp = datetime.now(UTC).isoformat() + serialized_tools = _serialize_tools(tools) + + # Re-emit the header on first turn or whenever system/tools change. + # The Queen reflects different prompts across turns, so we can't + # assume strict immutability per execution_id. + header_hash = _content_hash({"system_prompt": system_prompt, "tools": serialized_tools}) + if _session_header_hash.get(execution_id) != header_hash: + _write_line( + { + "_kind": "session_header", + "timestamp": timestamp, + "execution_id": execution_id, + "node_id": node_id, + "stream_id": stream_id, + "header_hash": header_hash, + "system_prompt": system_prompt, + "tools": serialized_tools, + } + ) + _session_header_hash[execution_id] = header_hash + + seen = _session_seen_msgs.setdefault(execution_id, set()) + message_hashes: list[str] = [] + new_messages: dict[str, dict[str, Any]] = {} + for msg in messages or []: + h = _content_hash(msg) + message_hashes.append(h) + if h not in seen: + seen.add(h) + new_messages[h] = msg + + _write_line( + { + "_kind": "turn", + "timestamp": timestamp, + "execution_id": execution_id, + "node_id": node_id, + "stream_id": stream_id, + "iteration": iteration, + "header_hash": header_hash, + "message_hashes": message_hashes, + "new_messages": new_messages, + "assistant_text": assistant_text, + "tool_calls": tool_calls, + "tool_results": tool_results, + "token_counts": token_counts, + } + ) except Exception: pass # never break the agent diff --git a/scripts/llm_debug_log_visualizer.py b/scripts/llm_debug_log_visualizer.py deleted file mode 100644 index 05b00878..00000000 --- a/scripts/llm_debug_log_visualizer.py +++ /dev/null @@ -1,1067 +0,0 @@ -#!/usr/bin/env python3 -"""Open a browser-based viewer for Hive LLM debug JSONL sessions. - -Starts a local HTTP server and loads session data on demand (one at a time). - -Usage: - uv run --no-project scripts/llm_debug_log_visualizer.py - uv run --no-project scripts/llm_debug_log_visualizer.py --session - uv run --no-project scripts/llm_debug_log_visualizer.py --port 8080 - uv run --no-project scripts/llm_debug_log_visualizer.py --output debug.html -""" - -from __future__ import annotations - -import argparse -import http.server -import json -import urllib.parse -import webbrowser -from collections import defaultdict -from dataclasses import dataclass -from datetime import datetime -from pathlib import Path -from typing import Any - - -@dataclass -class SessionSummary: - execution_id: str - log_file: str - start_timestamp: str - end_timestamp: str - turn_count: int - streams: list[str] - nodes: list[str] - models: list[str] - - -def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument( - "--logs-dir", - type=Path, - default=Path.home() / ".hive" / "llm_logs", - help="Directory containing Hive LLM debug JSONL files.", - ) - parser.add_argument( - "--session", - help="Execution ID to select initially in the webpage.", - ) - parser.add_argument( - "--output", - type=Path, - help="Optional HTML output path. 
Defaults to a temporary file.", - ) - parser.add_argument( - "--limit-files", - type=int, - default=200, - help="Maximum number of newest log files to scan.", - ) - parser.add_argument( - "--port", - type=int, - default=0, - help="Port for the local server (0 = auto-pick a free port).", - ) - parser.add_argument( - "--no-open", - action="store_true", - help="Start the server but do not open a browser.", - ) - parser.add_argument( - "--include-tests", - action="store_true", - help="Show test/mock sessions (hidden by default).", - ) - return parser.parse_args() - - -def _safe_read_jsonl(path: Path) -> list[dict[str, Any]]: - records: list[dict[str, Any]] = [] - try: - with path.open(encoding="utf-8") as handle: - for line_number, raw_line in enumerate(handle, start=1): - line = raw_line.strip() - if not line: - continue - try: - payload = json.loads(line) - except json.JSONDecodeError: - payload = { - "timestamp": "", - "execution_id": "", - "assistant_text": "", - "_parse_error": f"{path.name}:{line_number}", - "_raw_line": line, - } - payload["_log_file"] = str(path) - records.append(payload) - except OSError as exc: - print(f"warning: failed to read {path}: {exc}") - return records - - -def _discover_records(logs_dir: Path, limit_files: int) -> list[dict[str, Any]]: - if not logs_dir.exists(): - raise FileNotFoundError(f"log directory not found: {logs_dir}") - - files = sorted( - [path for path in logs_dir.iterdir() if path.is_file() and path.suffix == ".jsonl"], - key=lambda path: path.stat().st_mtime, - reverse=True, - )[:limit_files] - - records: list[dict[str, Any]] = [] - for path in files: - records.extend(_safe_read_jsonl(path)) - return records - - -def _format_timestamp(raw: str) -> str: - if not raw: - return "-" - try: - return datetime.fromisoformat(raw).strftime("%Y-%m-%d %H:%M:%S") - except ValueError: - return raw - - -def _is_test_session(execution_id: str, records: list[dict[str, Any]]) -> bool: - """Return True for sessions that look like test artifacts.""" - if execution_id.startswith(" tuple[list[SessionSummary], dict[str, list[dict[str, Any]]]]: - by_session: dict[str, list[dict[str, Any]]] = defaultdict(list) - for record in records: - execution_id = str(record.get("execution_id") or "").strip() - if execution_id: - by_session[execution_id].append(record) - - if not include_tests: - by_session = {eid: recs for eid, recs in by_session.items() if not _is_test_session(eid, recs)} - - summaries: list[SessionSummary] = [] - for execution_id, session_records in by_session.items(): - session_records.sort( - key=lambda record: ( - str(record.get("timestamp", "")), - record.get("iteration", 0), - ) - ) - first = session_records[0] - last = session_records[-1] - summaries.append( - SessionSummary( - execution_id=execution_id, - log_file=str(first.get("_log_file", "")), - start_timestamp=str(first.get("timestamp", "")), - end_timestamp=str(last.get("timestamp", "")), - turn_count=len(session_records), - streams=sorted({str(r.get("stream_id", "")) for r in session_records if r.get("stream_id")}), - nodes=sorted({str(r.get("node_id", "")) for r in session_records if r.get("node_id")}), - models=sorted( - { - str(r.get("token_counts", {}).get("model", "")) - for r in session_records - if isinstance(r.get("token_counts"), dict) and r.get("token_counts", {}).get("model") - } - ), - ) - ) - - summaries.sort(key=lambda summary: summary.start_timestamp, reverse=True) - return summaries, by_session - - -def _render_html( - summaries: list[SessionSummary], - initial_session_id: str, -) 
-> str:
-    summaries_data = [
-        {
-            "execution_id": summary.execution_id,
-            "log_file": summary.log_file,
-            "start_timestamp": summary.start_timestamp,
-            "end_timestamp": summary.end_timestamp,
-            "start_display": _format_timestamp(summary.start_timestamp),
-            "end_display": _format_timestamp(summary.end_timestamp),
-            "turn_count": summary.turn_count,
-            "streams": summary.streams,
-            "nodes": summary.nodes,
-            "models": summary.models,
-        }
-        for summary in summaries
-    ]
-
-    initial = initial_session_id or (summaries[0].execution_id if summaries else "")
-    return f"""
-    [inline HTML/CSS/JS template elided: single-page viewer titled "Hive LLM Debug Viewer" with an "LLM Debug Session" heading; the markup itself is not recoverable from this diff]
- - - - - -""" - - -def _sort_records(records: list[dict[str, Any]]) -> list[dict[str, Any]]: - return sorted( - records, - key=lambda r: (str(r.get("timestamp", "")), r.get("iteration", 0)), - ) - - -def _load_session_data(logs_dir: Path, session_id: str, limit_files: int) -> list[dict[str, Any]] | None: - """Load records for a specific session on demand.""" - if not logs_dir.exists(): - return None - - files = sorted( - [path for path in logs_dir.iterdir() if path.is_file() and path.suffix == ".jsonl"], - key=lambda path: path.stat().st_mtime, - reverse=True, - )[:limit_files] - - records: list[dict[str, Any]] = [] - for path in files: - try: - with path.open(encoding="utf-8") as handle: - for line_number, raw_line in enumerate(handle, start=1): - line = raw_line.strip() - if not line: - continue - try: - payload = json.loads(line) - except json.JSONDecodeError: - payload = { - "timestamp": "", - "execution_id": "", - "assistant_text": "", - "_parse_error": f"{path.name}:{line_number}", - "_raw_line": line, - } - # Only include records for this session - if str(payload.get("execution_id") or "").strip() == session_id: - payload["_log_file"] = str(path) - records.append(payload) - except OSError: - continue - - return _sort_records(records) if records else None - - -def _run_server( - html: str, - logs_dir: Path, - limit_files: int, - port: int, - no_open: bool, -) -> None: - html_bytes = html.encode("utf-8") - session_cache: dict[str, list[dict[str, Any]]] = {} - - class Handler(http.server.BaseHTTPRequestHandler): - def do_GET(self) -> None: - if self.path == "/": - self._respond(200, "text/html; charset=utf-8", html_bytes) - elif self.path.startswith("/api/session/"): - sid = urllib.parse.unquote(self.path[len("/api/session/") :]) - # Check cache first - if sid in session_cache: - records = session_cache[sid] - else: - records = _load_session_data(logs_dir, sid, limit_files) - if records is not None: - session_cache[sid] = records - if records is None: - self._respond(404, "application/json", b"[]") - else: - body = json.dumps(records, ensure_ascii=False).encode("utf-8") - self._respond(200, "application/json", body) - else: - self.send_error(404) - - def _respond(self, code: int, content_type: str, body: bytes) -> None: - self.send_response(code) - self.send_header("Content-Type", content_type) - self.send_header("Content-Length", str(len(body))) - self.end_headers() - self.wfile.write(body) - - def log_message(self, format: str, *args: object) -> None: - pass # silence per-request logs - - server = http.server.HTTPServer(("127.0.0.1", port), Handler) - actual_port = server.server_address[1] - url = f"http://127.0.0.1:{actual_port}" - print(f"Serving at {url} (Ctrl+C to stop)") - - if not no_open: - webbrowser.open(url) - - try: - server.serve_forever() - except KeyboardInterrupt: - print("\nStopped.") - finally: - server.server_close() - - -def _discover_session_summaries(logs_dir: Path, limit_files: int, include_tests: bool) -> list[SessionSummary]: - """Discover only session summaries without loading full record data.""" - if not logs_dir.exists(): - raise FileNotFoundError(f"log directory not found: {logs_dir}") - - files = sorted( - [path for path in logs_dir.iterdir() if path.is_file() and path.suffix == ".jsonl"], - key=lambda path: path.stat().st_mtime, - reverse=True, - )[:limit_files] - - # Collect minimal info per session: just first/last records and metadata - by_session: dict[str, list[dict[str, Any]]] = defaultdict(list) - for path in files: - try: - with 
path.open(encoding="utf-8") as handle: - for raw_line in handle: - line = raw_line.strip() - if not line: - continue - try: - payload = json.loads(line) - except json.JSONDecodeError: - continue - execution_id = str(payload.get("execution_id") or "").strip() - if execution_id: - # Store minimal data for summary generation - minimal = { - "timestamp": payload.get("timestamp", ""), - "iteration": payload.get("iteration", 0), - "stream_id": payload.get("stream_id", ""), - "node_id": payload.get("node_id", ""), - "token_counts": payload.get("token_counts", {}), - "_log_file": str(path), - } - by_session[execution_id].append(minimal) - except OSError: - continue - - # Filter out test sessions if needed - if not include_tests: - by_session = {eid: recs for eid, recs in by_session.items() if not _is_test_session(eid, recs)} - - summaries: list[SessionSummary] = [] - for execution_id, session_records in by_session.items(): - session_records.sort( - key=lambda record: ( - str(record.get("timestamp", "")), - record.get("iteration", 0), - ) - ) - first = session_records[0] - last = session_records[-1] - summaries.append( - SessionSummary( - execution_id=execution_id, - log_file=str(first.get("_log_file", "")), - start_timestamp=str(first.get("timestamp", "")), - end_timestamp=str(last.get("timestamp", "")), - turn_count=len(session_records), - streams=sorted({str(r.get("stream_id", "")) for r in session_records if r.get("stream_id")}), - nodes=sorted({str(r.get("node_id", "")) for r in session_records if r.get("node_id")}), - models=sorted( - { - str(r.get("token_counts", {}).get("model", "")) - for r in session_records - if isinstance(r.get("token_counts"), dict) and r.get("token_counts", {}).get("model") - } - ), - ) - ) - - summaries.sort(key=lambda summary: summary.start_timestamp, reverse=True) - return summaries - - -def main() -> int: - args = _parse_args() - logs_dir = args.logs_dir.expanduser() - - # Only discover summaries, not full session data - summaries = _discover_session_summaries(logs_dir, args.limit_files, args.include_tests) - - initial_session_id = args.session or (summaries[0].execution_id if summaries else "") - if initial_session_id and not any(s.execution_id == initial_session_id for s in summaries): - print(f"session not found: {initial_session_id}") - return 1 - - html_report = _render_html(summaries, initial_session_id) - - if args.output: - args.output.parent.mkdir(parents=True, exist_ok=True) - args.output.write_text(html_report, encoding="utf-8") - print(args.output) - return 0 - - _run_server(html_report, logs_dir, args.limit_files, args.port, args.no_open) - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/scripts/llm_timeline_viewer.py b/scripts/llm_timeline_viewer.py index 49db4ce4..a1e4cd30 100644 --- a/scripts/llm_timeline_viewer.py +++ b/scripts/llm_timeline_viewer.py @@ -64,6 +64,54 @@ def _format_timestamp(raw: str) -> str: return raw +def _reassemble_records(records: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Convert new-format (header + turn-delta) records into legacy-shape full turns. + + Records lacking ``_kind`` are passed through unchanged. Inputs must be in + file order so headers precede the turns that reference them. 
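+
+    Example (hypothetical hashes): a ``turn`` whose ``message_hashes`` is
+    ``["a1", "b2"]`` and whose ``new_messages`` carries only ``"b2"``
+    reassembles to both messages, provided an earlier turn in the same file
+    already carried the body for ``"a1"``.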
+ """ + headers: dict[str, dict[str, Any]] = {} # execution_id -> latest session_header + pools: dict[str, dict[str, dict[str, Any]]] = {} # execution_id -> hash -> message body + + out: list[dict[str, Any]] = [] + for rec in records: + kind = rec.get("_kind") + if kind == "session_header": + eid = str(rec.get("execution_id") or "") + headers[eid] = rec + pools.setdefault(eid, {}) + continue + if kind == "turn": + eid = str(rec.get("execution_id") or "") + pool = pools.setdefault(eid, {}) + new_msgs = rec.get("new_messages") or {} + if isinstance(new_msgs, dict): + pool.update(new_msgs) + hashes = rec.get("message_hashes") or [] + messages = [pool[h] for h in hashes if h in pool] + header = headers.get(eid, {}) + out.append( + { + "timestamp": rec.get("timestamp", ""), + "execution_id": eid, + "node_id": rec.get("node_id", ""), + "stream_id": rec.get("stream_id", ""), + "iteration": rec.get("iteration", 0), + "system_prompt": header.get("system_prompt", ""), + "tools": header.get("tools", []), + "messages": messages, + "assistant_text": rec.get("assistant_text", ""), + "tool_calls": rec.get("tool_calls", []), + "tool_results": rec.get("tool_results", []), + "token_counts": rec.get("token_counts", {}), + "_log_file": rec.get("_log_file", ""), + } + ) + continue + out.append(rec) + return out + + def _is_test_session(execution_id: str, records: list[dict[str, Any]]) -> bool: if execution_id.startswith(" lis records: list[dict[str, Any]] = [] for path in files: + # Reassemble per-file: each file is self-contained because the writer + # re-emits the session_header on every process start, so we never need + # cross-file state to fill in messages/system_prompt/tools. + file_records: list[dict[str, Any]] = [] try: with path.open(encoding="utf-8") as handle: for line_number, raw_line in enumerate(handle, start=1): @@ -166,17 +221,23 @@ def _load_session_data(logs_dir: Path, session_id: str, limit_files: int) -> lis try: payload = json.loads(line) except json.JSONDecodeError: - payload = { - "timestamp": "", - "execution_id": "", - "_parse_error": f"{path.name}:{line_number}", - "_raw_line": line, - } - if str(payload.get("execution_id") or "").strip() == session_id: - payload["_log_file"] = str(path) - records.append(payload) + records.append( + { + "timestamp": "", + "execution_id": "", + "_parse_error": f"{path.name}:{line_number}", + "_raw_line": line, + "_log_file": str(path), + } + ) + continue + if str(payload.get("execution_id") or "").strip() != session_id: + continue + payload["_log_file"] = str(path) + file_records.append(payload) except OSError: continue + records.extend(_reassemble_records(file_records)) if not records: return None