fix(security): prevent path traversal in session_store (#6876)

Validate that resolved session path stays within the sessions directory
using Path.is_relative_to(). Prevents session_id values like
"../../something" from escaping the sandbox.

Also guard the caller in _write_run_event where get_session_path is
called outside the existing OSError try/except block.

Fixes #1000

Co-authored-by: Sidhartha kumar <Alearner12@users.noreply.github.com>
This commit is contained in:
Hundao
2026-03-30 23:53:23 +08:00
committed by GitHub
parent e9fd0158b9
commit ae85d2bf59
3 changed files with 34 additions and 2 deletions
+4 -1
View File
@@ -961,7 +961,10 @@ class ExecutionStream:
return
import json as _json
session_dir = self._session_store.get_session_path(execution_id)
try:
session_dir = self._session_store.get_session_path(execution_id)
except ValueError:
return
runs_file = session_dir / "runs.jsonl"
now = datetime.now()
record = {
+7 -1
View File
@@ -62,8 +62,14 @@ class SessionStore:
Returns:
Path to session directory
Raises:
ValueError: If session_id resolves outside the sessions directory
"""
return self.sessions_dir / session_id
resolved = (self.sessions_dir / session_id).resolve()
if not resolved.is_relative_to(self.sessions_dir.resolve()):
raise ValueError(f"Invalid session ID: {session_id}")
return resolved
def get_state_path(self, session_id: str) -> Path:
"""
+23
View File
@@ -225,5 +225,28 @@ class TestPathTraversalWithActualFiles:
assert run_file.exists()
class TestSessionStorePathTraversal:
"""Path traversal protection in SessionStore.get_session_path()."""
@pytest.fixture
def store(self, tmp_path):
from framework.storage.session_store import SessionStore
return SessionStore(tmp_path)
def test_valid_session_id(self, store):
path = store.get_session_path("session_20260206_143022_abc12345")
assert path.name == "session_20260206_143022_abc12345"
def test_blocks_parent_traversal(self, store):
with pytest.raises(ValueError, match="Invalid session ID"):
store.get_session_path("../../etc/passwd")
@pytest.mark.asyncio
async def test_delete_session_blocks_traversal(self, store):
with pytest.raises(ValueError, match="Invalid session ID"):
await store.delete_session("../../package")
if __name__ == "__main__":
pytest.main([__file__, "-v"])