410 lines
17 KiB
Python
410 lines
17 KiB
Python
"""Tests for deerflow.skills.installer — shared skill installation logic."""
|
|
|
|
import shutil
|
|
import stat
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from deerflow.skills.installer import (
|
|
SkillSecurityScanError,
|
|
is_symlink_member,
|
|
is_unsafe_zip_member,
|
|
resolve_skill_dir_from_archive,
|
|
safe_extract_skill_archive,
|
|
should_ignore_archive_entry,
|
|
)
|
|
from deerflow.skills.security_scanner import ScanResult
|
|
from deerflow.skills.storage import get_or_new_skill_storage
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_unsafe_zip_member
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsUnsafeZipMember:
|
|
def test_absolute_path(self):
|
|
info = zipfile.ZipInfo("/etc/passwd")
|
|
assert is_unsafe_zip_member(info) is True
|
|
|
|
def test_windows_absolute_path(self):
|
|
info = zipfile.ZipInfo("C:\\Windows\\system32\\drivers\\etc\\hosts")
|
|
assert is_unsafe_zip_member(info) is True
|
|
|
|
def test_dotdot_traversal(self):
|
|
info = zipfile.ZipInfo("foo/../../../etc/passwd")
|
|
assert is_unsafe_zip_member(info) is True
|
|
|
|
def test_safe_member(self):
|
|
info = zipfile.ZipInfo("my-skill/SKILL.md")
|
|
assert is_unsafe_zip_member(info) is False
|
|
|
|
def test_empty_filename(self):
|
|
info = zipfile.ZipInfo("")
|
|
assert is_unsafe_zip_member(info) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_symlink_member
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsSymlinkMember:
|
|
def test_detects_symlink(self):
|
|
info = zipfile.ZipInfo("link.txt")
|
|
info.external_attr = (stat.S_IFLNK | 0o777) << 16
|
|
assert is_symlink_member(info) is True
|
|
|
|
def test_regular_file(self):
|
|
info = zipfile.ZipInfo("file.txt")
|
|
info.external_attr = (stat.S_IFREG | 0o644) << 16
|
|
assert is_symlink_member(info) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# should_ignore_archive_entry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestShouldIgnoreArchiveEntry:
|
|
def test_macosx_ignored(self):
|
|
assert should_ignore_archive_entry(Path("__MACOSX")) is True
|
|
|
|
def test_dotfile_ignored(self):
|
|
assert should_ignore_archive_entry(Path(".DS_Store")) is True
|
|
|
|
def test_normal_dir_not_ignored(self):
|
|
assert should_ignore_archive_entry(Path("my-skill")) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# resolve_skill_dir_from_archive
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestResolveSkillDir:
|
|
def test_single_dir(self, tmp_path):
|
|
(tmp_path / "my-skill").mkdir()
|
|
(tmp_path / "my-skill" / "SKILL.md").write_text("content")
|
|
assert resolve_skill_dir_from_archive(tmp_path) == tmp_path / "my-skill"
|
|
|
|
def test_with_macosx(self, tmp_path):
|
|
(tmp_path / "my-skill").mkdir()
|
|
(tmp_path / "my-skill" / "SKILL.md").write_text("content")
|
|
(tmp_path / "__MACOSX").mkdir()
|
|
assert resolve_skill_dir_from_archive(tmp_path) == tmp_path / "my-skill"
|
|
|
|
def test_empty_after_filter(self, tmp_path):
|
|
(tmp_path / "__MACOSX").mkdir()
|
|
(tmp_path / ".DS_Store").write_text("meta")
|
|
with pytest.raises(ValueError, match="empty"):
|
|
resolve_skill_dir_from_archive(tmp_path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# safe_extract_skill_archive
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSafeExtract:
|
|
def _make_zip(self, tmp_path, members: dict[str, str | bytes]) -> Path:
|
|
"""Create a zip with given filename->content entries."""
|
|
zip_path = tmp_path / "test.zip"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
for name, content in members.items():
|
|
if isinstance(content, str):
|
|
content = content.encode()
|
|
zf.writestr(name, content)
|
|
return zip_path
|
|
|
|
def test_rejects_zip_bomb(self, tmp_path):
|
|
zip_path = self._make_zip(tmp_path, {"big.txt": "x" * 1000})
|
|
dest = tmp_path / "out"
|
|
dest.mkdir()
|
|
with zipfile.ZipFile(zip_path) as zf:
|
|
with pytest.raises(ValueError, match="too large"):
|
|
safe_extract_skill_archive(zf, dest, max_total_size=100)
|
|
|
|
def test_rejects_absolute_path(self, tmp_path):
|
|
zip_path = tmp_path / "abs.zip"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
zf.writestr("/etc/passwd", "root:x:0:0")
|
|
dest = tmp_path / "out"
|
|
dest.mkdir()
|
|
with zipfile.ZipFile(zip_path) as zf:
|
|
with pytest.raises(ValueError, match="unsafe"):
|
|
safe_extract_skill_archive(zf, dest)
|
|
|
|
def test_skips_symlinks(self, tmp_path):
|
|
zip_path = tmp_path / "sym.zip"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
info = zipfile.ZipInfo("link.txt")
|
|
info.external_attr = (stat.S_IFLNK | 0o777) << 16
|
|
zf.writestr(info, "/etc/passwd")
|
|
zf.writestr("normal.txt", "hello")
|
|
dest = tmp_path / "out"
|
|
dest.mkdir()
|
|
with zipfile.ZipFile(zip_path) as zf:
|
|
safe_extract_skill_archive(zf, dest)
|
|
assert (dest / "normal.txt").exists()
|
|
assert not (dest / "link.txt").exists()
|
|
|
|
def test_normal_archive(self, tmp_path):
|
|
zip_path = self._make_zip(
|
|
tmp_path,
|
|
{
|
|
"my-skill/SKILL.md": "---\nname: test\ndescription: x\n---\n# Test",
|
|
"my-skill/README.md": "readme",
|
|
},
|
|
)
|
|
dest = tmp_path / "out"
|
|
dest.mkdir()
|
|
with zipfile.ZipFile(zip_path) as zf:
|
|
safe_extract_skill_archive(zf, dest)
|
|
assert (dest / "my-skill" / "SKILL.md").exists()
|
|
assert (dest / "my-skill" / "README.md").exists()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# install_skill_from_archive (full integration)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInstallSkillFromArchive:
|
|
@pytest.fixture(autouse=True)
|
|
def _allow_security_scan(self, monkeypatch):
|
|
async def _scan(*args, **kwargs):
|
|
return ScanResult(decision="allow", reason="ok")
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
|
|
|
|
def _make_skill_zip(self, tmp_path: Path, skill_name: str = "test-skill") -> Path:
|
|
"""Create a valid .skill archive."""
|
|
zip_path = tmp_path / f"{skill_name}.skill"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
zf.writestr(
|
|
f"{skill_name}/SKILL.md",
|
|
f"---\nname: {skill_name}\ndescription: A test skill\n---\n\n# {skill_name}\n",
|
|
)
|
|
return zip_path
|
|
|
|
def test_success(self, tmp_path):
|
|
zip_path = self._make_skill_zip(tmp_path)
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
result = get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
assert result["success"] is True
|
|
assert result["skill_name"] == "test-skill"
|
|
assert (skills_root / "custom" / "test-skill" / "SKILL.md").exists()
|
|
|
|
def test_scans_skill_markdown_before_install(self, tmp_path, monkeypatch):
|
|
zip_path = self._make_skill_zip(tmp_path)
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
calls = []
|
|
|
|
async def _scan(content, *, executable, location):
|
|
calls.append({"content": content, "executable": executable, "location": location})
|
|
return ScanResult(decision="allow", reason="ok")
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
|
|
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
assert calls == [
|
|
{
|
|
"content": "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n",
|
|
"executable": False,
|
|
"location": "test-skill/SKILL.md",
|
|
}
|
|
]
|
|
|
|
def test_scans_support_files_and_scripts_before_install(self, tmp_path, monkeypatch):
|
|
zip_path = tmp_path / "test-skill.skill"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
|
|
zf.writestr("test-skill/references/guide.md", "# Guide\n")
|
|
zf.writestr("test-skill/templates/prompt.txt", "Use care.\n")
|
|
zf.writestr("test-skill/scripts/run.sh", "#!/bin/sh\necho ok\n")
|
|
zf.writestr("test-skill/assets/logo.png", b"\x89PNG\r\n\x1a\n")
|
|
zf.writestr("test-skill/references/.env", "TOKEN=secret\n")
|
|
zf.writestr("test-skill/templates/config.cfg", "TOKEN=secret\n")
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
calls = []
|
|
|
|
async def _scan(content, *, executable, location):
|
|
calls.append({"content": content, "executable": executable, "location": location})
|
|
return ScanResult(decision="allow", reason="ok")
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
|
|
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
assert calls == [
|
|
{
|
|
"content": "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n",
|
|
"executable": False,
|
|
"location": "test-skill/SKILL.md",
|
|
},
|
|
{
|
|
"content": "# Guide\n",
|
|
"executable": False,
|
|
"location": "test-skill/references/guide.md",
|
|
},
|
|
{
|
|
"content": "#!/bin/sh\necho ok\n",
|
|
"executable": True,
|
|
"location": "test-skill/scripts/run.sh",
|
|
},
|
|
{
|
|
"content": "Use care.\n",
|
|
"executable": False,
|
|
"location": "test-skill/templates/prompt.txt",
|
|
},
|
|
]
|
|
assert all("secret" not in call["content"] for call in calls)
|
|
|
|
def test_nested_skill_markdown_prevents_install(self, tmp_path):
|
|
zip_path = tmp_path / "test-skill.skill"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
|
|
zf.writestr("test-skill/references/other/SKILL.md", "# Nested skill\n")
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
|
|
with pytest.raises(SkillSecurityScanError, match="nested SKILL.md"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
assert not (skills_root / "custom" / "test-skill").exists()
|
|
|
|
def test_script_warn_prevents_install(self, tmp_path, monkeypatch):
|
|
zip_path = tmp_path / "test-skill.skill"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
|
|
zf.writestr("test-skill/scripts/run.sh", "#!/bin/sh\necho ok\n")
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
|
|
async def _scan(*args, executable, **kwargs):
|
|
if executable:
|
|
return ScanResult(decision="warn", reason="script needs review")
|
|
return ScanResult(decision="allow", reason="ok")
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
|
|
|
|
with pytest.raises(SkillSecurityScanError, match="rejected executable.*script needs review"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
assert not (skills_root / "custom" / "test-skill").exists()
|
|
|
|
def test_security_scan_block_prevents_install(self, tmp_path, monkeypatch):
|
|
zip_path = self._make_skill_zip(tmp_path, skill_name="blocked-skill")
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
|
|
async def _scan(*args, **kwargs):
|
|
return ScanResult(decision="block", reason="prompt injection")
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
|
|
|
|
with pytest.raises(SkillSecurityScanError, match="Security scan blocked.*prompt injection"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
assert not (skills_root / "custom" / "blocked-skill").exists()
|
|
|
|
def test_copy_failure_does_not_leave_partial_install(self, tmp_path, monkeypatch):
|
|
zip_path = self._make_skill_zip(tmp_path)
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
|
|
def _copytree(src, dst):
|
|
partial = Path(dst)
|
|
partial.mkdir(parents=True)
|
|
(partial / "partial.txt").write_text("partial", encoding="utf-8")
|
|
raise OSError("copy failed")
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.shutil.copytree", _copytree)
|
|
|
|
with pytest.raises(OSError, match="copy failed"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
custom_dir = skills_root / "custom"
|
|
assert not (custom_dir / "test-skill").exists()
|
|
assert not [path for path in custom_dir.iterdir() if path.name.startswith(".installing-test-skill-")]
|
|
|
|
def test_concurrent_target_creation_does_not_get_clobbered(self, tmp_path, monkeypatch):
|
|
zip_path = self._make_skill_zip(tmp_path)
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
target = skills_root / "custom" / "test-skill"
|
|
original_copytree = shutil.copytree
|
|
|
|
def _copytree(src, dst):
|
|
target.mkdir(parents=True)
|
|
(target / "marker.txt").write_text("external", encoding="utf-8")
|
|
return original_copytree(src, dst)
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.shutil.copytree", _copytree)
|
|
|
|
with pytest.raises(ValueError, match="already exists"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
assert (target / "marker.txt").read_text(encoding="utf-8") == "external"
|
|
assert not (target / "SKILL.md").exists()
|
|
|
|
def test_move_failure_cleans_reserved_target(self, tmp_path, monkeypatch):
|
|
zip_path = self._make_skill_zip(tmp_path)
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
|
|
def _move(src, dst):
|
|
Path(dst).write_text("partial", encoding="utf-8")
|
|
raise OSError("move failed")
|
|
|
|
monkeypatch.setattr("deerflow.skills.installer.shutil.move", _move)
|
|
|
|
with pytest.raises(OSError, match="move failed"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
assert not (skills_root / "custom" / "test-skill").exists()
|
|
|
|
def test_duplicate_raises(self, tmp_path):
|
|
zip_path = self._make_skill_zip(tmp_path)
|
|
skills_root = tmp_path / "skills"
|
|
(skills_root / "custom" / "test-skill").mkdir(parents=True)
|
|
with pytest.raises(ValueError, match="already exists"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
def test_invalid_extension(self, tmp_path):
|
|
bad_path = tmp_path / "bad.zip"
|
|
bad_path.write_text("not a skill")
|
|
with pytest.raises(ValueError, match=".skill"):
|
|
get_or_new_skill_storage(skills_path=tmp_path).install_skill_from_archive(bad_path)
|
|
|
|
def test_bad_frontmatter(self, tmp_path):
|
|
zip_path = tmp_path / "bad.skill"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
zf.writestr("bad/SKILL.md", "no frontmatter here")
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
with pytest.raises(ValueError, match="Invalid skill"):
|
|
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
|
|
def test_nonexistent_file(self, tmp_path):
|
|
with pytest.raises(FileNotFoundError):
|
|
get_or_new_skill_storage(skills_path=tmp_path).install_skill_from_archive(Path("/nonexistent/path.skill"))
|
|
|
|
def test_macosx_filtered_during_resolve(self, tmp_path):
|
|
"""Archive with __MACOSX dir still installs correctly."""
|
|
zip_path = tmp_path / "mac.skill"
|
|
with zipfile.ZipFile(zip_path, "w") as zf:
|
|
zf.writestr("my-skill/SKILL.md", "---\nname: my-skill\ndescription: desc\n---\n# My Skill\n")
|
|
zf.writestr("__MACOSX/._my-skill", "meta")
|
|
skills_root = tmp_path / "skills"
|
|
skills_root.mkdir()
|
|
result = get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
|
|
assert result["success"] is True
|
|
assert result["skill_name"] == "my-skill"
|