Compare commits
1 Commits
main
...
feature/profiler
| Author | SHA1 | Date | |
|---|---|---|---|
| 9cdc4f2006 |
@@ -14,10 +14,13 @@ from framework.observability.logging import (
|
||||
get_trace_context,
|
||||
set_trace_context,
|
||||
)
|
||||
from framework.observability.profiling import get_profile_snapshot, start_debug_profiling
|
||||
|
||||
__all__ = [
|
||||
"configure_logging",
|
||||
"get_trace_context",
|
||||
"set_trace_context",
|
||||
"clear_trace_context",
|
||||
"start_debug_profiling",
|
||||
"get_profile_snapshot",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,280 @@
|
||||
"""
|
||||
Event loop profiling for debug mode.
|
||||
|
||||
Activated by ``hive open --debug`` (or ``hive serve --debug``).
|
||||
|
||||
What this provides
|
||||
------------------
|
||||
- **Event loop lag sampling** — a background task wakes every 100 ms and
|
||||
measures how late it actually woke. Lag > 50 ms means something blocked
|
||||
the loop (sync file I/O, CPU-bound work, etc.).
|
||||
- **asyncio slow-callback capture** — when asyncio debug mode is on the
|
||||
runtime logs every callback that held the loop for > ``slow_callback_duration``
|
||||
seconds. We intercept those log records and store them in a ring buffer so
|
||||
the ``/api/debug/profile`` endpoint can surface them.
|
||||
- **asyncio debug mode** — ``loop.set_debug(True)`` enables asyncio's own
|
||||
built-in coroutine-origin tracking and slow-callback warnings.
|
||||
|
||||
Usage (framework-internal)
|
||||
--------------------------
|
||||
Inside an async context (after ``asyncio.run()`` enters):
|
||||
|
||||
from framework.observability.profiling import start_debug_profiling
|
||||
monitor = await start_debug_profiling()
|
||||
...
|
||||
# on shutdown:
|
||||
await monitor.stop()
|
||||
monitor.print_summary()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import logging
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Ring-buffer sizes
|
||||
_MAX_LAG_SAMPLES = 600 # 60 s at 100 ms interval
|
||||
_MAX_SLOW_CALLBACKS = 100
|
||||
|
||||
# Thresholds
|
||||
_LAG_WARN_THRESHOLD_MS = 50.0 # log a warning when loop lag exceeds this
|
||||
_SAMPLE_INTERVAL_S = 0.1 # how often to probe the event loop (100 ms)
|
||||
|
||||
|
||||
class _SlowCallbackCapture(logging.Handler):
|
||||
"""Intercepts asyncio's built-in slow-callback warnings.
|
||||
|
||||
asyncio emits them at WARNING level from the ``asyncio`` logger when
|
||||
``loop.set_debug(True)`` is active and a callback holds the loop for
|
||||
longer than ``loop.slow_callback_duration`` seconds.
|
||||
|
||||
Each captured entry is tagged ``blocking=True`` only if the lag sampler
|
||||
measured a spike at the same time — which is the definitive signal that
|
||||
the loop was actually held rather than the task being slow async I/O.
|
||||
"""
|
||||
|
||||
def __init__(self, monitor: EventLoopMonitor) -> None:
|
||||
super().__init__(level=logging.WARNING)
|
||||
self._monitor = monitor
|
||||
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
msg = record.getMessage()
|
||||
# asyncio formats these as: "Executing <...> took X.XXX seconds"
|
||||
if "took" in msg and "second" in msg:
|
||||
# Check whether the lag sampler saw a spike in the past two probe
|
||||
# intervals. If yes this is a real blocking call; if no the task
|
||||
# was just slow async I/O that properly yielded the loop.
|
||||
now = time.monotonic()
|
||||
window = _SAMPLE_INTERVAL_S * 2
|
||||
recent_lag = max(
|
||||
(lag for ts, lag in self._monitor._lag_spikes if now - ts <= window),
|
||||
default=0.0,
|
||||
)
|
||||
self._monitor._record_slow_callback(
|
||||
{
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
"message": msg,
|
||||
"source": "asyncio",
|
||||
"lag_during_ms": round(recent_lag, 2),
|
||||
# True only when lag also spiked → real event loop block
|
||||
"blocking": recent_lag > _LAG_WARN_THRESHOLD_MS,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class EventLoopMonitor:
|
||||
"""Measures event-loop lag by scheduling a repeating probe task.
|
||||
|
||||
The probe sleeps for ``_SAMPLE_INTERVAL_S`` seconds. The *actual* sleep
|
||||
duration minus the *expected* duration gives the lag introduced by any
|
||||
blocking work that ran between two consecutive iterations of the loop.
|
||||
|
||||
All stats are available via :meth:`snapshot` and are exposed through the
|
||||
``/api/debug/profile`` HTTP endpoint.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._samples: list[float] = [] # lag in ms, newest last
|
||||
self._lag_spikes: list[tuple[float, float]] = [] # (monotonic_ts, lag_ms)
|
||||
self._slow_callbacks: list[dict[str, Any]] = [] # ring buffer
|
||||
self._start_time: float = 0.0
|
||||
self._task: asyncio.Task | None = None
|
||||
self._log_handler: _SlowCallbackCapture | None = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the background probe and attach the slow-callback log handler."""
|
||||
self._start_time = time.monotonic()
|
||||
|
||||
# Attach handler to asyncio logger to capture slow-callback warnings
|
||||
self._log_handler = _SlowCallbackCapture(self)
|
||||
asyncio_logger = logging.getLogger("asyncio")
|
||||
asyncio_logger.addHandler(self._log_handler)
|
||||
|
||||
self._task = asyncio.create_task(self._probe_loop(), name="hive-loop-monitor")
|
||||
logger.info(
|
||||
"Event loop monitor started — lag threshold %.0f ms, sample interval %.0f ms",
|
||||
_LAG_WARN_THRESHOLD_MS,
|
||||
_SAMPLE_INTERVAL_S * 1000,
|
||||
)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Cancel the probe task and detach the log handler."""
|
||||
if self._task and not self._task.done():
|
||||
self._task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await self._task
|
||||
|
||||
if self._log_handler is not None:
|
||||
asyncio_logger = logging.getLogger("asyncio")
|
||||
asyncio_logger.removeHandler(self._log_handler)
|
||||
self._log_handler = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal probe
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _probe_loop(self) -> None:
|
||||
loop = asyncio.get_running_loop()
|
||||
while True:
|
||||
expected_wake = loop.time() + _SAMPLE_INTERVAL_S
|
||||
await asyncio.sleep(_SAMPLE_INTERVAL_S)
|
||||
actual_wake = loop.time()
|
||||
lag_ms = max(0.0, (actual_wake - expected_wake) * 1000)
|
||||
|
||||
self._samples.append(lag_ms)
|
||||
if len(self._samples) > _MAX_LAG_SAMPLES:
|
||||
self._samples = self._samples[-_MAX_LAG_SAMPLES:]
|
||||
|
||||
if lag_ms > _LAG_WARN_THRESHOLD_MS:
|
||||
self._lag_spikes.append((time.monotonic(), lag_ms))
|
||||
if len(self._lag_spikes) > 200:
|
||||
self._lag_spikes = self._lag_spikes[-200:]
|
||||
logger.warning(
|
||||
"Event loop lag %.1f ms — possible blocking call on main thread",
|
||||
lag_ms,
|
||||
extra={"event": "loop_lag", "lag_ms": round(lag_ms, 1)},
|
||||
)
|
||||
|
||||
def _record_slow_callback(self, entry: dict[str, Any]) -> None:
|
||||
self._slow_callbacks.append(entry)
|
||||
if len(self._slow_callbacks) > _MAX_SLOW_CALLBACKS:
|
||||
self._slow_callbacks = self._slow_callbacks[-_MAX_SLOW_CALLBACKS:]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Stats
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def snapshot(self) -> dict[str, Any]:
|
||||
"""Return current profiling stats as a plain dict (JSON-serialisable)."""
|
||||
samples = list(self._samples)
|
||||
uptime = round(time.monotonic() - self._start_time, 1) if self._start_time else 0.0
|
||||
|
||||
if not samples:
|
||||
return {
|
||||
"enabled": True,
|
||||
"uptime_seconds": uptime,
|
||||
"samples": 0,
|
||||
"slow_callbacks": list(self._slow_callbacks),
|
||||
}
|
||||
|
||||
sorted_s = sorted(samples)
|
||||
n = len(sorted_s)
|
||||
|
||||
def pct(p: float) -> float:
|
||||
return round(sorted_s[min(int(n * p), n - 1)], 2)
|
||||
|
||||
over = sum(1 for s in samples if s > _LAG_WARN_THRESHOLD_MS)
|
||||
|
||||
callbacks = list(self._slow_callbacks)
|
||||
return {
|
||||
"enabled": True,
|
||||
"uptime_seconds": uptime,
|
||||
"samples": n,
|
||||
"mean_lag_ms": round(sum(samples) / n, 2),
|
||||
"p50_lag_ms": pct(0.50),
|
||||
"p95_lag_ms": pct(0.95),
|
||||
"p99_lag_ms": pct(0.99),
|
||||
"worst_lag_ms": round(max(samples), 2),
|
||||
"slow_samples": over,
|
||||
"slow_sample_pct": round(over / n * 100, 1),
|
||||
"slow_threshold_ms": _LAG_WARN_THRESHOLD_MS,
|
||||
# All asyncio slow-task warnings captured
|
||||
"slow_callbacks": callbacks,
|
||||
# Subset where lag also spiked → real event loop blocking
|
||||
"blocking_callbacks": [cb for cb in callbacks if cb.get("blocking")],
|
||||
# Subset that are just slow async I/O — loop was free the whole time
|
||||
"benign_slow_tasks": [cb for cb in callbacks if not cb.get("blocking")],
|
||||
}
|
||||
|
||||
def print_summary(self) -> None:
|
||||
"""Print a human-readable summary to stdout (called on server shutdown)."""
|
||||
snap = self.snapshot()
|
||||
if snap["samples"] == 0:
|
||||
print("[profiler] No samples collected.")
|
||||
return
|
||||
|
||||
print("\n[profiler] Event loop summary")
|
||||
print(f" Uptime: {snap['uptime_seconds']}s")
|
||||
print(f" Samples: {snap['samples']}")
|
||||
print(f" Mean lag: {snap['mean_lag_ms']} ms")
|
||||
p50, p95, p99 = snap["p50_lag_ms"], snap["p95_lag_ms"], snap["p99_lag_ms"]
|
||||
print(f" p50 / p95 / p99: {p50} / {p95} / {p99} ms")
|
||||
print(f" Worst lag: {snap['worst_lag_ms']} ms")
|
||||
slow_pct = snap["slow_sample_pct"]
|
||||
thresh = snap["slow_threshold_ms"]
|
||||
print(f" Slow samples: {snap['slow_samples']} ({slow_pct}%) > {thresh} ms")
|
||||
blocking = snap.get("blocking_callbacks", [])
|
||||
benign = snap.get("benign_slow_tasks", [])
|
||||
if blocking:
|
||||
print(f" BLOCKING callbacks: {len(blocking)} ← investigate these")
|
||||
for cb in blocking[-5:]:
|
||||
lag = cb.get("lag_during_ms", "?")
|
||||
print(f" [lag {lag}ms] {cb['timestamp']} {cb['message'][:120]}")
|
||||
if benign:
|
||||
print(
|
||||
f" Slow async tasks (benign): {len(benign)}"
|
||||
" — loop was free, just slow I/O"
|
||||
)
|
||||
print()
|
||||
|
||||
|
||||
# Module-level singleton so routes can access it without passing it around.
|
||||
_monitor: EventLoopMonitor | None = None
|
||||
|
||||
|
||||
async def start_debug_profiling() -> EventLoopMonitor:
|
||||
"""Enable asyncio debug mode and start the event loop monitor.
|
||||
|
||||
Must be called from inside a running event loop (i.e. inside
|
||||
``asyncio.run()``). Returns the monitor so the caller can stop it and
|
||||
print a summary on shutdown.
|
||||
"""
|
||||
global _monitor
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# asyncio built-in debug mode: enables coroutine-origin tracking and
|
||||
# logs any callback/coroutine that holds the loop for > slow_callback_duration.
|
||||
loop.set_debug(True)
|
||||
loop.slow_callback_duration = 0.1 # 100 ms — asyncio default; avoids noise
|
||||
|
||||
_monitor = EventLoopMonitor()
|
||||
await _monitor.start()
|
||||
return _monitor
|
||||
|
||||
|
||||
def get_profile_snapshot() -> dict[str, Any]:
|
||||
"""Return the current monitor snapshot, or a disabled sentinel if not running."""
|
||||
if _monitor is None:
|
||||
return {"enabled": False}
|
||||
return _monitor.snapshot()
|
||||
@@ -1644,6 +1644,13 @@ def cmd_serve(args: argparse.Namespace) -> int:
|
||||
async def run_server():
|
||||
manager = app["manager"]
|
||||
|
||||
# Start event loop profiling when --debug is active.
|
||||
_profiler = None
|
||||
if getattr(args, "debug", False):
|
||||
from framework.observability.profiling import start_debug_profiling
|
||||
|
||||
_profiler = await start_debug_profiling()
|
||||
|
||||
# Preload agents specified via --agent
|
||||
for agent_path in args.agent:
|
||||
try:
|
||||
@@ -1673,6 +1680,8 @@ def cmd_serve(args: argparse.Namespace) -> int:
|
||||
if has_frontend:
|
||||
print(f"Dashboard: {dashboard_url}")
|
||||
print(f"Health: {dashboard_url}/api/health")
|
||||
if _profiler is not None:
|
||||
print(f"Profiler: {dashboard_url}/api/debug/profile")
|
||||
print(f"Agents loaded: {sum(1 for s in manager.list_sessions() if s.worker_runtime)}")
|
||||
print()
|
||||
print("Press Ctrl+C to stop")
|
||||
@@ -1687,6 +1696,9 @@ def cmd_serve(args: argparse.Namespace) -> int:
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
if _profiler is not None:
|
||||
await _profiler.stop()
|
||||
_profiler.print_summary()
|
||||
await manager.shutdown_all()
|
||||
await runner.cleanup()
|
||||
|
||||
|
||||
@@ -249,6 +249,10 @@ def create_app(model: str | None = None) -> web.Application:
|
||||
register_graph_routes(app)
|
||||
register_log_routes(app)
|
||||
|
||||
from framework.server.routes_debug import register_routes as register_debug_routes
|
||||
|
||||
register_debug_routes(app)
|
||||
|
||||
# Static file serving — Option C production mode
|
||||
# If frontend/dist/ exists, serve built frontend files on /
|
||||
_setup_static_serving(app)
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
"""Debug/profiling routes — only meaningful when ``hive open --debug`` is active.
|
||||
|
||||
Routes
|
||||
------
|
||||
GET /api/debug/profile
|
||||
Returns the current event loop profiling snapshot as JSON.
|
||||
When profiling is not active (``--debug`` flag absent) the response is
|
||||
``{"enabled": false}``.
|
||||
|
||||
GET /api/debug/slow-callbacks
|
||||
Returns the ring buffer of slow callbacks captured from asyncio's
|
||||
built-in slow-callback logger. Empty list when not active.
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from framework.observability.profiling import get_profile_snapshot
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_profile(request: web.Request) -> web.Response:
|
||||
"""GET /api/debug/profile — current event loop profiling stats."""
|
||||
return web.json_response(get_profile_snapshot())
|
||||
|
||||
|
||||
async def handle_slow_callbacks(request: web.Request) -> web.Response:
|
||||
"""GET /api/debug/slow-callbacks — recent slow asyncio callbacks."""
|
||||
snap = get_profile_snapshot()
|
||||
return web.json_response(
|
||||
{
|
||||
"enabled": snap.get("enabled", False),
|
||||
"slow_callbacks": snap.get("slow_callbacks", []),
|
||||
"slow_threshold_ms": snap.get("slow_threshold_ms", 50),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def register_routes(app: web.Application) -> None:
|
||||
app.router.add_get("/api/debug/profile", handle_profile)
|
||||
app.router.add_get("/api/debug/slow-callbacks", handle_slow_callbacks)
|
||||
Reference in New Issue
Block a user