Files
deer-flow/backend/tests/test_skills_installer.py

410 lines
17 KiB
Python

"""Tests for deerflow.skills.installer — shared skill installation logic."""
import shutil
import stat
import zipfile
from pathlib import Path
import pytest
from deerflow.skills.installer import (
SkillSecurityScanError,
is_symlink_member,
is_unsafe_zip_member,
resolve_skill_dir_from_archive,
safe_extract_skill_archive,
should_ignore_archive_entry,
)
from deerflow.skills.security_scanner import ScanResult
from deerflow.skills.storage import get_or_new_skill_storage
# ---------------------------------------------------------------------------
# is_unsafe_zip_member
# ---------------------------------------------------------------------------
class TestIsUnsafeZipMember:
def test_absolute_path(self):
info = zipfile.ZipInfo("/etc/passwd")
assert is_unsafe_zip_member(info) is True
def test_windows_absolute_path(self):
info = zipfile.ZipInfo("C:\\Windows\\system32\\drivers\\etc\\hosts")
assert is_unsafe_zip_member(info) is True
def test_dotdot_traversal(self):
info = zipfile.ZipInfo("foo/../../../etc/passwd")
assert is_unsafe_zip_member(info) is True
def test_safe_member(self):
info = zipfile.ZipInfo("my-skill/SKILL.md")
assert is_unsafe_zip_member(info) is False
def test_empty_filename(self):
info = zipfile.ZipInfo("")
assert is_unsafe_zip_member(info) is False
# ---------------------------------------------------------------------------
# is_symlink_member
# ---------------------------------------------------------------------------
class TestIsSymlinkMember:
def test_detects_symlink(self):
info = zipfile.ZipInfo("link.txt")
info.external_attr = (stat.S_IFLNK | 0o777) << 16
assert is_symlink_member(info) is True
def test_regular_file(self):
info = zipfile.ZipInfo("file.txt")
info.external_attr = (stat.S_IFREG | 0o644) << 16
assert is_symlink_member(info) is False
# ---------------------------------------------------------------------------
# should_ignore_archive_entry
# ---------------------------------------------------------------------------
class TestShouldIgnoreArchiveEntry:
def test_macosx_ignored(self):
assert should_ignore_archive_entry(Path("__MACOSX")) is True
def test_dotfile_ignored(self):
assert should_ignore_archive_entry(Path(".DS_Store")) is True
def test_normal_dir_not_ignored(self):
assert should_ignore_archive_entry(Path("my-skill")) is False
# ---------------------------------------------------------------------------
# resolve_skill_dir_from_archive
# ---------------------------------------------------------------------------
class TestResolveSkillDir:
def test_single_dir(self, tmp_path):
(tmp_path / "my-skill").mkdir()
(tmp_path / "my-skill" / "SKILL.md").write_text("content")
assert resolve_skill_dir_from_archive(tmp_path) == tmp_path / "my-skill"
def test_with_macosx(self, tmp_path):
(tmp_path / "my-skill").mkdir()
(tmp_path / "my-skill" / "SKILL.md").write_text("content")
(tmp_path / "__MACOSX").mkdir()
assert resolve_skill_dir_from_archive(tmp_path) == tmp_path / "my-skill"
def test_empty_after_filter(self, tmp_path):
(tmp_path / "__MACOSX").mkdir()
(tmp_path / ".DS_Store").write_text("meta")
with pytest.raises(ValueError, match="empty"):
resolve_skill_dir_from_archive(tmp_path)
# ---------------------------------------------------------------------------
# safe_extract_skill_archive
# ---------------------------------------------------------------------------
class TestSafeExtract:
def _make_zip(self, tmp_path, members: dict[str, str | bytes]) -> Path:
"""Create a zip with given filename->content entries."""
zip_path = tmp_path / "test.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
for name, content in members.items():
if isinstance(content, str):
content = content.encode()
zf.writestr(name, content)
return zip_path
def test_rejects_zip_bomb(self, tmp_path):
zip_path = self._make_zip(tmp_path, {"big.txt": "x" * 1000})
dest = tmp_path / "out"
dest.mkdir()
with zipfile.ZipFile(zip_path) as zf:
with pytest.raises(ValueError, match="too large"):
safe_extract_skill_archive(zf, dest, max_total_size=100)
def test_rejects_absolute_path(self, tmp_path):
zip_path = tmp_path / "abs.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("/etc/passwd", "root:x:0:0")
dest = tmp_path / "out"
dest.mkdir()
with zipfile.ZipFile(zip_path) as zf:
with pytest.raises(ValueError, match="unsafe"):
safe_extract_skill_archive(zf, dest)
def test_skips_symlinks(self, tmp_path):
zip_path = tmp_path / "sym.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
info = zipfile.ZipInfo("link.txt")
info.external_attr = (stat.S_IFLNK | 0o777) << 16
zf.writestr(info, "/etc/passwd")
zf.writestr("normal.txt", "hello")
dest = tmp_path / "out"
dest.mkdir()
with zipfile.ZipFile(zip_path) as zf:
safe_extract_skill_archive(zf, dest)
assert (dest / "normal.txt").exists()
assert not (dest / "link.txt").exists()
def test_normal_archive(self, tmp_path):
zip_path = self._make_zip(
tmp_path,
{
"my-skill/SKILL.md": "---\nname: test\ndescription: x\n---\n# Test",
"my-skill/README.md": "readme",
},
)
dest = tmp_path / "out"
dest.mkdir()
with zipfile.ZipFile(zip_path) as zf:
safe_extract_skill_archive(zf, dest)
assert (dest / "my-skill" / "SKILL.md").exists()
assert (dest / "my-skill" / "README.md").exists()
# ---------------------------------------------------------------------------
# install_skill_from_archive (full integration)
# ---------------------------------------------------------------------------
class TestInstallSkillFromArchive:
@pytest.fixture(autouse=True)
def _allow_security_scan(self, monkeypatch):
async def _scan(*args, **kwargs):
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
def _make_skill_zip(self, tmp_path: Path, skill_name: str = "test-skill") -> Path:
"""Create a valid .skill archive."""
zip_path = tmp_path / f"{skill_name}.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr(
f"{skill_name}/SKILL.md",
f"---\nname: {skill_name}\ndescription: A test skill\n---\n\n# {skill_name}\n",
)
return zip_path
def test_success(self, tmp_path):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
result = get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert result["success"] is True
assert result["skill_name"] == "test-skill"
assert (skills_root / "custom" / "test-skill" / "SKILL.md").exists()
def test_scans_skill_markdown_before_install(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
calls = []
async def _scan(content, *, executable, location):
calls.append({"content": content, "executable": executable, "location": location})
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert calls == [
{
"content": "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n",
"executable": False,
"location": "test-skill/SKILL.md",
}
]
def test_scans_support_files_and_scripts_before_install(self, tmp_path, monkeypatch):
zip_path = tmp_path / "test-skill.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
zf.writestr("test-skill/references/guide.md", "# Guide\n")
zf.writestr("test-skill/templates/prompt.txt", "Use care.\n")
zf.writestr("test-skill/scripts/run.sh", "#!/bin/sh\necho ok\n")
zf.writestr("test-skill/assets/logo.png", b"\x89PNG\r\n\x1a\n")
zf.writestr("test-skill/references/.env", "TOKEN=secret\n")
zf.writestr("test-skill/templates/config.cfg", "TOKEN=secret\n")
skills_root = tmp_path / "skills"
skills_root.mkdir()
calls = []
async def _scan(content, *, executable, location):
calls.append({"content": content, "executable": executable, "location": location})
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert calls == [
{
"content": "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n",
"executable": False,
"location": "test-skill/SKILL.md",
},
{
"content": "# Guide\n",
"executable": False,
"location": "test-skill/references/guide.md",
},
{
"content": "#!/bin/sh\necho ok\n",
"executable": True,
"location": "test-skill/scripts/run.sh",
},
{
"content": "Use care.\n",
"executable": False,
"location": "test-skill/templates/prompt.txt",
},
]
assert all("secret" not in call["content"] for call in calls)
def test_nested_skill_markdown_prevents_install(self, tmp_path):
zip_path = tmp_path / "test-skill.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
zf.writestr("test-skill/references/other/SKILL.md", "# Nested skill\n")
skills_root = tmp_path / "skills"
skills_root.mkdir()
with pytest.raises(SkillSecurityScanError, match="nested SKILL.md"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert not (skills_root / "custom" / "test-skill").exists()
def test_script_warn_prevents_install(self, tmp_path, monkeypatch):
zip_path = tmp_path / "test-skill.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
zf.writestr("test-skill/scripts/run.sh", "#!/bin/sh\necho ok\n")
skills_root = tmp_path / "skills"
skills_root.mkdir()
async def _scan(*args, executable, **kwargs):
if executable:
return ScanResult(decision="warn", reason="script needs review")
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
with pytest.raises(SkillSecurityScanError, match="rejected executable.*script needs review"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert not (skills_root / "custom" / "test-skill").exists()
def test_security_scan_block_prevents_install(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path, skill_name="blocked-skill")
skills_root = tmp_path / "skills"
skills_root.mkdir()
async def _scan(*args, **kwargs):
return ScanResult(decision="block", reason="prompt injection")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
with pytest.raises(SkillSecurityScanError, match="Security scan blocked.*prompt injection"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert not (skills_root / "custom" / "blocked-skill").exists()
def test_copy_failure_does_not_leave_partial_install(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
def _copytree(src, dst):
partial = Path(dst)
partial.mkdir(parents=True)
(partial / "partial.txt").write_text("partial", encoding="utf-8")
raise OSError("copy failed")
monkeypatch.setattr("deerflow.skills.installer.shutil.copytree", _copytree)
with pytest.raises(OSError, match="copy failed"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
custom_dir = skills_root / "custom"
assert not (custom_dir / "test-skill").exists()
assert not [path for path in custom_dir.iterdir() if path.name.startswith(".installing-test-skill-")]
def test_concurrent_target_creation_does_not_get_clobbered(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
target = skills_root / "custom" / "test-skill"
original_copytree = shutil.copytree
def _copytree(src, dst):
target.mkdir(parents=True)
(target / "marker.txt").write_text("external", encoding="utf-8")
return original_copytree(src, dst)
monkeypatch.setattr("deerflow.skills.installer.shutil.copytree", _copytree)
with pytest.raises(ValueError, match="already exists"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert (target / "marker.txt").read_text(encoding="utf-8") == "external"
assert not (target / "SKILL.md").exists()
def test_move_failure_cleans_reserved_target(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
def _move(src, dst):
Path(dst).write_text("partial", encoding="utf-8")
raise OSError("move failed")
monkeypatch.setattr("deerflow.skills.installer.shutil.move", _move)
with pytest.raises(OSError, match="move failed"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert not (skills_root / "custom" / "test-skill").exists()
def test_duplicate_raises(self, tmp_path):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
(skills_root / "custom" / "test-skill").mkdir(parents=True)
with pytest.raises(ValueError, match="already exists"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
def test_invalid_extension(self, tmp_path):
bad_path = tmp_path / "bad.zip"
bad_path.write_text("not a skill")
with pytest.raises(ValueError, match=".skill"):
get_or_new_skill_storage(skills_path=tmp_path).install_skill_from_archive(bad_path)
def test_bad_frontmatter(self, tmp_path):
zip_path = tmp_path / "bad.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("bad/SKILL.md", "no frontmatter here")
skills_root = tmp_path / "skills"
skills_root.mkdir()
with pytest.raises(ValueError, match="Invalid skill"):
get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
def test_nonexistent_file(self, tmp_path):
with pytest.raises(FileNotFoundError):
get_or_new_skill_storage(skills_path=tmp_path).install_skill_from_archive(Path("/nonexistent/path.skill"))
def test_macosx_filtered_during_resolve(self, tmp_path):
"""Archive with __MACOSX dir still installs correctly."""
zip_path = tmp_path / "mac.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("my-skill/SKILL.md", "---\nname: my-skill\ndescription: desc\n---\n# My Skill\n")
zf.writestr("__MACOSX/._my-skill", "meta")
skills_root = tmp_path / "skills"
skills_root.mkdir()
result = get_or_new_skill_storage(skills_path=skills_root).install_skill_from_archive(zip_path)
assert result["success"] is True
assert result["skill_name"] == "my-skill"