chore: update documentation

This commit is contained in:
Richard Tang
2026-03-23 20:35:26 -07:00
parent d7ce923ca6
commit fca2deb980
11 changed files with 5 additions and 5066 deletions
# Credential Identity & Multi-Account Foundation (Issue #4755)
## Context
Agents are identity-blind. When `gmail_read_email` runs, neither the LLM nor the tool
knows whose inbox it's operating on. One `ADEN_API_KEY` can back N accounts of the same
provider (e.g., 10 Gmail accounts), but today the system can only surface one — the last
one synced silently overwrites all others.
This plan traces the **5-tuple relationship** (Agent Definition → Agent Instance →
Agent Tool → Auth Provider → Auth User Identity) through every layer of the stack,
identifies exactly where things break, and prescribes targeted fixes.
### Motivating Scenarios
**Scenario A — Executive Assistant Agent**: A company deploys an agent that manages
calendars for 5 executives. Each executive has connected their Google account through
Aden. The agent's job is to check each person's availability and schedule meetings.
Today: the agent can only see ONE person's calendar (whichever synced last). The other
4 accounts are silently lost in the index collision. The agent schedules meetings on
the wrong person's calendar with no indication anything is wrong.
**Scenario B — Multi-Channel Support Agent**: A support team agent is connected to
3 Slack workspaces (Engineering, Sales, Support), a shared Gmail inbox, and a personal
Gmail for the team lead. Today: the agent sees one Slack workspace, one Gmail. It
cannot tell which Slack workspace it's posting to or whose Gmail it's reading. It
might reply to a customer email from the team lead's personal inbox.
**Scenario C — Compliance & Audit**: An enterprise client requires audit logs showing
which account was accessed, when, and by which agent. Today: the system logs
`credentials.get("google")` — no record of which of the 10 Google accounts was used.
Impossible to audit.
**Scenario D — Single-Account Agent (backward compat)**: A simple agent uses one
Gmail account and one Slack bot. Nothing should change. `credentials.get("google")`
returns the same token it always did. Zero migration, zero configuration changes.
---
## The 5-Tuple Model
Every credential interaction involves five entities. Understanding how they relate
(and where the relationships break) is the key to the fix.
```
Agent Definition ──→ Agent Instance ──→ Agent Tool ──→ Auth Provider ──→ Auth User Identity
"I need Gmail" "Here's your "Give me a "Here's one "Whose token
Gmail tool" token" token" is this?"
← MISSING
```
### 1. Agent Definition (what tools are needed)
**Files**: `exports/{name}/agent.py`, `nodes/__init__.py`, `mcp_servers.json`
An exported agent declares `NodeSpec.tools = ["gmail_read_email", "gmail_send_email"]`.
The `mcp_servers.json` points to the tools MCP server. The agent definition has NO
credential awareness — it names tools, not credentials. This is intentional: the same
agent definition can run against different credential sets in different environments
(dev vs. prod, tenant A vs. tenant B).
**Business logic**: Agent definitions are portable templates. A "Gmail Triage" agent
built by one team can be deployed to 50 different customers, each with their own
Google accounts. The agent definition never hard-codes credential IDs.
**Status**: Fine. No changes needed.
### 2. Agent Instance (runtime wiring)
**Files**: `runner.py`, `tool_registry.py`, `mcp_client.py`
`AgentRunner.__init__()` does three things in sequence:
1. `validate_agent_credentials(graph.nodes)` — checks presence + health
2. `ToolRegistry.load_mcp_config()` → `MCPClient` spawns subprocess
3. `_setup()` → `create_agent_runtime()` with discovered tools
The `ToolRegistry` bridges parent ↔ MCP subprocess:
- `CONTEXT_PARAMS = {"workspace_id", "agent_id", "session_id", "data_dir"}` — stripped
from LLM schema, injected at call time via `make_mcp_executor` closure
- `set_session_context()` — set once at startup
- `set_execution_context()` — per-execution via `contextvars`
The MCP subprocess inherits `os.environ` at spawn time via
`merged_env = {**os.environ, **(config.env or {})}` in `mcp_client.py:157`.
**Business logic**: The agent instance is where "portable template" meets "specific
deployment." An instance knows which Aden API key to use, which workspace it belongs
to, which tools are available. The `CONTEXT_PARAMS` mechanism is how the framework
passes deployment-specific context into tools without the LLM knowing or caring.
This is the natural extension point for `account` routing in the future.
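The stripping-and-reinjection idea can be sketched in isolation (hypothetical helper names; the real implementation lives in `tool_registry.py`):

```python
# Framework-injected params the LLM never sees (names from tool_registry.py).
CONTEXT_PARAMS = {"workspace_id", "agent_id", "session_id", "data_dir"}


def strip_context_params(schema_props: dict) -> dict:
    """Remove context params from the LLM-facing tool schema."""
    return {k: v for k, v in schema_props.items() if k not in CONTEXT_PARAMS}


def make_executor(tool_fn, session_context: dict):
    """Closure that re-injects deployment context on every tool call."""
    def execute(**llm_args):
        injected = {k: v for k, v in session_context.items() if k in CONTEXT_PARAMS}
        return tool_fn(**llm_args, **injected)
    return execute
```

An `account` parameter for multi-account routing would ride the same path: stripped from (or exposed in) the schema as needed, resolved in the closure.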
**Scenario**: Two customers both deploy the same "Email Triage" agent. Customer A
has 2 Google accounts; Customer B has 5. Each customer's `AgentRunner` validates
against their own Aden key, discovers different sets of credentials, and wires them
into the same agent graph. The agent definition is identical.
**Status**: Works for single-account. The `CONTEXT_PARAMS` pattern is the right
mechanism for future multi-account routing (adding `account` param).
### 3. Agent Tool (credential consumption)
**Files**: `tools/src/aden_tools/tools/*/`, `tools/mcp_server.py`
Every tool follows the same pattern:
```python
def register_gmail_tools(mcp, credentials=None):
    def _get_token():
        if credentials is not None:
            return credentials.get("google")  # ← single token, identity unknown
        return os.getenv("GOOGLE_ACCESS_TOKEN")

    @mcp.tool()
    def gmail_read_email(message_id: str):
        token = _get_token()
        ...
```
The `credentials` object is `CredentialStoreAdapter`, created once at MCP server startup
via `CredentialStoreAdapter.default()`. All tool closures capture this single shared
instance.
**Business logic**: Tools are the consumer endpoint — they need a valid access token
to call external APIs. They don't care about Aden, sync, or storage. They just need
`_get_token()` to return the right token. Today, "right" is undefined because there's
no way to say "the token for alice@company.com, not bob@company.com."
**Where it breaks — Scenario A revisited**: The executive assistant agent calls
`gmail_read_email()` intending to read Alice's inbox. `_get_token()` returns
`credentials.get("google")` which resolves to... Bob's token (he synced last).
The agent reads Bob's emails, thinks they're Alice's, and schedules meetings
accordingly. No error is raised. No indication anything is wrong. The agent is
confidently operating on the wrong person's data.
**Where it breaks — Scenario B revisited**: The support agent calls
`slack_post_message(channel="support-tickets")`. It uses a Slack token from
the Engineering workspace (last synced). The message goes to a channel that
doesn't exist in Engineering, returns an error, and the agent retries in a loop
with no understanding of why it's failing.
### 4. Auth Provider (credential storage & resolution)
**Files**: `store.py`, `aden/storage.py`, `aden/provider.py`, `aden/client.py`
Resolution chain:
```
credentials.get("google")
→ CredentialStoreAdapter.get("google")
→ CredentialStore.get("google")
→ AdenCachedStorage.load("google")
→ _provider_index.get("google") → "google_def456" (last write wins)
→ _load_by_id("google_def456")
→ Returns ONE CredentialObject
```
**The index collision bug** (`storage.py:303`):
```python
def _index_provider(self, credential):
    # integration_type_key is resolved from credential.keys earlier in the method
    provider_name = integration_type_key.value.get_secret_value()
    self._provider_index[provider_name] = credential.id  # ← OVERWRITES
```
**Business logic**: The storage layer is responsible for mapping human-readable
provider names ("google") to internal hash-based credential IDs ("google_abc123").
This mapping is essential because Aden generates unique hash IDs per connected account,
but tools reference providers by name. The `_provider_index` is this mapping.
**Why it's a `dict[str, str]` today**: The original design assumed 1:1 between
provider name and credential. "One Google account per API key." This was valid
for simple deployments but breaks fundamentally when an Aden API key backs multiple
accounts of the same provider.
**The collision mechanics**: When `sync_all()` runs, it iterates over all active
integrations from Aden. For a user with 3 Gmail accounts:
1. Sync `google_abc123` (alice@co.com) → `_provider_index["google"] = "google_abc123"`
2. Sync `google_def456` (bob@co.com) → `_provider_index["google"] = "google_def456"` ← Alice lost
3. Sync `google_ghi789` (carol@co.com) → `_provider_index["google"] = "google_ghi789"` ← Bob lost
All three `.enc` files exist on disk. Only Carol's is reachable by name. Alice's and
Bob's tokens are orphaned — encrypted, on disk, but invisible to the resolution chain.
**Why the disk layer is fine**: `EncryptedFileStorage` uses the hash ID as filename:
`google_abc123.enc`, `google_def456.enc`. No collision. The problem is purely in the
in-memory index that maps names to IDs.
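The overwrite-vs-append difference is easy to demonstrate in isolation (a toy model of the index, not the real `AdenCachedStorage`):

```python
def index_last_write_wins(creds: list[tuple[str, str]]) -> dict[str, str]:
    """Old behavior: dict[str, str] — each sync silently overwrites the last ID."""
    index: dict[str, str] = {}
    for provider, cred_id in creds:
        index[provider] = cred_id  # silent overwrite
    return index


def index_append(creds: list[tuple[str, str]]) -> dict[str, list[str]]:
    """Fixed behavior: dict[str, list[str]] — every account stays reachable."""
    index: dict[str, list[str]] = {}
    for provider, cred_id in creds:
        index.setdefault(provider, [])
        if cred_id not in index[provider]:
            index[provider].append(cred_id)
    return index


# The three-account sync from the scenario above:
synced = [("google", "google_abc123"), ("google", "google_def456"),
          ("google", "google_ghi789")]
```

With `index_last_write_wins`, only Carol's ID survives; with `index_append`, all three remain enumerable.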
### 5. Auth User Identity (THE MISSING PIECE)
**Files**: `models.py` (no identity model), `aden/provider.py` (metadata discarded),
`health_check.py` (identity parsed then discarded), `validation.py` (details ignored)
**Business logic**: Identity answers "whose account is this?" Every external service
provides identity data in its API responses — Gmail returns `emailAddress`, GitHub
returns `login`, Slack returns `team` + `user`. This data already flows through the
system during health checks and Aden syncs. It's parsed, briefly held in local
variables, and then discarded. No model captures it. No property exposes it. No
downstream consumer reads it.
Identity data exists at two sources but is discarded:
| Source | Data Available | What Happens |
|--------|---------------|--------------|
| Aden `metadata.email` | Email of connected account | `_aden_response_to_credential()` ignores `metadata` dict |
| Gmail health check | `emailAddress` field | `OAuthBearerHealthChecker.check()` returns `valid=True`, discards response body |
| GitHub health check | `login` username | Parsed to `details["username"]`, validation ignores `details` |
| Slack health check | `team`, `user` | Parsed to `details`, validation ignores `details` |
| Discord health check | `username`, `id` | Parsed to `details`, validation ignores `details` |
| Calendar health check | Primary calendar `id` = email | `OAuthBearerHealthChecker.check()` discards response body |
**The waste**: Every agent startup already makes these health check API calls. The
identity data is RIGHT THERE in the response body. We parse it for validation logic,
then throw it away. Zero additional API calls needed — we just need to keep what we
already have.
**What identity enables downstream**:
- LLM knows whose inbox it's reading (system prompt awareness)
- Tools can route to specific accounts (future `account` parameter)
- Audit logs can record which identity was accessed
- Users can see which accounts are connected in TUI/dashboard
- Agents can reason about cross-account operations ("forward from alice to bob")
---
## What Changes — Layer by Layer
### Step 1: `CredentialIdentity` model on `CredentialObject`
**File**: `core/framework/credentials/models.py`
**Business logic**: Every credential needs a structured way to answer "who does this
belong to?" Different providers express identity differently:
| Provider | Primary Identity | Secondary Identity |
|----------|-----------------|-------------------|
| Google (Gmail, Calendar, Drive) | Email address | — |
| Slack | Workspace name | Bot username |
| GitHub | Username (login) | — |
| Discord | Username | Account ID |
| HubSpot | Portal ID | — |
| Microsoft 365 | Email address | Tenant ID |
The `CredentialIdentity` model normalizes these into four universal fields:
`email`, `username`, `workspace`, `account_id`. The `label` property picks the
best human-readable identifier for display (email preferred, then username, etc.).
**Why a computed property, not a stored field**: Identity is derived from
`_identity_*` keys that already exist in the credential's key vault. Storing it
as a separate field would create a sync problem (what if keys update but the field
doesn't?). A computed property always reflects current state.
**Scenarios this enables**:
- **Display**: `cred.identity.label` → `"alice@company.com"` (for system prompts, TUI, logs)
- **Comparison**: `cred.identity.email == "alice@company.com"` (for account routing)
- **Serialization**: `cred.identity.to_dict()` → `{"email": "alice@company.com"}` (for MCP tool responses)
- **Existence check**: `cred.identity.is_known` → `True` (skip accounts with no identity)
- **Provider type**: `cred.provider_type` → `"google"` (from `_integration_type` key)
**Key design decision**: `set_identity(**fields)` persists as `_identity_*` keys using
the existing `set_key()` method. This means identity survives serialization/deserialization
through `EncryptedFileStorage` without any schema migration. Old credentials without
identity keys simply return `CredentialIdentity()` with all `None` fields and
`label == "unknown"`.
```python
class CredentialIdentity(BaseModel):
    email: str | None = None
    username: str | None = None
    workspace: str | None = None
    account_id: str | None = None

    @property
    def label(self) -> str:
        return self.email or self.username or self.workspace or self.account_id or "unknown"

    @property
    def is_known(self) -> bool:
        return bool(self.email or self.username or self.workspace or self.account_id)

    def to_dict(self) -> dict[str, str]:
        return {k: v for k, v in self.model_dump().items() if v is not None}
```
On `CredentialObject`:
```python
@property
def identity(self) -> CredentialIdentity:
    fields = {}
    for key_name, key_obj in self.keys.items():
        if key_name.startswith("_identity_"):
            field = key_name[len("_identity_"):]
            fields[field] = key_obj.value.get_secret_value()
    return CredentialIdentity(**{k: v for k, v in fields.items()
                                 if k in CredentialIdentity.model_fields})

@property
def provider_type(self) -> str | None:
    key = self.keys.get("_integration_type")
    return key.value.get_secret_value() if key else None

def set_identity(self, **fields: str) -> None:
    for field_name, value in fields.items():
        if value:
            self.set_key(f"_identity_{field_name}", value)
```
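The intended behavior of the model can be exercised with a dependency-free stand-in (a plain dataclass mirroring the pydantic model, for illustration only):

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class CredentialIdentitySketch:
    """Stand-in for the pydantic CredentialIdentity; same field/label semantics."""
    email: str | None = None
    username: str | None = None
    workspace: str | None = None
    account_id: str | None = None

    @property
    def label(self) -> str:
        # Best human-readable identifier: email preferred, then username, etc.
        return self.email or self.username or self.workspace or self.account_id or "unknown"

    @property
    def is_known(self) -> bool:
        return bool(self.email or self.username or self.workspace or self.account_id)

    def to_dict(self) -> dict:
        return {k: v for k, v in asdict(self).items() if v is not None}
```

An old credential with no `_identity_*` keys maps to the all-`None` case: `label` is `"unknown"`, `is_known` is `False`, and `to_dict()` is empty.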
---
### Step 2: Fix storage multi-account index
**File**: `core/framework/credentials/aden/storage.py`
**Business logic**: The core bug. When a user connects multiple accounts of the same
provider type through Aden, all but the last one becomes unreachable. This affects
every multi-account deployment silently — no error, no warning, just missing accounts.
**`_provider_index`**: `dict[str, str]` → `dict[str, list[str]]`
**Before (broken)**:
```
sync google_abc123 (alice) → index["google"] = "google_abc123"
sync google_def456 (bob) → index["google"] = "google_def456" ← alice lost
load("google") → returns bob's token
```
**After (fixed)**:
```
sync google_abc123 (alice) → index["google"] = ["google_abc123"]
sync google_def456 (bob) → index["google"] = ["google_abc123", "google_def456"]
load("google") → returns alice's token (first = backward compat)
load_all_for_provider("google") → returns [alice, bob]
```
**Backward compatibility contract**: Every existing tool calls `credentials.get("google")`
and expects a single token string back. This MUST continue to work. `load("google")`
returns the first credential in the list — same behavior as before for single-account
deployments, deterministic (first-synced-first-served) for multi-account.
**Scenarios**:
- **Single account** (most common today): `index["google"] = ["google_abc123"]`.
`load("google")` returns the only entry. Identical behavior to before.
- **Two accounts, same provider**: `index["google"] = ["google_abc123", "google_def456"]`.
`load("google")` returns first. `load_all_for_provider("google")` returns both.
Existing tools see no change; new APIs can enumerate.
- **Mixed providers**: `index["google"] = ["google_abc123"], index["slack"] = ["slack_xyz"]`.
Each provider resolves independently.
- **Credential removed from Aden**: On next `sync_all()`, `rebuild_provider_index()`
rebuilds from disk. The removed credential's `.enc` file is gone, so it drops from
the index naturally.
- **`exists()` check**: Validation calls `exists("google")` to check if credentials
are available before running health checks. Must return `True` if ANY Google account
exists, not just the last-synced one.
```python
# _index_provider — append, don't overwrite
def _index_provider(self, credential):
    ...
    if provider_name not in self._provider_index:
        self._provider_index[provider_name] = []
    if credential.id not in self._provider_index[provider_name]:
        self._provider_index[provider_name].append(credential.id)

# load — first match (backward compat)
def load(self, credential_id):
    # credential_id may be a provider name ("google") or a raw hash ID
    resolved_ids = self._provider_index.get(credential_id)
    if resolved_ids:
        for rid in resolved_ids:
            if rid != credential_id:  # guard against self-referencing entries
                result = self._load_by_id(rid)
                if result is not None:
                    return result
    return self._load_by_id(credential_id)

# NEW: enumerate all accounts
def load_all_for_provider(self, provider_name: str) -> list[CredentialObject]:
    results = []
    for cid in self._provider_index.get(provider_name, []):
        cred = self._load_by_id(cid)
        if cred:
            results.append(cred)
    return results
```
---
### Step 3: Preserve Aden metadata as identity
**File**: `core/framework/credentials/aden/provider.py`
**Business logic**: When a user connects a Google account through Aden's OAuth flow,
the Aden server stores metadata about the connected account — most importantly, the
email address. This metadata comes back in every API response as
`metadata: {"email": "alice@company.com"}`. Today, this metadata is present in
`AdenCredentialResponse.metadata` (the `from_dict()` parser already handles it) but
is never written into the `CredentialObject`'s key vault. It's silently dropped.
**Why Aden metadata is the primary identity source**: Aden captures identity at the
moment of OAuth authorization — the user explicitly grants access, and the Aden server
records who they are. This is more authoritative than health checks because:
1. It's captured at consent time, not at validation time
2. It works even if the health check endpoint is down
3. It's available immediately on first sync, before any health check runs
**When metadata arrives**: Two code paths create/update credentials from Aden responses:
1. **`_aden_response_to_credential()`** — first-time sync. The credential doesn't
exist locally yet. We're building it from scratch. Metadata should be written as
`_identity_*` keys in the initial key dict.
2. **`_update_credential_from_aden()`** — token refresh. The credential already exists.
The access token is updated. Metadata should be written/overwritten as `_identity_*`
keys on the existing credential object.
**Scenario — first sync**: User connects `alice@company.com` through Aden. Aden
returns `{access_token: "...", metadata: {email: "alice@company.com"}}`. The
credential is created with `_identity_email = "alice@company.com"`. Later,
`cred.identity.email` returns `"alice@company.com"`.
**Scenario — token refresh**: Alice's token expires. Aden refreshes it and returns
updated metadata. `_update_credential_from_aden()` updates the access token AND
refreshes `_identity_email`. If Alice changed her email (e.g., name change), the
identity stays current.
**Scenario — no metadata**: Some Aden integrations may not return metadata (e.g.,
a simple API key integration). The loop `for meta_key, meta_value in (metadata or {}).items()`
safely does nothing. The credential has no `_identity_*` keys, and `cred.identity`
returns `CredentialIdentity()` with `label == "unknown"`.
```python
# In _aden_response_to_credential, after building keys dict:
for meta_key, meta_value in (aden_response.metadata or {}).items():
    if meta_value and isinstance(meta_value, str):
        keys[f"_identity_{meta_key}"] = CredentialKey(
            name=f"_identity_{meta_key}",
            value=SecretStr(meta_value),
        )

# In _update_credential_from_aden, after updating access_token:
for meta_key, meta_value in (aden_response.metadata or {}).items():
    if meta_value and isinstance(meta_value, str):
        credential.keys[f"_identity_{meta_key}"] = CredentialKey(
            name=f"_identity_{meta_key}",
            value=SecretStr(meta_value),
        )
```
---
### Step 4: Extract identity from health checks
**File**: `tools/src/aden_tools/credentials/health_check.py`
**Business logic**: Health checks are the second identity source. Every agent startup
runs `validate_agent_credentials()` which calls provider-specific health check
endpoints. These endpoints return identity data as a side effect of validation:
| Health Check Endpoint | What It Returns | Identity We Extract |
|----------------------|----------------|-------------------|
| Gmail: `GET /users/me/profile` | `{emailAddress, messagesTotal, ...}` | `email = emailAddress` |
| Calendar: `GET /users/me/calendarList` | `{items: [{id, primary, ...}]}` | `email = primary calendar id` |
| Slack: `POST auth.test` | `{ok, team, user, bot_id, ...}` | `workspace = team, username = user` |
| GitHub: `GET /user` | `{login, id, name, ...}` | `username = login` |
| Discord: `GET /users/@me` | `{username, id, ...}` | `username = username` |
**Why health checks matter as an identity source**:
1. **Fallback when Aden metadata is missing**: Not all Aden integrations return
metadata. The health check always hits the actual service, so identity is always
available on success.
2. **Ground truth verification**: Aden metadata is captured at OAuth time. If the
user's email changed since then, the health check returns the CURRENT identity.
3. **Non-Aden credentials**: When credentials are configured via environment
variables (no Aden), health checks are the ONLY identity source. A dev sets
`GOOGLE_ACCESS_TOKEN` manually — the health check reveals whose token it is.
4. **Zero additional cost**: The health check API call is already happening. We
just need to parse the response body that's currently discarded after the
status code check.
**Design — `_extract_identity()` hook**: The base `OAuthBearerHealthChecker` gets
a new virtual method `_extract_identity(data: dict) -> dict[str, str]` that subclasses
override. The `check()` method calls it when the response is 200 OK:
```python
class OAuthBearerHealthChecker:
    def _extract_identity(self, data: dict) -> dict[str, str]:
        """Override to extract identity fields from successful response."""
        return {}

    def check(self, access_token: str) -> HealthCheckResult:
        ...
        if response.status_code == 200:
            identity = {}
            try:
                data = response.json()
                identity = self._extract_identity(data)
            except Exception:
                pass  # Identity extraction is best-effort
            return HealthCheckResult(
                valid=True,
                message=f"{self.service_name} credentials valid",
                details={"identity": identity} if identity else {},
            )
```
**Why `details["identity"]`**: The existing `HealthCheckResult` has a `details: dict`
field that's used ad-hoc by different checkers. By putting identity under a standardized
`"identity"` key, Step 5 can generically extract it without knowing which checker
ran. Existing `details` fields (`username`, `team`, `bot_id`) continue to exist
alongside — no breaking changes.
**Standalone checkers** (Slack, GitHub, Discord) don't extend `OAuthBearerHealthChecker`.
They already parse identity data into their `details` dict. For these, we simply add
an `"identity"` key with the structured fields alongside existing keys.
**Scenario — Gmail health check enriches a credential without Aden metadata**: A dev
sets `GOOGLE_ACCESS_TOKEN` as an env var. The credential has no `_identity_*` keys.
On startup, the Gmail health check calls `/users/me/profile`, gets
`{emailAddress: "dev@gmail.com"}`, returns `details={"identity": {"email": "dev@gmail.com"}}`.
Step 5 persists this. Now `cred.identity.email` works even without Aden.
**Scenario — health check fails**: Token is expired or revoked. Response is 401.
No identity extracted (identity extraction only runs on 200). The health check
returns `valid=False`. Step 5 skips persistence. The credential's existing identity
(if any, from Aden metadata) remains unchanged.
**Scenario — identity extraction throws**: The response body is malformed or missing
expected fields. The `try/except` in `check()` catches it. Health check still returns
`valid=True` (the token worked). Identity is just not extracted. Best-effort, never
blocks validation.
---
### Step 5: Persist identity during validation
**File**: `core/framework/credentials/validation.py`
**Business logic**: Steps 3 and 4 produce identity data. Step 5 is the bridge that
takes identity from health check results and persists it to the credential store.
This runs during `validate_agent_credentials()`, which is called at every agent startup.
**Why persist during validation**: Validation is the natural lifecycle hook because:
1. It runs on every agent startup (guaranteed execution)
2. It already has access to the credential store
3. It already runs health checks (identity is available in the result)
4. It runs BEFORE the agent executes (identity is available for system prompt injection)
**Flow**:
```
Agent startup
  → validate_agent_credentials()
    → for each credential:
      → check_credential_health(token) → HealthCheckResult
      → if result.valid AND result.details["identity"] exists:
        → cred_obj = store.get_credential(cred_id)
        → cred_obj.set_identity(**identity_data)
        → store.save_credential(cred_obj)  ← persisted to disk
```
**Scenario — identity from health check augments Aden metadata**: Aden provides
`metadata.email = "alice@company.com"` (stored as `_identity_email` in Step 3).
The Slack health check returns `identity: {workspace: "Acme Corp", username: "hive-bot"}`.
Step 5 adds `_identity_workspace` and `_identity_username` to the Slack credential.
Now both credentials have rich identity data from their respective sources.
**Scenario — identity update on restart**: Between agent runs, the GitHub user
renamed from `old-username` to `new-username`. On next startup, the health check
returns `identity: {username: "new-username"}`. Step 5 calls `set_identity(username="new-username")`,
which overwrites `_identity_username`. The credential now reflects the current identity.
**Scenario — multiple accounts of same provider**: With the index fix (Step 2),
`validate_agent_credentials()` iterates over all credentials. Each Google account
gets its own health check. Each health check returns a different `emailAddress`.
Each identity is persisted to the correct `CredentialObject`. Account A gets
`_identity_email = "alice@co.com"`, Account B gets `_identity_email = "bob@co.com"`.
**Error handling**: Identity persistence is best-effort. If `get_credential()` fails
or `save_credential()` fails, the exception is caught and swallowed. The agent still
starts. The credential still works. It just won't have identity data for that account.
This is acceptable because identity is informational, not functional.
```python
if result.valid:
    identity_data = result.details.get("identity")
    if identity_data and isinstance(identity_data, dict):
        try:
            cred_obj = store.get_credential(cred_id, refresh_if_needed=False)
            if cred_obj:
                cred_obj.set_identity(**identity_data)
                store.save_credential(cred_obj)
        except Exception:
            pass  # Identity persistence is best-effort
```
---
### Step 6: Account listing & identity APIs
**Files**: `core/framework/credentials/store.py`, `tools/src/aden_tools/credentials/store_adapter.py`
**Business logic**: Steps 1-5 populate identity data. Step 6 exposes it through
clean APIs. Two layers need new methods:
1. **`CredentialStore`** (framework layer) — knows about `CredentialObject` and storage
2. **`CredentialStoreAdapter`** (tool boundary) — wraps the store with `CredentialSpec`-aware
APIs, sits in the MCP subprocess, consumed by tools
**Why two layers**: The store is a framework concept (core/). The adapter is a tools
concept (tools/). Tools never import from core directly. The adapter bridges the gap,
translating between credential IDs and spec names, handling the "is this credential
configured and available?" logic.
**APIs added to `CredentialStore`**:
- `list_accounts(provider_name)` — returns all accounts for a provider type with
their identities. Delegates to `storage.load_all_for_provider()` (Step 2). Returns
a list of dicts, not raw `CredentialObject`s, to avoid leaking secrets upstream.
- `get_credential_by_identity(provider_name, label)` — finds a specific account by
matching `cred.identity.label` against the provided label. This is the resolution
mechanism for future multi-account routing: "give me the token for alice@co.com."
**APIs added to `CredentialStoreAdapter`**:
- `get_identity(name)` — returns the identity dict for a named credential spec.
Used by tools that want to know whose token they're using for logging/display.
- `list_accounts(provider_name)` — delegates to store. Used by the `get_account_info`
MCP tool (Step 8).
- `get_all_account_info()` — iterates over all configured credential specs, collects
all accounts across all providers. Used to build the system prompt (Step 7).
Deduplicates by provider name to avoid listing the same provider's accounts twice
when multiple specs map to the same provider.
- `get_by_identity(provider_name, label)` — resolves a specific account's token by
identity label. Used by future multi-account routing (Step 9). Returns a raw token
string, not a `CredentialObject`.
**Scenario — system prompt building**: At agent startup, the runner calls
`adapter.get_all_account_info()`. The adapter iterates over specs:
`{"gmail": CredentialSpec(credential_id="google"), "gcal": CredentialSpec(credential_id="google"), "slack": CredentialSpec(...)}`.
It deduplicates by provider: `google` and `slack`. For `google`, `list_accounts("google")`
returns 2 accounts. For `slack`, 1 account. Result: 3 account entries for the system prompt.
**Scenario — identity-based routing (future)**: The LLM calls
`gmail_read_email(account="alice@co.com")`. The tool calls
`credentials.get_by_identity("google", "alice@co.com")`. The adapter delegates to
`store.get_credential_by_identity("google", "alice@co.com")` which scans all Google
credentials, finds the one where `identity.label == "alice@co.com"`, and returns
its access token. The right inbox is read.
```python
# CredentialStore
def list_accounts(self, provider_name: str) -> list[dict[str, Any]]:
    if hasattr(self._storage, 'load_all_for_provider'):
        creds = self._storage.load_all_for_provider(provider_name)
    else:
        cred = self.get_credential(provider_name)
        creds = [cred] if cred else []
    return [
        {"credential_id": c.id, "provider": provider_name,
         "identity": c.identity.to_dict(), "label": c.identity.label}
        for c in creds
    ]

def get_credential_by_identity(self, provider_name: str, label: str) -> CredentialObject | None:
    if hasattr(self._storage, 'load_all_for_provider'):
        for cred in self._storage.load_all_for_provider(provider_name):
            if cred.identity.label == label:
                return cred
    return None
```
```python
# CredentialStoreAdapter
def get_all_account_info(self) -> list[dict[str, Any]]:
    accounts = []
    seen: set[str] = set()
    for name, spec in self._specs.items():
        provider = spec.credential_id or name
        if provider in seen or not self.is_available(name):
            continue
        seen.add(provider)
        accounts.extend(self._store.list_accounts(provider))
    return accounts

def get_by_identity(self, provider_name: str, label: str) -> str | None:
    cred = self._store.get_credential_by_identity(provider_name, label)
    return cred.get_default_key() if cred else None
```
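The dedup logic can be exercised against a stub store (all names hypothetical, mirroring the adapter sketch above):

```python
class StubStore:
    """Hypothetical stand-in for CredentialStore.list_accounts."""

    def list_accounts(self, provider: str) -> list[dict]:
        data = {
            "google": [{"provider": "google", "label": "alice@co.com"},
                       {"provider": "google", "label": "bob@co.com"}],
            "slack": [{"provider": "slack", "label": "Acme Corp"}],
        }
        return data.get(provider, [])


def get_all_account_info(specs: dict[str, str], store) -> list[dict]:
    """specs maps spec name -> provider; dedupe providers like the adapter does."""
    accounts, seen = [], set()
    for name, provider in specs.items():
        if provider in seen:
            continue
        seen.add(provider)
        accounts.extend(store.list_accounts(provider))
    return accounts
```

Two specs pointing at `google` (e.g. `gmail` and `gcal`) still yield each Google account exactly once.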
---
### Step 7: Surface identity to LLM via system prompt
**Files**: `prompt_composer.py`, `executor.py`, `event_loop_node.py`, `node.py`, `runner.py`
**Business logic**: The LLM needs to know what accounts are connected so it can:
1. **Communicate clearly to the user**: "I checked alice@company.com's inbox and
found 3 unread messages" vs. "I checked the inbox and found 3 unread messages"
2. **Disambiguate operations**: When asked "check my emails," the LLM can respond
"You have 2 Google accounts connected: alice@company.com and bob@company.com.
Which would you like me to check?" (requires Step 9 routing, but awareness comes first)
3. **Prevent hallucination**: Without account info, the LLM might invent account
names or assume capabilities it doesn't have. With the accounts prompt, it knows
exactly what's available.
4. **Cross-account reasoning**: "Forward the email from alice's inbox to bob's inbox"
requires knowing both accounts exist and which is which.
**Where it sits in the three-layer prompt**:
```
Layer 1 — Identity: "You are a thorough email management agent."
Accounts: "Connected accounts:
- google: alice@company.com (email: alice@company.com)
- google: bob@company.com (email: bob@company.com)
- slack: Acme Corp (workspace: Acme Corp, username: hive-bot)"
Layer 2 — Narrative: "We've triaged 15 emails so far..."
Layer 3 — Focus: "Your current task: categorize remaining unread emails"
```
Accounts sit between identity (static personality) and narrative (dynamic state)
because connected accounts are semi-static — they don't change during a session but
are deployment-specific (different from the agent definition).
**Injection path through the framework**:
```
AgentRunner._setup()
→ CredentialStoreAdapter.get_all_account_info()
→ build_accounts_prompt(accounts) ← new function in prompt_composer.py
→ GraphExecutor(accounts_prompt=...) ← new init param
→ NodeContext(accounts_prompt=...) ← new field
→ compose_system_prompt(..., accounts_prompt=...) ← new param
```
**Why it flows through `NodeContext`**: For the first node in a graph (or an isolated
`EventLoopNode`), the system prompt is built in `EventLoopNode.execute()`, not through
the continuous transition path. `NodeContext.accounts_prompt` carries the data to
both paths:
- **Continuous transition**: `compose_system_prompt()` in the executor uses
`self.accounts_prompt` directly
- **First node / isolated node**: `EventLoopNode.execute()` reads `ctx.accounts_prompt`
and appends it to the system prompt
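The layering can be pictured with a minimal sketch. The real `compose_system_prompt` takes more parameters than shown here; this only illustrates the assumption that the accounts block slots between identity and narrative, and is omitted entirely when empty:

```python
def compose_system_prompt(identity: str, narrative: str, focus: str,
                          accounts_prompt: str = "") -> str:
    """Assemble the three-layer prompt, slotting accounts after identity."""
    parts = [identity]
    if accounts_prompt:  # empty string => no accounts block at all
        parts.append(accounts_prompt)
    parts.extend([narrative, focus])
    return "\n\n".join(p for p in parts if p)
```

With `accounts_prompt=""` (the no-credentials scenario below), the output is byte-identical to today's three-layer prompt.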
**Scenario — no credentials**: An agent with no external integrations (pure LLM
reasoning, no tools). `get_all_account_info()` returns `[]`. `build_accounts_prompt([])`
returns `""`. The accounts block is omitted from the system prompt. Zero impact.
**Scenario — single account**: One Google account. System prompt shows
`"Connected accounts:\n- google: alice@company.com (email: alice@company.com)"`.
The LLM knows who it's operating as.
**Scenario — unknown identity**: A credential exists but has no `_identity_*` keys
(maybe Aden didn't provide metadata and health checks haven't run yet). `identity.label`
returns `"unknown"`. The prompt shows `"- google: unknown"`. Better than nothing —
the LLM knows Google is connected, just not whose account.
```python
def build_accounts_prompt(accounts: list[dict[str, Any]]) -> str:
if not accounts:
return ""
lines = ["Connected accounts:"]
for acct in accounts:
provider = acct.get("provider", "unknown")
label = acct.get("label", "unknown")
identity = acct.get("identity", {})
detail_parts = [f"{k}: {v}" for k, v in identity.items() if v]
detail = f" ({', '.join(detail_parts)})" if detail_parts else ""
lines.append(f"- {provider}: {label}{detail}")
return "\n".join(lines)
```
---
### Step 8: `get_account_info` MCP tool
**New directory**: `tools/src/aden_tools/tools/account_info_tool/`
**Business logic**: Step 7 gives the LLM passive awareness (system prompt). Step 8
gives the LLM active introspection — it can call `get_account_info()` to query
connected accounts at runtime, even mid-conversation.
**Why both passive and active**: The system prompt provides context at conversation
start. But in long-running agents with many tools, the system prompt may get
compacted (truncated during context management). The MCP tool ensures the LLM can
always re-discover account info even after compaction.
**Use cases**:
- **User asks "what accounts are connected?"**: LLM calls `get_account_info()`,
formats the response for the user.
- **LLM needs to decide which account to use**: Before sending an email, the LLM
calls `get_account_info(provider="google")` to see which Gmail accounts are
available, then asks the user which one to send from.
- **Dynamic account discovery**: In a long-running session, accounts might be
added/revoked (Aden dashboard). The tool provides current state vs. the stale
system prompt.
- **Debugging/transparency**: The user can ask "which Slack workspace are you
connected to?" and get a precise answer.
**API design**:
```python
@mcp.tool()
def get_account_info(provider: str = "") -> dict:
"""List connected accounts and their identities.
Call with no arguments to see all connected accounts.
Call with provider="google" to filter by provider type.
Returns account IDs, provider types, and identity labels
(email, username, workspace) for each connected account.
"""
if credentials is None:
return {"accounts": [], "message": "No credential store configured"}
if provider:
accounts = credentials.list_accounts(provider)
else:
accounts = credentials.get_all_account_info()
return {"accounts": accounts, "count": len(accounts)}
```
**Response example**:
```json
{
"accounts": [
{"credential_id": "google_abc123", "provider": "google",
"identity": {"email": "alice@company.com"}, "label": "alice@company.com"},
{"credential_id": "google_def456", "provider": "google",
"identity": {"email": "bob@company.com"}, "label": "bob@company.com"},
{"credential_id": "slack_xyz", "provider": "slack",
"identity": {"workspace": "Acme Corp", "username": "hive-bot"},
"label": "Acme Corp"}
],
"count": 3
}
```
Register in `tools/src/aden_tools/tools/__init__.py` alongside existing tools.
---
### Step 9: Multi-account routing extension point (design only, no code)
**Business logic**: Steps 1-8 build the foundation. Step 9 designs (but does not
implement) the per-tool-call account selection mechanism. This is the endgame:
when the LLM calls `gmail_read_email(account="alice@co.com")`, the right token
is used.
**Why design-only in this PR**: Multi-account routing requires changes to every
tool's `_get_token()` function and introduces the `account` parameter across all
tool signatures. This is a significant surface area change that should be a
separate PR with its own testing. The foundation from Steps 1-8 makes it a
straightforward addition.
**How it will work — the full flow**:
1. **LLM discovers accounts**: Via system prompt (Step 7) or `get_account_info` tool
(Step 8), the LLM knows `alice@company.com` and `bob@company.com` are connected.
2. **User says "check alice's inbox"**: The LLM calls
`gmail_read_email(account="alice@company.com")`.
3. **Tool resolves account**: `_get_token("alice@company.com")` calls
`credentials.get_by_identity("google", "alice@company.com")`.
4. **Store resolves credential**: `get_credential_by_identity("google", "alice@company.com")`
scans all Google credentials, finds the one where `identity.label == "alice@company.com"`,
returns its access token.
5. **API call with correct token**: The tool uses Alice's token to call the Gmail API.
The right inbox is read.
**Pinned single-account agents**: For agents that should ALWAYS use a specific account
(e.g., a shared support inbox), the `account` parameter becomes a `CONTEXT_PARAM` in
`ToolRegistry`. It's stripped from the LLM schema (the LLM can't override it) and
auto-injected at call time from `NodeSpec` or `GraphSpec` configuration. This follows
the exact same pattern as `data_dir` — proven, concurrency-safe, framework-native.
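The strip-and-inject mechanics can be sketched as follows. The helper names (`strip_context_params`, `inject_context_params`) are hypothetical; the real logic lives in `ToolRegistry._convert_mcp_tool_to_framework_tool()` and `make_mcp_executor()`:

```python
# Hypothetical sketch of the CONTEXT_PARAM pattern described above.
CONTEXT_PARAMS = {"data_dir", "account"}

def strip_context_params(schema: dict) -> dict:
    """Remove framework-injected params from the schema the LLM sees."""
    props = {k: v for k, v in schema.get("properties", {}).items()
             if k not in CONTEXT_PARAMS}
    required = [r for r in schema.get("required", []) if r not in CONTEXT_PARAMS]
    return {**schema, "properties": props, "required": required}

def inject_context_params(llm_args: dict, pinned: dict) -> dict:
    """Merge pinned values at call time; pinned values win over LLM output."""
    return {**llm_args, **{k: v for k, v in pinned.items() if k in CONTEXT_PARAMS}}
```

Because the pinned value is merged last, even an LLM that hallucinates an `account` argument cannot redirect a pinned agent to another inbox.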
**Why `CredentialIdentity.label` is the stable routing key**:
- It's human-readable (email addresses, usernames)
- It's deterministic (computed from `_identity_*` keys)
- It matches what the LLM sees in the system prompt
- It survives credential refresh (identity doesn't change when tokens rotate)
- It's unique within a provider (two Google accounts always have different emails)
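A plausible derivation of that label, consistent with the examples in this plan (Gmail labels show the email, Slack labels show the workspace). The priority order here is an assumption, not confirmed implementation:

```python
def derive_label(identity: dict[str, str]) -> str:
    # Assumed priority: prefer the most user-recognizable field.
    for key in ("email", "workspace", "username"):
        if identity.get(key):
            return identity[key]
    return "unknown"  # credential connected, identity not yet known
```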
---
## How This Works with Exported/Template Agents
### Agent definition (no changes)
Exported agents in `exports/` declare tools via `NodeSpec.tools` and MCP servers via
`mcp_servers.json`. They don't know about credentials — this is by design. Credential
specs (`CredentialSpec.tools`) provide the external mapping from tool name to credential.
**Scenario — same agent, different deployments**: The "Email Triage" agent template
is used by 3 customers. Customer A has 1 Gmail account. Customer B has 5. Customer C
has 3 Gmail and 2 Outlook. The agent definition is identical for all three. Only
the Aden API key (and thus the available credentials) differs.
### Agent instance (accounts_prompt injection)
When `AgentRunner.load()` instantiates an agent:
1. `validate_agent_credentials()` runs — syncs Aden, checks presence/health
2. Identity is persisted during validation (Step 5)
3. `_setup()` collects `accounts_prompt` via `CredentialStoreAdapter.get_all_account_info()`
4. Passes to `GraphExecutor(accounts_prompt=...)` → `compose_system_prompt()`
The agent definition doesn't need to change. Identity flows through the existing
runtime wiring.
### MCP subprocess (independent adapter)
The MCP subprocess creates its own `CredentialStoreAdapter.default()` at startup.
This triggers an independent `sync_all()` from Aden. With the index fix (Step 2),
all accounts are preserved. The adapter's new methods (`list_accounts()`,
`get_all_account_info()`, `get_by_identity()`) are available to tools in the subprocess.
**Why independent sync is correct**: The MCP subprocess runs in a separate process
with its own memory space. It cannot share the parent's `CredentialStore`. Both
processes sync from the same Aden server (same API key), so they see the same
credentials. The disk-level `EncryptedFileStorage` handles concurrent access safely
(each read is atomic file read, writes use temp+rename).
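The temp+rename pattern referenced above can be sketched in a few lines. This is a generic illustration, not the `EncryptedFileStorage` implementation: readers see either the old file or the new one, never a partial write.

```python
import os
import tempfile

def atomic_write(path: str, data: bytes) -> None:
    """Write data so concurrent readers never observe a half-written file."""
    directory = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=directory)  # temp file on same filesystem
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # durable before the swap
        os.replace(tmp, path)     # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp)            # don't leave temp debris on failure
        raise
```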
### ToolRegistry bridge (future routing)
When multi-account routing is implemented (Step 9), the `account` parameter will be
added to `CONTEXT_PARAMS`. `ToolRegistry._convert_mcp_tool_to_framework_tool()` will
strip it from LLM schema (line 467). `make_mcp_executor()` will inject it at call time
(line 421). This follows the exact same pattern as `data_dir`.
---
## Files Modified (Summary)
| # | File | Changes |
|---|------|---------|
| 1 | `core/framework/credentials/models.py` | `CredentialIdentity`, `identity` property, `set_identity()`, `provider_type` |
| 2 | `core/framework/credentials/aden/storage.py` | `_provider_index: dict[str, list[str]]`, `load_all_for_provider()`, fix `exists()`, `rebuild_provider_index()` |
| 3 | `core/framework/credentials/aden/provider.py` | Persist `metadata` as `_identity_*` keys in both `_aden_response_to_credential` and `_update_credential_from_aden` |
| 4 | `tools/src/aden_tools/credentials/health_check.py` | `_extract_identity()` hook on `OAuthBearerHealthChecker`, overrides per checker, `identity` key in standalone checker `details` |
| 5 | `core/framework/credentials/validation.py` | Persist identity from health check `details["identity"]` via `set_identity()` |
| 6 | `core/framework/credentials/store.py` | `list_accounts()`, `get_credential_by_identity()` |
| 7 | `tools/src/aden_tools/credentials/store_adapter.py` | `get_identity()`, `list_accounts()`, `get_all_account_info()`, `get_by_identity()` |
| 8 | `core/framework/graph/prompt_composer.py` | `build_accounts_prompt()`, `accounts_prompt` param on `compose_system_prompt()` |
| 9 | `core/framework/graph/node.py` | `accounts_prompt: str = ""` on `NodeContext` |
| 10 | `core/framework/graph/executor.py` | `accounts_prompt` init param, pass to `compose_system_prompt()` and `_build_context()` |
| 11 | `core/framework/graph/event_loop_node.py` | Append `accounts_prompt` for first node system prompt |
| 12 | `core/framework/runner/runner.py` | Collect accounts info in `_setup()`, pass to executor |
| 13 | `tools/src/aden_tools/tools/account_info_tool/` | New `get_account_info` MCP tool |
| 14 | `tools/src/aden_tools/tools/__init__.py` | Register account info tool |
---
## Verification
1. **Multi-index**: Sync 2 Google accounts → both in `_provider_index["google"]` (not overwritten)
2. **Identity model**: `cred.identity.email` returns email, `cred.identity.label` returns best label
3. **Health check identity**: `GoogleGmailHealthChecker.check(token)` → `result.details["identity"]["email"]`
4. **Persistence**: After validation, credential on disk has `_identity_email` key
5. **Account listing**: `adapter.list_accounts("google")` → 2 accounts with distinct identities
6. **System prompt**: `compose_system_prompt(accounts_prompt=...)` includes "Connected accounts"
7. **MCP tool**: `get_account_info(provider="google")` returns both accounts with labels
8. **Backward compat**: `credentials.get("google")` still returns single token string
9. **Existing tests**: `PYTHONPATH=core:tools/src python -m pytest tools/tests/ -x -q -k "credential"`
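Verification #1 (the multi-index fix) can be exercised against an in-memory stand-in for the provider index. `ProviderIndex` is a sketch; the real structure is the `_provider_index: dict[str, list[str]]` inside `EncryptedFileStorage`:

```python
class ProviderIndex:
    """Stand-in for the Step 2 index: provider -> list of credential ids."""

    def __init__(self) -> None:
        self._index: dict[str, list[str]] = {}

    def add(self, provider: str, credential_id: str) -> None:
        ids = self._index.setdefault(provider, [])
        if credential_id not in ids:  # re-syncing must stay idempotent
            ids.append(credential_id)

    def load_all_for_provider(self, provider: str) -> list[str]:
        return list(self._index.get(provider, []))
```

The old single-value index (`dict[str, str]`) is exactly what made the second `add("google", ...)` overwrite the first.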
---
# Credential Store Usage Guide
This guide covers how to use the Hive credential store for managing API keys, OAuth2 tokens, and custom credentials in your agents and tools.
## Table of Contents
- [Quick Start](#quick-start)
- [Core Concepts](#core-concepts)
- [Basic Usage](#basic-usage)
- [Template Resolution](#template-resolution)
- [Storage Backends](#storage-backends)
- [Using OAuth2 Provider](#using-oauth2-provider)
- [Implementing Custom Providers](#implementing-custom-providers)
- [Testing with Credentials](#testing-with-credentials)
- [Migration from CredentialManager](#migration-from-credentialmanager)
- [Security Best Practices](#security-best-practices)
---
## Quick Start
```python
from core.framework.credentials import CredentialStore, InMemoryStorage
# Create a store with in-memory storage (for development)
store = CredentialStore(storage=InMemoryStorage())
# Save a simple API key
store.save_api_key("brave_search", "your-api-key-here")
# Retrieve the credential
api_key = store.get("brave_search")
# Use template resolution for HTTP headers
headers = store.resolve_headers({
"X-Subscription-Token": "{{brave_search.api_key}}"
})
# Result: {"X-Subscription-Token": "your-api-key-here"}
```
---
## Core Concepts
### Key-Vault Structure
Credentials are stored as **objects** containing one or more **keys**:
```
brave_search (CredentialObject)
├── api_key: "BSAKxxxxx"
github_oauth (CredentialObject)
├── access_token: "ghp_xxxxx"
├── refresh_token: "ghr_xxxxx"
└── expires_at: 2024-01-15T10:00:00Z
```
### Bipartite Model
The credential store follows a **bipartite model**, splitting responsibility in two:
- **Store**: Only stores credential values
- **Tools**: Define how credentials are used (headers, query params, etc.)
This separation keeps the store simple and lets each tool specify its exact requirements.
### Components
| Component | Purpose |
|-----------|---------|
| `CredentialStore` | Main orchestrator for all credential operations |
| `CredentialObject` | A credential with one or more keys |
| `CredentialKey` | A single key-value pair with optional expiration |
| `CredentialStorage` | Backend for persisting credentials |
| `CredentialProvider` | Handles credential lifecycle (refresh, validate) |
| `TemplateResolver` | Resolves `{{cred.key}}` patterns |
---
## Basic Usage
### Creating a Credential Store
```python
from core.framework.credentials import (
CredentialStore,
EncryptedFileStorage,
EnvVarStorage,
InMemoryStorage,
)
# Option 1: Encrypted file storage (recommended for production)
store = CredentialStore.with_encrypted_storage("~/.hive/credentials")
# Option 2: Environment variable storage (backward compatible)
store = CredentialStore.with_env_storage({
"brave_search": "BRAVE_SEARCH_API_KEY",
"openai": "OPENAI_API_KEY",
})
# Option 3: In-memory storage (for testing/development)
store = CredentialStore(storage=InMemoryStorage())
# Option 4: Custom storage configuration
storage = EncryptedFileStorage(
base_path="~/.hive/credentials",
key_env_var="HIVE_CREDENTIAL_KEY" # Encryption key from env
)
store = CredentialStore(storage=storage)
```
### Saving Credentials
```python
# Simple API key
store.save_api_key("brave_search", "your-api-key")
# Multi-key credential (e.g., OAuth2)
from core.framework.credentials import CredentialObject, CredentialKey, CredentialType
from pydantic import SecretStr
from datetime import datetime, timedelta, timezone
credential = CredentialObject(
id="github_oauth",
credential_type=CredentialType.OAUTH2,
keys={
"access_token": CredentialKey(
name="access_token",
value=SecretStr("ghp_xxxxxxxxxxxx"),
expires_at=datetime.now(timezone.utc) + timedelta(hours=1)
),
"refresh_token": CredentialKey(
name="refresh_token",
value=SecretStr("ghr_xxxxxxxxxxxx")
),
},
provider_id="oauth2",
auto_refresh=True,
)
store.save_credential(credential)
```
### Retrieving Credentials
```python
# Get the default key value (api_key, access_token, or first key)
api_key = store.get("brave_search")
# Get a specific key
access_token = store.get_key("github_oauth", "access_token")
refresh_token = store.get_key("github_oauth", "refresh_token")
# Get the full credential object
credential = store.get_credential("github_oauth")
if credential:
print(f"Type: {credential.credential_type}")
print(f"Keys: {list(credential.keys.keys())}")
print(f"Auto-refresh: {credential.auto_refresh}")
# Check if credential exists and is available
if store.is_available("brave_search"):
# Use the credential
pass
```
### Deleting Credentials
```python
# Delete a credential
deleted = store.delete_credential("old_api_key")
if deleted:
print("Credential deleted")
```
---
## Template Resolution
The credential store supports template patterns for injecting credentials into HTTP requests.
### Syntax
```
{{credential_id}} -> Returns default key
{{credential_id.key_name}} -> Returns specific key
```
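A minimal resolver for this syntax might look like the following. This is an illustrative sketch; the shipped `TemplateResolver` is assumed to behave equivalently, including the default-key rule (`api_key`, then `access_token`, then the first key):

```python
import re

# Matches {{cred_id}} and {{cred_id.key_name}}
PATTERN = re.compile(r"\{\{(\w+)(?:\.(\w+))?\}\}")

def resolve(template: str, creds: dict[str, dict[str, str]]) -> str:
    def repl(match: re.Match) -> str:
        cred_id, key = match.group(1), match.group(2)
        cred = creds[cred_id]
        if key:
            return cred[key]
        for default in ("api_key", "access_token"):  # default-key rule
            if default in cred:
                return cred[default]
        return next(iter(cred.values()))
    return PATTERN.sub(repl, template)
```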
### Resolving Headers
```python
# Define headers with credential templates
header_templates = {
"Authorization": "Bearer {{github_oauth.access_token}}",
"X-API-Key": "{{brave_search.api_key}}",
"X-Custom": "{{custom_cred.token}}"
}
# Resolve to actual values
headers = store.resolve_headers(header_templates)
# Result: {
# "Authorization": "Bearer ghp_xxxxxxxxxxxx",
# "X-API-Key": "BSAKxxxxxxxxxxxx",
# "X-Custom": "actual-token-value"
# }
# Use with httpx/requests
import httpx
response = httpx.get("https://api.example.com/data", headers=headers)
```
### Resolving Query Parameters
```python
params = store.resolve_params({
"api_key": "{{brave_search.api_key}}",
"client_id": "{{oauth_app.client_id}}"
})
```
### Resolving Arbitrary Strings
```python
# Resolve any string containing templates
url = store.resolve("https://api.example.com?key={{api_cred.key}}")
```
### Handling Missing Credentials
```python
# By default, missing credentials raise an error
try:
headers = store.resolve_headers({"Auth": "{{missing.key}}"})
except CredentialNotFoundError as e:
print(f"Missing credential: {e}")
# Use fail_on_missing=False to leave templates unresolved
headers = store.resolve_headers(
{"Auth": "{{missing.key}}"},
fail_on_missing=False
)
# Result: {"Auth": "{{missing.key}}"}
```
---
## Storage Backends
### EncryptedFileStorage (Recommended)
Encrypts credentials at rest using Fernet (AES-128-CBC + HMAC).
```python
from core.framework.credentials import EncryptedFileStorage
# The encryption key is read from HIVE_CREDENTIAL_KEY env var
storage = EncryptedFileStorage("~/.hive/credentials")
# Or provide the key directly (32-byte Fernet key)
storage = EncryptedFileStorage(
base_path="~/.hive/credentials",
encryption_key=b"your-32-byte-fernet-key-here..."
)
```
**Directory structure:**
```
~/.hive/credentials/
├── credentials/
│ ├── brave_search.enc # Encrypted credential JSON
│ └── github_oauth.enc
└── metadata/
└── index.json # Unencrypted index
```
**Generate an encryption key:**
```python
from cryptography.fernet import Fernet
key = Fernet.generate_key()
print(f"HIVE_CREDENTIAL_KEY={key.decode()}")
```
### EnvVarStorage (Backward Compatible)
Reads credentials from environment variables. **Read-only** - cannot save credentials.
```python
from core.framework.credentials import EnvVarStorage
storage = EnvVarStorage(
env_mapping={
"brave_search": "BRAVE_SEARCH_API_KEY",
"openai": "OPENAI_API_KEY",
}
)
# Credentials are read from environment
# export BRAVE_SEARCH_API_KEY=your-key
```
### CompositeStorage (Layered)
Combines multiple storage backends with fallback.
```python
from core.framework.credentials import CompositeStorage, EncryptedFileStorage, EnvVarStorage
storage = CompositeStorage(
primary=EncryptedFileStorage("~/.hive/credentials"),
fallbacks=[
EnvVarStorage({"brave_search": "BRAVE_SEARCH_API_KEY"})
]
)
# Writes go to primary (encrypted files)
# Reads check primary first, then fallbacks (env vars)
```
### HashiCorp Vault Storage
For enterprise deployments with HashiCorp Vault.
```python
from core.framework.credentials.vault import HashiCorpVaultStorage
storage = HashiCorpVaultStorage(
vault_url="https://vault.example.com",
token="hvs.xxxxx", # Or use VAULT_TOKEN env var
mount_point="secret",
path_prefix="hive/credentials"
)
```
---
## Using OAuth2 Provider
The OAuth2 provider handles token lifecycle including automatic refresh.
### Setup
```python
from core.framework.credentials import CredentialStore, InMemoryStorage
from core.framework.credentials.oauth2 import BaseOAuth2Provider, OAuth2Config
# Configure OAuth2
config = OAuth2Config(
token_url="https://oauth.example.com/token",
authorization_url="https://oauth.example.com/authorize", # Optional
client_id="your-client-id",
client_secret="your-client-secret",
default_scopes=["read", "write"],
)
# Create provider
provider = BaseOAuth2Provider(config)
# Create store with provider
store = CredentialStore(
storage=InMemoryStorage(),
providers=[provider],
)
```
### Client Credentials Flow (Server-to-Server)
```python
# Get a token using client credentials
token = provider.client_credentials_grant(scopes=["api.read"])
# Save to store
from core.framework.credentials import CredentialObject, CredentialKey, CredentialType
from pydantic import SecretStr
credential = CredentialObject(
id="service_account",
credential_type=CredentialType.OAUTH2,
keys={
"access_token": CredentialKey(
name="access_token",
value=SecretStr(token.access_token),
expires_at=token.expires_at
),
},
provider_id="oauth2",
auto_refresh=True,
)
store.save_credential(credential)
```
### Refresh Token Flow
```python
# Save credential with refresh token
credential = CredentialObject(
id="user_oauth",
credential_type=CredentialType.OAUTH2,
keys={
"access_token": CredentialKey(
name="access_token",
value=SecretStr("ghp_xxxx"),
expires_at=datetime.now(timezone.utc) + timedelta(hours=1)
),
"refresh_token": CredentialKey(
name="refresh_token",
value=SecretStr("ghr_xxxx")
),
},
provider_id="oauth2",
auto_refresh=True,
)
store.save_credential(credential)
# When you retrieve the credential, it auto-refreshes if expired
token = store.get("user_oauth") # Automatically refreshed if needed
# Or manually refresh
store.refresh_credential("user_oauth")
```
### Token Lifecycle Manager
For more control over token lifecycle:
```python
from core.framework.credentials.oauth2 import TokenLifecycleManager
from datetime import timedelta
manager = TokenLifecycleManager(
credential_id="my_oauth",
provider=provider,
store=store,
refresh_buffer=timedelta(minutes=5), # Refresh 5 min before expiry
)
# Acquire token (refreshes if needed)
token = await manager.acquire_token()
# Use the token
headers = {"Authorization": f"Bearer {token.access_token}"}
```
---
## Implementing Custom Providers
Custom providers let you integrate with proprietary authentication systems.
### Provider Interface
```python
from abc import ABC, abstractmethod
from typing import List
from core.framework.credentials import CredentialObject, CredentialType
class CredentialProvider(ABC):
"""Abstract base for credential providers."""
@property
@abstractmethod
def provider_id(self) -> str:
"""Unique identifier for this provider."""
pass
@property
@abstractmethod
def supported_types(self) -> List[CredentialType]:
"""Credential types this provider handles."""
pass
@abstractmethod
def refresh(self, credential: CredentialObject) -> CredentialObject:
"""Refresh the credential and return updated version."""
pass
@abstractmethod
def validate(self, credential: CredentialObject) -> bool:
"""Check if credential is still valid."""
pass
def should_refresh(self, credential: CredentialObject) -> bool:
"""Determine if credential needs refresh (optional override)."""
# Default: check expiration with 5-minute buffer
...
def revoke(self, credential: CredentialObject) -> bool:
"""Revoke credential (optional, default returns False)."""
return False
```
### Example: Custom API Provider
```python
from datetime import datetime, timedelta, timezone
from typing import List
from pydantic import SecretStr
from core.framework.credentials import (
CredentialKey,
CredentialObject,
CredentialProvider,
CredentialRefreshError,
CredentialType,
)
class MyCustomProvider(CredentialProvider):
"""
Custom provider for MyService API tokens.
MyService issues tokens that expire after 24 hours and can be
refreshed using the original API key.
"""
def __init__(self, base_url: str = "https://api.myservice.com"):
self.base_url = base_url
@property
def provider_id(self) -> str:
return "myservice"
@property
def supported_types(self) -> List[CredentialType]:
return [CredentialType.CUSTOM]
def refresh(self, credential: CredentialObject) -> CredentialObject:
"""Refresh the access token using the API key."""
import httpx
api_key = credential.get_key("api_key")
if not api_key:
raise CredentialRefreshError(
f"Credential '{credential.id}' missing api_key for refresh"
)
# Call MyService API to get new token
try:
response = httpx.post(
f"{self.base_url}/auth/token",
headers={"X-API-Key": api_key},
timeout=30,
)
response.raise_for_status()
data = response.json()
except httpx.HTTPError as e:
raise CredentialRefreshError(f"Token refresh failed: {e}") from e
# Update credential with new token
credential.set_key(
"access_token",
data["access_token"],
expires_at=datetime.now(timezone.utc) + timedelta(hours=24),
)
credential.last_refreshed = datetime.now(timezone.utc)
return credential
def validate(self, credential: CredentialObject) -> bool:
"""Check if access_token exists and is not expired."""
access_key = credential.keys.get("access_token")
if access_key is None:
return False
return not access_key.is_expired
def should_refresh(self, credential: CredentialObject) -> bool:
"""Refresh if token expires within 1 hour."""
access_key = credential.keys.get("access_token")
if access_key is None or access_key.expires_at is None:
return False
buffer = timedelta(hours=1)
return datetime.now(timezone.utc) >= (access_key.expires_at - buffer)
def revoke(self, credential: CredentialObject) -> bool:
"""Revoke the access token."""
import httpx
access_token = credential.get_key("access_token")
if not access_token:
return False
try:
response = httpx.post(
f"{self.base_url}/auth/revoke",
headers={"Authorization": f"Bearer {access_token}"},
timeout=30,
)
return response.status_code == 200
except httpx.HTTPError:
return False
```
### Registering Custom Providers
```python
from core.framework.credentials import CredentialStore, InMemoryStorage
# Create store with custom provider
provider = MyCustomProvider(base_url="https://api.myservice.com")
store = CredentialStore(
storage=InMemoryStorage(),
providers=[provider],
)
# Or register after creation
store.register_provider(provider)
# Save a credential that uses this provider
credential = CredentialObject(
id="myservice_prod",
credential_type=CredentialType.CUSTOM,
keys={
"api_key": CredentialKey(
name="api_key",
value=SecretStr("my-permanent-api-key")
),
},
provider_id="myservice", # Links to our custom provider
auto_refresh=True,
)
store.save_credential(credential)
# The store will use MyCustomProvider for refresh/validate
token = store.get("myservice_prod") # Auto-refreshes if needed
```
### Example: Extending OAuth2 for a Specific Service
```python
from datetime import datetime, timedelta, timezone

from core.framework.credentials import CredentialObject
from core.framework.credentials.oauth2 import BaseOAuth2Provider, OAuth2Config, OAuth2Token
class GitHubOAuth2Provider(BaseOAuth2Provider):
"""GitHub-specific OAuth2 provider with custom scopes handling."""
def __init__(self, client_id: str, client_secret: str):
config = OAuth2Config(
token_url="https://github.com/login/oauth/access_token",
authorization_url="https://github.com/login/oauth/authorize",
client_id=client_id,
client_secret=client_secret,
default_scopes=["repo", "read:user"],
)
super().__init__(config)
@property
def provider_id(self) -> str:
return "github_oauth2"
def _parse_token_response(self, response_data: dict) -> OAuth2Token:
"""GitHub returns scope as space-separated string."""
token = super()._parse_token_response(response_data)
# GitHub-specific: tokens don't expire unless revoked
# But we set a reasonable refresh interval
if token.expires_at is None:
token.expires_at = datetime.now(timezone.utc) + timedelta(days=30)
return token
def validate(self, credential: CredentialObject) -> bool:
"""Validate by making a test API call to GitHub."""
import httpx
access_token = credential.get_key("access_token")
if not access_token:
return False
try:
response = httpx.get(
"https://api.github.com/user",
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/vnd.github+json",
},
timeout=10,
)
return response.status_code == 200
except httpx.HTTPError:
return False
```
---
## Testing with Credentials
### Using the Testing Factory
```python
from core.framework.credentials import CredentialStore
# Create a test store with mock credentials
store = CredentialStore.for_testing({
"brave_search": {"api_key": "test-brave-key"},
"github_oauth": {
"access_token": "test-github-token",
"refresh_token": "test-refresh-token",
},
})
# Use in tests
def test_my_tool():
api_key = store.get("brave_search")
assert api_key == "test-brave-key"
headers = store.resolve_headers({
"Authorization": "Bearer {{github_oauth.access_token}}"
})
assert headers["Authorization"] == "Bearer test-github-token"
```
### Using with CredentialStoreAdapter (Backward Compatible)
```python
from aden_tools.credentials import CredentialStoreAdapter
# For testing existing tools
credentials = CredentialStoreAdapter.for_testing({
"brave_search": "test-key",
"openai": "test-openai-key",
})
# Existing API works
assert credentials.get("brave_search") == "test-key"
credentials.validate_for_tools(["web_search"]) # No error
```
### Mocking in Unit Tests
```python
import pytest
from unittest.mock import MagicMock, patch
def test_tool_with_mocked_store():
# Create a mock store
mock_store = MagicMock()
mock_store.get.return_value = "mocked-api-key"
mock_store.resolve_headers.return_value = {
"Authorization": "Bearer mocked-token"
}
# Inject into your tool
with patch("my_tool.credential_store", mock_store):
result = my_tool.make_api_call()
mock_store.get.assert_called_once_with("api_credential")
```
---
## Migration from CredentialManager
If you're using the existing `CredentialManager`, migration is straightforward.
### Option 1: Use the Adapter (No Code Changes)
```python
# Before
from aden_tools.credentials import CredentialManager
credentials = CredentialManager()
# After - using adapter with new store backend
from aden_tools.credentials import CredentialStoreAdapter
from core.framework.credentials import CredentialStore
store = CredentialStore.with_encrypted_storage("~/.hive/credentials")
credentials = CredentialStoreAdapter(store)
# All existing code works unchanged
api_key = credentials.get("brave_search")
credentials.validate_for_tools(["web_search"])
```
### Option 2: Use Environment Storage (Identical Behavior)
```python
from aden_tools.credentials import CredentialStoreAdapter
# Creates adapter backed by environment variables
credentials = CredentialStoreAdapter.with_env_storage()
# Behaves exactly like original CredentialManager
api_key = credentials.get("brave_search")
```
### Option 3: Gradual Migration
```python
from aden_tools.credentials import CredentialStoreAdapter
from core.framework.credentials import CredentialStore, CompositeStorage, EncryptedFileStorage, EnvVarStorage
# Use encrypted storage as primary, env vars as fallback
storage = CompositeStorage(
primary=EncryptedFileStorage("~/.hive/credentials"),
fallbacks=[EnvVarStorage({"brave_search": "BRAVE_SEARCH_API_KEY"})]
)
store = CredentialStore(storage=storage)
credentials = CredentialStoreAdapter(store)
# New credentials go to encrypted storage
# Old env var credentials still work as fallback
```
---
## Security Best Practices
### 1. Use Encrypted Storage in Production
```python
# Always use EncryptedFileStorage for production
store = CredentialStore.with_encrypted_storage("~/.hive/credentials")
```
### 2. Protect the Encryption Key
```bash
# Set encryption key as environment variable
export HIVE_CREDENTIAL_KEY="your-fernet-key"
# Or use a secrets manager
export HIVE_CREDENTIAL_KEY=$(vault kv get -field=key secret/hive/credential-key)
```
### 3. Use SecretStr for Values
```python
from pydantic import SecretStr
# SecretStr prevents accidental logging
key = CredentialKey(
name="api_key",
value=SecretStr("sensitive-value") # Won't appear in logs
)
# Explicitly extract when needed
actual_value = key.get_secret_value()
```
### 4. Set Appropriate Expiration
```python
# Always set expiration for tokens
credential.set_key(
"access_token",
token_value,
expires_at=datetime.now(timezone.utc) + timedelta(hours=1)
)
```
### 5. Enable Auto-Refresh
```python
credential = CredentialObject(
id="my_oauth",
auto_refresh=True, # Automatically refresh before expiry
provider_id="oauth2",
...
)
```
### 6. Validate Before Use
```python
# Check credential validity before making API calls
if not store.is_available("api_credential"):
raise RuntimeError("Required credential not available")
# Or use validation
errors = store.validate_for_usage("api_credential")
if errors:
raise RuntimeError(f"Credential validation failed: {errors}")
```
### 7. Use Template Resolution
```python
# Don't interpolate secrets manually
# Bad:
headers = {"Authorization": f"Bearer {store.get('token')}"}
# Good - uses template resolution which handles errors gracefully:
headers = store.resolve_headers({
"Authorization": "Bearer {{my_oauth.access_token}}"
})
```
---
## API Reference
### CredentialStore
| Method | Description |
|--------|-------------|
| `get(credential_id)` | Get default key value |
| `get_key(credential_id, key_name)` | Get specific key value |
| `get_credential(credential_id)` | Get full credential object |
| `save_credential(credential)` | Save credential to storage |
| `save_api_key(id, value)` | Convenience for simple API keys |
| `delete_credential(credential_id)` | Delete a credential |
| `is_available(credential_id)` | Check if credential exists and has value |
| `resolve(template)` | Resolve template string |
| `resolve_headers(headers)` | Resolve templates in headers dict |
| `resolve_params(params)` | Resolve templates in params dict |
| `refresh_credential(credential_id)` | Manually refresh a credential |
| `register_provider(provider)` | Register a custom provider |
| `for_testing(credentials)` | Create test store with mock data |
| `with_encrypted_storage(path)` | Create store with encrypted files |
| `with_env_storage(mapping)` | Create store with env var backend |
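As a sketch of how the resolution methods behave, the substitution rule can be modeled in a few lines (the `SimpleStore` class and the regex here are illustrative stand-ins, not the framework's implementation):

```python
import re

# Illustrative stand-in for CredentialStore's template substitution:
# "{{credential_id.key_name}}" placeholders are replaced with stored values.
class SimpleStore:
    def __init__(self, data):
        # data: {credential_id: {key_name: value}}
        self._data = data

    def resolve(self, template, fail_on_missing=True):
        def sub(match):
            cred_id, key_name = match.group(1), match.group(2)
            value = self._data.get(cred_id, {}).get(key_name)
            if value is None:
                if fail_on_missing:
                    raise KeyError(f"Unknown credential '{cred_id}'")
                return match.group(0)  # leave the placeholder untouched
            return value
        return re.sub(r"\{\{(\w+)\.(\w+)\}\}", sub, template)

    def resolve_headers(self, headers, fail_on_missing=True):
        return {k: self.resolve(v, fail_on_missing) for k, v in headers.items()}

store = SimpleStore({"my_oauth": {"access_token": "tok-123"}})
headers = store.resolve_headers({"Authorization": "Bearer {{my_oauth.access_token}}"})
```

With `fail_on_missing=False`, unresolved placeholders pass through unchanged, which matches the troubleshooting guidance below.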
### CredentialObject
| Property/Method | Description |
|-----------------|-------------|
| `id` | Unique identifier |
| `credential_type` | Type (API_KEY, OAUTH2, etc.) |
| `keys` | Dict of CredentialKey objects |
| `get_key(name)` | Get key value by name |
| `set_key(name, value, ...)` | Set or update a key |
| `has_key(name)` | Check if key exists |
| `get_default_key()` | Get default key value |
| `needs_refresh` | True if any key is expired |
| `is_valid` | True if has valid, non-expired key |
| `auto_refresh` | Whether to auto-refresh |
| `provider_id` | ID of provider for lifecycle |
### CredentialProvider
| Method | Description |
|--------|-------------|
| `provider_id` | Unique identifier (property) |
| `supported_types` | List of supported CredentialTypes (property) |
| `refresh(credential)` | Refresh and return updated credential |
| `validate(credential)` | Check if credential is valid |
| `should_refresh(credential)` | Check if refresh is needed |
| `revoke(credential)` | Revoke credential (optional) |
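A provider implementing this interface might look like the following sketch. The dict-shaped credential and the exact method bodies are illustrative assumptions; only the 5-minute refresh buffer mirrors behavior described elsewhere in this document. Register it with `store.register_provider(provider)`.

```python
from datetime import datetime, timedelta, timezone

REFRESH_BUFFER = timedelta(minutes=5)

class MyTokenProvider:
    provider_id = "my_token_provider"
    supported_types = ["oauth2"]

    def should_refresh(self, credential):
        # Refresh once we are inside the buffer window before expiry.
        return datetime.now(timezone.utc) >= credential["expires_at"] - REFRESH_BUFFER

    def refresh(self, credential):
        # Hypothetical: call your token endpoint, then return the updated credential.
        credential["access_token"] = "new-token"
        credential["expires_at"] = datetime.now(timezone.utc) + timedelta(hours=1)
        return credential

    def validate(self, credential):
        return bool(credential.get("access_token"))

provider = MyTokenProvider()
stale = {"access_token": "old",
         "expires_at": datetime.now(timezone.utc) + timedelta(minutes=2)}
```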
---
## Troubleshooting
### "Unknown credential" Error
```python
# Error: KeyError: "Unknown credential 'my_cred'"
# Solution: Check if credential exists
if store.get_credential("my_cred") is None:
print("Credential not found - need to save it first")
```
### "Credential not found" in Templates
```python
# Error: CredentialNotFoundError when resolving templates
# Solution 1: Ensure credential is saved
store.save_api_key("my_cred", "value")
# Solution 2: Use fail_on_missing=False
headers = store.resolve_headers(templates, fail_on_missing=False)
```
### Encryption Key Issues
```python
# Error: "Failed to decrypt credential"
# Solution: Ensure HIVE_CREDENTIAL_KEY matches what was used to encrypt
# If key is lost, credentials must be re-created
```
### Provider Not Found
```python
# Warning: "No provider found for credential 'x'"
# Solution: Register the provider or set provider_id=None for static credentials
store.register_provider(MyProvider())
# Or use static provider (default)
credential.provider_id = "static" # or None
```
---
## Further Reading
- [Credential Store Design Document](credential-store-design.md)
- [OAuth2 RFC 6749](https://datatracker.ietf.org/doc/html/rfc6749)
- [Fernet Encryption](https://cryptography.io/en/latest/fernet/)
-552

@@ -1,552 +0,0 @@
# Credential System: Complete Code Path Analysis
## Architecture Overview
```
┌──────────────┐
│ AgentRunner │ runner.py:_validate_credentials()
└──────┬───────┘
┌──────▼───────┐
│ validation │ validate_agent_credentials()
│ (2-phase) │ Phase 1: presence Phase 2: health check
└──────┬───────┘
┌─────────────▼─────────────┐
│ CredentialStore │ store.py
│ (cache + provider mgmt) │
└─────────────┬─────────────┘
┌───────────────────┼───────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌───────▼───────┐
│ EnvVarStorage│ │ Encrypted │ │ AdenCached │
│ (primary) │ │ FileStorage │ │ Storage │
└─────────────┘ │ (fallback) │ │ (Aden sync) │
└─────────────┘ └───────┬───────┘
┌───────▼───────┐
│AdenSyncProvider│
│+ AdenClient │
└───────────────┘
```
### Key Files
| Layer | File | Purpose |
|-------|------|---------|
| Models | `core/framework/credentials/models.py` | `CredentialObject`, `CredentialKey`, exception hierarchy |
| Storage | `core/framework/credentials/storage.py` | `EncryptedFileStorage`, `EnvVarStorage`, `CompositeStorage` |
| Store | `core/framework/credentials/store.py` | `CredentialStore` — cache, providers, refresh |
| Validation | `core/framework/credentials/validation.py` | `validate_agent_credentials()` — two-phase pre-flight check |
| Setup | `core/framework/credentials/setup.py` | `CredentialSetupSession` — interactive credential collection |
| Aden client | `core/framework/credentials/aden/client.py` | `AdenCredentialClient` — HTTP calls to api.adenhq.com |
| Aden provider | `core/framework/credentials/aden/provider.py` | `AdenSyncProvider` — refresh, sync, fetch |
| Aden storage | `core/framework/credentials/aden/storage.py` | `AdenCachedStorage` — local cache + Aden fallback |
| Specs | `tools/src/aden_tools/credentials/` | `CredentialSpec` per integration (env_var, health check, etc.) |
| Runner | `core/framework/runner/runner.py` | `_validate_credentials()` — agent startup gate |
| TUI | `core/framework/tui/screens/credential_setup.py` | `CredentialSetupScreen` — modal credential form |
| TUI app | `core/framework/tui/app.py` | `_show_credential_setup()`, `_load_and_switch_agent()` |
### Exception Hierarchy
```
CredentialError ← base (caught by runner + TUI)
├── CredentialDecryptionError ← corrupted/wrong-key .enc files
├── CredentialKeyNotFoundError ← key name not in credential
├── CredentialNotFoundError ← credential ID not found
├── CredentialRefreshError ← refresh failed (e.g., revoked OAuth)
└── CredentialValidationError ← schema/format invalid
```
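Because every credential exception derives from `CredentialError`, a single handler covers them all — which is also why the runner and TUI cannot distinguish a decryption failure from a missing credential without inspecting the exception type. A stand-alone mirror of the hierarchy illustrates this:

```python
# Minimal mirror of the hierarchy above (not the framework's classes).
class CredentialError(Exception): ...
class CredentialDecryptionError(CredentialError): ...
class CredentialRefreshError(CredentialError): ...

def load_credential():
    raise CredentialDecryptionError("corrupted .enc file")

caught = None
try:
    load_credential()
except CredentialError as e:  # also catches every subclass
    caught = e
```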
---
## Scenario 1: User Supplies Correct Credential
### Flow
```
AgentRunner._setup()
→ _ensure_credential_key_env() # validation.py:16
│ Loads HIVE_CREDENTIAL_KEY, ADEN_API_KEY from shell config into os.environ
→ _validate_credentials() # runner.py:418
→ validate_agent_credentials(nodes) # validation.py:94
│ Phase 0: Aden pre-sync (if ADEN_API_KEY set)
│ → _presync_aden_tokens() # validation.py:50
│ → CredentialStore.with_aden_sync(auto_sync=True)
│ → For each aden_supported spec: get_key() → set os.environ
│ Build store:
│ EnvVarStorage (primary) + EncryptedFileStorage (fallback if HIVE_CREDENTIAL_KEY set)
│ Phase 1: Presence check
│ → store.is_available(cred_id)
│ → EnvVarStorage.load() → os.environ[env_var] → CredentialObject ✓
│ Result: NOT in missing list
│ Phase 2: Health check (if spec.health_check_endpoint set)
│ → check_credential_health(cred_name, value)
│ e.g., Anthropic: POST /v1/messages → 400 (key valid, request malformed) → valid=True
│ e.g., Brave: GET /search?q=test → 200 → valid=True
│ Result: NOT in invalid list
│ errors = [] → returns normally ✓
```
### What Happens
- Validation passes silently
- Agent loads and runs
- No files written, no user-visible output
- `CredentialStore._cache` populated (5-min TTL)
---
## Scenario 2: User Supplies Wrong Credential
### Flow
```
validate_agent_credentials(nodes)
│ Phase 1: Presence check
│ → store.is_available("anthropic")
│ → EnvVarStorage.load() → os.environ["ANTHROPIC_API_KEY"] = "wrong-key"
│ → Returns CredentialObject ✓ (value exists, content not validated)
│ Result: passes presence check, added to to_verify list
│ Phase 2: Health check
│ → check_credential_health("anthropic", credential_object)
│ → AnthropicHealthChecker: POST /v1/messages with x-api-key: "wrong-key"
│ → Response: 401 Unauthorized
│ → HealthCheckResult(valid=False, message="API key is invalid")
│ → Added to invalid list, cred_name added to failed_cred_names
│ CredentialError raised:
│ "Invalid or expired credentials:
│ ANTHROPIC_API_KEY for event_loop nodes — Anthropic API key is invalid
│ Get a new key at: https://console.anthropic.com/settings/keys"
│ exc.failed_cred_names = ["anthropic"]
```
### TUI Path (non-interactive)
```
_load_and_switch_agent() # app.py:356
except CredentialError as e: # app.py:382
→ _show_credential_setup(agent_path, e) # app.py:404
→ build_setup_session_from_error(e) # validation.py:253
→ failed_cred_names = ["anthropic"]
→ Creates MissingCredential for anthropic
→ push_screen(CredentialSetupScreen)
```
### CLI Path (interactive with TTY)
```
_validate_credentials() # runner.py:418
except CredentialError as e: # runner.py:440
→ print(str(e), file=sys.stderr)
→ session = build_setup_session_from_error(e)
→ session.run_interactive() # Terminal prompts
→ validate_agent_credentials(nodes) # Re-validate
```
### What User Sees
- TUI: Credential setup modal with the invalid credential's input field
- CLI: Error message printed, interactive prompts
### Silent Failure Risk
If `check_credential_health()` itself throws (network timeout, DNS failure, import error),
it's caught at `validation.py:231`:
```python
except Exception as exc:
logger.debug("Health check for %s failed: %s", cred_name, exc)
```
The credential is NOT added to `invalid`. **Agent starts with a bad key.** Only `logger.debug`
records the issue.
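A defensive alternative — a sketch, not the current implementation — is to classify a throwing health check as "unknown" and surface it, rather than letting it pass as implicitly valid:

```python
import logging

logger = logging.getLogger(__name__)

# Sketch: classify health-check outcomes explicitly instead of treating
# an exception as success. `check` is any callable returning True/False.
def classify_credential(cred_name, check):
    try:
        return "valid" if check() else "invalid"
    except Exception as exc:
        # Surface the failure instead of hiding it behind logger.debug.
        logger.warning("Health check for %s errored: %s", cred_name, exc)
        return "unknown"

def failing_check():
    raise TimeoutError("network unreachable")
```

Callers can then decide whether "unknown" blocks startup or merely warns — either way, the agent no longer starts silently with a bad key.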
---
## Scenario 3: Credential Expired But Can Be Refreshed
Applies to OAuth2 credentials (Google, HubSpot, etc.) managed via AdenSyncProvider.
### Flow: Token Refresh During Runtime
```
CredentialStore.get_credential(cred_id, refresh_if_needed=True) # store.py:176
│ Check cache → cached credential found
│ → _should_refresh(cached) # store.py:442
│ → AdenSyncProvider.should_refresh(credential) # provider.py:238
│ → access_key = credential.keys["access_token"]
│ → datetime.now(UTC) >= (expires_at - 5min buffer)
│ → Returns True (within refresh window)
│ → _refresh_credential(cached) # store.py:456
│ → AdenSyncProvider.refresh(credential) # provider.py:151
│ → client.request_refresh(credential.id) # client.py:356
│ → POST /v1/credentials/{id}/refresh
│ → Server refreshes OAuth token, returns new access_token
│ → _update_credential_from_aden(credential, response)
│ → Updates access_token value + expires_at
│ → storage.save(refreshed) # Writes new .enc file
│ → _add_to_cache(refreshed) # Updates in-memory cache
│ → Returns refreshed credential ✓
```
### Flow: Expired Token Caught During Validation
```
validate_agent_credentials(nodes)
│ Phase 0: _presync_aden_tokens()
│ → CredentialStore.with_aden_sync(auto_sync=True)
│ → provider.sync_all() fetches fresh tokens from Aden
│ → Fresh token set in os.environ ✓
│ Phase 2: Health check with fresh token → valid=True ✓
```
### What Happens
- Refresh is transparent to the user
- New token written to `~/.hive/credentials/credentials/{id}.enc`
- In-memory cache updated
- Logged: `INFO: Refreshed credential '{id}' via Aden server`
---
## Scenario 4: Credential Expired and Cannot Be Refreshed
The OAuth refresh token has been revoked (the user disconnected the integration on
hive.adenhq.com), or the refresh token itself has expired.
### Flow: Refresh Attempt
```
AdenSyncProvider.refresh(credential) # provider.py:151
→ client.request_refresh(credential.id) # client.py:356
→ POST /v1/credentials/{id}/refresh
→ Response: 400 {"error": "refresh_failed",
│ "requires_reauthorization": true,
│ "reauthorization_url": "https://..."}
→ AdenRefreshError raised # client.py:297
except AdenRefreshError as e: # provider.py:186
→ logger.error("Aden refresh failed for '{id}': ...")
→ raise CredentialRefreshError(
"Integration '{id}' requires re-authorization. Visit: ..."
)
```
### What CredentialStore Does
```
CredentialStore._refresh_credential(credential) # store.py:456
except CredentialRefreshError as e: # store.py:474
→ logger.error("Failed to refresh credential '{id}': ...")
→ return credential ← RETURNS STALE/EXPIRED CREDENTIAL!
```
**BUG: Silent failure.** The store returns the expired credential without raising.
The caller gets an expired token. Downstream API calls fail with 401.
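A stricter behavior would propagate the failure when the credential is already unusable, and only fall back to the cached value while it is still live. This is a sketch with hypothetical names, not the code at `store.py:456`:

```python
class CredentialRefreshError(Exception):
    pass

# Sketch: raise when the credential is already expired, rather than
# silently returning it; best-effort fallback only while still valid.
def refresh_credential(credential, do_refresh):
    try:
        return do_refresh(credential)
    except CredentialRefreshError:
        if credential.get("expired"):
            raise  # caller must handle re-authorization
        return credential  # still usable until expiry; refresh was best-effort

def broken_refresh(credential):
    raise CredentialRefreshError("requires re-authorization")

fresh = {"id": "hubspot", "expired": False}
stale = {"id": "hubspot", "expired": True}

raised = False
try:
    refresh_credential(stale, broken_refresh)
except CredentialRefreshError:
    raised = True
```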
### During Validation
If validation runs health check on the expired token:
```
check_credential_health() → 401 → valid=False
→ Added to invalid list → CredentialError raised
→ TUI shows credential setup screen
```
### Gap: Token Expires After Validation
If the token expires **during agent execution** (after validation passed):
- Refresh fails silently (returns stale credential)
- Tool call gets 401 from downstream API
- LLM sees tool error, no framework-level recovery
---
## Scenario 5: Credential Store File Sabotaged (Wrong Content)
The file `~/.hive/credentials/credentials/{id}.enc` has been replaced with valid
Fernet-encrypted content that decodes to the wrong JSON (e.g., `{"bad": "data"}`).
### Flow
```
EncryptedFileStorage.load(credential_id) # storage.py:193
→ fernet.decrypt(encrypted) # Succeeds (valid Fernet)
→ json.loads(decrypted) # Succeeds (valid JSON)
→ _deserialize_credential(data) # storage.py:252
→ CredentialObject.model_validate({"bad": "data"})
```
### Sub-case A: Missing `id` field
```
CredentialObject.model_validate({"bad": "data"})
→ Pydantic ValidationError: "id - Field required"
→ NOT caught by EncryptedFileStorage's try/except (only covers decrypt + json.loads)
→ Propagates up uncaught
```
**TUI**: Caught by generic `except Exception` in `_load_and_switch_agent()` (app.py:389):
```
self.notify("Failed to load agent: 1 validation error for CredentialObject...", severity="error")
```
User sees generic error notification. NOT a credential setup screen. **Not actionable.**
**CLI**: Unhandled traceback.
### Sub-case B: Valid `id` but wrong/empty keys
```
CredentialObject.model_validate({"id": "my_cred", "keys": {}})
→ Valid CredentialObject with keys={} (Pydantic extra="allow", keys defaults to {})
→ store.is_available() → get_credential() returns CredentialObject
→ But get() / get_key() returns None → is_available returns False
→ Treated as "missing" credential
```
User sees credential setup screen as if the credential was never configured.
**The actual cause (sabotaged file) is hidden.**
---
## Scenario 6: Credential Store File Corrupted (Binary Garbage)
File `~/.hive/credentials/credentials/{id}.enc` contains random binary data.
### Flow
```
EncryptedFileStorage.load(credential_id) # storage.py:193
→ fernet.decrypt(binary_garbage)
→ Raises cryptography.fernet.InvalidToken
→ Caught by except Exception: # storage.py:210
→ raise CredentialDecryptionError(
"Failed to decrypt credential '{id}': InvalidToken"
)
```
### Propagation
```
CredentialDecryptionError (subclass of CredentialError)
→ CompositeStorage.load(): NOT caught → propagates
→ CredentialStore.get_credential(): NOT caught → propagates
→ validate_agent_credentials() → propagates out entirely
```
**TUI** (app.py:382):
```python
except CredentialError as e: # CATCHES CredentialDecryptionError
self._show_credential_setup(str(agent_path), credential_error=e)
```
Shows credential setup screen! But `CredentialDecryptionError` does NOT have
`failed_cred_names` attribute → `getattr(e, "failed_cred_names", [])` returns `[]`
→ session falls back to `from_agent_path()` detection.
User sees credential setup screen as if credential is missing.
**Corruption is hidden.** Re-entering the credential overwrites the corrupted file.
### CompositeStorage Bug
If `CompositeStorage(primary=EnvVarStorage, fallbacks=[EncryptedFileStorage])` is used,
the storage tries primary first. But if `EncryptedFileStorage` is a fallback and
the .enc file is corrupted:
```
CompositeStorage.load()
→ primary (EnvVarStorage) → env var IS set → returns CredentialObject ✓
```
The corrupted fallback is never touched. **This case works fine.**
But if the storage order is reversed (encrypted primary, env fallback):
```
CompositeStorage.load()
→ primary (EncryptedFileStorage) → CredentialDecryptionError
→ NOT caught → propagates ← BUG: fallback never tried
```
The exception from primary propagates BEFORE checking the fallback.
**A corrupted .enc file blocks access even when the env var has a valid value.**
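A fallback-aware load fixes this: remember the primary's error, keep trying fallbacks, and re-raise only if nothing succeeds. A self-contained sketch (storages modeled as callables for brevity):

```python
class CredentialDecryptionError(Exception):
    pass

# Sketch: a failing primary no longer blocks working fallbacks;
# the error surfaces only when every storage fails.
def composite_load(credential_id, storages):
    last_error = None
    for storage in storages:
        try:
            result = storage(credential_id)
            if result is not None:
                return result
        except CredentialDecryptionError as exc:
            last_error = exc  # remember, but keep trying fallbacks
    if last_error is not None:
        raise last_error
    return None

def corrupted_enc(_cred_id):
    raise CredentialDecryptionError("Failed to decrypt")

def env_storage(cred_id):
    return {"id": cred_id, "source": "env"}
```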
---
## Scenario 7: ADEN_API_KEY Set But Vendor OAuth Not Authorized
User has valid `ADEN_API_KEY`. Agent needs HubSpot/Google. User has NOT connected
that integration on hive.adenhq.com.
### Flow
```
validate_agent_credentials(nodes)
│ Phase 0: _presync_aden_tokens()
│ → CredentialStore.with_aden_sync(auto_sync=True)
│ → provider.sync_all(store)
│ → client.list_integrations() # GET /v1/credentials
│ → HubSpot NOT in response (never connected)
│ → Only connected integrations synced
│ → For hubspot spec: get_key("hubspot", "access_token")
│ → AdenCachedStorage.load("hubspot")
│ → _provider_index.get("hubspot") → None (not synced)
│ → _load_by_id("hubspot")
│ → local: None (not cached)
│ → aden: fetch_from_aden("hubspot")
│ → GET /v1/credentials/hubspot → 404
│ → AdenNotFoundError caught → returns None
│ → Returns None
│ → get_key returns None
│ → os.environ["HUBSPOT_ACCESS_TOKEN"] NOT set
│ Phase 1: Presence check
│ → _check_credential(hubspot_spec, "hubspot", "hubspot tools")
│ → store.is_available("hubspot") → False
│ → has_aden_key=True, aden_supported=True, direct_api_key_supported=False
│ → Goes into aden_not_connected list (NOT failed_cred_names)
│ CredentialError raised:
│ "Aden integrations not connected (ADEN_API_KEY is set but OAuth tokens unavailable):
│ HUBSPOT_ACCESS_TOKEN for hubspot tools
│ Connect this integration at hive.adenhq.com first."
│ exc.failed_cred_names = [] ← empty!
```
### TUI Behavior
```
_show_credential_setup(agent_path, credential_error=e)
→ build_setup_session_from_error(e)
→ failed_cred_names = [] → falls back to from_agent_path()
→ detect_missing_credentials_from_nodes() finds hubspot missing
→ session.missing = [MissingCredential(hubspot, aden_supported=True, ...)]
→ NOT empty → CredentialSetupScreen pushed
```
Setup screen shows ADEN_API_KEY input (already set). User clicks "Save & Continue":
```
_save_credentials()
→ ADEN_API_KEY already in env → configured += 1
→ _sync_aden_credentials()
→ provider.sync_all() → hubspot still not connected → synced=0
→ Notification: "No active integrations found in Aden."
→ For hubspot: store.is_available("hubspot") → False
→ Notification: "hubspot (id='hubspot') not found in Aden."
→ configured > 0 → dismiss(True)
```
TUI retries `_do_load_agent()` → validation fails again → **LOOP.**
### What User Sees
1. Setup screen appears, ADEN_API_KEY field shown
2. User clicks Save
3. Warning: "hubspot not found in Aden. Connect this integration at hive.adenhq.com first."
4. Screen dismisses (configured=1 from ADEN_API_KEY)
5. Agent reload fails → setup screen appears again
6. Repeat forever
### Root Cause
`configured += 1` fires when ADEN_API_KEY is saved, even though the actual needed
credential (hubspot OAuth token) was NOT obtained. The screen dismisses with "success"
but the agent still can't load.
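A minimal sketch of the dismissal condition that would break the loop (hypothetical helper names): the screen dismisses only when every credential the agent actually needs resolves, not merely because ADEN_API_KEY was saved.

```python
# Sketch: dismiss the setup screen only when all REQUIRED credentials
# are available, regardless of how many inputs were saved.
def should_dismiss(required_creds, is_available):
    return all(is_available(cred) for cred in required_creds)

available = {"aden_api_key"}  # ADEN_API_KEY saved; hubspot still not connected
```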
---
## Known Silent Failure Points
| # | Location | What Happens | Risk |
|---|----------|-------------|------|
| 1 | `validation.py:231` | `check_credential_health()` throws → `logger.debug()` → credential treated as valid | Agent starts with bad key |
| 2 | `store.py:474-476` | `CredentialRefreshError` caught → returns stale credential | Tool calls fail with 401 at runtime |
| 3 | `store.py:706-708` | `with_aden_sync()` catches all Exception → falls back to local-only store silently | Aden sync failure invisible |
| 4 | `provider.py:312-313` | Individual integration sync fails → `logger.warning()` → skipped | Integration silently missing |
| 5 | `credential_setup.py:262-263` | `_persist_to_local_store()` → `except Exception: pass` | Credential lost on restart |
| 6 | `storage.py:489-501` | `CompositeStorage.load()` doesn't catch primary storage exceptions | Corrupted .enc blocks env var fallback |
| 7 | `validation.py:63-65` | `_presync_aden_tokens()` catches all Exception → `logger.warning()` | Aden tokens not refreshed, stale values used |
---
## Storage Priority Order
### During Validation (`validate_agent_credentials`)
```
1. os.environ (via EnvVarStorage) ← WINS if set
2. ~/.hive/credentials/credentials/*.enc ← fallback (only if HIVE_CREDENTIAL_KEY set)
```
### During Runtime (`CredentialStoreAdapter.default()`)
```
1. EncryptedFileStorage ← primary (if HIVE_CREDENTIAL_KEY set)
2. EnvVarStorage ← fallback
3. AdenSyncProvider ← if ADEN_API_KEY set, auto-refresh on access
```
**Note: validation and runtime use DIFFERENT storage priority orders.** Validation
prefers env vars; runtime prefers encrypted store. This means a credential can pass
validation (from env) but fail at runtime (encrypted store has stale value and env
var was only set in the validation process, not persisted).
### During TUI Credential Setup (`_sync_aden_credentials`)
```
1. AdenSyncProvider.sync_all() ← fetches from Aden API
2. AdenCachedStorage ← local encrypted cache
(no EnvVarStorage in this path)
```
---
## File Locations on Disk
```
~/.hive/
credentials/
credentials/ # EncryptedFileStorage base
{credential_id}.enc # Fernet-encrypted JSON
key.txt # HIVE_CREDENTIAL_KEY (generated if missing)
configuration.json # Global config
```
### .enc File Format (decrypted)
```json
{
"id": "hubspot",
"credential_type": "oauth2",
"keys": {
"access_token": {
"name": "access_token",
"value": "ya29.a0ARrdaM...",
"expires_at": "2025-01-15T12:00:00+00:00"
},
"_aden_managed": {
"name": "_aden_managed",
"value": "true"
},
"_integration_type": {
"name": "_integration_type",
"value": "hubspot"
}
},
"provider_id": "aden_sync",
"auto_refresh": true
}
```
The `_integration_type` key is used by `AdenCachedStorage._index_provider()` to map
provider names (e.g., "hubspot") to hash-based credential IDs from Aden.
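That indexing step can be illustrated with plain `json` — a sketch of the mapping `_index_provider()` builds, not its actual code. The hash-style credential id here is hypothetical (the example file above happens to use `"hubspot"` as its id, but Aden-issued ids are hash-based):

```python
import json

# Decrypted .enc payload (abridged from the format above; id is hypothetical).
payload = json.loads("""
{
  "id": "cred-4f2a",
  "keys": {
    "access_token": {"name": "access_token", "value": "ya29..."},
    "_integration_type": {"name": "_integration_type", "value": "hubspot"}
  }
}
""")

# Provider index: integration name -> credential id, so a lookup like
# load("hubspot") can resolve the hash-based id it is stored under.
def index_provider(credential):
    meta = credential["keys"].get("_integration_type")
    return {meta["value"]: credential["id"]} if meta else {}

provider_index = index_provider(payload)
```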
-30
@@ -217,36 +217,6 @@ Follow the prompts to:
This step establishes the core concepts and rules needed before building an agent.
### 4. Apply Agent Patterns
```
claude> pattern guidance
```
Follow the prompts to:
1. Apply best-practice agent design patterns
2. Add pause/resume flows for multi-turn interactions
3. Improve robustness with routing, fallbacks, and retries
4. Avoid common anti-patterns during agent construction
This step helps optimize agent design before final testing.
### 5. Test Your Agent
```
claude> test workflow
```
Follow the prompts to:
1. Generate test guidelines for constraints and success criteria
2. Write agent tests directly under `exports/{agent}/tests/`
3. Run goal-based evaluation tests
4. Debug failing tests and iterate on agent improvements
This step verifies that the agent meets its goals before production use.
## Troubleshooting
### "externally-managed-environment" error (PEP 668)
+3 -3
@@ -17,7 +17,7 @@ The fastest way to get started:
```bash
# 1. Clone the repository
-git clone https://github.com/adenhq/hive.git
+git clone https://github.com/aden-hive/hive.git
cd hive
# 2. Run automated setup
@@ -31,7 +31,7 @@ uv run python -c "import framework; import aden_tools; print('✓ Setup complete
```powershell
# 1. Clone the repository
-git clone https://github.com/adenhq/hive.git
+git clone https://github.com/aden-hive/hive.git
cd hive
# 2. Run automated setup
@@ -240,6 +240,6 @@ pip uninstall -y framework tools
## Getting Help
- **Documentation**: Check the `/docs` folder
-- **Issues**: [github.com/adenhq/hive/issues](https://github.com/adenhq/hive/issues)
+- **Issues**: [github.com/adenhq/hive/issues](https://github.com/aden-hive/hive/issues)
- **Discord**: [discord.com/invite/MXE49hrKDk](https://discord.com/invite/MXE49hrKDk)
- **Build Agents**: Use the coder-tools workflow to create agents
-110
@@ -1,110 +0,0 @@
# Hive Coder: Meta-Agent Integration Plan
## Problem
The hive_coder agent currently has 7 file I/O tools (`read_file`, `write_file`, `edit_file`, `list_directory`, `search_files`, `run_command`, `undo_changes`) in `tools/coder_tools_server.py`. It can write agent packages but is **not integrated into the Hive ecosystem**:
1. **No dynamic tool discovery** — It references a static list of hive-tools in `reference/framework_guide.md`. It can't discover what MCP tools are actually available or what parameters they accept.
2. **No runtime observability** — It can't inspect sessions, checkpoints, or logs from agents it builds. When something goes wrong, the user has to manually dig through files.
3. **No test execution** — It can't run an agent's test suite structurally (it could use `run_command` with raw pytest, but has no structured test parsing).
## Solution
Add 8 new tools to `coder_tools_server.py` that give hive_coder deep integration with the Hive framework. Update the system prompt to teach the LLM when and how to use these meta-agent capabilities.
---
## New Tools
### 1. Tool Discovery
**`discover_mcp_tools(server_config_path?)`**
Connect to any MCP server and list all available tools with full schemas. Uses `framework.runner.mcp_client.MCPClient` — the same client the runtime uses. Reads a `mcp_servers.json` file (defaults to hive-tools), connects to each server, calls `list_tools()`, returns tool names + descriptions + input schemas, then disconnects.
This replaces the static tools reference. The LLM now discovers tools dynamically before designing an agent.
### 2. Agent Inventory
**`list_agents()`**
Scan `exports/` for agent packages and `~/.hive/agents/` for runtime data. Returns agent names, descriptions (from `__init__.py`), and session counts. Gives the LLM awareness of what already exists.
### 3-7. Session & Checkpoint Inspection
Ported from the former `agent_builder_server.py`. Pure filesystem reads — JSON + pathlib, zero framework imports.
| Tool | Purpose |
|------|---------|
| `list_agent_sessions(agent_name, status?, limit?)` | List sessions, filterable by status |
| `list_agent_checkpoints(agent_name, session_id)` | List checkpoints for debugging |
| `get_agent_checkpoint(agent_name, session_id, checkpoint_id?)` | Load a checkpoint's full state |
**Key difference from the old agent-builder server:** These tools accept `agent_name` (e.g. `"deep_research_agent"`) instead of raw `agent_work_dir` paths. They resolve to `~/.hive/agents/{agent_name}/` internally. Friendlier for the LLM.
### 8. Test Execution
**`run_agent_tests(agent_name, test_types?, fail_fast?)`**
Ported from the former `agent_builder_server.py`. Runs pytest on an agent's test suite, sets PYTHONPATH automatically, parses output into structured results (passed/failed/skipped counts, per-test status, failure details).
---
## Files to Modify
### `tools/coder_tools_server.py` (~400 new lines)
Add all 8 tools after the existing `undo_changes` tool:
```
# ── Meta-agent: Tool discovery ────────────────────────────────
# discover_mcp_tools()
# ── Meta-agent: Agent inventory ───────────────────────────────
# list_agents()
# ── Meta-agent: Session & checkpoint inspection ───────────────
# _resolve_hive_agent_path(), _read_session_json(), _scan_agent_sessions(), _truncate_value()
# list_agent_sessions(), list_agent_checkpoints(), get_agent_checkpoint()
# ── Meta-agent: Test execution ────────────────────────────────
# run_agent_tests()
```
### `exports/hive_coder/nodes/__init__.py`
- Add 8 new tool names to the `tools` list
- Rewrite system prompt "Tools Available" section with meta-agent tools
- Add "Meta-Agent Capabilities" section teaching:
- Tool discovery before designing agents
- Post-build test execution
- Debugging via session/checkpoint inspection
- Agent awareness via `list_agents()`
### `exports/hive_coder/agent.py`
- Update `identity_prompt` to mention dynamic tool discovery and runtime observability
- Add `dynamic-tool-discovery` constraint to the goal
### `exports/hive_coder/reference/framework_guide.md`
Replace static tools list with a note to use `discover_mcp_tools()` instead.
---
## What's NOT in Scope (deferred to v2)
- **Agent notifications / webhook listener** — Requires always-on listener architecture
- **`compare_agent_checkpoints`** — LLM can compare by reading two checkpoints sequentially
- **Runtime log query tools** — Available in hive-tools MCP; `run_command` can access them now
---
## Verification
1. MCP server starts with all 15 tools (7 existing + 8 new)
2. `discover_mcp_tools()` connects to hive-tools and returns real tool schemas
3. Agent validation passes (`default_agent.validate()`)
4. Session tools work against existing data in `~/.hive/agents/`
5. Smoke test: launch in TUI, ask it to discover tools
-37
@@ -1,37 +0,0 @@
# Local API key credentials lack feature parity with Aden OAuth credentials
## Summary
The credential tester only surfaces accounts synced via Aden OAuth (requires `ADEN_API_KEY`). Users who authenticate services with a direct API key — Brave Search, GitHub, Exa, Google Maps, Stripe, Telegram, and many others — have no way to list, manage, or test those credentials through the same interface.
## Problem
Local API key credentials are completely flat today:
- **No namespace** — one env var per service (`BRAVE_SEARCH_API_KEY`), no aliases, no multi-account support
- **No identity metadata** — no way to record who owns a key (email, username, workspace)
- **No status tracking** — no "active / failed / unknown" state
- **Not visible in credential tester** — the account picker only calls the Aden API; it silently shows nothing if `ADEN_API_KEY` is absent
- **No management surface** — no list/add/delete/validate flow for API keys
Aden credentials have all of this: `integration_id`, alias, identity, status, health-check-on-sync, and a full listing API.
## Affected credentials (local-only by default)
Brave Search, Exa Search, Google Search (CSE), SerpAPI, GitHub, Google Maps, Telegram, Apollo, Stripe, Razorpay, Cal.com, BigQuery, GCP Vision, Resend, and more.
## Expected behavior
- Running the credential tester should surface **all** configured credentials — Aden-synced and local API keys together, in the same account picker
- Local API key accounts should support aliases (`work`, `personal`) so users can store multiple keys per service
- Identity metadata (username, email, workspace) should be extracted automatically via health check when a key is saved
- A status badge (`active` / `failed` / `unknown`) should indicate whether the key was last verified successfully
- The TUI should provide an "Add Local Credential" screen with a live health check
- The MCP `store_credential` / `list_stored_credentials` / `delete_stored_credential` tools should support aliases; a new `validate_credential` tool should allow re-checking a stored key at any time
## Root cause (bonus bug)
Even credentials configured with the existing `store_credential` MCP tool are invisible in the credential tester because:
1. `_list_env_fallback_accounts()` only checked env vars — it missed credentials stored in `EncryptedFileStorage` using the old flat format (`brave_search`, no alias)
2. `_activate_local_account()` early-returned for `alias == "default"`, assuming the env var was already set — but old flat encrypted credentials are not in `os.environ`
# Server & CLI Architecture: Shared Runtime Primitives
## Executive Summary
The `hive serve` HTTP server and the CLI commands (`hive run`, `hive shell`, `hive tui`) are two access layers built on top of the **same runtime primitives**. There is no separate "server runtime" — the HTTP server is a thin REST/SSE translation layer that delegates every operation to the same `AgentRunner`, `AgentRuntime`, `GraphExecutor`, and storage subsystems that the CLI uses directly.
---
## Architecture Overview
```mermaid
flowchart TB
subgraph Access["Access Layer"]
direction LR
subgraph CLI["CLI Access"]
Run["hive run"]
Shell["hive shell"]
TUI["hive tui"]
end
subgraph HTTP["HTTP Access (hive serve)"]
REST["REST Endpoints<br/>(aiohttp routes)"]
SSE["SSE Event Stream"]
SPA["Frontend SPA"]
end
end
subgraph Bridge["Server Bridge Layer"]
AM["AgentManager<br/>Multi-agent slot lifecycle"]
end
subgraph Core["Shared Runtime Core"]
AR["AgentRunner<br/>Load, validate, run agents"]
ART["AgentRuntime<br/>Multi-entry-point orchestration"]
GE["GraphExecutor<br/>Node execution, edge traversal"]
end
subgraph Storage["Shared Storage"]
SS["SessionStore"]
CS["CheckpointStore"]
RL["RuntimeLogger<br/>L1/L2/L3 logs"]
SM["SharedMemory"]
end
Run --> AR
Shell --> AR
TUI --> AR
REST --> AM
SSE --> AM
AM --> AR
AR --> ART
ART --> GE
GE --> SS
GE --> CS
GE --> RL
GE --> SM
```
### Key Insight
The only component unique to the HTTP server is `AgentManager` — a thin lifecycle wrapper that holds multiple `AgentSlot` instances concurrently. Each slot contains the **exact same objects** the CLI creates:
```python
@dataclass
class AgentSlot:
id: str
agent_path: Path
runner: AgentRunner # Same as CLI
runtime: AgentRuntime # Same as CLI
info: AgentInfo # Same as CLI
loaded_at: float
```
---
## The Shared Runtime Stack
### Layer 1: AgentRunner
The entry point for loading and running any agent, regardless of access mode.
```python
# CLI usage (hive run)
runner = AgentRunner.load("exports/my-agent", model="claude-sonnet-4-6")
result = await runner.run(input_data={"query": "hello"})
# Server usage (identical call inside AgentManager.load_agent)
runner = AgentRunner.load(agent_path, model=model, interactive=False)
```
**Responsibilities:**
- Load agents from `agent.json` or `agent.py`
- Discover tools from `tools.py` and `mcp_servers.json`
- Validate credentials before execution
- Provide `AgentInfo` and `ValidationResult` inspection
### Layer 2: AgentRuntime
The orchestrator for concurrent, multi-entry-point execution.
```python
# Both CLI (TUI/shell) and server use the same runtime
runtime = runner._agent_runtime
await runtime.start()
# Triggering execution — identical call in both modes
exec_id = await runtime.trigger("default", {"query": "hello"})
# Injecting user input — identical call in both modes
await runtime.inject_input(node_id="chat", content="user message")
# Subscribing to events — CLI uses for TUI, server uses for SSE
sub_id = runtime.subscribe_to_events([EventType.CLIENT_OUTPUT_DELTA], handler)
```
### Layer 3: GraphExecutor
Executes the agent graph node-by-node. Completely unaware of whether it was invoked from CLI or HTTP.
**Responsibilities:**
- Node execution following `GraphSpec` edges
- Edge condition evaluation and routing
- `SharedMemory` management across nodes
- Checkpoint creation for resumability
- HITL pause points at `client_facing` nodes
### Layer 4: Storage
All storage subsystems are shared — sessions, checkpoints, and logs written via CLI are readable via the HTTP server and vice versa.
```
~/.hive/agents/{agent_name}/
├── sessions/ # SessionStore
│ └── session_YYYYMMDD_HHMMSS_{uuid}/
│ ├── state.json # Session state
│ ├── conversations/ # Per-node EventLoop state
│ ├── artifacts/ # Large outputs
│ └── logs/ # L1/L2/L3 observability
│ ├── summary.json
│ ├── details.jsonl
│ └── tool_logs.jsonl
├── runtime_logs/ # RuntimeLogger
└── artifacts/ # Fallback storage
```
---
## HTTP Endpoint to Runtime Primitive Mapping
Every HTTP endpoint is a direct, thin delegation to a shared runtime method. No execution logic lives in the route handlers.
### Agent Lifecycle
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `POST /api/agents` | Load agent | `AgentRunner.load()` → `runtime.start()` |
| `DELETE /api/agents/{id}` | Unload agent | `runner.cleanup_async()` |
| `GET /api/agents/{id}` | Agent info | `runner.info()` → `AgentInfo` |
| `GET /api/agents/{id}/stats` | Statistics | Runtime metrics collection |
| `GET /api/agents/{id}/entry-points` | Entry points | `runtime.get_entry_points()` |
| `GET /api/agents/{id}/graphs` | List graphs | `runtime.list_graphs()` |
| `GET /api/discover` | Discover agents | Filesystem scan (same as `hive list`) |
### Execution Control
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `POST /api/agents/{id}/trigger` | Start execution | `runtime.trigger(entry_point_id, input_data)` |
| `POST /api/agents/{id}/chat` | Auto-route | `runtime.inject_input()` or `runtime.trigger()` |
| `POST /api/agents/{id}/inject` | Send user input | `runtime.inject_input(node_id, content)` |
| `POST /api/agents/{id}/resume` | Resume session | `runtime.trigger()` with `session_state` |
| `POST /api/agents/{id}/stop` | Pause execution | Cancels the execution task |
| `POST /api/agents/{id}/replay` | Replay checkpoint | Checkpoint restore → `runtime.trigger()` |
| `GET /api/agents/{id}/goal-progress` | Goal progress | `runtime.get_goal_progress()` |
### Event Streaming
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `GET /api/agents/{id}/events` | SSE stream | `runtime.subscribe_to_events()` |
Default event types streamed: `CLIENT_OUTPUT_DELTA`, `CLIENT_INPUT_REQUESTED`, `LLM_TEXT_DELTA`, `TOOL_CALL_STARTED`, `TOOL_CALL_COMPLETED`, `EXECUTION_STARTED`, `EXECUTION_COMPLETED`, `EXECUTION_FAILED`, `EXECUTION_PAUSED`, `NODE_LOOP_STARTED`, `NODE_LOOP_COMPLETED`, `EDGE_TRAVERSED`, `GOAL_PROGRESS`.
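The subscription-to-SSE bridge can be sketched with standard SSE framing. The queue-based handler below is an illustrative pattern, not the server's exact code, and it assumes `subscribe_to_events` delivers events as dicts:

```python
import asyncio
import json

def sse_frame(event_type: str, data: dict) -> str:
    # One SSE frame: event-name line, data line, blank-line terminator
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

def make_sse_bridge(runtime, event_types) -> asyncio.Queue:
    # Each connected client gets its own queue; the HTTP response task
    # drains it and writes frames to the wire.
    queue: asyncio.Queue[str] = asyncio.Queue()

    def handler(event):
        queue.put_nowait(sse_frame(event["type"], event.get("data", {})))

    runtime.subscribe_to_events(event_types, handler)
    return queue
```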
### Session Management
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `GET /api/agents/{id}/sessions` | List sessions | `SessionStore.list_sessions()` |
| `GET /api/agents/{id}/sessions/{sid}` | Session details | `SessionStore.read_state()` |
| `DELETE /api/agents/{id}/sessions/{sid}` | Delete session | `SessionStore.delete_session()` |
| `GET /api/agents/{id}/sessions/{sid}/checkpoints` | List checkpoints | `CheckpointStore.list_checkpoints()` |
| `POST /api/agents/{id}/sessions/{sid}/checkpoints/{cid}/restore` | Restore checkpoint | Checkpoint load → `runtime.trigger()` |
| `GET /api/agents/{id}/sessions/{sid}/messages` | Chat history | `ConversationStore` reads |
### Graph Inspection
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `GET /api/agents/{id}/graphs/{gid}/nodes` | List nodes | `GraphSpec` inspection |
| `GET /api/agents/{id}/graphs/{gid}/nodes/{nid}` | Node details | `GraphSpec` node lookup |
| `GET /api/agents/{id}/graphs/{gid}/nodes/{nid}/criteria` | Success criteria | Node criteria + judge verdicts |
### Logging
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `GET /api/agents/{id}/logs` | Agent logs | `RuntimeLogger` queries |
| `GET /api/agents/{id}/graphs/{gid}/nodes/{nid}/logs` | Node logs | `RuntimeLogger` node-scoped queries |
---
## What Differs Between CLI and HTTP
The differences are in the **access pattern**, not the runtime behavior.
| Concern | CLI | HTTP Server |
|---|---|---|
| **Multi-agent** | One runner per process | `AgentManager` holds N slots concurrently |
| **User input** | stdin (shell) / TUI widget | `POST /inject` or `POST /chat` |
| **Event streaming** | `subscribe_to_events()` → TUI update | Same subscription → SSE stream |
| **HITL approval** | `set_approval_callback()` + stdin | `CLIENT_INPUT_REQUESTED` event → `/inject` |
| **Agent lifecycle** | Process start → run → exit | Dynamic load/unload via REST calls |
| **Concurrency** | Sequential (one run at a time) | Async — multiple triggers, multiple agents |
| **Agent discovery** | `hive list` scans dirs | `GET /api/discover` scans dirs (same logic) |
| **Frontend** | Terminal / Textual TUI | React SPA served from `frontend/dist/` |
---
## The AgentManager Bridge
The only component unique to the HTTP server. It manages the lifecycle of multiple loaded agents within a single process.
```mermaid
flowchart LR
subgraph AgentManager
S1["Slot: support-agent<br/>runner + runtime + info"]
S2["Slot: research-agent<br/>runner + runtime + info"]
S3["Slot: code-agent<br/>runner + runtime + info"]
end
Load["POST /api/agents"] -->|"load_agent()"| AgentManager
Unload["DELETE /api/agents/{id}"] -->|"unload_agent()"| AgentManager
List["GET /api/agents"] -->|"list_agents()"| AgentManager
Get["GET /api/agents/{id}"] -->|"get_agent()"| AgentManager
Shutdown["Server shutdown"] -->|"shutdown_all()"| AgentManager
```
**Key design choices:**
- **Thread-safe** via `asyncio.Lock` — no race conditions during load/unload
- **Blocking I/O offloaded** — `AgentRunner.load()` runs in `run_in_executor` to avoid blocking the event loop
- **Same pattern as TUI** — a comment in the source explicitly notes this: `# Blocking I/O — load in executor (same as tui/app.py:362-368)`
---
## How the `/chat` Endpoint Auto-Routes
The `/chat` endpoint demonstrates the thin-wrapper pattern. It checks runtime state and delegates:
```
POST /api/agents/{id}/chat { "message": "hello" }
Is any node waiting for input?
│ │
YES NO
│ │
▼ ▼
runtime.inject_input() runtime.trigger()
│ │
▼ ▼
{ "status": "injected", { "status": "started",
"node_id": "..." } "execution_id": "..." }
```
This is the same decision a human makes in the shell — if the agent is waiting for input, provide it; otherwise start a new execution.
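That decision fits in a few lines of routing logic, independent of aiohttp. The `get_waiting_nodes()`, `inject_input()`, and `trigger()` calls are the runtime primitives documented above; the handler shape itself is a sketch:

```python
# Sketch of the /chat auto-routing decision. `runtime` is assumed to expose
# the primitives documented elsewhere in this file; the response dicts mirror
# the diagram above.
async def route_chat(runtime, message: str) -> dict:
    waiting = runtime.get_waiting_nodes()       # nodes blocked on client input
    if waiting:                                 # YES branch: provide the input
        node = waiting[0]
        await runtime.inject_input(node_id=node["node_id"], content=message)
        return {"status": "injected", "node_id": node["node_id"]}
    # NO branch: start a fresh execution on the default entry point
    execution_id = await runtime.trigger("default", {"query": message})
    return {"status": "started", "execution_id": execution_id}
```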
---
## Concurrent Judge & Queen: Multi-Graph Monitoring Primitives
The Worker Health Judge and Queen triage system introduce **secondary graphs** that run alongside a primary worker graph within the same `AgentRuntime`. They share the runtime's `EventBus` but have fully isolated storage. This section documents the new runtime primitives, EventBus events, data models, and storage layout they introduce.
### Architecture
```
One AgentRuntime (shared EventBus)
|
+-- Worker Graph (primary) trigger_type: manual
| Entry point: "start" -> worker node (event_loop, client_facing)
|
+-- Health Judge Graph (secondary) trigger_type: timer (2 min)
| Entry point: "health_check" -> judge node (event_loop, autonomous)
| isolation_level: isolated
| conversation_mode: continuous
|
+-- Queen Graph (secondary) trigger_type: event (worker_escalation_ticket)
Entry point: "ticket_receiver" -> ticket_triage node (event_loop)
isolation_level: isolated
```
### GraphScopedEventBus and Event Identity Fields
Every event carries four identity fields: `(graph_id, stream_id, node_id, execution_id)`.
- **`graph_id`** — Set automatically by `GraphScopedEventBus`, a public subclass of `EventBus` that stamps `graph_id` on every `publish()` call. All three components (worker, judge, queen) use a scoped bus so their events are distinguishable.
- **`stream_id`** — The entry point pipeline. Flows from `EntryPointSpec.id` through `ExecutionStream` → `GraphExecutor` → `NodeContext` → `EventLoopNode`.
- **`node_id`** — The graph node emitting the event.
- **`execution_id`** — UUID for a specific execution run, set by `ExecutionStream` and wired through `GraphExecutor` → `EventLoopNode` → all `emit_*` calls.
See [EVENT_TYPES.md](../core/framework/runtime/EVENT_TYPES.md) for the complete event type and schema reference.
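The stamping pattern is compact. The real `GraphScopedEventBus` lives in `core/framework/runtime/execution_stream.py`; the toy `EventBus` base below is an assumption made so the example is self-contained:

```python
class EventBus:
    """Minimal stand-in for the shared runtime bus."""
    def __init__(self):
        self.events: list[dict] = []
    def publish(self, event: dict) -> None:
        self.events.append(event)

class GraphScopedEventBus(EventBus):
    """Stamps graph_id on every publish, forwarding to the shared runtime bus."""
    def __init__(self, shared: EventBus, graph_id: str):
        self.shared = shared
        self.graph_id = graph_id
    def publish(self, event: dict) -> None:
        # Stamp graph_id so worker, judge, and queen events stay distinguishable
        self.shared.publish({**event, "graph_id": self.graph_id})
```

Because all three components publish through scoped buses onto one shared bus, a single subscriber sees an interleaved stream and can still attribute every event to its graph.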
### New EventBus Event Types
Two new events added to `EventType` enum:
#### `WORKER_ESCALATION_TICKET`
Emitted by the health judge's `emit_escalation_ticket` tool when the judge detects a sustained degradation pattern in the worker.
```python
EventBus.emit_worker_escalation_ticket(
stream_id: str, # Judge's stream ID (e.g. "judge")
node_id: str, # "judge"
ticket: dict, # Full EscalationTicket (see data model below)
execution_id: str | None = None,
)
```
**Event data payload:**
```json
{
"ticket": {
"ticket_id": "uuid",
"created_at": "ISO8601",
"worker_agent_id": "deep_research_agent",
"worker_session_id": "session_20260223_130234_a30df858",
"worker_node_id": "researcher",
"worker_graph_id": "deep-research-graph",
"severity": "high",
"cause": "Node has produced 18 consecutive RETRY verdicts with identical error...",
"judge_reasoning": "Steps unchanged across last 3 checks. Evidence shows repeated...",
"suggested_action": "Restart node with modified prompt or escalate to human.",
"recent_verdicts": ["RETRY", "RETRY", "RETRY", "CONTINUE", "RETRY"],
"total_steps_checked": 47,
"steps_since_last_accept": 18,
"stall_minutes": null,
"evidence_snippet": "Error: API returned 429 rate limit exceeded..."
}
}
```
**Subscribers:** Queen's `ticket_receiver` entry point (event-driven). HTTP server should forward via SSE for dashboard UIs.
#### `QUEEN_INTERVENTION_REQUESTED`
Emitted by the queen's `notify_operator` tool after triaging an escalation ticket and deciding the human operator should be involved.
```python
EventBus.emit_queen_intervention_requested(
stream_id: str, # Queen's stream ID
node_id: str, # "ticket_triage"
ticket_id: str, # References the original EscalationTicket
analysis: str, # Queen's 2-3 sentence analysis
severity: str, # "low" | "medium" | "high" | "critical"
queen_graph_id: str, # "queen"
queen_stream_id: str, # "queen"
execution_id: str | None = None,
)
```
**Event data payload:**
```json
{
"ticket_id": "uuid",
"analysis": "Worker is stuck in a rate-limit retry loop for 6+ minutes. Suggest pausing and retrying with backoff.",
"severity": "high",
"queen_graph_id": "queen",
"queen_stream_id": "queen"
}
```
**Subscribers:** TUI (shows non-disruptive overlay). HTTP server should forward via SSE.
### New Data Model: EscalationTicket
```python
# core/framework/runtime/escalation_ticket.py
class EscalationTicket(BaseModel):
ticket_id: str # Auto-generated UUID
created_at: str # Auto-generated ISO8601
# Worker identification
worker_agent_id: str # Agent name (e.g. "deep_research_agent")
worker_session_id: str # Session being monitored
worker_node_id: str # Primary graph's entry node
worker_graph_id: str # Primary graph ID
# Problem characterization (LLM-generated by judge)
severity: Literal["low", "medium", "high", "critical"]
cause: str # What the judge observed
judge_reasoning: str # Why the judge decided to escalate
suggested_action: str # Recommended intervention
# Evidence
recent_verdicts: list[str] # Last N verdicts (ACCEPT/RETRY/CONTINUE/ESCALATE)
total_steps_checked: int # Total log steps seen
steps_since_last_accept: int
stall_minutes: float | None # Wall-clock since last step (None if active)
evidence_snippet: str # Truncated recent LLM output
```
### Modified AgentRuntime APIs
The following existing methods gained a `graph_id` parameter to support multi-graph routing. When `graph_id=None` (default), the method targets the **active graph** (`active_graph_id`), falling back to the primary graph. Existing callers that pass no `graph_id` are unaffected.
| Method | New parameter | Notes |
|---|---|---|
| `trigger()` | `graph_id: str \| None = None` | Routes to the named graph's stream |
| `get_entry_points()` | `graph_id: str \| None = None` | Returns entry points for the specified graph |
| `get_stream()` | `graph_id: str \| None = None` | Resolves stream via active graph first |
| `get_execution_result()` | `graph_id: str \| None = None` | Looks up result in the graph's stream |
| `cancel_execution()` | `graph_id: str \| None = None` | Cancels execution in the graph's stream |
### New AgentRuntime APIs
| Method | Signature | Description |
|---|---|---|
| `get_active_graph()` | `-> GraphSpec` | Returns the `GraphSpec` for the currently active graph (used by TUI/chat routing) |
| `active_graph_id` (property) | `str` (get/set) | The graph that receives user input. Set by TUI when switching between worker and queen views |
| `get_active_streams()` | `-> list[dict]` | Returns metadata for every stream with active executions across all graphs. Each dict contains `graph_id`, `stream_id`, `entry_point_id`, `active_execution_ids`, `is_awaiting_input`, `waiting_nodes`. |
| `get_waiting_nodes()` | `-> list[dict]` | Flat list of all nodes currently blocked waiting for client input across all graphs/streams. Each dict contains `graph_id`, `stream_id`, `node_id`, `execution_id`. |
### New ExecutionStream APIs
| Method | Signature | Description |
|---|---|---|
| `get_waiting_nodes()` | `-> list[dict]` | Returns `[{"node_id": str, "execution_id": str}]` for every `EventLoopNode` with `_awaiting_input == True`. |
| `get_injectable_nodes()` | `-> list[dict]` | Returns `[{"node_id": str, "execution_id": str}]` for every node that supports message injection (has `inject_event` method). |
### Proposed HTTP Endpoints
These endpoints are not yet implemented. They expose the new multi-graph and monitoring primitives to the HTTP access layer, following the same thin-delegation pattern as existing endpoints.
#### Multi-Graph Control
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `POST /api/agents/{id}/graphs` | Load secondary graph | `runtime.add_graph(graph_id, graph, goal, entry_points)` |
| `DELETE /api/agents/{id}/graphs/{gid}` | Unload secondary graph | `runtime.remove_graph(graph_id)` (not yet implemented) |
| `GET /api/agents/{id}/graphs/{gid}/sessions` | List graph sessions | Graph-specific `SessionStore.list_sessions()` |
| `GET /api/agents/{id}/graphs/{gid}/sessions/{sid}` | Graph session details | Graph-specific `SessionStore.read_state()` |
| `PUT /api/agents/{id}/active-graph` | Switch active graph | `runtime.active_graph_id = graph_id` |
| `GET /api/agents/{id}/active-graph` | Get active graph | `runtime.active_graph_id` |
#### Stream Introspection
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `GET /api/agents/{id}/streams` | Active streams | `runtime.get_active_streams()` — all streams with active executions |
| `GET /api/agents/{id}/waiting-nodes` | Waiting nodes | `runtime.get_waiting_nodes()` — all nodes blocked on client input |
#### Worker Health Monitoring
| HTTP Endpoint | Method | Runtime Primitive |
|---|---|---|
| `GET /api/agents/{id}/health` | Health summary | Calls `get_worker_health_summary()` tool (reads worker session logs) |
| `GET /api/agents/{id}/escalations` | List escalation tickets | Query `WORKER_ESCALATION_TICKET` events from EventBus history |
| `GET /api/agents/{id}/escalations/{tid}` | Ticket details | Lookup specific ticket by `ticket_id` |
#### Event Streaming Additions
The SSE stream (`GET /api/agents/{id}/events`) should include the two new event types in its default set:
```
Default event types: ..., WORKER_ESCALATION_TICKET, QUEEN_INTERVENTION_REQUESTED
```
Clients can subscribe selectively:
```
GET /api/agents/{id}/events?types=worker_escalation_ticket,queen_intervention_requested
```
### Isolated Session Lifecycle for Secondary Graphs
Isolated entry points (`isolation_level="isolated"`) use **persistent sessions** — a single session is created on first trigger and reused for all subsequent triggers of the same entry point. This is critical for:
- **Timer-driven** entry points (health judge): one session across all timer ticks, so `conversation_mode="continuous"` works and the judge accumulates observations in its conversation history.
- **Event-driven** entry points (queen ticket receiver): one session across all received events, so the queen can reference prior triage decisions.
The session reuse is managed by the timer/event handler closures in `AgentRuntime`, which remember the first `execution_id` returned by `stream.execute()` and pass it as `resume_session_id` on all subsequent fires. The `GraphExecutor` detects the existing conversation store, resets the cursor (clearing stale outputs), and appends a transition marker so the LLM knows a new trigger arrived while the conversation thread carries forward.
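The remember-and-resume pattern reduces to a closure. The keyword signature of `stream.execute()` here is an assumption; only the pattern of capturing the first `execution_id` is the point:

```python
def make_timer_handler(stream, entry_point_id: str):
    # Closure state: the session created on the first fire, reused thereafter
    first_execution_id: str | None = None

    async def on_fire(payload: dict) -> str:
        nonlocal first_execution_id
        execution_id = await stream.execute(
            entry_point_id,
            payload,
            resume_session_id=first_execution_id,  # None on the first fire → new session
        )
        if first_execution_id is None:
            first_execution_id = execution_id      # every later fire resumes this session
        return execution_id

    return on_fire
```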
### Secondary Graph Storage Layout
Secondary graphs have fully isolated storage under `graphs/{graph_id}/` to prevent any interference with the primary worker's sessions, logs, and conversations.
```
~/.hive/agents/{agent_name}/
├── sessions/                                    # Primary graph only
│   └── session_YYYYMMDD_HHMMSS_{uuid}/
│       ├── state.json
│       ├── conversations/
│       └── logs/
├── graphs/
│   ├── judge/                                   # Health judge (secondary)
│   │   ├── sessions/
│   │   │   └── session_YYYYMMDD_HHMMSS_{uuid}/  # ONE persistent session
│   │   │       ├── state.json
│   │   │       ├── conversations/judge/         # Continuous conversation
│   │   │       └── logs/
│   │   │           ├── tool_logs.jsonl
│   │   │           └── details.jsonl
│   │   └── runtime_logs/
│   └── queen/                                   # Queen triage (secondary)
│       ├── sessions/
│       │   └── session_YYYYMMDD_HHMMSS_{uuid}/  # ONE persistent session
│       │       ├── state.json
│       │       ├── conversations/ticket_triage/
│       │       └── logs/
│       └── runtime_logs/
└── runtime_logs/                                # Primary graph runtime logs
```
Each secondary graph gets its own `SessionStore` and `RuntimeLogStore` scoped to `graphs/{graph_id}/`. This is set up in `AgentRuntime.add_graph()`:
```python
graph_base = self._session_store.base_path / subpath # e.g. .../graphs/judge
graph_session_store = SessionStore(graph_base)
graph_log_store = RuntimeLogStore(graph_base / "runtime_logs")
```
### Worker Monitoring Tools
Three tools registered via `register_worker_monitoring_tools(registry, event_bus, storage_path)`. These are bound to the worker's EventBus and storage path at registration time.
| Tool | Used by | Description |
|---|---|---|
| `get_worker_health_summary(session_id?, last_n_steps?)` | Health Judge | Reads worker's `sessions/{id}/logs/tool_logs.jsonl`. Auto-discovers active session if `session_id` omitted. Returns JSON with `worker_agent_id`, `worker_graph_id`, `session_id`, `total_steps`, `recent_verdicts`, `steps_since_last_accept`, `stall_minutes`, `evidence_snippet`. |
| `emit_escalation_ticket(ticket_json)` | Health Judge | Validates JSON against `EscalationTicket` schema (Pydantic rejects partial tickets), then calls `EventBus.emit_worker_escalation_ticket()`. |
| `notify_operator(ticket_id, analysis, urgency)` | Queen | Calls `EventBus.emit_queen_intervention_requested()` so the TUI/frontend surfaces a notification. |
### Queen Lifecycle Tools
Four tools registered via `register_queen_lifecycle_tools(registry, worker_runtime, event_bus)`. These close over the worker's `AgentRuntime` to give the Queen control over the worker agent's lifecycle.
| Tool | Description |
|---|---|
| `start_worker(task)` | Trigger the worker's default entry point with a task description. Returns an `execution_id`. |
| `stop_worker()` | Cancel all active worker executions. Returns IDs of cancelled executions. |
| `get_worker_status()` | Check if the worker is idle, running, or waiting for input. Returns execution details and waiting node ID if applicable. Uses `stream.get_waiting_nodes()` for accurate detection. |
| `inject_worker_message(content)` | Send a message to the running worker agent by finding an injectable node via `stream.get_injectable_nodes()` and calling `stream.inject_input()`. |
### New File Reference
| Component | Path |
|---|---|
| EscalationTicket model | `core/framework/runtime/escalation_ticket.py` |
| Worker Health Judge graph | `core/framework/monitoring/judge.py` |
| Worker monitoring tools | `core/framework/tools/worker_monitoring_tools.py` |
| Queen lifecycle tools | `core/framework/tools/queen_lifecycle_tools.py` |
| Monitoring package init | `core/framework/monitoring/__init__.py` |
| Event types reference | `core/framework/runtime/EVENT_TYPES.md` |
---
## File Reference
| Component | Path |
|---|---|
| CLI entry point | `core/framework/runner/cli.py` |
| HTTP app factory | `core/framework/server/app.py` |
| Agent manager | `core/framework/server/agent_manager.py` |
| Agent routes | `core/framework/server/routes_agents.py` |
| Execution routes | `core/framework/server/routes_execution.py` |
| Event routes | `core/framework/server/routes_events.py` |
| Session routes | `core/framework/server/routes_sessions.py` |
| Graph routes | `core/framework/server/routes_graphs.py` |
| Log routes | `core/framework/server/routes_logs.py` |
| SSE helper | `core/framework/server/sse.py` |
| AgentRunner | `core/framework/runner/runner.py` |
| AgentRuntime | `core/framework/runtime/agent_runtime.py` |
| GraphExecutor | `core/framework/graph/executor.py` |
| SessionStore | `core/framework/storage/session_store.py` |
| CheckpointStore | `core/framework/storage/checkpoint_store.py` |
| Runtime logger | `core/framework/runtime/core.py` |
| EventBus | `core/framework/runtime/event_bus.py` |
| ExecutionStream | `core/framework/runtime/execution_stream.py` |
| GraphScopedEventBus | `core/framework/runtime/execution_stream.py` |
| EscalationTicket | `core/framework/runtime/escalation_ticket.py` |
| Queen lifecycle tools | `core/framework/tools/queen_lifecycle_tools.py` |
| Worker monitoring tools | `core/framework/tools/worker_monitoring_tools.py` |
| Health Judge graph | `core/framework/monitoring/judge.py` |
| Event types reference | `core/framework/runtime/EVENT_TYPES.md` |