Files
deer-flow/backend/scripts/migrate_user_isolation.py
T
yangzheli 59c4a3f0a4
Unit Tests / backend-unit-tests (push) Waiting to run
E2E Tests / e2e-tests (push) Waiting to run
Frontend Unit Tests / frontend-unit-tests (push) Waiting to run
Lint Check / lint (push) Waiting to run
Lint Check / lint-frontend (push) Waiting to run
feat(agent): add custom-agent self-updates with user isolation (#2713)
* feat(agent): add update_agent tool for in-chat custom-agent self-updates (#2616)

Custom agents had no built-in way to persist updates to their own SOUL.md /
config.yaml from a normal chat — `setup_agent` was only bound during the
bootstrap flow, so when the user asked the agent to refine its description
or personality, the agent would shell out via bash/write_file and the edits
landed in a temporary sandbox/tool workspace instead of
`{base_dir}/agents/{agent_name}/`.

Changes:
- New `update_agent` builtin tool with partial-update semantics (only the
  fields you pass are written) and atomic temp-file + os.replace writes so
  a failed update never corrupts existing SOUL.md / config.yaml.
- Lead agent now binds `update_agent` in the non-bootstrap path whenever
  `agent_name` is set in the runtime context. Default agent (no
  agent_name) and bootstrap flow are unchanged.
- New `<self_update>` system-prompt section is injected for custom agents,
  instructing them to use `update_agent` — and explicitly NOT bash /
  write_file — to persist self-updates.
- Tests: 11 new cases in `tests/test_update_agent_tool.py` covering
  validation (missing/invalid agent_name, unknown agent, no fields),
  partial updates (soul-only, description-only, skills=[] vs omitted),
  no-op detection, atomic-write safety, and AgentConfig round-tripping;
  plus 2 new cases in `tests/test_lead_agent_prompt.py` covering the
  self-update prompt section.
- Docs: updated backend/CLAUDE.md builtin tools list and tools.mdx
  (en/zh) with the new tool description.

* feat(agent): isolate custom agents per user

Store custom agent definitions under the effective user, keep legacy agents readable until migration, and cover API/tool/migration behavior with tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat: consistent write/delete targets & add --user-id to migration

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-05 23:17:42 +08:00

243 lines
8.4 KiB
Python

"""One-time migration: move legacy thread dirs and memory into per-user layout.
Usage:
PYTHONPATH=. python scripts/migrate_user_isolation.py [--dry-run] [--user-id USER_ID]
The script is idempotent — re-running it after a successful migration is a no-op.
"""
import argparse
import logging
import shutil
from deerflow.config.paths import Paths, get_paths
logger = logging.getLogger(__name__)
def migrate_thread_dirs(
paths: Paths,
thread_owner_map: dict[str, str],
*,
dry_run: bool = False,
) -> list[dict]:
"""Move legacy thread directories into per-user layout.
Args:
paths: Paths instance.
thread_owner_map: Mapping of thread_id -> user_id from threads_meta table.
dry_run: If True, only log what would happen.
Returns:
List of migration report entries.
"""
report: list[dict] = []
legacy_threads = paths.base_dir / "threads"
if not legacy_threads.exists():
logger.info("No legacy threads directory found — nothing to migrate.")
return report
for thread_dir in sorted(legacy_threads.iterdir()):
if not thread_dir.is_dir():
continue
thread_id = thread_dir.name
user_id = thread_owner_map.get(thread_id, "default")
dest = paths.base_dir / "users" / user_id / "threads" / thread_id
entry = {"thread_id": thread_id, "user_id": user_id, "action": ""}
if dest.exists():
conflicts_dir = paths.base_dir / "migration-conflicts" / thread_id
entry["action"] = f"conflict -> {conflicts_dir}"
if not dry_run:
conflicts_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(thread_dir), str(conflicts_dir))
logger.warning("Conflict for thread %s: moved to %s", thread_id, conflicts_dir)
else:
entry["action"] = f"moved -> {dest}"
if not dry_run:
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(thread_dir), str(dest))
logger.info("Migrated thread %s -> user %s", thread_id, user_id)
report.append(entry)
# Clean up empty legacy threads dir
if not dry_run and legacy_threads.exists() and not any(legacy_threads.iterdir()):
legacy_threads.rmdir()
return report
def migrate_agents(
paths: Paths,
user_id: str = "default",
*,
dry_run: bool = False,
) -> list[dict]:
"""Move legacy custom-agent directories into per-user layout.
Legacy layout: ``{base_dir}/agents/{name}/``
Per-user layout: ``{base_dir}/users/{user_id}/agents/{name}/``
Pre-existing per-user agents take precedence: if a destination already
exists for an agent name, the legacy copy is moved to
``{base_dir}/migration-conflicts/agents/{name}/`` for manual review.
Args:
paths: Paths instance.
user_id: Target user to receive the legacy agents (defaults to
``"default"``, matching ``DEFAULT_USER_ID`` for no-auth setups).
dry_run: If True, only log what would happen.
Returns:
List of migration report entries, one per legacy agent directory found.
"""
report: list[dict] = []
legacy_agents = paths.agents_dir
if not legacy_agents.exists():
logger.info("No legacy agents directory found — nothing to migrate.")
return report
for agent_dir in sorted(legacy_agents.iterdir()):
if not agent_dir.is_dir():
continue
agent_name = agent_dir.name
dest = paths.user_agent_dir(user_id, agent_name)
entry = {"agent": agent_name, "user_id": user_id, "action": ""}
if dest.exists():
conflicts_dir = paths.base_dir / "migration-conflicts" / "agents" / agent_name
entry["action"] = f"conflict -> {conflicts_dir}"
if not dry_run:
conflicts_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(agent_dir), str(conflicts_dir))
logger.warning("Conflict for agent %s: moved legacy copy to %s", agent_name, conflicts_dir)
else:
entry["action"] = f"moved -> {dest}"
if not dry_run:
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(agent_dir), str(dest))
logger.info("Migrated agent %s -> user %s", agent_name, user_id)
report.append(entry)
# Clean up empty legacy agents dir
if not dry_run and legacy_agents.exists() and not any(legacy_agents.iterdir()):
legacy_agents.rmdir()
return report
def migrate_memory(
paths: Paths,
user_id: str = "default",
*,
dry_run: bool = False,
) -> None:
"""Move legacy global memory.json into per-user layout.
Args:
paths: Paths instance.
user_id: Target user to receive the legacy memory.
dry_run: If True, only log.
"""
legacy_mem = paths.base_dir / "memory.json"
if not legacy_mem.exists():
logger.info("No legacy memory.json found — nothing to migrate.")
return
dest = paths.user_memory_file(user_id)
if dest.exists():
legacy_backup = paths.base_dir / "memory.legacy.json"
logger.warning("Destination %s exists; renaming legacy to %s", dest, legacy_backup)
if not dry_run:
legacy_mem.rename(legacy_backup)
return
logger.info("Migrating memory.json -> %s", dest)
if not dry_run:
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(legacy_mem), str(dest))
def _build_owner_map_from_db(paths: Paths) -> dict[str, str]:
"""Query threads_meta table for thread_id -> user_id mapping.
Uses raw sqlite3 to avoid async dependencies.
"""
import sqlite3
db_path = paths.base_dir / "deer-flow.db"
if not db_path.exists():
logger.info("No database found at %s — using empty owner map.", db_path)
return {}
conn = sqlite3.connect(str(db_path))
try:
cursor = conn.execute("SELECT thread_id, user_id FROM threads_meta WHERE user_id IS NOT NULL")
return {row[0]: row[1] for row in cursor.fetchall()}
except sqlite3.OperationalError as e:
logger.warning("Failed to query threads_meta: %s", e)
return {}
finally:
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(description="Migrate DeerFlow data to per-user layout")
parser.add_argument("--dry-run", action="store_true", help="Log actions without making changes")
parser.add_argument(
"--user-id",
default="default",
metavar="USER_ID",
help=("User ID to claim un-owned legacy data (global memory.json and legacy custom agents). Defaults to 'default'. In multi-user installs, set this to the operator account that should inherit those legacy artifacts."),
)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
paths = get_paths()
logger.info("Base directory: %s", paths.base_dir)
logger.info("Dry run: %s", args.dry_run)
logger.info("Claiming un-owned legacy data for user_id=%s", args.user_id)
owner_map = _build_owner_map_from_db(paths)
logger.info("Found %d thread ownership records in DB", len(owner_map))
report = migrate_thread_dirs(paths, owner_map, dry_run=args.dry_run)
migrate_memory(paths, user_id=args.user_id, dry_run=args.dry_run)
agent_report = migrate_agents(paths, user_id=args.user_id, dry_run=args.dry_run)
if report:
logger.info("Thread migration report:")
for entry in report:
logger.info(" thread=%s user=%s action=%s", entry["thread_id"], entry["user_id"], entry["action"])
else:
logger.info("No threads to migrate.")
if agent_report:
logger.info("Agent migration report:")
for entry in agent_report:
logger.info(" agent=%s user=%s action=%s", entry["agent"], entry["user_id"], entry["action"])
else:
logger.info("No agents to migrate.")
unowned = [e for e in report if e["user_id"] == "default"]
if unowned:
logger.warning("%d thread(s) had no owner and were assigned to 'default':", len(unowned))
for e in unowned:
logger.warning(" %s", e["thread_id"])
if agent_report:
logger.warning(
"%d legacy agent(s) were assigned to '%s'. If those agents belonged to other users, move them manually under {base_dir}/users/<user_id>/agents/.",
len(agent_report),
args.user_id,
)
if __name__ == "__main__":
main()