refactor: remove deprecated code

This commit is contained in:
Timothy
2026-03-10 14:57:54 -07:00
parent 4caaa79900
commit 2aefdf5b5f
31 changed files with 613 additions and 7223 deletions
-4
@@ -2,10 +2,6 @@
Shared agent instructions for this workspace.
## Deprecations
- **TUI is deprecated.** The terminal UI (`hive tui`) is no longer maintained. Use the browser-based interface (`hive open`) instead.
## Coding Agent Notes
-
+404
@@ -0,0 +1,404 @@
# Code Cleaning Analysis — Hive Framework
**Date:** 2026-03-09
**Scope:** Full codebase audit relative to active entrypoints (`hive open`, `quickstart.sh`)
**Methodology:** Static import tracing, call-site analysis, cross-referencing across all Python modules
---
## Executive Summary
This report identifies code in the Hive framework that is dead, unused, placeholder-only, or structurally isolated from the active execution paths. The active entrypoints are `hive open` (which launches the browser-based server at `core/framework/server/`) and `quickstart.sh`/`quickstart.ps1` (the onboarding wizard). All analysis is verified against actual import graphs — no guesswork.
Findings are grouped into three tiers:
- **Tier 1 — Dead Code:** Not imported or called by anything in the codebase.
- **Tier 2 — Placeholder Stubs:** Registered in the CLI but return `1` with "not yet implemented."
- **Tier 3 — Structural Concerns:** Code that works but represents problematic coupling, migration debt, or deprecated patterns still embedded in active paths.
---
## Tier 1 — Dead Code
### 1.1 `framework/graph/hitl.py` (203 lines)
**Status:** Zero external imports. Completely unused.
**What it is:** A formal data-model module defining a HITL (Human-In-The-Loop) protocol:
```python
class HITLInputType(StrEnum): ...
class HITLQuestion: ...
class HITLRequest: ...
class HITLResponse: ...
class HITLSession: ...
```
**Why it's dead:** The HITL concept is alive in the codebase — agents pause at `pause_nodes` defined in `EdgeSpec`, the `GraphExecutor` checks `node_spec.id in graph.pause_nodes`, and the shell command implements an approval callback (`_hitl_approval`). However, none of this machinery uses the types from `hitl.py`. The actual pause/resume flow uses ad-hoc dicts and the executor's internal state, not these dataclasses.
**Verification:**
```
$ grep -rn "from framework.graph.hitl\|import hitl\|HITLRequest\|HITLQuestion\|HITLSession" core/ --include="*.py"
(no results outside hitl.py itself)
```
**Recommendation:** Delete `core/framework/graph/hitl.py`. The concept it formalized was never adopted by the implementation.
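The live pause flow described above can be sketched without any of the `hitl.py` types; the `Graph` and executor shapes below are illustrative stand-ins, not the real classes:

```python
# Sketch of the executor-side pause check: node IDs are tested against
# `pause_nodes` and the pause payload is an ad-hoc dict, so the formal
# HITLRequest/HITLResponse dataclasses are never needed.
from dataclasses import dataclass, field

@dataclass
class Graph:
    nodes: list[str]
    pause_nodes: set[str] = field(default_factory=set)

def execute(graph: Graph, approve) -> list[dict]:
    events = []
    for node_id in graph.nodes:
        if node_id in graph.pause_nodes:
            request = {"node": node_id, "kind": "approval"}  # ad-hoc dict
            if not approve(request):
                events.append({"node": node_id, "status": "rejected"})
                break
        events.append({"node": node_id, "status": "ran"})
    return events

events = execute(
    Graph(nodes=["plan", "write", "deploy"], pause_nodes={"deploy"}),
    approve=lambda req: req["node"] != "deploy",
)
```

Since the ad-hoc dict is all the executor ever exchanges, deleting the formal dataclasses loses nothing the implementation relies on.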
---
### 1.2 `framework/storage/state_writer.py` (179 lines)
**Status:** Zero external imports. Self-described migration artifact.
**What it is:** A "dual-write adapter for migration period" that conditionally writes to both the old `Run`-based storage format and the new `SessionState`-based format via an env var gate:
```python
class StateWriter:
"""
Writes execution state to both old and new formats during migration.
During the dual-write phase:
- New format (state.json) is written when USE_UNIFIED_SESSIONS=true
- Old format (Run/RunSummary) is always written for backward compatibility
"""
self.dual_write_enabled = os.getenv("USE_UNIFIED_SESSIONS", "false").lower() == "true"
```
**Why it's dead:** The migration is effectively complete. `AgentRuntime` initializes a `SessionStore` directly (always enabled, no env var check), and `ExecutionStream` now writes to `SessionStore` by default. `StateWriter` is never imported — not by `AgentRuntime`, not by `ExecutionStream`, not by `runner.py`. The class exists in isolation.
**Verification:**
```
$ grep -rn "StateWriter\|state_writer\|USE_UNIFIED_SESSIONS" core/ --include="*.py"
framework/storage/state_writer.py:20:class StateWriter: # ← definition
framework/storage/state_writer.py:39: ...USE_UNIFIED_SESSIONS... # ← internal reference only
```
**Recommendation:** Delete `core/framework/storage/state_writer.py`. The migration is over; the old format path in `ConcurrentStorage` (`save_run`/`load_run`) is the remaining debt, not this class.
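The env-gated dual-write pattern that `StateWriter` implements reduces to a few lines; a self-contained sketch, with plain dicts standing in for the two storage backends:

```python
# Dual-write gate as described in state_writer.py's docstring: the old
# Run-based format is always written; the new SessionState format only
# when USE_UNIFIED_SESSIONS=true. Dicts stand in for real stores.
import os

def write_state(run_id: str, state: dict, old_store: dict, new_store: dict) -> None:
    dual = os.getenv("USE_UNIFIED_SESSIONS", "false").lower() == "true"
    old_store[run_id] = state       # old format: unconditional
    if dual:
        new_store[run_id] = state   # new format: env-gated

old, new = {}, {}
os.environ["USE_UNIFIED_SESSIONS"] = "true"
write_state("run_1", {"success": True}, old, new)
os.environ["USE_UNIFIED_SESSIONS"] = "false"
write_state("run_2", {"success": False}, old, new)
```

Because `AgentRuntime` now writes to `SessionStore` unconditionally, this gate is precisely the machinery that has been superseded.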
---
### 1.3 `framework/server/agent_manager.py` (36 lines)
**Status:** Zero external imports. Explicit backward-compat shim with no callers.
**What it is:** A thin shim that re-exports `SessionManager` as `AgentManager` and provides a legacy `AgentSlot` dataclass described as "kept for test compatibility only":
```python
"""Backward-compatibility shim.
The primary implementation is now in ``session_manager.py``.
This module re-exports SessionManager as AgentManager and
keeps AgentSlot for test compatibility.
"""
AgentManager = SessionManager
```
**Why it's dead:** No file in the codebase imports from `framework.server.agent_manager`. The tests that supposedly needed `AgentSlot` don't import it either.
**Verification:**
```
$ grep -rn "from framework.server.agent_manager\|AgentManager\|AgentSlot" core/ --include="*.py"
framework/server/agent_manager.py:4:...re-exports SessionManager as AgentManager... # definition only
framework/server/agent_manager.py:36:AgentManager = SessionManager # definition only
```
**Recommendation:** Delete `core/framework/server/agent_manager.py`. `SessionManager` is already imported directly everywhere it's needed.
---
### 1.4 `framework/agents/hive_coder/` (directory with only `__pycache__`)
**Status:** Source files deleted. Only stale bytecode (`.pyc`) remains.
**What it is:** A directory at `core/framework/agents/hive_coder/` that once contained an agent. The source Python files (`.py`) no longer exist — only the compiled bytecode artifacts remain:
```
core/framework/agents/hive_coder/
├── __pycache__/
│ ├── agent.cpython-311.pyc
│ ├── agent.cpython-314.pyc
│ ├── config.cpython-311.pyc
│ ├── ticket_receiver.cpython-311.pyc
│ └── ...
├── nodes/__pycache__/
│ ├── __init__.cpython-311.pyc
│ └── ...
└── tests/__pycache__/
```
No `.py` files exist in any of these subdirectories.
**Why it's a problem:** The directory creates a misleading impression that an agent exists here. CPython 3 will not import sourceless bytecode from `__pycache__` (the source `.py` must be present), but the stale tree still confuses code search, tooling, and readers. Nothing in the live codebase references this module.
**Verification:**
```
$ find core/framework/agents/hive_coder -name "*.py"
(no results)
$ grep -rn "hive_coder\|HiveCoder" core/ --include="*.py"
(no results)
```
**Recommendation:** Delete the entire `core/framework/agents/hive_coder/` directory tree.
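For the cleanup itself, a guard that refuses to delete while any `.py` source remains is cheap insurance. A sketch against a scratch copy of the layout (substitute `core/framework/agents/hive_coder` for the real run):

```shell
# Recreate the reported layout in a scratch dir, then delete only if
# no .py sources are found under it. Paths here are illustrative.
scratch=$(mktemp -d)
mkdir -p "$scratch/hive_coder/__pycache__" "$scratch/hive_coder/nodes/__pycache__"
touch "$scratch/hive_coder/__pycache__/agent.cpython-311.pyc"
if [ -z "$(find "$scratch/hive_coder" -name '*.py' -print -quit)" ]; then
    rm -rf "$scratch/hive_coder"
fi
```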
---
### 1.5 `framework/builder/` — `BuilderQuery` class (501 lines)
**Status:** Exported in `framework/__init__.py` but never imported or called by any external code.
**What it is:** An introspection API for querying execution runs, failures, and decision patterns:
```python
class BuilderQuery:
def get_run_summary(self, run_id: str) -> RunSummary | None: ...
def list_runs_for_goal(self, goal_id: str) -> list[RunSummary]: ...
def get_recent_failures(self, limit: int = 10) -> list[RunSummary]: ...
def analyze_failure(self, run_id: str) -> FailureAnalysis | None: ...
def get_success_rate(self, goal_id: str) -> float: ...
```
**Why it's dead:** `BuilderQuery` is imported and listed in `__all__` in `framework/__init__.py` as a public API, but no code outside the `builder/` directory ever imports it — not the server, not the CLI, not any agent, not any test.
**Verification:**
```
$ grep -rn "BuilderQuery" core/ --include="*.py" | grep -v "builder/"
framework/__init__.py:25:from framework.builder.query import BuilderQuery # export only
framework/__init__.py:55: "BuilderQuery", # __all__ only
```
**Recommendation:** Either (a) delete `core/framework/builder/` and remove its export from `framework/__init__.py`, or (b) if it is intended as a future public API, leave it but remove it from `__all__` until it has actual callers.
---
### 1.6 `framework/mcp/__init__.py` (4 lines)
**Status:** Empty module, never imported for substance.
**What it is:**
```python
"""MCP servers for worker-bee."""
# Don't auto-import servers to avoid double-import issues when running with -m
__all__ = []
```
**Why it's a concern:** The `framework/mcp/` directory exists as a Python package but contains only this empty `__init__.py`. The actual MCP client implementation lives in `framework/runner/mcp_client.py`. No code imports `framework.mcp` for functionality — only `framework.runner.mcp_client`, `framework.runner.tool_registry`, and `framework.tools.*` handle MCP.
**Recommendation:** Delete `core/framework/mcp/` entirely, or if a future MCP namespace is intended here, leave a `# TODO` comment.
---
### 1.7 `framework/credentials/vault/` — HashiCorp Vault backend (394 lines)
**Status:** Implemented but never loaded by any active code path.
**What it is:** A `HashiCorpVaultStorage` class that wraps the `hvac` library for enterprise secret management:
```python
class HashiCorpVaultStorage(CredentialStorage):
"""HashiCorp Vault storage adapter.
Provides integration with HashiCorp Vault for enterprise secret management."""
```
**Why it's dead:** The `credentials/__init__.py` docstring mentions it, but no code in `store.py`, `setup.py`, `runner.py`, or the server routes imports `HashiCorpVaultStorage`. The `CredentialStore` in active use is initialized via `with_encrypted_storage()`, which uses the local file-based backend exclusively.
**Verification:**
```
$ grep -rn "from framework.credentials.vault\|HashiCorpVaultStorage" core/ --include="*.py"
credentials/__init__.py:42: from core.framework.credentials.vault import HashiCorpVaultStorage # docstring example only
credentials/vault/hashicorp.py:23:class HashiCorpVaultStorage(CredentialStorage): # definition only
```
**Recommendation:** If Vault integration is not planned in the near term, delete `core/framework/credentials/vault/`. If it is planned, add an integration test and a `TODO` issue reference.
---
## Tier 2 — Placeholder Stubs (CLI Commands That Don't Work)
The following CLI subcommands are registered in `register_commands()` (`runner/cli.py`), appear in `--help` output, but their implementations return exit code `1` with "not yet implemented" messages. Users invoking these commands get errors.
### 2.1 `hive sessions list` / `hive sessions show` / `hive sessions checkpoints`
```python
def cmd_sessions_list(args: argparse.Namespace) -> int:
"""List agent sessions."""
print("⚠ Sessions list command not yet implemented")
print("This will be available once checkpoint infrastructure is complete.")
return 1
def cmd_sessions_show(args: argparse.Namespace) -> int:
"""Show detailed session information."""
print("⚠ Session show command not yet implemented")
print("This will be available once checkpoint infrastructure is complete.")
return 1
def cmd_sessions_checkpoints(args: argparse.Namespace) -> int:
"""List checkpoints for a session."""
print("⚠ Session checkpoints command not yet implemented")
print("This will be available once checkpoint infrastructure is complete.")
return 1
```
**Why this matters:** The session/checkpoint infrastructure IS complete — `SessionStore`, `CheckpointStore`, and the server's `/api/sessions/*` routes all work. These CLI commands were written as stubs expecting infrastructure that has since been built, but the commands were never wired up to that infrastructure. They are dead stubs that mislead users.
**Recommendation:** Either implement these commands by calling `SessionStore` directly, or remove the subcommand registrations and their functions until implementation is ready.
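A minimal implementation of `hive sessions list` could read the on-disk layout directly. This is a sketch: the directory layout matches the `~/.hive/agents/<name>/sessions/session_*` convention used elsewhere in this commit, and the argument names are assumptions:

```python
# Hypothetical `hive sessions list` wired to the session directories
# instead of printing "not yet implemented". Demo runs against a temp tree.
import argparse
import tempfile
from pathlib import Path

def cmd_sessions_list(args: argparse.Namespace) -> int:
    sessions_dir = Path(args.agents_root) / args.agent / "sessions"
    if not sessions_dir.exists():
        print(f"No sessions found for agent '{args.agent}'")
        return 1
    for entry in sorted(sessions_dir.iterdir()):
        if entry.is_dir() and entry.name.startswith("session_"):
            print(entry.name)
    return 0

root = Path(tempfile.mkdtemp())
(root / "demo_agent" / "sessions" / "session_001").mkdir(parents=True)
rc = cmd_sessions_list(argparse.Namespace(agents_root=root, agent="demo_agent"))
```

Routing through `SessionStore` instead of raw paths would be the cleaner long-term wiring; the point is only that the infrastructure the stubs wait for already exists.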
---
### 2.2 `hive pause` / `hive resume` (CLI)
```python
def cmd_pause(args: argparse.Namespace) -> int:
"""Pause a running session."""
print("⚠ Pause command not yet implemented")
print("This will be available once executor pause integration is complete.")
return 1
def cmd_resume(args: argparse.Namespace) -> int:
"""Resume a session from checkpoint."""
print("⚠ Resume command not yet implemented")
print("This will be available once checkpoint resume integration is complete.")
return 1
```
**Why this matters:** Resume functionality is actually implemented — `cmd_run` supports `--resume-session` and `--checkpoint` flags that call `_load_resume_state()` and restore from checkpoints. The pause mechanism (via `SIGTERM` handling in `AgentRuntime`) also exists. The `hive pause` / `hive resume` stubs therefore promise functionality that `hive run --resume-session` already delivers.
**Recommendation:** Remove the stub registrations and their parser definitions from `runner/cli.py`, or implement them using the existing `_load_resume_state()` infrastructure.
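If the stubs are kept, the cheapest real implementation is a thin forwarder onto the flags `cmd_run` already honors. A sketch — `cmd_run` is stubbed here, and the attribute names are assumptions:

```python
import argparse

def cmd_run(args: argparse.Namespace) -> int:
    # Stand-in for the real cmd_run, which accepts --resume-session
    # and --checkpoint and calls _load_resume_state().
    return 0 if getattr(args, "resume_session", None) else 1

def cmd_resume(args: argparse.Namespace) -> int:
    """Forward `hive resume <session>` to the existing resume path."""
    return cmd_run(argparse.Namespace(
        resume_session=args.session_id,
        checkpoint=getattr(args, "checkpoint", None),
    ))
```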
---
## Tier 3 — Structural Concerns
### 3.1 TUI Module Coupled to Active Server (`routes_sessions.py`)
**Context:** CLAUDE.md declares the TUI deprecated: *"TUI is deprecated. The terminal UI (`hive tui`) is no longer maintained. Use the browser-based interface (`hive open`) instead."*
**The concern:** Despite this, the active HTTP server imports directly from the TUI module:
```python
# core/framework/server/routes_sessions.py, line 734
async def handle_discover(request: web.Request) -> web.Response:
"""GET /api/discover — discover agents from filesystem."""
from framework.tui.screens.agent_picker import discover_agents
groups = discover_agents()
```
The `discover_agents()` function (from `tui/screens/agent_picker.py`) is a general-purpose filesystem scanner that returns categorized agent entries. It has no terminal UI logic in it — the name and module path are misleading. As a result, the "deprecated" TUI module is a runtime dependency of the active `hive open` server path. Removing the TUI would break the `/api/discover` endpoint that the React frontend uses to list available agents.
**Recommendation:** Extract `discover_agents()` into a location appropriate for its actual role — either `framework/runner/runner.py` or a new `framework/agents/discovery.py`. This decouples the server from the TUI and allows the TUI to be cleanly removed or maintained independently.
---
### 3.2 Legacy `Runtime` Class Still Embedded in Core Execution Path
**Context:** `framework/runtime/core.py` contains the original `Runtime` class, which writes decisions and run metadata to the old `FileStorage`-based format (`save_run`, `load_run`, `summaries/`). A newer `AgentRuntime` + `SessionStore` + `ExecutionStream` architecture has replaced this for the server-based flow.
**The concern:** Despite the newer architecture, the old `Runtime` is still instantiated and called in the primary execution path:
```python
# framework/graph/executor.py — called for every agent execution
_run_id = self.runtime.start_run(goal_id, ...) # line 692
self.runtime.end_run(success=True, ...) # lines 1089, 1163, 1515, 1632
```
This means every agent execution writes to two places: the new `SessionStore` (via `ExecutionStream`) and the old file format (via `Runtime.start_run`/`end_run`). The `BuilderQuery` class was designed to read from this old format, but since `BuilderQuery` has no callers (Tier 1, item 1.5), the old format data is written but never read by anything other than tests.
**Affected files:**
- `framework/runtime/core.py` — the `Runtime` class itself
- `framework/graph/executor.py` — calls `runtime.start_run`/`end_run`
- `framework/storage/backend.py` — `FileStorage.save_run`/`load_run`
- `framework/storage/concurrent.py` — `ConcurrentStorage.save_run`/`load_run`
- `framework/schemas/run.py` — `Run`, `RunSummary` schemas
- `framework/__init__.py` — exports `Run`, `RunSummary` (public API)
**Recommendation:** Before removing, confirm whether `Run`/`RunSummary`/`Runtime` are part of the external public API (i.e., exported agents import from `framework`). If not, create a migration plan to remove the dual-write path. The `BuilderQuery` removal (Tier 1, item 1.5) should happen first.
---
### 3.3 `hive dispatch` Command and `AgentOrchestrator` — Orphaned CLI Orchestration
**Context:** `hive dispatch` uses `AgentOrchestrator` (`framework/runner/orchestrator.py`) to route requests across multiple agents using an LLM router. This is a CLI-only multi-agent orchestration pattern that predates the server's Queen-based orchestration.
**The concern:** The server's primary multi-agent flow goes through `queen_orchestrator.py` → `SessionManager` → Queen agent. The `AgentOrchestrator` class provides different behavior (capability-based routing and negotiation via `CapabilityResponse`, plus a message relay protocol in `runner/protocol.py`). The two patterns are architecturally distinct and neither references the other.
`AgentOrchestrator` is also used in `hive shell --multi` (line 1674 in `cli.py`). This is functional but represents an older orchestration pattern now superseded by the server's approach.
**Files involved:**
- `framework/runner/orchestrator.py` — `AgentOrchestrator` class
- `framework/runner/protocol.py` — `AgentMessage`, `CapabilityLevel`, `CapabilityResponse`, `MessageType`, `OrchestratorResult`
- `framework/runner/cli.py` — `cmd_dispatch`, `hive shell --multi`
**Recommendation:** Clarify whether `hive dispatch` / `AgentOrchestrator` is intended to remain as a CLI alternative to the server's Queen orchestration, or if it should be removed in favor of directing users to `hive open`. If the latter, remove `cmd_dispatch`, `AgentOrchestrator`, and `runner/protocol.py`.
---
### 3.4 Codex Subscription Code Path — Active but Narrowly Used
**Context:** The quickstart wizard offers "OpenAI Codex Subscription" as option 3 of 4 LLM providers. When selected, it invokes `core/codex_oauth.py` for OAuth, sets `use_codex_subscription: true` in the agent config, and routes through `get_codex_token()` / `get_codex_account_id()` in `runner.py`.
**The concern:** This is a functional code path (not dead), but it is:
1. Tightly coupled to specific OpenAI Codex OAuth endpoints and client IDs hardcoded in `codex_oauth.py`
2. Tied to the Codex-specific `gpt-5.3-codex` model name hardcoded in `quickstart.sh`
3. Unused by exported agents — nothing in `exports/` sets `use_codex_subscription`, suggesting this is mostly a setup-time concern
This is not dead code — it is a supported configuration path — but it represents a narrow, externally-coupled feature that may require maintenance if OpenAI changes their OAuth endpoints.
**Files involved:**
- `core/codex_oauth.py` (standalone script)
- `core/framework/runner/runner.py` — `get_codex_token()`, `_is_codex_token_expired()`, `_refresh_codex_token()`, `get_codex_account_id()`
- `core/framework/config.py` — `use_codex_subscription` branch in `get_api_key()`, `get_api_base()`, `get_extra_headers()`
**Recommendation:** No immediate action, but document that this path requires maintenance if Codex OAuth endpoints change. Consider adding an integration test.
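Even without hitting the real OAuth endpoints, a test can at least pin down the expiry logic. A self-contained sketch — the actual `_is_codex_token_expired()` signature is not visible in this report, so the shape below is an assumption:

```python
# Hypothetical expiry check in the spirit of _is_codex_token_expired():
# a token is treated as expired slightly before its deadline to absorb
# clock skew between the client and the token issuer.
import time

def is_codex_token_expired(expires_at: float, skew_seconds: float = 60.0) -> bool:
    return time.time() >= (expires_at - skew_seconds)
```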
---
### 3.5 `framework/__init__.py` Public API Surface — Exports That Have No External Consumers
The framework's top-level `__init__.py` exports several items that are either dead (no external callers) or internal implementation details:
| Export | External Callers | Recommendation |
|--------|-----------------|----------------|
| `BuilderQuery` | None | Remove (see Tier 1, §1.5) |
| `Run`, `RunSummary` | `runtime/core.py` (internal) | Keep — part of legacy format transition |
| `Problem` | Tests only | Keep — schema type |
| `AgentOrchestrator` | CLI only | Reassess (see §3.3) |
| `ApprovalStatus`, `ErrorCategory`, `DebugTool`, `TestSuiteResult` | Tests only | Keep — testing framework |
| `Runtime` | executor, server (internal) | Keep — in active use |
**Recommendation:** Remove `BuilderQuery` from `__all__`. Review whether `AgentOrchestrator` should remain in the public API given its CLI-only usage.
---
## Summary Table
| Item | Location | Type | Lines | Action |
|------|----------|------|-------|--------|
| `hitl.py` | `framework/graph/hitl.py` | Dead code | 203 | Delete |
| `state_writer.py` | `framework/storage/state_writer.py` | Migration artifact | 179 | Delete |
| `agent_manager.py` | `framework/server/agent_manager.py` | Dead shim | 36 | Delete |
| `hive_coder/` directory | `framework/agents/hive_coder/` | Stale bytecode | — | Delete directory |
| `BuilderQuery` | `framework/builder/query.py` | Unused export | 501 | Delete or de-export |
| `framework/mcp/__init__.py` | `framework/mcp/` | Empty package | 4 | Delete directory |
| `credentials/vault/` | `framework/credentials/vault/` | Unwired feature | 394 | Delete or add tests |
| `cmd_sessions_*` stubs | `runner/cli.py` lines 1773–1799 | Placeholder | ~30 | Implement or remove |
| `cmd_pause` / `cmd_resume` stubs | `runner/cli.py` lines 1801–1821 | Placeholder | ~20 | Implement or remove |
| `discover_agents` coupling | `server/routes_sessions.py` line 734 | Bad dependency | — | Extract to neutral module |
| Legacy `Runtime` dual-write | `graph/executor.py`, `runtime/core.py` | Migration debt | — | Plan removal after BuilderQuery gone |
| `AgentOrchestrator` / `hive dispatch` | `runner/orchestrator.py`, `cli.py` | Architectural concern | — | Clarify intent |
---
## What Is Confirmed Active (Not Flagged)
For completeness, the following are verified active and should not be touched:
- **`hive open` / `hive serve`** — primary entrypoints, fully functional
- **`hive run`** — headless agent execution (without `--tui`)
- **`hive shell`** — interactive REPL (without `--multi`)
- **`hive info` / `hive validate` / `hive list`** — agent inspection
- **`hive setup-credentials`** — credential wizard
- **`hive tui`** and **`hive run --tui`** — functional despite deprecation label; contains the `discover_agents` function needed by server
- **`framework/graph/`** — all files except `hitl.py` are in active use
- **`framework/runtime/`** — all files except `state_writer.py` are in active use
- **`framework/server/`** — all files except `agent_manager.py` are in active use
- **`framework/credentials/`** — all subdirectories except `vault/` are in active use
- **`framework/agents/queen/`** — primary orchestrator, fully active
- **`framework/agents/credential_tester/`** — active (used by TUI account selection)
- **`framework/monitoring/`** — active (used by `session_manager.py` for judge evaluation)
- **`framework/testing/`** — active CLI testing framework
- **`framework/tools/`** — all four modules active (Queen lifecycle, memory, graph, worker monitoring)
- **`framework/storage/`** — all files except `state_writer.py` active
- **`framework/observability/`** — active logging infrastructure
- **`framework/utils/`** — `atomic_write` used by storage layer
- **`quickstart.sh` / `quickstart.ps1`** — active onboarding flow
- **`core/codex_oauth.py`** — active (used by quickstart option 3)
---
*Analysis performed by tracing import graphs from `hive open` → `cmd_serve()` → `create_app()` → six route modules → `SessionManager`/`QueenOrchestrator`, and from `quickstart.sh` → `hive run`/`hive shell`/`hive open`. All findings verified with `grep` — no guesses.*
-3
@@ -22,7 +22,6 @@ The framework includes a Goal-Based Testing system (Goal → Agent → Eval):
See `framework.testing` for details.
"""
from framework.builder.query import BuilderQuery
from framework.llm import AnthropicProvider, LLMProvider
from framework.runner import AgentOrchestrator, AgentRunner
from framework.runtime.core import Runtime
@@ -51,8 +50,6 @@ __all__ = [
"Problem", "Problem",
# Runtime # Runtime
"Runtime", "Runtime",
# Builder
"BuilderQuery",
# LLM
"LLMProvider",
"AnthropicProvider",
@@ -51,42 +51,6 @@ def cli():
pass
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
@click.option("--debug", is_flag=True)
def tui(verbose, debug):
"""Launch TUI to test a credential interactively."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo("TUI requires 'textual'. Install with: pip install textual")
sys.exit(1)
agent = CredentialTesterAgent()
account = pick_account(agent)
if account is None:
sys.exit(1)
agent.select_account(account)
provider = account.get("provider", "?")
alias = account.get("alias", "?")
click.echo(f"\nTesting {provider}/{alias}...\n")
async def run_tui():
agent._setup()
runtime = agent._agent_runtime
await runtime.start()
try:
app = AdenTUI(runtime)
await app.run_async()
finally:
await runtime.stop()
asyncio.run(run_tui())
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
@click.option("--debug", is_flag=True)
+151
@@ -0,0 +1,151 @@
"""Agent discovery — scan known directories and return categorised AgentEntry lists."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class AgentEntry:
"""Lightweight agent metadata for the picker / API discover endpoint."""
path: Path
name: str
description: str
category: str
session_count: int = 0
node_count: int = 0
tool_count: int = 0
tags: list[str] = field(default_factory=list)
last_active: str | None = None
def _get_last_active(agent_name: str) -> str | None:
"""Return the most recent updated_at timestamp across all sessions."""
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if not sessions_dir.exists():
return None
latest: str | None = None
for session_dir in sessions_dir.iterdir():
if not session_dir.is_dir() or not session_dir.name.startswith("session_"):
continue
state_file = session_dir / "state.json"
if not state_file.exists():
continue
try:
data = json.loads(state_file.read_text(encoding="utf-8"))
ts = data.get("timestamps", {}).get("updated_at")
if ts and (latest is None or ts > latest):
latest = ts
except Exception:
continue
return latest
def _count_sessions(agent_name: str) -> int:
"""Count session directories under ~/.hive/agents/{agent_name}/sessions/."""
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if not sessions_dir.exists():
return 0
return sum(1 for d in sessions_dir.iterdir() if d.is_dir() and d.name.startswith("session_"))
def _extract_agent_stats(agent_path: Path) -> tuple[int, int, list[str]]:
"""Extract node count, tool count, and tags from an agent directory.
Prefers agent.py (AST-parsed) over agent.json for node/tool counts
since agent.json may be stale. Tags are only available from agent.json.
"""
import ast
node_count, tool_count, tags = 0, 0, []
agent_py = agent_path / "agent.py"
if agent_py.exists():
try:
tree = ast.parse(agent_py.read_text(encoding="utf-8"))
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "nodes":
if isinstance(node.value, ast.List):
node_count = len(node.value.elts)
except Exception:
pass
agent_json = agent_path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text(encoding="utf-8"))
json_nodes = data.get("nodes", [])
if node_count == 0:
node_count = len(json_nodes)
tools: set[str] = set()
for n in json_nodes:
tools.update(n.get("tools", []))
tool_count = len(tools)
tags = data.get("agent", {}).get("tags", [])
except Exception:
pass
return node_count, tool_count, tags
def discover_agents() -> dict[str, list[AgentEntry]]:
"""Discover agents from all known sources grouped by category."""
from framework.runner.cli import (
_extract_python_agent_metadata,
_get_framework_agents_dir,
_is_valid_agent_dir,
)
groups: dict[str, list[AgentEntry]] = {}
sources = [
("Your Agents", Path("exports")),
("Framework", _get_framework_agents_dir()),
("Examples", Path("examples/templates")),
]
for category, base_dir in sources:
if not base_dir.exists():
continue
entries: list[AgentEntry] = []
for path in sorted(base_dir.iterdir(), key=lambda p: p.name):
if not _is_valid_agent_dir(path):
continue
name, desc = _extract_python_agent_metadata(path)
config_fallback_name = path.name.replace("_", " ").title()
used_config = name != config_fallback_name
node_count, tool_count, tags = _extract_agent_stats(path)
if not used_config:
agent_json = path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text(encoding="utf-8"))
meta = data.get("agent", {})
name = meta.get("name", name)
desc = meta.get("description", desc)
except Exception:
pass
entries.append(
AgentEntry(
path=path,
name=name,
description=desc,
category=category,
session_count=_count_sessions(path.name),
node_count=node_count,
tool_count=tool_count,
tags=tags,
last_active=_get_last_active(path.name),
)
)
if entries:
groups[category] = entries
return groups
-7
@@ -1,7 +0,0 @@
"""Builder interface for analyzing and building agents."""
from framework.builder.query import BuilderQuery
__all__ = [
"BuilderQuery",
]
-501
@@ -1,501 +0,0 @@
"""
Builder Query Interface - How I (Builder) analyze agent runs.
This is designed around the questions I need to answer:
1. What happened? (summaries, narratives)
2. Why did it fail? (failure analysis, decision traces)
3. What patterns emerge? (across runs, across nodes)
4. What should we change? (suggestions)
"""
from collections import defaultdict
from pathlib import Path
from typing import Any
from framework.schemas.decision import Decision
from framework.schemas.run import Run, RunStatus, RunSummary
from framework.storage.backend import FileStorage
class FailureAnalysis:
"""Structured analysis of why a run failed."""
def __init__(
self,
run_id: str,
failure_point: str,
root_cause: str,
decision_chain: list[str],
problems: list[str],
suggestions: list[str],
):
self.run_id = run_id
self.failure_point = failure_point
self.root_cause = root_cause
self.decision_chain = decision_chain
self.problems = problems
self.suggestions = suggestions
def to_dict(self) -> dict[str, Any]:
return {
"run_id": self.run_id,
"failure_point": self.failure_point,
"root_cause": self.root_cause,
"decision_chain": self.decision_chain,
"problems": self.problems,
"suggestions": self.suggestions,
}
def __str__(self) -> str:
lines = [
f"=== Failure Analysis for {self.run_id} ===",
"",
f"Failure Point: {self.failure_point}",
f"Root Cause: {self.root_cause}",
"",
"Decision Chain Leading to Failure:",
]
for i, dec in enumerate(self.decision_chain, 1):
lines.append(f" {i}. {dec}")
if self.problems:
lines.append("")
lines.append("Reported Problems:")
for prob in self.problems:
lines.append(f" - {prob}")
if self.suggestions:
lines.append("")
lines.append("Suggestions:")
for sug in self.suggestions:
lines.append(f"{sug}")
return "\n".join(lines)
class PatternAnalysis:
"""Patterns detected across multiple runs."""
def __init__(
self,
goal_id: str,
run_count: int,
success_rate: float,
common_failures: list[tuple[str, int]],
problematic_nodes: list[tuple[str, float]],
decision_patterns: dict[str, Any],
):
self.goal_id = goal_id
self.run_count = run_count
self.success_rate = success_rate
self.common_failures = common_failures
self.problematic_nodes = problematic_nodes
self.decision_patterns = decision_patterns
def to_dict(self) -> dict[str, Any]:
return {
"goal_id": self.goal_id,
"run_count": self.run_count,
"success_rate": self.success_rate,
"common_failures": self.common_failures,
"problematic_nodes": self.problematic_nodes,
"decision_patterns": self.decision_patterns,
}
def __str__(self) -> str:
lines = [
f"=== Pattern Analysis for Goal {self.goal_id} ===",
"",
f"Runs Analyzed: {self.run_count}",
f"Success Rate: {self.success_rate:.1%}",
]
if self.common_failures:
lines.append("")
lines.append("Common Failures:")
for failure, count in self.common_failures:
lines.append(f" - {failure} ({count} occurrences)")
if self.problematic_nodes:
lines.append("")
lines.append("Problematic Nodes (failure rate):")
for node, rate in self.problematic_nodes:
lines.append(f" - {node}: {rate:.1%} failure rate")
return "\n".join(lines)
class BuilderQuery:
"""
The interface I (Builder) use to understand what agents are doing.
This is optimized for the questions I need to answer when analyzing
agent behavior and deciding what to improve.
"""
def __init__(self, storage_path: str | Path):
self.storage = FileStorage(storage_path)
# === WHAT HAPPENED? ===
def get_run_summary(self, run_id: str) -> RunSummary | None:
"""Get a quick summary of a run."""
return self.storage.load_summary(run_id)
def get_full_run(self, run_id: str) -> Run | None:
"""Get the complete run with all decisions."""
return self.storage.load_run(run_id)
def list_runs_for_goal(self, goal_id: str) -> list[RunSummary]:
"""Get summaries of all runs for a goal."""
run_ids = self.storage.get_runs_by_goal(goal_id)
summaries = []
for run_id in run_ids:
summary = self.storage.load_summary(run_id)
if summary:
summaries.append(summary)
return summaries
def get_recent_failures(self, limit: int = 10) -> list[RunSummary]:
"""Get recent failed runs."""
run_ids = self.storage.get_runs_by_status(RunStatus.FAILED)
summaries = []
for run_id in run_ids[:limit]:
summary = self.storage.load_summary(run_id)
if summary:
summaries.append(summary)
return summaries
# === WHY DID IT FAIL? ===
def analyze_failure(self, run_id: str) -> FailureAnalysis | None:
"""
Deep analysis of why a run failed.
This is my primary tool for understanding what went wrong.
"""
run = self.storage.load_run(run_id)
if run is None or run.status != RunStatus.FAILED:
return None
# Find the first failed decision
failed_decisions = [d for d in run.decisions if not d.was_successful]
if not failed_decisions:
failure_point = "Unknown - no decision marked as failed"
root_cause = "Run failed but all decisions succeeded (external cause?)"
else:
first_failure = failed_decisions[0]
failure_point = first_failure.summary_for_builder()
root_cause = first_failure.outcome.error if first_failure.outcome else "Unknown"
# Build the decision chain leading to failure
decision_chain = []
for d in run.decisions:
decision_chain.append(d.summary_for_builder())
if not d.was_successful:
break
# Extract problems
problems = [f"[{p.severity}] {p.description}" for p in run.problems]
# Generate suggestions based on the failure
suggestions = self._generate_suggestions(run, failed_decisions)
return FailureAnalysis(
run_id=run_id,
failure_point=failure_point,
root_cause=root_cause,
decision_chain=decision_chain,
problems=problems,
suggestions=suggestions,
)
def get_decision_trace(self, run_id: str) -> list[str]:
"""Get a readable trace of all decisions in a run."""
run = self.storage.load_run(run_id)
if run is None:
return []
return [d.summary_for_builder() for d in run.decisions]
# === WHAT PATTERNS EMERGE? ===
def find_patterns(self, goal_id: str) -> PatternAnalysis | None:
"""
Find patterns across runs for a goal.
This helps me understand systemic issues vs one-off failures.
"""
run_ids = self.storage.get_runs_by_goal(goal_id)
if not run_ids:
return None
runs = []
for run_id in run_ids:
run = self.storage.load_run(run_id)
if run:
runs.append(run)
if not runs:
return None
# Calculate success rate
completed = [r for r in runs if r.status == RunStatus.COMPLETED]
success_rate = len(completed) / len(runs) if runs else 0.0
# Find common failures
failure_counts: dict[str, int] = defaultdict(int)
for run in runs:
for decision in run.decisions:
if not decision.was_successful and decision.outcome:
error = decision.outcome.error or "Unknown error"
failure_counts[error] += 1
common_failures = sorted(failure_counts.items(), key=lambda x: x[1], reverse=True)[:5]
# Find problematic nodes
node_stats: dict[str, dict[str, int]] = defaultdict(lambda: {"total": 0, "failed": 0})
for run in runs:
for decision in run.decisions:
node_stats[decision.node_id]["total"] += 1
if not decision.was_successful:
node_stats[decision.node_id]["failed"] += 1
problematic_nodes = []
for node_id, stats in node_stats.items():
if stats["total"] > 0:
failure_rate = stats["failed"] / stats["total"]
if failure_rate > 0.1: # More than 10% failure rate
problematic_nodes.append((node_id, failure_rate))
problematic_nodes.sort(key=lambda x: x[1], reverse=True)
# Decision patterns
decision_patterns = self._analyze_decision_patterns(runs)
return PatternAnalysis(
goal_id=goal_id,
run_count=len(runs),
success_rate=success_rate,
common_failures=common_failures,
problematic_nodes=problematic_nodes,
decision_patterns=decision_patterns,
)
def compare_runs(self, run_id_1: str, run_id_2: str) -> dict[str, Any]:
"""Compare two runs to understand what differed."""
run1 = self.storage.load_run(run_id_1)
run2 = self.storage.load_run(run_id_2)
if run1 is None or run2 is None:
return {"error": "One or both runs not found"}
return {
"run_1": {
"id": run1.id,
"status": run1.status.value,
"decisions": len(run1.decisions),
"success_rate": run1.metrics.success_rate,
},
"run_2": {
"id": run2.id,
"status": run2.status.value,
"decisions": len(run2.decisions),
"success_rate": run2.metrics.success_rate,
},
"differences": self._find_differences(run1, run2),
}
# === WHAT SHOULD WE CHANGE? ===
def suggest_improvements(self, goal_id: str) -> list[dict[str, Any]]:
"""
Generate improvement suggestions based on run analysis.
This is what I use to propose changes to the human engineer.
"""
patterns = self.find_patterns(goal_id)
if patterns is None:
return []
suggestions = []
# Suggestion: Fix problematic nodes
for node_id, failure_rate in patterns.problematic_nodes:
suggestions.append(
{
"type": "node_improvement",
"target": node_id,
"reason": f"Node has {failure_rate:.1%} failure rate",
"recommendation": (
f"Review and improve node '{node_id}' - "
"high failure rate suggests prompt or tool issues"
),
"priority": "high" if failure_rate > 0.3 else "medium",
}
)
# Suggestion: Address common failures
for failure, count in patterns.common_failures:
if count >= 2:
suggestions.append(
{
"type": "error_handling",
"target": failure,
"reason": f"Error occurred {count} times",
"recommendation": f"Add handling for: {failure}",
"priority": "high" if count >= 5 else "medium",
}
)
# Suggestion: Overall success rate
if patterns.success_rate < 0.8:
suggestions.append(
{
"type": "architecture",
"target": goal_id,
"reason": f"Goal success rate is only {patterns.success_rate:.1%}",
"recommendation": (
"Consider restructuring the agent graph or improving goal definition"
),
"priority": "high",
}
)
return suggestions
def get_node_performance(self, node_id: str) -> dict[str, Any]:
"""Get performance metrics for a specific node across all runs."""
run_ids = self.storage.get_runs_by_node(node_id)
total_decisions = 0
successful_decisions = 0
total_latency = 0
total_tokens = 0
decision_types: dict[str, int] = defaultdict(int)
for run_id in run_ids:
run = self.storage.load_run(run_id)
if run:
for decision in run.decisions:
if decision.node_id == node_id:
total_decisions += 1
if decision.was_successful:
successful_decisions += 1
if decision.outcome:
total_latency += decision.outcome.latency_ms
total_tokens += decision.outcome.tokens_used
decision_types[decision.decision_type.value] += 1
return {
"node_id": node_id,
"total_decisions": total_decisions,
"success_rate": successful_decisions / total_decisions if total_decisions > 0 else 0,
"avg_latency_ms": total_latency / total_decisions if total_decisions > 0 else 0,
"total_tokens": total_tokens,
"decision_type_distribution": dict(decision_types),
}
# === PRIVATE HELPERS ===
def _generate_suggestions(
self,
run: Run,
failed_decisions: list[Decision],
) -> list[str]:
"""Generate suggestions based on failure analysis."""
suggestions = []
for decision in failed_decisions:
# Check if there were alternatives
if len(decision.options) > 1:
chosen = decision.chosen_option
alternatives = [o for o in decision.options if o.id != decision.chosen_option_id]
if alternatives:
alt_desc = alternatives[0].description
chosen_desc = chosen.description if chosen else "unknown"
suggestions.append(
f"Consider alternative: '{alt_desc}' instead of '{chosen_desc}'"
)
# Check for missing context
if not decision.input_context:
suggestions.append(
f"Decision '{decision.intent}' had no input context - "
"ensure relevant data is passed"
)
# Check for constraint issues
if decision.active_constraints:
constraints = ", ".join(decision.active_constraints)
suggestions.append(f"Review constraints: {constraints} - may be too restrictive")
# Check for reported problems with suggestions
for problem in run.problems:
if problem.suggested_fix:
suggestions.append(problem.suggested_fix)
return suggestions
def _analyze_decision_patterns(self, runs: list[Run]) -> dict[str, Any]:
"""Analyze decision patterns across runs."""
type_counts: dict[str, int] = defaultdict(int)
option_counts: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
for run in runs:
for decision in run.decisions:
type_counts[decision.decision_type.value] += 1
# Track which options are chosen for similar intents
intent_key = decision.intent[:50] # Truncate for grouping
if decision.chosen_option:
option_counts[intent_key][decision.chosen_option.description] += 1
# Find most common choices per intent
common_choices = {}
for intent, choices in option_counts.items():
if choices:
most_common = max(choices.items(), key=lambda x: x[1])
common_choices[intent] = {
"choice": most_common[0],
"count": most_common[1],
"alternatives": len(choices) - 1,
}
return {
"decision_type_distribution": dict(type_counts),
"common_choices": common_choices,
}
def _find_differences(self, run1: Run, run2: Run) -> list[str]:
"""Find key differences between two runs."""
differences = []
# Status difference
if run1.status != run2.status:
differences.append(f"Status: {run1.status.value} vs {run2.status.value}")
# Decision count difference
if len(run1.decisions) != len(run2.decisions):
differences.append(f"Decision count: {len(run1.decisions)} vs {len(run2.decisions)}")
# Find first divergence point
for i, (d1, d2) in enumerate(zip(run1.decisions, run2.decisions, strict=False)):
if d1.chosen_option_id != d2.chosen_option_id:
differences.append(
f"Diverged at decision {i}: "
f"chose '{d1.chosen_option_id}' vs '{d2.chosen_option_id}'"
)
break
# Node differences
nodes1 = set(run1.metrics.nodes_executed)
nodes2 = set(run2.metrics.nodes_executed)
if nodes1 != nodes2:
only_1 = nodes1 - nodes2
only_2 = nodes2 - nodes1
if only_1:
differences.append(f"Nodes only in run 1: {only_1}")
if only_2:
differences.append(f"Nodes only in run 2: {only_2}")
return differences
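The node-level aggregation inside `find_patterns` (tally per-node outcomes, flag anything above a 10% failure rate, sort worst first) can be sketched standalone. The tuples below are hypothetical stand-in records, not the framework's real `Run`/`Decision` types:

```python
from collections import defaultdict

# Minimal stand-in records: each run is a list of (node_id, was_successful).
runs = [
    [("plan", True), ("fetch", False)],
    [("plan", True), ("fetch", True)],
    [("plan", True), ("fetch", False)],
]

node_stats = defaultdict(lambda: {"total": 0, "failed": 0})
for run in runs:
    for node_id, ok in run:
        node_stats[node_id]["total"] += 1
        if not ok:
            node_stats[node_id]["failed"] += 1

# Keep nodes above the 10% failure threshold, worst first.
problematic = sorted(
    (
        (node_id, s["failed"] / s["total"])
        for node_id, s in node_stats.items()
        if s["failed"] / s["total"] > 0.1
    ),
    key=lambda x: x[1],
    reverse=True,
)
print(problematic)  # "fetch" fails in 2 of 3 runs; "plan" never fails
```

The same shape drives `suggest_improvements`, which maps each flagged node to a "node_improvement" suggestion with priority keyed to the failure rate.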
+1 -3
@@ -6,7 +6,7 @@ This module provides secure credential storage with:
 - Template-based usage: {{cred.key}} patterns for injection
 - Bipartisan model: Store stores values, tools define usage
 - Provider system: Extensible lifecycle management (refresh, validate)
-- Multiple backends: Encrypted files, env vars, HashiCorp Vault
+- Multiple backends: Encrypted files, env vars

 Quick Start:
     from core.framework.credentials import CredentialStore, CredentialObject
@@ -38,8 +38,6 @@ For Aden server sync:
         AdenSyncProvider,
     )

-For Vault integration:
-    from core.framework.credentials.vault import HashiCorpVaultStorage
-
 """

 from .key_storage import (
@@ -1,55 +0,0 @@
"""
HashiCorp Vault integration for the credential store.
This module provides enterprise-grade secret management through
HashiCorp Vault integration.
Quick Start:
from core.framework.credentials import CredentialStore
from core.framework.credentials.vault import HashiCorpVaultStorage
# Configure Vault storage
storage = HashiCorpVaultStorage(
url="https://vault.example.com:8200",
# token read from VAULT_TOKEN env var
mount_point="secret",
path_prefix="hive/agents/prod"
)
# Create credential store with Vault backend
store = CredentialStore(storage=storage)
# Use normally - credentials are stored in Vault
credential = store.get_credential("my_api")
Requirements:
pip install hvac
Authentication:
Set the VAULT_TOKEN environment variable or pass the token directly:
export VAULT_TOKEN="hvs.xxxxxxxxxxxxx"
For production, consider using Vault auth methods:
- Kubernetes auth
- AppRole auth
- AWS IAM auth
Vault Configuration:
Ensure KV v2 secrets engine is enabled:
vault secrets enable -path=secret kv-v2
Grant appropriate policies:
path "secret/data/hive/credentials/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "secret/metadata/hive/credentials/*" {
capabilities = ["list", "delete"]
}
"""
from .hashicorp import HashiCorpVaultStorage
__all__ = ["HashiCorpVaultStorage"]
@@ -1,394 +0,0 @@
"""
HashiCorp Vault storage adapter.
Provides integration with HashiCorp Vault for enterprise secret management.
Requires the 'hvac' package: uv pip install hvac
"""
from __future__ import annotations
import logging
import os
from datetime import datetime
from typing import Any
from pydantic import SecretStr
from ..models import CredentialKey, CredentialObject, CredentialType
from ..storage import CredentialStorage
logger = logging.getLogger(__name__)
class HashiCorpVaultStorage(CredentialStorage):
"""
HashiCorp Vault storage adapter.
Features:
- KV v2 secrets engine support
- Namespace support (Enterprise)
- Automatic secret versioning
- Audit logging via Vault
The adapter stores credentials in Vault's KV v2 secrets engine with
the following structure:
{mount_point}/data/{path_prefix}/{credential_id}
data:
_type: "oauth2"
access_token: "xxx"
refresh_token: "yyy"
_expires_access_token: "2024-01-26T12:00:00"
_provider_id: "oauth2"
Example:
storage = HashiCorpVaultStorage(
url="https://vault.example.com:8200",
token="hvs.xxx", # Or use VAULT_TOKEN env var
mount_point="secret",
path_prefix="hive/credentials"
)
store = CredentialStore(storage=storage)
# Credentials are now stored in Vault
store.save_credential(credential)
credential = store.get_credential("my_api")
Authentication:
The adapter uses token-based authentication. The token can be provided:
1. Directly via the 'token' parameter
2. Via the VAULT_TOKEN environment variable
For production, consider using:
- Kubernetes auth method
- AppRole auth method
- AWS IAM auth method
Requirements:
uv pip install hvac
"""
def __init__(
self,
url: str,
token: str | None = None,
mount_point: str = "secret",
path_prefix: str = "hive/credentials",
namespace: str | None = None,
verify_ssl: bool = True,
):
"""
Initialize Vault storage.
Args:
url: Vault server URL (e.g., https://vault.example.com:8200)
token: Vault token. If None, reads from VAULT_TOKEN env var
mount_point: KV secrets engine mount point (default: "secret")
path_prefix: Path prefix for all credentials
namespace: Vault namespace (Enterprise feature)
verify_ssl: Whether to verify SSL certificates
Raises:
ImportError: If hvac is not installed
ValueError: If authentication fails
"""
try:
import hvac
except ImportError as e:
raise ImportError(
"HashiCorp Vault support requires 'hvac'. Install with: uv pip install hvac"
) from e
self._url = url
self._token = token or os.environ.get("VAULT_TOKEN")
self._mount = mount_point
self._prefix = path_prefix
self._namespace = namespace
if not self._token:
raise ValueError(
"Vault token required. Set VAULT_TOKEN env var or pass token parameter."
)
self._client = hvac.Client(
url=url,
token=self._token,
namespace=namespace,
verify=verify_ssl,
)
if not self._client.is_authenticated():
raise ValueError("Vault authentication failed. Check token and server URL.")
logger.info(f"Connected to HashiCorp Vault at {url}")
def _path(self, credential_id: str) -> str:
"""Build Vault path for credential."""
# Sanitize credential_id
safe_id = credential_id.replace("/", "_").replace("\\", "_")
return f"{self._prefix}/{safe_id}"
def save(self, credential: CredentialObject) -> None:
"""Save credential to Vault KV v2."""
path = self._path(credential.id)
data = self._serialize_for_vault(credential)
try:
self._client.secrets.kv.v2.create_or_update_secret(
path=path,
secret=data,
mount_point=self._mount,
)
logger.debug(f"Saved credential '{credential.id}' to Vault at {path}")
except Exception as e:
logger.error(f"Failed to save credential '{credential.id}' to Vault: {e}")
raise
def load(self, credential_id: str) -> CredentialObject | None:
"""Load credential from Vault."""
path = self._path(credential_id)
try:
response = self._client.secrets.kv.v2.read_secret_version(
path=path,
mount_point=self._mount,
)
data = response["data"]["data"]
return self._deserialize_from_vault(credential_id, data)
except Exception as e:
# Check if it's a "not found" error
error_str = str(e).lower()
if "not found" in error_str or "404" in error_str:
logger.debug(f"Credential '{credential_id}' not found in Vault")
return None
logger.error(f"Failed to load credential '{credential_id}' from Vault: {e}")
raise
def delete(self, credential_id: str) -> bool:
"""Delete credential from Vault (all versions)."""
path = self._path(credential_id)
try:
self._client.secrets.kv.v2.delete_metadata_and_all_versions(
path=path,
mount_point=self._mount,
)
logger.debug(f"Deleted credential '{credential_id}' from Vault")
return True
except Exception as e:
error_str = str(e).lower()
if "not found" in error_str or "404" in error_str:
return False
logger.error(f"Failed to delete credential '{credential_id}' from Vault: {e}")
raise
def list_all(self) -> list[str]:
"""List all credentials under the prefix."""
try:
response = self._client.secrets.kv.v2.list_secrets(
path=self._prefix,
mount_point=self._mount,
)
keys = response.get("data", {}).get("keys", [])
# Remove trailing slashes from folder names
return [k.rstrip("/") for k in keys]
except Exception as e:
error_str = str(e).lower()
if "not found" in error_str or "404" in error_str:
return []
logger.error(f"Failed to list credentials from Vault: {e}")
raise
def exists(self, credential_id: str) -> bool:
"""Check if credential exists in Vault."""
try:
path = self._path(credential_id)
self._client.secrets.kv.v2.read_secret_version(
path=path,
mount_point=self._mount,
)
return True
except Exception:
return False
def _serialize_for_vault(self, credential: CredentialObject) -> dict[str, Any]:
"""Convert credential to Vault secret format."""
data: dict[str, Any] = {
"_type": credential.credential_type.value,
}
if credential.provider_id:
data["_provider_id"] = credential.provider_id
if credential.description:
data["_description"] = credential.description
if credential.auto_refresh:
data["_auto_refresh"] = "true"
# Store each key
for key_name, key in credential.keys.items():
data[key_name] = key.get_secret_value()
if key.expires_at:
data[f"_expires_{key_name}"] = key.expires_at.isoformat()
if key.metadata:
data[f"_metadata_{key_name}"] = str(key.metadata)
return data
def _deserialize_from_vault(self, credential_id: str, data: dict[str, Any]) -> CredentialObject:
"""Reconstruct credential from Vault secret."""
# Extract metadata fields
cred_type = CredentialType(data.pop("_type", "api_key"))
provider_id = data.pop("_provider_id", None)
description = data.pop("_description", "")
auto_refresh = data.pop("_auto_refresh", "") == "true"
# Build keys dict
keys: dict[str, CredentialKey] = {}
# Find all non-metadata keys
key_names = [k for k in data.keys() if not k.startswith("_")]
for key_name in key_names:
value = data[key_name]
# Check for expiration
expires_at = None
expires_key = f"_expires_{key_name}"
if expires_key in data:
try:
expires_at = datetime.fromisoformat(data[expires_key])
except (ValueError, TypeError):
pass
# Check for metadata
metadata: dict[str, Any] = {}
metadata_key = f"_metadata_{key_name}"
if metadata_key in data:
try:
import ast
metadata = ast.literal_eval(data[metadata_key])
except (ValueError, SyntaxError):
pass
keys[key_name] = CredentialKey(
name=key_name,
value=SecretStr(value),
expires_at=expires_at,
metadata=metadata,
)
return CredentialObject(
id=credential_id,
credential_type=cred_type,
keys=keys,
provider_id=provider_id,
description=description,
auto_refresh=auto_refresh,
)
# --- Vault-Specific Operations ---
def get_secret_metadata(self, credential_id: str) -> dict[str, Any] | None:
"""
Get Vault metadata for a secret (version info, timestamps, etc.).
Args:
credential_id: The credential identifier
Returns:
Metadata dict or None if not found
"""
path = self._path(credential_id)
try:
response = self._client.secrets.kv.v2.read_secret_metadata(
path=path,
mount_point=self._mount,
)
return response.get("data", {})
except Exception:
return None
def soft_delete(self, credential_id: str, versions: list[int] | None = None) -> bool:
"""
Soft delete specific versions (can be recovered).
Args:
credential_id: The credential identifier
versions: Version numbers to delete. If None, deletes latest.
Returns:
True if successful
"""
path = self._path(credential_id)
try:
if versions:
self._client.secrets.kv.v2.delete_secret_versions(
path=path,
versions=versions,
mount_point=self._mount,
)
else:
self._client.secrets.kv.v2.delete_latest_version_of_secret(
path=path,
mount_point=self._mount,
)
return True
except Exception as e:
logger.error(f"Soft delete failed for '{credential_id}': {e}")
return False
def undelete(self, credential_id: str, versions: list[int]) -> bool:
"""
Recover soft-deleted versions.
Args:
credential_id: The credential identifier
versions: Version numbers to recover
Returns:
True if successful
"""
path = self._path(credential_id)
try:
self._client.secrets.kv.v2.undelete_secret_versions(
path=path,
versions=versions,
mount_point=self._mount,
)
return True
except Exception as e:
logger.error(f"Undelete failed for '{credential_id}': {e}")
return False
def load_version(self, credential_id: str, version: int) -> CredentialObject | None:
"""
Load a specific version of a credential.
Args:
credential_id: The credential identifier
version: Version number to load
Returns:
CredentialObject or None
"""
path = self._path(credential_id)
try:
response = self._client.secrets.kv.v2.read_secret_version(
path=path,
version=version,
mount_point=self._mount,
)
data = response["data"]["data"]
return self._deserialize_from_vault(credential_id, data)
except Exception:
return None
-203
@@ -1,203 +0,0 @@
"""
Standardized HITL (Human-In-The-Loop) Protocol
This module defines the formal structure for pause/resume interactions
where agents need to gather input from humans.
"""
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
class HITLInputType(StrEnum):
"""Type of input expected from human."""
FREE_TEXT = "free_text" # Open-ended text response
STRUCTURED = "structured" # Specific fields to fill
SELECTION = "selection" # Choose from options
APPROVAL = "approval" # Yes/no/modify decision
MULTI_FIELD = "multi_field" # Multiple related inputs
@dataclass
class HITLQuestion:
"""A single question to ask the human."""
id: str
question: str
input_type: HITLInputType = HITLInputType.FREE_TEXT
# For SELECTION type
options: list[str] = field(default_factory=list)
# For STRUCTURED type
fields: dict[str, str] = field(default_factory=dict) # {field_name: description}
# Metadata
required: bool = True
help_text: str = ""
@dataclass
class HITLRequest:
"""
Formal request for human input at a pause node.
This is what the agent produces when it needs human input.
"""
# Context
objective: str # What we're trying to accomplish
current_state: str # Where we are in the process
# What we need
questions: list[HITLQuestion] = field(default_factory=list)
missing_info: list[str] = field(default_factory=list)
# Guidance
instructions: str = ""
examples: list[str] = field(default_factory=list)
# Metadata
request_id: str = ""
node_id: str = ""
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"objective": self.objective,
"current_state": self.current_state,
"questions": [
{
"id": q.id,
"question": q.question,
"input_type": q.input_type.value,
"options": q.options,
"fields": q.fields,
"required": q.required,
"help_text": q.help_text,
}
for q in self.questions
],
"missing_info": self.missing_info,
"instructions": self.instructions,
"examples": self.examples,
"request_id": self.request_id,
"node_id": self.node_id,
}
@dataclass
class HITLResponse:
"""
Human's response to a HITL request.
This is what gets passed back when resuming from a pause.
"""
# Original request reference
request_id: str
# Human's answers
answers: dict[str, Any] = field(default_factory=dict) # {question_id: answer}
raw_input: str = "" # Raw text if provided
# Metadata
response_time_ms: int = 0
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"request_id": self.request_id,
"answers": self.answers,
"raw_input": self.raw_input,
"response_time_ms": self.response_time_ms,
}
class HITLProtocol:
"""
Standardized protocol for HITL interactions.
Usage in pause nodes:
1. Pause Node: Generates HITLRequest with questions
2. Executor: Saves state and returns request to user
3. User: Provides HITLResponse with answers
4. Resume Node: Processes response and merges into context
"""
@staticmethod
def create_request(
objective: str,
questions: list[HITLQuestion],
missing_info: list[str] | None = None,
node_id: str = "",
) -> HITLRequest:
"""Create a standardized HITL request."""
return HITLRequest(
objective=objective,
current_state="Awaiting clarification",
questions=questions,
missing_info=missing_info or [],
request_id=f"{node_id}_{hash(objective) % 10000}",
node_id=node_id,
)
@staticmethod
def parse_response(
raw_input: str,
request: HITLRequest,
use_haiku: bool = True,
) -> HITLResponse:
"""
Parse human's raw input into structured response.
Maps the raw input to the first question. For multi-question HITL,
the caller should present one question at a time.
"""
response = HITLResponse(request_id=request.request_id, raw_input=raw_input)
# If no questions, just return raw input
if not request.questions:
return response
# Map raw input to first question
response.answers[request.questions[0].id] = raw_input
return response
@staticmethod
def format_for_display(request: HITLRequest) -> str:
"""Format HITL request for user-friendly display."""
parts = []
if request.objective:
parts.append(f"📋 Objective: {request.objective}")
if request.current_state:
parts.append(f"📍 Current State: {request.current_state}")
if request.instructions:
parts.append(f"\n{request.instructions}")
if request.questions:
parts.append(f"\n❓ Questions ({len(request.questions)}):")
for i, q in enumerate(request.questions, 1):
parts.append(f"{i}. {q.question}")
if q.help_text:
parts.append(f" 💡 {q.help_text}")
if q.options:
parts.append(f" Options: {', '.join(q.options)}")
if request.missing_info:
parts.append("\n📝 Missing Information:")
for info in request.missing_info:
parts.append(f"  - {info}")
if request.examples:
parts.append("\n📚 Examples:")
for example in request.examples:
parts.append(f"  - {example}")
return "\n".join(parts)
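The protocol's core move, mapping a human's raw text onto the first open question when resuming, is small enough to sketch with cut-down stand-ins for the removed `HITLQuestion`/`HITLRequest`/`HITLResponse` dataclasses (names and fields below are trimmed for illustration):

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class Question:
    id: str
    question: str

@dataclass
class Request:
    request_id: str
    questions: list[Question] = field(default_factory=list)

@dataclass
class Response:
    request_id: str
    raw_input: str = ""
    answers: dict[str, Any] = field(default_factory=dict)

def parse_response(raw_input: str, request: Request) -> Response:
    # Mirrors the removed HITLProtocol.parse_response: the raw text is
    # assigned to the first question; multi-question flows are expected to
    # present one question at a time.
    response = Response(request_id=request.request_id, raw_input=raw_input)
    if request.questions:
        response.answers[request.questions[0].id] = raw_input
    return response

req = Request("node1_42", [Question("q1", "Which region?")])
resp = parse_response("us-east-1", req)
print(resp.answers)
```

As the Tier 1 finding notes, the live executor never adopted these types; the actual pause/resume flow passes ad-hoc dicts instead.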
-4
@@ -1,4 +0,0 @@
"""MCP servers for worker-bee."""
# Don't auto-import servers to avoid double-import issues when running with -m
__all__ = []
+55 -375
@@ -51,11 +51,7 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
action="store_true",
help="Show detailed execution logs (steps, LLM calls, etc.)",
)
run_parser.add_argument(
"--tui",
action="store_true",
help="Launch interactive terminal dashboard",
)
run_parser.add_argument(
"--model",
"-m",
@@ -194,143 +190,6 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
shell_parser.set_defaults(func=cmd_shell)
# tui command (interactive agent dashboard)
tui_parser = subparsers.add_parser(
"tui",
help="Launch interactive TUI dashboard",
description="Browse available agents and launch the terminal dashboard.",
)
tui_parser.add_argument(
"--model",
"-m",
type=str,
default=None,
help="LLM model to use (any LiteLLM-compatible name)",
)
tui_parser.set_defaults(func=cmd_tui)
# sessions command group (checkpoint/resume management)
sessions_parser = subparsers.add_parser(
"sessions",
help="Manage agent sessions",
description="List, inspect, and manage agent execution sessions.",
)
sessions_subparsers = sessions_parser.add_subparsers(
dest="sessions_cmd",
help="Session management commands",
)
# sessions list
sessions_list_parser = sessions_subparsers.add_parser(
"list",
help="List agent sessions",
description="List all sessions for an agent.",
)
sessions_list_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder",
)
sessions_list_parser.add_argument(
"--status",
choices=["all", "active", "failed", "completed", "paused"],
default="all",
help="Filter by session status (default: all)",
)
sessions_list_parser.add_argument(
"--has-checkpoints",
action="store_true",
help="Show only sessions with checkpoints",
)
sessions_list_parser.set_defaults(func=cmd_sessions_list)
# sessions show
sessions_show_parser = sessions_subparsers.add_parser(
"show",
help="Show session details",
description="Display detailed information about a specific session.",
)
sessions_show_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder",
)
sessions_show_parser.add_argument(
"session_id",
type=str,
help="Session ID to inspect",
)
sessions_show_parser.add_argument(
"--json",
action="store_true",
help="Output as JSON",
)
sessions_show_parser.set_defaults(func=cmd_sessions_show)
# sessions checkpoints
sessions_checkpoints_parser = sessions_subparsers.add_parser(
"checkpoints",
help="List session checkpoints",
description="List all checkpoints for a session.",
)
sessions_checkpoints_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder",
)
sessions_checkpoints_parser.add_argument(
"session_id",
type=str,
help="Session ID",
)
sessions_checkpoints_parser.set_defaults(func=cmd_sessions_checkpoints)
# pause command
pause_parser = subparsers.add_parser(
"pause",
help="Pause running session",
description="Request graceful pause of a running agent session.",
)
pause_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder",
)
pause_parser.add_argument(
"session_id",
type=str,
help="Session ID to pause",
)
pause_parser.set_defaults(func=cmd_pause)
# resume command
resume_parser = subparsers.add_parser(
"resume",
help="Resume session from checkpoint",
description="Resume a paused or failed session from a checkpoint.",
)
resume_parser.add_argument(
"agent_path",
type=str,
help="Path to agent folder",
)
resume_parser.add_argument(
"session_id",
type=str,
help="Session ID to resume",
)
resume_parser.add_argument(
"--checkpoint",
"-c",
type=str,
help="Specific checkpoint ID to resume from (default: latest)",
)
resume_parser.add_argument(
"--tui",
action="store_true",
help="Resume in TUI dashboard mode",
)
resume_parser.set_defaults(func=cmd_resume)
# setup-credentials command
setup_creds_parser = subparsers.add_parser(
"setup-credentials",
@@ -562,128 +421,67 @@ def cmd_run(args: argparse.Namespace) -> int:
) )
return 1 return 1
-    # Run the agent (with TUI or standard)
-    if getattr(args, "tui", False):
-        from framework.tui.app import AdenTUI
-
-        async def run_with_tui():
-            try:
-                # Load runner inside the async loop to ensure strict loop affinity
-                # (only one load — avoids spawning duplicate MCP subprocesses)
-                # AgentRunner handles credential setup interactively when stdin is a TTY.
-                try:
-                    runner = AgentRunner.load(
-                        args.agent_path,
-                        model=args.model,
-                    )
-                except CredentialError as e:
-                    print(f"\n{e}", file=sys.stderr)
-                    return
-                except Exception as e:
-                    print(f"Error loading agent: {e}")
-                    return
-                # Prompt before starting (allows credential updates)
-                if sys.stdin.isatty():
-                    runner = _prompt_before_start(args.agent_path, runner, args.model)
-                    if runner is None:
-                        return
-                # Force setup inside the loop
-                if runner._agent_runtime is None:
-                    try:
-                        runner._setup()
-                    except CredentialError as e:
-                        print(f"\n{e}", file=sys.stderr)
-                        return
-                # Start runtime before TUI so it's ready for user input
-                if runner._agent_runtime and not runner._agent_runtime.is_running:
-                    await runner._agent_runtime.start()
-                app = AdenTUI(
-                    runner._agent_runtime,
-                    resume_session=getattr(args, "resume_session", None),
-                    resume_checkpoint=getattr(args, "checkpoint", None),
-                )
-                # TUI handles execution via ChatRepl — user submits input,
-                # ChatRepl calls runtime.trigger_and_wait(). No auto-launch.
-                await app.run_async()
-            except Exception as e:
-                import traceback
-                traceback.print_exc()
-                print(f"TUI error: {e}")
-            await runner.cleanup_async()
-            return None
-
-        asyncio.run(run_with_tui())
-        print("TUI session ended.")
-        return 0
-    else:
-        # Standard execution — load runner here (not shared with TUI path)
-        # AgentRunner handles credential setup interactively when stdin is a TTY.
-        try:
-            runner = AgentRunner.load(
-                args.agent_path,
-                model=args.model,
-            )
-        except CredentialError as e:
-            print(f"\n{e}", file=sys.stderr)
-            return 1
-        except FileNotFoundError as e:
-            print(f"Error: {e}", file=sys.stderr)
-            return 1
-        # Prompt before starting (allows credential updates)
-        if sys.stdin.isatty() and not args.quiet:
-            runner = _prompt_before_start(args.agent_path, runner, args.model)
-            if runner is None:
-                return 1
-        # Load session/checkpoint state for resume (headless mode)
-        session_state = None
-        resume_session = getattr(args, "resume_session", None)
-        checkpoint = getattr(args, "checkpoint", None)
-        if resume_session:
-            session_state = _load_resume_state(args.agent_path, resume_session, checkpoint)
-            if session_state is None:
-                print(
-                    f"Error: Could not load session state for {resume_session}",
-                    file=sys.stderr,
-                )
-                return 1
-            if not args.quiet:
-                resume_node = session_state.get("paused_at", "unknown")
-                if checkpoint:
-                    print(f"Resuming from checkpoint: {checkpoint}")
-                else:
-                    print(f"Resuming session: {resume_session}")
-                print(f"Resume point: {resume_node}")
-                print()
-        # Auto-inject user_id if the agent expects it but it's not provided
-        entry_input_keys = runner.graph.nodes[0].input_keys if runner.graph.nodes else []
-        if "user_id" in entry_input_keys and context.get("user_id") is None:
-            import os
-            context["user_id"] = os.environ.get("USER", "default_user")
-        if not args.quiet:
-            info = runner.info()
-            print(f"Agent: {info.name}")
-            print(f"Goal: {info.goal_name}")
-            print(f"Steps: {info.node_count}")
-            print(f"Input: {json.dumps(context)}")
-            print()
-            print("=" * 60)
-            print("Executing agent...")
-            print("=" * 60)
-            print()
-        result = asyncio.run(runner.run(context, session_state=session_state))
+    # Standard execution
+    # AgentRunner handles credential setup interactively when stdin is a TTY.
+    try:
+        runner = AgentRunner.load(
+            args.agent_path,
+            model=args.model,
+        )
+    except CredentialError as e:
+        print(f"\n{e}", file=sys.stderr)
+        return 1
+    except FileNotFoundError as e:
+        print(f"Error: {e}", file=sys.stderr)
+        return 1
+    # Prompt before starting (allows credential updates)
+    if sys.stdin.isatty() and not args.quiet:
+        runner = _prompt_before_start(args.agent_path, runner, args.model)
+        if runner is None:
+            return 1
+    # Load session/checkpoint state for resume (headless mode)
+    session_state = None
+    resume_session = getattr(args, "resume_session", None)
+    checkpoint = getattr(args, "checkpoint", None)
+    if resume_session:
+        session_state = _load_resume_state(args.agent_path, resume_session, checkpoint)
+        if session_state is None:
+            print(
+                f"Error: Could not load session state for {resume_session}",
+                file=sys.stderr,
+            )
+            return 1
+        if not args.quiet:
+            resume_node = session_state.get("paused_at", "unknown")
+            if checkpoint:
+                print(f"Resuming from checkpoint: {checkpoint}")
+            else:
+                print(f"Resuming session: {resume_session}")
+            print(f"Resume point: {resume_node}")
+            print()
+    # Auto-inject user_id if the agent expects it but it's not provided
+    entry_input_keys = runner.graph.nodes[0].input_keys if runner.graph.nodes else []
+    if "user_id" in entry_input_keys and context.get("user_id") is None:
+        import os
+        context["user_id"] = os.environ.get("USER", "default_user")
+    if not args.quiet:
+        info = runner.info()
+        print(f"Agent: {info.name}")
+        print(f"Goal: {info.goal_name}")
+        print(f"Steps: {info.node_count}")
+        print(f"Input: {json.dumps(context)}")
+        print()
+        print("=" * 60)
+        print("Executing agent...")
+        print("=" * 60)
+        print()
+    result = asyncio.run(runner.run(context, session_state=session_state))
# Format output
output = {
@@ -1349,74 +1147,6 @@ def _get_framework_agents_dir() -> Path:
return Path(__file__).resolve().parent.parent / "agents"
def _launch_agent_tui(
agent_path: str | Path,
model: str | None = None,
) -> int:
"""Load an agent and launch the TUI. Shared by cmd_tui and cmd_code."""
from framework.credentials.models import CredentialError
from framework.runner import AgentRunner
from framework.tui.app import AdenTUI
async def run_with_tui():
# AgentRunner handles credential setup interactively when stdin is a TTY.
try:
runner = AgentRunner.load(
agent_path,
model=model,
)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
except Exception as e:
print(f"Error loading agent: {e}")
return
if runner._agent_runtime is None:
try:
runner._setup()
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
if runner._agent_runtime and not runner._agent_runtime.is_running:
await runner._agent_runtime.start()
app = AdenTUI(runner._agent_runtime)
try:
await app.run_async()
except Exception as e:
import traceback
traceback.print_exc()
print(f"TUI error: {e}")
await runner.cleanup_async()
asyncio.run(run_with_tui())
print("TUI session ended.")
return 0
def cmd_tui(args: argparse.Namespace) -> int:
"""Launch the interactive TUI dashboard with in-app agent picker."""
import logging
logging.basicConfig(level=logging.WARNING, format="%(message)s")
from framework.tui.app import AdenTUI
async def run_tui():
app = AdenTUI(
model=args.model,
)
await app.run_async()
asyncio.run(run_tui())
print("TUI session ended.")
return 0
def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]:
    """Extract name and description from a Python-based agent's config.py.
@@ -1769,56 +1499,6 @@ def _interactive_multi(agents_dir: Path) -> int:
return 0
def cmd_sessions_list(args: argparse.Namespace) -> int:
"""List agent sessions."""
print("⚠ Sessions list command not yet implemented")
print("This will be available once checkpoint infrastructure is complete.")
print(f"\nAgent: {args.agent_path}")
print(f"Status filter: {args.status}")
print(f"Has checkpoints: {args.has_checkpoints}")
return 1
def cmd_sessions_show(args: argparse.Namespace) -> int:
"""Show detailed session information."""
print("⚠ Session show command not yet implemented")
print("This will be available once checkpoint infrastructure is complete.")
print(f"\nAgent: {args.agent_path}")
print(f"Session: {args.session_id}")
return 1
def cmd_sessions_checkpoints(args: argparse.Namespace) -> int:
"""List checkpoints for a session."""
print("⚠ Session checkpoints command not yet implemented")
print("This will be available once checkpoint infrastructure is complete.")
print(f"\nAgent: {args.agent_path}")
print(f"Session: {args.session_id}")
return 1
def cmd_pause(args: argparse.Namespace) -> int:
"""Pause a running session."""
print("⚠ Pause command not yet implemented")
print("This will be available once executor pause integration is complete.")
print(f"\nAgent: {args.agent_path}")
print(f"Session: {args.session_id}")
return 1
def cmd_resume(args: argparse.Namespace) -> int:
"""Resume a session from checkpoint."""
print("⚠ Resume command not yet implemented")
print("This will be available once checkpoint resume integration is complete.")
print(f"\nAgent: {args.agent_path}")
print(f"Session: {args.session_id}")
if args.checkpoint:
print(f"Checkpoint: {args.checkpoint}")
if args.tui:
print("Mode: TUI")
return 1
def cmd_setup_credentials(args: argparse.Namespace) -> int:
    """Interactive credential setup for an agent."""
    from framework.credentials.setup import CredentialSetupSession
-36
@@ -1,36 +0,0 @@
"""Backward-compatibility shim.
The primary implementation is now in ``session_manager.py``.
This module re-exports ``SessionManager`` as ``AgentManager`` and
keeps ``AgentSlot`` for test compatibility.
"""
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from framework.server.session_manager import Session, SessionManager # noqa: F401
@dataclass
class AgentSlot:
"""Legacy data class — kept for test compatibility only.
New code should use ``Session`` from ``session_manager``.
"""
id: str
agent_path: Path
runner: Any
runtime: Any
info: Any
loaded_at: float
queen_executor: Any = None
queen_task: asyncio.Task | None = None
judge_task: asyncio.Task | None = None
escalation_sub: str | None = None
# Backward compat alias
AgentManager = SessionManager
+1 -1
@@ -731,7 +731,7 @@ async def handle_delete_history_session(request: web.Request) -> web.Response:
async def handle_discover(request: web.Request) -> web.Response:
    """GET /api/discover — discover agents from filesystem."""
-   from framework.tui.screens.agent_picker import discover_agents
+   from framework.agents.discovery import discover_agents
    manager = _get_manager(request)
    loaded_paths = {str(s.worker_path) for s in manager.list_sessions() if s.worker_path}
-181
@@ -1,181 +0,0 @@
"""
State Writer - Dual-write adapter for migration period.
Writes execution state to both old (Run/RunSummary) and new (state.json) formats
to maintain backward compatibility during the transition period.
"""
import logging
import os
from datetime import datetime
from framework.schemas.run import Problem, Run, RunMetrics, RunStatus
from framework.schemas.session_state import SessionState, SessionStatus
from framework.storage.concurrent import ConcurrentStorage
from framework.storage.session_store import SessionStore
logger = logging.getLogger(__name__)
class StateWriter:
"""
Writes execution state to both old and new formats during migration.
During the dual-write phase:
- New format (state.json) is written when USE_UNIFIED_SESSIONS=true
- Old format (Run/RunSummary) is always written for backward compatibility
"""
def __init__(self, old_storage: ConcurrentStorage, session_store: SessionStore):
"""
Initialize state writer.
Args:
old_storage: ConcurrentStorage for old format (runs/, summaries/)
session_store: SessionStore for new format (sessions/*/state.json)
"""
self.old = old_storage
self.new = session_store
self.dual_write_enabled = os.getenv("USE_UNIFIED_SESSIONS", "false").lower() == "true"
async def write_execution_state(
self,
session_id: str,
state: SessionState,
) -> None:
"""
Write execution state to both old and new formats.
Args:
session_id: Session ID
state: SessionState to write
"""
# Write to new format if enabled
if self.dual_write_enabled:
try:
# Stamp the owning process ID for cross-process stale detection
state.pid = os.getpid()
await self.new.write_state(session_id, state)
logger.debug(f"Wrote state.json for session {session_id}")
except Exception as e:
logger.error(f"Failed to write state.json for {session_id}: {e}")
# Don't fail - old format is still written
# Always write to old format for backward compatibility
try:
run = self._convert_to_run(state)
await self.old.save_run(run)
logger.debug(f"Wrote Run object for session {session_id}")
except Exception as e:
logger.error(f"Failed to write Run object for {session_id}: {e}")
# This is more critical - reraise if old format fails
raise
def _convert_to_run(self, state: SessionState) -> Run:
"""
Convert SessionState to legacy Run object.
Args:
state: SessionState to convert
Returns:
Run object
"""
# Map SessionStatus to RunStatus
status_mapping = {
SessionStatus.ACTIVE: RunStatus.RUNNING,
SessionStatus.PAUSED: RunStatus.RUNNING, # Paused is still "running" in old format
SessionStatus.COMPLETED: RunStatus.COMPLETED,
SessionStatus.FAILED: RunStatus.FAILED,
SessionStatus.CANCELLED: RunStatus.CANCELLED,
}
run_status = status_mapping.get(state.status, RunStatus.FAILED)
# Convert timestamps
started_at = datetime.fromisoformat(state.timestamps.started_at)
completed_at = (
datetime.fromisoformat(state.timestamps.completed_at)
if state.timestamps.completed_at
else None
)
# Build RunMetrics
metrics = RunMetrics(
total_decisions=state.metrics.decision_count,
successful_decisions=state.metrics.decision_count
- len(state.progress.nodes_with_failures), # Approximate
failed_decisions=len(state.progress.nodes_with_failures),
total_tokens=state.metrics.total_input_tokens + state.metrics.total_output_tokens,
total_latency_ms=state.progress.total_latency_ms,
nodes_executed=state.metrics.nodes_executed,
edges_traversed=state.metrics.edges_traversed,
)
# Convert problems (SessionState stores as dicts, Run expects Problem objects)
problems = []
for p_dict in state.problems:
# Handle both old Problem objects and new dict format
if isinstance(p_dict, dict):
problems.append(Problem(**p_dict))
else:
problems.append(p_dict)
# Convert decisions (SessionState stores as dicts, Run expects Decision objects)
from framework.schemas.decision import Decision
decisions = []
for d_dict in state.decisions:
# Handle both old Decision objects and new dict format
if isinstance(d_dict, dict):
try:
decisions.append(Decision(**d_dict))
except Exception:
# Skip invalid decisions
continue
else:
decisions.append(d_dict)
# Create Run object
run = Run(
id=state.session_id, # Use session_id as run_id
goal_id=state.goal_id,
started_at=started_at,
status=run_status,
completed_at=completed_at,
decisions=decisions,
problems=problems,
metrics=metrics,
goal_description="", # Not stored in SessionState
input_data=state.input_data,
output_data=state.result.output,
)
return run
async def read_state(
self,
session_id: str,
prefer_new: bool = True,
) -> SessionState | None:
"""
Read execution state from either format.
Args:
session_id: Session ID
prefer_new: If True, try new format first (default)
Returns:
SessionState or None if not found
"""
if prefer_new:
# Try new format first
state = await self.new.read_state(session_id)
if state:
return state
# Fall back to old format
run = await self.old.load_run(session_id)
if run:
return SessionState.from_legacy_run(run, session_id)
return None
File diff suppressed because it is too large
-13
@@ -1,13 +0,0 @@
"""TUI screens package."""
from .account_selection import AccountSelectionScreen
from .add_local_credential import AddLocalCredentialScreen
from .agent_picker import AgentPickerScreen
from .credential_setup import CredentialSetupScreen
__all__ = [
"AccountSelectionScreen",
"AddLocalCredentialScreen",
"AgentPickerScreen",
"CredentialSetupScreen",
]
@@ -1,111 +0,0 @@
"""Account selection ModalScreen for picking a connected account before agent start."""
from __future__ import annotations
from rich.text import Text
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Vertical
from textual.screen import ModalScreen
from textual.widgets import Label, OptionList
from textual.widgets._option_list import Option
class AccountSelectionScreen(ModalScreen[dict | None]):
"""Modal screen showing connected accounts for pre-run selection.
Returns the selected account dict, or None if dismissed.
"""
SCOPED_CSS = False
BINDINGS = [
Binding("escape", "dismiss_picker", "Cancel"),
]
DEFAULT_CSS = """
AccountSelectionScreen {
align: center middle;
}
#acct-container {
width: 70%;
max-width: 80;
height: 60%;
background: $surface;
border: heavy $primary;
padding: 1 2;
}
#acct-title {
text-align: center;
text-style: bold;
width: 100%;
color: $text;
}
#acct-subtitle {
text-align: center;
width: 100%;
margin-bottom: 1;
}
#acct-footer {
text-align: center;
width: 100%;
margin-top: 1;
}
"""
def __init__(self, accounts: list[dict]) -> None:
super().__init__()
self._accounts = accounts
def compose(self) -> ComposeResult:
n = len(self._accounts)
with Vertical(id="acct-container"):
yield Label("Select Account to Test", id="acct-title")
yield Label(
f"[dim]{n} connected account{'s' if n != 1 else ''}[/dim]",
id="acct-subtitle",
)
option_list = OptionList(id="acct-list")
# Group: Aden accounts first, then local
aden = [a for a in self._accounts if a.get("source") != "local"]
local = [a for a in self._accounts if a.get("source") == "local"]
ordered = aden + local
for i, acct in enumerate(ordered):
provider = acct.get("provider", "unknown")
alias = acct.get("alias", "unknown")
identity = acct.get("identity", {})
source = acct.get("source", "aden")
# Build identity label: prefer email, then username/workspace
identity_label = (
identity.get("email")
or identity.get("username")
or identity.get("workspace")
or ""
)
label = Text()
label.append(f"{provider}/", style="bold")
label.append(alias, style="bold cyan")
if source == "local":
label.append(" [local]", style="dim yellow")
if identity_label:
label.append(f" ({identity_label})", style="dim")
option_list.add_option(Option(label, id=f"acct-{i}"))
# Keep ordered list for index lookups
self._accounts = ordered
yield option_list
yield Label(
"[dim]Enter[/dim] Select [dim]Esc[/dim] Cancel",
id="acct-footer",
)
def on_mount(self) -> None:
ol = self.query_one("#acct-list", OptionList)
ol.styles.height = "1fr"
def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None:
idx = event.option_index
if 0 <= idx < len(self._accounts):
self.dismiss(self._accounts[idx])
def action_dismiss_picker(self) -> None:
self.dismiss(None)
@@ -1,244 +0,0 @@
"""Add Local Credential ModalScreen for storing named local API key accounts."""
from __future__ import annotations
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Vertical, VerticalScroll
from textual.screen import ModalScreen
from textual.widgets import Button, Input, Label, OptionList
from textual.widgets._option_list import Option
class AddLocalCredentialScreen(ModalScreen[dict | None]):
"""Modal screen for adding a named local API key credential.
Phase 1: Pick credential type from list.
Phase 2: Enter alias + API key, run health check, save.
Returns a dict with credential_id, alias, and identity on success, or None on cancel.
"""
BINDINGS = [
Binding("escape", "dismiss_screen", "Cancel"),
]
DEFAULT_CSS = """
AddLocalCredentialScreen {
align: center middle;
}
#alc-container {
width: 80%;
max-width: 90;
height: 80%;
background: $surface;
border: heavy $primary;
padding: 1 2;
}
#alc-title {
text-align: center;
text-style: bold;
width: 100%;
color: $text;
}
#alc-subtitle {
text-align: center;
width: 100%;
margin-bottom: 1;
}
#alc-type-list {
height: 1fr;
}
#alc-form {
height: 1fr;
}
.alc-field {
margin-bottom: 1;
height: auto;
}
.alc-field Label {
margin-bottom: 0;
}
#alc-status {
width: 100%;
height: auto;
margin-top: 1;
padding: 1;
background: $panel;
}
.alc-buttons {
height: auto;
margin-top: 1;
align: center middle;
}
.alc-buttons Button {
margin: 0 1;
}
#alc-footer {
text-align: center;
width: 100%;
margin-top: 1;
}
"""
def __init__(self) -> None:
super().__init__()
# Load credential specs that support direct API keys
self._specs: list[tuple[str, object]] = self._load_specs()
# Selected credential spec (set in phase 2)
self._selected_id: str = ""
self._selected_spec: object = None
self._phase: int = 1 # 1 = type selection, 2 = form
@staticmethod
def _load_specs() -> list[tuple[str, object]]:
"""Return (credential_id, spec) pairs for direct-API-key credentials."""
try:
from aden_tools.credentials import CREDENTIAL_SPECS
return [
(cid, spec)
for cid, spec in CREDENTIAL_SPECS.items()
if getattr(spec, "direct_api_key_supported", False)
]
except Exception:
return []
# ------------------------------------------------------------------
# Compose
# ------------------------------------------------------------------
def compose(self) -> ComposeResult:
with Vertical(id="alc-container"):
yield Label("Add Local Credential", id="alc-title")
yield Label("[dim]Store a named API key account[/dim]", id="alc-subtitle")
# Phase 1: type selection
option_list = OptionList(id="alc-type-list")
for cid, spec in self._specs:
description = getattr(spec, "description", cid)
option_list.add_option(Option(f"{cid} [dim]{description}[/dim]", id=f"type-{cid}"))
yield option_list
# Phase 2: form (hidden initially)
with VerticalScroll(id="alc-form"):
with Vertical(classes="alc-field"):
yield Label("[bold]Alias[/bold] [dim](e.g. work, personal)[/dim]")
yield Input(value="default", id="alc-alias")
with Vertical(classes="alc-field"):
yield Label("[bold]API Key[/bold]")
yield Input(placeholder="Paste API key...", password=True, id="alc-key")
yield Label("", id="alc-status")
with Vertical(classes="alc-buttons"):
yield Button("Test & Save", variant="primary", id="btn-save")
yield Button("Back", variant="default", id="btn-back")
yield Label(
"[dim]Enter[/dim] Select [dim]Esc[/dim] Cancel",
id="alc-footer",
)
def on_mount(self) -> None:
self._show_phase(1)
# ------------------------------------------------------------------
# Phase switching
# ------------------------------------------------------------------
def _show_phase(self, phase: int) -> None:
self._phase = phase
type_list = self.query_one("#alc-type-list", OptionList)
form = self.query_one("#alc-form", VerticalScroll)
if phase == 1:
type_list.display = True
form.display = False
subtitle = self.query_one("#alc-subtitle", Label)
subtitle.update("[dim]Select the credential type to add[/dim]")
else:
type_list.display = False
form.display = True
spec = self._selected_spec
description = (
getattr(spec, "description", self._selected_id) if spec else self._selected_id
)
subtitle = self.query_one("#alc-subtitle", Label)
subtitle.update(f"[dim]{self._selected_id}[/dim] {description}")
self._clear_status()
# Focus the alias input
self.query_one("#alc-alias", Input).focus()
# ------------------------------------------------------------------
# Event handlers
# ------------------------------------------------------------------
def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None:
if self._phase != 1:
return
option_id = event.option.id or ""
if option_id.startswith("type-"):
cid = option_id[5:] # strip "type-" prefix
self._selected_id = cid
self._selected_spec = next(
(spec for spec_id, spec in self._specs if spec_id == cid), None
)
self._show_phase(2)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-save":
self._do_save()
elif event.button.id == "btn-back":
self._show_phase(1)
# ------------------------------------------------------------------
# Save logic
# ------------------------------------------------------------------
def _do_save(self) -> None:
alias = self.query_one("#alc-alias", Input).value.strip() or "default"
api_key = self.query_one("#alc-key", Input).value.strip()
if not api_key:
self._set_status("[red]API key cannot be empty.[/red]")
return
self._set_status("[dim]Running health check...[/dim]")
# Disable save button while running
btn = self.query_one("#btn-save", Button)
btn.disabled = True
try:
from framework.credentials.local.registry import LocalCredentialRegistry
registry = LocalCredentialRegistry.default()
info, health_result = registry.save_account(
credential_id=self._selected_id,
alias=alias,
api_key=api_key,
run_health_check=True,
)
if health_result is not None and not health_result.valid:
self._set_status(
f"[yellow]Saved with failed health check:[/yellow] {health_result.message}\n"
"[dim]You can re-validate later via validate_credential().[/dim]"
)
else:
identity = info.identity.to_dict()
identity_str = ""
if identity:
parts = [f"{k}: {v}" for k, v in identity.items() if v]
identity_str = " " + ", ".join(parts) if parts else ""
self._set_status(f"[green]Saved:[/green] {info.storage_id}{identity_str}")
# Dismiss with result so callers can react
self.set_timer(1.0, lambda: self.dismiss(info.to_account_dict()))
return
except Exception as e:
self._set_status(f"[red]Error:[/red] {e}")
finally:
btn.disabled = False
def _set_status(self, markup: str) -> None:
self.query_one("#alc-status", Label).update(markup)
def _clear_status(self) -> None:
self.query_one("#alc-status", Label).update("")
def action_dismiss_screen(self) -> None:
self.dismiss(None)
-362
@@ -1,362 +0,0 @@
"""Agent picker ModalScreen for selecting agents within the TUI."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from rich.console import Group
from rich.text import Text
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Vertical
from textual.screen import ModalScreen
from textual.widgets import Label, OptionList, TabbedContent, TabPane
from textual.widgets._option_list import Option
class GetStartedAction(Enum):
"""Actions available in the Get Started tab."""
RUN_EXAMPLES = "run_examples"
RUN_EXISTING = "run_existing"
BUILD_EDIT = "build_edit"
@dataclass
class AgentEntry:
"""Lightweight agent metadata for the picker."""
path: Path
name: str
description: str
category: str
session_count: int = 0
node_count: int = 0
tool_count: int = 0
tags: list[str] = field(default_factory=list)
last_active: str | None = None
def _get_last_active(agent_name: str) -> str | None:
"""Return the most recent updated_at timestamp across all sessions."""
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if not sessions_dir.exists():
return None
latest: str | None = None
for session_dir in sessions_dir.iterdir():
if not session_dir.is_dir() or not session_dir.name.startswith("session_"):
continue
state_file = session_dir / "state.json"
if not state_file.exists():
continue
try:
data = json.loads(state_file.read_text(encoding="utf-8"))
ts = data.get("timestamps", {}).get("updated_at")
if ts and (latest is None or ts > latest):
latest = ts
except Exception:
continue
return latest
def _count_sessions(agent_name: str) -> int:
"""Count session directories under ~/.hive/agents/{agent_name}/sessions/."""
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if not sessions_dir.exists():
return 0
return sum(1 for d in sessions_dir.iterdir() if d.is_dir() and d.name.startswith("session_"))
def _extract_agent_stats(agent_path: Path) -> tuple[int, int, list[str]]:
"""Extract node count, tool count, and tags from an agent directory.
Prefers agent.py (AST-parsed) over agent.json for node/tool counts
since agent.json may be stale. Tags are only available from agent.json.
"""
import ast
node_count, tool_count, tags = 0, 0, []
# Try agent.py first — source of truth for nodes
agent_py = agent_path / "agent.py"
if agent_py.exists():
try:
tree = ast.parse(agent_py.read_text(encoding="utf-8"))
for node in ast.walk(tree):
# Find `nodes = [...]` assignment
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "nodes":
if isinstance(node.value, ast.List):
node_count = len(node.value.elts)
except Exception:
pass
# Fall back to / supplement from agent.json
agent_json = agent_path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text(encoding="utf-8"))
json_nodes = data.get("nodes", [])
if node_count == 0:
node_count = len(json_nodes)
# Tool count: use whichever source gave us nodes, but agent.json
# has the structured tool lists so prefer it for tool counting
tools: set[str] = set()
for n in json_nodes:
tools.update(n.get("tools", []))
tool_count = len(tools)
tags = data.get("agent", {}).get("tags", [])
except Exception:
pass
return node_count, tool_count, tags
def discover_agents() -> dict[str, list[AgentEntry]]:
"""Discover agents from all known sources grouped by category."""
from framework.runner.cli import (
_extract_python_agent_metadata,
_get_framework_agents_dir,
_is_valid_agent_dir,
)
groups: dict[str, list[AgentEntry]] = {}
sources = [
("Your Agents", Path("exports")),
("Framework", _get_framework_agents_dir()),
("Examples", Path("examples/templates")),
]
for category, base_dir in sources:
if not base_dir.exists():
continue
entries: list[AgentEntry] = []
for path in sorted(base_dir.iterdir(), key=lambda p: p.name):
if not _is_valid_agent_dir(path):
continue
# config.py is source of truth for name/description
name, desc = _extract_python_agent_metadata(path)
config_fallback_name = path.name.replace("_", " ").title()
used_config = name != config_fallback_name
node_count, tool_count, tags = _extract_agent_stats(path)
if not used_config:
# config.py didn't provide values, fall back to agent.json
agent_json = path / "agent.json"
if agent_json.exists():
try:
data = json.loads(agent_json.read_text(encoding="utf-8"))
meta = data.get("agent", {})
name = meta.get("name", name)
desc = meta.get("description", desc)
except Exception:
pass
entries.append(
AgentEntry(
path=path,
name=name,
description=desc,
category=category,
session_count=_count_sessions(path.name),
node_count=node_count,
tool_count=tool_count,
tags=tags,
last_active=_get_last_active(path.name),
)
)
if entries:
groups[category] = entries
return groups
def _render_agent_option(agent: AgentEntry) -> Group:
"""Build a Rich renderable for a single agent option."""
# Line 1: name + session badge
line1 = Text()
line1.append(agent.name, style="bold")
if agent.session_count:
line1.append(f" {agent.session_count} sessions", style="dim cyan")
# Line 2: description (word-wrapped by the widget)
desc = agent.description if agent.description else "No description"
line2 = Text(desc, style="dim")
# Line 3: stats chips
chips = Text()
if agent.node_count:
chips.append(f" {agent.node_count} nodes ", style="on dark_green white")
chips.append(" ")
if agent.tool_count:
chips.append(f" {agent.tool_count} tools ", style="on dark_blue white")
chips.append(" ")
for tag in agent.tags[:3]:
chips.append(f" {tag} ", style="on grey37 white")
chips.append(" ")
parts = [line1, line2]
if chips.plain.strip():
parts.append(chips)
return Group(*parts)
def _render_get_started_option(title: str, description: str, icon: str = "") -> Group:
"""Build a Rich renderable for a Get Started option."""
line1 = Text()
line1.append(f"{icon} ", style="bold cyan")
line1.append(title, style="bold")
line2 = Text(description, style="dim")
return Group(line1, line2)
class AgentPickerScreen(ModalScreen[str | None]):
"""Modal screen showing available agents organized by tabbed categories.
Returns the selected agent path as a string, or None if dismissed.
For Get Started actions, returns a special prefix like "action:run_examples".
"""
BINDINGS = [
Binding("escape", "dismiss_picker", "Cancel"),
]
DEFAULT_CSS = """
AgentPickerScreen {
align: center middle;
}
#picker-container {
width: 90%;
max-width: 120;
height: 85%;
background: $surface;
border: heavy $primary;
padding: 1 2;
}
#picker-title {
text-align: center;
text-style: bold;
width: 100%;
color: $text;
}
#picker-subtitle {
text-align: center;
width: 100%;
margin-bottom: 1;
}
#picker-footer {
text-align: center;
width: 100%;
margin-top: 1;
}
TabPane {
padding: 0;
}
OptionList {
height: 1fr;
}
OptionList > .option-list--option {
padding: 1 2;
}
"""
def __init__(
self,
agent_groups: dict[str, list[AgentEntry]],
show_get_started: bool = False,
) -> None:
super().__init__()
self._groups = agent_groups
self._show_get_started = show_get_started
# Map (tab_id, option_index) -> AgentEntry
self._option_map: dict[str, dict[int, AgentEntry]] = {}
def compose(self) -> ComposeResult:
total = sum(len(v) for v in self._groups.values())
with Vertical(id="picker-container"):
yield Label("Hive Agent Launcher", id="picker-title")
yield Label(
f"[dim]{total} agents available[/dim]",
id="picker-subtitle",
)
with TabbedContent():
# Get Started tab (only on initial launch)
if self._show_get_started:
with TabPane("Get Started", id="get-started"):
option_list = OptionList(id="list-get-started")
option_list.add_option(
Option(
_render_get_started_option(
"Test and run example agents",
"Try pre-built example agents to learn how Hive works",
"📚",
),
id="action:run_examples",
)
)
option_list.add_option(
Option(
_render_get_started_option(
"Test and run existing agent",
"Load and run an agent you've already built (from exports/)",
"🚀",
),
id="action:run_existing",
)
)
option_list.add_option(
Option(
_render_get_started_option(
"Build or edit agent",
"Create a new agent or modify an existing one",
"🛠️ ",
),
id="action:build_edit",
)
)
yield option_list
# Agent category tabs
for category, agents in self._groups.items():
tab_id = category.lower().replace(" ", "-")
with TabPane(f"{category} ({len(agents)})", id=tab_id):
option_list = OptionList(id=f"list-{tab_id}")
self._option_map[f"list-{tab_id}"] = {}
for i, agent in enumerate(agents):
option_list.add_option(
Option(
_render_agent_option(agent),
id=str(agent.path),
)
)
self._option_map[f"list-{tab_id}"][i] = agent
yield option_list
yield Label(
"[dim]Enter[/dim] Select [dim]Tab[/dim] Switch category [dim]Esc[/dim] Cancel",
id="picker-footer",
)
def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None:
list_id = event.option_list.id or ""
# Handle Get Started tab options
if list_id == "list-get-started":
option = event.option
if option and option.id:
self.dismiss(option.id) # Returns "action:run_examples", etc.
return
# Handle agent selection from other tabs
idx = event.option_index
agent_map = self._option_map.get(list_id, {})
agent = agent_map.get(idx)
if agent:
self.dismiss(str(agent.path))
def action_dismiss_picker(self) -> None:
self.dismiss(None)
@@ -1,304 +0,0 @@
"""Credential setup ModalScreen for configuring missing agent credentials."""
from __future__ import annotations
import os
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Vertical, VerticalScroll
from textual.screen import ModalScreen
from textual.widgets import Button, Input, Label
from framework.credentials.setup import CredentialSetupSession, MissingCredential
class CredentialSetupScreen(ModalScreen[bool | None]):
"""Modal screen for configuring missing agent credentials.
Shows a form with one password Input per missing credential.
For Aden-backed credentials (``aden_supported=True``), prompts for
``ADEN_API_KEY`` and runs the Aden sync flow instead of storing a
raw value.
Returns True on successful save, or None on cancel/skip.
"""
BINDINGS = [
Binding("escape", "dismiss_setup", "Cancel"),
]
DEFAULT_CSS = """
CredentialSetupScreen {
align: center middle;
}
#cred-container {
width: 80%;
max-width: 100;
height: 80%;
background: $surface;
border: heavy $primary;
padding: 1 2;
}
#cred-title {
text-align: center;
text-style: bold;
width: 100%;
color: $text;
}
#cred-subtitle {
text-align: center;
width: 100%;
margin-bottom: 1;
}
#cred-scroll {
height: 1fr;
}
.cred-entry {
margin-bottom: 1;
padding: 1;
background: $panel;
height: auto;
}
.cred-entry Input {
margin-top: 1;
}
.cred-buttons {
height: auto;
margin-top: 1;
align: center middle;
}
.cred-buttons Button {
margin: 0 1;
}
#cred-footer {
text-align: center;
width: 100%;
margin-top: 1;
}
"""
def __init__(self, session: CredentialSetupSession) -> None:
super().__init__()
self._session = session
self._missing: list[MissingCredential] = session.missing
# Track which credentials need Aden sync vs direct API key
self._aden_creds: set[int] = set()
self._needs_aden_key = False
for i, cred in enumerate(self._missing):
if cred.aden_supported and not cred.direct_api_key_supported:
self._aden_creds.add(i)
self._needs_aden_key = True
def compose(self) -> ComposeResult:
n = len(self._missing)
with Vertical(id="cred-container"):
yield Label("Credential Setup", id="cred-title")
yield Label(
f"[dim]{n} credential{'s' if n != 1 else ''} needed to run this agent[/dim]",
id="cred-subtitle",
)
with VerticalScroll(id="cred-scroll"):
# If any credential needs Aden, show ADEN_API_KEY input first
if self._needs_aden_key:
aden_key = os.environ.get("ADEN_API_KEY", "")
with Vertical(classes="cred-entry"):
yield Label("[bold]ADEN_API_KEY[/bold]")
aden_names = [
self._missing[i].credential_name for i in sorted(self._aden_creds)
]
yield Label(f"[dim]Required for OAuth sync: {', '.join(aden_names)}[/dim]")
yield Label("[cyan]Get key:[/cyan] https://hive.adenhq.com")
yield Input(
placeholder="Paste ADEN_API_KEY..."
if not aden_key
else "Already set (leave blank to keep)",
password=True,
id="key-aden",
)
# Show direct API key inputs for non-Aden credentials
for i, cred in enumerate(self._missing):
if i in self._aden_creds:
continue # Handled via Aden sync above
with Vertical(classes="cred-entry"):
yield Label(f"[bold]{cred.env_var}[/bold]")
affected = cred.tools or cred.node_types
if affected:
yield Label(f"[dim]Required by: {', '.join(affected)}[/dim]")
if cred.description:
yield Label(f"[dim]{cred.description}[/dim]")
if cred.help_url:
yield Label(f"[cyan]Get key:[/cyan] {cred.help_url}")
yield Input(
placeholder="Paste API key...",
password=True,
id=f"key-{i}",
)
with Vertical(classes="cred-buttons"):
yield Button("Save & Continue", variant="primary", id="btn-save")
yield Button("Skip", variant="default", id="btn-skip")
yield Label(
"[dim]Enter[/dim] Submit [dim]Esc[/dim] Cancel",
id="cred-footer",
)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-save":
self._save_credentials()
elif event.button.id == "btn-skip":
self.dismiss(None)
def _save_credentials(self) -> None:
"""Collect inputs, store credentials, and dismiss."""
self._session._ensure_credential_key()
configured = 0
# Handle Aden-backed credentials
if self._needs_aden_key:
aden_input = self.query_one("#key-aden", Input)
aden_key = aden_input.value.strip()
if aden_key:
from framework.credentials.key_storage import save_aden_api_key
save_aden_api_key(aden_key)
configured += 1 # ADEN_API_KEY itself counts as configured
# Run Aden sync for all Aden-backed creds (best-effort)
if aden_key or os.environ.get("ADEN_API_KEY"):
self._sync_aden_credentials()
# Handle direct API key credentials
for i, cred in enumerate(self._missing):
if i in self._aden_creds:
continue
input_widget = self.query_one(f"#key-{i}", Input)
value = input_widget.value.strip()
if not value:
continue
try:
self._session._store_credential(cred, value)
configured += 1
except Exception as e:
self.notify(f"Error storing {cred.env_var}: {e}", severity="error")
if configured > 0:
self.dismiss(True)
else:
self.notify("No credentials configured", severity="warning", timeout=3)
def _sync_aden_credentials(self) -> int:
"""Sync Aden-backed credentials and return count of successfully synced."""
# Build the Aden sync components directly so we get real errors
# instead of CredentialStore.with_aden_sync() silently falling back.
try:
from framework.credentials.aden import (
AdenCachedStorage,
AdenClientConfig,
AdenCredentialClient,
AdenSyncProvider,
)
from framework.credentials.storage import EncryptedFileStorage
client = AdenCredentialClient(AdenClientConfig(base_url="https://api.adenhq.com"))
provider = AdenSyncProvider(client=client)
local_storage = EncryptedFileStorage()
cached_storage = AdenCachedStorage(
local_storage=local_storage,
aden_provider=provider,
)
except Exception as e:
self.notify(
f"Aden setup error: {e}",
severity="error",
timeout=8,
)
return 0
# Sync all integrations from Aden to get the provider index populated
try:
from framework.credentials import CredentialStore
store = CredentialStore(
storage=cached_storage,
providers=[provider],
auto_refresh=True,
)
num_synced = provider.sync_all(store)
if num_synced == 0:
self.notify(
"No active integrations found in Aden. "
"Connect integrations at hive.adenhq.com.",
severity="warning",
timeout=8,
)
except Exception as e:
self.notify(
f"Aden sync error: {e}",
severity="error",
timeout=8,
)
return 0
synced = 0
for i in sorted(self._aden_creds):
cred = self._missing[i]
cred_id = cred.credential_id or cred.credential_name
if store.is_available(cred_id):
try:
value = store.get_key(cred_id, cred.credential_key)
if value:
os.environ[cred.env_var] = value
self._persist_to_local_store(cred_id, cred.credential_key, value)
synced += 1
else:
self.notify(
f"{cred.credential_name}: key "
f"'{cred.credential_key}' not found "
f"in credential '{cred_id}'",
severity="warning",
timeout=8,
)
except Exception as e:
self.notify(
f"{cred.credential_name} extraction failed: {e}",
severity="error",
timeout=8,
)
else:
self.notify(
f"{cred.credential_name} (id='{cred_id}') "
f"not found in Aden. Connect this "
f"integration at hive.adenhq.com first.",
severity="warning",
timeout=8,
)
return synced
@staticmethod
def _persist_to_local_store(cred_id: str, key_name: str, value: str) -> None:
"""Save a synced token to the local encrypted store under the canonical ID."""
try:
from pydantic import SecretStr
from framework.credentials.models import CredentialKey, CredentialObject, CredentialType
from framework.credentials.storage import EncryptedFileStorage
cred_obj = CredentialObject(
id=cred_id,
credential_type=CredentialType.OAUTH2,
keys={
key_name: CredentialKey(
name=key_name,
value=SecretStr(value),
),
},
auto_refresh=True,
)
EncryptedFileStorage().save(cred_obj)
except Exception:
pass # Best-effort; env var is the primary delivery mechanism
def action_dismiss_setup(self) -> None:
self.dismiss(None)
File diff suppressed because it is too large
-139
View File
@@ -1,139 +0,0 @@
"""
Native OS file dialog for PDF selection.
Launches the platform's native file picker (macOS: NSOpenPanel via osascript,
Linux: zenity/kdialog, Windows: PowerShell OpenFileDialog) in a background
thread so Textual's event loop stays responsive.
Falls back to None when no GUI is available (SSH, headless).
"""
import asyncio
import os
import subprocess
import sys
from pathlib import Path
def _has_gui() -> bool:
"""Detect whether a GUI display is available."""
if sys.platform == "darwin":
# macOS: GUI is available unless running over SSH without display forwarding.
return "SSH_CONNECTION" not in os.environ or "DISPLAY" in os.environ
elif sys.platform == "win32":
return True
else:
# Linux/BSD: Need X11 or Wayland.
return bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"))
def _linux_file_dialog() -> subprocess.CompletedProcess | None:
"""Try zenity, then kdialog, on Linux. Returns CompletedProcess or None."""
# Try zenity (GTK)
try:
return subprocess.run(
[
"zenity",
"--file-selection",
"--title=Select a PDF file",
"--file-filter=PDF files (*.pdf)|*.pdf",
],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
)
except FileNotFoundError:
pass
# Try kdialog (KDE)
try:
return subprocess.run(
[
"kdialog",
"--getopenfilename",
".",
"PDF files (*.pdf)",
],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
)
except FileNotFoundError:
pass
return None
def _pick_pdf_subprocess() -> Path | None:
"""Run the native file dialog. BLOCKS until user picks or cancels.
Returns a Path on success, None on cancel or error.
Must be called from a non-main thread (via asyncio.to_thread).
"""
try:
if sys.platform == "darwin":
result = subprocess.run(
[
"osascript",
"-e",
'POSIX path of (choose file of type {"com.adobe.pdf"} '
'with prompt "Select a PDF file")',
],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
)
elif sys.platform == "win32":
ps_script = (
"Add-Type -AssemblyName System.Windows.Forms; "
"$f = New-Object System.Windows.Forms.OpenFileDialog; "
"$f.Filter = 'PDF files (*.pdf)|*.pdf'; "
"$f.Title = 'Select a PDF file'; "
"if ($f.ShowDialog() -eq 'OK') { $f.FileName }"
)
result = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_script],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
)
else:
result = _linux_file_dialog()
if result is None:
return None
if result.returncode != 0:
return None
path_str = result.stdout.strip()
if not path_str:
return None
path = Path(path_str)
if path.is_file() and path.suffix.lower() == ".pdf":
return path
return None
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return None
async def pick_pdf_file() -> Path | None:
"""Open a native OS file dialog to pick a PDF file.
Non-blocking: runs the dialog subprocess in a background thread via
asyncio.to_thread(), so the calling event loop stays responsive.
Returns:
Path to the selected PDF, or None if the user cancelled,
no GUI is available, or the dialog command was not found.
"""
if not _has_gui():
return None
return await asyncio.to_thread(_pick_pdf_subprocess)
-585
View File
@@ -1,585 +0,0 @@
"""
Graph/Tree Overview Widget - Displays real agent graph structure.
Supports rendering loops (back-edges) via right-side return channels:
arrows drawn on the right margin that visually point back up to earlier nodes.
"""
from __future__ import annotations
import re
import time
from textual.app import ComposeResult
from textual.containers import Vertical
from framework.runtime.agent_runtime import AgentRuntime
from framework.runtime.event_bus import EventType
from framework.tui.widgets.selectable_rich_log import SelectableRichLog as RichLog
# Width of each return-channel column (padding + │ + gap)
_CHANNEL_WIDTH = 5
# Regex to strip Rich markup tags for measuring visible width
_MARKUP_RE = re.compile(r"\[/?[^\]]*\]")
def _plain_len(s: str) -> int:
"""Return the visible character length of a Rich-markup string."""
return len(_MARKUP_RE.sub("", s))
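The same regex can be checked in isolation: it strips `[tag]...[/tag]` markup so only visible characters are counted.

```python
import re

# Same pattern as _MARKUP_RE above: matches [tag] and [/tag] spans.
_MARKUP_RE = re.compile(r"\[/?[^\]]*\]")


def plain_len(s: str) -> int:
    # Visible width of a Rich-markup string after tags are removed.
    return len(_MARKUP_RE.sub("", s))


print(plain_len("[bold green]planner[/bold green]"))  # 7
print(plain_len("no markup"))                         # 9
```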
class GraphOverview(Vertical):
"""Widget to display Agent execution graph/tree with real data."""
DEFAULT_CSS = """
GraphOverview {
width: 100%;
height: 100%;
background: $panel;
}
GraphOverview > RichLog {
width: 100%;
height: 100%;
background: $panel;
border: none;
scrollbar-background: $surface;
scrollbar-color: $primary;
}
"""
def __init__(self, runtime: AgentRuntime):
super().__init__()
self.runtime = runtime
self._override_graph = None # Set by switch_graph() for secondary graphs
self.active_node: str | None = None
self.execution_path: list[str] = []
# Per-node status strings shown next to the node in the graph display.
# e.g. {"planner": "thinking...", "searcher": "web_search..."}
self._node_status: dict[str, str] = {}
@property
def _graph(self):
"""The graph currently being displayed (may be a secondary graph)."""
return self._override_graph or self.runtime.graph
def switch_graph(self, graph) -> None:
"""Switch to displaying a different graph and refresh."""
self._override_graph = graph
self.active_node = None
self.execution_path = []
self._node_status = {}
self._display_graph()
def compose(self) -> ComposeResult:
# Use RichLog for formatted output
yield RichLog(id="graph-display", highlight=True, markup=True)
def on_mount(self) -> None:
"""Display initial graph structure."""
self._display_graph()
# Refresh every 1s so timer countdowns stay current
if self.runtime._timer_next_fire is not None:
self.set_interval(1.0, self._display_graph)
# ------------------------------------------------------------------
# Graph analysis helpers
# ------------------------------------------------------------------
def _topo_order(self) -> list[str]:
"""BFS from entry_node following edges."""
graph = self._graph
visited: list[str] = []
seen: set[str] = set()
queue = [graph.entry_node]
while queue:
nid = queue.pop(0)
if nid in seen:
continue
seen.add(nid)
visited.append(nid)
for edge in graph.get_outgoing_edges(nid):
if edge.target not in seen:
queue.append(edge.target)
# Append orphan nodes not reachable from entry
for node in graph.nodes:
if node.id not in seen:
visited.append(node.id)
return visited
def _detect_back_edges(self, ordered: list[str]) -> list[dict]:
"""Find edges where target appears before (or equal to) source in topo order.
Returns a list of dicts with keys: edge, source, target, source_idx, target_idx.
"""
order_idx = {nid: i for i, nid in enumerate(ordered)}
back_edges: list[dict] = []
for node_id in ordered:
for edge in self._graph.get_outgoing_edges(node_id):
target_idx = order_idx.get(edge.target, -1)
source_idx = order_idx.get(node_id, -1)
if target_idx != -1 and target_idx <= source_idx:
back_edges.append(
{
"edge": edge,
"source": node_id,
"target": edge.target,
"source_idx": source_idx,
"target_idx": target_idx,
}
)
return back_edges
def _is_back_edge(self, source: str, target: str, order_idx: dict[str, int]) -> bool:
"""Check whether an edge from *source* to *target* is a back-edge."""
si = order_idx.get(source, -1)
ti = order_idx.get(target, -1)
return ti != -1 and ti <= si
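Boiled down to a plain adjacency list, the BFS ordering and back-edge test above look like this (the toy graph is hypothetical):

```python
from collections import deque

# Toy graph with one loop: reviewer -> planner is the back-edge.
edges = {
    "planner": ["searcher"],
    "searcher": ["reviewer"],
    "reviewer": ["planner", "done"],
    "done": [],
}


def topo_order(entry: str) -> list[str]:
    # BFS from the entry node, mirroring _topo_order.
    order: list[str] = []
    seen: set[str] = set()
    queue = deque([entry])
    while queue:
        nid = queue.popleft()
        if nid in seen:
            continue
        seen.add(nid)
        order.append(nid)
        queue.extend(t for t in edges[nid] if t not in seen)
    return order


ordered = topo_order("planner")
idx = {n: i for i, n in enumerate(ordered)}
# An edge is a back-edge when its target appears at or before its source.
back = [(s, t) for s in ordered for t in edges[s] if idx[t] <= idx[s]]
print(ordered)  # ['planner', 'searcher', 'reviewer', 'done']
print(back)     # [('reviewer', 'planner')]
```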
# ------------------------------------------------------------------
# Line rendering (Pass 1)
# ------------------------------------------------------------------
def _render_node_line(self, node_id: str) -> str:
"""Render a single node with status symbol and optional status text."""
graph = self._graph
is_terminal = node_id in (graph.terminal_nodes or [])
is_active = node_id == self.active_node
is_done = node_id in self.execution_path and not is_active
status = self._node_status.get(node_id, "")
if is_active:
sym = "[bold green]●[/bold green]"
elif is_done:
sym = "[dim]✓[/dim]"
elif is_terminal:
sym = "[yellow]■[/yellow]"
else:
sym = ""
if is_active:
name = f"[bold green]{node_id}[/bold green]"
elif is_done:
name = f"[dim]{node_id}[/dim]"
else:
name = node_id
suffix = f" [italic]{status}[/italic]" if status else ""
return f" {sym} {name}{suffix}"
def _render_edges(self, node_id: str, order_idx: dict[str, int]) -> list[str]:
"""Render forward-edge connectors from *node_id*.
Back-edges are excluded here; they are drawn by the return-channel
overlay in Pass 2.
"""
all_edges = self._graph.get_outgoing_edges(node_id)
if not all_edges:
return []
# Split into forward and back
forward = [e for e in all_edges if not self._is_back_edge(node_id, e.target, order_idx)]
if not forward:
# All edges are back-edges — nothing to render here
return []
if len(forward) == 1:
return ["", ""]
# Fan-out: show branches
lines: list[str] = []
for i, edge in enumerate(forward):
connector = "" if i == len(forward) - 1 else ""
cond = ""
if edge.condition.value not in ("always", "on_success"):
cond = f" [dim]({edge.condition.value})[/dim]"
lines.append(f" {connector}──▶ {edge.target}{cond}")
return lines
# ------------------------------------------------------------------
# Return-channel overlay (Pass 2)
# ------------------------------------------------------------------
def _overlay_return_channels(
self,
lines: list[str],
node_line_map: dict[str, int],
back_edges: list[dict],
available_width: int,
) -> list[str]:
"""Overlay right-side return channels onto the line buffer.
Each back-edge gets a vertical channel on the right margin. Channels
are allocated left-to-right by increasing span length so that shorter
(inner) loops are closer to the graph body and longer (outer) loops are
further right.
If the terminal is too narrow to fit even one channel, we fall back to
simple inline ``↺`` annotations instead.
"""
if not back_edges:
return lines
num_channels = len(back_edges)
# Sort by span length ascending → inner loops get nearest channel
sorted_be = sorted(back_edges, key=lambda b: b["source_idx"] - b["target_idx"])
# --- Insert dedicated connector lines for back-edge sources ---
# Each back-edge source gets a blank line inserted after its node
# section (after any forward-edge lines). We process insertions in
# reverse order so that earlier indices remain valid.
all_node_lines_set = set(node_line_map.values())
insertions: list[tuple[int, int]] = [] # (insert_after_line, be_index)
for be_idx, be in enumerate(sorted_be):
source_node_line = node_line_map.get(be["source"])
if source_node_line is None:
continue
# Walk forward to find the last line in this node's section
last_section_line = source_node_line
for li in range(source_node_line + 1, len(lines)):
if li in all_node_lines_set:
break
last_section_line = li
insertions.append((last_section_line, be_idx))
source_line_for_be: dict[int, int] = {}
for insert_after, be_idx in sorted(insertions, reverse=True):
insert_at = insert_after + 1
lines.insert(insert_at, "") # placeholder for connector
source_line_for_be[be_idx] = insert_at
# Shift node_line_map entries that come after the insertion point
for nid in node_line_map:
if node_line_map[nid] > insert_after:
node_line_map[nid] += 1
# Also shift already-assigned source lines
for prev_idx in source_line_for_be:
if prev_idx != be_idx and source_line_for_be[prev_idx] > insert_after:
source_line_for_be[prev_idx] += 1
# Recompute max content width after insertions
max_content_w = max(_plain_len(ln) for ln in lines) if lines else 0
# Check if we have room for channels
channels_total_w = num_channels * _CHANNEL_WIDTH
if max_content_w + channels_total_w + 2 > available_width:
return self._inline_back_edge_fallback(lines, node_line_map, back_edges)
content_pad = max_content_w + 3 # gap between content and first channel
# Build channel info with final line positions
channel_info: list[dict] = []
for ch_idx, be in enumerate(sorted_be):
target_line = node_line_map.get(be["target"])
source_line = source_line_for_be.get(ch_idx)
if target_line is None or source_line is None:
continue
col = content_pad + ch_idx * _CHANNEL_WIDTH
channel_info.append(
{
"target_line": target_line,
"source_line": source_line,
"col": col,
}
)
if not channel_info:
return lines
# Build overlay grid — one row per line, columns for channel area
total_width = content_pad + num_channels * _CHANNEL_WIDTH + 1
overlay_width = total_width - max_content_w
overlays: list[list[str]] = [[" "] * overlay_width for _ in range(len(lines))]
for ci in channel_info:
tl = ci["target_line"]
sl = ci["source_line"]
col_offset = ci["col"] - max_content_w
if col_offset < 0 or col_offset >= overlay_width:
continue
# Target line: ◄──...──┐
if 0 <= tl < len(overlays):
for c in range(col_offset):
if overlays[tl][c] == " ":
overlays[tl][c] = ""
overlays[tl][col_offset] = ""
# Source line: ──...──┘
if 0 <= sl < len(overlays):
for c in range(col_offset):
if overlays[sl][c] == " ":
overlays[sl][c] = ""
overlays[sl][col_offset] = ""
# Vertical lines between target+1 and source-1
for li in range(tl + 1, sl):
if 0 <= li < len(overlays) and overlays[li][col_offset] == " ":
overlays[li][col_offset] = ""
# Merge overlays into the line strings
result: list[str] = []
for i, line in enumerate(lines):
pw = _plain_len(line)
pad = max_content_w - pw
overlay_chars = overlays[i] if i < len(overlays) else []
overlay_str = "".join(overlay_chars)
overlay_trimmed = overlay_str.rstrip()
if overlay_trimmed:
is_target_line = any(ci["target_line"] == i for ci in channel_info)
if is_target_line:
overlay_trimmed = "" + overlay_trimmed[1:]
is_source_line = any(ci["source_line"] == i for ci in channel_info)
if is_source_line and not line.strip():
# Inserted blank line → build └───┘ connector.
# " └" = 3 chars of content prefix, so remaining pad = max_content_w - 3
remaining_pad = max_content_w - 3
full = list(" " * remaining_pad + overlay_trimmed)
# Find the ┘ corner for this source connector
corner_pos = -1
for ci_s in channel_info:
if ci_s["source_line"] == i:
corner_pos = remaining_pad + (ci_s["col"] - max_content_w)
break
# Fill everything up to the corner with ─
if corner_pos >= 0:
for c in range(corner_pos):
if full[c] not in ("│", "┐", "┘"):
full[c] = "─"
connector = "  └" + "".join(full).rstrip()
result.append(f"[dim]{connector}[/dim]")
continue
colored_overlay = f"[dim]{' ' * pad}{overlay_trimmed}[/dim]"
result.append(f"{line}{colored_overlay}")
else:
result.append(line)
return result
def _inline_back_edge_fallback(
self,
lines: list[str],
node_line_map: dict[str, int],
back_edges: list[dict],
) -> list[str]:
"""Fallback: add inline ↺ annotations when terminal is too narrow for channels."""
# Group back-edges by source node
source_to_be: dict[str, list[dict]] = {}
for be in back_edges:
source_to_be.setdefault(be["source"], []).append(be)
result = list(lines)
# Insert annotation lines after each source node's section
offset = 0
all_node_lines = sorted(node_line_map.values())
for source, bes in source_to_be.items():
source_line = node_line_map.get(source)
if source_line is None:
continue
# Find end of source node section
end_line = source_line
for nl in all_node_lines:
if nl > source_line:
end_line = nl - 1
break
else:
end_line = len(lines) - 1
# Insert after last content line of this node's section
insert_at = end_line + offset + 1
for be in bes:
cond = ""
edge = be["edge"]
if edge.condition.value not in ("always", "on_success"):
cond = f" [dim]({edge.condition.value})[/dim]"
annotation = f" [yellow]↺[/yellow] {be['target']}{cond}"
result.insert(insert_at, annotation)
insert_at += 1
offset += 1
return result
# ------------------------------------------------------------------
# Main display
# ------------------------------------------------------------------
def _display_graph(self) -> None:
"""Display the graph as an ASCII DAG with edge connectors and loop channels."""
display = self.query_one("#graph-display", RichLog)
display.clear()
graph = self._graph
display.write(f"[bold cyan]Agent Graph:[/bold cyan] {graph.id}\n")
ordered = self._topo_order()
order_idx = {nid: i for i, nid in enumerate(ordered)}
# --- Pass 1: Build line buffer ---
lines: list[str] = []
node_line_map: dict[str, int] = {}
for node_id in ordered:
node_line_map[node_id] = len(lines)
lines.append(self._render_node_line(node_id))
for edge_line in self._render_edges(node_id, order_idx):
lines.append(edge_line)
# --- Pass 2: Overlay return channels for back-edges ---
back_edges = self._detect_back_edges(ordered)
if back_edges:
# Try to get actual widget width; default to a reasonable value
try:
available_width = self.size.width or 60
except Exception:
available_width = 60
lines = self._overlay_return_channels(lines, node_line_map, back_edges, available_width)
# Write all lines
for line in lines:
display.write(line)
# Execution path footer
if self.execution_path:
display.write("")
display.write(f"[dim]Path:[/dim] {''.join(self.execution_path[-5:])}")
# Event sources section
self._render_event_sources(display)
# ------------------------------------------------------------------
# Event sources display
# ------------------------------------------------------------------
def _render_event_sources(self, display: RichLog) -> None:
"""Render event source info (webhooks, timers) below the graph."""
entry_points = self.runtime.get_entry_points()
# Filter to non-manual entry points (webhooks, timers, events)
event_sources = [ep for ep in entry_points if ep.trigger_type not in ("manual",)]
if not event_sources:
return
display.write("")
display.write("[bold cyan]Event Sources[/bold cyan]")
config = self.runtime._config
for ep in event_sources:
if ep.trigger_type == "timer":
cron_expr = ep.trigger_config.get("cron")
interval = ep.trigger_config.get("interval_minutes", "?")
schedule_label = f"cron: {cron_expr}" if cron_expr else f"every {interval} min"
display.write(f" [green]⏱[/green] {ep.name} [dim]→ {ep.entry_node}[/dim]")
# Show schedule + next fire countdown
next_fire = self.runtime._timer_next_fire.get(ep.id)
if next_fire is not None:
remaining = max(0, next_fire - time.monotonic())
hours, rem = divmod(int(remaining), 3600)
mins, secs = divmod(rem, 60)
if hours > 0:
countdown = f"{hours}h {mins:02d}m {secs:02d}s"
else:
countdown = f"{mins}m {secs:02d}s"
display.write(f" [dim]{schedule_label} — next in {countdown}[/dim]")
else:
display.write(f" [dim]{schedule_label}[/dim]")
elif ep.trigger_type in ("event", "webhook"):
display.write(f" [yellow]⚡[/yellow] {ep.name} [dim]→ {ep.entry_node}[/dim]")
# Show webhook endpoint if configured
route = None
for r in config.webhook_routes:
src = r.get("source_id", "")
if src and src in ep.id:
route = r
break
if not route and config.webhook_routes:
# Fall back to first route
route = config.webhook_routes[0]
if route:
host = config.webhook_host
port = config.webhook_port
path = route.get("path", "/webhook")
display.write(f" [dim]{host}:{port}{path}[/dim]")
else:
event_types = ep.trigger_config.get("event_types", [])
if event_types:
display.write(f" [dim]events: {', '.join(event_types)}[/dim]")
# ------------------------------------------------------------------
# Public API (called by app.py)
# ------------------------------------------------------------------
def update_active_node(self, node_id: str) -> None:
"""Update the currently active node."""
self.active_node = node_id
if node_id not in self.execution_path:
self.execution_path.append(node_id)
self._display_graph()
def update_execution(self, event) -> None:
"""Update the displayed node status based on execution lifecycle events."""
if event.type == EventType.EXECUTION_STARTED:
self._node_status.clear()
self.execution_path.clear()
entry_node = event.data.get("entry_node") or (
self._graph.entry_node if self.runtime else None
)
if entry_node:
self.update_active_node(entry_node)
elif event.type == EventType.EXECUTION_COMPLETED:
self.active_node = None
self._node_status.clear()
self._display_graph()
elif event.type == EventType.EXECUTION_FAILED:
error = event.data.get("error", "Unknown error")
if self.active_node:
self._node_status[self.active_node] = f"[red]FAILED: {error}[/red]"
self.active_node = None
self._display_graph()
# -- Event handlers called by app.py _handle_event --
def handle_node_loop_started(self, node_id: str) -> None:
"""A node's event loop has started."""
self._node_status[node_id] = "thinking..."
self.update_active_node(node_id)
def handle_node_loop_iteration(self, node_id: str, iteration: int) -> None:
"""A node advanced to a new loop iteration."""
self._node_status[node_id] = f"step {iteration}"
self._display_graph()
def handle_node_loop_completed(self, node_id: str) -> None:
"""A node's event loop completed."""
self._node_status.pop(node_id, None)
if self.active_node == node_id:
self.active_node = None
self._display_graph()
def handle_tool_call(self, node_id: str, tool_name: str, *, started: bool) -> None:
"""Show tool activity next to the active node."""
if started:
self._node_status[node_id] = f"{tool_name}..."
else:
# Restore to generic thinking status after tool completes
self._node_status[node_id] = "thinking..."
self._display_graph()
def handle_stalled(self, node_id: str, reason: str) -> None:
"""Highlight a stalled node."""
self._node_status[node_id] = f"[red]stalled: {reason}[/red]"
self._display_graph()
def handle_edge_traversed(self, source_node: str, target_node: str) -> None:
"""Highlight an edge being traversed."""
self._node_status[source_node] = f"[dim]→ {target_node}[/dim]"
self._display_graph()
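The timer countdown shown by `_render_event_sources` reduces to a small helper; this sketch reproduces the same divmod arithmetic (the `format_countdown` name is ours):

```python
def format_countdown(remaining: float) -> str:
    # Split remaining seconds into hours / minutes / seconds,
    # matching the footer format used in the graph overview.
    hours, rem = divmod(int(max(0, remaining)), 3600)
    mins, secs = divmod(rem, 60)
    if hours > 0:
        return f"{hours}h {mins:02d}m {secs:02d}s"
    return f"{mins}m {secs:02d}s"


print(format_countdown(3725))  # 1h 02m 05s
print(format_countdown(95))    # 1m 35s
```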
-172
View File
@@ -1,172 +0,0 @@
"""
Log formatting utilities and LogPane widget.
The module-level functions (format_event, extract_event_text, format_python_log)
can be used by any widget that needs to render log lines without instantiating LogPane.
"""
import logging
from datetime import datetime
from textual.app import ComposeResult
from textual.containers import Container
from framework.runtime.event_bus import AgentEvent, EventType
from framework.tui.widgets.selectable_rich_log import SelectableRichLog as RichLog
# --- Module-level formatting constants ---
EVENT_FORMAT: dict[EventType, tuple[str, str]] = {
EventType.EXECUTION_STARTED: (">>", "bold cyan"),
EventType.EXECUTION_COMPLETED: ("<<", "bold green"),
EventType.EXECUTION_FAILED: ("!!", "bold red"),
EventType.TOOL_CALL_STARTED: ("->", "yellow"),
EventType.TOOL_CALL_COMPLETED: ("<-", "green"),
EventType.NODE_LOOP_STARTED: ("@@", "cyan"),
EventType.NODE_LOOP_ITERATION: ("..", "dim"),
EventType.NODE_LOOP_COMPLETED: ("@@", "dim"),
EventType.LLM_TURN_COMPLETE: ("", "green"),
EventType.NODE_STALLED: ("!!", "bold yellow"),
EventType.NODE_INPUT_BLOCKED: ("!!", "yellow"),
EventType.GOAL_PROGRESS: ("%%", "blue"),
EventType.GOAL_ACHIEVED: ("**", "bold green"),
EventType.CONSTRAINT_VIOLATION: ("!!", "bold red"),
EventType.STATE_CHANGED: ("~~", "dim"),
EventType.CLIENT_INPUT_REQUESTED: ("??", "magenta"),
}
LOG_LEVEL_COLORS: dict[int, str] = {
logging.DEBUG: "dim",
logging.INFO: "",
logging.WARNING: "yellow",
logging.ERROR: "red",
logging.CRITICAL: "bold red",
}
# --- Module-level formatting functions ---
def extract_event_text(event: AgentEvent) -> str:
"""Extract human-readable text from an event's data dict."""
et = event.type
data = event.data
if et == EventType.EXECUTION_STARTED:
return "Execution started"
elif et == EventType.EXECUTION_COMPLETED:
return "Execution completed"
elif et == EventType.EXECUTION_FAILED:
return f"Execution FAILED: {data.get('error', 'unknown')}"
elif et == EventType.TOOL_CALL_STARTED:
return f"Tool call: {data.get('tool_name', 'unknown')}"
elif et == EventType.TOOL_CALL_COMPLETED:
name = data.get("tool_name", "unknown")
if data.get("is_error"):
preview = str(data.get("result", ""))[:80]
return f"Tool error: {name} - {preview}"
return f"Tool done: {name}"
elif et == EventType.NODE_LOOP_STARTED:
return f"Node started: {event.node_id or 'unknown'}"
elif et == EventType.NODE_LOOP_ITERATION:
return f"{event.node_id or 'unknown'} iteration {data.get('iteration', '?')}"
elif et == EventType.NODE_LOOP_COMPLETED:
return f"Node done: {event.node_id or 'unknown'}"
elif et == EventType.NODE_STALLED:
reason = data.get("reason", "")
node = event.node_id or "unknown"
return f"Node stalled: {node} - {reason}" if reason else f"Node stalled: {node}"
elif et == EventType.NODE_INPUT_BLOCKED:
return f"Node input blocked: {event.node_id or 'unknown'}"
elif et == EventType.GOAL_PROGRESS:
return f"Goal progress: {data.get('progress', '?')}"
elif et == EventType.GOAL_ACHIEVED:
return "Goal achieved"
elif et == EventType.CONSTRAINT_VIOLATION:
return f"Constraint violated: {data.get('description', 'unknown')}"
elif et == EventType.STATE_CHANGED:
return f"State changed: {data.get('key', 'unknown')}"
elif et == EventType.CLIENT_INPUT_REQUESTED:
return "Waiting for user input"
elif et == EventType.LLM_TURN_COMPLETE:
stop = data.get("stop_reason", "?")
model = data.get("model", "?")
inp = data.get("input_tokens", 0)
out = data.get("output_tokens", 0)
        return f"{model} {stop} ({inp}+{out} tokens)"
else:
return f"{et.value}: {data}"
def format_event(event: AgentEvent) -> str:
"""Format an AgentEvent as a Rich markup string with timestamp + symbol."""
ts = event.timestamp.strftime("%H:%M:%S")
symbol, color = EVENT_FORMAT.get(event.type, ("--", "dim"))
text = extract_event_text(event)
return f"[dim]{ts}[/dim] [{color}]{symbol} {text}[/{color}]"
def format_python_log(record: logging.LogRecord) -> str:
"""Format a Python log record as a Rich markup string with timestamp and severity color."""
ts = datetime.fromtimestamp(record.created).strftime("%H:%M:%S")
color = LOG_LEVEL_COLORS.get(record.levelno, "")
msg = record.getMessage()
if color:
return f"[dim]{ts}[/dim] [{color}]{record.levelname}[/{color}] {msg}"
else:
return f"[dim]{ts}[/dim] {record.levelname} {msg}"
# --- LogPane widget (kept for backward compatibility) ---
class LogPane(Container):
"""Widget to display logs with reliable rendering."""
DEFAULT_CSS = """
LogPane {
width: 100%;
height: 100%;
}
LogPane > RichLog {
width: 100%;
height: 100%;
background: $surface;
border: none;
scrollbar-background: $panel;
scrollbar-color: $primary;
}
"""
def compose(self) -> ComposeResult:
yield RichLog(id="main-log", highlight=True, markup=True, auto_scroll=False)
def write_event(self, event: AgentEvent) -> None:
"""Format an AgentEvent with timestamp + symbol and write to the log."""
self.write_log(format_event(event))
def write_python_log(self, record: logging.LogRecord) -> None:
"""Format a Python log record with timestamp and severity color."""
self.write_log(format_python_log(record))
def write_log(self, message: str) -> None:
"""Write a log message to the log pane."""
try:
if not self.is_mounted:
return
log = self.query_one("#main-log", RichLog)
if not log.is_mounted:
return
was_at_bottom = log.is_vertical_scroll_end
log.write(message)
if was_at_bottom:
log.scroll_end(animate=False)
except Exception:
pass
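The module-level formatters above are a simple lookup: a `(symbol, color)` table keyed by event type, with a `("--", "dim")` fallback. A self-contained sketch of that scheme — `MiniEvent`/`MiniEventType` are stand-ins for the real `AgentEvent`/`EventType` from `framework.runtime.event_bus`, which are not importable here:

```python
# Standalone sketch of the format_event pattern: (symbol, color) table keyed
# by event type, with a dim "--" fallback for unknown types.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class MiniEventType(Enum):
    TOOL_CALL_STARTED = "tool_call_started"
    GOAL_ACHIEVED = "goal_achieved"

EVENT_FORMAT: dict[MiniEventType, tuple[str, str]] = {
    MiniEventType.TOOL_CALL_STARTED: ("->", "yellow"),
    MiniEventType.GOAL_ACHIEVED: ("**", "bold green"),
}

@dataclass
class MiniEvent:
    type: MiniEventType
    data: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

def format_mini_event(event: MiniEvent) -> str:
    # Same shape as format_event: "[dim]HH:MM:SS[/dim] [color]symbol text[/color]"
    ts = event.timestamp.strftime("%H:%M:%S")
    symbol, color = EVENT_FORMAT.get(event.type, ("--", "dim"))
    return f"[dim]{ts}[/dim] [{color}]{symbol} {event.type.value}[/{color}]"

line = format_mini_event(MiniEvent(MiniEventType.GOAL_ACHIEVED))
```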
@@ -1,229 +0,0 @@
"""
SelectableRichLog - RichLog with mouse-driven text selection and clipboard copy.
Drop-in replacement for RichLog. Click-and-drag to select text, which is
visually highlighted. Press Ctrl+C to copy selection to clipboard (handled
by app.py). Press Escape or single-click to clear selection.
"""
from __future__ import annotations
import subprocess
import sys
from rich.segment import Segment as RichSegment
from rich.style import Style
from textual.geometry import Offset
from textual.selection import Selection
from textual.strip import Strip
from textual.widgets import RichLog
# Highlight style for selected text
_HIGHLIGHT_STYLE = Style(bgcolor="blue", color="white")
class SelectableRichLog(RichLog):
"""RichLog with mouse-driven text selection."""
DEFAULT_CSS = """
SelectableRichLog {
pointer: text;
}
"""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._sel_anchor: Offset | None = None
self._sel_end: Offset | None = None
self._selecting: bool = False
# -- Internal helpers --
def _apply_highlight(self, strip: Strip) -> Strip:
"""Apply highlight with correct precedence (highlight wins over base style)."""
segments = []
for text, style, control in strip._segments:
if control:
segments.append(RichSegment(text, style, control))
else:
new_style = (style + _HIGHLIGHT_STYLE) if style else _HIGHLIGHT_STYLE
segments.append(RichSegment(text, new_style, control))
return Strip(segments, strip.cell_length)
# -- Selection helpers --
@property
def selection(self) -> Selection | None:
"""Build a Selection from current anchor/end, or None if no selection."""
if self._sel_anchor is None or self._sel_end is None:
return None
if self._sel_anchor == self._sel_end:
return None
return Selection.from_offsets(self._sel_anchor, self._sel_end)
def _mouse_to_content(self, event_x: int, event_y: int) -> Offset:
"""Convert viewport mouse coords to content (line, col) coords."""
scroll_x, scroll_y = self.scroll_offset
return Offset(scroll_x + event_x, scroll_y + event_y)
def clear_selection(self) -> None:
"""Clear any active selection."""
had_selection = self._sel_anchor is not None
self._sel_anchor = None
self._sel_end = None
self._selecting = False
if had_selection:
self.refresh()
# -- Mouse handlers (left button only) --
def on_mouse_down(self, event) -> None:
"""Start selection on left mouse button."""
if event.button != 1:
return
self._sel_anchor = self._mouse_to_content(event.x, event.y)
self._sel_end = self._sel_anchor
self._selecting = True
self.capture_mouse()
self.refresh()
def on_mouse_move(self, event) -> None:
"""Extend selection while dragging."""
if not self._selecting:
return
self._sel_end = self._mouse_to_content(event.x, event.y)
self.refresh()
def on_mouse_up(self, event) -> None:
"""End selection on mouse release."""
if not self._selecting:
return
self._selecting = False
self.release_mouse()
# Single-click (no drag) clears selection
if self._sel_anchor == self._sel_end:
self.clear_selection()
# -- Keyboard handlers --
def on_key(self, event) -> None:
"""Clear selection on Escape."""
if event.key == "escape":
self.clear_selection()
# -- Rendering with highlight --
def render_line(self, y: int) -> Strip:
"""Override to apply selection highlight on top of the base strip."""
strip = super().render_line(y)
sel = self.selection
if sel is None:
return strip
# Determine which content line this viewport row corresponds to
_, scroll_y = self.scroll_offset
content_y = scroll_y + y
span = sel.get_span(content_y)
if span is None:
return strip
start_x, end_x = span
cell_len = strip.cell_length
if cell_len == 0:
return strip
scroll_x, _ = self.scroll_offset
# -1 means "to end of content line" — use viewport end
if end_x == -1:
end_x = cell_len
else:
# Convert content-space x to viewport-space x
end_x = end_x - scroll_x
# Convert content-space x to viewport-space x
start_x = start_x - scroll_x
# Clamp to viewport strip bounds
start_x = max(0, start_x)
end_x = min(end_x, cell_len)
if start_x >= end_x:
return strip
# Divide strip into [before, selected, after] and highlight the middle
parts = strip.divide([start_x, end_x])
if len(parts) < 2:
return strip
highlighted_parts: list[Strip] = []
for i, part in enumerate(parts):
if i == 1:
highlighted_parts.append(self._apply_highlight(part))
else:
highlighted_parts.append(part)
return Strip.join(highlighted_parts)
# -- Text extraction & clipboard --
def get_selected_text(self) -> str | None:
"""Extract the plain text of the current selection, or None."""
sel = self.selection
if sel is None:
return None
# Build full text from all lines
all_text = "\n".join(strip.text for strip in self.lines)
try:
extracted = sel.extract(all_text)
except (IndexError, ValueError):
# Selection coordinates can exceed line count when the virtual
# canvas is larger than the actual content (e.g. after scroll).
return None
return extracted if extracted else None
def copy_selection(self) -> str | None:
"""Copy selected text to system clipboard. Returns text or None."""
text = self.get_selected_text()
if not text:
return None
_copy_to_clipboard(text)
return text
def _copy_to_clipboard(text: str) -> None:
    """Copy text to system clipboard using platform-native tools."""
    # Note: pass bytes via `input` and omit `encoding` — subprocess.run treats
    # the pipes as text-mode when `encoding` is set, so combining it with a
    # bytes `input` raises a TypeError.
    try:
        if sys.platform == "darwin":
            subprocess.run(["pbcopy"], input=text.encode(), check=True, timeout=5)
        elif sys.platform == "win32":
            # clip.exe expects UTF-16LE on stdin
            subprocess.run(
                ["clip.exe"],
                input=text.encode("utf-16le"),
                check=True,
                timeout=5,
            )
        elif sys.platform.startswith("linux"):
            try:
                subprocess.run(
                    ["xclip", "-selection", "clipboard"],
                    input=text.encode(),
                    check=True,
                    timeout=5,
                )
            except (subprocess.SubprocessError, FileNotFoundError):
                subprocess.run(
                    ["xsel", "--clipboard", "--input"],
                    input=text.encode(),
                    check=True,
                    timeout=5,
                )
    except (subprocess.SubprocessError, FileNotFoundError):
        pass
-2
@@ -11,12 +11,10 @@ dependencies = [
     "litellm>=1.81.0",
     "mcp>=1.0.0",
     "fastmcp>=2.0.0",
-    "textual>=1.0.0",
     "tools",
 ]

 [project.optional-dependencies]
-tui = ["textual>=0.75.0"]
 webhook = ["aiohttp>=3.9.0"]
 server = ["aiohttp>=3.9.0"]
 testing = [
-90
@@ -1,90 +0,0 @@
"""Tests for ChatTextArea key handling (Enter submits, Shift+Enter / Ctrl+J insert newlines)."""
import pytest
from textual.app import App, ComposeResult
from framework.tui.widgets.chat_repl import ChatTextArea
class ChatTextAreaApp(App):
"""Minimal app that mounts a ChatTextArea for testing."""
submitted_texts: list[str]
def compose(self) -> ComposeResult:
yield ChatTextArea(id="input")
def on_mount(self) -> None:
self.submitted_texts = []
def on_chat_text_area_submitted(self, message: ChatTextArea.Submitted) -> None:
self.submitted_texts.append(message.text)
@pytest.fixture
def app():
return ChatTextAreaApp()
@pytest.mark.asyncio
async def test_enter_submits_text(app):
"""Pressing Enter should post a Submitted message and clear the widget."""
async with app.run_test() as pilot:
await pilot.press("h", "e", "l", "l", "o")
await pilot.press("enter")
assert app.submitted_texts == ["hello"]
@pytest.mark.asyncio
async def test_enter_on_empty_does_not_submit(app):
"""Pressing Enter with no text should not post a Submitted message."""
async with app.run_test() as pilot:
await pilot.press("enter")
assert app.submitted_texts == []
@pytest.mark.asyncio
async def test_shift_enter_inserts_newline(app):
"""Shift+Enter should insert a newline, not submit."""
async with app.run_test() as pilot:
widget = app.query_one("#input", ChatTextArea)
await pilot.press("a")
await pilot.press("shift+enter")
await pilot.press("b")
assert app.submitted_texts == []
assert "\n" in widget.text
assert widget.text.startswith("a")
assert widget.text.endswith("b")
@pytest.mark.asyncio
async def test_ctrl_j_inserts_newline(app):
"""Ctrl+J should insert a newline (fallback for terminals without Shift+Enter)."""
async with app.run_test() as pilot:
widget = app.query_one("#input", ChatTextArea)
await pilot.press("a")
await pilot.press("ctrl+j")
await pilot.press("b")
assert app.submitted_texts == []
assert "\n" in widget.text
assert widget.text.startswith("a")
assert widget.text.endswith("b")
@pytest.mark.asyncio
async def test_multiline_submit(app):
"""Typing multiline text via Ctrl+J then pressing Enter should submit all lines."""
async with app.run_test() as pilot:
await pilot.press("a")
await pilot.press("ctrl+j")
await pilot.press("b")
await pilot.press("enter")
assert len(app.submitted_texts) == 1
assert app.submitted_texts[0] == "a\nb"
-11
@@ -42,14 +42,3 @@ class TestMCPDependencies:
         assert FastMCP is not None
-
-
-class TestMCPPackageExports:
-    """Tests for the framework.mcp package exports."""
-
-    def test_package_importable(self):
-        """Test that framework.mcp package can be imported."""
-        if not MCP_AVAILABLE:
-            pytest.skip(MCP_SKIP_REASON)
-        import framework.mcp
-        assert framework.mcp is not None
Generated
+1 -62
@@ -841,7 +841,6 @@ dependencies = [
     { name = "litellm" },
     { name = "mcp" },
     { name = "pydantic" },
-    { name = "textual" },
     { name = "tools" },
 ]
@@ -854,9 +853,6 @@ testing = [
     { name = "pytest-asyncio" },
     { name = "pytest-xdist" },
 ]
-tui = [
-    { name = "textual" },
-]
 webhook = [
     { name = "aiohttp" },
 ]
@@ -883,11 +879,9 @@ requires-dist = [
     { name = "pytest", marker = "extra == 'testing'", specifier = ">=8.0" },
     { name = "pytest-asyncio", marker = "extra == 'testing'", specifier = ">=0.23" },
     { name = "pytest-xdist", marker = "extra == 'testing'", specifier = ">=3.0" },
-    { name = "textual", specifier = ">=1.0.0" },
-    { name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" },
     { name = "tools", editable = "tools" },
 ]
-provides-extras = ["tui", "webhook", "server", "testing"]
+provides-extras = ["webhook", "server", "testing"]

 [package.metadata.requires-dev]
 dev = [
@@ -1615,18 +1609,6 @@
     { url = "https://files.pythonhosted.org/packages/81/db/e655086b7f3a705df045bf0933bdd9c2f79bb3c97bfef1384598bb79a217/keyring-25.7.0-py3-none-any.whl", hash = "sha256:be4a0b195f149690c166e850609a477c532ddbfbaed96a404d4e43f8d5e2689f", size = 39160, upload-time = "2025-11-16T16:26:08.402Z" },
 ]

-[[package]]
-name = "linkify-it-py"
-version = "2.0.3"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "uc-micro-py" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/2a/ae/bb56c6828e4797ba5a4821eec7c43b8bf40f69cda4d4f5f8c8a2810ec96a/linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048", size = 27946, upload-time = "2024-02-04T14:48:04.179Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/04/1e/b832de447dee8b582cac175871d2f6c3d5077cc56d5575cadba1fd1cccfa/linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79", size = 19820, upload-time = "2024-02-04T14:48:02.496Z" },
-]
-
 [[package]]
 name = "litellm"
 version = "1.81.7"
@@ -1725,11 +1707,6 @@
     { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
 ]

-[package.optional-dependencies]
-linkify = [
-    { name = "linkify-it-py" },
-]
-
 [[package]]
 name = "markupsafe"
 version = "3.0.3"
@@ -1829,18 +1806,6 @@
     { url = "https://files.pythonhosted.org/packages/fd/d9/eaa1f80170d2b7c5ba23f3b59f766f3a0bb41155fbc32a69adfa1adaaef9/mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca", size = 233615, upload-time = "2026-01-24T19:40:30.652Z" },
 ]

-[[package]]
-name = "mdit-py-plugins"
-version = "0.5.0"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "markdown-it-py" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" },
-]
-
 [[package]]
 name = "mdurl"
 version = "0.1.2"
@@ -3434,23 +3399,6 @@
     { url = "https://files.pythonhosted.org/packages/9e/4b/0b7d5920f2be5e42d72bdfc44a9fae57b422668bfc8dacdf2f74886f6daa/stripe-14.3.0-py3-none-any.whl", hash = "sha256:3e36b68b256c8970e99b703e195d947e2a2919095758788c7074ac4485ac255e", size = 2106980, upload-time = "2026-01-28T21:20:27.566Z" },
 ]

-[[package]]
-name = "textual"
-version = "7.5.0"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "markdown-it-py", extra = ["linkify"] },
-    { name = "mdit-py-plugins" },
-    { name = "platformdirs" },
-    { name = "pygments" },
-    { name = "rich" },
-    { name = "typing-extensions" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/9f/38/7d169a765993efde5095c70a668bf4f5831bb7ac099e932f2783e9b71abf/textual-7.5.0.tar.gz", hash = "sha256:c730cba1e3d704e8f1ca915b6a3af01451e3bca380114baacf6abf87e9dac8b6", size = 1592319, upload-time = "2026-01-30T13:46:39.881Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/9c/78/96ddb99933e11d91bc6e05edae23d2687e44213066bcbaca338898c73c47/textual-7.5.0-py3-none-any.whl", hash = "sha256:849dfee9d705eab3b2d07b33152b7bd74fb1f5056e002873cc448bce500c6374", size = 718164, upload-time = "2026-01-30T13:46:37.635Z" },
-]
-
 [[package]]
 name = "tiktoken"
 version = "0.12.0"
@@ -3747,15 +3695,6 @@
     { url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" },
 ]

-[[package]]
-name = "uc-micro-py"
-version = "1.0.3"
-source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/91/7a/146a99696aee0609e3712f2b44c6274566bc368dfe8375191278045186b8/uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a", size = 6043, upload-time = "2024-02-09T16:52:01.654Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/37/87/1f677586e8ac487e29672e4b17455758fce261de06a0d086167bb760361a/uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5", size = 6229, upload-time = "2024-02-09T16:52:00.371Z" },
-]
-
 [[package]]
 name = "urllib3"
 version = "2.6.3"