#!/usr/bin/env python3
"""Timeline viewer for Hive LLM debug JSONL sessions.

Sister script to ``llm_debug_log_visualizer.py``. Where that one renders
turn-by-turn cards, this one renders a chronological event timeline so a
developer can click any event (user input, tool use, tool result, assistant
text) and inspect the *raw* request payload that was sent to the LLM at that
moment — system prompt, full tool schemas, full messages array.

Usage:

    uv run --no-project scripts/llm_timeline_viewer.py
    uv run --no-project scripts/llm_timeline_viewer.py --session <execution-id>
    uv run --no-project scripts/llm_timeline_viewer.py --port 8080
"""

from __future__ import annotations

import argparse
import http.server
import json
import urllib.parse
import webbrowser
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any


@dataclass
class SessionSummary:
    execution_id: str
    log_file: str
    start_timestamp: str
    end_timestamp: str
    turn_count: int
    streams: list[str]
    nodes: list[str]
    models: list[str]


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--logs-dir",
        type=Path,
        default=Path.home() / ".hive" / "llm_logs",
        help="Directory containing Hive LLM debug JSONL files.",
    )
    parser.add_argument("--session", help="Execution ID to select initially.")
    parser.add_argument("--limit-files", type=int, default=200)
    parser.add_argument("--port", type=int, default=0)
    parser.add_argument("--no-open", action="store_true")
    parser.add_argument("--include-tests", action="store_true")
    return parser.parse_args()


def _format_timestamp(raw: str) -> str:
    if not raw:
        return "-"
    try:
        return datetime.fromisoformat(raw).strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        return raw


def _reassemble_records(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert new-format (header + turn-delta) records into legacy-shape full turns.

    Records lacking ``_kind`` are passed through unchanged. Inputs must be in
    file order so headers precede the turns that reference them.
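
    A rough sketch of the two ``_kind`` record shapes this function consumes
    (field values here are illustrative, not taken from a real log; only the
    field names come from this module):

        {"_kind": "session_header", "execution_id": "e1",
         "system_prompt": "...", "tools": [...]}
        {"_kind": "turn", "execution_id": "e1", "iteration": 0,
         "message_hashes": ["h1"],
         "new_messages": {"h1": {"role": "user", "content": "..."}},
         "assistant_text": "...", "token_counts": {...}}

    A ``turn`` record carries only newly seen messages; previously seen ones
    are replayed from the per-execution pool via their hashes.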
""" headers: dict[str, dict[str, Any]] = {} # execution_id -> latest session_header pools: dict[str, dict[str, dict[str, Any]]] = {} # execution_id -> hash -> message body out: list[dict[str, Any]] = [] for rec in records: kind = rec.get("_kind") if kind == "session_header": eid = str(rec.get("execution_id") or "") headers[eid] = rec pools.setdefault(eid, {}) continue if kind == "turn": eid = str(rec.get("execution_id") or "") pool = pools.setdefault(eid, {}) new_msgs = rec.get("new_messages") or {} if isinstance(new_msgs, dict): pool.update(new_msgs) hashes = rec.get("message_hashes") or [] messages = [pool[h] for h in hashes if h in pool] header = headers.get(eid, {}) out.append( { "timestamp": rec.get("timestamp", ""), "execution_id": eid, "node_id": rec.get("node_id", ""), "stream_id": rec.get("stream_id", ""), "iteration": rec.get("iteration", 0), "system_prompt": header.get("system_prompt", ""), "tools": header.get("tools", []), "messages": messages, "assistant_text": rec.get("assistant_text", ""), "tool_calls": rec.get("tool_calls", []), "tool_results": rec.get("tool_results", []), "token_counts": rec.get("token_counts", {}), "_log_file": rec.get("_log_file", ""), } ) continue out.append(rec) return out def _is_test_session(execution_id: str, records: list[dict[str, Any]]) -> bool: if execution_id.startswith(" list[SessionSummary]: if not logs_dir.exists(): raise FileNotFoundError(f"log directory not found: {logs_dir}") files = sorted( [p for p in logs_dir.iterdir() if p.is_file() and p.suffix == ".jsonl"], key=lambda p: p.stat().st_mtime, reverse=True, )[:limit_files] by_session: dict[str, list[dict[str, Any]]] = defaultdict(list) for path in files: try: with path.open(encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line: continue try: payload = json.loads(line) except json.JSONDecodeError: continue # session_header is metadata, not a turn — don't count it. 
if payload.get("_kind") == "session_header": continue eid = str(payload.get("execution_id") or "").strip() if not eid: continue minimal = { "timestamp": payload.get("timestamp", ""), "iteration": payload.get("iteration", 0), "stream_id": payload.get("stream_id", ""), "node_id": payload.get("node_id", ""), "token_counts": payload.get("token_counts", {}), "_log_file": str(path), } by_session[eid].append(minimal) except OSError: continue if not include_tests: by_session = {eid: recs for eid, recs in by_session.items() if not _is_test_session(eid, recs)} summaries: list[SessionSummary] = [] for eid, recs in by_session.items(): recs.sort(key=lambda r: (str(r.get("timestamp", "")), r.get("iteration", 0))) first, last = recs[0], recs[-1] summaries.append( SessionSummary( execution_id=eid, log_file=str(first.get("_log_file", "")), start_timestamp=str(first.get("timestamp", "")), end_timestamp=str(last.get("timestamp", "")), turn_count=len(recs), streams=sorted({str(r.get("stream_id", "")) for r in recs if r.get("stream_id")}), nodes=sorted({str(r.get("node_id", "")) for r in recs if r.get("node_id")}), models=sorted( { str(r.get("token_counts", {}).get("model", "")) for r in recs if isinstance(r.get("token_counts"), dict) and r.get("token_counts", {}).get("model") } ), ) ) summaries.sort(key=lambda s: s.start_timestamp, reverse=True) return summaries def _load_session_data(logs_dir: Path, session_id: str, limit_files: int) -> list[dict[str, Any]] | None: if not logs_dir.exists(): return None files = sorted( [p for p in logs_dir.iterdir() if p.is_file() and p.suffix == ".jsonl"], key=lambda p: p.stat().st_mtime, reverse=True, )[:limit_files] records: list[dict[str, Any]] = [] for path in files: # Reassemble per-file: each file is self-contained because the writer # re-emits the session_header on every process start, so we never need # cross-file state to fill in messages/system_prompt/tools. file_records: list[dict[str, Any]] = [] try: with path.open(encoding="utf-8") as handle: for line_number, raw_line in enumerate(handle, start=1): line = raw_line.strip() if not line: continue try: payload = json.loads(line) except json.JSONDecodeError: records.append( { "timestamp": "", "execution_id": "", "_parse_error": f"{path.name}:{line_number}", "_raw_line": line, "_log_file": str(path), } ) continue if str(payload.get("execution_id") or "").strip() != session_id: continue payload["_log_file"] = str(path) file_records.append(payload) except OSError: continue records.extend(_reassemble_records(file_records)) if not records: return None records.sort(key=lambda r: (str(r.get("timestamp", "")), r.get("iteration", 0))) return records def _render_html(summaries: list[SessionSummary], initial_session_id: str) -> str: summaries_data = [ { "execution_id": s.execution_id, "log_file": s.log_file, "start_timestamp": s.start_timestamp, "end_timestamp": s.end_timestamp, "start_display": _format_timestamp(s.start_timestamp), "end_display": _format_timestamp(s.end_timestamp), "turn_count": s.turn_count, "streams": s.streams, "nodes": s.nodes, "models": s.models, } for s in summaries ] initial = initial_session_id or (summaries[0].execution_id if summaries else "") return f""" Hive LLM Timeline

Sessions

Timeline

Raw context sent to LLM

Select an event on the timeline to view the raw request payload (system prompt, tool schemas, full messages array) for that LLM call.
""" def _run_server(html: str, logs_dir: Path, limit_files: int, port: int, no_open: bool) -> None: html_bytes = html.encode("utf-8") cache: dict[str, list[dict[str, Any]]] = {} class Handler(http.server.BaseHTTPRequestHandler): def do_GET(self) -> None: if self.path == "/": self._respond(200, "text/html; charset=utf-8", html_bytes) elif self.path.startswith("/api/session/"): sid = urllib.parse.unquote(self.path[len("/api/session/") :]) records = cache.get(sid) if records is None: records = _load_session_data(logs_dir, sid, limit_files) if records is not None: cache[sid] = records if records is None: self._respond(404, "application/json", b"[]") else: body = json.dumps(records, ensure_ascii=False).encode("utf-8") self._respond(200, "application/json", body) else: self.send_error(404) def _respond(self, code: int, content_type: str, body: bytes) -> None: self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def log_message(self, format: str, *args: object) -> None: pass server = http.server.HTTPServer(("127.0.0.1", port), Handler) actual_port = server.server_address[1] url = f"http://127.0.0.1:{actual_port}" print(f"Serving timeline viewer at {url} (Ctrl+C to stop)") if not no_open: webbrowser.open(url) try: server.serve_forever() except KeyboardInterrupt: print("\nStopped.") finally: server.server_close() def main() -> int: args = _parse_args() logs_dir = args.logs_dir.expanduser() summaries = _discover_session_summaries(logs_dir, args.limit_files, args.include_tests) initial = args.session or (summaries[0].execution_id if summaries else "") if initial and not any(s.execution_id == initial for s in summaries): print(f"session not found: {initial}") return 1 html = _render_html(summaries, initial) _run_server(html, logs_dir, args.limit_files, args.port, args.no_open) return 0 if __name__ == "__main__": raise SystemExit(main())