feat: POST /api/colonies/import — onboard a colony from a tarball
Accepts a multipart upload of `tar` / `tar.gz` (any compression
tarfile.open auto-detects) containing a single top-level directory and
unpacks it into HIVE_HOME/colonies/<name>. Lets a desktop client (or any
external tool) hand a colony spec to a remote runtime to run.
Form fields:
file (required) the archive blob
name (optional) override the colony name; defaults to
the archive's top-level dir
replace_existing (optional) "true" to overwrite; else 409 if the
target dir already exists
Safety:
- 50 MB upload cap (multipart reader streams + caps each part)
- Manual path-traversal validation per member (Python 3.11 compatible —
tarfile's safe `filter='data'` only landed in 3.12)
- Symlinks, hardlinks, device, fifo entries all rejected
- Colony name validated against the existing [a-z0-9_]+ pattern used by
routes_colony_workers + queen_lifecycle_tools
- Mode bits masked to 0o755 / 0o644 so a tampered tar can't ship
world-writable scripts
Tests cover happy path, name override, 409 / 201 around replace_existing,
path traversal, absolute paths, symlinks, multiple top-level dirs,
invalid colony name, missing file part, corrupt tar, non-multipart, and
uncompressed tar.
Future work (not in this PR): export endpoint, colony list/delete via
this same prefix, and an MCP tool wrapper so queens can move colonies
between hosts mid-conversation.
This commit is contained in:
@@ -344,6 +344,7 @@ def create_app(model: str | None = None) -> web.Application:
|
|||||||
app.router.add_get("/api/browser/status/stream", handle_browser_status_stream)
|
app.router.add_get("/api/browser/status/stream", handle_browser_status_stream)
|
||||||
|
|
||||||
# Register route modules
|
# Register route modules
|
||||||
|
from framework.server.routes_colonies import register_routes as register_colonies_routes
|
||||||
from framework.server.routes_colony_tools import register_routes as register_colony_tools_routes
|
from framework.server.routes_colony_tools import register_routes as register_colony_tools_routes
|
||||||
from framework.server.routes_colony_workers import register_routes as register_colony_worker_routes
|
from framework.server.routes_colony_workers import register_routes as register_colony_worker_routes
|
||||||
from framework.server.routes_config import register_routes as register_config_routes
|
from framework.server.routes_config import register_routes as register_config_routes
|
||||||
@@ -370,6 +371,7 @@ def create_app(model: str | None = None) -> web.Application:
|
|||||||
register_log_routes(app)
|
register_log_routes(app)
|
||||||
register_queen_routes(app)
|
register_queen_routes(app)
|
||||||
register_queen_tools_routes(app)
|
register_queen_tools_routes(app)
|
||||||
|
register_colonies_routes(app)
|
||||||
register_colony_tools_routes(app)
|
register_colony_tools_routes(app)
|
||||||
register_mcp_routes(app)
|
register_mcp_routes(app)
|
||||||
register_colony_worker_routes(app)
|
register_colony_worker_routes(app)
|
||||||
|
|||||||
@@ -0,0 +1,238 @@
|
|||||||
|
"""HTTP routes for colony import/export — moving a colony spec between hosts.
|
||||||
|
|
||||||
|
Today, just the import side: accept a `tar.gz` of a colony directory and
|
||||||
|
unpack it into ``HIVE_HOME/colonies/<name>/`` so a desktop client (or any
|
||||||
|
external mover) can hand a colony to a remote runtime to run.
|
||||||
|
|
||||||
|
POST /api/colonies/import -- multipart/form-data
|
||||||
|
file required -- .tar / .tar.gz / .tar.bz2 / .tar.xz
|
||||||
|
name optional -- override the colony name; defaults to the
|
||||||
|
archive's single top-level directory
|
||||||
|
replace_existing optional -- "true" to overwrite an existing colony,
|
||||||
|
else 409 if the target dir exists
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import shutil
|
||||||
|
import tarfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from aiohttp import web
|
||||||
|
|
||||||
|
from framework.config import COLONIES_DIR
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Matches the convention used elsewhere in the codebase (see
|
||||||
|
# routes_colony_workers and queen_lifecycle_tools): lowercase alphanumerics
|
||||||
|
# and underscores only. No dots, no slashes — names are filesystem segments.
|
||||||
|
_COLONY_NAME_RE = re.compile(r"^[a-z0-9_]+$")
|
||||||
|
|
||||||
|
# 50 MB cap on upload size. Colonies bundle scripts, prompts, memories,
|
||||||
|
# and small data files; anything bigger usually shouldn't be in version
|
||||||
|
# control to begin with. Bump if a real use-case lands here.
|
||||||
|
_MAX_UPLOAD_BYTES = 50 * 1024 * 1024
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_colony_name(name: str) -> str | None:
    """Check ``name`` as a colony identifier.

    Returns a human-readable error string when the name is unusable as a
    filesystem segment under COLONIES_DIR, or ``None`` when it is valid.
    The 64-char cap keeps paths short; the regex matches the convention
    shared with routes_colony_workers / queen_lifecycle_tools.
    """
    if not name:
        return "colony name is required"
    if len(name) > 64:
        return "colony name too long (max 64 chars)"
    # Valid iff it matches the shared [a-z0-9_]+ convention.
    return None if _COLONY_NAME_RE.match(name) else "colony name must match [a-z0-9_]+"
||||||
|
|
||||||
|
|
||||||
|
def _archive_top_level(tf: tarfile.TarFile) -> tuple[str | None, str | None]:
|
||||||
|
"""Find the archive's single top-level directory, if it has one.
|
||||||
|
|
||||||
|
Returns ``(name, error)``. Allows the archive to optionally include a
|
||||||
|
leading ``./`` prefix on every member (some tar implementations emit this).
|
||||||
|
"""
|
||||||
|
tops: set[str] = set()
|
||||||
|
for member in tf.getmembers():
|
||||||
|
# Reject empty / absolute / parent-traversal names early; the deeper
|
||||||
|
# walker rejects them again, but failing fast here gives a cleaner
|
||||||
|
# error message back to the caller.
|
||||||
|
if not member.name or member.name.startswith("/"):
|
||||||
|
return None, f"invalid member path: {member.name!r}"
|
||||||
|
parts = Path(member.name).parts
|
||||||
|
if not parts or parts[0] == "..":
|
||||||
|
return None, f"invalid member path: {member.name!r}"
|
||||||
|
# Skip the archive's own root entry if present (`tar` emits "./").
|
||||||
|
first = parts[0] if parts[0] != "." else (parts[1] if len(parts) > 1 else "")
|
||||||
|
if first:
|
||||||
|
tops.add(first)
|
||||||
|
if len(tops) != 1:
|
||||||
|
return None, "archive must contain exactly one top-level directory"
|
||||||
|
return next(iter(tops)), None
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_extract_tar(tf: tarfile.TarFile, dest: Path, *, strip_prefix: str) -> str | None:
|
||||||
|
"""Extract every member of ``tf`` into ``dest``, stripping ``strip_prefix``.
|
||||||
|
|
||||||
|
Each member's resolved path must stay under ``dest``; symlinks, hardlinks,
|
||||||
|
and device/fifo entries are rejected. Returns an error string on failure.
|
||||||
|
|
||||||
|
Python's ``tarfile.extractall(filter='data')`` does similar checks but
|
||||||
|
only landed in 3.12; we run on 3.11+, so do the validation explicitly.
|
||||||
|
"""
|
||||||
|
base = dest.resolve()
|
||||||
|
base.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
for member in tf.getmembers():
|
||||||
|
# Compute the relative target name after stripping the top-level dir.
|
||||||
|
# Both "<prefix>/foo" and "./<prefix>/foo" map to "foo".
|
||||||
|
name = member.name
|
||||||
|
if name.startswith("./"):
|
||||||
|
name = name[2:]
|
||||||
|
if name == strip_prefix:
|
||||||
|
# The top-level dir entry itself; nothing to extract beyond
|
||||||
|
# making sure dest exists (handled above).
|
||||||
|
continue
|
||||||
|
prefix_with_sep = f"{strip_prefix}/"
|
||||||
|
if not name.startswith(prefix_with_sep):
|
||||||
|
return f"member outside top-level dir: {member.name!r}"
|
||||||
|
rel = name[len(prefix_with_sep):]
|
||||||
|
if not rel:
|
||||||
|
continue
|
||||||
|
# Reject any "..", absolute paths, or weird member types.
|
||||||
|
if ".." in Path(rel).parts:
|
||||||
|
return f"path traversal in member: {member.name!r}"
|
||||||
|
if member.issym() or member.islnk():
|
||||||
|
return f"symlinks/hardlinks not supported: {member.name!r}"
|
||||||
|
if member.isdev() or member.isfifo():
|
||||||
|
return f"device/fifo not supported: {member.name!r}"
|
||||||
|
|
||||||
|
target = (base / rel).resolve()
|
||||||
|
try:
|
||||||
|
target.relative_to(base)
|
||||||
|
except ValueError:
|
||||||
|
return f"member escapes destination: {member.name!r}"
|
||||||
|
|
||||||
|
if member.isdir():
|
||||||
|
target.mkdir(parents=True, exist_ok=True)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Regular file. Extract via stream copy so we don't trust tarfile's
|
||||||
|
# built-in path handling — we already resolved the destination.
|
||||||
|
target.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
src = tf.extractfile(member)
|
||||||
|
if src is None:
|
||||||
|
# Unknown member type that slipped past the checks above.
|
||||||
|
return f"unsupported member: {member.name!r}"
|
||||||
|
with target.open("wb") as out:
|
||||||
|
shutil.copyfileobj(src, out)
|
||||||
|
# Best-effort mode bits — masked to user-rwx + group/other-rx so we
|
||||||
|
# don't accidentally honour world-writable bits from a tampered tar.
|
||||||
|
target.chmod(member.mode & 0o755 if member.mode else 0o644)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_import_colony(request: web.Request) -> web.Response:
    """POST /api/colonies/import — unpack a colony tarball into HIVE_HOME.

    Multipart form fields:
        file              required -- tar archive (any compression tarfile
                          auto-detects via mode='r:*')
        name              optional -- override colony name; defaults to the
                          archive's single top-level directory
        replace_existing  optional -- "true" to overwrite an existing colony

    Responses: 201 with {name, path, files_imported, replaced} on success;
    400 for malformed input; 409 when the colony exists and
    replace_existing is unset; 413 when the upload exceeds the cap.
    """
    if not request.content_type.startswith("multipart/"):
        return web.json_response(
            {"error": "expected multipart/form-data"}, status=400
        )

    reader = await request.multipart()
    upload: bytes | None = None
    upload_filename: str | None = None
    form: dict[str, str] = {}

    while True:
        part = await reader.next()
        if part is None:
            break
        if part.name == "file":
            buf = io.BytesIO()
            while True:
                chunk = await part.read_chunk(size=65536)
                if not chunk:
                    break
                buf.write(chunk)
                # Enforce the cap while streaming so an oversized body is
                # rejected without buffering the whole thing first.
                if buf.tell() > _MAX_UPLOAD_BYTES:
                    return web.json_response(
                        {"error": f"upload exceeds {_MAX_UPLOAD_BYTES} bytes"},
                        status=413,
                    )
            upload = buf.getvalue()
            upload_filename = part.filename or ""
        else:
            form[part.name or ""] = (await part.text()).strip()

    if upload is None:
        return web.json_response({"error": "missing 'file' part"}, status=400)

    replace_existing = form.get("replace_existing", "false").lower() == "true"
    name_override = form.get("name", "").strip() or None

    # Open the archive — tarfile auto-detects compression with mode='r:*'.
    try:
        tf = tarfile.open(fileobj=io.BytesIO(upload), mode="r:*")
    except tarfile.TarError as err:
        return web.json_response(
            {"error": f"invalid tar archive: {err}"}, status=400
        )

    # BUG FIX: the response's "replaced" flag used to be computed as
    # `replace_existing and target.exists()`, which is always true after a
    # successful extract whenever the flag was set — even when nothing
    # pre-existed. Track the actual removal instead.
    replaced = False
    try:
        top, top_err = _archive_top_level(tf)
        if top_err or top is None:
            return web.json_response({"error": top_err}, status=400)

        colony_name = name_override or top
        name_err = _validate_colony_name(colony_name)
        if name_err:
            return web.json_response({"error": name_err}, status=400)

        target = COLONIES_DIR / colony_name
        if target.exists():
            if not replace_existing:
                return web.json_response(
                    {
                        "error": "colony already exists",
                        "name": colony_name,
                        "hint": "set replace_existing=true to overwrite",
                    },
                    status=409,
                )
            shutil.rmtree(target)
            replaced = True

        extract_err = _safe_extract_tar(tf, target, strip_prefix=top)
        if extract_err:
            # Best-effort cleanup so a partial extract doesn't get left behind.
            shutil.rmtree(target, ignore_errors=True)
            return web.json_response({"error": extract_err}, status=400)
    finally:
        tf.close()

    files_imported = sum(1 for p in target.rglob("*") if p.is_file())
    logger.info(
        "Imported colony %s (%d files) from upload %s (%d bytes)",
        colony_name,
        files_imported,
        upload_filename or "<unnamed>",
        len(upload),
    )

    return web.json_response(
        {
            "name": colony_name,
            "path": str(target),
            "files_imported": files_imported,
            "replaced": replaced,
        },
        status=201,
    )
||||||
|
|
||||||
|
|
||||||
|
def register_routes(app: web.Application) -> None:
    """Attach this module's routes to ``app``; called from create_app."""
    app.router.add_post("/api/colonies/import", handle_import_colony)
||||||
@@ -0,0 +1,234 @@
|
|||||||
|
"""Tests for POST /api/colonies/import — tar-based colony onboarding.
|
||||||
|
|
||||||
|
The handler resolves writes against ``framework.config.COLONIES_DIR``;
|
||||||
|
every test redirects that into a ``tmp_path`` so we never touch the real
|
||||||
|
``~/.hive/colonies`` tree.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
import tarfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from aiohttp import FormData, web
|
||||||
|
from aiohttp.test_utils import TestClient, TestServer
|
||||||
|
|
||||||
|
from framework.server import routes_colonies
|
||||||
|
|
||||||
|
|
||||||
|
def _build_tar(layout: dict[str, bytes | None], *, gzip: bool = True) -> bytes:
|
||||||
|
"""Build an in-memory tar with the given paths.
|
||||||
|
|
||||||
|
``layout`` maps archive member names to file contents; passing ``None``
|
||||||
|
creates a directory entry instead of a regular file.
|
||||||
|
"""
|
||||||
|
buf = io.BytesIO()
|
||||||
|
mode = "w:gz" if gzip else "w"
|
||||||
|
with tarfile.open(fileobj=buf, mode=mode) as tf:
|
||||||
|
for name, content in layout.items():
|
||||||
|
if content is None:
|
||||||
|
info = tarfile.TarInfo(name=name)
|
||||||
|
info.type = tarfile.DIRTYPE
|
||||||
|
info.mode = 0o755
|
||||||
|
tf.addfile(info)
|
||||||
|
else:
|
||||||
|
info = tarfile.TarInfo(name=name)
|
||||||
|
info.size = len(content)
|
||||||
|
info.mode = 0o644
|
||||||
|
tf.addfile(info, io.BytesIO(content))
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
def _build_tar_with_symlink(top: str, link_name: str, link_target: str) -> bytes:
|
||||||
|
buf = io.BytesIO()
|
||||||
|
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
|
||||||
|
info = tarfile.TarInfo(name=top)
|
||||||
|
info.type = tarfile.DIRTYPE
|
||||||
|
info.mode = 0o755
|
||||||
|
tf.addfile(info)
|
||||||
|
sym = tarfile.TarInfo(name=f"{top}/{link_name}")
|
||||||
|
sym.type = tarfile.SYMTYPE
|
||||||
|
sym.linkname = link_target
|
||||||
|
tf.addfile(sym)
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def colonies_dir(tmp_path, monkeypatch):
    """Redirect COLONIES_DIR into a tmp tree."""
    # Patch the name the handler module reads so imports land under pytest's
    # tmp_path instead of the real ~/.hive/colonies tree.
    colonies = tmp_path / "colonies"
    colonies.mkdir()
    monkeypatch.setattr(routes_colonies, "COLONIES_DIR", colonies)
    return colonies


async def _client(app: web.Application) -> TestClient:
    """Wrap ``app`` in an in-process test client; use via ``async with``."""
    return TestClient(TestServer(app))


def _app() -> web.Application:
    """Build a minimal app with only this module's routes registered."""
    app = web.Application()
    routes_colonies.register_routes(app)
    return app


def _form(file_bytes: bytes, *, filename: str = "colony.tar.gz", **fields: str) -> FormData:
    """Build the multipart body: the archive under 'file' plus extra fields."""
    fd = FormData()
    fd.add_field("file", file_bytes, filename=filename, content_type="application/gzip")
    # Remaining keyword args become plain text form fields (name, replace_existing).
    for k, v in fields.items():
        fd.add_field(k, v)
    return fd
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_happy_path_imports_colony(colonies_dir: Path) -> None:
    # Well-formed gz archive with one top-level dir → 201, files on disk.
    archive = _build_tar(
        {
            "x_daily/": None,
            "x_daily/metadata.json": b'{"colony_name":"x_daily"}',
            "x_daily/scripts/run.sh": b"#!/bin/sh\necho hi\n",
        }
    )
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(archive))
        assert resp.status == 201, await resp.text()
        body = await resp.json()
        assert body["name"] == "x_daily"
        assert body["files_imported"] == 2
        assert (colonies_dir / "x_daily" / "metadata.json").read_bytes() == b'{"colony_name":"x_daily"}'
        assert (colonies_dir / "x_daily" / "scripts" / "run.sh").exists()


@pytest.mark.asyncio
async def test_name_override(colonies_dir: Path) -> None:
    # The optional 'name' field wins over the archive's top-level dir name.
    archive = _build_tar({"x_daily/": None, "x_daily/file.txt": b"hi"})
    async with await _client(_app()) as c:
        resp = await c.post(
            "/api/colonies/import", data=_form(archive, name="other_name")
        )
        assert resp.status == 201
        body = await resp.json()
        assert body["name"] == "other_name"
        assert (colonies_dir / "other_name" / "file.txt").read_bytes() == b"hi"
        assert not (colonies_dir / "x_daily").exists()


@pytest.mark.asyncio
async def test_rejects_existing_without_replace_flag(colonies_dir: Path) -> None:
    # Existing colony + no replace_existing → 409 and nothing touched.
    (colonies_dir / "x_daily").mkdir()
    (colonies_dir / "x_daily" / "old.txt").write_text("preserved")
    archive = _build_tar({"x_daily/": None, "x_daily/new.txt": b"new"})
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(archive))
        assert resp.status == 409
        # Original content untouched
        assert (colonies_dir / "x_daily" / "old.txt").read_text() == "preserved"


@pytest.mark.asyncio
async def test_replace_existing_overwrites(colonies_dir: Path) -> None:
    # replace_existing=true removes the old tree before extracting the new one.
    (colonies_dir / "x_daily").mkdir()
    (colonies_dir / "x_daily" / "old.txt").write_text("preserved")
    archive = _build_tar({"x_daily/": None, "x_daily/new.txt": b"new"})
    async with await _client(_app()) as c:
        resp = await c.post(
            "/api/colonies/import",
            data=_form(archive, replace_existing="true"),
        )
        assert resp.status == 201, await resp.text()
        assert not (colonies_dir / "x_daily" / "old.txt").exists()
        assert (colonies_dir / "x_daily" / "new.txt").read_text() == "new"
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_rejects_path_traversal(colonies_dir: Path) -> None:
    # A "../" member must 400 and leave no partial extraction behind.
    archive = _build_tar(
        {
            "x_daily/": None,
            "x_daily/../escape.txt": b"oops",
        }
    )
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(archive))
        assert resp.status == 400
        assert "traversal" in (await resp.json())["error"].lower() or "outside" in (await resp.json())["error"].lower()
        assert not (colonies_dir / "x_daily").exists()
        assert not (colonies_dir.parent / "escape.txt").exists()


@pytest.mark.asyncio
async def test_rejects_absolute_member(colonies_dir: Path) -> None:
    # Absolute member paths are rejected before any extraction happens.
    archive = _build_tar({"x_daily/": None, "/etc/passwd": b"oops"})
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(archive))
        assert resp.status == 400


@pytest.mark.asyncio
async def test_rejects_symlinks(colonies_dir: Path) -> None:
    # Symlink members are refused outright (no symlink smuggling).
    archive = _build_tar_with_symlink("x_daily", "evil", "/etc/passwd")
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(archive))
        assert resp.status == 400
        assert "symlink" in (await resp.json())["error"].lower()


@pytest.mark.asyncio
async def test_rejects_multiple_top_level_dirs(colonies_dir: Path) -> None:
    # The archive must contain exactly one top-level directory.
    archive = _build_tar(
        {
            "a/": None,
            "a/x.txt": b"a",
            "b/": None,
            "b/y.txt": b"b",
        }
    )
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(archive))
        assert resp.status == 400
        assert "top-level" in (await resp.json())["error"].lower()
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_rejects_invalid_colony_name(colonies_dir: Path) -> None:
    # Names outside [a-z0-9_]+ (uppercase, dashes) are refused.
    archive = _build_tar({"Bad-Name/": None, "Bad-Name/x.txt": b"x"})
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(archive))
        assert resp.status == 400


@pytest.mark.asyncio
async def test_rejects_non_multipart(colonies_dir: Path) -> None:
    # Non-multipart bodies get a 400 before any parsing is attempted.
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=b"not multipart", headers={"Content-Type": "application/octet-stream"})
        assert resp.status == 400


@pytest.mark.asyncio
async def test_rejects_corrupt_tar(colonies_dir: Path) -> None:
    # Bytes tarfile cannot parse → 400 "invalid tar archive".
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=_form(b"not a real tar"))
        assert resp.status == 400


@pytest.mark.asyncio
async def test_rejects_missing_file_part(colonies_dir: Path) -> None:
    # A multipart body without a 'file' part is a 400.
    fd = FormData()
    fd.add_field("name", "anything")
    async with await _client(_app()) as c:
        resp = await c.post("/api/colonies/import", data=fd)
        assert resp.status == 400


@pytest.mark.asyncio
async def test_accepts_uncompressed_tar(colonies_dir: Path) -> None:
    # mode='r:*' auto-detects, so a plain .tar works too.
    archive = _build_tar({"x_daily/": None, "x_daily/file.txt": b"plain"}, gzip=False)
    async with await _client(_app()) as c:
        resp = await c.post(
            "/api/colonies/import",
            data=_form(archive, filename="colony.tar"),
        )
        assert resp.status == 201
        assert (colonies_dir / "x_daily" / "file.txt").read_text() == "plain"
||||||
Reference in New Issue
Block a user