Files
hive/core/tests/test_storage.py
kernel_crush eba7524955 refactor: remove deprecated storage/backend.py (267 lines) (#6849)
* refactor: remove deprecated storage/backend.py (267 lines)

Delete the fully deprecated FileStorage class and inline its 5 still-active
methods (_validate_key, _load_run_sync, _load_summary_sync, _delete_run_sync,
_list_all_runs_sync) directly into ConcurrentStorage.

Changes:
- Delete core/framework/storage/backend.py (267 lines of no-op/deprecated code)
- Inline active read methods into ConcurrentStorage (no new FileStorage dep)
- Remove deprecated index operations (get_runs_by_goal, get_runs_by_status,
  get_runs_by_node, list_all_goals) and their associated locking
- Update __init__.py to export ConcurrentStorage instead of FileStorage
- Update runtime/core.py to use ConcurrentStorage directly
- Fix Runtime.end_run() to call save_run_sync() (sync wrapper) instead of
  the async save_run(), which was silently dropping the coroutine
- Update test_path_traversal_fix.py to test ConcurrentStorage._validate_key()
- Clean up test_storage.py — remove all FileStorage test classes, un-skip
  ConcurrentStorage tests now that it's self-contained
- Remove stale FileStorage references from testing/test_storage.py docstring,
  testing/debug_tool.py docstring, and test_runtime.py skip reasons

All 44 tests pass, ruff check and ruff format clean.

Fixes #6797

* fix(core): address CodeRabbitAI PR review feedback

 - Fix critical no-op in ConcurrentStorage._save_run_sync by implementing atomic persistence to 
uns/{run_id}.json.
 - Update 	est_path_traversal_fix.py to test ConcurrentStorage directly and use real file paths for end-to-end validation.
 - Unskip 	est_run_saved_on_end and assert actual run file persistence.
 - Fix debug_tool.py to use load_run_sync() instead of the async load_run().

* fix(core): address round 2 of CodeRabbitAI reviews

 - Add _validate_key to _save_run_sync and _load_summary_sync to enforce path traversal protections on the lowest level APIs.
 - Invalidate summary cache and refresh run cache in save_run_sync() to match the async save_run() cache coherence behavior.
 - Add tests for load_summary and save_run_sync path traversal rejection.
2026-03-29 22:48:12 +08:00

143 lines
4.3 KiB
Python

"""Tests for the storage module - ConcurrentStorage backend.
DEPRECATED: FileStorage has been removed.
New sessions use unified storage at sessions/{session_id}/state.json.
These tests are kept for backward compatibility verification of ConcurrentStorage only.
"""
import time
from pathlib import Path
import pytest
from framework.schemas.run import Run, RunMetrics, RunStatus
from framework.storage.concurrent import CacheEntry, ConcurrentStorage
# === HELPER FUNCTIONS ===
def create_test_run(
run_id: str = "test_run_1",
goal_id: str = "test_goal",
status: RunStatus = RunStatus.COMPLETED,
nodes_executed: list[str] | None = None,
) -> Run:
"""Create a test Run object with minimal required fields."""
metrics = RunMetrics(
total_decisions=1,
successful_decisions=1,
failed_decisions=0,
nodes_executed=nodes_executed or ["node_1"],
)
return Run(
id=run_id,
goal_id=goal_id,
status=status,
metrics=metrics,
narrative="Test run completed.",
)
# === CACHE ENTRY TESTS ===
class TestCacheEntry:
"""Test CacheEntry dataclass."""
def test_is_expired_false_when_fresh(self):
"""Cache entry should not be expired when fresh."""
entry = CacheEntry(value="test", timestamp=time.time())
assert entry.is_expired(ttl=60.0) is False
def test_is_expired_true_when_old(self):
"""Cache entry should be expired when older than TTL."""
old_timestamp = time.time() - 120 # 2 minutes ago
entry = CacheEntry(value="test", timestamp=old_timestamp)
assert entry.is_expired(ttl=60.0) is True
# === CONCURRENTSTORAGE TESTS ===
class TestConcurrentStorageBasics:
"""Test basic ConcurrentStorage operations."""
def test_init(self, tmp_path: Path):
"""Test ConcurrentStorage initialization."""
storage = ConcurrentStorage(tmp_path)
assert storage.base_path == tmp_path
assert storage._running is False
@pytest.mark.asyncio
async def test_start_and_stop(self, tmp_path: Path):
"""Test starting and stopping the storage."""
storage = ConcurrentStorage(tmp_path)
await storage.start()
assert storage._running is True
assert storage._batch_task is not None
await storage.stop()
assert storage._running is False
@pytest.mark.asyncio
async def test_double_start_is_idempotent(self, tmp_path: Path):
"""Starting twice should be safe."""
storage = ConcurrentStorage(tmp_path)
await storage.start()
await storage.start() # Should not raise
assert storage._running is True
await storage.stop()
@pytest.mark.asyncio
async def test_double_stop_is_idempotent(self, tmp_path: Path):
"""Stopping twice should be safe."""
storage = ConcurrentStorage(tmp_path)
await storage.start()
await storage.stop()
await storage.stop() # Should not raise
assert storage._running is False
class TestConcurrentStorageCacheManagement:
"""Test ConcurrentStorage cache management."""
def test_clear_cache(self, tmp_path: Path):
"""Test clearing the cache."""
storage = ConcurrentStorage(tmp_path)
storage._cache["test_key"] = CacheEntry(value="test", timestamp=time.time())
storage.clear_cache()
assert len(storage._cache) == 0
def test_invalidate_cache(self, tmp_path: Path):
"""Test invalidating a specific cache entry."""
storage = ConcurrentStorage(tmp_path)
storage._cache["key1"] = CacheEntry(value="test1", timestamp=time.time())
storage._cache["key2"] = CacheEntry(value="test2", timestamp=time.time())
storage.invalidate_cache("key1")
assert "key1" not in storage._cache
assert "key2" in storage._cache
def test_get_cache_stats(self, tmp_path: Path):
"""Test getting cache statistics."""
storage = ConcurrentStorage(tmp_path, cache_ttl=60.0)
# Add fresh entry
storage._cache["fresh"] = CacheEntry(value="test", timestamp=time.time())
# Add expired entry
storage._cache["expired"] = CacheEntry(value="test", timestamp=time.time() - 120)
stats = storage.get_cache_stats()
assert stats["total_entries"] == 2
assert stats["expired_entries"] == 1
assert stats["valid_entries"] == 1