feat: runtime log tools

This commit is contained in:
Timothy
2026-02-06 12:37:18 -08:00
parent d68783a612
commit a73239dd98
2 changed files with 136 additions and 37 deletions
+77 -22
View File
@@ -1,7 +1,10 @@
"""
File-based storage backend for runtime data.
Stores runs as JSON files with indexes for efficient querying.
DEPRECATED: This storage backend is deprecated for new sessions.
New sessions use unified storage at sessions/{session_id}/state.json.
This module is kept for backward compatibility with old run data only.
Uses Pydantic's built-in serialization.
"""
@@ -14,21 +17,24 @@ from framework.utils.io import atomic_write
class FileStorage:
"""
Simple file-based storage for runs.
DEPRECATED: File-based storage for old runs only.
Directory structure:
New sessions use unified storage at sessions/{session_id}/state.json.
This class is kept for backward compatibility with old run data.
Old directory structure (deprecated):
{base_path}/
runs/
{run_id}.json # Full run data
indexes/
runs/ # DEPRECATED - no longer written
{run_id}.json
summaries/ # DEPRECATED - no longer written
{run_id}.json
indexes/ # DEPRECATED - no longer written or read
by_goal/
{goal_id}.json # List of run IDs for this goal
{goal_id}.json
by_status/
{status}.json # List of run IDs with this status
{status}.json
by_node/
{node_id}.json # List of run IDs that used this node
summaries/
{run_id}.json # Run summary (for quick loading)
{node_id}.json
"""
def __init__(self, base_path: str | Path):
@@ -38,14 +44,13 @@ class FileStorage:
def _ensure_dirs(self) -> None:
"""Create directory structure if it doesn't exist.
NOTE: Only creates indexes directory now. The runs/ and summaries/
directories are deprecated and no longer used for new sessions.
NOTE: All directories (runs/, summaries/, indexes/) are deprecated.
New sessions use unified storage at sessions/{session_id}/state.json.
This method is kept for backward compatibility but creates no directories.
"""
# Only create indexes directory (still used for backward compatibility)
indexes_dir = self.base_path / "indexes"
indexes_dir.mkdir(parents=True, exist_ok=True)
# Do NOT create runs/ and summaries/ - these are deprecated
# Do NOT create any directories - all are deprecated
# - runs/ and summaries/ deprecated in Phase 3
# - indexes/ deprecated - scan sessions/*/state.json instead
def _validate_key(self, key: str) -> None:
"""
@@ -147,17 +152,53 @@ class FileStorage:
# === QUERY OPERATIONS ===
def get_runs_by_goal(self, goal_id: str) -> list[str]:
"""Get all run IDs for a goal."""
"""Get all run IDs for a goal.
DEPRECATED: Indexes are deprecated. For new sessions, scan sessions/*/state.json instead.
This method only returns old run IDs from deprecated indexes.
"""
import warnings
warnings.warn(
"FileStorage.get_runs_by_goal() is deprecated. "
"For new sessions, scan sessions/*/state.json instead.",
DeprecationWarning,
stacklevel=2,
)
return self._get_index("by_goal", goal_id)
def get_runs_by_status(self, status: str | RunStatus) -> list[str]:
"""Get all run IDs with a status."""
"""Get all run IDs with a status.
DEPRECATED: Indexes are deprecated. For new sessions, scan sessions/*/state.json instead.
This method only returns old run IDs from deprecated indexes.
"""
import warnings
warnings.warn(
"FileStorage.get_runs_by_status() is deprecated. "
"For new sessions, scan sessions/*/state.json instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(status, RunStatus):
status = status.value
return self._get_index("by_status", status)
def get_runs_by_node(self, node_id: str) -> list[str]:
"""Get all run IDs that executed a node."""
"""Get all run IDs that executed a node.
DEPRECATED: Indexes are deprecated. For new sessions, scan sessions/*/state.json instead.
This method only returns old run IDs from deprecated indexes.
"""
import warnings
warnings.warn(
"FileStorage.get_runs_by_node() is deprecated. "
"For new sessions, scan sessions/*/state.json instead.",
DeprecationWarning,
stacklevel=2,
)
return self._get_index("by_node", node_id)
def list_all_runs(self) -> list[str]:
@@ -166,8 +207,22 @@ class FileStorage:
return [f.stem for f in runs_dir.glob("*.json")]
def list_all_goals(self) -> list[str]:
"""List all goal IDs that have runs."""
"""List all goal IDs that have runs.
DEPRECATED: Indexes are deprecated. For new sessions, scan sessions/*/state.json instead.
This method only returns goals from old run IDs in deprecated indexes.
"""
import warnings
warnings.warn(
"FileStorage.list_all_goals() is deprecated. "
"For new sessions, scan sessions/*/state.json instead.",
DeprecationWarning,
stacklevel=2,
)
goals_dir = self.base_path / "indexes" / "by_goal"
if not goals_dir.exists():
return []
return [f.stem for f in goals_dir.glob("*.json")]
# === INDEX OPERATIONS ===
@@ -15,7 +15,6 @@ from __future__ import annotations
import json
import logging
import os
from pathlib import Path
from fastmcp import FastMCP
@@ -47,6 +46,36 @@ def _read_jsonl(path: Path) -> list[dict]:
return results
def _get_run_dirs(agent_work_dir: Path) -> list[tuple[str, Path]]:
"""Scan both old and new storage locations for run directories.
Returns list of (run_id, log_dir_path) tuples.
Scans:
- New: {agent_work_dir}/sessions/{session_id}/logs/
- Old: {agent_work_dir}/runtime_logs/runs/{run_id}/ (deprecated)
"""
run_dirs = []
# Scan new location: sessions/{session_id}/logs/
sessions_dir = agent_work_dir / "sessions"
if sessions_dir.exists():
for session_dir in sessions_dir.iterdir():
if session_dir.is_dir() and session_dir.name.startswith("session_"):
logs_dir = session_dir / "logs"
if logs_dir.exists() and logs_dir.is_dir():
run_dirs.append((session_dir.name, logs_dir))
# Scan old location: runtime_logs/runs/ (deprecated)
old_runs_dir = agent_work_dir / "runtime_logs" / "runs"
if old_runs_dir.exists():
for run_dir in old_runs_dir.iterdir():
if run_dir.is_dir():
run_dirs.append((run_dir.name, run_dir))
return run_dirs
def register_tools(mcp: FastMCP) -> None:
"""Register runtime log query tools with the MCP server."""
@@ -58,6 +87,7 @@ def register_tools(mcp: FastMCP) -> None:
) -> dict:
"""Query runtime log summaries. Returns high-level pass/fail for recent graph runs.
Scans both old (runtime_logs/runs/) and new (sessions/*/logs/) locations.
Use status='needs_attention' to find runs that need debugging.
Other status values: 'success', 'failure', 'degraded', 'in_progress'.
Leave status empty to see all runs.
@@ -70,21 +100,15 @@ def register_tools(mcp: FastMCP) -> None:
Returns:
Dict with 'runs' list of summary objects and 'total' count
"""
runs_dir = Path(agent_work_dir) / "runtime_logs" / "runs"
if not runs_dir.exists():
work_dir = Path(agent_work_dir)
run_dirs = _get_run_dirs(work_dir)
if not run_dirs:
return {"runs": [], "total": 0, "message": "No runtime logs found"}
summaries = []
try:
entries = os.listdir(runs_dir)
except OSError:
return {"runs": [], "total": 0, "error": "Cannot read runs directory"}
for run_id in entries:
run_dir = runs_dir / run_id
if not run_dir.is_dir():
continue
summary_path = run_dir / "summary.json"
for run_id, log_dir in run_dirs:
summary_path = log_dir / "summary.json"
if summary_path.exists():
try:
data = json.loads(summary_path.read_text(encoding="utf-8"))
@@ -128,6 +152,8 @@ def register_tools(mcp: FastMCP) -> None:
and attention flags. Use after query_runtime_logs identifies
a run to investigate.
Supports both old (runtime_logs/runs/) and new (sessions/*/logs/) locations.
Args:
agent_work_dir: Path to the agent's working directory
run_id: The run ID from query_runtime_logs results
@@ -137,7 +163,15 @@ def register_tools(mcp: FastMCP) -> None:
Returns:
Dict with run_id and nodes list of per-node details
"""
details_path = Path(agent_work_dir) / "runtime_logs" / "runs" / run_id / "details.jsonl"
work_dir = Path(agent_work_dir)
# Try new location first: sessions/{session_id}/logs/
if run_id.startswith("session_"):
details_path = work_dir / "sessions" / run_id / "logs" / "details.jsonl"
else:
# Old location: runtime_logs/runs/{run_id}/
details_path = work_dir / "runtime_logs" / "runs" / run_id / "details.jsonl"
if not details_path.exists():
return {"error": f"No details found for run {run_id}"}
@@ -164,6 +198,8 @@ def register_tools(mcp: FastMCP) -> None:
query_runtime_log_details. Returns tool inputs/outputs,
LLM text, and token counts per step.
Supports both old (runtime_logs/runs/) and new (sessions/*/logs/) locations.
Args:
agent_work_dir: Path to the agent's working directory
run_id: The run ID from query_runtime_logs results
@@ -173,7 +209,15 @@ def register_tools(mcp: FastMCP) -> None:
Returns:
Dict with run_id and steps list of tool/LLM details
"""
tool_logs_path = Path(agent_work_dir) / "runtime_logs" / "runs" / run_id / "tool_logs.jsonl"
work_dir = Path(agent_work_dir)
# Try new location first: sessions/{session_id}/logs/
if run_id.startswith("session_"):
tool_logs_path = work_dir / "sessions" / run_id / "logs" / "tool_logs.jsonl"
else:
# Old location: runtime_logs/runs/{run_id}/
tool_logs_path = work_dir / "runtime_logs" / "runs" / run_id / "tool_logs.jsonl"
if not tool_logs_path.exists():
return {"error": f"No tool logs found for run {run_id}"}