diff --git a/core/framework/agent_loop/agent_loop.py b/core/framework/agent_loop/agent_loop.py index 745982d2..07e2742d 100644 --- a/core/framework/agent_loop/agent_loop.py +++ b/core/framework/agent_loop/agent_loop.py @@ -1040,6 +1040,7 @@ class AgentLoop(AgentProtocol): tool_calls=logged_tool_calls, tool_results=real_tool_results, token_counts=turn_tokens, + tools=tools, ) # DS-13: inject context preservation warning once when token usage diff --git a/core/framework/tracker/llm_debug_logger.py b/core/framework/tracker/llm_debug_logger.py index 0e1d05f1..b9fe95f4 100644 --- a/core/framework/tracker/llm_debug_logger.py +++ b/core/framework/tracker/llm_debug_logger.py @@ -9,7 +9,7 @@ write. Errors are silently swallowed — this must never break the agent. import json import logging import os -from datetime import datetime +from datetime import UTC, datetime from pathlib import Path from typing import IO, Any @@ -37,6 +37,28 @@ def _open_log() -> IO[str] | None: return open(path, "a", encoding="utf-8") # noqa: SIM115 +def _serialize_tools(tools: Any) -> list[dict[str, Any]]: + """Reduce a list of Tool dataclasses to the schema fields shown to the LLM. + + Best-effort: unknown shapes fall back to ``str()`` so logging never raises. + """ + if not tools: + return [] + out: list[dict[str, Any]] = [] + for tool in tools: + try: + out.append( + { + "name": getattr(tool, "name", ""), + "description": getattr(tool, "description", ""), + "parameters": getattr(tool, "parameters", {}) or {}, + } + ) + except Exception: + out.append({"name": str(tool)}) + return out + + def log_llm_turn( *, node_id: str, @@ -49,6 +71,7 @@ def log_llm_turn( tool_calls: list[dict[str, Any]], tool_results: list[dict[str, Any]], token_counts: dict[str, Any], + tools: list[Any] | None = None, ) -> None: """Write one JSONL line capturing a complete LLM turn. @@ -65,12 +88,15 @@ def log_llm_turn( if _log_file is None: return record = { - "timestamp": datetime.now().isoformat(), + # UTC + offset matches tool_call start_timestamp (agent_loop.py) + # so the viewer can render every event in one consistent local zone. + "timestamp": datetime.now(UTC).isoformat(), "node_id": node_id, "stream_id": stream_id, "execution_id": execution_id, "iteration": iteration, "system_prompt": system_prompt, + "tools": _serialize_tools(tools), "messages": messages, "assistant_text": assistant_text, "tool_calls": tool_calls, diff --git a/scripts/llm_timeline_viewer.py b/scripts/llm_timeline_viewer.py new file mode 100644 index 00000000..fa1c200b --- /dev/null +++ b/scripts/llm_timeline_viewer.py @@ -0,0 +1,830 @@ +#!/usr/bin/env python3 +"""Timeline viewer for Hive LLM debug JSONL sessions. + +Sister script to ``llm_debug_log_visualizer.py``. Where that one renders +turn-by-turn cards, this one renders a chronological event timeline so a +developer can click any event (user input, tool use, tool result, assistant +text) and inspect the *raw* request payload that was sent to the LLM at that +moment — system prompt, full tool schemas, full messages array. + +Usage: + uv run --no-project scripts/llm_timeline_viewer.py + uv run --no-project scripts/llm_timeline_viewer.py --session + uv run --no-project scripts/llm_timeline_viewer.py --port 8080 +""" + +from __future__ import annotations + +import argparse +import http.server +import json +import urllib.parse +import webbrowser +from collections import defaultdict +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any + + +@dataclass +class SessionSummary: + execution_id: str + log_file: str + start_timestamp: str + end_timestamp: str + turn_count: int + streams: list[str] + nodes: list[str] + models: list[str] + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--logs-dir", + type=Path, + default=Path.home() / ".hive" / "llm_logs", + help="Directory containing Hive LLM debug JSONL files.", + ) + parser.add_argument("--session", help="Execution ID to select initially.") + parser.add_argument("--limit-files", type=int, default=200) + parser.add_argument("--port", type=int, default=0) + parser.add_argument("--no-open", action="store_true") + parser.add_argument("--include-tests", action="store_true") + return parser.parse_args() + + +def _format_timestamp(raw: str) -> str: + if not raw: + return "-" + try: + return datetime.fromisoformat(raw).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + return raw + + +def _is_test_session(execution_id: str, records: list[dict[str, Any]]) -> bool: + if execution_id.startswith(" list[SessionSummary]: + if not logs_dir.exists(): + raise FileNotFoundError(f"log directory not found: {logs_dir}") + + files = sorted( + [p for p in logs_dir.iterdir() if p.is_file() and p.suffix == ".jsonl"], + key=lambda p: p.stat().st_mtime, + reverse=True, + )[:limit_files] + + by_session: dict[str, list[dict[str, Any]]] = defaultdict(list) + for path in files: + try: + with path.open(encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line: + continue + try: + payload = json.loads(line) + except json.JSONDecodeError: + continue + eid = str(payload.get("execution_id") or "").strip() + if not eid: + continue + minimal = { + "timestamp": payload.get("timestamp", ""), + "iteration": payload.get("iteration", 0), + "stream_id": payload.get("stream_id", ""), + "node_id": payload.get("node_id", ""), + "token_counts": payload.get("token_counts", {}), + "_log_file": str(path), + } + by_session[eid].append(minimal) + except OSError: + continue + + if not include_tests: + by_session = { + eid: recs for eid, recs in by_session.items() if not _is_test_session(eid, recs) + } + + summaries: list[SessionSummary] = [] + for eid, recs in by_session.items(): + recs.sort(key=lambda r: (str(r.get("timestamp", "")), r.get("iteration", 0))) + first, last = recs[0], recs[-1] + summaries.append( + SessionSummary( + execution_id=eid, + log_file=str(first.get("_log_file", "")), + start_timestamp=str(first.get("timestamp", "")), + end_timestamp=str(last.get("timestamp", "")), + turn_count=len(recs), + streams=sorted({str(r.get("stream_id", "")) for r in recs if r.get("stream_id")}), + nodes=sorted({str(r.get("node_id", "")) for r in recs if r.get("node_id")}), + models=sorted( + { + str(r.get("token_counts", {}).get("model", "")) + for r in recs + if isinstance(r.get("token_counts"), dict) + and r.get("token_counts", {}).get("model") + } + ), + ) + ) + + summaries.sort(key=lambda s: s.start_timestamp, reverse=True) + return summaries + + +def _load_session_data( + logs_dir: Path, session_id: str, limit_files: int +) -> list[dict[str, Any]] | None: + if not logs_dir.exists(): + return None + + files = sorted( + [p for p in logs_dir.iterdir() if p.is_file() and p.suffix == ".jsonl"], + key=lambda p: p.stat().st_mtime, + reverse=True, + )[:limit_files] + + records: list[dict[str, Any]] = [] + for path in files: + try: + with path.open(encoding="utf-8") as handle: + for line_number, raw_line in enumerate(handle, start=1): + line = raw_line.strip() + if not line: + continue + try: + payload = json.loads(line) + except json.JSONDecodeError: + payload = { + "timestamp": "", + "execution_id": "", + "_parse_error": f"{path.name}:{line_number}", + "_raw_line": line, + } + if str(payload.get("execution_id") or "").strip() == session_id: + payload["_log_file"] = str(path) + records.append(payload) + except OSError: + continue + + if not records: + return None + records.sort(key=lambda r: (str(r.get("timestamp", "")), r.get("iteration", 0))) + return records + + +def _render_html(summaries: list[SessionSummary], initial_session_id: str) -> str: + summaries_data = [ + { + "execution_id": s.execution_id, + "log_file": s.log_file, + "start_timestamp": s.start_timestamp, + "end_timestamp": s.end_timestamp, + "start_display": _format_timestamp(s.start_timestamp), + "end_display": _format_timestamp(s.end_timestamp), + "turn_count": s.turn_count, + "streams": s.streams, + "nodes": s.nodes, + "models": s.models, + } + for s in summaries + ] + initial = initial_session_id or (summaries[0].execution_id if summaries else "") + return f""" + + + + + Hive LLM Timeline + + + +
+
+
+

Sessions

+ +
+
+
+
+
+

Timeline

+ +
+
+
+
+

Raw context sent to LLM

+
+
Select an event on the timeline to view the raw request payload (system prompt, tool schemas, full messages array) for that LLM call.
+
+
+
+ + + + + +""" + + +def _run_server(html: str, logs_dir: Path, limit_files: int, port: int, no_open: bool) -> None: + html_bytes = html.encode("utf-8") + cache: dict[str, list[dict[str, Any]]] = {} + + class Handler(http.server.BaseHTTPRequestHandler): + def do_GET(self) -> None: + if self.path == "/": + self._respond(200, "text/html; charset=utf-8", html_bytes) + elif self.path.startswith("/api/session/"): + sid = urllib.parse.unquote(self.path[len("/api/session/") :]) + records = cache.get(sid) + if records is None: + records = _load_session_data(logs_dir, sid, limit_files) + if records is not None: + cache[sid] = records + if records is None: + self._respond(404, "application/json", b"[]") + else: + body = json.dumps(records, ensure_ascii=False).encode("utf-8") + self._respond(200, "application/json", body) + else: + self.send_error(404) + + def _respond(self, code: int, content_type: str, body: bytes) -> None: + self.send_response(code) + self.send_header("Content-Type", content_type) + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args: object) -> None: + pass + + server = http.server.HTTPServer(("127.0.0.1", port), Handler) + actual_port = server.server_address[1] + url = f"http://127.0.0.1:{actual_port}" + print(f"Serving timeline viewer at {url} (Ctrl+C to stop)") + if not no_open: + webbrowser.open(url) + try: + server.serve_forever() + except KeyboardInterrupt: + print("\nStopped.") + finally: + server.server_close() + + +def main() -> int: + args = _parse_args() + logs_dir = args.logs_dir.expanduser() + summaries = _discover_session_summaries(logs_dir, args.limit_files, args.include_tests) + initial = args.session or (summaries[0].execution_id if summaries else "") + if initial and not any(s.execution_id == initial for s in summaries): + print(f"session not found: {initial}") + return 1 + html = _render_html(summaries, initial) + _run_server(html, logs_dir, args.limit_files, args.port, args.no_open) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())