refactor: unify storage layer for agent runtime

This commit is contained in:
Timothy
2026-02-06 12:20:46 -08:00
parent a28ea40a7d
commit d68783a612
9 changed files with 929 additions and 60 deletions
+6 -1
View File
@@ -288,7 +288,12 @@ class GraphExecutor:
)
if self.runtime_logger:
self.runtime_logger.start_run(goal_id=goal.id)
# Extract session_id from storage_path if available (for unified sessions)
# storage_path format: base_path/sessions/{session_id}/
session_id = ""
if self._storage_path and self._storage_path.name.startswith("session_"):
session_id = self._storage_path.name
self.runtime_logger.start_run(goal_id=goal.id, session_id=session_id)
self.logger.info(f"🚀 Starting execution: {goal.name}")
self.logger.info(f" Goal: {goal.description}")
+7 -1
View File
@@ -18,6 +18,7 @@ from framework.runtime.execution_stream import EntryPointSpec, ExecutionStream
from framework.runtime.outcome_aggregator import OutcomeAggregator
from framework.runtime.shared_state import SharedStateManager
from framework.storage.concurrent import ConcurrentStorage
from framework.storage.session_store import SessionStore
if TYPE_CHECKING:
from framework.graph.edge import GraphSpec
@@ -121,12 +122,16 @@ class AgentRuntime:
self._runtime_log_store = runtime_log_store
# Initialize storage
storage_path_obj = Path(storage_path) if isinstance(storage_path, str) else storage_path
self._storage = ConcurrentStorage(
base_path=storage_path,
base_path=storage_path_obj,
cache_ttl=self._config.cache_ttl,
batch_interval=self._config.batch_interval,
)
# Initialize SessionStore for unified sessions (always enabled)
self._session_store = SessionStore(storage_path_obj)
# Initialize shared components
self._state_manager = SharedStateManager()
self._event_bus = EventBus(max_history=self._config.max_history)
@@ -216,6 +221,7 @@ class AgentRuntime:
result_retention_max=self._config.execution_result_max,
result_retention_ttl_seconds=self._config.execution_result_ttl_seconds,
runtime_log_store=self._runtime_log_store,
session_store=self._session_store,
)
await stream.start()
self._streams[ep_id] = stream
+117 -2
View File
@@ -28,6 +28,7 @@ if TYPE_CHECKING:
from framework.runtime.event_bus import EventBus
from framework.runtime.outcome_aggregator import OutcomeAggregator
from framework.storage.concurrent import ConcurrentStorage
from framework.storage.session_store import SessionStore
logger = logging.getLogger(__name__)
@@ -113,6 +114,7 @@ class ExecutionStream:
result_retention_max: int | None = 1000,
result_retention_ttl_seconds: float | None = None,
runtime_log_store: Any = None,
session_store: "SessionStore | None" = None,
):
"""
Initialize execution stream.
@@ -130,6 +132,7 @@ class ExecutionStream:
tools: Available tools
tool_executor: Function to execute tools
runtime_log_store: Optional RuntimeLogStore for per-execution logging
session_store: Optional SessionStore for unified session storage
"""
self.stream_id = stream_id
self.entry_spec = entry_spec
@@ -145,6 +148,7 @@ class ExecutionStream:
self._result_retention_max = result_retention_max
self._result_retention_ttl_seconds = result_retention_ttl_seconds
self._runtime_log_store = runtime_log_store
self._session_store = session_store
# Create stream-scoped runtime
self._runtime = StreamRuntime(
@@ -224,6 +228,13 @@ class ExecutionStream:
await task
except asyncio.CancelledError:
pass
except RuntimeError as e:
# Task may be attached to a different event loop (e.g., when TUI
# uses a separate loop). Log and continue cleanup.
if "attached to a different loop" in str(e):
logger.warning(f"Task cleanup skipped (different event loop): {e}")
else:
raise
self._execution_tasks.clear()
self._active_executions.clear()
@@ -278,8 +289,21 @@ class ExecutionStream:
if not self._running:
raise RuntimeError(f"ExecutionStream '{self.stream_id}' is not running")
# Generate execution ID
execution_id = f"exec_{self.stream_id}_{uuid.uuid4().hex[:8]}"
# Generate execution ID using unified session format
if self._session_store:
execution_id = self._session_store.generate_session_id()
else:
# Fallback to old format if SessionStore not available (shouldn't happen)
import warnings
warnings.warn(
"SessionStore not available, using deprecated exec_* ID format. "
"Please ensure AgentRuntime is properly initialized.",
DeprecationWarning,
stacklevel=2,
)
execution_id = f"exec_{self.stream_id}_{uuid.uuid4().hex[:8]}"
if correlation_id is None:
correlation_id = execution_id
@@ -363,6 +387,9 @@ class ExecutionStream:
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
# Write initial session state
await self._write_session_state(execution_id, ctx)
# Create modified graph with entry point
# We need to override the entry_node to use our entry point
modified_graph = self._create_modified_graph()
@@ -387,6 +414,9 @@ class ExecutionStream:
if result.paused_at:
ctx.status = "paused"
# Write final session state
await self._write_session_state(execution_id, ctx, result=result)
# Emit completion/failure event
if self._event_bus:
if result.success:
@@ -423,6 +453,9 @@ class ExecutionStream:
),
)
# Write error session state
await self._write_session_state(execution_id, ctx, error=str(e))
# Emit failure event
if self._event_bus:
await self._event_bus.emit_execution_failed(
@@ -446,6 +479,88 @@ class ExecutionStream:
self._completion_events.pop(execution_id, None)
self._execution_tasks.pop(execution_id, None)
async def _write_session_state(
    self,
    execution_id: str,
    ctx: ExecutionContext,
    result: ExecutionResult | None = None,
    error: str | None = None,
) -> None:
    """
    Write state.json for a session.

    Best-effort: any failure is logged and swallowed so a storage problem
    never aborts the execution itself. No-op when no SessionStore was
    injected into this stream.

    Args:
        execution_id: Session/execution ID
        ctx: Execution context
        result: Optional execution result (if completed)
        error: Optional error message (if failed)
    """
    # Only write if session_store is available
    if not self._session_store:
        return

    # Imported lazily — presumably to avoid a module-level import cycle
    # with the schemas package; TODO confirm.
    from framework.schemas.session_state import SessionState, SessionStatus

    try:
        # Determine status: a result takes precedence over an error; with
        # neither, this is the initial write and the session is ACTIVE.
        if result:
            if result.paused_at:
                status = SessionStatus.PAUSED
            elif result.success:
                status = SessionStatus.COMPLETED
            else:
                status = SessionStatus.FAILED
        elif error:
            status = SessionStatus.FAILED
        else:
            status = SessionStatus.ACTIVE

        # Create SessionState
        if result:
            # Create from execution result. NOTE: from_execution_result
            # re-derives the status internally; the `status` computed above
            # is only used for the no-result branch and the debug log.
            state = SessionState.from_execution_result(
                session_id=execution_id,
                goal_id=self.goal.id,
                result=result,
                stream_id=self.stream_id,
                correlation_id=ctx.correlation_id,
                started_at=ctx.started_at.isoformat(),
                input_data=ctx.input_data,
                agent_id=self.graph.id,
                entry_point=self.entry_spec.id,
            )
        else:
            # Create initial state (or a failed-before-result state).
            from framework.schemas.session_state import SessionTimestamps

            # NOTE(review): naive local datetime, matching the rest of the
            # session_state schema — confirm UTC is not required here.
            now = datetime.now().isoformat()
            state = SessionState(
                session_id=execution_id,
                stream_id=self.stream_id,
                correlation_id=ctx.correlation_id,
                goal_id=self.goal.id,
                agent_id=self.graph.id,
                entry_point=self.entry_spec.id,
                status=status,
                timestamps=SessionTimestamps(
                    started_at=ctx.started_at.isoformat(),
                    updated_at=now,
                ),
                input_data=ctx.input_data,
            )

        # Handle error case: record the message on the result payload.
        # (result.success stays at its prior value in this path.)
        if error:
            state.result.error = error

        # Write state.json (atomicity is the store's responsibility).
        await self._session_store.write_state(execution_id, state)
        logger.debug(f"Wrote state.json for session {execution_id} (status={status})")
    except Exception as e:
        # Log but don't fail the execution
        logger.error(f"Failed to write state.json for {execution_id}: {e}")
def _create_modified_graph(self) -> "GraphSpec":
"""Create a graph with the entry point overridden."""
# Use the existing graph but override entry_node
+89 -26
View File
@@ -10,14 +10,15 @@ incremental append-on-write. This provides crash resilience — data is on
disk as soon as it's logged, not only at end_run(). L1 (summary) is still
written once at end as a regular JSON file since it aggregates L2.
Storage layout::
Storage layout (current)::
{base_path}/
runs/
{run_id}/
summary.json # Level 1 — written once at end
details.jsonl # Level 2 — appended per node completion
tool_logs.jsonl # Level 3 — appended per step
sessions/
{session_id}/
logs/
summary.json # Level 1 — written once at end
details.jsonl # Level 2 — appended per node completion
tool_logs.jsonl # Level 3 — appended per step
"""
from __future__ import annotations
@@ -44,7 +45,36 @@ class RuntimeLogStore:
def __init__(self, base_path: Path) -> None:
self._base_path = base_path
self._runs_dir = base_path / "runs"
# Note: _runs_dir is determined per-run_id by _get_run_dir()
def _get_run_dir(self, run_id: str) -> Path:
"""Determine run directory path based on run_id format.
- New format (session_*): {storage_root}/sessions/{run_id}/logs/
- Old format (anything else): {base_path}/runs/{run_id}/ (deprecated)
When base_path ends with 'runtime_logs', we use the parent directory
to avoid nesting under runtime_logs/.
This allows backward compatibility for reading old logs.
"""
if run_id.startswith("session_"):
# New: sessions/{session_id}/logs/
# If base_path ends with runtime_logs, use parent (storage root)
is_runtime_logs = self._base_path.name == "runtime_logs"
root = self._base_path.parent if is_runtime_logs else self._base_path
return root / "sessions" / run_id / "logs"
else:
# Old: runs/{run_id}/ (deprecated, backward compatibility only)
import warnings
warnings.warn(
f"Reading logs from deprecated location for run_id={run_id}. "
"New sessions use unified storage at sessions/session_*/logs/",
DeprecationWarning,
stacklevel=3,
)
return self._base_path / "runs" / run_id
# -------------------------------------------------------------------
# Incremental write (sync — called from locked sections)
@@ -52,19 +82,19 @@ class RuntimeLogStore:
def ensure_run_dir(self, run_id: str) -> None:
"""Create the run directory immediately. Called by start_run()."""
run_dir = self._runs_dir / run_id
run_dir = self._get_run_dir(run_id)
run_dir.mkdir(parents=True, exist_ok=True)
def append_step(self, run_id: str, step: NodeStepLog) -> None:
"""Append one JSONL line to tool_logs.jsonl. Sync."""
path = self._runs_dir / run_id / "tool_logs.jsonl"
path = self._get_run_dir(run_id) / "tool_logs.jsonl"
line = json.dumps(step.model_dump(), ensure_ascii=False) + "\n"
with open(path, "a", encoding="utf-8") as f:
f.write(line)
def append_node_detail(self, run_id: str, detail: NodeDetail) -> None:
"""Append one JSONL line to details.jsonl. Sync."""
path = self._runs_dir / run_id / "details.jsonl"
path = self._get_run_dir(run_id) / "details.jsonl"
line = json.dumps(detail.model_dump(), ensure_ascii=False) + "\n"
with open(path, "a", encoding="utf-8") as f:
f.write(line)
@@ -74,7 +104,7 @@ class RuntimeLogStore:
Used by end_run() to aggregate L2 into L1. Skips corrupt lines.
"""
path = self._runs_dir / run_id / "details.jsonl"
path = self._get_run_dir(run_id) / "details.jsonl"
return _read_jsonl_as_models(path, NodeDetail)
# -------------------------------------------------------------------
@@ -83,7 +113,7 @@ class RuntimeLogStore:
async def save_summary(self, run_id: str, summary: RunSummaryLog) -> None:
"""Write summary.json atomically. Called once at end_run()."""
run_dir = self._runs_dir / run_id
run_dir = self._get_run_dir(run_id)
await asyncio.to_thread(run_dir.mkdir, parents=True, exist_ok=True)
await self._write_json(run_dir / "summary.json", summary.model_dump())
@@ -93,12 +123,12 @@ class RuntimeLogStore:
async def load_summary(self, run_id: str) -> RunSummaryLog | None:
"""Load Level 1 summary for a specific run."""
data = await self._read_json(self._runs_dir / run_id / "summary.json")
data = await self._read_json(self._get_run_dir(run_id) / "summary.json")
return RunSummaryLog(**data) if data is not None else None
async def load_details(self, run_id: str) -> RunDetailsLog | None:
"""Load Level 2 details from details.jsonl for a specific run."""
path = self._runs_dir / run_id / "details.jsonl"
path = self._get_run_dir(run_id) / "details.jsonl"
def _read() -> RunDetailsLog | None:
if not path.exists():
@@ -110,7 +140,7 @@ class RuntimeLogStore:
async def load_tool_logs(self, run_id: str) -> RunToolLogs | None:
"""Load Level 3 tool logs from tool_logs.jsonl for a specific run."""
path = self._runs_dir / run_id / "tool_logs.jsonl"
path = self._get_run_dir(run_id) / "tool_logs.jsonl"
def _read() -> RunToolLogs | None:
if not path.exists():
@@ -126,14 +156,15 @@ class RuntimeLogStore:
needs_attention: bool | None = None,
limit: int = 20,
) -> list[RunSummaryLog]:
"""Scan runs/ directory, load summaries, filter, and sort by timestamp desc.
"""Scan both old and new directory structures, load summaries, filter, and sort.
Scans:
- Old: base_path/runs/{run_id}/
- New: base_path/sessions/{session_id}/logs/
Directories without summary.json are treated as in-progress runs and
get a synthetic summary with status="in_progress".
"""
if not self._runs_dir.exists():
return []
entries = await asyncio.to_thread(self._scan_run_dirs)
summaries: list[RunSummaryLog] = []
@@ -141,7 +172,7 @@ class RuntimeLogStore:
summary = await self.load_summary(run_id)
if summary is None:
# In-progress run: no summary.json yet. Synthesize one.
run_dir = self._runs_dir / run_id
run_dir = self._get_run_dir(run_id)
if not run_dir.is_dir():
continue
summary = RunSummaryLog(
@@ -166,14 +197,46 @@ class RuntimeLogStore:
# -------------------------------------------------------------------
def _scan_run_dirs(self) -> list[str]:
"""Return list of run_id directory names (sync, for use in to_thread).
"""Return list of run_id directory names from both old and new locations.
Includes all directories, not just those with summary.json, so
in-progress runs are visible.
Scans:
- New: base_path/sessions/{session_id}/logs/ (preferred)
- Old: base_path/runs/{run_id}/ (deprecated, backward compatibility)
Returns run_ids/session_ids. Includes all directories, not just those
with summary.json, so in-progress runs are visible.
"""
if not self._runs_dir.exists():
return []
return [d.name for d in self._runs_dir.iterdir() if d.is_dir()]
run_ids = []
# Scan new location: base_path/sessions/{session_id}/logs/
# Determine the correct base path for sessions
is_runtime_logs = self._base_path.name == "runtime_logs"
root = self._base_path.parent if is_runtime_logs else self._base_path
sessions_dir = root / "sessions"
if sessions_dir.exists():
for session_dir in sessions_dir.iterdir():
if session_dir.is_dir() and session_dir.name.startswith("session_"):
logs_dir = session_dir / "logs"
if logs_dir.exists() and logs_dir.is_dir():
run_ids.append(session_dir.name)
# Scan old location: base_path/runs/ (deprecated)
old_runs_dir = self._base_path / "runs"
if old_runs_dir.exists():
old_ids = [d.name for d in old_runs_dir.iterdir() if d.is_dir()]
if old_ids:
import warnings
warnings.warn(
f"Found {len(old_ids)} runs in deprecated location. "
"Consider migrating to unified session storage.",
DeprecationWarning,
stacklevel=3,
)
run_ids.extend(old_ids)
return run_ids
@staticmethod
async def _write_json(path: Path, data: dict) -> None:
+20 -5
View File
@@ -52,11 +52,26 @@ class RuntimeLogger:
self._logged_node_ids: set[str] = set()
self._lock = threading.Lock()
def start_run(self, goal_id: str = "") -> str:
"""Start a new run. Called by GraphExecutor at graph start. Returns run_id."""
ts = datetime.now(UTC).strftime("%Y%m%dT%H%M%S")
short_uuid = uuid.uuid4().hex[:8]
self._run_id = f"{ts}_{short_uuid}"
def start_run(self, goal_id: str = "", session_id: str = "") -> str:
"""Start a new run. Called by GraphExecutor at graph start. Returns run_id.
Args:
goal_id: Goal ID for this run
session_id: Optional session ID. If provided, uses it as run_id (for unified sessions).
Otherwise generates a new run_id in old format.
Returns:
The run_id (same as session_id if provided)
"""
if session_id:
# Use provided session_id as run_id (unified sessions)
self._run_id = session_id
else:
# Generate run_id in old format (backward compatibility)
ts = datetime.now(UTC).strftime("%Y%m%dT%H%M%S")
short_uuid = uuid.uuid4().hex[:8]
self._run_id = f"{ts}_{short_uuid}"
self._goal_id = goal_id
self._started_at = datetime.now(UTC).isoformat()
self._logged_node_ids = set()
+274
View File
@@ -0,0 +1,274 @@
"""
Session State Schema - Unified state for session execution.
This schema consolidates data from Run, ExecutionResult, and runtime logs
into a single source of truth for session status and resumability.
"""
from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, Field, computed_field
if TYPE_CHECKING:
from framework.graph.executor import ExecutionResult
from framework.schemas.run import Run
class SessionStatus(StrEnum):
"""Status of a session execution."""
ACTIVE = "active" # Currently executing
PAUSED = "paused" # Waiting for resume (client input, pause node)
COMPLETED = "completed" # Finished successfully
FAILED = "failed" # Finished with error
CANCELLED = "cancelled" # User/system cancelled
class SessionTimestamps(BaseModel):
    """Timestamps tracking session lifecycle.

    All values are ISO 8601 strings. NOTE(review): producers in this module
    use naive ``datetime.now()`` — presumably local time; confirm before
    comparing timestamps across hosts.
    """

    started_at: str  # ISO 8601 format
    updated_at: str  # ISO 8601 format (updated on every state write)
    completed_at: str | None = None  # set once the session finishes
    paused_at_time: str | None = None  # When it was paused

    # Tolerate unknown keys so files from newer schema versions still load.
    model_config = {"extra": "allow"}
class SessionProgress(BaseModel):
    """Execution progress tracking.

    Combines resumability fields (paused_at / resume_from), running
    counters, and the quality metrics copied from ExecutionResult.
    """

    current_node: str | None = None  # node currently (or last) executing
    paused_at: str | None = None  # Node ID where paused
    resume_from: str | None = None  # Entry point or node ID to resume from
    steps_executed: int = 0
    total_tokens: int = 0
    total_latency_ms: int = 0
    path: list[str] = Field(default_factory=list)  # Node IDs traversed

    # Quality metrics (from ExecutionResult)
    total_retries: int = 0
    nodes_with_failures: list[str] = Field(default_factory=list)
    retry_details: dict[str, int] = Field(default_factory=dict)  # presumably node_id -> retry count; confirm against ExecutionResult
    had_partial_failures: bool = False
    execution_quality: str = "clean"  # "clean", "degraded", or "failed"
    node_visit_counts: dict[str, int] = Field(default_factory=dict)

    # Tolerate unknown keys so files from newer schema versions still load.
    model_config = {"extra": "allow"}
class SessionResult(BaseModel):
    """Final result of session execution."""

    success: bool | None = None  # None if still running
    error: str | None = None  # error message when the session failed
    output: dict[str, Any] = Field(default_factory=dict)  # final output payload

    # Tolerate unknown keys so files from newer schema versions still load.
    model_config = {"extra": "allow"}
class SessionMetrics(BaseModel):
    """Execution metrics (from Run.metrics)."""

    decision_count: int = 0
    problem_count: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    nodes_executed: list[str] = Field(default_factory=list)
    edges_traversed: list[str] = Field(default_factory=list)

    # Tolerate unknown keys so files from newer schema versions still load.
    model_config = {"extra": "allow"}
class SessionState(BaseModel):
    """
    Complete state for a session execution.

    This is the single source of truth for session status and resumability.
    Consolidates data from ExecutionResult, ExecutionContext, Run, and runtime logs.

    Version History:
    - v1.0: Initial schema (2026-02-06)
    """

    # Schema version for forward/backward compatibility
    schema_version: str = "1.0"

    # Identity
    session_id: str  # Format: session_YYYYMMDD_HHMMSS_{uuid_8char}
    stream_id: str = ""  # Which ExecutionStream created this
    correlation_id: str = ""  # For correlating related executions

    # Status
    status: SessionStatus = SessionStatus.ACTIVE

    # Goal/Agent context
    goal_id: str
    agent_id: str = ""
    entry_point: str = "start"

    # Timestamps
    timestamps: SessionTimestamps

    # Progress
    progress: SessionProgress = Field(default_factory=SessionProgress)

    # Result
    result: SessionResult = Field(default_factory=SessionResult)

    # Memory (for resumability)
    memory: dict[str, Any] = Field(default_factory=dict)

    # Metrics
    metrics: SessionMetrics = Field(default_factory=SessionMetrics)

    # Problems (from Run.problems)
    problems: list[dict[str, Any]] = Field(default_factory=list)

    # Decisions (from Run.decisions - can be large, so store references)
    decisions: list[dict[str, Any]] = Field(default_factory=list)

    # Input data (for debugging/replay)
    input_data: dict[str, Any] = Field(default_factory=dict)

    # Isolation level (from ExecutionContext)
    isolation_level: str = "shared"

    # Tolerate unknown keys so files from newer schema versions still load.
    model_config = {"extra": "allow"}

    @computed_field
    @property
    def duration_ms(self) -> int:
        """Duration of the session in milliseconds.

        Returns 0 while completed_at is unset. NOTE(review): assumes both
        timestamps are valid ISO strings with consistent timezone handling —
        datetime.fromisoformat raises on an empty started_at; confirm
        producers always populate it.
        """
        if not self.timestamps.completed_at:
            return 0
        started = datetime.fromisoformat(self.timestamps.started_at)
        completed = datetime.fromisoformat(self.timestamps.completed_at)
        return int((completed - started).total_seconds() * 1000)

    @computed_field
    @property
    def is_resumable(self) -> bool:
        """Can this session be resumed? True only when paused with a known resume point."""
        return self.status == SessionStatus.PAUSED and self.progress.resume_from is not None

    @classmethod
    def from_execution_result(
        cls,
        session_id: str,
        goal_id: str,
        result: "ExecutionResult",
        stream_id: str = "",
        correlation_id: str = "",
        started_at: str = "",
        input_data: dict[str, Any] | None = None,
        agent_id: str = "",
        entry_point: str = "start",
    ) -> "SessionState":
        """Create SessionState from ExecutionResult."""
        # NOTE(review): naive local time, consistent with the rest of this
        # module — confirm UTC is not required.
        now = datetime.now().isoformat()

        # Determine status based on execution result
        if result.paused_at:
            status = SessionStatus.PAUSED
        elif result.success:
            status = SessionStatus.COMPLETED
        else:
            status = SessionStatus.FAILED

        return cls(
            session_id=session_id,
            stream_id=stream_id,
            correlation_id=correlation_id,
            goal_id=goal_id,
            agent_id=agent_id,
            entry_point=entry_point,
            status=status,
            timestamps=SessionTimestamps(
                started_at=started_at or now,
                updated_at=now,
                # A paused session is not complete; record the pause time instead.
                completed_at=now if not result.paused_at else None,
                paused_at_time=now if result.paused_at else None,
            ),
            progress=SessionProgress(
                # Paused node wins; otherwise the last node on the traversed path.
                current_node=result.paused_at or (result.path[-1] if result.path else None),
                paused_at=result.paused_at,
                resume_from=result.session_state.get("resume_from")
                if result.session_state
                else None,
                steps_executed=result.steps_executed,
                total_tokens=result.total_tokens,
                total_latency_ms=result.total_latency_ms,
                path=result.path,
                total_retries=result.total_retries,
                nodes_with_failures=result.nodes_with_failures,
                retry_details=result.retry_details,
                had_partial_failures=result.had_partial_failures,
                execution_quality=result.execution_quality,
                node_visit_counts=result.node_visit_counts,
            ),
            result=SessionResult(
                success=result.success,
                error=result.error,
                output=result.output,
            ),
            memory=result.session_state.get("memory", {}) if result.session_state else {},
            input_data=input_data or {},
        )

    @classmethod
    def from_legacy_run(cls, run: "Run", session_id: str, stream_id: str = "") -> "SessionState":
        """Create SessionState from legacy Run object."""
        from framework.schemas.run import RunStatus

        now = datetime.now().isoformat()

        # Map RunStatus to SessionStatus
        status_mapping = {
            RunStatus.RUNNING: SessionStatus.ACTIVE,
            RunStatus.COMPLETED: SessionStatus.COMPLETED,
            RunStatus.FAILED: SessionStatus.FAILED,
            RunStatus.CANCELLED: SessionStatus.CANCELLED,
            RunStatus.STUCK: SessionStatus.FAILED,
        }
        # Unknown statuses conservatively map to FAILED.
        status = status_mapping.get(run.status, SessionStatus.FAILED)

        return cls(
            schema_version="1.0",
            session_id=session_id,
            stream_id=stream_id,
            goal_id=run.goal_id,
            status=status,
            timestamps=SessionTimestamps(
                started_at=run.started_at.isoformat(),
                updated_at=now,
                completed_at=run.completed_at.isoformat() if run.completed_at else None,
            ),
            result=SessionResult(
                success=run.status == RunStatus.COMPLETED,
                output=run.output_data,
            ),
            metrics=SessionMetrics(
                decision_count=run.metrics.total_decisions,
                problem_count=len(run.problems),
                total_input_tokens=run.metrics.total_tokens,  # Approximate
                total_output_tokens=0,  # Not tracked in old format
                nodes_executed=run.metrics.nodes_executed,
                edges_traversed=run.metrics.edges_traversed,
            ),
            decisions=[d.model_dump() for d in run.decisions],
            problems=[p.model_dump() for p in run.problems],
            input_data=run.input_data,
        )

    def to_session_state_dict(self) -> dict[str, Any]:
        """Convert to session_state format for GraphExecutor.execute()."""
        return {
            "paused_at": self.progress.paused_at,
            "resume_from": self.progress.resume_from,
            "memory": self.memory,
            "next_node": None,
        }
+24 -25
View File
@@ -36,16 +36,16 @@ class FileStorage:
self._ensure_dirs()
def _ensure_dirs(self) -> None:
"""Create directory structure if it doesn't exist."""
dirs = [
self.base_path / "runs",
self.base_path / "indexes" / "by_goal",
self.base_path / "indexes" / "by_status",
self.base_path / "indexes" / "by_node",
self.base_path / "summaries",
]
for d in dirs:
d.mkdir(parents=True, exist_ok=True)
"""Create directory structure if it doesn't exist.
NOTE: Only creates indexes directory now. The runs/ and summaries/
directories are deprecated and no longer used for new sessions.
"""
# Only create indexes directory (still used for backward compatibility)
indexes_dir = self.base_path / "indexes"
indexes_dir.mkdir(parents=True, exist_ok=True)
# Do NOT create runs/ and summaries/ - these are deprecated
def _validate_key(self, key: str) -> None:
"""
@@ -84,23 +84,22 @@ class FileStorage:
# === RUN OPERATIONS ===
def save_run(self, run: Run) -> None:
"""Save a run to storage."""
# Save full run using Pydantic's model_dump_json
run_path = self.base_path / "runs" / f"{run.id}.json"
with atomic_write(run_path) as f:
f.write(run.model_dump_json(indent=2))
"""Save a run to storage.
# Save summary
summary = RunSummary.from_run(run)
summary_path = self.base_path / "summaries" / f"{run.id}.json"
with atomic_write(summary_path) as f:
f.write(summary.model_dump_json(indent=2))
DEPRECATED: This method writes to the old runs/ and summaries/ structure.
New sessions use unified storage at sessions/{session_id}/state.json.
This method is now a no-op to prevent creating deprecated directories.
"""
import warnings
# Update indexes
self._add_to_index("by_goal", run.goal_id, run.id)
self._add_to_index("by_status", run.status.value, run.id)
for node_id in run.metrics.nodes_executed:
self._add_to_index("by_node", node_id, run.id)
warnings.warn(
"FileStorage.save_run() is deprecated. "
"New sessions use unified storage at sessions/{session_id}/state.json. "
"This write has been skipped.",
DeprecationWarning,
stacklevel=2,
)
# No-op: do not write to deprecated locations
def load_run(self, run_id: str) -> Run | None:
"""Load a run from storage."""
+213
View File
@@ -0,0 +1,213 @@
"""
Session Store - Unified session storage with state.json.
Handles reading and writing session state to the new unified structure:
sessions/session_YYYYMMDD_HHMMSS_{uuid}/state.json
"""
import asyncio
import logging
import uuid
from datetime import datetime
from pathlib import Path
from framework.schemas.session_state import SessionState
from framework.utils.io import atomic_write
logger = logging.getLogger(__name__)
class SessionStore:
    """
    Unified session storage with state.json.

    Manages sessions in the new structure:

        {base_path}/sessions/session_YYYYMMDD_HHMMSS_{uuid}/
            state.json       # Single source of truth
            conversations/   # Per-node EventLoop state
            artifacts/       # Spillover data
            logs/            # L1/L2/L3 observability
                summary.json
                details.jsonl
                tool_logs.jsonl

    All public I/O methods are async and run the blocking filesystem work
    in a thread via asyncio.to_thread.
    """

    def __init__(self, base_path: Path):
        """
        Initialize session store.

        Args:
            base_path: Base path for storage (e.g., ~/.hive/twitter_outreach)
        """
        self.base_path = Path(base_path)
        self.sessions_dir = self.base_path / "sessions"

    @staticmethod
    def _validate_session_id(session_id: str) -> None:
        """Reject session IDs that could escape the sessions directory.

        A session_id is used as a single path component, and delete_session()
        runs shutil.rmtree on the resolved path — so a traversal-shaped id
        (e.g. "../x") would be destructive. Mirrors the key validation
        convention of FileStorage._validate_key.

        Raises:
            ValueError: If session_id is empty, ".", "..", or contains a
                path separator or NUL byte.
        """
        if (
            not session_id
            or session_id in (".", "..")
            or "/" in session_id
            or "\\" in session_id
            or "\x00" in session_id
        ):
            raise ValueError(f"Invalid session_id: {session_id!r}")

    def generate_session_id(self) -> str:
        """
        Generate session ID in format: session_YYYYMMDD_HHMMSS_{uuid}.

        Returns:
            Session ID string (e.g., "session_20260206_143022_abc12345")
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        short_uuid = uuid.uuid4().hex[:8]
        return f"session_{timestamp}_{short_uuid}"

    def get_session_path(self, session_id: str) -> Path:
        """
        Get path to session directory.

        Args:
            session_id: Session ID

        Returns:
            Path to session directory

        Raises:
            ValueError: If session_id is not a safe single path component.
        """
        # Validate here so every downstream path (read/write/delete) is covered.
        self._validate_session_id(session_id)
        return self.sessions_dir / session_id

    def get_state_path(self, session_id: str) -> Path:
        """
        Get path to state.json file.

        Args:
            session_id: Session ID

        Returns:
            Path to state.json
        """
        return self.get_session_path(session_id) / "state.json"

    async def write_state(self, session_id: str, state: SessionState) -> None:
        """
        Atomically write state.json for a session.

        Uses temp file + rename for crash safety.

        Args:
            session_id: Session ID
            state: SessionState to write
        """

        def _write():
            state_path = self.get_state_path(session_id)
            state_path.parent.mkdir(parents=True, exist_ok=True)
            with atomic_write(state_path) as f:
                f.write(state.model_dump_json(indent=2))

        await asyncio.to_thread(_write)
        logger.debug(f"Wrote state.json for session {session_id}")

    async def read_state(self, session_id: str) -> SessionState | None:
        """
        Read state.json for a session.

        Args:
            session_id: Session ID

        Returns:
            SessionState or None if not found
        """

        def _read():
            state_path = self.get_state_path(session_id)
            if not state_path.exists():
                return None
            return SessionState.model_validate_json(state_path.read_text())

        return await asyncio.to_thread(_read)

    async def list_sessions(
        self,
        status: str | None = None,
        goal_id: str | None = None,
        limit: int = 100,
    ) -> list[SessionState]:
        """
        List sessions, optionally filtered by status or goal.

        Sessions whose state.json is missing or unreadable are skipped with
        a warning rather than failing the whole listing.

        Args:
            status: Optional status filter (e.g., "paused", "completed")
            goal_id: Optional goal ID filter
            limit: Maximum number of sessions to return

        Returns:
            List of SessionState objects, most recently updated first
        """

        def _scan():
            sessions = []
            if not self.sessions_dir.exists():
                return sessions
            for session_dir in self.sessions_dir.iterdir():
                if not session_dir.is_dir():
                    continue
                state_path = session_dir / "state.json"
                if not state_path.exists():
                    continue
                try:
                    state = SessionState.model_validate_json(state_path.read_text())
                    # Apply filters (SessionStatus is a StrEnum, so a plain
                    # string compares directly).
                    if status and state.status != status:
                        continue
                    if goal_id and state.goal_id != goal_id:
                        continue
                    sessions.append(state)
                except Exception as e:
                    logger.warning(f"Failed to load {state_path}: {e}")
                    continue
            # Sort by updated_at descending (most recent first)
            sessions.sort(key=lambda s: s.timestamps.updated_at, reverse=True)
            return sessions[:limit]

        return await asyncio.to_thread(_scan)

    async def delete_session(self, session_id: str) -> bool:
        """
        Delete a session and all its data.

        Args:
            session_id: Session ID to delete

        Returns:
            True if deleted, False if not found

        Raises:
            ValueError: If session_id is not a safe single path component.
        """

        def _delete():
            import shutil

            # get_session_path validates session_id before rmtree runs.
            session_path = self.get_session_path(session_id)
            if not session_path.exists():
                return False
            shutil.rmtree(session_path)
            logger.info(f"Deleted session {session_id}")
            return True

        return await asyncio.to_thread(_delete)

    async def session_exists(self, session_id: str) -> bool:
        """
        Check if a session exists.

        Args:
            session_id: Session ID

        Returns:
            True if session exists
        """

        def _check():
            return self.get_state_path(session_id).exists()

        return await asyncio.to_thread(_check)
+179
View File
@@ -0,0 +1,179 @@
"""
State Writer - Dual-write adapter for migration period.
Writes execution state to both old (Run/RunSummary) and new (state.json) formats
to maintain backward compatibility during the transition period.
"""
import logging
import os
from datetime import datetime
from framework.schemas.run import Problem, Run, RunMetrics, RunStatus
from framework.schemas.session_state import SessionState, SessionStatus
from framework.storage.concurrent import ConcurrentStorage
from framework.storage.session_store import SessionStore
logger = logging.getLogger(__name__)
class StateWriter:
    """
    Writes execution state to both old and new formats during migration.

    During the dual-write phase:
    - New format (state.json) is written when USE_UNIFIED_SESSIONS=true
    - Old format (Run/RunSummary) is always written for backward compatibility
    """

    def __init__(self, old_storage: ConcurrentStorage, session_store: SessionStore):
        """
        Initialize state writer.

        Args:
            old_storage: ConcurrentStorage for old format (runs/, summaries/)
            session_store: SessionStore for new format (sessions/*/state.json)
        """
        self.old = old_storage
        self.new = session_store
        # Flag is read once at construction; restart the process to change it.
        self.dual_write_enabled = os.getenv("USE_UNIFIED_SESSIONS", "false").lower() == "true"

    async def write_execution_state(
        self,
        session_id: str,
        state: SessionState,
    ) -> None:
        """
        Write execution state to both old and new formats.

        The new-format write is best-effort (logged on failure); a failure
        of the old-format write is re-raised because existing readers still
        depend on it.

        Args:
            session_id: Session ID
            state: SessionState to write
        """
        # Write to new format if enabled
        if self.dual_write_enabled:
            try:
                await self.new.write_state(session_id, state)
                logger.debug(f"Wrote state.json for session {session_id}")
            except Exception as e:
                logger.error(f"Failed to write state.json for {session_id}: {e}")
                # Don't fail - old format is still written

        # Always write to old format for backward compatibility
        try:
            run = self._convert_to_run(state)
            await self.old.save_run(run)
            logger.debug(f"Wrote Run object for session {session_id}")
        except Exception as e:
            logger.error(f"Failed to write Run object for {session_id}: {e}")
            # This is more critical - reraise if old format fails
            raise

    def _convert_to_run(self, state: SessionState) -> Run:
        """
        Convert SessionState to legacy Run object.

        Args:
            state: SessionState to convert

        Returns:
            Run object
        """
        # Map SessionStatus to RunStatus
        status_mapping = {
            SessionStatus.ACTIVE: RunStatus.RUNNING,
            SessionStatus.PAUSED: RunStatus.RUNNING,  # Paused is still "running" in old format
            SessionStatus.COMPLETED: RunStatus.COMPLETED,
            SessionStatus.FAILED: RunStatus.FAILED,
            SessionStatus.CANCELLED: RunStatus.CANCELLED,
        }
        run_status = status_mapping.get(state.status, RunStatus.FAILED)

        # Convert timestamps
        started_at = datetime.fromisoformat(state.timestamps.started_at)
        completed_at = (
            datetime.fromisoformat(state.timestamps.completed_at)
            if state.timestamps.completed_at
            else None
        )

        # Build RunMetrics. The success count is an approximation; clamp at
        # zero because a node can fail more often than decisions were made.
        failed = len(state.progress.nodes_with_failures)
        metrics = RunMetrics(
            total_decisions=state.metrics.decision_count,
            successful_decisions=max(0, state.metrics.decision_count - failed),  # Approximate
            failed_decisions=failed,
            total_tokens=state.metrics.total_input_tokens + state.metrics.total_output_tokens,
            total_latency_ms=state.progress.total_latency_ms,
            nodes_executed=state.metrics.nodes_executed,
            edges_traversed=state.metrics.edges_traversed,
        )

        # Convert problems (SessionState stores as dicts, Run expects Problem objects)
        problems = []
        for p_dict in state.problems:
            # Handle both old Problem objects and new dict format
            if isinstance(p_dict, dict):
                problems.append(Problem(**p_dict))
            else:
                problems.append(p_dict)

        # Convert decisions (SessionState stores as dicts, Run expects Decision objects)
        from framework.schemas.decision import Decision

        decisions = []
        for d_dict in state.decisions:
            # Handle both old Decision objects and new dict format
            if isinstance(d_dict, dict):
                try:
                    decisions.append(Decision(**d_dict))
                except Exception:
                    # Skip invalid decisions
                    continue
            else:
                decisions.append(d_dict)

        # Create Run object
        run = Run(
            id=state.session_id,  # Use session_id as run_id
            goal_id=state.goal_id,
            started_at=started_at,
            status=run_status,
            completed_at=completed_at,
            decisions=decisions,
            problems=problems,
            metrics=metrics,
            goal_description="",  # Not stored in SessionState
            input_data=state.input_data,
            output_data=state.result.output,
        )
        return run

    async def read_state(
        self,
        session_id: str,
        prefer_new: bool = True,
    ) -> SessionState | None:
        """
        Read execution state from either format.

        The preferred format is consulted first; the other is used as a
        fallback. (Previously prefer_new=False skipped the new format
        entirely instead of merely deprioritizing it.)

        Args:
            session_id: Session ID
            prefer_new: If True, try new format first (default)

        Returns:
            SessionState or None if not found
        """
        if prefer_new:
            # Try new format first
            state = await self.new.read_state(session_id)
            if state:
                return state
            # Fall back to old format
            run = await self.old.load_run(session_id)
            if run:
                return SessionState.from_legacy_run(run, session_id)
            return None

        # Prefer old format, fall back to new
        run = await self.old.load_run(session_id)
        if run:
            return SessionState.from_legacy_run(run, session_id)
        return await self.new.read_state(session_id)