Compare commits

...

2 Commits

Author SHA1 Message Date
Timothy 74a283aab6 fix: runtime oauth issues 2026-05-04 20:44:07 -07:00
Timothy 569c715031 feat: allow credential assignment as worker profile 2026-05-04 12:01:43 -07:00
19 changed files with 1273 additions and 12 deletions
@@ -94,7 +94,7 @@ def _list_aden_accounts() -> list[dict]:
client = AdenCredentialClient( client = AdenCredentialClient(
AdenClientConfig( AdenClientConfig(
base_url=os.environ.get("ADEN_API_URL", "https://api.adenhq.com"), base_url=os.environ.get("ADEN_API_URL", "https://app.open-hive.com"),
) )
) )
try: try:
@@ -326,6 +326,18 @@ the rest.
overall purpose. Validated up front a bad cron, missing task, or \ overall purpose. Validated up front a bad cron, missing task, or \
malformed webhook path fails the call before anything is written, \ malformed webhook path fails the call before anything is written, \
so you can retry with corrected input. so you can retry with corrected input.
- ``worker_profiles`` (optional array) pass this ONLY when the \
colony needs multiple authorized accounts of the same vendor (two \
Slack workspaces, two Gmail accounts) so each worker calls the \
right one. Each entry: ``{name, integrations: {provider: alias}, \
task?, skill_name?, concurrency_hint?, prompt_override?, \
tool_filter?}``. ``alias`` is the account label the user assigned \
on hive.adenhq.com (e.g. ``work``, ``personal``); discover \
available aliases via ``get_account_info()``. If omitted, the \
colony has a single implicit ``default`` profile that uses each \
provider's primary account — that's the right call for almost \
every colony. Use ``update_worker_profile`` to swap a profile's \
alias later without rebuilding the colony.
- After this returns, the chat is over: the session locks immediately \ - After this returns, the chat is over: the session locks immediately \
and the user gets a "compact and start a new session with you" \ and the user gets a "compact and start a new session with you" \
button. So make your call to create_colony the last thing you do \ button. So make your call to create_colony the last thing you do \
@@ -289,6 +289,24 @@ class AdenSyncProvider(CredentialProvider):
""" """
synced = 0 synced = 0
# Echo where we're talking and which key prefix we're using so a
# 401 can be diagnosed without enabling httpx debug logs. The key
# prefix is the safest discriminator: if the desktop minted a key
# against backend X but the runtime is hitting backend Y, the
# prefix in the log won't match the one the user finds in their
# ``hive_auth.bin`` (or in the dashboard's Keys panel).
cfg = self._client.config
api_key = cfg.api_key or ""
key_summary = (
f"{api_key[:8]}{api_key[-4:]}" if len(api_key) >= 12 else "<short>"
)
logger.info(
"AdenSync: GET %s/v1/credentials key=%s len=%d",
cfg.base_url.rstrip("/"),
key_summary,
len(api_key),
)
try: try:
integrations = self._client.list_integrations() integrations = self._client.list_integrations()
+9 -1
View File
@@ -714,7 +714,7 @@ class CredentialStore:
@classmethod @classmethod
def with_aden_sync( def with_aden_sync(
cls, cls,
base_url: str = "https://api.adenhq.com", base_url: str | None = None,
cache_ttl_seconds: int = 300, cache_ttl_seconds: int = 300,
local_path: str | None = None, local_path: str | None = None,
auto_sync: bool = True, auto_sync: bool = True,
@@ -762,6 +762,14 @@ class CredentialStore:
logger.info("ADEN_API_KEY not set, using local-only credential storage") logger.info("ADEN_API_KEY not set, using local-only credential storage")
return cls(storage=local_storage, **kwargs) return cls(storage=local_storage, **kwargs)
# Honor ADEN_API_URL when no explicit base_url was passed. The
# legacy default (https://api.adenhq.com) was a stale brand
# alias; the new canonical host is app.open-hive.com (matches
# cloud-deployed hive-backend) but local dev typically points
# at http://localhost:8889 via this env var.
if base_url is None:
base_url = os.environ.get("ADEN_API_URL", "https://app.open-hive.com")
# Try to setup Aden sync # Try to setup Aden sync
try: try:
from .aden import ( from .aden import (
+20
View File
@@ -825,6 +825,7 @@ class ColonyRuntime:
tools: list[Any] | None = None, tools: list[Any] | None = None,
tool_executor: Callable | None = None, tool_executor: Callable | None = None,
stream_id: str | None = None, stream_id: str | None = None,
profile_name: str | None = None,
) -> list[str]: ) -> list[str]:
"""Spawn worker clones and start them in the background. """Spawn worker clones and start them in the background.
@@ -854,8 +855,20 @@ class ColonyRuntime:
raise RuntimeError("ColonyRuntime is not running") raise RuntimeError("ColonyRuntime is not running")
from framework.agent_loop.agent_loop import AgentLoop from framework.agent_loop.agent_loop import AgentLoop
from framework.host.worker_profiles import get_worker_profile
from framework.storage.conversation_store import FileConversationStore from framework.storage.conversation_store import FileConversationStore
# Resolve the profile binding for this spawn. ``profile_name=None``
# means "use the default profile"; an unknown name silently falls
# back to default (the legacy single-template behavior). The
# resolved integrations map is threaded into Worker(...) so
# account_overrides() can pin its MCP tool calls.
_resolved_profile = (
get_worker_profile(self._colony_id, profile_name) if profile_name else None
)
_profile_name_resolved = _resolved_profile.name if _resolved_profile else (profile_name or "")
_profile_integrations = dict(_resolved_profile.integrations) if _resolved_profile else {}
# Resolve per-spawn vs colony-default code identity # Resolve per-spawn vs colony-default code identity
spawn_spec = agent_spec or self._agent_spec spawn_spec = agent_spec or self._agent_spec
spawn_tools = tools if tools is not None else self._tools spawn_tools = tools if tools is not None else self._tools
@@ -1008,6 +1021,8 @@ class ColonyRuntime:
event_bus=self._scoped_event_bus, event_bus=self._scoped_event_bus,
colony_id=self._colony_id, colony_id=self._colony_id,
storage_path=worker_storage, storage_path=worker_storage,
profile_name=_profile_name_resolved,
integrations=_profile_integrations,
) )
self._workers[worker_id] = worker self._workers[worker_id] = worker
@@ -1030,6 +1045,7 @@ class ColonyRuntime:
tasks: list[dict[str, Any]], tasks: list[dict[str, Any]],
*, *,
tools_override: list[Any] | None = None, tools_override: list[Any] | None = None,
profile_name: str | None = None,
) -> list[str]: ) -> list[str]:
"""Spawn a batch of parallel workers, one per task spec. """Spawn a batch of parallel workers, one per task spec.
@@ -1055,11 +1071,15 @@ class ColonyRuntime:
task_data = spec.get("data") task_data = spec.get("data")
if task_data is not None and not isinstance(task_data, dict): if task_data is not None and not isinstance(task_data, dict):
task_data = {"value": task_data} task_data = {"value": task_data}
# Per-task profile_name override beats the batch-level default,
# so a fan-out can mix profiles (e.g. half tasks routed to
# Slack:work and half to Slack:personal).
ids = await self.spawn( ids = await self.spawn(
task=task_text, task=task_text,
count=1, count=1,
input_data=task_data or {"task": task_text}, input_data=task_data or {"task": task_text},
tools=tools_override, tools=tools_override,
profile_name=spec.get("profile_name") or profile_name,
) )
worker_ids.extend(ids) worker_ids.extend(ids)
return worker_ids return worker_ids
+22
View File
@@ -54,6 +54,11 @@ class WorkerInfo:
status: WorkerStatus status: WorkerStatus
started_at: float = 0.0 started_at: float = 0.0
result: WorkerResult | None = None result: WorkerResult | None = None
# Name of the colony's worker profile this worker was spawned from.
# Empty for legacy / single-template colonies. Surfaced in the UI so
# the user can see "Worker w_42 in colony X is using profile slack-work"
# and reason about which authorized account this run is touching.
profile_name: str = ""
class Worker: class Worker:
@@ -79,6 +84,8 @@ class Worker:
colony_id: str = "", colony_id: str = "",
persistent: bool = False, persistent: bool = False,
storage_path: Path | None = None, storage_path: Path | None = None,
profile_name: str = "",
integrations: dict[str, str] | None = None,
): ):
self.id = worker_id self.id = worker_id
self.task = task self.task = task
@@ -88,6 +95,12 @@ class Worker:
self._event_bus = event_bus self._event_bus = event_bus
self._colony_id = colony_id self._colony_id = colony_id
self._persistent = persistent self._persistent = persistent
# Worker profile binding. ``integrations`` is a {provider_id: alias}
# map applied as default account overrides for every MCP tool call
# this worker makes (see CredentialStoreAdapter.account_overrides).
# An explicit ``account="..."`` arg on a tool call still wins.
self._profile_name = profile_name
self._integrations: dict[str, str] = dict(integrations or {})
# Canonical on-disk home for this worker (conversations, events, # Canonical on-disk home for this worker (conversations, events,
# result.json, data). Required when seed_conversation() is used — # result.json, data). Required when seed_conversation() is used —
# we deliberately do NOT fall back to CWD, which previously caused # we deliberately do NOT fall back to CWD, which previously caused
@@ -114,6 +127,7 @@ class Worker:
status=self.status, status=self.status,
started_at=self._started_at, started_at=self._started_at,
result=self._result, result=self._result,
profile_name=self._profile_name,
) )
@property @property
@@ -173,7 +187,15 @@ class Worker:
exc_info=True, exc_info=True,
) )
# Pin MCP tool calls to this worker's profile-bound aliases. Empty
# mapping is a no-op so ephemeral workers and legacy single-profile
# colonies are unaffected. The contextvar is propagated to all
# awaited child coroutines, so every tool invocation downstream of
# ``execute`` sees the binding without further plumbing.
from aden_tools.credentials.store_adapter import account_overrides
try: try:
with account_overrides(self._integrations):
result = await self._agent_loop.execute(self._context) result = await self._agent_loop.execute(self._context)
duration = time.monotonic() - self._started_at duration = time.monotonic() - self._started_at
+207
View File
@@ -0,0 +1,207 @@
"""Worker profile data model and per-colony persistence.
A colony today has a single worker template at ``{colony_dir}/worker.json``
spawned as N parallel clones with identical tools, prompt, and credentials.
Worker profiles let the queen declare multiple templates per colony, each
with its own credential aliases (e.g. one profile pinned to Slack workspace
"work" and another to "personal").
Layout::
{COLONIES_DIR}/{colony_name}/
worker.json # legacy / "default" profile
profiles/
slack-work/worker.json
slack-personal/worker.json
metadata.json # has worker_profiles: [{...}, ...]
The default profile keeps living at ``{colony_dir}/worker.json`` so existing
colonies and code that hardcodes that path stay correct. Named profiles live
under ``profiles/<name>/`` and are read through :func:`worker_spec_path`.
"""
from __future__ import annotations
import logging
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from framework.config import COLONIES_DIR
from framework.host.colony_metadata import (
colony_metadata_path,
load_colony_metadata,
update_colony_metadata,
)
logger = logging.getLogger(__name__)
DEFAULT_PROFILE_NAME = "default"
_PROFILE_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
@dataclass
class WorkerProfile:
"""Template for a worker spawned within a colony.
``integrations`` maps provider id (``slack``, ``google``, ``github``) to
the alias of the connected account this profile should use. The runtime
sets these aliases as defaults on MCP tool calls; an explicit
``account="..."`` argument on a call still wins.
"""
name: str
task: str = ""
skill_name: str = ""
integrations: dict[str, str] = field(default_factory=dict)
concurrency_hint: int | None = None
prompt_override: str | None = None
tool_filter: list[str] | None = None
def to_dict(self) -> dict[str, Any]:
d = asdict(self)
# Drop None / empty fields so on-disk metadata stays tidy.
if d.get("prompt_override") is None:
d.pop("prompt_override", None)
if d.get("tool_filter") is None:
d.pop("tool_filter", None)
if d.get("concurrency_hint") is None:
d.pop("concurrency_hint", None)
return d
@classmethod
def from_dict(cls, data: dict[str, Any]) -> WorkerProfile:
return cls(
name=str(data.get("name", "")).strip(),
task=str(data.get("task", "")),
skill_name=str(data.get("skill_name", "")),
integrations={
str(k): str(v)
for k, v in (data.get("integrations") or {}).items()
if str(k) and str(v)
},
concurrency_hint=(
int(data["concurrency_hint"])
if isinstance(data.get("concurrency_hint"), int) and data["concurrency_hint"] > 0
else None
),
prompt_override=(data.get("prompt_override") or None),
tool_filter=list(data["tool_filter"]) if isinstance(data.get("tool_filter"), list) else None,
)
def validate_profile_name(name: str) -> str | None:
"""Return an error message if ``name`` is invalid, else ``None``."""
if not isinstance(name, str) or not _PROFILE_NAME_RE.match(name):
return (
"profile name must be lowercase alphanumeric (with - or _), "
"start with a letter/digit, and be ≤64 characters"
)
return None
def worker_spec_path(colony_name: str, profile_name: str | None = None) -> Path:
"""Return the on-disk path to a profile's ``worker.json``.
The default / unnamed profile lives at ``{colony_dir}/worker.json``
(legacy location). Named profiles live at
``{colony_dir}/profiles/{profile_name}/worker.json``.
"""
colony_dir = COLONIES_DIR / colony_name
if not profile_name or profile_name == DEFAULT_PROFILE_NAME:
return colony_dir / "worker.json"
return colony_dir / "profiles" / profile_name / "worker.json"
def list_worker_profiles(colony_name: str) -> list[WorkerProfile]:
"""Return the colony's declared worker profiles.
Legacy colonies (no ``worker_profiles`` field in metadata.json) get a
synthetic single-entry list with the default profile, so dispatch logic
elsewhere can treat the profile registry as always non-empty.
"""
metadata = load_colony_metadata(colony_name)
raw = metadata.get("worker_profiles")
if not isinstance(raw, list) or not raw:
return [WorkerProfile(name=DEFAULT_PROFILE_NAME)]
profiles: list[WorkerProfile] = []
seen: set[str] = set()
for entry in raw:
if not isinstance(entry, dict):
continue
profile = WorkerProfile.from_dict(entry)
if not profile.name or profile.name in seen:
continue
if validate_profile_name(profile.name) is not None:
logger.warning(
"worker_profiles: skipping invalid profile name %r in colony %s",
profile.name,
colony_name,
)
continue
seen.add(profile.name)
profiles.append(profile)
if not profiles:
return [WorkerProfile(name=DEFAULT_PROFILE_NAME)]
return profiles
def get_worker_profile(colony_name: str, profile_name: str) -> WorkerProfile | None:
"""Return one profile by name, or ``None`` if not declared."""
for profile in list_worker_profiles(colony_name):
if profile.name == profile_name:
return profile
return None
def save_worker_profiles(colony_name: str, profiles: list[WorkerProfile]) -> list[WorkerProfile]:
"""Persist ``profiles`` to the colony's metadata.json.
Validates names, deduplicates, and refuses to write an empty list (use
the default profile representation instead). Returns the canonicalized
list as written.
"""
if not colony_metadata_path(colony_name).parent.exists():
raise FileNotFoundError(f"Colony '{colony_name}' not found")
canonical: list[WorkerProfile] = []
seen: set[str] = set()
for profile in profiles:
err = validate_profile_name(profile.name)
if err is not None:
raise ValueError(err)
if profile.name in seen:
raise ValueError(f"duplicate worker profile name: {profile.name!r}")
seen.add(profile.name)
canonical.append(profile)
if not canonical:
canonical = [WorkerProfile(name=DEFAULT_PROFILE_NAME)]
update_colony_metadata(colony_name, {"worker_profiles": [p.to_dict() for p in canonical]})
return canonical
def upsert_worker_profile(colony_name: str, profile: WorkerProfile) -> list[WorkerProfile]:
"""Insert or replace a single profile, preserving siblings."""
err = validate_profile_name(profile.name)
if err is not None:
raise ValueError(err)
existing = list_worker_profiles(colony_name)
out = [p for p in existing if p.name != profile.name]
out.append(profile)
return save_worker_profiles(colony_name, out)
def delete_worker_profile(colony_name: str, profile_name: str) -> bool:
"""Remove a profile by name. Returns True if a profile was removed.
Refuses to remove the default profile so dispatch always has a fallback.
"""
if profile_name == DEFAULT_PROFILE_NAME:
raise ValueError("cannot delete the default worker profile")
existing = list_worker_profiles(colony_name)
out = [p for p in existing if p.name != profile_name]
if len(out) == len(existing):
return False
save_worker_profiles(colony_name, out)
return True
+141
View File
@@ -501,5 +501,146 @@ async def _import_multi_root(
) )
def _find_workers_bound_to_profile(request: web.Request, colony_name: str, profile_name: str) -> list[str]:
"""Return live worker IDs bound to ``(colony_name, profile_name)``.
Walks every live session's ColonyRuntime workers map. Used to refuse
profile deletes / renames while workers are still using the binding
the contextvar that pins a worker's MCP account lookups is set at
spawn time and a profile mutation underneath a running worker would
leave its tool calls pointing at a removed alias on the next turn.
"""
manager = request.app.get("manager")
if manager is None:
return []
bound: list[str] = []
try:
sessions = manager.list_sessions()
except Exception:
return []
for s in sessions:
runtime = getattr(s, "colony", None) or getattr(s, "colony_runtime", None)
if runtime is None:
continue
if getattr(runtime, "_colony_id", None) != colony_name:
continue
try:
for info in runtime.list_workers():
if info.profile_name == profile_name and info.status in {
"WorkerStatus.RUNNING",
"WorkerStatus.PENDING",
"running",
"pending",
}:
bound.append(info.id)
except Exception:
continue
return bound
async def handle_list_worker_profiles(request: web.Request) -> web.Response:
"""GET /api/colonies/{colony_name}/worker_profiles"""
colony_name = request.match_info["colony_name"]
err = _validate_colony_name(colony_name)
if err:
return web.json_response({"error": err}, status=400)
if not (COLONIES_DIR / colony_name).exists():
return web.json_response({"error": f"colony '{colony_name}' not found"}, status=404)
from framework.host.worker_profiles import list_worker_profiles
profiles = list_worker_profiles(colony_name)
return web.json_response({"worker_profiles": [p.to_dict() for p in profiles]})
async def handle_upsert_worker_profile(request: web.Request) -> web.Response:
"""POST /api/colonies/{colony_name}/worker_profiles — create or replace one profile.
Body: ``{name, integrations?, task?, skill_name?, concurrency_hint?,
prompt_override?, tool_filter?}``. Existing siblings are
preserved; an existing profile with the same ``name`` is replaced
(so the desktop can use this for both add and edit).
"""
colony_name = request.match_info["colony_name"]
err = _validate_colony_name(colony_name)
if err:
return web.json_response({"error": err}, status=400)
if not (COLONIES_DIR / colony_name).exists():
return web.json_response({"error": f"colony '{colony_name}' not found"}, status=404)
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON body"}, status=400)
if not isinstance(body, dict):
return web.json_response({"error": "body must be a JSON object"}, status=400)
from framework.host.worker_profiles import (
WorkerProfile,
upsert_worker_profile,
validate_profile_name,
)
profile = WorkerProfile.from_dict(body)
name_err = validate_profile_name(profile.name)
if name_err:
return web.json_response({"error": name_err}, status=400)
try:
saved = upsert_worker_profile(colony_name, profile)
except (FileNotFoundError, ValueError) as exc:
return web.json_response({"error": str(exc)}, status=400)
return web.json_response({"worker_profiles": [p.to_dict() for p in saved]}, status=201)
async def handle_delete_worker_profile(request: web.Request) -> web.Response:
"""DELETE /api/colonies/{colony_name}/worker_profiles/{profile_name}.
Refused with 409 + ``bound_workers`` listing if a live worker is
bound to the profile, so the user can stop those workers before
pruning the binding.
"""
colony_name = request.match_info["colony_name"]
profile_name = request.match_info["profile_name"]
err = _validate_colony_name(colony_name)
if err:
return web.json_response({"error": err}, status=400)
if not (COLONIES_DIR / colony_name).exists():
return web.json_response({"error": f"colony '{colony_name}' not found"}, status=404)
bound = _find_workers_bound_to_profile(request, colony_name, profile_name)
if bound:
return web.json_response(
{
"error": "profile is bound to live workers; stop them first",
"bound_workers": bound,
},
status=409,
)
from framework.host.worker_profiles import delete_worker_profile
try:
removed = delete_worker_profile(colony_name, profile_name)
except ValueError as exc:
return web.json_response({"error": str(exc)}, status=400)
if not removed:
return web.json_response({"error": f"profile '{profile_name}' not found"}, status=404)
return web.json_response({"deleted": True, "profile_name": profile_name})
def register_routes(app: web.Application) -> None: def register_routes(app: web.Application) -> None:
app.router.add_post("/api/colonies/import", handle_import_colony) app.router.add_post("/api/colonies/import", handle_import_colony)
app.router.add_get(
"/api/colonies/{colony_name}/worker_profiles",
handle_list_worker_profiles,
)
app.router.add_post(
"/api/colonies/{colony_name}/worker_profiles",
handle_upsert_worker_profile,
)
app.router.add_delete(
"/api/colonies/{colony_name}/worker_profiles/{profile_name}",
handle_delete_worker_profile,
)
@@ -62,6 +62,7 @@ def _worker_info_to_dict(info) -> dict:
"status": str(info.status), "status": str(info.status),
"started_at": info.started_at, "started_at": info.started_at,
"result": result_dict, "result": result_dict,
"profile_name": getattr(info, "profile_name", "") or "",
} }
@@ -43,6 +43,21 @@ def _get_store(request: web.Request) -> CredentialStore:
return request.app["credential_store"] return request.app["credential_store"]
def _reset_credential_adapter_cache() -> None:
"""Clear the memoized CredentialStoreAdapter so the next call re-syncs.
The adapter cache is keyed on ``(id(specs), ADEN_API_KEY)``; without
this reset, a key save/delete done after process startup is invisible
to in-process MCP tool calls until restart.
"""
try:
from aden_tools.credentials.store_adapter import _reset_default_adapter_cache
_reset_default_adapter_cache()
except Exception:
logger.warning("Failed to reset credential adapter cache", exc_info=True)
def _invalidate_queen_credentials_cache(request: web.Request) -> None: def _invalidate_queen_credentials_cache(request: web.Request) -> None:
"""Force every live Queen session to rebuild its ambient credentials block. """Force every live Queen session to rebuild its ambient credentials block.
@@ -158,6 +173,12 @@ async def handle_save_credential(request: web.Request) -> web.Response:
save_aden_api_key(key) save_aden_api_key(key)
# Make the new key visible to the in-process AdenSyncProvider on
# the very next CredentialStoreAdapter.default() call. The adapter
# cache is keyed on this env var.
os.environ["ADEN_API_KEY"] = key
_reset_credential_adapter_cache()
# Immediately sync OAuth tokens from Aden (runs in executor because # Immediately sync OAuth tokens from Aden (runs in executor because
# _presync_aden_tokens makes blocking HTTP calls to the Aden server). # _presync_aden_tokens makes blocking HTTP calls to the Aden server).
try: try:
@@ -193,6 +214,11 @@ async def handle_delete_credential(request: web.Request) -> web.Response:
deleted = delete_aden_api_key() deleted = delete_aden_api_key()
if not deleted: if not deleted:
return web.json_response({"error": "Credential 'aden_api_key' not found"}, status=404) return web.json_response({"error": "Credential 'aden_api_key' not found"}, status=404)
# Drop the env var so the next adapter rebuild lands in the
# non-Aden branch instead of trying to reuse the stale key.
os.environ.pop("ADEN_API_KEY", None)
_reset_credential_adapter_cache()
_invalidate_queen_credentials_cache(request)
return web.json_response({"deleted": True}) return web.json_response({"deleted": True})
store = _get_store(request) store = _get_store(request)
@@ -358,6 +384,9 @@ async def handle_resync_credentials(request: web.Request) -> web.Response:
# _presync_aden_tokens makes blocking HTTP calls to the Aden server. # _presync_aden_tokens makes blocking HTTP calls to the Aden server.
await loop.run_in_executor(None, lambda: _presync_aden_tokens(CREDENTIAL_SPECS, force=True)) await loop.run_in_executor(None, lambda: _presync_aden_tokens(CREDENTIAL_SPECS, force=True))
# Drop the cached adapter so newly-fetched accounts are visible
# to the next MCP tool call without waiting for a process restart.
_reset_credential_adapter_cache()
_invalidate_queen_credentials_cache(request) _invalidate_queen_credentials_cache(request)
accounts_by_provider = _collect_accounts_by_provider() accounts_by_provider = _collect_accounts_by_provider()
+65
View File
@@ -1151,6 +1151,7 @@ async def fork_session_into_colony(
task: str, task: str,
tasks: list[dict] | None = None, tasks: list[dict] | None = None,
concurrency_hint: int | None = None, concurrency_hint: int | None = None,
worker_profiles: list[dict] | None = None,
) -> dict: ) -> dict:
"""Fork a queen session into a colony directory. """Fork a queen session into a colony directory.
@@ -1398,6 +1399,56 @@ async def fork_session_into_colony(
worker_meta["concurrency_hint"] = concurrency_hint worker_meta["concurrency_hint"] = concurrency_hint
worker_config_path.write_text(json.dumps(worker_meta, indent=2, ensure_ascii=False), encoding="utf-8") worker_config_path.write_text(json.dumps(worker_meta, indent=2, ensure_ascii=False), encoding="utf-8")
# ── 2a. Materialize named worker profiles ────────────────────
# Each named profile gets its own ``profiles/<name>/worker.json``
# cloned from the base worker_meta with profile-specific overrides
# (task, system_prompt, tool_filter, concurrency_hint). The base
# ``worker.json`` above acts as the implicit "default" profile.
persisted_profiles: list[dict] = []
if worker_profiles:
from framework.host.worker_profiles import (
DEFAULT_PROFILE_NAME,
WorkerProfile,
validate_profile_name,
worker_spec_path,
)
for raw in worker_profiles:
if not isinstance(raw, dict):
continue
profile = WorkerProfile.from_dict(raw)
err = validate_profile_name(profile.name)
if err is not None:
logger.warning("create_colony: invalid profile name %r: %s", profile.name, err)
continue
profile_meta = dict(worker_meta)
profile_meta["profile_name"] = profile.name
if profile.task:
profile_meta["goal"] = {
**profile_meta.get("goal", {}),
"description": profile.task,
}
if profile.prompt_override:
profile_meta["system_prompt"] = (
f"{worker_meta['system_prompt']}\n\n{profile.prompt_override}"
)
if profile.tool_filter:
profile_meta["tools"] = [t for t in worker_meta["tools"] if t in set(profile.tool_filter)]
if isinstance(profile.concurrency_hint, int) and profile.concurrency_hint > 0:
profile_meta["concurrency_hint"] = profile.concurrency_hint
if profile.integrations:
profile_meta["integrations"] = dict(profile.integrations)
target = worker_spec_path(colony_name, profile.name)
if profile.name == DEFAULT_PROFILE_NAME:
# Skip — the legacy file already written above is the
# canonical default.
persisted_profiles.append(profile.to_dict())
continue
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(json.dumps(profile_meta, indent=2, ensure_ascii=False), encoding="utf-8")
persisted_profiles.append(profile.to_dict())
# ── 3. Duplicate queen session into colony ─────────────────── # ── 3. Duplicate queen session into colony ───────────────────
# Copy the queen's full session directory (conversations, events, # Copy the queen's full session directory (conversations, events,
# meta) into a new queen-session dir assigned to this colony. # meta) into a new queen-session dir assigned to this colony.
@@ -1577,6 +1628,20 @@ async def fork_session_into_colony(
"task": worker_task[:100], "task": worker_task[:100],
"spawned_at": datetime.now(UTC).isoformat(), "spawned_at": datetime.now(UTC).isoformat(),
} }
if persisted_profiles:
# Persist the canonical profile roster so dispatch + UI can read
# back what the queen declared at create_colony time. Merge with
# any existing list so a later update_worker_profile call doesn't
# erase profiles created in an earlier fork.
existing_profiles = metadata.get("worker_profiles") or []
if not isinstance(existing_profiles, list):
existing_profiles = []
seen = {p["name"] for p in persisted_profiles if isinstance(p, dict) and p.get("name")}
merged = list(persisted_profiles) + [
p for p in existing_profiles
if isinstance(p, dict) and p.get("name") and p["name"] not in seen
]
metadata["worker_profiles"] = merged
metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=False), encoding="utf-8") metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=False), encoding="utf-8")
# ── 4a. Inherit the queen's tool allowlist into the colony ─── # ── 4a. Inherit the queen's tool allowlist into the colony ───
@@ -1642,6 +1642,7 @@ def register_queen_lifecycle_tools(
tasks: list[dict] | None = None, tasks: list[dict] | None = None,
concurrency_hint: int | None = None, concurrency_hint: int | None = None,
triggers: list[dict] | None = None, triggers: list[dict] | None = None,
worker_profiles: list[dict] | None = None,
) -> str: ) -> str:
"""Create a colony and materialize its skill folder in one atomic call. """Create a colony and materialize its skill folder in one atomic call.
@@ -1822,6 +1823,7 @@ def register_queen_lifecycle_tools(
concurrency_hint=( concurrency_hint=(
concurrency_hint if isinstance(concurrency_hint, int) and concurrency_hint > 0 else None concurrency_hint if isinstance(concurrency_hint, int) and concurrency_hint > 0 else None
), ),
worker_profiles=worker_profiles if isinstance(worker_profiles, list) else None,
) )
except Exception as e: except Exception as e:
logger.exception("create_colony: fork failed after installing skill") logger.exception("create_colony: fork failed after installing skill")
@@ -2184,6 +2186,46 @@ def register_queen_lifecycle_tools(
"required": ["id", "trigger_type", "trigger_config", "task"], "required": ["id", "trigger_type", "trigger_config", "task"],
}, },
}, },
"worker_profiles": {
"type": "array",
"description": (
"Optional roster of worker profiles. Use this "
"when the colony needs to operate multiple "
"authorized accounts of the same vendor (two "
"Slack workspaces, two Gmail accounts) — each "
"profile pins its own credential alias so workers "
"spawned under that profile call Slack/Gmail/etc. "
"as the right account by default. If omitted, the "
"colony has a single implicit 'default' profile "
"that uses each provider's primary account. Each "
"entry: name (lowercase id, unique within the "
"colony), integrations (provider id → account "
"alias, e.g. {'slack': 'work'}), optional task "
"(per-profile task override), optional skill_name "
"(if the profile uses a different skill than the "
"colony default), optional concurrency_hint, "
"prompt_override, tool_filter."
),
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"task": {"type": "string"},
"skill_name": {"type": "string"},
"integrations": {
"type": "object",
"additionalProperties": {"type": "string"},
},
"concurrency_hint": {"type": "integer", "minimum": 1},
"prompt_override": {"type": "string"},
"tool_filter": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["name"],
},
},
}, },
"required": [ "required": [
"colony_name", "colony_name",
@@ -2201,6 +2243,116 @@ def register_queen_lifecycle_tools(
) )
tools_registered += 1 tools_registered += 1
# --- update_worker_profile ---------------------------------------------------
async def update_worker_profile(
*,
colony_name: str,
profile_name: str,
integrations: dict[str, str] | None = None,
task: str | None = None,
skill_name: str | None = None,
concurrency_hint: int | None = None,
prompt_override: str | None = None,
tool_filter: list[str] | None = None,
) -> str:
"""Insert or update a single worker profile on an existing colony.
Use this to adjust an account binding ("switch the slack-work
profile from alias 'work' to alias 'work-2'") or to add a new
profile after the colony was already created. Existing siblings
are preserved. Pass only the fields you want to change; ``None``
means "don't touch", and an empty dict/list means "clear".
"""
from framework.host.worker_profiles import (
WorkerProfile,
get_worker_profile,
upsert_worker_profile,
validate_profile_name,
)
cn = (colony_name or "").strip()
if not _COLONY_NAME_RE.match(cn):
return json.dumps(
{"error": "colony_name must be lowercase alphanumeric with underscores."}
)
err = validate_profile_name(profile_name)
if err is not None:
return json.dumps({"error": err})
existing = get_worker_profile(cn, profile_name)
merged = WorkerProfile(
name=profile_name,
task=existing.task if existing else "",
skill_name=existing.skill_name if existing else "",
integrations=dict(existing.integrations) if existing else {},
concurrency_hint=existing.concurrency_hint if existing else None,
prompt_override=existing.prompt_override if existing else None,
tool_filter=list(existing.tool_filter) if (existing and existing.tool_filter) else None,
)
if integrations is not None:
merged.integrations = {str(k): str(v) for k, v in integrations.items() if str(k) and str(v)}
if task is not None:
merged.task = task
if skill_name is not None:
merged.skill_name = skill_name
if concurrency_hint is not None:
merged.concurrency_hint = (
concurrency_hint if isinstance(concurrency_hint, int) and concurrency_hint > 0 else None
)
if prompt_override is not None:
merged.prompt_override = prompt_override or None
if tool_filter is not None:
merged.tool_filter = list(tool_filter) if tool_filter else None
try:
saved = upsert_worker_profile(cn, merged)
except (FileNotFoundError, ValueError) as exc:
return json.dumps({"error": str(exc)})
return json.dumps(
{
"ok": True,
"colony_name": cn,
"profile_name": profile_name,
"worker_profiles": [p.to_dict() for p in saved],
}
)
_update_worker_profile_tool = Tool(
name="update_worker_profile",
description=(
"Insert or update one worker profile on an existing colony. "
"Use this to swap a profile's account alias (e.g. 'switch "
"slack-work to use alias work-2') or to add a profile after "
"create_colony. Existing siblings are preserved. Pass only "
"the fields you want to change."
),
parameters={
"type": "object",
"properties": {
"colony_name": {"type": "string"},
"profile_name": {"type": "string"},
"integrations": {
"type": "object",
"additionalProperties": {"type": "string"},
},
"task": {"type": "string"},
"skill_name": {"type": "string"},
"concurrency_hint": {"type": "integer", "minimum": 1},
"prompt_override": {"type": "string"},
"tool_filter": {"type": "array", "items": {"type": "string"}},
},
"required": ["colony_name", "profile_name"],
},
)
registry.register(
"update_worker_profile",
_update_worker_profile_tool,
lambda inputs: update_worker_profile(**inputs),
)
tools_registered += 1
# --- start_incubating_colony ------------------------------------------------- # --- start_incubating_colony -------------------------------------------------
async def start_incubating_colony( async def start_incubating_colony(
+37
View File
@@ -25,6 +25,19 @@ export interface ColonyToolsUpdateResult {
note?: string; note?: string;
} }
/** A worker template within a colony. ``integrations`` maps provider
* id (``slack``, ``google``, etc.) to the alias of the connected
* account this profile pins by default for MCP tool calls. */
export interface WorkerProfile {
name: string;
task?: string;
skill_name?: string;
integrations?: Record<string, string>;
concurrency_hint?: number;
prompt_override?: string;
tool_filter?: string[];
}
export const coloniesApi = { export const coloniesApi = {
/** List every colony on disk with a summary of its tool allowlist. */ /** List every colony on disk with a summary of its tool allowlist. */
list: () => list: () =>
@@ -47,4 +60,28 @@ export const coloniesApi = {
`/colony/${encodeURIComponent(colonyName)}/tools`, `/colony/${encodeURIComponent(colonyName)}/tools`,
{ enabled_mcp_tools: enabled }, { enabled_mcp_tools: enabled },
), ),
/** List the colony's worker profiles. Always returns at least one
* entry — legacy colonies materialise a synthetic ``default``. */
listWorkerProfiles: (colonyName: string) =>
api.get<{ worker_profiles: WorkerProfile[] }>(
`/colonies/${encodeURIComponent(colonyName)}/worker_profiles`,
),
/** Insert or replace a single worker profile. Existing siblings are
* preserved. The desktop uses this for both add and edit. */
upsertWorkerProfile: (colonyName: string, profile: WorkerProfile) =>
api.post<{ worker_profiles: WorkerProfile[] }>(
`/colonies/${encodeURIComponent(colonyName)}/worker_profiles`,
profile,
),
/** Delete a worker profile. Returns 409 with ``bound_workers`` when a
* live worker is still using it; the UI should prompt the user to
* stop those workers first. The synthetic ``default`` profile cannot
* be deleted. */
deleteWorkerProfile: (colonyName: string, profileName: string) =>
api.delete<{ deleted: boolean; profile_name: string }>(
`/colonies/${encodeURIComponent(colonyName)}/worker_profiles/${encodeURIComponent(profileName)}`,
),
}; };
+5
View File
@@ -14,6 +14,11 @@ export interface WorkerSummary {
status: string; status: string;
started_at: number; started_at: number;
result: WorkerResult | null; result: WorkerResult | null;
/** Name of the colony's worker profile this worker was spawned from.
* Empty for legacy / single-template colonies. Surfaced as a small
* badge in the Sessions tab so the user can see which authorized
* account the worker is calling MCP tools as. */
profile_name?: string;
} }
export interface ColonySkill { export interface ColonySkill {
@@ -15,6 +15,11 @@ import {
Zap, Zap,
Activity, Activity,
Loader2, Loader2,
Plus,
Trash2,
Pencil,
Check,
Link2,
} from "lucide-react"; } from "lucide-react";
import { import {
colonyWorkersApi, colonyWorkersApi,
@@ -24,6 +29,8 @@ import {
type ProgressStep, type ProgressStep,
type WorkerSummary, type WorkerSummary,
} from "@/api/colonyWorkers"; } from "@/api/colonyWorkers";
import { coloniesApi, type WorkerProfile } from "@/api/colonies";
import { credentialsApi } from "@/api/credentials";
import { import {
colonyDataApi, colonyDataApi,
type CellValue, type CellValue,
@@ -51,7 +58,7 @@ interface ColonyWorkersPanelProps {
onClose: () => void; onClose: () => void;
} }
type TabKey = "skills" | "tools" | "sessions" | "triggers" | "data"; type TabKey = "skills" | "tools" | "sessions" | "triggers" | "data" | "profiles";
function statusClasses(status: string): string { function statusClasses(status: string): string {
const s = status.toLowerCase(); const s = status.toLowerCase();
@@ -165,6 +172,7 @@ export default function ColonyWorkersPanel({
{/* Tab bar */} {/* Tab bar */}
<div className="flex border-b border-border/60 flex-shrink-0"> <div className="flex border-b border-border/60 flex-shrink-0">
<TabButton active={tab === "sessions"} onClick={() => setTab("sessions")} label="Sessions" /> <TabButton active={tab === "sessions"} onClick={() => setTab("sessions")} label="Sessions" />
<TabButton active={tab === "profiles"} onClick={() => setTab("profiles")} label="Profiles" />
<TabButton active={tab === "triggers"} onClick={() => setTab("triggers")} label="Triggers" /> <TabButton active={tab === "triggers"} onClick={() => setTab("triggers")} label="Triggers" />
<TabButton active={tab === "skills"} onClick={() => setTab("skills")} label="Skills" /> <TabButton active={tab === "skills"} onClick={() => setTab("skills")} label="Skills" />
<TabButton active={tab === "tools"} onClick={() => setTab("tools")} label="Tools" /> <TabButton active={tab === "tools"} onClick={() => setTab("tools")} label="Tools" />
@@ -175,6 +183,7 @@ export default function ColonyWorkersPanel({
{tab === "sessions" && ( {tab === "sessions" && (
<SessionsTab sessionId={sessionId} colonyName={colonyName} /> <SessionsTab sessionId={sessionId} colonyName={colonyName} />
)} )}
{tab === "profiles" && <ProfilesTab colonyName={colonyName} />}
{tab === "triggers" && <TriggersTab sessionId={sessionId} />} {tab === "triggers" && <TriggersTab sessionId={sessionId} />}
{tab === "skills" && <SkillsTab sessionId={sessionId} />} {tab === "skills" && <SkillsTab sessionId={sessionId} />}
{tab === "tools" && <ToolsTab sessionId={sessionId} />} {tab === "tools" && <ToolsTab sessionId={sessionId} />}
@@ -630,11 +639,21 @@ function SessionsTab({
className="w-full text-left px-3 py-2.5" className="w-full text-left px-3 py-2.5"
> >
<div className="flex items-center justify-between mb-1 gap-2"> <div className="flex items-center justify-between mb-1 gap-2">
<div className="flex items-center gap-1.5 min-w-0">
<code <code
className={`text-xs font-mono ${active ? "text-foreground" : "text-foreground/70"}`} className={`text-xs font-mono ${active ? "text-foreground" : "text-foreground/70"}`}
> >
{shortId(w.worker_id)} {shortId(w.worker_id)}
</code> </code>
{w.profile_name && (
<span
className="text-[10px] px-1.5 py-0.5 rounded-full bg-primary/10 text-primary font-medium truncate"
title={`Worker profile: ${w.profile_name}`}
>
{w.profile_name}
</span>
)}
</div>
<div className="flex items-center gap-1"> <div className="flex items-center gap-1">
<span <span
className={`text-[10px] px-1.5 py-0.5 rounded-full font-medium ${statusClasses(w.status)}`} className={`text-[10px] px-1.5 py-0.5 rounded-full font-medium ${statusClasses(w.status)}`}
@@ -1922,3 +1941,467 @@ function TabShell({
</div> </div>
); );
} }
// ── Profiles tab ───────────────────────────────────────────────────────
//
// Lists the colony's worker profiles. Each profile pins a default
// account alias per provider so workers spawned under it call MCP
// tools as the right Slack workspace / Gmail account / etc. The alias
// dropdown only shows aliases the user has already authorized through
// the integrations page — typo-prone free-form entry was rejected as a
// foothold for misdirected tool calls.
//
// Delete is refused while a worker is still bound to the profile; the
// 409 surfaces as an inline notice listing the worker IDs the user
// needs to stop first.
function ProfilesTab({ colonyName }: { colonyName: string | null }) {
const [profiles, setProfiles] = useState<WorkerProfile[]>([]);
const [accounts, setAccounts] = useState<Record<string, string[]>>({});
const [accountIdentities, setAccountIdentities] = useState<
Record<string, Record<string, Record<string, string>>>
>({});
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [editingName, setEditingName] = useState<string | null>(null);
const [creating, setCreating] = useState(false);
const [busy, setBusy] = useState(false);
const [boundWarning, setBoundWarning] = useState<string | null>(null);
const refresh = useCallback(() => {
if (!colonyName) {
setLoading(false);
return;
}
setLoading(true);
setError(null);
Promise.all([
coloniesApi.listWorkerProfiles(colonyName),
credentialsApi.listSpecs(),
])
.then(([profileResp, specsResp]) => {
setProfiles(profileResp.worker_profiles ?? []);
const aliasMap: Record<string, string[]> = {};
const idMap: Record<string, Record<string, Record<string, string>>> = {};
for (const spec of specsResp.specs ?? []) {
if (!spec.aden_supported) continue;
const provider = spec.credential_id;
const aliases: string[] = [];
const idsForProvider: Record<string, Record<string, string>> = {};
for (const acct of spec.accounts ?? []) {
if (!acct.alias) continue;
aliases.push(acct.alias);
idsForProvider[acct.alias] = acct.identity ?? {};
}
if (aliases.length > 0) {
aliasMap[provider] = aliases;
idMap[provider] = idsForProvider;
}
}
setAccounts(aliasMap);
setAccountIdentities(idMap);
})
.catch((e) =>
setError(
e?.message ?? "Failed to load worker profiles",
),
)
.finally(() => setLoading(false));
}, [colonyName]);
useEffect(() => {
refresh();
}, [refresh]);
const handleSave = useCallback(
async (profile: WorkerProfile) => {
if (!colonyName) return;
setBusy(true);
setError(null);
setBoundWarning(null);
try {
const resp = await coloniesApi.upsertWorkerProfile(colonyName, profile);
setProfiles(resp.worker_profiles ?? []);
setEditingName(null);
setCreating(false);
} catch (e: unknown) {
setError((e as Error)?.message ?? "Save failed");
} finally {
setBusy(false);
}
},
[colonyName],
);
const handleDelete = useCallback(
async (profileName: string) => {
if (!colonyName) return;
setBusy(true);
setError(null);
setBoundWarning(null);
try {
await coloniesApi.deleteWorkerProfile(colonyName, profileName);
refresh();
} catch (e: unknown) {
// Surface 409 bound_workers payloads as an inline notice with the
// worker IDs so the user knows exactly what to stop. The api
// client throws on non-2xx; the response body is on `cause`.
const err = e as Error & { body?: { bound_workers?: string[]; error?: string } };
const bound = err.body?.bound_workers;
if (Array.isArray(bound) && bound.length > 0) {
setBoundWarning(
`Profile "${profileName}" is bound to running worker${
bound.length === 1 ? "" : "s"
} ${bound.map(shortId).join(", ")}. Stop them first.`,
);
} else {
setError(err.message ?? "Delete failed");
}
} finally {
setBusy(false);
}
},
[colonyName, refresh],
);
if (!colonyName) {
return (
<TabShell loading={false} error={null} onRefresh={() => {}} empty="No colony loaded.">
{null}
</TabShell>
);
}
return (
<TabShell
loading={loading}
error={error}
onRefresh={refresh}
empty={null}
headerRight={
!creating && editingName === null ? (
<button
onClick={() => setCreating(true)}
className="text-[10px] px-2 py-0.5 rounded border border-primary/40 text-primary hover:bg-primary/10 transition-colors inline-flex items-center gap-1"
>
<Plus className="w-3 h-3" /> Add profile
</button>
) : null
}
>
<div className="flex flex-col gap-3">
{boundWarning && (
<div className="rounded-md border border-amber-500/40 bg-amber-500/[0.06] px-3 py-2 text-xs text-amber-700 dark:text-amber-400">
{boundWarning}
</div>
)}
<ConnectedAccountsSummary accounts={accounts} identities={accountIdentities} />
{creating && (
<ProfileEditor
mode="create"
existing={null}
availableAliases={accounts}
existingNames={profiles.map((p) => p.name)}
busy={busy}
onCancel={() => setCreating(false)}
onSave={handleSave}
/>
)}
{profiles.length === 0 && !creating ? (
<p className="text-xs text-muted-foreground text-center py-6">
No profiles yet. The colony spawns one default worker.
</p>
) : (
<ul className="flex flex-col gap-2">
{profiles.map((p) =>
editingName === p.name ? (
<li key={p.name}>
<ProfileEditor
mode="edit"
existing={p}
availableAliases={accounts}
existingNames={profiles.map((x) => x.name)}
busy={busy}
onCancel={() => setEditingName(null)}
onSave={handleSave}
/>
</li>
) : (
<li key={p.name}>
<ProfileRow
profile={p}
onEdit={() => {
setEditingName(p.name);
setBoundWarning(null);
}}
onDelete={() => handleDelete(p.name)}
isDefault={p.name === "default"}
busy={busy}
/>
</li>
),
)}
</ul>
)}
</div>
</TabShell>
);
}
function ConnectedAccountsSummary({
accounts,
identities,
}: {
accounts: Record<string, string[]>;
identities: Record<string, Record<string, Record<string, string>>>;
}) {
const providers = Object.keys(accounts).sort();
if (providers.length === 0) {
return (
<div className="rounded-md border border-border/50 bg-muted/20 px-3 py-2 text-xs text-muted-foreground">
No connected accounts yet connect providers in the integrations page
before binding profiles.
</div>
);
}
return (
<div className="rounded-md border border-border/50 bg-muted/10 px-3 py-2">
<div className="flex items-center gap-1.5 text-[10px] uppercase tracking-wide text-muted-foreground mb-1.5">
<Link2 className="w-3 h-3" /> Connected accounts
</div>
<ul className="flex flex-wrap gap-1.5">
{providers.flatMap((provider) =>
accounts[provider].map((alias) => {
const ident = identities[provider]?.[alias];
const detail = ident?.email || ident?.name || "";
return (
<li
key={`${provider}:${alias}`}
className="text-[10px] px-1.5 py-0.5 rounded-full bg-background border border-border/60 text-foreground/80"
title={detail || `${provider}:${alias}`}
>
<span className="font-medium">{provider}</span>:{alias}
</li>
);
}),
)}
</ul>
</div>
);
}
function ProfileRow({
profile,
onEdit,
onDelete,
isDefault,
busy,
}: {
profile: WorkerProfile;
onEdit: () => void;
onDelete: () => void;
isDefault: boolean;
busy: boolean;
}) {
const integrations = profile.integrations ?? {};
const entries = Object.entries(integrations);
return (
<div className="rounded-lg border border-border/50 bg-background/30 px-3 py-2.5">
<div className="flex items-start justify-between gap-2 mb-1.5">
<div className="min-w-0">
<div className="flex items-center gap-1.5">
<span className="text-xs font-medium text-foreground truncate">
{profile.name}
</span>
{isDefault && (
<span className="text-[9px] px-1 py-0.5 rounded bg-muted text-muted-foreground uppercase tracking-wide">
default
</span>
)}
</div>
{profile.task && (
<p className="text-[11px] text-muted-foreground mt-0.5 line-clamp-2">
{profile.task}
</p>
)}
</div>
<div className="flex items-center gap-1 flex-shrink-0">
<button
onClick={onEdit}
disabled={busy}
className="p-1 rounded text-muted-foreground hover:text-foreground hover:bg-muted/60 disabled:opacity-50 transition-colors"
title="Edit profile"
>
<Pencil className="w-3 h-3" />
</button>
{!isDefault && (
<button
onClick={onDelete}
disabled={busy}
className="p-1 rounded text-destructive hover:bg-destructive/10 disabled:opacity-50 transition-colors"
title="Delete profile"
>
<Trash2 className="w-3 h-3" />
</button>
)}
</div>
</div>
{entries.length > 0 ? (
<ul className="flex flex-wrap gap-1">
{entries.map(([provider, alias]) => (
<li
key={provider}
className="text-[10px] px-1.5 py-0.5 rounded-full bg-primary/10 text-primary border border-primary/20"
>
<span className="font-medium">{provider}</span>:{alias}
</li>
))}
</ul>
) : (
<p className="text-[10px] text-muted-foreground italic">
No integrations bound uses each provider's primary account.
</p>
)}
</div>
);
}
function ProfileEditor({
mode,
existing,
availableAliases,
existingNames,
busy,
onCancel,
onSave,
}: {
mode: "create" | "edit";
existing: WorkerProfile | null;
availableAliases: Record<string, string[]>;
existingNames: string[];
busy: boolean;
onCancel: () => void;
onSave: (profile: WorkerProfile) => void;
}) {
const [name, setName] = useState(existing?.name ?? "");
const [task, setTask] = useState(existing?.task ?? "");
const [bindings, setBindings] = useState<Record<string, string>>(
() => ({ ...(existing?.integrations ?? {}) }),
);
const providers = useMemo(() => Object.keys(availableAliases).sort(), [availableAliases]);
const nameError = useMemo(() => {
if (!name) return "Name is required";
if (!/^[a-z0-9][a-z0-9_-]{0,63}$/.test(name)) {
return "lowercase, numbers, _ or - (must start alphanumeric, ≤64)";
}
if (mode === "create" && existingNames.includes(name)) {
return "Name already used";
}
return null;
}, [name, mode, existingNames]);
const setAlias = (provider: string, alias: string) => {
setBindings((prev) => {
const next = { ...prev };
if (alias) next[provider] = alias;
else delete next[provider];
return next;
});
};
return (
<div className="rounded-lg border border-primary/40 bg-primary/[0.04] px-3 py-3 flex flex-col gap-2.5">
<div>
<label className="text-[10px] uppercase tracking-wide text-muted-foreground block mb-1">
Profile name
</label>
<input
value={name}
onChange={(e) => setName(e.target.value)}
disabled={mode === "edit"}
className="w-full text-xs px-2 py-1 rounded border border-border/60 bg-background disabled:opacity-60"
placeholder="slack-work"
/>
{nameError && (
<p className="text-[10px] text-destructive mt-0.5">{nameError}</p>
)}
</div>
<div>
<label className="text-[10px] uppercase tracking-wide text-muted-foreground block mb-1">
Task override (optional)
</label>
<textarea
value={task}
onChange={(e) => setTask(e.target.value)}
rows={2}
className="w-full text-xs px-2 py-1 rounded border border-border/60 bg-background resize-none"
placeholder="What this profile's worker should do (overrides the colony task)"
/>
</div>
<div>
<label className="text-[10px] uppercase tracking-wide text-muted-foreground block mb-1">
Account bindings
</label>
{providers.length === 0 ? (
<p className="text-[10px] text-muted-foreground italic">
No connected accounts yet. Authorize providers in the integrations
page first.
</p>
) : (
<ul className="flex flex-col gap-1.5">
{providers.map((provider) => (
<li key={provider} className="flex items-center gap-2">
<span className="text-[11px] font-medium text-foreground/80 w-20 truncate">
{provider}
</span>
<select
value={bindings[provider] ?? ""}
onChange={(e) => setAlias(provider, e.target.value)}
className="flex-1 text-xs px-1.5 py-0.5 rounded border border-border/60 bg-background"
>
<option value="">— primary —</option>
{availableAliases[provider].map((alias) => (
<option key={alias} value={alias}>
{alias}
</option>
))}
</select>
</li>
))}
</ul>
)}
</div>
<div className="flex justify-end gap-2 pt-1">
<button
onClick={onCancel}
disabled={busy}
className="text-[11px] px-2.5 py-1 rounded border border-border/60 text-muted-foreground hover:text-foreground hover:bg-muted/30 disabled:opacity-50 transition-colors"
>
Cancel
</button>
<button
onClick={() =>
onSave({
name,
task: task || undefined,
integrations: bindings,
})
}
disabled={busy || nameError !== null}
className="text-[11px] px-2.5 py-1 rounded bg-primary text-primary-foreground hover:bg-primary/90 disabled:opacity-50 transition-colors inline-flex items-center gap-1"
>
<Check className="w-3 h-3" /> Save
</button>
</div>
</div>
);
}
+2 -1
View File
@@ -35,7 +35,8 @@ GITHUB_CREDENTIALS = {
help_url="https://github.com/settings/tokens", help_url="https://github.com/settings/tokens",
description="GitHub Personal Access Token (classic)", description="GitHub Personal Access Token (classic)",
# Auth method support # Auth method support
aden_supported=False, aden_supported=True,
aden_provider_name="github",
direct_api_key_supported=True, direct_api_key_supported=True,
api_key_instructions="""To get a GitHub Personal Access Token: api_key_instructions="""To get a GitHub Personal Access Token:
1. Go to GitHub Settings > Developer settings > Personal access tokens 1. Go to GitHub Settings > Developer settings > Personal access tokens
@@ -29,6 +29,8 @@ NOTION_CREDENTIALS = {
startup_required=False, startup_required=False,
help_url="https://www.notion.so/my-integrations", help_url="https://www.notion.so/my-integrations",
description="Notion internal integration token", description="Notion internal integration token",
aden_supported=True,
aden_provider_name="notion",
direct_api_key_supported=True, direct_api_key_supported=True,
api_key_instructions="""To set up Notion API access: api_key_instructions="""To set up Notion API access:
1. Go to https://www.notion.so/my-integrations 1. Go to https://www.notion.so/my-integrations
+1 -1
View File
@@ -67,7 +67,7 @@ SLACK_CREDENTIALS = {
help_url="https://api.slack.com/apps", help_url="https://api.slack.com/apps",
description="Slack Bot Token (starts with xoxb-)", description="Slack Bot Token (starts with xoxb-)",
# Auth method support # Auth method support
aden_supported=False, aden_supported=True,
aden_provider_name="slack", aden_provider_name="slack",
direct_api_key_supported=True, direct_api_key_supported=True,
api_key_instructions="""To get a Slack Bot Token: api_key_instructions="""To get a Slack Bot Token:
@@ -26,6 +26,8 @@ Usage:
from __future__ import annotations from __future__ import annotations
from contextlib import contextmanager
from contextvars import ContextVar
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from .base import CredentialError, CredentialSpec from .base import CredentialError, CredentialSpec
@@ -34,6 +36,36 @@ if TYPE_CHECKING:
from framework.credentials import CredentialStore from framework.credentials import CredentialStore
# Worker profiles inject their per-provider account aliases into this
# ContextVar before each tool turn. When a tool calls
# ``credentials.get_by_alias(provider, "")`` (or ``credentials.get(name)``),
# the adapter consults the map and, if the provider has a binding, uses
# that alias for the lookup. An explicit non-empty alias on the call wins.
# Keys are credential ids (matching ``CredentialSpec.credential_id``);
# values are the account alias to route to.
_active_account_overrides: ContextVar[dict[str, str] | None] = ContextVar(
"hive_credential_account_overrides", default=None
)
@contextmanager
def account_overrides(overrides: dict[str, str] | None):
"""Bind ``overrides`` for the duration of the ``with`` block.
Used by the colony runtime to pin a worker's MCP tool calls to its
profile's aliases. Empty / ``None`` is a no-op so callers can safely
pass through unbound profiles without conditional logic.
"""
if not overrides:
yield
return
token = _active_account_overrides.set(dict(overrides))
try:
yield
finally:
_active_account_overrides.reset(token)
# Process-wide memoization for CredentialStoreAdapter.default(). # Process-wide memoization for CredentialStoreAdapter.default().
# #
# Without this, every caller (e.g. each MCP server registration in # Without this, every caller (e.g. each MCP server registration in
@@ -127,6 +159,19 @@ class CredentialStoreAdapter:
if name not in self._specs: if name not in self._specs:
raise KeyError(f"Unknown credential '{name}'. Available: {list(self._specs.keys())}") raise KeyError(f"Unknown credential '{name}'. Available: {list(self._specs.keys())}")
if account is None:
# No explicit caller-supplied alias — check whether the active
# worker profile has pinned this credential to a specific
# account. Falls through to the unaliased default lookup when
# no profile binding exists.
overrides = _active_account_overrides.get()
if overrides:
bound_alias = overrides.get(name)
if bound_alias:
aliased = self.get_by_alias(name, bound_alias)
if aliased is not None:
return aliased
if account is not None: if account is not None:
try: try:
from framework.credentials.local.registry import LocalCredentialRegistry from framework.credentials.local.registry import LocalCredentialRegistry
@@ -364,10 +409,23 @@ class CredentialStoreAdapter:
def get_by_alias(self, provider_name: str, alias: str) -> str | None: def get_by_alias(self, provider_name: str, alias: str) -> str | None:
"""Resolve a specific account's token by alias. """Resolve a specific account's token by alias.
When ``alias`` is empty, falls back to the active worker profile's
binding (if any). MCP tools default ``account=""`` so this lets a
worker profile pin a default account without requiring the agent
to supply it on every call.
Raises: Raises:
CredentialExpiredError: If the matched credential is expired and CredentialExpiredError: If the matched credential is expired and
refresh failed. refresh failed.
""" """
if not alias:
overrides = _active_account_overrides.get()
if overrides:
bound_alias = overrides.get(provider_name)
if bound_alias:
alias = bound_alias
if not alias:
return None
cred = self._store.get_credential_by_alias(provider_name, alias) cred = self._store.get_credential_by_alias(provider_name, alias)
if cred is None: if cred is None:
return None return None
@@ -584,7 +642,7 @@ class CredentialStoreAdapter:
# Use 5-second timeout to avoid blocking on slow/failed requests # Use 5-second timeout to avoid blocking on slow/failed requests
client = AdenCredentialClient( client = AdenCredentialClient(
AdenClientConfig( AdenClientConfig(
base_url=os.environ.get("ADEN_API_URL", "https://api.adenhq.com"), base_url=os.environ.get("ADEN_API_URL", "https://app.open-hive.com"),
timeout=5.0, timeout=5.0,
) )
) )