Compare commits


46 Commits

Author SHA1 Message Date
Timothy 9cdc4f2006 feat: add profiler to the config 2026-03-16 16:20:43 -07:00
Timothy @aden 1e74f194a1 Update authors in MCP Server Registry document 2026-03-13 12:15:50 -07:00
Timothy 08157d2bd6 chore(docs): bounty program - standard 2026-03-13 12:10:21 -07:00
Timothy ef036257a9 docs(mcp): MCP integration PRD 2026-03-13 11:56:33 -07:00
Timothy 16ce984c74 chore: add default context limit on windows quickstart 2026-03-13 10:04:49 -07:00
RichardTang-Aden 52b1a3f472 Merge pull request #6282 from aden-hive/feat/refactor-session
Refactor session lifecycle with flowchart planning and triggers
2026-03-12 21:15:10 -07:00
Richard Tang 079e00c8f7 Merge remote-tracking branch 'origin/main' into feat/refactor-session 2026-03-12 21:13:15 -07:00
Richard Tang 60bba38941 chore: ruff lint 2026-03-12 21:01:47 -07:00
Richard Tang ea8e7b11c6 Merge remote-tracking branch 'origin/feature/flowchart-linked-experimental' into feat/refactor-session 2026-03-12 20:54:08 -07:00
Richard Tang 3dc2b25b01 fix: adding the trigger helpers 2026-03-12 20:53:45 -07:00
Richard Tang 2ad78ec8a2 Merge remote-tracking branch 'origin/feature/flowchart-linked-experimental' into feat/refactor-session 2026-03-12 20:48:09 -07:00
Richard Tang 9bfddec322 fix: missing _FLOWCHART_TYPES reference 2026-03-12 20:43:03 -07:00
Richard Tang 51fdc4ddde fix: always new session for new agent 2026-03-12 20:34:42 -07:00
Richard Tang 04685d33ca fix: solve the problem from merge conflict 2026-03-12 20:28:25 -07:00
Richard Tang 729a0e0cec fix: resolve merge conflict 2026-03-12 20:23:58 -07:00
Richard Tang 993b31f19b Merge remote-tracking branch 'origin/feature/flowchart-linked-experimental' into feat/refactor-session 2026-03-12 20:00:45 -07:00
Ishan Chaurasia 9d1f268078 fix(server): honor session_id in one-step session creation (#6233)
Align POST /api/sessions behavior across queen-only and one-step worker creation so callers can rely on deterministic session IDs. Add a regression test covering the forwarded session_id contract.

Made-with: Cursor
2026-03-13 10:43:12 +08:00
Ishan Chaurasia 336557d7c7 fix: pass browser_wait text as data (#6235)
Pass browser_wait text through Playwright's function argument channel so quoted and multiline strings do not break the generated wait expression. Add a regression test covering text that previously would have been interpolated unsafely.

Made-with: Cursor
2026-03-13 10:08:16 +08:00
Richard Tang c2c4929de8 feat: remove the phase in the label 2026-03-12 18:55:24 -07:00
Richard Tang f9d5f95936 Merge remote-tracking branch 'origin/feature/flowchart-linked-experimental' into feat/refactor-session 2026-03-12 18:32:26 -07:00
Richard Tang 944567dc31 chore: ruff lint 2026-03-12 16:23:13 -07:00
Richard Tang 674cf05601 feat: track the number of runs 2026-03-12 15:19:13 -07:00
Richard Tang 6fa71fa27d feat: track queen phase by message 2026-03-12 14:58:35 -07:00
Richard Tang 8c7065ad37 refactor: remove the parts conversion logic 2026-03-12 14:36:27 -07:00
Richard Tang a18ed5bbe6 feat: restore queen phase 2026-03-12 14:29:01 -07:00
Richard Tang ed3d4bfe33 feat: resume cold session from event logs 2026-03-12 14:07:57 -07:00
Richard Tang 596ce9878d feat: unique run id 2026-03-12 11:09:36 -07:00
Richard Tang 726016d24a fix: remove the duplicated session logic 2026-03-11 17:11:03 -07:00
Richard Tang 4895cea08a chore: lint and micro-fix 2026-03-11 16:55:29 -07:00
Richard Tang c9723a3ff2 feat(wip): always resume the previous session 2026-03-11 16:48:31 -07:00
Richard Tang 6cb73a6fea refactor: remove the remaining old trigger format and change the trigger format in examples to the latest format 2026-03-11 16:13:37 -07:00
Richard Tang 0c7f43f595 refactor: remove reference of the unused session judge 2026-03-11 16:01:00 -07:00
Richard Tang ea5cfcc5d6 refactor: remove the unused session judge 2026-03-11 15:57:19 -07:00
Richard Tang 34e85019c3 feat: stop supporting the old scheduler 2026-03-11 15:54:48 -07:00
Richard Tang c979dba958 fix: reference error from the rename 2026-03-11 14:33:42 -07:00
Richard Tang b4caa045e1 Merge remote-tracking branch 'origin/main' into feat/agent-trigger 2026-03-11 14:32:36 -07:00
bryan cba0ec110f fix: linter update 2026-03-08 19:37:57 -07:00
bryan 0256e0c944 Merge branch 'main' into feat/agent-trigger 2026-03-08 19:28:36 -07:00
bryan 4d9d0362a0 fixes to make the timer trigger properly 2026-03-08 18:44:42 -07:00
bryan f474d0bc8e Merge branch 'main' into feat/agent-trigger 2026-03-08 16:59:14 -07:00
bryan 6a0681b9aa feat: fixing phase 4, continuing to test 2026-03-08 16:52:00 -07:00
bryan c7e634851b feat: phase 4 of trigger plan 2026-03-06 19:21:32 -08:00
bryan cdb7155960 feat: phase 3 of trigger plan 2026-03-06 18:07:26 -08:00
bryan 3f7790c26a feat: phase 2 of trigger plan 2026-03-06 17:22:57 -08:00
bryan 5676b115f4 Merge branch 'feat/queen-responsibility' into feat/agent-trigger 2026-03-06 16:58:06 -08:00
bryan 61c59d57e8 feat: phase 1 of trigger plan 2026-03-06 15:11:36 -08:00
67 changed files with 3844 additions and 1465 deletions
@@ -0,0 +1,78 @@
name: Standard Bounty
description: A bounty task for general framework contributions (not integration-specific)
title: "[Bounty]: "
labels: []
body:
- type: markdown
attributes:
value: |
## Standard Bounty
This issue is part of the [Bounty Program](../../docs/bounty-program/README.md).
**Claim this bounty** by commenting below — a maintainer will assign you within 24 hours.
- type: dropdown
id: bounty-size
attributes:
label: Bounty Size
options:
- "Small (10 pts)"
- "Medium (30 pts)"
- "Large (75 pts)"
- "Extreme (150 pts)"
validations:
required: true
- type: dropdown
id: difficulty
attributes:
label: Difficulty
options:
- Easy
- Medium
- Hard
validations:
required: true
- type: textarea
id: description
attributes:
label: Description
description: What needs to be done to complete this bounty.
placeholder: |
Describe the specific task, including:
- What the contributor needs to do
- Links to relevant files in the repo
- Any context or motivation for the change
validations:
required: true
- type: textarea
id: acceptance-criteria
attributes:
label: Acceptance Criteria
description: What "done" looks like. The PR must meet all criteria.
placeholder: |
- [ ] Criterion 1
- [ ] Criterion 2
- [ ] CI passes
validations:
required: true
- type: textarea
id: relevant-files
attributes:
label: Relevant Files
description: Links to files or directories related to this bounty.
placeholder: |
- `path/to/file.py`
- `path/to/directory/`
- type: textarea
id: resources
attributes:
label: Resources
description: Links to docs, issues, or external references that will help.
placeholder: |
- Related issue: #XXXX
- Docs: https://...
+10 -10
@@ -5,20 +5,20 @@ help: ## Show this help
awk 'BEGIN {FS = ":.*?## "}; {printf " \033[36m%-15s\033[0m %s\n", $$1, $$2}'
lint: ## Run ruff linter and formatter (with auto-fix)
cd core && ruff check --fix .
cd tools && ruff check --fix .
cd core && ruff format .
cd tools && ruff format .
cd core && uv run ruff check --fix .
cd tools && uv run ruff check --fix .
cd core && uv run ruff format .
cd tools && uv run ruff format .
format: ## Run ruff formatter
cd core && ruff format .
cd tools && ruff format .
cd core && uv run ruff format .
cd tools && uv run ruff format .
check: ## Run all checks without modifying files (CI-safe)
cd core && ruff check .
cd tools && ruff check .
cd core && ruff format --check .
cd tools && ruff format --check .
cd core && uv run ruff check .
cd tools && uv run ruff check .
cd core && uv run ruff format --check .
cd tools && uv run ruff format --check .
test: ## Run all tests (core + tools, excludes live)
cd core && uv run python -m pytest tests/ -v
+27
@@ -16,6 +16,7 @@ class AgentEntry:
description: str
category: str
session_count: int = 0
run_count: int = 0
node_count: int = 0
tool_count: int = 0
tags: list[str] = field(default_factory=list)
@@ -52,6 +53,31 @@ def _count_sessions(agent_name: str) -> int:
return sum(1 for d in sessions_dir.iterdir() if d.is_dir() and d.name.startswith("session_"))
def _count_runs(agent_name: str) -> int:
"""Count unique run_ids across all sessions for an agent."""
sessions_dir = Path.home() / ".hive" / "agents" / agent_name / "sessions"
if not sessions_dir.exists():
return 0
run_ids: set[str] = set()
for session_dir in sessions_dir.iterdir():
if not session_dir.is_dir() or not session_dir.name.startswith("session_"):
continue
# runs.jsonl lives inside workspace subdirectories
for runs_file in session_dir.rglob("runs.jsonl"):
try:
for line in runs_file.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
record = json.loads(line)
rid = record.get("run_id")
if rid:
run_ids.add(rid)
except Exception:
continue
return len(run_ids)
def _extract_agent_stats(agent_path: Path) -> tuple[int, int, list[str]]:
"""Extract node count, tool count, and tags from an agent directory.
@@ -139,6 +165,7 @@ def discover_agents() -> dict[str, list[AgentEntry]]:
description=desc,
category=category,
session_count=_count_sessions(path.name),
run_count=_count_runs(path.name),
node_count=node_count,
tool_count=tool_count,
tags=tags,
+1 -2
@@ -14,8 +14,7 @@ queen_goal = Goal(
id="queen-manager",
name="Queen Manager",
description=(
"Manage the worker agent lifecycle and serve as the user's primary "
"interactive interface. Triage health escalations from the judge."
"Manage the worker agent lifecycle and serve as the user's primary interactive interface."
),
success_criteria=[],
constraints=[],
+70 -8
@@ -110,6 +110,10 @@ _QUEEN_STAGING_TOOLS = [
"stop_worker_and_edit",
"stop_worker_and_plan",
"write_to_diary", # Episodic memory — available in all phases
# Trigger management
"set_trigger",
"remove_trigger",
"list_triggers",
]
# Running phase: worker is executing — monitor and control.
@@ -126,11 +130,16 @@ _QUEEN_RUNNING_TOOLS = [
"stop_worker_and_edit",
"stop_worker_and_plan",
"get_worker_status",
"run_agent_with_input",
"inject_worker_message",
# Monitoring
"get_worker_health_summary",
"notify_operator",
"write_to_diary", # Episodic memory — available in all phases
# Trigger management
"set_trigger",
"remove_trigger",
"list_triggers",
]
@@ -496,8 +505,8 @@ nodes/__init__.py
- Goal description, success criteria values, constraint values, edge \
definitions, identity_prompt in agent.py
- CLI options in __main__.py
- For async entry points (timers/webhooks), add AsyncEntryPointSpec \
and AgentRuntimeConfig to agent.py
- For triggers (timers/webhooks), add entries to triggers.json in the \
agent's export directory
Do NOT modify or rewrite:
- Import statements at top of agent.py (they are correct)
@@ -660,6 +669,9 @@ The agent is loaded and ready to run. You can inspect it and launch it:
- stop_worker_and_plan() Go to PLANNING phase to discuss changes with the user \
first (DEFAULT for most modification requests)
- stop_worker_and_edit() Go to BUILDING phase for immediate, specific fixes
- set_trigger(trigger_id, trigger_type?, trigger_config?) Activate a trigger (timer)
- remove_trigger(trigger_id) Deactivate a trigger
- list_triggers() List all triggers and their active/inactive status
You do NOT have write tools. To modify the agent, prefer \
stop_worker_and_plan() unless the user gave a specific instruction.
@@ -682,6 +694,15 @@ with the user first (DEFAULT for most modification requests)
You do NOT have write tools. To modify the agent, prefer \
stop_worker_and_plan() unless the user gave a specific instruction. \
To just stop without modifying, call stop_worker().
- stop_worker_and_edit() Stop the worker and switch back to BUILDING phase
- set_trigger(trigger_id, trigger_type?, trigger_config?) Activate a trigger (timer)
- remove_trigger(trigger_id) Deactivate a trigger
- list_triggers() List all triggers and their active/inactive status
You do NOT have write tools or agent construction tools. \
If you need to modify the agent, call stop_worker_and_edit() to switch back \
to BUILDING phase. To stop the worker and ask the user what to do next, call \
stop_worker() to return to STAGING phase.
"""
# -- Behavior shared across all phases --
@@ -968,6 +989,33 @@ Use stop_worker_and_edit() only when:
- The user gave a specific, concrete instruction ("add save_data to the gather node")
- You already discussed the fix in a previous planning session
- The change is trivial and unambiguous (rename, toggle a flag)
## Trigger Management
Use list_triggers() to see available triggers from the loaded worker.
Use set_trigger(trigger_id) to activate a timer. Once active, triggers \
fire periodically and inject [TRIGGER: ...] messages so you can decide \
whether to call run_agent_with_input(task).
### When the user says "Enable trigger <id>" (or clicks Enable in the UI):
1. Call get_worker_status(focus="memory") to check if the worker has \
saved configuration (rules, preferences, settings from a prior run).
2. If memory contains saved config: compose a task string from it \
(e.g. "Process inbox emails using saved rules") and call \
set_trigger(trigger_id, task="...") immediately. Tell the user the \
trigger is now active and what schedule it uses. Do NOT ask them to \
provide the task; you derive it from memory.
3. If memory is empty (no prior run): tell the user the agent needs to \
run once first so its configuration can be saved. Offer to run it now. \
Once the worker finishes, enable the trigger.
4. If the user just provided config this session (rules/task context \
already in conversation): use that directly, no memory lookup needed. \
Enable the trigger immediately.
Never ask "what should the task be?" when enabling a trigger for an \
agent with a clear purpose. The task string is a brief description of \
what the worker does, derived from its saved state or your current context.
"""
# -- RUNNING phase behavior --
@@ -982,7 +1030,6 @@ NOT ask the user directly.
You wake up when:
- The user explicitly addresses you
- A worker escalation arrives (`[WORKER_ESCALATION_REQUEST]`)
- An escalation ticket arrives from the judge
- The worker finishes (`[WORKER_TERMINAL]`)
If the user asks for progress, call get_worker_status() ONCE and report. \
@@ -1060,6 +1107,21 @@ When the user asks to fix, change, modify, or update the loaded worker \
**Default: use stop_worker_and_plan().** Most modification requests need \
discussion first. Only use stop_worker_and_edit() when the user gave a \
specific, unambiguous instruction or you already agreed on the fix.
## Trigger Handling
You will receive [TRIGGER: ...] messages when a scheduled timer fires. \
These are framework-level signals, not user messages.
Rules:
- Check get_worker_status() before calling run_agent_with_input(task). If the worker \
is already RUNNING, decide: skip this trigger, or note it for after completion.
- When multiple [TRIGGER] messages arrive at once, read them all before acting. \
Batch your response; do not call run_agent_with_input() once per trigger.
- If a trigger fires but the task no longer makes sense (e.g., user changed \
config since last run), skip it and inform the user.
- Never disable a trigger without telling the user. Use remove_trigger() only \
when explicitly asked or when the trigger is clearly obsolete.
"""
# -- Backward-compatible composed versions (used by queen_node.system_prompt default) --
@@ -1123,8 +1185,8 @@ ticket_triage_node = NodeSpec(
id="ticket_triage",
name="Ticket Triage",
description=(
"Queen's triage node. Receives an EscalationTicket from the Health Judge "
"via event-driven entry point and decides: dismiss or notify the operator."
"Queen's triage node. Receives an EscalationTicket via event-driven "
"entry point and decides: dismiss or notify the operator."
),
node_type="event_loop",
client_facing=True, # Operator can chat with queen once connected (Ctrl+Q)
@@ -1138,8 +1200,8 @@ ticket_triage_node = NodeSpec(
),
tools=["notify_operator"],
system_prompt="""\
You are the Queen. The Worker Health Judge has escalated a worker \
issue to you. The ticket is in your memory under key "ticket". Read it carefully.
You are the Queen. A worker health issue has been escalated to you. \
The ticket is in your memory under key "ticket". Read it carefully.
## Dismiss criteria — do NOT call notify_operator:
- severity is "low" AND steps_since_last_accept < 8
@@ -1178,7 +1240,7 @@ queen_node = NodeSpec(
description=(
"User's primary interactive interface with full coding capability. "
"Can build agents directly or delegate to the worker. Manages the "
"worker agent lifecycle and triages health escalations from the judge."
"worker agent lifecycle."
),
node_type="event_loop",
client_facing=True,
@@ -332,81 +332,46 @@ class MyAgent:
default_agent = MyAgent()
```
## agent.py — Async Entry Points Variant
## triggers.json — Timer and Webhook Triggers
When an agent needs timers, webhooks, or event-driven triggers, add
`async_entry_points` and optionally `runtime_config` as module-level variables.
These are IN ADDITION to the standard variables above.
When an agent needs timers, webhooks, or event-driven triggers, create a
`triggers.json` file in the agent's directory (alongside `agent.py`).
The queen loads these at session start and the user can manage them via
the `set_trigger` / `remove_trigger` tools at runtime.
```python
# Additional imports for async entry points
from framework.graph.edge import GraphSpec, AsyncEntryPointSpec
from framework.runtime.agent_runtime import (
AgentRuntime, AgentRuntimeConfig, create_agent_runtime,
)
# ... (goal, nodes, edges, entry_node, entry_points, etc. as above) ...
# Async entry points — event-driven triggers
async_entry_points = [
# Timer with cron: daily at 9am
AsyncEntryPointSpec(
id="daily-check",
name="Daily Check",
entry_node="process-node",
trigger_type="timer",
trigger_config={"cron": "0 9 * * *"},
isolation_level="shared",
max_concurrent=1,
),
# Timer with fixed interval: every 20 minutes
AsyncEntryPointSpec(
id="scheduled-check",
name="Scheduled Check",
entry_node="process-node",
trigger_type="timer",
trigger_config={"interval_minutes": 20, "run_immediately": False},
isolation_level="shared",
max_concurrent=1,
),
# Event: reacts to webhook events
AsyncEntryPointSpec(
id="webhook-event",
name="Webhook Event Handler",
entry_node="process-node",
trigger_type="event",
trigger_config={"event_types": ["webhook_received"]},
isolation_level="shared",
max_concurrent=10,
),
```json
[
{
"id": "daily-check",
"name": "Daily Check",
"trigger_type": "timer",
"trigger_config": {"cron": "0 9 * * *"},
"task": "Run the daily check process"
},
{
"id": "scheduled-check",
"name": "Scheduled Check",
"trigger_type": "timer",
"trigger_config": {"interval_minutes": 20},
"task": "Run the scheduled check"
},
{
"id": "webhook-event",
"name": "Webhook Event Handler",
"trigger_type": "webhook",
"trigger_config": {"event_types": ["webhook_received"]},
"task": "Process incoming webhook event"
}
]
# Webhook server config (only needed if using webhooks)
runtime_config = AgentRuntimeConfig(
webhook_host="127.0.0.1",
webhook_port=8080,
webhook_routes=[
{
"source_id": "my-source",
"path": "/webhooks/my-source",
"methods": ["POST"],
},
],
)
```
**Key rules for async entry points:**
- `async_entry_points` is a list of `AsyncEntryPointSpec` (NOT `EntryPointSpec`)
- `runtime_config` is `AgentRuntimeConfig` (NOT `RuntimeConfig` from config.py)
- Valid trigger_types: `timer`, `event`, `webhook`, `manual`, `api`
- Valid isolation_levels: `isolated`, `shared`, `synchronized`
**Key rules for triggers.json:**
- Valid trigger_types: `timer`, `webhook`
- Timer trigger_config (cron): `{"cron": "0 9 * * *"}` — standard 5-field cron expression
- Timer trigger_config (interval): `{"interval_minutes": float, "run_immediately": bool}`
- Event trigger_config: `{"event_types": ["webhook_received"], "filter_stream": "...", "filter_node": "..."}`
- Use `isolation_level="shared"` for async entry points that need to read
the primary session's memory (e.g., user-configured rules)
- The `_build_graph()` method passes `async_entry_points` to GraphSpec
- Reference: `exports/gmail_inbox_guardian/agent.py`
- Timer trigger_config (interval): `{"interval_minutes": float}`
- Each trigger must have a unique `id`
- The `task` field describes what the worker should do when the trigger fires
- Triggers are persisted back to `triggers.json` when modified via queen tools
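The rules above can be checked mechanically when loading the file. A minimal loader sketch enforcing them; the function name `load_triggers` is an assumption for illustration, not the framework's actual API:

```python
import json
from pathlib import Path

VALID_TRIGGER_TYPES = {"timer", "webhook"}


def load_triggers(agent_dir: Path) -> list[dict]:
    """Load and validate triggers.json from an agent directory (sketch).

    Returns [] when the file is absent, i.e. the agent has no triggers.
    """
    path = agent_dir / "triggers.json"
    if not path.exists():
        return []
    triggers = json.loads(path.read_text(encoding="utf-8"))
    seen_ids: set[str] = set()
    for t in triggers:
        tid = t.get("id")
        if not tid or tid in seen_ids:
            raise ValueError(f"trigger ids must be unique and non-empty, got {tid!r}")
        seen_ids.add(tid)
        if t.get("trigger_type") not in VALID_TRIGGER_TYPES:
            raise ValueError(f"trigger {tid!r}: type must be one of {VALID_TRIGGER_TYPES}")
        cfg = t.get("trigger_config", {})
        if t["trigger_type"] == "timer" and "cron" not in cfg and "interval_minutes" not in cfg:
            raise ValueError(f"timer trigger {tid!r} needs 'cron' or 'interval_minutes'")
    return triggers
```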
## __init__.py
@@ -453,21 +418,6 @@ __all__ = [
]
```
**If the agent uses async entry points**, also import and export:
```python
from .agent import (
...,
async_entry_points,
runtime_config, # Only if using webhooks
)
__all__ = [
...,
"async_entry_points",
"runtime_config",
]
```
## __main__.py
```python
@@ -31,8 +31,7 @@ module-level variables via `getattr()`:
| `conversation_mode` | no | not passed | Isolated mode (no context carryover) |
| `identity_prompt` | no | not passed | No agent-level identity |
| `loop_config` | no | `{}` | No iteration limits |
| `async_entry_points` | no | `[]` | No async triggers (timers, webhooks, events) |
| `runtime_config` | no | `None` | No webhook server |
| `triggers.json` (file) | no | not present | No triggers (timers, webhooks) |
**CRITICAL:** `__init__.py` MUST import and re-export ALL of these from
`agent.py`. Missing exports silently fall back to defaults, causing
@@ -257,44 +256,28 @@ Multiple ON_SUCCESS edges from same source → parallel execution via asyncio.ga
Judge is the SOLE acceptance mechanism — no ad-hoc framework gating.
## Async Entry Points (Webhooks, Timers, Events)
## Triggers (Timers, Webhooks)
For agents that react to external events, use `AsyncEntryPointSpec`:
For agents that react to external events, create a `triggers.json` file
in the agent's export directory:
```python
from framework.graph.edge import AsyncEntryPointSpec
from framework.runtime.agent_runtime import AgentRuntimeConfig
# Timer trigger (cron or interval)
async_entry_points = [
AsyncEntryPointSpec(
id="daily-check",
name="Daily Check",
entry_node="process",
trigger_type="timer",
trigger_config={"cron": "0 9 * * *"}, # daily at 9am
isolation_level="shared",
)
```json
[
{
"id": "daily-check",
"name": "Daily Check",
"trigger_type": "timer",
"trigger_config": {"cron": "0 9 * * *"},
"task": "Run the daily check process"
}
]
# Webhook server (optional)
runtime_config = AgentRuntimeConfig(
webhook_host="127.0.0.1",
webhook_port=8080,
webhook_routes=[{"source_id": "gmail", "path": "/webhooks/gmail", "methods": ["POST"]}],
)
```
### Key Fields
- `trigger_type`: `"timer"`, `"event"`, `"webhook"`, `"manual"`
- `trigger_type`: `"timer"` or `"webhook"`
- `trigger_config`: `{"cron": "0 9 * * *"}` or `{"interval_minutes": 20}`
- `isolation_level`: `"shared"` (recommended), `"isolated"`, `"synchronized"`
- `event_types`: For event triggers, e.g., `["webhook_received"]`
### Exports Required
Both `async_entry_points` and `runtime_config` must be exported from `__init__.py`.
See `exports/gmail_inbox_guardian/agent.py` for complete example.
- `task`: describes what the worker should do when the trigger fires
- Triggers can also be created/removed at runtime via `set_trigger` / `remove_trigger` queen tools
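When a trigger fires, the queen receives a batched `[TRIGGER: ...]` user message rather than one message per trigger. A self-contained sketch of that formatting, mirroring the drain logic added to `EventLoopNode` in this changeset:

```python
import json
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TriggerEvent:
    trigger_type: str  # "timer" | "webhook"
    source_id: str     # trigger id from triggers.json
    payload: dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)


def format_trigger_batch(triggers: list[TriggerEvent]) -> str:
    """Merge pending triggers into the single message the queen's LLM sees."""
    parts: list[str] = []
    for t in triggers:
        task = t.payload.get("task", "")
        task_line = f"\nTask: {task}" if task else ""
        payload_str = json.dumps(t.payload, default=str)
        parts.append(f"[TRIGGER: {t.trigger_type}/{t.source_id}]{task_line}\n{payload_str}")
    return "\n\n".join(parts)
```

Batching is what lets the queen read all pending triggers atomically and decide once whether to call `run_agent_with_input`, instead of reacting to each tick in isolation.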
## Tool Discovery
@@ -1,8 +1,8 @@
"""Queen's ticket receiver entry point.
When the Worker Health Judge emits a WORKER_ESCALATION_TICKET event on the
shared EventBus, this entry point fires and routes to the ``ticket_triage``
node, where the Queen deliberates and decides whether to notify the operator.
When a WORKER_ESCALATION_TICKET event is emitted on the shared EventBus,
this entry point fires and routes to the ``ticket_triage`` node, where the
Queen deliberates and decides whether to notify the operator.
Isolation level is ``isolated``: the queen's triage memory is kept separate
from the worker's shared memory. Each ticket triage runs in its own context.
+13 -85
@@ -322,7 +322,11 @@ class AsyncEntryPointSpec(BaseModel):
id: str = Field(description="Unique identifier for this entry point")
name: str = Field(description="Human-readable name")
entry_node: str = Field(description="Node ID to start execution from")
entry_node: str = Field(
default="",
description="Deprecated: Node ID to start execution from. "
"Triggers are graph-level; worker always enters at GraphSpec.entry_node.",
)
trigger_type: str = Field(
default="manual",
description="How this entry point is triggered: webhook, api, timer, event, manual",
@@ -331,6 +335,10 @@ class AsyncEntryPointSpec(BaseModel):
default_factory=dict,
description="Trigger-specific configuration (e.g., webhook URL, timer interval)",
)
task: str = Field(
default="",
description="Worker task string when this trigger fires autonomously",
)
isolation_level: str = Field(
default="shared", description="State isolation: isolated, shared, or synchronized"
)
@@ -368,28 +376,8 @@ class GraphSpec(BaseModel):
edges=[...],
)
For multi-entry-point agents (concurrent streams):
GraphSpec(
id="support-agent-graph",
goal_id="support-001",
entry_node="process-webhook", # Default entry
async_entry_points=[
AsyncEntryPointSpec(
id="webhook",
name="Zendesk Webhook",
entry_node="process-webhook",
trigger_type="webhook",
),
AsyncEntryPointSpec(
id="api",
name="API Handler",
entry_node="process-request",
trigger_type="api",
),
],
nodes=[...],
edges=[...],
)
Triggers (timer, webhook, event) are now defined in ``triggers.json``
in the agent's directory, not embedded in the graph spec.
"""
id: str
@@ -402,12 +390,6 @@ class GraphSpec(BaseModel):
default_factory=dict,
description="Named entry points for resuming execution. Format: {name: node_id}",
)
async_entry_points: list[AsyncEntryPointSpec] = Field(
default_factory=list,
description=(
"Asynchronous entry points for concurrent execution streams (used with AgentRuntime)"
),
)
terminal_nodes: list[str] = Field(
default_factory=list, description="IDs of nodes that end execution"
)
@@ -486,17 +468,6 @@ class GraphSpec(BaseModel):
return node
return None
def has_async_entry_points(self) -> bool:
"""Check if this graph uses async entry points (multi-stream execution)."""
return len(self.async_entry_points) > 0
def get_async_entry_point(self, entry_point_id: str) -> AsyncEntryPointSpec | None:
"""Get an async entry point by ID."""
for ep in self.async_entry_points:
if ep.id == entry_point_id:
return ep
return None
def get_outgoing_edges(self, node_id: str) -> list[EdgeSpec]:
"""Get all edges leaving a node, sorted by priority."""
edges = [e for e in self.edges if e.source == node_id]
@@ -587,37 +558,6 @@ class GraphSpec(BaseModel):
if not self.get_node(self.entry_node):
errors.append(f"Entry node '{self.entry_node}' not found")
# Check async entry points
seen_entry_ids = set()
for entry_point in self.async_entry_points:
# Check for duplicate IDs
if entry_point.id in seen_entry_ids:
errors.append(f"Duplicate async entry point ID: '{entry_point.id}'")
seen_entry_ids.add(entry_point.id)
# Check entry node exists
if not self.get_node(entry_point.entry_node):
errors.append(
f"Async entry point '{entry_point.id}' references "
f"missing node '{entry_point.entry_node}'"
)
# Validate isolation level
valid_isolation = {"isolated", "shared", "synchronized"}
if entry_point.isolation_level not in valid_isolation:
errors.append(
f"Async entry point '{entry_point.id}' has invalid isolation_level "
f"'{entry_point.isolation_level}'. Valid: {valid_isolation}"
)
# Validate trigger type
valid_triggers = {"webhook", "api", "timer", "event", "manual"}
if entry_point.trigger_type not in valid_triggers:
errors.append(
f"Async entry point '{entry_point.id}' has invalid trigger_type "
f"'{entry_point.trigger_type}'. Valid: {valid_triggers}"
)
# Check terminal nodes exist
for term in self.terminal_nodes:
if not self.get_node(term):
@@ -646,10 +586,6 @@ class GraphSpec(BaseModel):
for entry_point_node in self.entry_points.values():
to_visit.append(entry_point_node)
# Add all async entry points as valid starting points
for async_entry in self.async_entry_points:
to_visit.append(async_entry.entry_node)
# Traverse from all entry points
while to_visit:
current = to_visit.pop()
@@ -666,18 +602,10 @@ class GraphSpec(BaseModel):
for sub_agent_id in sub_agents:
reachable.add(sub_agent_id)
# Build set of async entry point nodes for quick lookup
async_entry_nodes = {ep.entry_node for ep in self.async_entry_points}
for node in self.nodes:
if node.id not in reachable:
# Skip if node is a pause node, entry point target, or async entry
# (pause/resume architecture and async entry points make reachable)
if (
node.id in self.pause_nodes
or node.id in self.entry_points.values()
or node.id in async_entry_nodes
):
# Skip if node is a pause node or entry point target
if node.id in self.pause_nodes or node.id in self.entry_points.values():
continue
errors.append(f"Node '{node.id}' is unreachable from entry")
+79 -6
@@ -36,6 +36,21 @@ from framework.runtime.llm_debug_logger import log_llm_turn
logger = logging.getLogger(__name__)
@dataclass
class TriggerEvent:
"""A framework-level trigger signal (timer tick or webhook hit).
Triggers are queued separately from user messages / external events
and drained atomically so the LLM sees all pending triggers at once.
"""
trigger_type: str # "timer" | "webhook"
source_id: str # entry point ID or webhook route ID
payload: dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
# Pattern for detecting context-window-exceeded errors across LLM providers.
_CONTEXT_TOO_LARGE_RE = re.compile(
r"context.{0,20}(length|window|limit|size)|"
@@ -346,6 +361,7 @@ class EventLoopNode(NodeProtocol):
self._tool_executor = tool_executor
self._conversation_store = conversation_store
self._injection_queue: asyncio.Queue[tuple[str, bool]] = asyncio.Queue()
self._trigger_queue: asyncio.Queue[TriggerEvent] = asyncio.Queue()
# Client-facing input blocking state
self._input_ready = asyncio.Event()
self._awaiting_input = False
@@ -631,6 +647,8 @@ class EventLoopNode(NodeProtocol):
# 6b. Drain injection queue
await self._drain_injection_queue(conversation)
# 6b1. Drain trigger queue (framework-level signals)
await self._drain_trigger_queue(conversation)
# 6b2. Dynamic tool refresh (mode switching)
if ctx.dynamic_tools_provider is not None:
@@ -656,8 +674,20 @@ class EventLoopNode(NodeProtocol):
conversation.update_system_prompt(_new_prompt)
logger.info("[%s] Dynamic prompt updated (phase switch)", node_id)
# 6c. Publish iteration event
await self._publish_iteration(stream_id, node_id, iteration, execution_id)
# 6c. Publish iteration event (with per-iteration metadata when available)
_iter_meta = None
if ctx.iteration_metadata_provider is not None:
try:
_iter_meta = ctx.iteration_metadata_provider()
except Exception:
pass
await self._publish_iteration(
stream_id,
node_id,
iteration,
execution_id,
extra_data=_iter_meta,
)
# 6d. Pre-turn compaction check (tiered)
_compacted_this_iter = False
@@ -1738,6 +1768,15 @@ class EventLoopNode(NodeProtocol):
await self._injection_queue.put((content, is_client_input))
self._input_ready.set()
async def inject_trigger(self, trigger: TriggerEvent) -> None:
"""Inject a framework-level trigger into the running queen loop.
Triggers are queued separately from user messages and drained
atomically via _drain_trigger_queue().
"""
await self._trigger_queue.put(trigger)
self._input_ready.set()
def signal_shutdown(self) -> None:
"""Signal the node to exit its loop cleanly.
@@ -1788,9 +1827,9 @@ class EventLoopNode(NodeProtocol):
Returns True if input arrived, False if shutdown was signaled.
"""
# If messages arrived while the LLM was processing, skip blocking
# entirely — the next _drain_injection_queue() will pick them up.
if not self._injection_queue.empty():
# If messages or triggers arrived while the LLM was processing, skip
# blocking — the next drain pass will pick them up.
if not self._injection_queue.empty() or not self._trigger_queue.empty():
return True
# Clear BEFORE emitting so that synchronous handlers (e.g. the
@@ -4047,6 +4086,34 @@ class EventLoopNode(NodeProtocol):
break
return count
async def _drain_trigger_queue(self, conversation: NodeConversation) -> int:
"""Drain all pending trigger events as a single batched user message.
Multiple triggers are merged so the LLM sees them atomically and can
reason about all pending triggers before acting.
"""
triggers: list[TriggerEvent] = []
while not self._trigger_queue.empty():
try:
triggers.append(self._trigger_queue.get_nowait())
except asyncio.QueueEmpty:
break
if not triggers:
return 0
parts: list[str] = []
for t in triggers:
task = t.payload.get("task", "")
task_line = f"\nTask: {task}" if task else ""
payload_str = json.dumps(t.payload, default=str)
parts.append(f"[TRIGGER: {t.trigger_type}/{t.source_id}]{task_line}\n{payload_str}")
combined = "\n\n".join(parts)
logger.info("[drain] %d trigger(s): %s", len(triggers), combined[:200])
await conversation.add_user_message(combined)
return len(triggers)
async def _check_pause(
self,
ctx: NodeContext,
@@ -4181,7 +4248,12 @@ class EventLoopNode(NodeProtocol):
await conversation.add_user_message(result.inject)
async def _publish_iteration(
self, stream_id: str, node_id: str, iteration: int, execution_id: str = ""
self,
stream_id: str,
node_id: str,
iteration: int,
execution_id: str = "",
extra_data: dict | None = None,
) -> None:
if self._event_bus:
await self._event_bus.emit_node_loop_iteration(
@@ -4189,6 +4261,7 @@ class EventLoopNode(NodeProtocol):
node_id=node_id,
iteration=iteration,
execution_id=execution_id,
extra_data=extra_data,
)
async def _publish_llm_turn_complete(
+3
@@ -148,6 +148,7 @@ class GraphExecutor:
tool_provider_map: dict[str, str] | None = None,
dynamic_tools_provider: Callable | None = None,
dynamic_prompt_provider: Callable | None = None,
iteration_metadata_provider: Callable | None = None,
):
"""
Initialize the executor.
@@ -193,6 +194,7 @@ class GraphExecutor:
self.tool_provider_map = tool_provider_map
self.dynamic_tools_provider = dynamic_tools_provider
self.dynamic_prompt_provider = dynamic_prompt_provider
self.iteration_metadata_provider = iteration_metadata_provider
# Parallel execution settings
self.enable_parallel_execution = enable_parallel_execution
@@ -1809,6 +1811,7 @@ class GraphExecutor:
shared_node_registry=self.node_registry, # For subagent escalation routing
dynamic_tools_provider=self.dynamic_tools_provider,
dynamic_prompt_provider=self.dynamic_prompt_provider,
iteration_metadata_provider=self.iteration_metadata_provider,
)
VALID_NODE_TYPES = {
+5
@@ -565,6 +565,11 @@ class NodeContext:
# staging / running) without restarting the conversation.
dynamic_prompt_provider: Any = None # Callable[[], str] | None
# Per-iteration metadata provider — when set, EventLoopNode merges
# the returned dict into node_loop_iteration event data. Used by
# the queen to record the current phase per iteration.
iteration_metadata_provider: Any = None # Callable[[], dict] | None
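A minimal sketch of how such a provider might be wired (names here are illustrative, not the framework's API): the queen holds mutable state and hands the executor a zero-argument callable that snapshots it each iteration.

```python
class PhaseTracker:
    """Illustrative holder for the queen's current phase (hypothetical)."""

    def __init__(self) -> None:
        self.phase = "planning"

    def metadata_provider(self) -> dict:
        # Called once per iteration; the returned dict is merged
        # into the node_loop_iteration event data.
        return {"phase": self.phase}

tracker = PhaseTracker()
# executor = GraphExecutor(..., iteration_metadata_provider=tracker.metadata_provider)
print(tracker.metadata_provider())  # {'phase': 'planning'}
tracker.phase = "executing"
print(tracker.metadata_provider())  # {'phase': 'executing'}
```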
@dataclass
class NodeResult:
+1 -33
@@ -1,33 +1 @@
"""Framework-level worker monitoring package.
Provides the Worker Health Judge: a reusable secondary graph that attaches to
any worker agent runtime and monitors its execution health via periodic log
inspection. Emits structured EscalationTickets when degradation is detected.
Usage::
from framework.monitoring import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
# Register tools bound to the worker runtime's EventBus
monitoring_registry = ToolRegistry()
register_worker_monitoring_tools(monitoring_registry, worker_runtime._event_bus, storage_path)
# Load judge as secondary graph on the worker runtime
await worker_runtime.add_graph(
graph_id="judge",
graph=judge_graph,
goal=judge_goal,
entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
storage_subpath="graphs/judge",
)
"""
from .judge import HEALTH_JUDGE_ENTRY_POINT, judge_goal, judge_graph, judge_node
__all__ = [
"HEALTH_JUDGE_ENTRY_POINT",
"judge_goal",
"judge_graph",
"judge_node",
]
"""Framework-level worker monitoring package."""
-258
@@ -1,258 +0,0 @@
"""Worker Health Judge — framework-level reusable monitoring graph.
Attaches to any worker agent runtime as a secondary graph. Fires on a
2-minute timer, reads the worker's session logs via ``get_worker_health_summary``,
accumulates observations in a continuous conversation context, and emits a
structured ``EscalationTicket`` when it detects a degradation pattern.
Usage::
from framework.monitoring import judge_graph, judge_goal, HEALTH_JUDGE_ENTRY_POINT
from framework.tools.worker_monitoring_tools import register_worker_monitoring_tools
# Register tools bound to the worker runtime's event bus
monitoring_registry = ToolRegistry()
register_worker_monitoring_tools(
monitoring_registry, worker_runtime._event_bus, storage_path
)
monitoring_tools = list(monitoring_registry.get_tools().values())
monitoring_executor = monitoring_registry.get_executor()
# Load judge as secondary graph on the worker runtime
await worker_runtime.add_graph(
graph_id="judge",
graph=judge_graph,
goal=judge_goal,
entry_points={"health_check": HEALTH_JUDGE_ENTRY_POINT},
storage_subpath="graphs/judge",
)
Design:
- ``isolation_level="isolated"``: the judge has its own memory, not
polluting the worker's shared memory namespace.
- ``conversation_mode="continuous"``: the judge's conversation carries
across timer ticks. The conversation IS the judge's memory. It tracks
trends by referring to its own prior messages ("Last check I saw 47
steps; now 52; 5 new steps, 3 RETRY").
- No shared memory keys. No external state files.
"""
from __future__ import annotations
from framework.graph import Constraint, Goal, NodeSpec, SuccessCriterion
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
# ---------------------------------------------------------------------------
# Goal
# ---------------------------------------------------------------------------
judge_goal = Goal(
id="worker-health-monitor",
name="Worker Health Monitor",
description=(
"Periodically assess the health of the worker agent by reading its "
"execution logs. Detect degradation patterns (excessive retries, "
"stalls, doom loops) and emit structured EscalationTickets when the "
"worker needs attention."
),
success_criteria=[
SuccessCriterion(
id="accurate-detection",
description="Only escalates genuine degradation, not normal retry cycles",
metric="false_positive_rate",
target="low",
weight=0.5,
),
SuccessCriterion(
id="timely-detection",
description="Detects genuine stalls within 2 timer ticks (≤4 minutes)",
metric="detection_latency_minutes",
target="<=4",
weight=0.5,
),
],
constraints=[
Constraint(
id="conservative-escalation",
description=(
"Do not escalate on a single bad verdict or a brief stall. "
"Require clear patterns (10+ consecutive bad verdicts or 4+ minute stall) "
"before creating a ticket."
),
constraint_type="hard",
category="quality",
),
Constraint(
id="complete-ticket",
description=(
"Every EscalationTicket must have all required fields filled. "
"Do not emit partial or placeholder tickets."
),
constraint_type="hard",
category="correctness",
),
],
)
# ---------------------------------------------------------------------------
# Node
# ---------------------------------------------------------------------------
judge_node = NodeSpec(
id="judge",
name="Worker Health Judge",
description=(
"Autonomous health monitor for worker agents. Reads execution logs "
"on each timer tick, compares to prior observations (via conversation "
"history), and emits a structured EscalationTicket when a genuine "
"degradation pattern is detected."
),
node_type="event_loop",
client_facing=False, # Autonomous monitor, not interactive
max_node_visits=0, # Unbounded — runs on every timer tick
input_keys=[],
output_keys=["health_verdict"],
nullable_output_keys=["health_verdict"],
success_criteria=(
"A clear health verdict is produced each check: either 'healthy' with "
"a brief observation, or a complete EscalationTicket is emitted via "
"emit_escalation_ticket and health_verdict describes the issue."
),
tools=[
"get_worker_health_summary",
"emit_escalation_ticket",
],
system_prompt="""\
You are the Worker Health Judge. You run every 2 minutes alongside a worker \
agent to monitor its execution health.
# Your Role
You observe the worker's iteration patterns over time and escalate only when \
you see genuine degradation, not normal retry cycles. Your conversation history \
IS your memory. On each check, refer to your previous observations to track trends.
# Check Procedure
On each timer tick (every 2 minutes):
## Step 1: Read health snapshot
Call get_worker_health_summary() with no arguments to auto-discover the active \
session. This returns:
- worker_agent_id: the worker's agent name — use this for ticket identity fields
- worker_graph_id: the worker's primary graph ID — use this for ticket identity fields
- session_id: the session being monitored; use this for worker_session_id in tickets
- total_steps: how many log steps have been recorded
- recent_verdicts: list of recent ACCEPT/RETRY/CONTINUE verdicts
- steps_since_last_accept: consecutive non-ACCEPT steps
- stall_minutes: wall-clock since last step (null if active)
- evidence_snippet: recent LLM output
## Step 2: Compare to prior check
Look at your conversation history. What was total_steps last time?
- If total_steps is UNCHANGED from prior check AND prior check was also unchanged:
STALL confirmed (worker has produced no new iterations in 4+ minutes).
Escalate with severity="high" or "critical" depending on stall duration.
- If total_steps increased: worker is making progress. Examine verdicts.
## Step 3: Analyze verdict pattern
- Healthy: Mix of ACCEPT and RETRY, steps_since_last_accept < 5. No action.
- Warning: steps_since_last_accept is 5-9. Note it, no escalation yet.
- Degraded: steps_since_last_accept >= 10. Examine evidence_snippet.
- If evidence shows the agent is making real progress (complex reasoning,
exploring solutions, productive tool use): may be a hard problem. Note it.
- If evidence shows a loop (same error, same tool call, no new information):
Escalate with severity="medium" or "high".
- Critical: steps_since_last_accept >= 20, OR stall_minutes >= 4.
Escalate with severity="critical".
## Step 4: Decide
### If healthy:
set_output("health_verdict", "healthy: <brief observation>")
Done.
### If escalating:
Build an EscalationTicket JSON string with ALL required fields:
{
"worker_agent_id": "<worker_agent_id from get_worker_health_summary>",
"worker_session_id": "<session_id from get_worker_health_summary>",
"worker_node_id": "<worker_graph_id from get_worker_health_summary>",
"worker_graph_id": "<worker_graph_id from get_worker_health_summary>",
"severity": "<low|medium|high|critical>",
"cause": "<what you observed — concrete, specific>",
"judge_reasoning": "<why you decided to escalate, not just dismiss>",
"suggested_action": "<what you recommend: restart, human review, etc.>",
"recent_verdicts": [<list from get_worker_health_summary>],
"total_steps_checked": <int>,
"steps_since_last_accept": <int>,
"stall_minutes": <float or null>,
"evidence_snippet": "<from get_worker_health_summary>"
}
Call: emit_escalation_ticket(ticket_json=<the JSON string above>)
Then: set_output("health_verdict", "escalated: <one-line summary>")
# Severity Guide
- low: Mild concern, worth noting. 5-9 consecutive bad verdicts.
- medium: Clear degradation pattern. 10-15 bad verdicts or brief stall (1-2 min).
- high: Serious issue. 15+ bad verdicts or stall 2-4 minutes or clear doom loop.
- critical: Worker is definitively stuck. 20+ bad verdicts or stall > 4 minutes.
# Conservative Bias
You MUST resist the urge to escalate prematurely. Worker agents naturally retry.
A node may legitimately need 5-8 retries before succeeding. Do not escalate unless:
1. The pattern is clear and sustained across your observation window, AND
2. The evidence shows no genuine progress
One missed escalation is less costly than two false alarms. The Queen will filter \
further. But do not be passive: genuine stalls and doom loops must be caught.
# Rules
- Never escalate on the FIRST check unless stall_minutes > 4
- Always call get_worker_health_summary FIRST before deciding anything
- All ticket fields are REQUIRED; do not submit partial tickets
- After any emit_escalation_ticket call, always set_output to complete the check
""",
)
# ---------------------------------------------------------------------------
# Entry Point
# ---------------------------------------------------------------------------
HEALTH_JUDGE_ENTRY_POINT = AsyncEntryPointSpec(
id="health_check",
name="Worker Health Check",
entry_node="judge",
trigger_type="timer",
trigger_config={
"interval_minutes": 2,
"run_immediately": True, # Fire immediately to establish a baseline
},
isolation_level="isolated", # Own memory namespace, not polluting worker's
)
# ---------------------------------------------------------------------------
# Graph
# ---------------------------------------------------------------------------
judge_graph = GraphSpec(
id="judge-graph",
goal_id=judge_goal.id,
version="1.0.0",
entry_node="judge",
entry_points={"health_check": "judge"},
terminal_nodes=["judge"], # Judge node can terminate after each check
pause_nodes=[],
nodes=[judge_node],
edges=[],
conversation_mode="continuous", # Conversation persists across timer ticks
async_entry_points=[HEALTH_JUDGE_ENTRY_POINT],
loop_config={
"max_iterations": 10, # One check shouldn't take many turns
"max_tool_calls_per_turn": 3, # get_summary + optionally emit_ticket
"max_context_tokens": 16000, # Compact — judge only needs recent context
},
)
+3
@@ -14,10 +14,13 @@ from framework.observability.logging import (
get_trace_context,
set_trace_context,
)
from framework.observability.profiling import get_profile_snapshot, start_debug_profiling
__all__ = [
"configure_logging",
"get_trace_context",
"set_trace_context",
"clear_trace_context",
"start_debug_profiling",
"get_profile_snapshot",
]
+280
@@ -0,0 +1,280 @@
"""
Event loop profiling for debug mode.
Activated by ``hive open --debug`` (or ``hive serve --debug``).
What this provides
------------------
- **Event loop lag sampling**: a background task wakes every 100 ms and
measures how late it actually woke. Lag > 50 ms means something blocked
the loop (sync file I/O, CPU-bound work, etc.).
- **asyncio slow-callback capture**: when asyncio debug mode is on, the
runtime logs every callback that held the loop for > ``slow_callback_duration``
seconds. We intercept those log records and store them in a ring buffer so
the ``/api/debug/profile`` endpoint can surface them.
- **asyncio debug mode**: ``loop.set_debug(True)`` enables asyncio's own
built-in coroutine-origin tracking and slow-callback warnings.
Usage (framework-internal)
--------------------------
Inside an async context (after ``asyncio.run()`` enters):
from framework.observability.profiling import start_debug_profiling
monitor = await start_debug_profiling()
...
# on shutdown:
await monitor.stop()
monitor.print_summary()
"""
from __future__ import annotations
import asyncio
import contextlib
import logging
import time
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger(__name__)
# Ring-buffer sizes
_MAX_LAG_SAMPLES = 600 # 60 s at 100 ms interval
_MAX_SLOW_CALLBACKS = 100
# Thresholds
_LAG_WARN_THRESHOLD_MS = 50.0 # log a warning when loop lag exceeds this
_SAMPLE_INTERVAL_S = 0.1 # how often to probe the event loop (100 ms)
class _SlowCallbackCapture(logging.Handler):
"""Intercepts asyncio's built-in slow-callback warnings.
asyncio emits them at WARNING level from the ``asyncio`` logger when
``loop.set_debug(True)`` is active and a callback holds the loop for
longer than ``loop.slow_callback_duration`` seconds.
Each captured entry is tagged ``blocking=True`` only if the lag sampler
measured a spike at the same time, which is the definitive signal that
the loop was actually held rather than the task being slow async I/O.
"""
def __init__(self, monitor: EventLoopMonitor) -> None:
super().__init__(level=logging.WARNING)
self._monitor = monitor
def emit(self, record: logging.LogRecord) -> None:
msg = record.getMessage()
# asyncio formats these as: "Executing <...> took X.XXX seconds"
if "took" in msg and "second" in msg:
# Check whether the lag sampler saw a spike in the past two probe
# intervals. If yes, this is a real blocking call; if no, the task
# was just slow async I/O that properly yielded the loop.
now = time.monotonic()
window = _SAMPLE_INTERVAL_S * 2
recent_lag = max(
(lag for ts, lag in self._monitor._lag_spikes if now - ts <= window),
default=0.0,
)
self._monitor._record_slow_callback(
{
"timestamp": datetime.now(UTC).isoformat(),
"message": msg,
"source": "asyncio",
"lag_during_ms": round(recent_lag, 2),
# True only when lag also spiked → real event loop block
"blocking": recent_lag > _LAG_WARN_THRESHOLD_MS,
}
)
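The correlation check above can be isolated: a slow-callback warning counts as blocking only if the lag sampler recorded a spike within the last two probe intervals. A standalone sketch of that window test:

```python
import time

SAMPLE_INTERVAL_S = 0.1
LAG_WARN_THRESHOLD_MS = 50.0

def is_blocking(lag_spikes: list[tuple[float, float]], now: float) -> bool:
    """True if a lag spike fell within the last two probe intervals."""
    window = SAMPLE_INTERVAL_S * 2
    recent_lag = max(
        (lag for ts, lag in lag_spikes if now - ts <= window),
        default=0.0,
    )
    return recent_lag > LAG_WARN_THRESHOLD_MS

now = time.monotonic()
print(is_blocking([(now - 0.05, 120.0)], now))  # True: spike inside window
print(is_blocking([(now - 5.0, 120.0)], now))   # False: spike too old
```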
class EventLoopMonitor:
"""Measures event-loop lag by scheduling a repeating probe task.
The probe sleeps for ``_SAMPLE_INTERVAL_S`` seconds. The *actual* sleep
duration minus the *expected* duration gives the lag introduced by any
blocking work that ran between two consecutive iterations of the loop.
All stats are available via :meth:`snapshot` and are exposed through the
``/api/debug/profile`` HTTP endpoint.
"""
def __init__(self) -> None:
self._samples: list[float] = [] # lag in ms, newest last
self._lag_spikes: list[tuple[float, float]] = [] # (monotonic_ts, lag_ms)
self._slow_callbacks: list[dict[str, Any]] = [] # ring buffer
self._start_time: float = 0.0
self._task: asyncio.Task | None = None
self._log_handler: _SlowCallbackCapture | None = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self) -> None:
"""Start the background probe and attach the slow-callback log handler."""
self._start_time = time.monotonic()
# Attach handler to asyncio logger to capture slow-callback warnings
self._log_handler = _SlowCallbackCapture(self)
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(self._log_handler)
self._task = asyncio.create_task(self._probe_loop(), name="hive-loop-monitor")
logger.info(
"Event loop monitor started — lag threshold %.0f ms, sample interval %.0f ms",
_LAG_WARN_THRESHOLD_MS,
_SAMPLE_INTERVAL_S * 1000,
)
async def stop(self) -> None:
"""Cancel the probe task and detach the log handler."""
if self._task and not self._task.done():
self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task
if self._log_handler is not None:
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.removeHandler(self._log_handler)
self._log_handler = None
# ------------------------------------------------------------------
# Internal probe
# ------------------------------------------------------------------
async def _probe_loop(self) -> None:
loop = asyncio.get_running_loop()
while True:
expected_wake = loop.time() + _SAMPLE_INTERVAL_S
await asyncio.sleep(_SAMPLE_INTERVAL_S)
actual_wake = loop.time()
lag_ms = max(0.0, (actual_wake - expected_wake) * 1000)
self._samples.append(lag_ms)
if len(self._samples) > _MAX_LAG_SAMPLES:
self._samples = self._samples[-_MAX_LAG_SAMPLES:]
if lag_ms > _LAG_WARN_THRESHOLD_MS:
self._lag_spikes.append((time.monotonic(), lag_ms))
if len(self._lag_spikes) > 200:
self._lag_spikes = self._lag_spikes[-200:]
logger.warning(
"Event loop lag %.1f ms — possible blocking call on main thread",
lag_ms,
extra={"event": "loop_lag", "lag_ms": round(lag_ms, 1)},
)
def _record_slow_callback(self, entry: dict[str, Any]) -> None:
self._slow_callbacks.append(entry)
if len(self._slow_callbacks) > _MAX_SLOW_CALLBACKS:
self._slow_callbacks = self._slow_callbacks[-_MAX_SLOW_CALLBACKS:]
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
def snapshot(self) -> dict[str, Any]:
"""Return current profiling stats as a plain dict (JSON-serialisable)."""
samples = list(self._samples)
uptime = round(time.monotonic() - self._start_time, 1) if self._start_time else 0.0
if not samples:
return {
"enabled": True,
"uptime_seconds": uptime,
"samples": 0,
"slow_callbacks": list(self._slow_callbacks),
}
sorted_s = sorted(samples)
n = len(sorted_s)
def pct(p: float) -> float:
return round(sorted_s[min(int(n * p), n - 1)], 2)
over = sum(1 for s in samples if s > _LAG_WARN_THRESHOLD_MS)
callbacks = list(self._slow_callbacks)
return {
"enabled": True,
"uptime_seconds": uptime,
"samples": n,
"mean_lag_ms": round(sum(samples) / n, 2),
"p50_lag_ms": pct(0.50),
"p95_lag_ms": pct(0.95),
"p99_lag_ms": pct(0.99),
"worst_lag_ms": round(max(samples), 2),
"slow_samples": over,
"slow_sample_pct": round(over / n * 100, 1),
"slow_threshold_ms": _LAG_WARN_THRESHOLD_MS,
# All asyncio slow-task warnings captured
"slow_callbacks": callbacks,
# Subset where lag also spiked → real event loop blocking
"blocking_callbacks": [cb for cb in callbacks if cb.get("blocking")],
# Subset that are just slow async I/O — loop was free the whole time
"benign_slow_tasks": [cb for cb in callbacks if not cb.get("blocking")],
}
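The percentile helper in `snapshot` uses a nearest-rank index; its behavior on a small sample set can be checked in isolation (same index formula, extracted here as a free function):

```python
def pct(samples: list[float], p: float) -> float:
    """Nearest-rank percentile matching snapshot()'s index formula."""
    s = sorted(samples)
    n = len(s)
    return round(s[min(int(n * p), n - 1)], 2)

samples = [1.0, 2.0, 3.0, 100.0]
print(pct(samples, 0.50))  # 3.0
print(pct(samples, 0.95))  # 100.0
print(pct(samples, 0.99))  # 100.0
```

Note the `min(..., n - 1)` clamp: for high percentiles on short sample lists the index saturates at the worst sample rather than running off the end.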
def print_summary(self) -> None:
"""Print a human-readable summary to stdout (called on server shutdown)."""
snap = self.snapshot()
if snap["samples"] == 0:
print("[profiler] No samples collected.")
return
print("\n[profiler] Event loop summary")
print(f" Uptime: {snap['uptime_seconds']}s")
print(f" Samples: {snap['samples']}")
print(f" Mean lag: {snap['mean_lag_ms']} ms")
p50, p95, p99 = snap["p50_lag_ms"], snap["p95_lag_ms"], snap["p99_lag_ms"]
print(f" p50 / p95 / p99: {p50} / {p95} / {p99} ms")
print(f" Worst lag: {snap['worst_lag_ms']} ms")
slow_pct = snap["slow_sample_pct"]
thresh = snap["slow_threshold_ms"]
print(f" Slow samples: {snap['slow_samples']} ({slow_pct}%) > {thresh} ms")
blocking = snap.get("blocking_callbacks", [])
benign = snap.get("benign_slow_tasks", [])
if blocking:
print(f" BLOCKING callbacks: {len(blocking)} ← investigate these")
for cb in blocking[-5:]:
lag = cb.get("lag_during_ms", "?")
print(f" [lag {lag}ms] {cb['timestamp']} {cb['message'][:120]}")
if benign:
print(
f" Slow async tasks (benign): {len(benign)}"
" — loop was free, just slow I/O"
)
print()
# Module-level singleton so routes can access it without passing it around.
_monitor: EventLoopMonitor | None = None
async def start_debug_profiling() -> EventLoopMonitor:
"""Enable asyncio debug mode and start the event loop monitor.
Must be called from inside a running event loop (i.e. inside
``asyncio.run()``). Returns the monitor so the caller can stop it and
print a summary on shutdown.
"""
global _monitor
loop = asyncio.get_running_loop()
# asyncio built-in debug mode: enables coroutine-origin tracking and
# logs any callback/coroutine that holds the loop for > slow_callback_duration.
loop.set_debug(True)
loop.slow_callback_duration = 0.1 # 100 ms — asyncio default; avoids noise
_monitor = EventLoopMonitor()
await _monitor.start()
return _monitor
def get_profile_snapshot() -> dict[str, Any]:
"""Return the current monitor snapshot, or a disabled sentinel if not running."""
if _monitor is None:
return {"enabled": False}
return _monitor.snapshot()
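The probe's core measurement (expected wake time versus actual wake time) can be demonstrated standalone by deliberately holding the loop with a synchronous sleep; the measured lag lands near the blocked duration:

```python
import asyncio
import time

async def one_probe(interval_s: float = 0.05) -> float:
    """Measure how late the probe wakes when the loop is blocked."""
    loop = asyncio.get_running_loop()
    expected_wake = loop.time() + interval_s
    time.sleep(0.1)  # synchronous sleep: holds the event loop
    await asyncio.sleep(interval_s)
    actual_wake = loop.time()
    return max(0.0, (actual_wake - expected_wake) * 1000)

lag_ms = asyncio.run(one_probe())
print(f"measured lag: {lag_ms:.0f} ms")
```

The 100 ms synchronous sleep delays the scheduled wake by roughly that amount, which is exactly the signal the monitor's `_probe_loop` samples every interval.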
+12
@@ -1644,6 +1644,13 @@ def cmd_serve(args: argparse.Namespace) -> int:
async def run_server():
manager = app["manager"]
# Start event loop profiling when --debug is active.
_profiler = None
if getattr(args, "debug", False):
from framework.observability.profiling import start_debug_profiling
_profiler = await start_debug_profiling()
# Preload agents specified via --agent
for agent_path in args.agent:
try:
@@ -1673,6 +1680,8 @@ def cmd_serve(args: argparse.Namespace) -> int:
if has_frontend:
print(f"Dashboard: {dashboard_url}")
print(f"Health: {dashboard_url}/api/health")
if _profiler is not None:
print(f"Profiler: {dashboard_url}/api/debug/profile")
print(f"Agents loaded: {sum(1 for s in manager.list_sessions() if s.worker_runtime)}")
print()
print("Press Ctrl+C to stop")
@@ -1687,6 +1696,9 @@ def cmd_serve(args: argparse.Namespace) -> int:
except asyncio.CancelledError:
pass
finally:
if _profiler is not None:
await _profiler.stop()
_profiler.print_summary()
await manager.shutdown_all()
await runner.cleanup()
-65
@@ -16,7 +16,6 @@ from framework.credentials.validation import (
from framework.graph import Goal
from framework.graph.edge import (
DEFAULT_MAX_TOKENS,
AsyncEntryPointSpec,
EdgeCondition,
EdgeSpec,
GraphSpec,
@@ -570,9 +569,6 @@ class AgentInfo:
constraints: list[dict]
required_tools: list[str]
has_tools_module: bool
# Multi-entry-point support
async_entry_points: list[dict] = field(default_factory=list)
is_multi_entry_point: bool = False
@dataclass
@@ -630,22 +626,6 @@ def load_agent_export(data: str | dict) -> tuple[GraphSpec, Goal]:
)
edges.append(edge)
# Build AsyncEntryPointSpec objects for multi-entry-point support
async_entry_points = []
for aep_data in graph_data.get("async_entry_points", []):
async_entry_points.append(
AsyncEntryPointSpec(
id=aep_data["id"],
name=aep_data.get("name", aep_data["id"]),
entry_node=aep_data["entry_node"],
trigger_type=aep_data.get("trigger_type", "manual"),
trigger_config=aep_data.get("trigger_config", {}),
isolation_level=aep_data.get("isolation_level", "shared"),
priority=aep_data.get("priority", 0),
max_concurrent=aep_data.get("max_concurrent", 10),
)
)
# Build GraphSpec
graph = GraphSpec(
id=graph_data.get("id", "agent-graph"),
@@ -653,7 +633,6 @@ def load_agent_export(data: str | dict) -> tuple[GraphSpec, Goal]:
version=graph_data.get("version", "1.0.0"),
entry_node=graph_data.get("entry_node", ""),
entry_points=graph_data.get("entry_points", {}), # Support pause/resume architecture
async_entry_points=async_entry_points, # Support multi-entry-point agents
terminal_nodes=graph_data.get("terminal_nodes", []),
pause_nodes=graph_data.get("pause_nodes", []), # Support pause/resume architecture
nodes=nodes,
@@ -805,8 +784,6 @@ class AgentRunner:
# AgentRuntime — unified execution path for all agents
self._agent_runtime: AgentRuntime | None = None
self._uses_async_entry_points = self.graph.has_async_entry_points()
# Pre-load validation: structural checks + credentials.
# Fails fast with actionable guidance — no MCP noise on screen.
run_preload_validation(
@@ -965,7 +942,6 @@ class AgentRunner:
"version": "1.0.0",
"entry_node": getattr(agent_module, "entry_node", nodes[0].id),
"entry_points": getattr(agent_module, "entry_points", {}),
"async_entry_points": getattr(agent_module, "async_entry_points", []),
"terminal_nodes": getattr(agent_module, "terminal_nodes", []),
"pause_nodes": getattr(agent_module, "pause_nodes", []),
"nodes": nodes,
@@ -1451,21 +1427,7 @@ class AgentRunner:
event_bus=None,
) -> None:
"""Set up multi-entry-point execution using AgentRuntime."""
# Convert AsyncEntryPointSpec to EntryPointSpec for AgentRuntime
entry_points = []
for async_ep in self.graph.async_entry_points:
ep = EntryPointSpec(
id=async_ep.id,
name=async_ep.name,
entry_node=async_ep.entry_node,
trigger_type=async_ep.trigger_type,
trigger_config=async_ep.trigger_config,
isolation_level=async_ep.isolation_level,
priority=async_ep.priority,
max_concurrent=async_ep.max_concurrent,
max_resurrections=async_ep.max_resurrections,
)
entry_points.append(ep)
# Always create a primary entry point for the graph's entry node.
# For multi-entry-point agents this ensures the primary path (e.g.
@@ -1772,19 +1734,6 @@ class AgentRunner:
for edge in self.graph.edges
]
# Build async entry points info
async_entry_points_info = [
{
"id": ep.id,
"name": ep.name,
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"isolation_level": ep.isolation_level,
"max_concurrent": ep.max_concurrent,
}
for ep in self.graph.async_entry_points
]
return AgentInfo(
name=self.graph.id,
description=self.graph.description,
@@ -1811,8 +1760,6 @@ class AgentRunner:
],
required_tools=sorted(required_tools),
has_tools_module=(self.agent_path / "tools.py").exists(),
async_entry_points=async_entry_points_info,
is_multi_entry_point=self._uses_async_entry_points,
)
def validate(self) -> ValidationResult:
@@ -2127,18 +2074,6 @@ Respond with JSON only:
trigger_type="manual",
isolation_level="shared",
)
for aep in runner.graph.async_entry_points:
entry_points[aep.id] = EntryPointSpec(
id=aep.id,
name=aep.name,
entry_node=aep.entry_node,
trigger_type=aep.trigger_type,
trigger_config=aep.trigger_config,
isolation_level=aep.isolation_level,
priority=aep.priority,
max_concurrent=aep.max_concurrent,
)
await runtime.add_graph(
graph_id=gid,
graph=runner.graph,
+2 -2
@@ -454,11 +454,11 @@ An agent has requested handoff to the Hive Coder (via the `escalate` synthetic t
## Worker Health Monitoring
These events form the **judge → queen → operator** escalation pipeline.
These events form the **queen → operator** escalation pipeline.
### `worker_escalation_ticket`
The Worker Health Judge has detected a degradation pattern and is escalating to the Queen.
A worker degradation pattern has been detected and is being escalated to the Queen.
| Data Field | Type | Description |
| ---------- | ------ | ------------------------------------ |
+5 -3
@@ -8,6 +8,7 @@ while preserving the goal-driven approach.
import asyncio
import logging
import time
import uuid
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime
@@ -822,7 +823,8 @@ class AgentRuntime:
if stream is None:
raise ValueError(f"Entry point '{entry_point_id}' not found")
return await stream.execute(input_data, correlation_id, session_state)
run_id = uuid.uuid4().hex[:12]
return await stream.execute(input_data, correlation_id, session_state, run_id=run_id)
async def trigger_and_wait(
self,
@@ -1359,8 +1361,8 @@ class AgentRuntime:
allowed_keys = set(entry_node.input_keys)
# Search primary graph's streams for an active session.
# Skip isolated streams (e.g. health judge) — they have their own
# session directories and must never be used as a shared session.
# Skip isolated streams — they have their own session directories
# and must never be used as a shared session.
all_streams: list[tuple[str, ExecutionStream]] = []
for _gid, reg in self._graphs.items():
for ep_id, stream in reg.streams.items():
+5 -5
@@ -1,4 +1,4 @@
"""EscalationTicket — structured schema for worker health judge escalations."""
"""EscalationTicket — structured schema for worker health escalations."""
from __future__ import annotations
@@ -10,10 +10,10 @@ from pydantic import BaseModel, Field
class EscalationTicket(BaseModel):
"""Structured escalation report emitted by the Worker Health Judge.
"""Structured escalation report for worker health monitoring.
The judge must fill every field before calling emit_escalation_ticket.
Pydantic validation rejects partial tickets, preventing impulsive escalation.
All fields must be filled before calling emit_escalation_ticket.
Pydantic validation rejects partial tickets.
"""
ticket_id: str = Field(default_factory=lambda: str(uuid4()))
@@ -25,7 +25,7 @@ class EscalationTicket(BaseModel):
worker_node_id: str
worker_graph_id: str
# Problem characterization (filled by judge via LLM deliberation)
# Problem characterization
severity: Literal["low", "medium", "high", "critical"]
cause: str # Human-readable: "Node has produced 18 RETRY verdicts..."
judge_reasoning: str # Judge's own deliberation chain
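The "no partial tickets" guarantee follows from pydantic's required-field validation; a minimal sketch with a toy model (not the real schema) shows the rejection:

```python
from pydantic import BaseModel, ValidationError

class Ticket(BaseModel):
    # Toy subset of the real schema: every field is required
    worker_agent_id: str
    severity: str
    cause: str

missing = 0
try:
    Ticket(worker_agent_id="w1")  # severity and cause omitted
except ValidationError as e:
    missing = len(e.errors())
    print(f"rejected: {missing} missing fields")
```

Because construction itself raises, a caller cannot emit a half-filled ticket even by accident; the tool boundary never sees it.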
+170 -5
@@ -97,6 +97,7 @@ class EventType(StrEnum):
# Client I/O (client_facing=True nodes only)
CLIENT_OUTPUT_DELTA = "client_output_delta"
CLIENT_INPUT_REQUESTED = "client_input_requested"
CLIENT_INPUT_RECEIVED = "client_input_received"
# Internal node observability (client_facing=False nodes)
NODE_INTERNAL_OUTPUT = "node_internal_output"
@@ -104,7 +105,7 @@ class EventType(StrEnum):
NODE_STALLED = "node_stalled"
NODE_TOOL_DOOM_LOOP = "node_tool_doom_loop"
# Judge decisions
# Judge decisions (implicit judge in event loop nodes)
JUDGE_VERDICT = "judge_verdict"
# Output tracking
@@ -126,7 +127,7 @@ class EventType(StrEnum):
# Escalation (agent requests handoff to queen)
ESCALATION_REQUESTED = "escalation_requested"
# Worker health monitoring (judge → queen → operator)
# Worker health monitoring
WORKER_ESCALATION_TICKET = "worker_escalation_ticket"
QUEEN_INTERVENTION_REQUESTED = "queen_intervention_requested"
@@ -152,6 +153,13 @@ class EventType(StrEnum):
# Subagent reports (one-way progress updates from sub-agents)
SUBAGENT_REPORT = "subagent_report"
# Trigger lifecycle (queen-level triggers / heartbeats)
TRIGGER_AVAILABLE = "trigger_available"
TRIGGER_ACTIVATED = "trigger_activated"
TRIGGER_DEACTIVATED = "trigger_deactivated"
TRIGGER_FIRED = "trigger_fired"
TRIGGER_REMOVED = "trigger_removed"
@dataclass
class AgentEvent:
@@ -165,10 +173,11 @@ class AgentEvent:
timestamp: datetime = field(default_factory=datetime.now)
correlation_id: str | None = None # For tracking related events
graph_id: str | None = None # Which graph emitted this event (multi-graph sessions)
run_id: str | None = None # Unique ID per trigger() invocation — used for run dividers
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
d = {
"type": self.type.value,
"stream_id": self.stream_id,
"node_id": self.node_id,
@@ -178,6 +187,9 @@ class AgentEvent:
"correlation_id": self.correlation_id,
"graph_id": self.graph_id,
}
if self.run_id is not None:
d["run_id"] = self.run_id
return d
# Type for event handlers
@@ -246,6 +258,127 @@ class EventBus:
self._semaphore = asyncio.Semaphore(max_concurrent_handlers)
self._subscription_counter = 0
self._lock = asyncio.Lock()
# Per-session persistent event log (always-on, survives restarts)
self._session_log: IO[str] | None = None
self._session_log_iteration_offset: int = 0
# Accumulator for client_output_delta snapshots — flushed on llm_turn_complete.
# Key: (stream_id, node_id, execution_id, iteration) → latest AgentEvent
self._pending_output_snapshots: dict[tuple, AgentEvent] = {}
def set_session_log(self, path: Path, *, iteration_offset: int = 0) -> None:
"""Enable per-session event persistence to a JSONL file.
Called once when the queen starts so that all events survive server
restarts and can be replayed to reconstruct the frontend state.
``iteration_offset`` is added to the ``iteration`` field in logged
events so that cold-resumed sessions produce monotonically increasing
iteration values, preventing frontend message ID collisions between
the original run and resumed runs.
"""
if self._session_log is not None:
try:
self._session_log.close()
except Exception:
pass
path.parent.mkdir(parents=True, exist_ok=True)
self._session_log = open(path, "a", encoding="utf-8") # noqa: SIM115
self._session_log_iteration_offset = iteration_offset
logger.info("Session event log → %s (iteration_offset=%d)", path, iteration_offset)
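The JSONL log written by `set_session_log` is meant to be replayed after a restart. A minimal consumer sketch, assuming one JSON-serialized event dict per line as `_write_session_log_event` produces (the function name `replay_session_log` is illustrative, not part of the diff):

```python
import json
from pathlib import Path

def replay_session_log(path: Path) -> list[dict]:
    """Read persisted events back in order, skipping corrupt lines."""
    events: list[dict] = []
    if not path.exists():
        return events
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # tolerate a torn write from a crashed process
    return events
```

Skipping unparseable lines matters because the process may die mid-`write`; the log is append-only, so everything before the torn tail is still usable.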
def close_session_log(self) -> None:
"""Close the per-session event log file."""
# Flush any pending output snapshots before closing
self._flush_pending_snapshots()
if self._session_log is not None:
try:
self._session_log.close()
except Exception:
pass
self._session_log = None
# Event types that are high-frequency streaming deltas — accumulated rather
# than written individually to the session log.
_STREAMING_DELTA_TYPES = frozenset(
{
EventType.CLIENT_OUTPUT_DELTA,
EventType.LLM_TEXT_DELTA,
EventType.LLM_REASONING_DELTA,
}
)
def _write_session_log_event(self, event: AgentEvent) -> None:
"""Write an event to the per-session log with streaming coalescing.
Streaming deltas (client_output_delta, llm_text_delta) are accumulated
in memory. When llm_turn_complete fires, any pending snapshots for that
(stream_id, node_id, execution_id) are flushed as single consolidated
events before the turn-complete event itself is written.
Note: iteration offset is already applied in publish() before this is
called, so events here already have correct iteration values.
"""
if self._session_log is None:
return
if event.type in self._STREAMING_DELTA_TYPES:
# Accumulate — keep only the latest event (which carries the full snapshot)
key = (
event.stream_id,
event.node_id,
event.execution_id,
event.data.get("iteration"),
)
self._pending_output_snapshots[key] = event
return
# On turn-complete, flush accumulated snapshots for this stream first
if event.type == EventType.LLM_TURN_COMPLETE:
self._flush_pending_snapshots(
stream_id=event.stream_id,
node_id=event.node_id,
execution_id=event.execution_id,
)
line = json.dumps(event.to_dict(), default=str)
self._session_log.write(line + "\n")
self._session_log.flush()
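The coalescing described above reduces to a keep-latest dictionary: because each streaming delta carries the full snapshot so far, only the newest event per key needs to survive. A standalone sketch of that rule (the `accumulate`/`flush` names are illustrative, not the EventBus API):

```python
pending: dict[tuple, dict] = {}

def accumulate(event: dict) -> None:
    """Remember only the latest snapshot for this key."""
    key = (
        event["stream_id"],
        event["node_id"],
        event["execution_id"],
        event["data"].get("iteration"),
    )
    pending[key] = event  # later snapshots overwrite earlier ones

def flush(stream_id: str) -> list[dict]:
    """Pop and return all pending snapshots for one stream (e.g. on turn-complete)."""
    keys = [k for k in pending if k[0] == stream_id]
    return [pending.pop(k) for k in keys]
```

This is why a thousand deltas per turn cost one log line: the dict holds at most one entry per (stream, node, execution, iteration), and the flush on `llm_turn_complete` writes exactly that entry.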
def _flush_pending_snapshots(
self,
stream_id: str | None = None,
node_id: str | None = None,
execution_id: str | None = None,
) -> None:
"""Flush accumulated streaming snapshots to the session log.
When called with filters, only matching entries are flushed.
When called without filters (e.g. on close), everything is flushed.
"""
if self._session_log is None or not self._pending_output_snapshots:
return
to_flush: list[tuple] = []
for key, _evt in self._pending_output_snapshots.items():
if stream_id is not None:
k_stream, k_node, k_exec, _ = key
if k_stream != stream_id or k_node != node_id or k_exec != execution_id:
continue
to_flush.append(key)
for key in to_flush:
evt = self._pending_output_snapshots.pop(key)
try:
line = json.dumps(evt.to_dict(), default=str)
self._session_log.write(line + "\n")
except Exception:
pass
if to_flush:
try:
self._session_log.flush()
except Exception:
pass
def subscribe(
self,
@@ -311,6 +444,19 @@ class EventBus:
Args:
event: Event to publish
"""
# Apply iteration offset at the source so ALL consumers (SSE subscribers,
# event history, session log) see the same monotonically increasing
# iteration values. Without this, live SSE would use raw iterations
# while events.jsonl would use offset iterations, causing ID collisions
# on the frontend when replaying after cold resume.
if (
self._session_log_iteration_offset
and isinstance(event.data, dict)
and "iteration" in event.data
):
offset = self._session_log_iteration_offset
event.data = {**event.data, "iteration": event.data["iteration"] + offset}
# Add to history
async with self._lock:
self._event_history.append(event)
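The offset rule applied at the top of `publish()` can be isolated as a small pure function, sketched here under the same assumptions as the diff (shift only when an offset is set and the payload carries an `iteration` key; `apply_iteration_offset` is a hypothetical name):

```python
def apply_iteration_offset(data: dict, offset: int) -> dict:
    """Return event data with the iteration shifted, so resumed runs
    continue numbering after the original run's last iteration."""
    if offset and "iteration" in data:
        return {**data, "iteration": data["iteration"] + offset}
    return data
```

Copying the dict rather than mutating it in place keeps any other holder of the original `data` reference unaffected, which is the safer default when events fan out to multiple consumers.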
@@ -331,6 +477,15 @@ class EventBus:
except Exception:
pass # never break event delivery
# Per-session persistent log (always-on when set_session_log was called).
# Streaming deltas are coalesced: client_output_delta and llm_text_delta
# are accumulated and flushed as a single snapshot event on llm_turn_complete.
if self._session_log is not None:
try:
self._write_session_log_event(event)
except Exception:
pass # never break event delivery
# Find matching subscriptions
matching_handlers: list[EventHandler] = []
@@ -391,6 +546,7 @@ class EventBus:
execution_id: str,
input_data: dict[str, Any] | None = None,
correlation_id: str | None = None,
run_id: str | None = None,
) -> None:
"""Emit execution started event."""
await self.publish(
@@ -400,6 +556,7 @@ class EventBus:
execution_id=execution_id,
data={"input": input_data or {}},
correlation_id=correlation_id,
run_id=run_id,
)
)
@@ -409,6 +566,7 @@ class EventBus:
execution_id: str,
output: dict[str, Any] | None = None,
correlation_id: str | None = None,
run_id: str | None = None,
) -> None:
"""Emit execution completed event."""
await self.publish(
@@ -418,6 +576,7 @@ class EventBus:
execution_id=execution_id,
data={"output": output or {}},
correlation_id=correlation_id,
run_id=run_id,
)
)
@@ -427,6 +586,7 @@ class EventBus:
execution_id: str,
error: str,
correlation_id: str | None = None,
run_id: str | None = None,
) -> None:
"""Emit execution failed event."""
await self.publish(
@@ -436,6 +596,7 @@ class EventBus:
execution_id=execution_id,
data={"error": error},
correlation_id=correlation_id,
run_id=run_id,
)
)
@@ -527,15 +688,19 @@ class EventBus:
node_id: str,
iteration: int,
execution_id: str | None = None,
extra_data: dict[str, Any] | None = None,
) -> None:
"""Emit node loop iteration event."""
data: dict[str, Any] = {"iteration": iteration}
if extra_data:
data.update(extra_data)
await self.publish(
AgentEvent(
type=EventType.NODE_LOOP_ITERATION,
stream_id=stream_id,
node_id=node_id,
execution_id=execution_id,
data={"iteration": iteration},
data=data,
)
)
@@ -1009,7 +1174,7 @@ class EventBus:
ticket: dict,
execution_id: str | None = None,
) -> None:
"""Emitted by health judge when worker shows a degradation pattern."""
"""Emitted when worker shows a degradation pattern."""
await self.publish(
AgentEvent(
type=EventType.WORKER_ESCALATION_TICKET,
+81 -3
View File
@@ -127,6 +127,7 @@ class ExecutionContext:
input_data: dict[str, Any]
isolation_level: IsolationLevel
session_state: dict[str, Any] | None = None # For resuming from pause
run_id: str | None = None # Unique ID per trigger() invocation
started_at: datetime = field(default_factory=datetime.now)
completed_at: datetime | None = None
status: str = "pending" # pending, running, completed, failed, paused
@@ -425,11 +426,36 @@ class ExecutionStream:
return True
return False
async def inject_trigger(
self,
node_id: str,
trigger: Any,
) -> bool:
"""Inject a trigger event into a running queen EventLoopNode.
Searches active executors for a node matching ``node_id`` and calls
its ``inject_trigger()`` method to wake the queen.
Args:
node_id: The queen EventLoopNode ID.
trigger: A ``TriggerEvent`` instance (typed as Any to avoid
circular imports with graph layer).
Returns True if the trigger was delivered, False otherwise.
"""
for executor in self._active_executors.values():
node = executor.node_registry.get(node_id)
if node is not None and hasattr(node, "inject_trigger"):
await node.inject_trigger(trigger)
return True
return False
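The executor scan in `inject_trigger` is a first-match duck-typed lookup. A self-contained sketch of the same shape, with `FakeNode`/`FakeExecutor` as stand-ins (only `node_registry` and `inject_trigger` mirror names from the diff):

```python
import asyncio

class FakeNode:
    def __init__(self) -> None:
        self.received: list = []

    async def inject_trigger(self, trigger) -> None:
        self.received.append(trigger)

class FakeExecutor:
    def __init__(self, registry: dict) -> None:
        self.node_registry = registry

async def deliver(executors: dict, node_id: str, trigger) -> bool:
    """Find the first executor whose registry holds node_id and wake it."""
    for executor in executors.values():
        node = executor.node_registry.get(node_id)
        if node is not None and hasattr(node, "inject_trigger"):
            await node.inject_trigger(trigger)
            return True
    return False
```

The `hasattr` guard is what lets the trigger be typed as `Any`: delivery depends only on the node exposing `inject_trigger`, not on importing graph-layer types.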
async def execute(
self,
input_data: dict[str, Any],
correlation_id: str | None = None,
session_state: dict[str, Any] | None = None,
run_id: str | None = None,
) -> str:
"""
Queue an execution and return its ID.
@@ -440,6 +466,7 @@ class ExecutionStream:
input_data: Input data for this execution
correlation_id: Optional ID to correlate related executions
session_state: Optional session state to resume from (with paused_at, memory)
run_id: Unique ID for this trigger invocation (for run dividers)
Returns:
Execution ID for tracking
@@ -500,6 +527,7 @@ class ExecutionStream:
input_data=input_data,
isolation_level=self.entry_spec.get_isolation_level(),
session_state=session_state,
run_id=run_id,
)
async with self._lock:
@@ -575,7 +603,9 @@ class ExecutionStream:
execution_id=execution_id,
input_data=ctx.input_data,
correlation_id=ctx.correlation_id,
run_id=ctx.run_id,
)
self._write_run_event(execution_id, ctx.run_id, "run_started")
# Create execution-scoped memory
self._state_manager.create_memory(
@@ -740,6 +770,7 @@ class ExecutionStream:
execution_id=execution_id,
output=result.output,
correlation_id=ctx.correlation_id,
run_id=ctx.run_id,
)
elif result.paused_at:
# The executor returns paused_at on CancelledError but
@@ -757,8 +788,22 @@ class ExecutionStream:
execution_id=execution_id,
error=result.error or "Unknown error",
correlation_id=ctx.correlation_id,
run_id=ctx.run_id,
)
# Write run event for historical restoration
if result.success:
self._write_run_event(execution_id, ctx.run_id, "run_completed")
elif result.paused_at:
self._write_run_event(execution_id, ctx.run_id, "run_paused")
else:
self._write_run_event(
execution_id,
ctx.run_id,
"run_failed",
{"error": result.error or "Unknown error"},
)
logger.debug(f"Execution {execution_id} completed: success={result.success}")
except asyncio.CancelledError:
@@ -818,8 +863,10 @@ class ExecutionStream:
execution_id=execution_id,
error=cancel_reason,
correlation_id=ctx.correlation_id,
run_id=ctx.run_id,
)
self._write_run_event(execution_id, ctx.run_id, "run_cancelled")
# Don't re-raise - we've handled it and saved state
except Exception as e:
@@ -856,7 +903,9 @@ class ExecutionStream:
execution_id=execution_id,
error=str(e),
correlation_id=ctx.correlation_id,
run_id=ctx.run_id,
)
self._write_run_event(execution_id, ctx.run_id, "run_failed", {"error": str(e)})
finally:
# Clean up state
@@ -872,6 +921,36 @@ class ExecutionStream:
self._completion_events.pop(execution_id, None)
self._execution_tasks.pop(execution_id, None)
def _write_run_event(
self,
execution_id: str,
run_id: str | None,
event: str,
extra: dict[str, Any] | None = None,
) -> None:
"""Append a run lifecycle event to runs.jsonl for historical restoration."""
if not self._session_store or not run_id:
return
import json as _json
session_dir = self._session_store.get_session_path(execution_id)
runs_file = session_dir / "runs.jsonl"
now = datetime.now()
record = {
"run_id": run_id,
"event": event,
"timestamp": now.isoformat(),
"created_at": now.timestamp(),
}
if extra:
record.update(extra)
try:
runs_file.parent.mkdir(parents=True, exist_ok=True)
with open(runs_file, "a", encoding="utf-8") as f:
f.write(_json.dumps(record) + "\n")
except OSError:
pass # Non-critical — don't break execution
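The consumer side of `_write_run_event` turns these records into divider messages merged into the chronological message list (the later `handle_messages` hunk does this inline). A hedged sketch of that read-and-merge step; the helper names are illustrative:

```python
import json
from pathlib import Path

def load_run_markers(runs_file: Path) -> list[dict]:
    """Read runs.jsonl back into synthetic divider messages."""
    markers: list[dict] = []
    if not runs_file.exists():
        return markers
    for line in runs_file.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # a torn write must not break restoration
        markers.append(
            {
                "is_run_marker": True,
                "run_id": record.get("run_id"),
                "run_event": record.get("event"),
                "created_at": record.get("created_at", 0),
            }
        )
    return markers

def merge_with_messages(messages: list[dict], markers: list[dict]) -> list[dict]:
    """Interleave run markers with messages chronologically."""
    merged = messages + markers
    merged.sort(key=lambda m: m.get("created_at", m.get("seq", 0)))
    return merged
```

Storing `created_at` as a numeric timestamp at write time is what makes this merge a plain sort key, shared with message parts that fall back to file mtime.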
async def _write_session_state(
self,
execution_id: str,
@@ -978,8 +1057,8 @@ class ExecutionStream:
def _create_modified_graph(self) -> "GraphSpec":
"""Create a graph with the entry point overridden.
Preserves the original graph's entry_points and async_entry_points
so that validation correctly considers ALL entry nodes reachable.
Preserves the original graph's entry_points so that validation
correctly considers ALL entry nodes reachable.
Each stream only executes from its own entry_node, but the full
graph must validate with all entry points accounted for.
"""
@@ -1004,7 +1083,6 @@ class ExecutionStream:
version=self.graph.version,
entry_node=self.entry_spec.entry_node, # Use our entry point
entry_points=merged_entry_points,
async_entry_points=self.graph.async_entry_points,
terminal_nodes=self.graph.terminal_nodes,
pause_nodes=self.graph.pause_nodes,
nodes=self.graph.nodes,
@@ -17,7 +17,7 @@ from pathlib import Path
import pytest
from framework.graph import Goal
from framework.graph.edge import AsyncEntryPointSpec, EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.edge import EdgeCondition, EdgeSpec, GraphSpec
from framework.graph.goal import Constraint, SuccessCriterion
from framework.graph.node import NodeSpec
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
@@ -101,30 +101,12 @@ def sample_graph():
),
]
async_entry_points = [
AsyncEntryPointSpec(
id="webhook",
name="Webhook Handler",
entry_node="process-webhook",
trigger_type="webhook",
isolation_level="shared",
),
AsyncEntryPointSpec(
id="api",
name="API Handler",
entry_node="process-api",
trigger_type="api",
isolation_level="shared",
),
]
return GraphSpec(
id="test-graph",
goal_id="test-goal",
version="1.0.0",
entry_node="process-webhook",
entry_points={"start": "process-webhook"},
async_entry_points=async_entry_points,
terminal_nodes=["complete"],
pause_nodes=[],
nodes=nodes,
@@ -504,108 +486,6 @@ class TestAgentRuntime:
# === GraphSpec Validation Tests ===
class TestGraphSpecValidation:
"""Tests for GraphSpec with async_entry_points."""
def test_has_async_entry_points(self, sample_graph):
"""Test checking for async entry points."""
assert sample_graph.has_async_entry_points() is True
# Graph without async entry points
simple_graph = GraphSpec(
id="simple",
goal_id="goal",
entry_node="start",
nodes=[],
edges=[],
)
assert simple_graph.has_async_entry_points() is False
def test_get_async_entry_point(self, sample_graph):
"""Test getting async entry point by ID."""
ep = sample_graph.get_async_entry_point("webhook")
assert ep is not None
assert ep.id == "webhook"
assert ep.entry_node == "process-webhook"
ep_not_found = sample_graph.get_async_entry_point("nonexistent")
assert ep_not_found is None
def test_validate_async_entry_points(self):
"""Test validation catches async entry point errors."""
nodes = [
NodeSpec(
id="valid-node",
name="Valid Node",
description="A valid node",
node_type="event_loop",
input_keys=[],
output_keys=[],
),
]
# Invalid entry node
graph = GraphSpec(
id="test",
goal_id="goal",
entry_node="valid-node",
async_entry_points=[
AsyncEntryPointSpec(
id="invalid",
name="Invalid",
entry_node="nonexistent-node",
trigger_type="webhook",
),
],
nodes=nodes,
edges=[],
)
errors = graph.validate()["errors"]
assert any("nonexistent-node" in e for e in errors)
# Invalid isolation level
graph2 = GraphSpec(
id="test",
goal_id="goal",
entry_node="valid-node",
async_entry_points=[
AsyncEntryPointSpec(
id="bad-isolation",
name="Bad Isolation",
entry_node="valid-node",
trigger_type="webhook",
isolation_level="invalid",
),
],
nodes=nodes,
edges=[],
)
errors2 = graph2.validate()["errors"]
assert any("isolation_level" in e for e in errors2)
# Invalid trigger type
graph3 = GraphSpec(
id="test",
goal_id="goal",
entry_node="valid-node",
async_entry_points=[
AsyncEntryPointSpec(
id="bad-trigger",
name="Bad Trigger",
entry_node="valid-node",
trigger_type="invalid_trigger",
),
],
nodes=nodes,
edges=[],
)
errors3 = graph3.validate()["errors"]
assert any("trigger_type" in e for e in errors3)
# === Integration Tests ===
@@ -483,7 +483,6 @@ class TestEventDrivenEntryPoints:
version="1.0.0",
entry_node="process-event",
entry_points={"start": "process-event"},
async_entry_points=[],
terminal_nodes=[],
pause_nodes=[],
nodes=nodes,
+22
View File
@@ -0,0 +1,22 @@
"""Trigger definitions for queen-level heartbeats (timers, webhooks)."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class TriggerDefinition:
"""A registered trigger that can be activated on the queen runtime.
Trigger *definitions* come from the worker's ``triggers.json``.
Activation state is per-session (persisted in ``SessionState.active_triggers``).
"""
id: str
trigger_type: str # "timer" | "webhook"
trigger_config: dict[str, Any] = field(default_factory=dict)
description: str = ""
task: str = ""
active: bool = False
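A sketch of loading these definitions from a worker's `triggers.json`, indexed by ID the way `available_triggers` is used elsewhere in this diff. The on-disk shape (a JSON list of trigger objects) and the `load_triggers` helper are assumptions; only the dataclass fields come from the diff:

```python
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

@dataclass
class TriggerDefinition:
    id: str
    trigger_type: str  # "timer" | "webhook"
    trigger_config: dict[str, Any] = field(default_factory=dict)
    description: str = ""
    task: str = ""
    active: bool = False

def load_triggers(path: Path) -> dict[str, TriggerDefinition]:
    """Index definitions by ID; activation state stays per-session."""
    if not path.exists():
        return {}
    raw = json.loads(path.read_text(encoding="utf-8"))
    return {t["id"]: TriggerDefinition(**t) for t in raw}
```

Keeping `active=False` as the dataclass default matches the split described in the docstring: the file carries definitions only, and activation is layered on from `SessionState.active_triggers` at load time.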
+7
View File
@@ -144,6 +144,13 @@ class SessionState(BaseModel):
checkpoint_enabled: bool = False
latest_checkpoint_id: str | None = None
# Trigger activation state (IDs of triggers the queen/user turned on)
active_triggers: list[str] = Field(default_factory=list)
# Per-trigger task strings (user overrides, keyed by trigger ID)
trigger_tasks: dict[str, str] = Field(default_factory=dict)
# True after first successful worker execution (gates trigger delivery on restart)
worker_configured: bool = Field(default=False)
model_config = {"extra": "allow"}
@computed_field
+27
View File
@@ -94,6 +94,29 @@ def sessions_dir(session: Session) -> Path:
return Path.home() / ".hive" / "agents" / agent_name / "sessions"
def cold_sessions_dir(session_id: str) -> Path | None:
"""Resolve the worker sessions directory from disk for a cold/stopped session.
Reads agent_path from the queen session's meta.json to find the agent name,
then returns ~/.hive/agents/{agent_name}/sessions/.
Returns None if meta.json is missing or has no agent_path.
"""
import json
meta_path = Path.home() / ".hive" / "queen" / "session" / session_id / "meta.json"
if not meta_path.exists():
return None
try:
meta = json.loads(meta_path.read_text(encoding="utf-8"))
agent_path = meta.get("agent_path")
if not agent_path:
return None
agent_name = Path(agent_path).name
return Path.home() / ".hive" / "agents" / agent_name / "sessions"
except (json.JSONDecodeError, OSError):
return None
# Allowed CORS origins (localhost on any port)
_CORS_ORIGINS = {"http://localhost", "http://127.0.0.1"}
@@ -226,6 +249,10 @@ def create_app(model: str | None = None) -> web.Application:
register_graph_routes(app)
register_log_routes(app)
from framework.server.routes_debug import register_routes as register_debug_routes
register_debug_routes(app)
# Static file serving — Option C production mode
# If frontend/dist/ exists, serve built frontend files on /
_setup_static_serving(app)
@@ -275,6 +275,7 @@ async def create_queen(
execution_id=session.id,
dynamic_tools_provider=phase_state.get_current_tools,
dynamic_prompt_provider=phase_state.get_current_prompt,
iteration_metadata_provider=lambda: {"phase": phase_state.phase},
)
session.queen_executor = executor
@@ -292,6 +293,8 @@ async def create_queen(
return
if phase_state.phase == "running":
if event.type == EventType.EXECUTION_COMPLETED:
# Mark worker as configured after first successful run
session.worker_configured = True
output = event.data.get("output", {})
output_summary = ""
if output:
+43
View File
@@ -0,0 +1,43 @@
"""Debug/profiling routes — only meaningful when ``hive open --debug`` is active.
Routes
------
GET /api/debug/profile
Returns the current event loop profiling snapshot as JSON.
When profiling is not active (``--debug`` flag absent) the response is
``{"enabled": false}``.
GET /api/debug/slow-callbacks
Returns the ring buffer of slow callbacks captured from asyncio's
built-in slow-callback logger. Empty list when not active.
"""
import logging
from aiohttp import web
from framework.observability.profiling import get_profile_snapshot
logger = logging.getLogger(__name__)
async def handle_profile(request: web.Request) -> web.Response:
"""GET /api/debug/profile — current event loop profiling stats."""
return web.json_response(get_profile_snapshot())
async def handle_slow_callbacks(request: web.Request) -> web.Response:
"""GET /api/debug/slow-callbacks — recent slow asyncio callbacks."""
snap = get_profile_snapshot()
return web.json_response(
{
"enabled": snap.get("enabled", False),
"slow_callbacks": snap.get("slow_callbacks", []),
"slow_threshold_ms": snap.get("slow_threshold_ms", 50),
}
)
def register_routes(app: web.Application) -> None:
app.router.add_get("/api/debug/profile", handle_profile)
app.router.add_get("/api/debug/slow-callbacks", handle_slow_callbacks)
+8
View File
@@ -15,6 +15,7 @@ logger = logging.getLogger(__name__)
DEFAULT_EVENT_TYPES = [
EventType.CLIENT_OUTPUT_DELTA,
EventType.CLIENT_INPUT_REQUESTED,
EventType.CLIENT_INPUT_RECEIVED,
EventType.LLM_TEXT_DELTA,
EventType.TOOL_CALL_STARTED,
EventType.TOOL_CALL_COMPLETED,
@@ -40,6 +41,11 @@ DEFAULT_EVENT_TYPES = [
EventType.CREDENTIALS_REQUIRED,
EventType.SUBAGENT_REPORT,
EventType.QUEEN_PHASE_CHANGED,
EventType.TRIGGER_AVAILABLE,
EventType.TRIGGER_ACTIVATED,
EventType.TRIGGER_DEACTIVATED,
EventType.TRIGGER_FIRED,
EventType.TRIGGER_REMOVED,
EventType.DRAFT_GRAPH_UPDATED,
]
@@ -90,6 +96,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
"execution_failed",
"execution_paused",
"client_input_requested",
"client_input_received",
"node_loop_iteration",
"node_loop_started",
"credentials_required",
@@ -143,6 +150,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
EventType.CLIENT_OUTPUT_DELTA.value,
EventType.EXECUTION_STARTED.value,
EventType.CLIENT_INPUT_REQUESTED.value,
EventType.CLIENT_INPUT_RECEIVED.value,
}
event_type_values = {et.value for et in event_types}
replay_types = _REPLAY_TYPES & event_type_values
+12
View File
@@ -125,6 +125,18 @@ async def handle_chat(request: web.Request) -> web.Response:
node = queen_executor.node_registry.get("queen")
if node is not None and hasattr(node, "inject_event"):
await node.inject_event(message, is_client_input=True)
# Publish to EventBus so the session event log captures user messages
from framework.runtime.event_bus import AgentEvent, EventType
await session.event_bus.publish(
AgentEvent(
type=EventType.CLIENT_INPUT_RECEIVED,
stream_id="queen",
node_id="queen",
execution_id=session.id,
data={"content": message},
)
)
return web.json_response(
{
"status": "queen",
+15
View File
@@ -2,6 +2,7 @@
import json
import logging
import time
from aiohttp import web
@@ -116,6 +117,20 @@ async def handle_list_nodes(request: web.Request) -> web.Response:
}
for ep in reg.entry_points.values()
]
# Append triggers from triggers.json (stored on session)
for t in getattr(session, "available_triggers", {}).values():
entry = {
"id": t.id,
"name": t.description or t.id,
"entry_node": graph.entry_node,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
"task": t.task,
}
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
if mono is not None:
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
entry_points.append(entry)
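The `next_fire_in` computation above stores trigger deadlines as `time.monotonic()` values rather than wall-clock timestamps, so the remaining time is immune to system clock changes. Isolated as a one-liner for clarity (`next_fire_in` as a standalone function is illustrative):

```python
import time

def next_fire_in(deadline_mono: float) -> float:
    """Seconds until a monotonic deadline, clamped at zero when already due."""
    return max(0.0, deadline_mono - time.monotonic())
```

The `max(0.0, ...)` clamp keeps the API from ever reporting a negative countdown for a timer that fired between the deadline being recorded and the request being served.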
return web.json_response(
{
"nodes": nodes,
+217 -49
View File
@@ -9,8 +9,10 @@ Session-primary routes:
- DELETE /api/sessions/{session_id}/worker unload worker from session
- GET /api/sessions/{session_id}/stats runtime statistics
- GET /api/sessions/{session_id}/entry-points list entry points
- PATCH /api/sessions/{session_id}/triggers/{id} update trigger task
- GET /api/sessions/{session_id}/graphs list graph IDs
- GET /api/sessions/{session_id}/queen-messages queen conversation history
- GET /api/sessions/{session_id}/events/history persisted eventbus log (for replay)
Worker session browsing (persisted execution runs on disk):
- GET /api/sessions/{session_id}/worker-sessions list
@@ -31,6 +33,7 @@ from pathlib import Path
from aiohttp import web
from framework.server.app import (
cold_sessions_dir,
resolve_session,
safe_path_segment,
sessions_dir,
@@ -140,6 +143,7 @@ async def handle_create_session(request: web.Request) -> web.Response:
session = await manager.create_session_with_worker(
agent_path,
agent_id=agent_id,
session_id=session_id,
model=model,
initial_prompt=initial_prompt,
queen_resume_from=queen_resume_from,
@@ -228,6 +232,22 @@ async def handle_get_live_session(request: web.Request) -> web.Response:
}
for ep in rt.get_entry_points()
]
# Append triggers from triggers.json (stored on session)
runner = getattr(session, "runner", None)
graph_entry = runner.graph.entry_node if runner else ""
for t in getattr(session, "available_triggers", {}).values():
entry = {
"id": t.id,
"name": t.description or t.id,
"entry_node": graph_entry,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
"task": t.task,
}
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
if mono is not None:
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
data["entry_points"].append(entry)
data["graphs"] = session.worker_runtime.list_graphs()
return web.json_response(data)
@@ -351,23 +371,84 @@ async def handle_session_entry_points(request: web.Request) -> web.Response:
rt = session.worker_runtime
eps = rt.get_entry_points() if rt else []
entry_points = [
{
"id": ep.id,
"name": ep.name,
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"trigger_config": ep.trigger_config,
**(
{"next_fire_in": nf}
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
else {}
),
}
for ep in eps
]
# Append triggers from triggers.json (stored on session)
runner = getattr(session, "runner", None)
graph_entry = runner.graph.entry_node if runner else ""
for t in getattr(session, "available_triggers", {}).values():
entry = {
"id": t.id,
"name": t.description or t.id,
"entry_node": graph_entry,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
"task": t.task,
}
mono = getattr(session, "trigger_next_fire", {}).get(t.id)
if mono is not None:
entry["next_fire_in"] = max(0.0, mono - time.monotonic())
entry_points.append(entry)
return web.json_response({"entry_points": entry_points})
async def handle_update_trigger_task(request: web.Request) -> web.Response:
"""PATCH /api/sessions/{session_id}/triggers/{trigger_id} — update trigger task."""
session, err = resolve_session(request)
if err:
return err
trigger_id = request.match_info["trigger_id"]
available = getattr(session, "available_triggers", {})
tdef = available.get(trigger_id)
if tdef is None:
return web.json_response(
{"error": f"Trigger '{trigger_id}' not found"},
status=404,
)
try:
body = await request.json()
except Exception:
return web.json_response({"error": "Invalid JSON body"}, status=400)
task = body.get("task")
if task is None:
return web.json_response({"error": "Missing 'task' field"}, status=400)
if not isinstance(task, str):
return web.json_response({"error": "'task' must be a string"}, status=400)
tdef.task = task
# Persist to session state and agent definition
from framework.tools.queen_lifecycle_tools import (
_persist_active_triggers,
_save_trigger_to_agent,
)
if trigger_id in getattr(session, "active_trigger_ids", set()):
session_id = request.match_info["session_id"]
await _persist_active_triggers(session, session_id)
_save_trigger_to_agent(session, trigger_id, tdef)
return web.json_response(
{
"entry_points": [
{
"id": ep.id,
"name": ep.name,
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"trigger_config": ep.trigger_config,
**(
{"next_fire_in": nf}
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
else {}
),
}
for ep in eps
]
"trigger_id": trigger_id,
"task": tdef.task,
}
)
@@ -397,12 +478,15 @@ async def handle_list_worker_sessions(request: web.Request) -> web.Response:
"""List worker sessions on disk."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_path:
return web.json_response({"sessions": []})
sess_dir = sessions_dir(session)
# Fall back to cold session lookup from disk
sid = request.match_info["session_id"]
sess_dir = cold_sessions_dir(sid)
if sess_dir is None:
return err
else:
if not session.worker_path:
return web.json_response({"sessions": []})
sess_dir = sessions_dir(session)
if not sess_dir.exists():
return web.json_response({"sessions": []})
@@ -564,48 +648,85 @@ async def handle_messages(request: web.Request) -> web.Response:
"""Get messages for a worker session."""
session, err = resolve_session(request)
if err:
return err
if not session.worker_path:
return web.json_response({"error": "No worker loaded"}, status=503)
# Fall back to cold session lookup from disk
sid = request.match_info["session_id"]
sess_dir = cold_sessions_dir(sid)
if sess_dir is None:
return err
else:
if not session.worker_path:
return web.json_response({"error": "No worker loaded"}, status=503)
sess_dir = sessions_dir(session)
ws_id = request.match_info.get("ws_id") or request.match_info.get("session_id", "")
ws_id = safe_path_segment(ws_id)
convs_dir = sessions_dir(session) / ws_id / "conversations"
convs_dir = sess_dir / ws_id / "conversations"
if not convs_dir.exists():
return web.json_response({"messages": []})
filter_node = request.query.get("node_id")
all_messages = []
for node_dir in convs_dir.iterdir():
if not node_dir.is_dir():
continue
if filter_node and node_dir.name != filter_node:
continue
parts_dir = node_dir / "parts"
def _collect_msg_parts(parts_dir: Path, node_id: str) -> None:
if not parts_dir.exists():
continue
return
for part_file in sorted(parts_dir.iterdir()):
if part_file.suffix != ".json":
continue
try:
part = json.loads(part_file.read_text(encoding="utf-8"))
part["_node_id"] = node_dir.name
part["_node_id"] = node_id
part.setdefault("created_at", part_file.stat().st_mtime)
all_messages.append(part)
except (json.JSONDecodeError, OSError):
continue
# Flat layout: conversations/parts/*.json
if not filter_node:
_collect_msg_parts(convs_dir / "parts", "worker")
# Node-based layout: conversations/<node_id>/parts/*.json
for node_dir in convs_dir.iterdir():
if not node_dir.is_dir() or node_dir.name == "parts":
continue
if filter_node and node_dir.name != filter_node:
continue
_collect_msg_parts(node_dir / "parts", node_dir.name)
# Merge run lifecycle markers from runs.jsonl (for historical dividers)
runs_file = sess_dir / ws_id / "runs.jsonl"
if runs_file.exists():
try:
for line in runs_file.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
all_messages.append(
{
"seq": -1,
"role": "system",
"content": "",
"_node_id": "_run_marker",
"is_run_marker": True,
"run_id": record.get("run_id"),
"run_event": record.get("event"),
"created_at": record.get("created_at", 0),
}
)
except json.JSONDecodeError:
continue
except OSError:
pass
all_messages.sort(key=lambda m: m.get("created_at", m.get("seq", 0)))
client_only = request.query.get("client_only", "").lower() in ("true", "1")
if client_only:
client_facing_nodes: set[str] = set()
if session and session.runner and hasattr(session.runner, "graph"):
for node in session.runner.graph.nodes:
if node.client_facing:
client_facing_nodes.add(node.id)
@@ -614,12 +735,15 @@ async def handle_messages(request: web.Request) -> web.Response:
all_messages = [
m
for m in all_messages
if m.get("is_run_marker")
or (
not m.get("is_transition_marker")
and m["role"] != "tool"
and not (m["role"] == "assistant" and m.get("tool_calls"))
and (
(m["role"] == "user" and m.get("is_client_input"))
or (m["role"] == "assistant" and m.get("_node_id") in client_facing_nodes)
)
)
]
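The `client_only` filter above packs several exclusions into one comprehension; as an editor's sketch (the helper name and sample messages are hypothetical, not part of the codebase), the predicate it applies is:

```python
def is_client_visible(m: dict, client_facing_nodes: set[str]) -> bool:
    """Mirror of the client_only filter: keep run markers, client-typed
    user inputs, and assistant output from client-facing nodes."""
    if m.get("is_run_marker"):
        return True  # run lifecycle dividers always survive
    if m.get("is_transition_marker") or m["role"] == "tool":
        return False
    if m["role"] == "assistant" and m.get("tool_calls"):
        return False  # tool-call scaffolding is not client-facing
    return (m["role"] == "user" and bool(m.get("is_client_input"))) or (
        m["role"] == "assistant" and m.get("_node_id") in client_facing_nodes
    )

sample = [
    {"role": "user", "is_client_input": True, "content": "hi"},
    {"role": "tool", "content": "tool output"},
    {"role": "assistant", "tool_calls": [{}], "content": ""},
    {"role": "assistant", "_node_id": "chat", "content": "hello"},
    {"role": "system", "is_run_marker": True},
]
visible = [m for m in sample if is_client_visible(m, {"chat"})]
```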
@@ -640,18 +764,16 @@ async def handle_queen_messages(request: web.Request) -> web.Response:
return web.json_response({"messages": [], "session_id": session_id})
all_messages: list[dict] = []
def _read_parts(parts_dir: Path, node_id: str) -> None:
if not parts_dir.exists():
return
for part_file in sorted(parts_dir.iterdir()):
if part_file.suffix != ".json":
continue
try:
part = json.loads(part_file.read_text(encoding="utf-8"))
part["_node_id"] = node_id
# Use file mtime as created_at so frontend can order
# queen and worker messages chronologically.
part.setdefault("created_at", part_file.stat().st_mtime)
@@ -659,6 +781,15 @@ async def handle_queen_messages(request: web.Request) -> web.Response:
except (json.JSONDecodeError, OSError):
continue
# Flat layout: conversations/parts/*.json
_read_parts(convs_dir / "parts", "queen")
# Node-based layout: conversations/<node_id>/parts/*.json
for node_dir in convs_dir.iterdir():
if not node_dir.is_dir() or node_dir.name == "parts":
continue
_read_parts(node_dir / "parts", node_dir.name)
all_messages.sort(key=lambda m: m.get("created_at", m.get("seq", 0)))
# Filter to client-facing messages only
@@ -673,6 +804,38 @@ async def handle_queen_messages(request: web.Request) -> web.Response:
return web.json_response({"messages": all_messages, "session_id": session_id})
async def handle_session_events_history(request: web.Request) -> web.Response:
"""GET /api/sessions/{session_id}/events/history — persisted eventbus log.
Reads ``events.jsonl`` from the session directory on disk so it works for
both live sessions and cold (post-server-restart) sessions. The frontend
replays these events through ``sseEventToChatMessage`` to fully reconstruct
the UI state on resume.
"""
session_id = request.match_info["session_id"]
queen_dir = Path.home() / ".hive" / "queen" / "session" / session_id
events_path = queen_dir / "events.jsonl"
if not events_path.exists():
return web.json_response({"events": [], "session_id": session_id})
events: list[dict] = []
try:
with open(events_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
except OSError:
return web.json_response({"events": [], "session_id": session_id})
return web.json_response({"events": events, "session_id": session_id})
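The handler above tolerates blank lines and malformed JSON in ``events.jsonl`` rather than failing the whole request. A minimal standalone sketch of that parsing contract (the function name is the editor's, not part of the codebase):

```python
import json

def parse_events_jsonl(text: str) -> list[dict]:
    """Parse a JSONL event log the way the history endpoint does:
    skip blank lines and silently drop lines that are not valid JSON."""
    events: list[dict] = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue
    return events

log = '{"type": "message", "data": {"iteration": 0}}\n\nnot json\n{"type": "done"}\n'
events = parse_events_jsonl(log)
```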
async def handle_session_history(request: web.Request) -> web.Response:
"""GET /api/sessions/history — all queen sessions on disk (live + cold).
@@ -746,6 +909,7 @@ async def handle_discover(request: web.Request) -> web.Response:
"description": entry.description,
"category": entry.category,
"session_count": entry.session_count,
"run_count": entry.run_count,
"node_count": entry.node_count,
"tool_count": entry.tool_count,
"tags": entry.tags,
@@ -783,8 +947,12 @@ def register_routes(app: web.Application) -> None:
# Session info
app.router.add_get("/api/sessions/{session_id}/stats", handle_session_stats)
app.router.add_get("/api/sessions/{session_id}/entry-points", handle_session_entry_points)
app.router.add_patch(
"/api/sessions/{session_id}/triggers/{trigger_id}", handle_update_trigger_task
)
app.router.add_get("/api/sessions/{session_id}/graphs", handle_session_graphs)
app.router.add_get("/api/sessions/{session_id}/queen-messages", handle_queen_messages)
app.router.add_get("/api/sessions/{session_id}/events/history", handle_session_events_history)
# Worker session browsing (session-primary)
app.router.add_get("/api/sessions/{session_id}/worker-sessions", handle_list_worker_sessions)
@@ -7,7 +7,6 @@ Architecture:
- Session owns EventBus + LLM, shared with queen and worker
- Queen is always present once a session starts
- Worker is optional, loaded into an existing session
"""
import asyncio
@@ -15,11 +14,13 @@ import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
from framework.runtime.triggers import TriggerDefinition
logger = logging.getLogger(__name__)
@@ -42,12 +43,23 @@ class Session:
worker_info: Any | None = None # AgentInfo
# Queen phase state (building/staging/running)
phase_state: Any = None # QueenPhaseState
# Worker handoff subscription
worker_handoff_sub: str | None = None
# Memory consolidation subscription (fires on CONTEXT_COMPACTED)
memory_consolidation_sub: str | None = None
# Trigger definitions loaded from agent's triggers.json (available but inactive)
available_triggers: dict[str, TriggerDefinition] = field(default_factory=dict)
# Active trigger tracking (IDs currently firing + their asyncio tasks)
active_trigger_ids: set[str] = field(default_factory=set)
active_timer_tasks: dict[str, asyncio.Task] = field(default_factory=dict)
# Queen-owned webhook server (lazy singleton, created on first webhook trigger activation)
queen_webhook_server: Any = None
# EventBus subscription IDs for active webhook triggers (trigger_id -> sub_id)
active_webhook_subs: dict[str, str] = field(default_factory=dict)
# True after first successful worker execution (gates trigger delivery)
worker_configured: bool = False
# Fire timestamps for triggers, stored as ISO strings (mirrors AgentRuntime._timer_next_fire)
trigger_next_fire: dict[str, str] = field(default_factory=dict)
# Session directory resumption:
# When set, _start_queen writes queen conversations to this existing session's
# directory instead of creating a new one. This lets cold-restores accumulate
@@ -130,7 +142,9 @@ class SessionManager:
to that existing session's directory instead of creating a new one.
This preserves full conversation history across server restarts.
"""
# Reuse the original session ID when cold-restoring
resolved_session_id = queen_resume_from or session_id
session = await self._create_session_core(session_id=resolved_session_id, model=model)
session.queen_resume_from = queen_resume_from
# Start queen immediately (queen-only, no worker tools yet)
@@ -147,22 +161,28 @@ class SessionManager:
self,
agent_path: str | Path,
agent_id: str | None = None,
session_id: str | None = None,
model: str | None = None,
initial_prompt: str | None = None,
queen_resume_from: str | None = None,
) -> Session:
"""Create a session and load a worker in one step.
When ``queen_resume_from`` is set the session reuses the original session
ID so the frontend sees a single continuous session. The queen writes
conversation messages to that existing directory, preserving full history.
"""
from framework.tools.queen_lifecycle_tools import build_worker_profile
agent_path = Path(agent_path)
resolved_worker_id = agent_id or agent_path.name
# Reuse the original session ID when cold-restoring so the frontend
# sees one continuous session instead of a new one each time.
session = await self._create_session_core(
session_id=queen_resume_from,
model=model,
)
session.queen_resume_from = queen_resume_from
try:
# Load worker FIRST (before queen) so queen gets full tools
@@ -202,8 +222,8 @@ class SessionManager:
) -> None:
"""Load a worker agent into a session (core logic).
Sets up the runner, runtime, and session fields. Does NOT notify
the queen; callers handle that step.
"""
from framework.runner import AgentRunner
@@ -242,6 +262,25 @@ class SessionManager:
runtime = runner._agent_runtime
# Load triggers from the agent's triggers.json definition file.
from framework.tools.queen_lifecycle_tools import _read_agent_triggers_json
for tdata in _read_agent_triggers_json(agent_path):
tid = tdata.get("id", "")
ttype = tdata.get("trigger_type", "")
if tid and ttype in ("timer", "webhook"):
session.available_triggers[tid] = TriggerDefinition(
id=tid,
trigger_type=ttype,
trigger_config=tdata.get("trigger_config", {}),
description=tdata.get("name", tid),
task=tdata.get("task", ""),
)
logger.info("Loaded trigger '%s' (%s) from triggers.json", tid, ttype)
if session.available_triggers:
await self._emit_trigger_events(session, "available", session.available_triggers)
# Start runtime on event loop
if runtime and not runtime.is_running:
await runtime.start()
@@ -369,7 +408,7 @@ class SessionManager:
) -> Session:
"""Load a worker agent into an existing session (with running queen).
Starts the worker runtime and notifies the queen.
"""
agent_path = Path(agent_path)
@@ -385,11 +424,48 @@ class SessionManager:
)
# Notify queen about the loaded worker (skip for queen itself).
# Health judge disabled for simplicity.
if agent_path.name != "queen" and session.worker_runtime:
# await self._start_judge(session, session.runner._storage_path)
await self._notify_queen_worker_loaded(session)
# Restore previously active triggers from persisted session state
if session.available_triggers and session.worker_runtime:
try:
store = session.worker_runtime._session_store
state = await store.read_state(session_id)
if state and state.active_triggers:
from framework.tools.queen_lifecycle_tools import (
_start_trigger_timer,
_start_trigger_webhook,
)
saved_tasks = getattr(state, "trigger_tasks", {}) or {}
for tid in state.active_triggers:
tdef = session.available_triggers.get(tid)
if tdef:
# Restore user-configured task override
saved_task = saved_tasks.get(tid, "")
if saved_task:
tdef.task = saved_task
tdef.active = True
session.active_trigger_ids.add(tid)
if tdef.trigger_type == "timer":
await _start_trigger_timer(session, tid, tdef)
logger.info("Restored trigger timer '%s'", tid)
elif tdef.trigger_type == "webhook":
await _start_trigger_webhook(session, tid, tdef)
logger.info("Restored webhook trigger '%s'", tid)
else:
logger.warning(
"Saved trigger '%s' not found in worker entry points, skipping",
tid,
)
# Restore worker_configured flag
if state and getattr(state, "worker_configured", False):
session.worker_configured = True
except Exception as e:
logger.warning("Failed to restore active triggers: %s", e)
# Emit SSE event so the frontend can update UI
await self._emit_worker_loaded(session)
@@ -403,9 +479,6 @@ class SessionManager:
if session.worker_runtime is None:
return False
# Cleanup worker
if session.runner:
try:
@@ -413,6 +486,26 @@ class SessionManager:
except Exception as e:
logger.error("Error cleaning up worker '%s': %s", session.worker_id, e)
# Cancel active trigger timers
for tid, task in session.active_timer_tasks.items():
task.cancel()
logger.info("Cancelled trigger timer '%s' on unload", tid)
session.active_timer_tasks.clear()
# Unsubscribe webhook handlers (server stays alive — queen-owned)
for sub_id in session.active_webhook_subs.values():
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.active_webhook_subs.clear()
session.active_trigger_ids.clear()
# Clean up triggers
if session.available_triggers:
await self._emit_trigger_events(session, "removed", session.available_triggers)
session.available_triggers.clear()
worker_id = session.worker_id
session.worker_id = None
session.worker_path = None
@@ -443,8 +536,6 @@ class SessionManager:
_storage_id = getattr(session, "queen_resume_from", None) or session_id
_session_dir = Path.home() / ".hive" / "queen" / "session" / _storage_id
if session.worker_handoff_sub is not None:
try:
session.event_bus.unsubscribe(session.worker_handoff_sub)
@@ -464,6 +555,25 @@ class SessionManager:
session.queen_task = None
session.queen_executor = None
# Cancel active trigger timers
for task in session.active_timer_tasks.values():
task.cancel()
session.active_timer_tasks.clear()
# Unsubscribe webhook handlers and stop queen webhook server
for sub_id in session.active_webhook_subs.values():
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.active_webhook_subs.clear()
if session.queen_webhook_server is not None:
try:
await session.queen_webhook_server.stop()
except Exception:
logger.error("Error stopping queen webhook server", exc_info=True)
session.queen_webhook_server = None
# Cleanup worker
if session.runner:
try:
@@ -482,6 +592,9 @@ class SessionManager:
name=f"queen-memory-consolidation-{session_id}",
)
# Close per-session event log
session.event_bus.close_session_log()
logger.info("Session '%s' stopped", session_id)
return True
@@ -491,7 +604,7 @@ class SessionManager:
async def _handle_worker_handoff(self, session: Session, executor: Any, event: Any) -> None:
"""Route worker escalation events into the queen conversation."""
if event.stream_id == "queen":
return
reason = str(event.data.get("reason", "")).strip()
@@ -580,6 +693,39 @@ class SessionManager:
except OSError:
pass
# Enable per-session event persistence so that all eventbus events
# survive server restarts and can be replayed on cold-session resume.
# Scan the existing event log to find the max iteration ever written,
# then use max+1 as offset so resumed sessions produce monotonically
# increasing iteration values — preventing frontend message ID collisions.
iteration_offset = 0
events_path = queen_dir / "events.jsonl"
try:
if events_path.exists():
max_iter = -1
with open(events_path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
evt = json.loads(line)
it = evt.get("data", {}).get("iteration")
if isinstance(it, int) and it > max_iter:
max_iter = it
except (json.JSONDecodeError, TypeError):
continue
if max_iter >= 0:
iteration_offset = max_iter + 1
logger.info(
"Session '%s' resuming with iteration_offset=%d (from events.jsonl max)",
session.id,
iteration_offset,
)
except OSError:
pass
session.event_bus.set_session_log(events_path, iteration_offset=iteration_offset)
session.queen_task = await create_queen(
session=session,
session_manager=self,
@@ -623,116 +769,6 @@ class SessionManager:
handler=_on_compaction,
)
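The iteration-offset scan above can be exercised in isolation: the offset is one past the highest ``data.iteration`` ever logged, so resumed sessions never reuse frontend message IDs. A sketch of the same logic (the helper name is invented for illustration):

```python
import json

def compute_iteration_offset(jsonl_text: str) -> int:
    """Return max(data.iteration) + 1 from an events.jsonl payload,
    or 0 when no iteration was ever recorded."""
    max_iter = -1
    for line in jsonl_text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            evt = json.loads(line)
        except json.JSONDecodeError:
            continue
        it = evt.get("data", {}).get("iteration") if isinstance(evt, dict) else None
        if isinstance(it, int) and it > max_iter:
            max_iter = it
    return max_iter + 1 if max_iter >= 0 else 0

log = '{"data": {"iteration": 4}}\n{"data": {}}\n{"data": {"iteration": 7}}\n'
```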
# ------------------------------------------------------------------
# Queen notifications
# ------------------------------------------------------------------
@@ -749,7 +785,22 @@ class SessionManager:
return
profile = build_worker_profile(session.worker_runtime, agent_path=session.worker_path)
# Append available trigger info so the queen knows what's schedulable
trigger_lines = ""
if session.available_triggers:
parts = []
for t in session.available_triggers.values():
cfg = t.trigger_config
detail = cfg.get("cron") or f"every {cfg.get('interval_minutes', '?')} min"
task_info = f' -> task: "{t.task}"' if t.task else " (no task configured)"
parts.append(f" - {t.id} ({t.trigger_type}: {detail}){task_info}")
trigger_lines = (
"\n\nAvailable triggers (inactive — use set_trigger to activate):\n"
+ "\n".join(parts)
)
await node.inject_event(f"[SYSTEM] Worker loaded.{profile}{trigger_lines}")
async def _emit_worker_loaded(self, session: Session) -> None:
"""Publish a WORKER_LOADED event so the frontend can update."""
@@ -785,6 +836,31 @@ class SessionManager:
"according to your current phase."
)
async def _emit_trigger_events(
self,
session: Session,
kind: str,
triggers: dict[str, TriggerDefinition],
) -> None:
"""Emit TRIGGER_AVAILABLE or TRIGGER_REMOVED events for each trigger."""
from framework.runtime.event_bus import AgentEvent, EventType
event_type = (
EventType.TRIGGER_AVAILABLE if kind == "available" else EventType.TRIGGER_REMOVED
)
for t in triggers.values():
await session.event_bus.publish(
AgentEvent(
type=event_type,
stream_id="queen",
data={
"trigger_id": t.id,
"trigger_type": t.trigger_type,
"trigger_config": t.trigger_config,
},
)
)
async def revive_queen(self, session: Session, initial_prompt: str | None = None) -> None:
"""Revive a dead queen executor on an existing session.
@@ -856,13 +932,19 @@ class SessionManager:
# Check whether any message part files are actually present
has_messages = False
try:
# Flat layout: conversations/parts/*.json
flat_parts = convs_dir / "parts"
if flat_parts.exists() and any(f.suffix == ".json" for f in flat_parts.iterdir()):
has_messages = True
else:
# Node-based layout: conversations/<node_id>/parts/*.json
for node_dir in convs_dir.iterdir():
if not node_dir.is_dir() or node_dir.name == "parts":
continue
parts_dir = node_dir / "parts"
if parts_dir.exists() and any(f.suffix == ".json" for f in parts_dir.iterdir()):
has_messages = True
break
except OSError:
pass
@@ -939,21 +1021,27 @@ class SessionManager:
if convs_dir.exists():
try:
all_parts: list[dict] = []
def _collect_parts(parts_dir: Path, _dest: list[dict] = all_parts) -> None:
if not parts_dir.exists():
return
for part_file in sorted(parts_dir.iterdir()):
if part_file.suffix != ".json":
continue
try:
part = json.loads(part_file.read_text(encoding="utf-8"))
part.setdefault("created_at", part_file.stat().st_mtime)
_dest.append(part)
except (json.JSONDecodeError, OSError):
continue
# Flat layout: conversations/parts/*.json
_collect_parts(convs_dir / "parts")
# Node-based layout: conversations/<node_id>/parts/*.json
for node_dir in convs_dir.iterdir():
if not node_dir.is_dir() or node_dir.name == "parts":
continue
_collect_parts(node_dir / "parts")
# Filter to client-facing messages only
client_msgs = [
p
@@ -16,6 +16,9 @@ from aiohttp.test_utils import TestClient, TestServer
from framework.server.app import create_app
from framework.server.session_manager import Session
REPO_ROOT = Path(__file__).resolve().parents[4]
EXAMPLE_AGENT_PATH = REPO_ROOT / "examples" / "templates" / "deep_research_agent"
# ---------------------------------------------------------------------------
# Mock helpers
# ---------------------------------------------------------------------------
@@ -347,6 +350,35 @@ class TestHealth:
class TestSessionCRUD:
@pytest.mark.asyncio
async def test_create_session_with_worker_forwards_session_id(self):
app = create_app()
manager = app["manager"]
manager.create_session_with_worker = AsyncMock(
return_value=_make_session(agent_id="my-custom-session")
)
async with TestClient(TestServer(app)) as client:
resp = await client.post(
"/api/sessions",
json={
"session_id": "my-custom-session",
"agent_path": str(EXAMPLE_AGENT_PATH),
},
)
data = await resp.json()
assert resp.status == 201
assert data["session_id"] == "my-custom-session"
manager.create_session_with_worker.assert_awaited_once_with(
str(EXAMPLE_AGENT_PATH.resolve()),
agent_id=None,
session_id="my-custom-session",
model=None,
initial_prompt=None,
queen_resume_from=None,
)
@pytest.mark.asyncio
async def test_list_sessions_empty(self):
app = create_app()
@@ -37,7 +37,7 @@ import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -285,10 +285,6 @@ def build_worker_profile(runtime: AgentRuntime, agent_path: Path | str | None =
return "\n".join(lines)
_FLOWCHART_TYPES = {
# ── Core symbols (ISO 5807 §4) ──────────────────────────
# Terminator — rounded rectangle (stadium shape)
@@ -351,6 +347,210 @@ _FLOWCHART_TYPES = {
}
def _read_agent_triggers_json(agent_path: Path) -> list[dict]:
"""Read triggers.json from the agent's export directory."""
triggers_path = agent_path / "triggers.json"
if not triggers_path.exists():
return []
try:
data = json.loads(triggers_path.read_text(encoding="utf-8"))
return data if isinstance(data, list) else []
except (json.JSONDecodeError, OSError):
return []
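For illustration only: a ``triggers.json`` entry shaped after the keys this loader and its callers read (``id``, ``name``, ``trigger_type``, ``trigger_config``, ``task``). The concrete values are hypothetical; the write format matches `_write_agent_triggers_json` below.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical trigger definition, using only the keys the loader reads.
triggers = [
    {
        "id": "daily-report",
        "name": "Daily report",
        "trigger_type": "timer",
        "trigger_config": {"interval_minutes": 1440},
        "task": "Summarize yesterday's activity.",
    }
]

agent_path = Path(tempfile.mkdtemp())
(agent_path / "triggers.json").write_text(
    json.dumps(triggers, indent=2, ensure_ascii=False) + "\n", encoding="utf-8"
)

# Round-trip: the same file read back, as _read_agent_triggers_json would.
loaded = json.loads((agent_path / "triggers.json").read_text(encoding="utf-8"))
```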
def _write_agent_triggers_json(agent_path: Path, triggers: list[dict]) -> None:
"""Write triggers.json to the agent's export directory."""
triggers_path = agent_path / "triggers.json"
triggers_path.write_text(
json.dumps(triggers, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
def _save_trigger_to_agent(session: Any, trigger_id: str, tdef: Any) -> None:
"""Persist a trigger definition to the agent's triggers.json."""
agent_path = getattr(session, "worker_path", None)
if agent_path is None:
return
triggers = _read_agent_triggers_json(agent_path)
triggers = [t for t in triggers if t.get("id") != trigger_id]
triggers.append(
{
"id": tdef.id,
"name": tdef.description or tdef.id,
"trigger_type": tdef.trigger_type,
"trigger_config": tdef.trigger_config,
"task": tdef.task or "",
}
)
_write_agent_triggers_json(agent_path, triggers)
logger.info("Saved trigger '%s' to %s/triggers.json", trigger_id, agent_path)
def _remove_trigger_from_agent(session: Any, trigger_id: str) -> None:
"""Remove a trigger definition from the agent's triggers.json."""
agent_path = getattr(session, "worker_path", None)
if agent_path is None:
return
triggers = _read_agent_triggers_json(agent_path)
updated = [t for t in triggers if t.get("id") != trigger_id]
if len(updated) != len(triggers):
_write_agent_triggers_json(agent_path, updated)
logger.info("Removed trigger '%s' from %s/triggers.json", trigger_id, agent_path)
async def _persist_active_triggers(session: Any, session_id: str) -> None:
"""Persist the set of active trigger IDs (and their tasks) to SessionState."""
runtime = getattr(session, "worker_runtime", None)
if runtime is None:
return
store = getattr(runtime, "_session_store", None)
if store is None:
return
try:
state = await store.read_state(session_id)
if state is None:
return
active_ids = list(getattr(session, "active_trigger_ids", set()))
state.active_triggers = active_ids
# Persist per-trigger task overrides
available = getattr(session, "available_triggers", {})
state.trigger_tasks = {
tid: available[tid].task
for tid in active_ids
if tid in available and available[tid].task
}
await store.write_state(session_id, state)
except Exception:
logger.warning(
"Failed to persist active triggers for session %s", session_id, exc_info=True
)
async def _start_trigger_timer(session: Any, trigger_id: str, tdef: Any) -> None:
"""Start an asyncio background task that fires the trigger on a timer."""
from framework.graph.event_loop_node import TriggerEvent
cron_expr = tdef.trigger_config.get("cron")
interval_minutes = tdef.trigger_config.get("interval_minutes")
async def _timer_loop() -> None:
if cron_expr:
from croniter import croniter
cron = croniter(cron_expr, datetime.now(tz=UTC))
while True:
try:
if cron_expr:
next_fire = cron.get_next(datetime)
delay = (next_fire - datetime.now(tz=UTC)).total_seconds()
if delay > 0:
await asyncio.sleep(delay)
else:
await asyncio.sleep(float(interval_minutes) * 60)
# Record next fire time for introspection
fire_times = getattr(session, "trigger_next_fire", None)
if fire_times is not None:
fire_times[trigger_id] = datetime.now(tz=UTC).isoformat()
# Gate on worker being loaded
if getattr(session, "worker_runtime", None) is None:
continue
# Fire into queen node
executor = getattr(session, "queen_executor", None)
if executor is None:
continue
queen_node = getattr(executor, "node_registry", {}).get("queen")
if queen_node is None:
continue
event = TriggerEvent(
trigger_type="timer",
source_id=trigger_id,
payload={
"task": tdef.task or "",
"trigger_config": tdef.trigger_config,
},
)
await queen_node.inject_trigger(event)
except asyncio.CancelledError:
raise
except Exception:
logger.warning("Timer trigger '%s' tick failed", trigger_id, exc_info=True)
task = asyncio.create_task(_timer_loop(), name=f"trigger_timer_{trigger_id}")
if not hasattr(session, "active_timer_tasks"):
session.active_timer_tasks = {}
session.active_timer_tasks[trigger_id] = task
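Stripped of cron support and queen routing, the timer loop above reduces to a sleep-then-fire background task that unload cancels via ``task.cancel()``. A minimal self-contained sketch (the names are the editor's, not from the codebase):

```python
import asyncio

async def interval_timer(interval_s: float, fire) -> None:
    """Sleep, then fire, forever; CancelledError propagates so the
    owner can cancel the task cleanly, as unload_worker does."""
    while True:
        await asyncio.sleep(interval_s)
        await fire()

async def main() -> list[str]:
    fired: list[str] = []

    async def fire() -> None:
        fired.append("tick")

    task = asyncio.create_task(interval_timer(0.01, fire), name="trigger_timer_demo")
    await asyncio.sleep(0.05)
    task.cancel()  # mirrors cancelling session.active_timer_tasks on unload
    try:
        await task
    except asyncio.CancelledError:
        pass
    return fired

fired = asyncio.run(main())
```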
async def _start_trigger_webhook(session: Any, trigger_id: str, tdef: Any) -> None:
"""Subscribe to WEBHOOK_RECEIVED events and route matching ones to the queen."""
from framework.graph.event_loop_node import TriggerEvent
from framework.runtime.webhook_server import WebhookRoute, WebhookServer, WebhookServerConfig
bus = session.event_bus
path = tdef.trigger_config.get("path", "")
methods = [m.upper() for m in tdef.trigger_config.get("methods", ["POST"])]
async def _on_webhook(event: AgentEvent) -> None:
data = event.data or {}
if data.get("path") != path:
return
if data.get("method", "").upper() not in methods:
return
# Gate on worker being loaded
if getattr(session, "worker_runtime", None) is None:
return
executor = getattr(session, "queen_executor", None)
if executor is None:
return
queen_node = getattr(executor, "node_registry", {}).get("queen")
if queen_node is None:
return
trigger_event = TriggerEvent(
trigger_type="webhook",
source_id=trigger_id,
payload={
"task": tdef.task or "",
"path": data.get("path", ""),
"method": data.get("method", ""),
"headers": data.get("headers", {}),
"payload": data.get("payload", {}),
"query_params": data.get("query_params", {}),
},
)
await queen_node.inject_trigger(trigger_event)
sub_id = bus.subscribe(
event_types=[EventType.WEBHOOK_RECEIVED],
handler=_on_webhook,
filter_stream=trigger_id,
)
if not hasattr(session, "active_webhook_subs"):
session.active_webhook_subs = {}
session.active_webhook_subs[trigger_id] = sub_id
# Ensure the webhook HTTP server is running
if getattr(session, "queen_webhook_server", None) is None:
port = int(tdef.trigger_config.get("port", 8090))
config = WebhookServerConfig(host="127.0.0.1", port=port)
server = WebhookServer(bus, config)
session.queen_webhook_server = server
server = session.queen_webhook_server
route = WebhookRoute(source_id=trigger_id, path=path, methods=methods)
server.add_route(route)
if not getattr(server, "is_running", False):
await server.start()
server.is_running = True
def _dissolve_planning_nodes(
draft: dict,
) -> tuple[dict, dict[str, list[str]]]:
@@ -2517,7 +2717,6 @@ def register_queen_lifecycle_tools(
def _format_time_ago(ts) -> str:
"""Format a datetime as relative time ago."""
now = datetime.now(UTC)
if ts.tzinfo is None:
@@ -2555,7 +2754,6 @@ def register_queen_lifecycle_tools(
- pending_question (when waiting)
- _active_execs (internal, stripped before return)
"""
graph_id = runtime.graph_id
reg = runtime.get_graph_registration(graph_id)
@@ -3738,5 +3936,319 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- set_trigger -----------------------------------------------------------
async def set_trigger(
trigger_id: str,
trigger_type: str | None = None,
trigger_config: dict | None = None,
task: str | None = None,
) -> str:
"""Activate a trigger so it fires periodically into the queen."""
if trigger_id in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is already active."})
# Look up existing or create new
available = getattr(session, "available_triggers", {})
tdef = available.get(trigger_id)
if tdef is None:
if trigger_type and trigger_config:
from framework.runtime.triggers import TriggerDefinition
tdef = TriggerDefinition(
id=trigger_id,
trigger_type=trigger_type,
trigger_config=trigger_config,
)
available[trigger_id] = tdef
else:
return json.dumps(
{
"error": (
f"Trigger '{trigger_id}' not found. "
"Provide trigger_type and trigger_config to create a custom trigger."
)
}
)
# Apply task override if provided
if task:
tdef.task = task
# Task is mandatory before activation
if not tdef.task:
return json.dumps(
{
"error": f"Trigger '{trigger_id}' has no task configured. "
"Set a task describing what the worker should do when this trigger fires."
}
)
# Use provided overrides if given
t_type = trigger_type or tdef.trigger_type
t_config = trigger_config or tdef.trigger_config
if trigger_type:
tdef.trigger_type = t_type
if trigger_config:
tdef.trigger_config = t_config
# Validate and activate by type
if t_type == "webhook":
path = t_config.get("path", "").strip()
if not path or not path.startswith("/"):
return json.dumps(
{
"error": (
"Webhook trigger requires 'path' starting with '/'"
" in trigger_config (e.g. '/hooks/github')."
)
}
)
valid_methods = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
methods = t_config.get("methods", ["POST"])
invalid = [m.upper() for m in methods if m.upper() not in valid_methods]
if invalid:
return json.dumps(
{"error": f"Invalid HTTP methods: {invalid}. Valid: {sorted(valid_methods)}"}
)
try:
await _start_trigger_webhook(session, trigger_id, tdef)
except Exception as e:
return json.dumps({"error": f"Failed to start webhook trigger: {e}"})
tdef.active = True
session.active_trigger_ids.add(trigger_id)
await _persist_active_triggers(session, session_id)
_save_trigger_to_agent(session, trigger_id, tdef)
bus = getattr(session, "event_bus", None)
if bus:
await bus.publish(
AgentEvent(
type=EventType.TRIGGER_ACTIVATED,
stream_id="queen",
data={
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
},
)
)
port = int(t_config.get("port", 8090))
return json.dumps(
{
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"webhook_url": f"http://127.0.0.1:{port}{path}",
}
)
if t_type != "timer":
return json.dumps({"error": f"Unsupported trigger type: {t_type}"})
cron_expr = t_config.get("cron")
interval = t_config.get("interval_minutes")
if cron_expr:
try:
from croniter import croniter
if not croniter.is_valid(cron_expr):
return json.dumps({"error": f"Invalid cron expression: {cron_expr}"})
except ImportError:
return json.dumps(
{"error": "croniter package not installed — cannot validate cron expression."}
)
elif interval:
if not isinstance(interval, (int, float)) or interval <= 0:
return json.dumps({"error": f"interval_minutes must be > 0, got {interval}"})
else:
return json.dumps(
{"error": "Timer trigger needs 'cron' or 'interval_minutes' in trigger_config."}
)
# Start timer
try:
await _start_trigger_timer(session, trigger_id, tdef)
except Exception as e:
return json.dumps({"error": f"Failed to start trigger timer: {e}"})
tdef.active = True
session.active_trigger_ids.add(trigger_id)
# Persist to session state and agent definition
await _persist_active_triggers(session, session_id)
_save_trigger_to_agent(session, trigger_id, tdef)
# Emit event
bus = getattr(session, "event_bus", None)
if bus:
await bus.publish(
AgentEvent(
type=EventType.TRIGGER_ACTIVATED,
stream_id="queen",
data={
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
},
)
)
return json.dumps(
{
"status": "activated",
"trigger_id": trigger_id,
"trigger_type": t_type,
"trigger_config": t_config,
}
)
_set_trigger_tool = Tool(
name="set_trigger",
description=(
"Activate a trigger (timer) so it fires periodically. "
"Use trigger_id of an available trigger, or provide trigger_type + trigger_config"
" to create a custom one. "
"A task must be configured before activation —"
" either pre-set on the trigger or provided here."
),
parameters={
"type": "object",
"properties": {
"trigger_id": {
"type": "string",
"description": (
"ID of the trigger to activate (from list_triggers) or a new custom ID"
),
},
"trigger_type": {
"type": "string",
"description": "Type of trigger ('timer'). Only needed for custom triggers.",
},
"trigger_config": {
"type": "object",
"description": (
"Config for the trigger."
" Timer: {cron: '*/5 * * * *'} or {interval_minutes: 5}."
" Only needed for custom triggers."
),
},
"task": {
"type": "string",
"description": (
"The task/instructions for the worker when this trigger fires"
" (e.g. 'Process inbox emails using saved rules')."
" Required if not already configured on the trigger."
),
},
},
"required": ["trigger_id"],
},
)
registry.register("set_trigger", _set_trigger_tool, lambda inputs: set_trigger(**inputs))
tools_registered += 1
# --- remove_trigger --------------------------------------------------------
async def remove_trigger(trigger_id: str) -> str:
"""Deactivate an active trigger."""
if trigger_id not in getattr(session, "active_trigger_ids", set()):
return json.dumps({"error": f"Trigger '{trigger_id}' is not active."})
# Cancel timer task (if timer trigger)
task = getattr(session, "active_timer_tasks", {}).pop(trigger_id, None)
if task and not task.done():
task.cancel()
getattr(session, "trigger_next_fire", {}).pop(trigger_id, None)
# Unsubscribe webhook handler (if webhook trigger)
webhook_subs = getattr(session, "active_webhook_subs", {})
if sub_id := webhook_subs.pop(trigger_id, None):
try:
session.event_bus.unsubscribe(sub_id)
except Exception:
pass
session.active_trigger_ids.discard(trigger_id)
# Mark inactive
available = getattr(session, "available_triggers", {})
tdef = available.get(trigger_id)
if tdef:
tdef.active = False
# Persist to session state and remove from agent definition
await _persist_active_triggers(session, session_id)
_remove_trigger_from_agent(session, trigger_id)
# Emit event
bus = getattr(session, "event_bus", None)
if bus:
await bus.publish(
AgentEvent(
type=EventType.TRIGGER_DEACTIVATED,
stream_id="queen",
data={"trigger_id": trigger_id},
)
)
return json.dumps({"status": "deactivated", "trigger_id": trigger_id})
_remove_trigger_tool = Tool(
name="remove_trigger",
description=(
"Deactivate an active trigger."
" The trigger stops firing but remains available for re-activation."
),
parameters={
"type": "object",
"properties": {
"trigger_id": {
"type": "string",
"description": "ID of the trigger to deactivate",
},
},
"required": ["trigger_id"],
},
)
registry.register(
"remove_trigger", _remove_trigger_tool, lambda inputs: remove_trigger(**inputs)
)
tools_registered += 1
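Deactivation relies on standard asyncio task cancellation: the timer task is popped from the session's map and cancelled if still running. The same pattern in isolation, with a dummy periodic loop standing in for the trigger timer:

```python
import asyncio


async def fake_timer():
    # Stand-in for the trigger's periodic firing loop.
    while True:
        await asyncio.sleep(3600)


async def main() -> bool:
    tasks = {"daily-report": asyncio.ensure_future(fake_timer())}
    await asyncio.sleep(0)  # let the timer task start running
    task = tasks.pop("daily-report", None)
    if task and not task.done():
        task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the loop was cancelled cleanly
    return task.cancelled()


assert asyncio.run(main()) is True
```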
# --- list_triggers ---------------------------------------------------------
async def list_triggers() -> str:
"""List all available triggers and their status."""
available = getattr(session, "available_triggers", {})
triggers = []
for tdef in available.values():
triggers.append(
{
"id": tdef.id,
"trigger_type": tdef.trigger_type,
"trigger_config": tdef.trigger_config,
"description": tdef.description,
"task": tdef.task,
"active": tdef.active,
}
)
return json.dumps({"triggers": triggers})
_list_triggers_tool = Tool(
name="list_triggers",
description=(
"List all available triggers (from the loaded worker) and their active/inactive status."
),
parameters={
"type": "object",
"properties": {},
},
)
registry.register("list_triggers", _list_triggers_tool, lambda inputs: list_triggers())
tools_registered += 1
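`list_triggers` returns a JSON object with a `triggers` array whose entries mirror the TriggerDefinition fields. A sketch of the expected payload shape (field values here are illustrative, not from the codebase):

```python
import json

# Illustrative trigger entry matching the fields serialized above.
triggers = [
    {
        "id": "daily-report",
        "trigger_type": "timer",
        "trigger_config": {"cron": "0 9 * * *"},
        "description": "Morning summary",
        "task": "Summarize overnight activity",
        "active": True,
    }
]
# Round-trip through JSON the same way the tool returns it.
payload = json.loads(json.dumps({"triggers": triggers}))
assert payload["triggers"][0]["active"] is True
```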
logger.info("Registered %d queen lifecycle tools", tools_registered)
return tools_registered
@@ -78,19 +78,6 @@ def register_graph_tools(registry: ToolRegistry, runtime: AgentRuntime) -> int:
isolation_level="shared",
)
# Async entry points
for aep in runner.graph.async_entry_points:
entry_points[aep.id] = EntryPointSpec(
id=aep.id,
name=aep.name,
entry_node=aep.entry_node,
trigger_type=aep.trigger_type,
trigger_config=aep.trigger_config,
isolation_level=aep.isolation_level,
priority=aep.priority,
max_concurrent=aep.max_concurrent,
)
await runtime.add_graph(
graph_id=graph_id,
graph=runner.graph,
@@ -1,20 +1,17 @@
"""Worker monitoring tools for the Health Judge and Queen triage agents.
"""Worker monitoring tools for Queen triage agents.
Three tools are registered by ``register_worker_monitoring_tools()``:
- ``get_worker_health_summary`` reads the worker's session log files and
returns a compact health snapshot (recent verdicts, step count, timing).
session_id is optional: if omitted, the most recent active session is
auto-discovered from storage. No agent-side configuration required.
Used by the Health Judge on every timer tick.
auto-discovered from storage.
- ``emit_escalation_ticket`` validates and publishes an EscalationTicket
to the shared EventBus as a WORKER_ESCALATION_TICKET event.
Used by the Health Judge when it decides to escalate.
- ``notify_operator`` emits a QUEEN_INTERVENTION_REQUESTED event so the TUI
can surface a non-disruptive operator notification.
Used by the Queen's ticket_triage_node when it decides to intervene.
Usage::
@@ -45,7 +42,7 @@ def register_worker_monitoring_tools(
registry: ToolRegistry,
event_bus: EventBus,
storage_path: Path,
stream_id: str = "judge",
stream_id: str = "monitoring",
worker_graph_id: str | None = None,
) -> int:
"""Register worker monitoring tools bound to *event_bus* and *storage_path*.
@@ -55,7 +52,7 @@ def register_worker_monitoring_tools(
event_bus: The shared EventBus for the worker runtime.
storage_path: Root storage path of the worker runtime
(e.g. ``~/.hive/agents/{name}``).
stream_id: Stream ID used when emitting events; defaults to judge's stream.
stream_id: Stream ID used when emitting events.
worker_graph_id: The primary worker graph's ID. Included in health summary
so the judge can populate ticket identity fields accurately.
@@ -65,7 +62,7 @@ def register_worker_monitoring_tools(
from framework.llm.provider import Tool
storage_path = Path(storage_path)
# Derive agent identity from storage path so the judge can fill ticket fields.
# Derive agent identity from storage path for ticket fields.
# storage_path is ~/.hive/agents/{agent_name} — the name is the last component.
_worker_agent_id: str = storage_path.name
_worker_graph_id: str = worker_graph_id or storage_path.name
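Since `storage_path` is the agent's own directory (`~/.hive/agents/{agent_name}`), the agent identity falls out of the final path component:

```python
from pathlib import Path

# The agent name is the last component of the storage path.
storage_path = Path.home() / ".hive" / "agents" / "inbox-management"
worker_agent_id = storage_path.name
assert worker_agent_id == "inbox-management"
```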
@@ -201,10 +198,9 @@ def register_worker_monitoring_tools(
description=(
"Read the worker agent's execution logs and return a compact health snapshot. "
"Returns worker_agent_id and worker_graph_id (use these for ticket identity fields), "
"recent judge verdicts, step count, time since last step, and "
"recent verdicts, step count, time since last step, and "
"a snippet of the most recent LLM output. "
"session_id is optional — omit it to auto-discover the most recent active session. "
"Use this on every health check to observe trends."
"session_id is optional — omit it to auto-discover the most recent active session."
),
parameters={
"type": "object",
@@ -241,8 +237,7 @@ def register_worker_monitoring_tools(
"""Validate and publish an EscalationTicket to the shared EventBus.
ticket_json must be a JSON string containing all required EscalationTicket
fields. The ticket is validated before publishing; this ensures the judge
has genuinely filled out all required evidence fields.
fields. The ticket is validated before publishing.
Returns a confirmation JSON with the ticket_id on success, or an error.
"""
@@ -257,7 +252,7 @@ def register_worker_monitoring_tools(
try:
await event_bus.emit_worker_escalation_ticket(
stream_id=stream_id,
node_id="judge",
node_id="monitoring",
ticket=ticket.model_dump(),
)
logger.info(
@@ -280,7 +275,6 @@ def register_worker_monitoring_tools(
name="emit_escalation_ticket",
description=(
"Validate and publish a structured EscalationTicket to the shared EventBus. "
"The Queen's ticket_receiver entry point will fire and triage the ticket. "
"ticket_json must be a JSON string with all required EscalationTicket fields: "
"worker_agent_id, worker_session_id, worker_node_id, worker_graph_id, "
"severity (low/medium/high/critical), cause, judge_reasoning, suggested_action, "
@@ -38,4 +38,9 @@ export const api = {
body: body ? JSON.stringify(body) : undefined,
}),
delete: <T>(path: string) => request<T>(path, { method: "DELETE" }),
patch: <T>(path: string, body?: unknown) =>
request<T>(path, {
method: "PATCH",
body: body ? JSON.stringify(body) : undefined,
}),
};
@@ -1,11 +1,11 @@
import { api } from "./client";
import type {
AgentEvent,
LiveSession,
LiveSessionDetail,
SessionSummary,
SessionDetail,
Checkpoint,
Message,
EntryPoint,
} from "./types";
@@ -64,12 +64,18 @@ export const sessionsApi = {
`/sessions/${sessionId}/entry-points`,
),
updateTriggerTask: (sessionId: string, triggerId: string, task: string) =>
api.patch<{ trigger_id: string; task: string }>(
`/sessions/${sessionId}/triggers/${triggerId}`,
{ task },
),
graphs: (sessionId: string) =>
api.get<{ graphs: string[] }>(`/sessions/${sessionId}/graphs`),
/** Get queen conversation history for a session (works for cold/post-restart sessions too). */
queenMessages: (sessionId: string) =>
api.get<{ messages: Message[]; session_id: string }>(`/sessions/${sessionId}/queen-messages`),
/** Get persisted eventbus log for a session (works for cold sessions — used for full UI replay). */
eventsHistory: (sessionId: string) =>
api.get<{ events: AgentEvent[]; session_id: string }>(`/sessions/${sessionId}/events/history`),
/** List all queen sessions on disk — live + cold (post-restart). */
history: () =>
@@ -105,12 +111,4 @@ export const sessionsApi = {
api.post<{ execution_id: string }>(
`/sessions/${sessionId}/worker-sessions/${wsId}/checkpoints/${checkpointId}/restore`,
),
messages: (sessionId: string, wsId: string, nodeId?: string) => {
const params = new URLSearchParams({ client_only: "true" });
if (nodeId) params.set("node_id", nodeId);
return api.get<{ messages: Message[] }>(
`/sessions/${sessionId}/worker-sessions/${wsId}/messages?${params}`,
);
},
};
@@ -31,6 +31,8 @@ export interface EntryPoint {
entry_node: string;
trigger_type: string;
trigger_config?: Record<string, unknown>;
/** Worker task string when this trigger fires autonomously. */
task?: string;
/** Seconds until the next timer fire (only present for timer entry points). */
next_fire_in?: number;
}
@@ -41,6 +43,7 @@ export interface DiscoverEntry {
description: string;
category: string;
session_count: number;
run_count: number;
node_count: number;
tool_count: number;
tags: string[];
@@ -311,6 +314,7 @@ export type EventTypeName =
| "tool_call_completed"
| "client_output_delta"
| "client_input_requested"
| "client_input_received"
| "node_internal_output"
| "node_input_blocked"
| "node_stalled"
@@ -328,7 +332,12 @@ export type EventTypeName =
| "queen_phase_changed"
| "subagent_report"
| "draft_graph_updated"
| "flowchart_map_updated";
| "flowchart_map_updated"
| "trigger_available"
| "trigger_activated"
| "trigger_deactivated"
| "trigger_fired"
| "trigger_removed";
export interface AgentEvent {
type: EventTypeName;
@@ -339,4 +348,5 @@ export interface AgentEvent {
timestamp: string;
correlation_id: string | null;
graph_id: string | null;
run_id?: string | null;
}
@@ -171,6 +171,14 @@ function useThemeColors() {
return { statusColors, triggerColors };
}
// Active trigger — brighter, more saturated blue
const activeTriggerColors = {
bg: "hsl(210,30%,18%)",
border: "hsl(210,50%,50%)",
text: "hsl(210,40%,75%)",
icon: "hsl(210,60%,65%)",
};
const triggerIcons: Record<string, string> = {
webhook: "\u26A1", // lightning bolt
timer: "\u23F1", // stopwatch
@@ -546,10 +554,12 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
const triggerAvailW = nodeW - 38;
const triggerDisplayLabel = truncateLabel(node.label, triggerAvailW, triggerFontSize);
const nextFireIn = node.triggerConfig?.next_fire_in as number | undefined;
const isActive = node.status === "running" || node.status === "complete";
const colors = isActive ? activeTriggerColors : triggerColors;
// Format countdown for display below node
let countdownLabel: string | null = null;
if (nextFireIn != null && nextFireIn > 0) {
if (isActive && nextFireIn != null && nextFireIn > 0) {
const h = Math.floor(nextFireIn / 3600);
const m = Math.floor((nextFireIn % 3600) / 60);
const s = Math.floor(nextFireIn % 60);
@@ -558,24 +568,28 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
: `next in ${m}m ${String(s).padStart(2, "0")}s`;
}
// Status label below countdown
const statusLabel = isActive ? "active" : "inactive";
const statusColor = isActive ? "hsl(140,40%,50%)" : "hsl(210,20%,40%)";
return (
<g key={node.id} onClick={() => onNodeClick?.(node)} style={{ cursor: onNodeClick ? "pointer" : "default" }}>
<title>{node.label}</title>
{/* Pill-shaped background with dashed border */}
{/* Pill-shaped background — solid border when active, dashed when inactive */}
<rect
x={pos.x} y={pos.y}
width={nodeW} height={NODE_H}
rx={NODE_H / 2}
fill={triggerColors.bg}
stroke={triggerColors.border}
strokeWidth={1}
strokeDasharray="4 2"
fill={colors.bg}
stroke={colors.border}
strokeWidth={isActive ? 1.5 : 1}
strokeDasharray={isActive ? undefined : "4 2"}
/>
{/* Trigger type icon */}
<text
x={pos.x + 18} y={pos.y + NODE_H / 2}
fill={triggerColors.icon} fontSize={13}
fill={colors.icon} fontSize={13}
textAnchor="middle" dominantBaseline="middle"
>
{icon}
@@ -584,7 +598,7 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
{/* Label */}
<text
x={pos.x + 32} y={pos.y + NODE_H / 2}
fill={triggerColors.text}
fill={colors.text}
fontSize={triggerFontSize}
fontWeight={500}
dominantBaseline="middle"
@@ -603,6 +617,15 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
{countdownLabel}
</text>
)}
{/* Status label */}
<text
x={pos.x + nodeW / 2} y={pos.y + NODE_H + (countdownLabel ? 25 : 13)}
fill={statusColor} fontSize={9}
textAnchor="middle" opacity={0.8}
>
{statusLabel}
</text>
</g>
);
};
@@ -10,12 +10,14 @@ export interface ChatMessage {
agentColor: string;
content: string;
timestamp: string;
type?: "system" | "agent" | "user" | "tool_status" | "worker_input_request";
type?: "system" | "agent" | "user" | "tool_status" | "worker_input_request" | "run_divider";
role?: "queen" | "worker";
/** Which worker thread this message belongs to (worker agent name) */
thread?: string;
/** Epoch ms when this message was first created — used for ordering queen/worker interleaving */
createdAt?: number;
/** Queen phase active when this message was created */
phase?: "planning" | "building" | "staging" | "running";
}
interface ChatPanelProps {
@@ -154,6 +156,18 @@ const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: Ch
const isQueen = msg.role === "queen";
const color = getColor(msg.agent, msg.role);
if (msg.type === "run_divider") {
return (
<div className="flex items-center gap-3 py-2 my-1">
<div className="flex-1 h-px bg-border/60" />
<span className="text-[10px] text-muted-foreground font-medium uppercase tracking-wider">
{msg.content}
</span>
<div className="flex-1 h-px bg-border/60" />
</div>
);
}
if (msg.type === "system") {
return (
<div className="flex justify-center py-1">
@@ -205,13 +219,13 @@ const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: Ch
}`}
>
{isQueen
? queenPhase === "running"
? "running phase"
: queenPhase === "staging"
? "staging phase"
: queenPhase === "planning"
? "planning phase"
: "building phase"
? ((msg.phase ?? queenPhase) === "running"
? "running"
: (msg.phase ?? queenPhase) === "staging"
? "staging"
: (msg.phase ?? queenPhase) === "planning"
? "planning"
: "building")
: "Worker"}
</span>
</div>
@@ -225,7 +239,7 @@ const MessageBubble = memo(function MessageBubble({ msg, queenPhase }: { msg: Ch
</div>
</div>
);
}, (prev, next) => prev.msg.id === next.msg.id && prev.msg.content === next.msg.content && prev.queenPhase === next.queenPhase);
}, (prev, next) => prev.msg.id === next.msg.id && prev.msg.content === next.msg.content && prev.msg.phase === next.msg.phase && prev.queenPhase === next.queenPhase);
export default function ChatPanel({ messages, onSend, isWaiting, isWorkerWaiting, isBusy, activeThread, disabled, onCancel, pendingQuestion, pendingOptions, pendingQuestions, onQuestionSubmit, onMultiQuestionSubmit, onQuestionDismiss, queenPhase }: ChatPanelProps) {
const [input, setInput] = useState("");
@@ -1,8 +1,8 @@
import { useState, useCallback } from "react";
import { useNavigate } from "react-router-dom";
import { Crown, X } from "lucide-react";
import { loadPersistedTabs, savePersistedTabs, TAB_STORAGE_KEY, type PersistedTabState } from "@/lib/tab-persistence";
import { sessionsApi } from "@/api/sessions";
import { loadPersistedTabs, savePersistedTabs, TAB_STORAGE_KEY, type PersistedTabState } from "@/lib/tab-persistence";
export interface TopBarTab {
agentType: string;
@@ -51,10 +51,10 @@ export default function TopBar({ tabs: tabsProp, onTabClick, onCloseTab, canClos
onCloseTab(agentType);
return;
}
// Kill the backend session (queen/judge/worker) even outside workspace
// Kill the backend session (queen/worker) even outside workspace
sessionsApi.list()
.then(({ sessions }) => {
const match = sessions.find(s => s.agent_path === agentType);
const match = sessions.find(s => s.agent_path.endsWith(agentType));
if (match) return sessionsApi.stop(match.session_id);
})
.catch(() => {}); // fire-and-forget
@@ -1,60 +1,6 @@
import { describe, it, expect } from "vitest";
import { backendMessageToChatMessage, sseEventToChatMessage, formatAgentDisplayName } from "./chat-helpers";
import type { AgentEvent, Message } from "@/api/types";
// ---------------------------------------------------------------------------
// backendMessageToChatMessage
// ---------------------------------------------------------------------------
describe("backendMessageToChatMessage", () => {
it("converts a user message", () => {
const msg: Message = { seq: 1, role: "user", content: "hello", _node_id: "chat" };
const result = backendMessageToChatMessage(msg, "inbox-management");
expect(result.type).toBe("user");
expect(result.agent).toBe("You");
expect(result.role).toBeUndefined();
expect(result.content).toBe("hello");
expect(result.thread).toBe("inbox-management");
});
it("converts an assistant message with node_id as agent", () => {
const msg: Message = { seq: 2, role: "assistant", content: "hi", _node_id: "intake" };
const result = backendMessageToChatMessage(msg, "inbox-management");
expect(result.agent).toBe("intake");
expect(result.role).toBe("worker");
expect(result.type).toBeUndefined();
});
it("defaults agent to 'Agent' when _node_id is empty", () => {
const msg: Message = { seq: 3, role: "assistant", content: "ok", _node_id: "" };
const result = backendMessageToChatMessage(msg, "inbox-management");
expect(result.agent).toBe("Agent");
});
it("produces deterministic ID from seq", () => {
const msg: Message = { seq: 42, role: "user", content: "test", _node_id: "x" };
const result = backendMessageToChatMessage(msg, "thread");
expect(result.id).toBe("backend-42");
});
it("passes through the thread parameter", () => {
const msg: Message = { seq: 1, role: "user", content: "hi", _node_id: "x" };
const result = backendMessageToChatMessage(msg, "my-thread");
expect(result.thread).toBe("my-thread");
});
it("uses agentDisplayName instead of node_id when provided", () => {
const msg: Message = { seq: 2, role: "assistant", content: "hi", _node_id: "intake" };
const result = backendMessageToChatMessage(msg, "thread", "Competitive Intel Agent");
expect(result.agent).toBe("Competitive Intel Agent");
});
it("still shows 'You' for user messages even when agentDisplayName is provided", () => {
const msg: Message = { seq: 1, role: "user", content: "hello", _node_id: "chat" };
const result = backendMessageToChatMessage(msg, "thread", "My Agent");
expect(result.agent).toBe("You");
});
});
import { sseEventToChatMessage, formatAgentDisplayName } from "./chat-helpers";
import type { AgentEvent } from "@/api/types";
// ---------------------------------------------------------------------------
// sseEventToChatMessage
@@ -261,25 +207,36 @@ describe("sseEventToChatMessage", () => {
expect(result!.id).toMatch(/^stream-t-\d+-chat$/);
});
it("converts client_input_requested with prompt to message", () => {
it("returns null for client_input_requested (handled in workspace.tsx)", () => {
const event = makeEvent({
type: "client_input_requested",
node_id: "chat",
execution_id: "abc",
data: { prompt: "What next?" },
});
const result = sseEventToChatMessage(event, "t");
expect(result).not.toBeNull();
expect(result!.content).toBe("What next?");
expect(result!.role).toBe("worker");
expect(sseEventToChatMessage(event, "t")).toBeNull();
});
it("returns null for client_input_requested without prompt", () => {
it("converts client_input_received to user message", () => {
const event = makeEvent({
type: "client_input_requested",
node_id: "chat",
type: "client_input_received",
node_id: "queen",
execution_id: "abc",
data: { prompt: "" },
data: { content: "do the thing" },
});
const result = sseEventToChatMessage(event, "t");
expect(result).not.toBeNull();
expect(result!.agent).toBe("You");
expect(result!.type).toBe("user");
expect(result!.content).toBe("do the thing");
});
it("returns null for client_input_received with empty content", () => {
const event = makeEvent({
type: "client_input_received",
node_id: "queen",
execution_id: "abc",
data: { content: "" },
});
expect(sseEventToChatMessage(event, "t")).toBeNull();
});
@@ -1,10 +1,10 @@
/**
* Pure functions for converting backend messages and SSE events into ChatMessage objects.
* Pure functions for converting SSE events into ChatMessage objects.
* No React dependencies — just JSON in, object out.
*/
import type { ChatMessage } from "@/components/ChatPanel";
import type { AgentEvent, Message } from "@/api/types";
import type { AgentEvent } from "@/api/types";
/**
* Derive a human-readable display name from a raw agent identifier.
@@ -27,32 +27,6 @@ export function formatAgentDisplayName(raw: string): string {
.trim();
}
/**
* Convert a backend Message (from sessionsApi.messages()) into a ChatMessage.
* When agentDisplayName is provided, it is used as the sender for all agent
* messages instead of the raw node_id.
*/
export function backendMessageToChatMessage(
msg: Message,
thread: string,
agentDisplayName?: string,
): ChatMessage {
// Use file-mtime created_at (epoch seconds → ms) for cross-conversation
// ordering; fall back to seq for backwards compatibility.
const createdAt = msg.created_at ? msg.created_at * 1000 : msg.seq;
return {
id: `backend-${msg._node_id}-${msg.seq}`,
agent: msg.role === "user" ? "You" : agentDisplayName || msg._node_id || "Agent",
agentColor: "",
content: msg.content,
timestamp: "",
type: msg.role === "user" ? "user" : undefined,
role: msg.role === "user" ? undefined : "worker",
thread,
createdAt,
};
}
/**
* Convert an SSE AgentEvent into a ChatMessage, or null if the event
* doesn't produce a visible chat message.
@@ -101,6 +75,21 @@ export function sseEventToChatMessage(
// create a worker_input_request message and set awaitingInput state.
return null;
case "client_input_received": {
const userContent = (event.data?.content as string) || "";
if (!userContent) return null;
return {
id: `user-input-${event.timestamp}`,
agent: "You",
agentColor: "",
content: userContent,
timestamp: "",
type: "user",
thread,
createdAt,
};
}
case "llm_text_delta": {
const snapshot = (event.data?.snapshot as string) || (event.data?.content as string) || "";
if (!snapshot) return null;
@@ -148,3 +137,25 @@ export function sseEventToChatMessage(
return null;
}
}
type QueenPhase = "planning" | "building" | "staging" | "running";
const VALID_PHASES = new Set<string>(["planning", "building", "staging", "running"]);
/**
* Scan an array of persisted events and return the last queen phase seen,
* or null if no phase event exists. Reads both `queen_phase_changed` events
* and the per-iteration `phase` metadata on `node_loop_iteration` events.
*/
export function extractLastPhase(events: AgentEvent[]): QueenPhase | null {
let last: QueenPhase | null = null;
for (const evt of events) {
const phase =
evt.type === "queen_phase_changed" ? (evt.data?.phase as string) :
evt.type === "node_loop_iteration" ? (evt.data?.phase as string | undefined) :
undefined;
if (phase && VALID_PHASES.has(phase)) {
last = phase as QueenPhase;
}
}
return last;
}
@@ -51,6 +51,7 @@ export function topologyToGraphNodes(topology: GraphTopology): GraphNode[] {
triggerConfig: {
...ep.trigger_config,
...(ep.next_fire_in != null ? { next_fire_in: ep.next_fire_in } : {}),
...(ep.task ? { task: ep.task } : {}),
},
next: [ep.entry_node],
});
@@ -113,7 +113,7 @@ export default function MyAgents() {
<div className="flex items-center gap-1">
<Activity className="w-3 h-3" />
<span>
{agent.session_count} session{agent.session_count !== 1 ? "s" : ""}
{agent.run_count} run{agent.run_count !== 1 ? "s" : ""}
</span>
</div>
<span>{agent.last_active ? timeAgo(agent.last_active) : "Never run"}</span>
@@ -14,8 +14,8 @@ import { executionApi } from "@/api/execution";
import { graphsApi } from "@/api/graphs";
import { sessionsApi } from "@/api/sessions";
import { useMultiSSE } from "@/hooks/use-sse";
import type { LiveSession, AgentEvent, DiscoverEntry, Message, NodeSpec, DraftGraph as DraftGraphData } from "@/api/types";
import { backendMessageToChatMessage, sseEventToChatMessage, formatAgentDisplayName } from "@/lib/chat-helpers";
import type { LiveSession, AgentEvent, DiscoverEntry, NodeSpec, DraftGraph as DraftGraphData } from "@/api/types";
import { sseEventToChatMessage, formatAgentDisplayName } from "@/lib/chat-helpers";
import { topologyToGraphNodes } from "@/lib/graph-converter";
import { ApiError } from "@/api/client";
@@ -242,6 +242,49 @@ function truncate(s: string, max: number): string {
return s.length > max ? s.slice(0, max) + "..." : s;
}
type SessionRestoreResult = {
messages: ChatMessage[];
restoredPhase: "planning" | "building" | "staging" | "running" | null;
};
/**
* Restore session messages from the persisted event log.
* Returns an empty result if no event log exists.
*/
async function restoreSessionMessages(
sessionId: string,
thread: string,
agentDisplayName: string,
): Promise<SessionRestoreResult> {
try {
const { events } = await sessionsApi.eventsHistory(sessionId);
if (events.length > 0) {
const messages: ChatMessage[] = [];
let runningPhase: ChatMessage["phase"] = undefined;
for (const evt of events) {
// Track phase transitions so each message gets the phase it was created in
const p = evt.type === "queen_phase_changed" ? evt.data?.phase as string
: evt.type === "node_loop_iteration" ? evt.data?.phase as string | undefined
: undefined;
if (p && ["planning", "building", "staging", "running"].includes(p)) {
runningPhase = p as ChatMessage["phase"];
}
const msg = sseEventToChatMessage(evt, thread, agentDisplayName);
if (!msg) continue;
if (evt.stream_id === "queen") {
msg.role = "queen";
msg.phase = runningPhase;
}
messages.push(msg);
}
return { messages, restoredPhase: runningPhase ?? null };
}
} catch {
// Event log not available — session will start fresh.
}
return { messages: [], restoredPhase: null };
}
// --- Per-agent backend state (consolidated) ---
interface AgentBackendState {
sessionId: string | null;
@@ -266,6 +309,7 @@ interface AgentBackendState {
flowchartMap: Record<string, string[]> | null;
workerRunState: "idle" | "deploying" | "running";
currentExecutionId: string | null;
currentRunId: string | null;
nodeLogs: Record<string, string[]>;
nodeActionPlans: Record<string, string>;
subagentReports: { subagent_id: string; message: string; data?: Record<string, unknown>; timestamp: string }[];
@@ -309,6 +353,7 @@ function defaultAgentState(): AgentBackendState {
agentPath: null,
workerRunState: "idle",
currentExecutionId: null,
currentRunId: null,
nodeLogs: {},
nodeActionPlans: {},
subagentReports: [],
@@ -353,11 +398,8 @@ export default function Workspace() {
// tabKey is the actual key used in sessionsByAgent (may contain "::" suffix).
// Fall back to agentType for tabs persisted before this field was added.
const tabKey = tab.tabKey || tab.agentType;
// New-agent tabs each have a unique key (e.g. "new-agent-abc123"),
// so they never collide with the incoming tab — always restore them.
if (!initial[tabKey]) initial[tabKey] = [];
const session = createSession(tab.agentType, tab.label);
session.id = tab.id;
@@ -388,15 +430,26 @@ export default function Workspace() {
if (initial[initialAgent]?.length) {
return initial;
}
// Also check for existing tabs with instance suffixes (e.g. "agentType::instanceId")
const existingKey = Object.keys(initial).find(
k => baseAgentType(k) === initialAgent && initial[k]?.length > 0
);
if (existingKey && !initialPrompt) {
return initial;
}
// If the user submitted a new prompt from the home page, always create
// a fresh session so the prompt isn't lost into an existing session.
// initialAgent is already a unique key (e.g. "new-agent-abc123") when
// coming from home, so the new tab won't overwrite existing ones.
if (initialPrompt && hasExplicitAgent) {
const rawLabel = initialAgent.startsWith("new-agent")
? "New Agent"
: formatAgentDisplayName(initialAgent);
const existingNewAgentCount = Object.keys(initial).filter(
k => (k === "new-agent" || k.startsWith("new-agent-")) && (initial[k] || []).length > 0
).length;
const label = existingNewAgentCount === 0 ? rawLabel : `${rawLabel} #${existingNewAgentCount + 1}`;
const newSession = createSession(initialAgent, label);
initial[initialAgent] = [newSession];
return initial;
@@ -494,6 +547,8 @@ export default function Workspace() {
const [credentialAgentPath, setCredentialAgentPath] = useState<string | null>(null);
const [dismissedBanner, setDismissedBanner] = useState<string | null>(null);
const [selectedNode, setSelectedNode] = useState<GraphNode | null>(null);
const [triggerTaskDraft, setTriggerTaskDraft] = useState("");
const [triggerTaskSaving, setTriggerTaskSaving] = useState(false);
const [newTabOpen, setNewTabOpen] = useState(false);
const newTabBtnRef = useRef<HTMLButtonElement>(null);
@@ -512,6 +567,10 @@ export default function Workspace() {
// Using a ref avoids stale-closure bugs when multiple SSE events
// arrive in the same React batch.
const turnCounterRef = useRef<Record<string, number>>({});
// Per-agent queen phase ref — used to stamp each message with the phase
// it was created in (avoids stale-closure when phase change and message
// events arrive in the same React batch).
const queenPhaseRef = useRef<Record<string, string>>({});
// Synchronous ref to suppress the queen's auto-intro SSE messages
// after a cold-restore (where we already restored the conversation from disk).
@@ -658,6 +717,38 @@ export default function Workspace() {
let restoredMessageCount = 0;
// Before creating a new session, check if there's already a live backend
// session for this queen-only agent that no open tab owns.
// Skip this search when the tab has a prompt — it's a fresh agent from
// home and must always get its own session.
if (!liveSession && !coldRestoreId && !prompt) {
try {
const { sessions: allLive } = await sessionsApi.list();
const existing = allLive.find(s => !s.has_worker && !s.agent_path);
if (existing) {
const alreadyOwned = Object.values(sessionsRef.current).flat()
.some(s => s.backendSessionId === existing.session_id);
if (!alreadyOwned) {
liveSession = existing;
}
}
} catch { /* proceed to create */ }
// If no live session, check history for a cold queen-only session
if (!liveSession) {
try {
const { sessions: allHistory } = await sessionsApi.history();
const coldMatch = allHistory.find(
s => !s.agent_path && s.has_messages
);
if (coldMatch) {
coldRestoreId = coldMatch.session_id;
}
} catch { /* proceed to create fresh */ }
}
}
let restoredPhase: "planning" | "building" | "staging" | "running" | null = null;
if (!liveSession) {
// Fetch conversation history from disk BEFORE creating the new session.
// SKIP if messages were already pre-populated by handleHistoryOpen.
@@ -666,12 +757,9 @@ export default function Workspace() {
const alreadyHasMessages = (activeSess?.messages?.length ?? 0) > 0;
if (restoreFrom && !alreadyHasMessages) {
try {
const restored = await restoreSessionMessages(restoreFrom, agentType, "Queen Bee");
preRestoredMsgs.push(...restored.messages);
restoredPhase = restored.restoredPhase;
} catch {
// Not available — will start fresh
}
@@ -741,12 +829,16 @@ export default function Workspace() {
// If no messages were actually restored, lift the intro suppression
if (restoredMessageCount === 0) suppressIntroRef.current.delete(agentType);
const qPhase = restoredPhase || liveSession.queen_phase || "planning";
queenPhaseRef.current[agentType] = qPhase;
updateAgentState(agentType, {
sessionId: liveSession.session_id,
displayName: "Queen Bee",
ready: true,
loading: false,
queenReady: true,
queenPhase: qPhase,
queenBuilding: qPhase === "building",
});
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
@@ -784,12 +876,44 @@ export default function Workspace() {
} catch {
// 404: session was explicitly stopped (via closeAgentTab) but conversation
// files likely still exist on disk. Treat it as cold so we can restore.
// Verify files exist before assuming cold — if queenMessages succeeds with
// content, files are there.
coldRestoreId = historySourceId || storedSessionId;
}
}
// No stored session — check for a live or cold session for this agent
// that we can reuse (e.g., tab was closed but backend session survived,
// or server restarted with conversation files on disk).
if (!liveSession && !coldRestoreId) {
try {
const { sessions: allLive } = await sessionsApi.list();
const existingLive = allLive.find(s => s.agent_path?.endsWith(agentPath));
if (existingLive) {
const alreadyOwned = Object.values(sessionsRef.current).flat()
.some(s => s.backendSessionId === existingLive.session_id);
if (!alreadyOwned) {
liveSession = existingLive;
isResumedSession = true;
}
}
} catch { /* proceed */ }
// If no live session, check history for a cold session to restore
if (!liveSession) {
try {
const { sessions: allHistory } = await sessionsApi.history();
const coldMatch = allHistory.find(
s => s.agent_path?.endsWith(agentPath) && s.has_messages
);
if (coldMatch) {
coldRestoreId = coldMatch.session_id;
}
} catch { /* proceed to create fresh */ }
}
}
// Track the last queen phase seen in the event log for cold restore
let restoredPhase: "planning" | "building" | "staging" | "running" | null = null;
if (!liveSession) {
// Reconnect failed — clear stale cached messages from localStorage restore.
// NEVER wipe when: (a) doing a cold restore (we'll restore from disk) or
@@ -812,29 +936,10 @@ export default function Workspace() {
// double-fetch and greeting leakage).
let preQueenMsgs: ChatMessage[] = [];
if (coldRestoreId && !alreadyHasMessages) {
const displayNameTemp = formatAgentDisplayName(agentPath);
const restored = await restoreSessionMessages(coldRestoreId, agentType, displayNameTemp);
preQueenMsgs = restored.messages;
restoredPhase = restored.restoredPhase;
}
// Suppress intro whenever we are about to restore a previous conversation.
@@ -908,7 +1013,8 @@ export default function Workspace() {
// failed, the throw inside the catch exits the outer try block.
const session = liveSession!;
const displayName = formatAgentDisplayName(session.worker_name || agentType);
const initialPhase = restoredPhase || session.queen_phase || (session.has_worker ? "staging" : "planning");
queenPhaseRef.current[agentType] = initialPhase;
updateAgentState(agentType, {
sessionId: session.session_id,
displayName,
@@ -945,37 +1051,23 @@ export default function Workspace() {
// For cold-restore, use the old session ID. For live resume, use current session.
const historyId = coldRestoreId ?? (isResumedSession ? session.session_id : undefined);
// For LIVE resume (not cold restore), fetch event log + worker status now.
// For cold restore they were already pre-fetched above (before create) so we skip to avoid
// double-restoring and to avoid capturing the new greeting.
if (historyId && !coldRestoreId) {
const restored = await restoreSessionMessages(historyId, agentType, displayName);
restoredMsgs.push(...restored.messages);
// Check worker status (needed for isWorkerRunning flag)
try {
const { sessions: workerSessions } = await sessionsApi.workerSessions(historyId);
const resumable = workerSessions.find(
(s) => s.status === "active" || s.status === "paused",
);
isWorkerRunning = resumable?.status === "active";
} catch {
// Worker session listing failed — not critical
}
}
// Merge messages in chronological order (only for live resume; cold restore
@@ -1105,38 +1197,79 @@ export default function Workspace() {
}
}, [agentStates, updateAgentState]);
// Poll entry points every second to keep next_fire_in countdowns fresh
// and discover dynamically created triggers (via set_trigger).
useEffect(() => {
const id = setInterval(async () => {
for (const [agentType, sessions] of Object.entries(sessionsByAgent)) {
const session = sessions[0];
if (!session) continue;
const state = agentStates[agentType];
if (!state?.sessionId) continue;
try {
const { entry_points } = await sessionsApi.entryPoints(state.sessionId);
// Skip manual entry points; only real triggers carry countdowns and tasks
const triggerEps = entry_points.filter(ep => ep.trigger_type !== "manual");
if (triggerEps.length === 0) continue;
const fireMap = new Map<string, number>();
const taskMap = new Map<string, string>();
for (const ep of triggerEps) {
if (ep.next_fire_in != null) {
fireMap.set(`__trigger_${ep.id}`, ep.next_fire_in);
}
if (ep.task != null) {
taskMap.set(`__trigger_${ep.id}`, ep.task);
}
}
setSessionsByAgent((prev) => {
const ss = prev[agentType];
if (!ss?.length) return prev;
const existingIds = new Set(ss[0].graphNodes.map(n => n.id));
// Update existing trigger nodes
let updated = ss[0].graphNodes.map((n) => {
if (n.nodeType !== "trigger") return n;
const nfi = fireMap.get(n.id);
const task = taskMap.get(n.id);
if (nfi == null && task == null) return n;
return {
...n,
triggerConfig: {
...n.triggerConfig,
...(nfi != null ? { next_fire_in: nfi } : {}),
...(task != null ? { task } : {}),
},
};
});
// Discover new triggers not yet in the graph
const entryNode = ss[0].graphNodes.find(n => n.nodeType !== "trigger")?.id;
const newNodes: GraphNode[] = [];
for (const ep of triggerEps) {
const nodeId = `__trigger_${ep.id}`;
if (existingIds.has(nodeId)) continue;
newNodes.push({
id: nodeId,
label: ep.name || ep.id,
status: "pending",
nodeType: "trigger",
triggerType: ep.trigger_type,
triggerConfig: {
...ep.trigger_config,
...(ep.next_fire_in != null ? { next_fire_in: ep.next_fire_in } : {}),
...(ep.task ? { task: ep.task } : {}),
},
...(entryNode ? { next: [entryNode] } : {}),
});
}
if (newNodes.length > 0) {
updated = [...newNodes, ...updated];
}
// Skip update if nothing changed
if (newNodes.length === 0 && updated.every((n, idx) => n === ss[0].graphNodes[idx])) return prev;
return {
...prev,
[agentType]: ss.map((s, i) => (i === 0 ? { ...s, graphNodes: updated } : s)),
@@ -1275,7 +1408,7 @@ export default function Workspace() {
// --- SSE event handler ---
const upsertChatMessage = useCallback(
(agentType: string, chatMsg: ChatMessage, options?: { reconcileOptimisticUser?: boolean }) => {
setSessionsByAgent((prev) => {
const sessions = prev[agentType] || [];
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
@@ -1291,6 +1424,25 @@ export default function Workspace() {
i === idx ? { ...chatMsg, createdAt: m.createdAt ?? chatMsg.createdAt } : m,
);
} else {
const shouldReconcileOptimisticUser =
!!options?.reconcileOptimisticUser && chatMsg.type === "user" && s.messages.length > 0;
if (shouldReconcileOptimisticUser) {
const lastIdx = s.messages.length - 1;
const lastMsg = s.messages[lastIdx];
const incomingTs = chatMsg.createdAt ?? Date.now();
const lastTs = lastMsg.createdAt ?? incomingTs;
const sameMessage =
lastMsg.type === "user"
&& lastMsg.content === chatMsg.content
&& Math.abs(incomingTs - lastTs) <= 15000;
if (sameMessage) {
newMessages = s.messages.map((m, i) =>
i === lastIdx ? { ...m, id: chatMsg.id } : m,
);
return { ...s, messages: newMessages };
}
}
// Append — SSE events arrive in server-timestamp order via the
// shared EventBus, so arrival order already interleaves queen
// and worker correctly. Local user messages are always created
@@ -1308,8 +1460,6 @@ export default function Workspace() {
const handleSSEEvent = useCallback(
(agentType: string, event: AgentEvent) => {
const streamId = event.stream_id;
const isQueen = streamId === "queen";
if (isQueen) console.log('[QUEEN] handleSSEEvent:', event.type, 'agentType:', agentType);
// Drop queen message content while suppressing the auto-intro after a cold-restore.
@@ -1345,6 +1495,23 @@ export default function Workspace() {
if (Object.keys(priorSnapshots).length > 0) {
console.debug(`[hive] execution_started: dropping ${Object.keys(priorSnapshots).length} unflushed LLM snapshot(s)`);
}
// Insert a run divider when a new run_id is detected
const incomingRunId = event.run_id || null;
const prevRunId = agentStates[agentType]?.currentRunId;
if (incomingRunId && incomingRunId !== prevRunId) {
const dividerMsg: ChatMessage = {
id: `run-divider-${incomingRunId}`,
agent: "",
agentColor: "",
content: prevRunId ? "New Run" : "Run Started",
timestamp: ts,
type: "run_divider",
role: "worker",
thread: agentType,
createdAt: eventCreatedAt,
};
upsertChatMessage(agentType, dividerMsg);
}
turnCounterRef.current[turnKey] = currentTurn + 1;
updateAgentState(agentType, {
isTyping: true,
@@ -1353,6 +1520,7 @@ export default function Workspace() {
awaitingInput: false,
workerRunState: "running",
currentExecutionId: event.execution_id || agentStates[agentType]?.currentExecutionId || null,
currentRunId: incomingRunId,
nodeLogs: {},
subagentReports: [],
llmSnapshots: {},
@@ -1404,13 +1572,19 @@ export default function Workspace() {
case "execution_paused":
case "execution_failed":
case "client_output_delta":
case "client_input_received":
case "client_input_requested":
case "llm_text_delta": {
const chatMsg = sseEventToChatMessage(event, agentType, displayName, currentTurn);
if (isQueen) console.log('[QUEEN] chatMsg:', chatMsg?.id, chatMsg?.content?.slice(0, 50), 'turn:', currentTurn);
if (chatMsg && !suppressQueenMessages) {
if (isQueen) {
chatMsg.role = role;
chatMsg.phase = queenPhaseRef.current[agentType] as ChatMessage["phase"];
}
upsertChatMessage(agentType, chatMsg, {
reconcileOptimisticUser: event.type === "client_input_received",
});
}
// Mark streaming when LLM text is actively arriving
@@ -1850,6 +2024,7 @@ export default function Workspace() {
: rawPhase === "staging" ? "staging"
: rawPhase === "planning" ? "planning"
: "building";
queenPhaseRef.current[agentType] = newPhase;
updateAgentState(agentType, {
queenPhase: newPhase,
queenBuilding: newPhase === "building",
@@ -1950,6 +2125,136 @@ export default function Workspace() {
break;
}
case "trigger_activated": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
const nodeId = `__trigger_${triggerId}`;
// If the trigger node doesn't exist yet (dynamically created via set_trigger),
// synthesize it before updating status.
setSessionsByAgent(prev => {
const sessions = prev[agentType] || [];
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
return {
...prev,
[agentType]: sessions.map(s => {
if (s.id !== activeId) return s;
const exists = s.graphNodes.some(n => n.id === nodeId);
if (exists) {
return {
...s,
graphNodes: s.graphNodes.map(n =>
n.id === nodeId ? { ...n, status: "running" as const } : n,
),
};
}
// Synthesize new trigger node at the front of the graph
const triggerType = (event.data?.trigger_type as string) || "timer";
const triggerConfig = (event.data?.trigger_config as Record<string, unknown>) || {};
const entryNode = s.graphNodes.find(n => n.nodeType !== "trigger")?.id;
const newNode: GraphNode = {
id: nodeId,
label: triggerId,
status: "running",
nodeType: "trigger",
triggerType,
triggerConfig,
...(entryNode ? { next: [entryNode] } : {}),
};
return { ...s, graphNodes: [newNode, ...s.graphNodes] };
}),
};
});
}
break;
}
case "trigger_deactivated": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
// Clear next_fire_in so countdown hides when inactive
setSessionsByAgent(prev => {
const sessions = prev[agentType] || [];
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
return {
...prev,
[agentType]: sessions.map(s => {
if (s.id !== activeId) return s;
return {
...s,
graphNodes: s.graphNodes.map(n => {
if (n.id !== `__trigger_${triggerId}`) return n;
const { next_fire_in: _, ...restConfig } = (n.triggerConfig || {}) as Record<string, unknown> & { next_fire_in?: unknown };
return { ...n, status: "pending" as const, triggerConfig: restConfig };
}),
};
}),
};
});
}
break;
}
case "trigger_fired": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
const nodeId = `__trigger_${triggerId}`;
updateGraphNodeStatus(agentType, nodeId, "complete");
setTimeout(() => updateGraphNodeStatus(agentType, nodeId, "running"), 1500);
}
break;
}
case "trigger_available": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
const nodeId = `__trigger_${triggerId}`;
setSessionsByAgent(prev => {
const sessions = prev[agentType] || [];
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
return {
...prev,
[agentType]: sessions.map(s => {
if (s.id !== activeId) return s;
if (s.graphNodes.some(n => n.id === nodeId)) return s;
const triggerType = (event.data?.trigger_type as string) || "timer";
const triggerConfig = (event.data?.trigger_config as Record<string, unknown>) || {};
const entryNode = s.graphNodes.find(n => n.nodeType !== "trigger")?.id;
const newNode: GraphNode = {
id: nodeId,
label: triggerId,
status: "pending",
nodeType: "trigger",
triggerType,
triggerConfig,
...(entryNode ? { next: [entryNode] } : {}),
};
return { ...s, graphNodes: [newNode, ...s.graphNodes] };
}),
};
});
}
break;
}
case "trigger_removed": {
const triggerId = event.data?.trigger_id as string;
if (triggerId) {
const nodeId = `__trigger_${triggerId}`;
setSessionsByAgent(prev => {
const sessions = prev[agentType] || [];
const activeId = activeSessionRef.current[agentType] || sessions[0]?.id;
return {
...prev,
[agentType]: sessions.map(s => {
if (s.id !== activeId) return s;
return { ...s, graphNodes: s.graphNodes.filter(n => n.id !== nodeId) };
}),
};
});
}
break;
}
default:
// Fallback: ensure queenReady is set even for unexpected first events
if (shouldMarkQueenReady) updateAgentState(agentType, { queenReady: true });
@@ -1980,6 +2285,18 @@ export default function Workspace() {
? { nodes: activeSession.graphNodes, title: activeAgentState?.displayName || formatAgentDisplayName(baseAgentType(activeWorker)) }
: { nodes: [] as GraphNode[], title: "" };
// Keep selectedNode in sync with live graphNodes (trigger status updates via SSE)
const liveSelectedNode = selectedNode && currentGraph.nodes.find(n => n.id === selectedNode.id);
const resolvedSelectedNode = liveSelectedNode || selectedNode;
// Sync trigger task draft when selected trigger node changes
useEffect(() => {
if (resolvedSelectedNode?.nodeType === "trigger") {
const tc = resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined;
setTriggerTaskDraft((tc?.task as string) || "");
}
}, [resolvedSelectedNode?.id]);
// Build a flat list of all agent-type tabs for the tab bar
const agentTabs = Object.entries(sessionsByAgent)
.filter(([, sessions]) => sessions.length > 0)
@@ -2276,7 +2593,7 @@ export default function Workspace() {
const closeAgentTab = useCallback((agentType: string) => {
setSelectedNode(null);
// Pause worker execution if running (saves checkpoint), then kill the
// entire backend session so the queen doesn't keep running.
const state = agentStates[agentType];
if (state?.sessionId) {
const pausePromise = (state.currentExecutionId && state.workerRunState === "running")
@@ -2316,28 +2633,37 @@ export default function Workspace() {
}
}, [sessionsByAgent, activeWorker, navigate, agentStates]);
// Open a tab for an agent type. If a tab already exists, switch to it
// instead of creating a duplicate — each agent gets one session.
// Exception: "new-agent" tabs always create a new instance since each
// represents a distinct conversation the user is starting from scratch.
const addAgentSession = useCallback((agentType: string, agentLabel?: string) => {
const isNewAgent = agentType === "new-agent" || agentType.startsWith("new-agent-");
if (!isNewAgent) {
const existingTabKey = Object.keys(sessionsByAgent).find(
k => baseAgentType(k) === agentType && (sessionsByAgent[k] || []).length > 0,
);
if (existingTabKey) {
setActiveWorker(existingTabKey);
const existing = sessionsByAgent[existingTabKey]?.[0];
if (existing) {
setActiveSessionByAgent(prev => ({ ...prev, [existingTabKey]: existing.id }));
}
return;
}
}
const tabKey = isNewAgent ? `new-agent-${makeId()}` : agentType;
const existingNewAgentCount = isNewAgent
? Object.keys(sessionsByAgent).filter(
k => (k === "new-agent" || k.startsWith("new-agent-")) && (sessionsByAgent[k] || []).length > 0
).length
: 0;
const rawLabel = agentLabel || (isNewAgent ? "New Agent" : formatAgentDisplayName(agentType));
const displayLabel = existingNewAgentCount === 0 ? rawLabel : `${rawLabel} #${existingNewAgentCount + 1}`;
const newSession = createSession(tabKey, displayLabel);
setSessionsByAgent(prev => ({
...prev,
[tabKey]: [newSession],
@@ -2365,16 +2691,13 @@ export default function Workspace() {
}
// Pre-fetch messages from disk so the tab opens with conversation already shown.
// This happens BEFORE creating the tab so no "new session" empty state is visible.
// Prefer the persisted event log for full UI reconstruction; fall back to parts.
let prefetchedMessages: ChatMessage[] = [];
try {
const resolvedType = agentPath || "new-agent";
const displayNameTemp = agentName || formatAgentDisplayName(resolvedType);
const restored = await restoreSessionMessages(sessionId, resolvedType, displayNameTemp);
prefetchedMessages = restored.messages;
if (prefetchedMessages.length > 0) {
prefetchedMessages.sort((a, b) => (a.createdAt ?? 0) - (b.createdAt ?? 0));
}
@@ -2602,20 +2925,32 @@ export default function Workspace() {
/>
)}
</div>
{resolvedSelectedNode && (
<div className="w-[480px] min-w-[400px] flex-shrink-0">
{resolvedSelectedNode.nodeType === "trigger" ? (
<div className="flex flex-col h-full border-l border-border/40 bg-card/20 animate-in slide-in-from-right">
<div className="px-4 pt-4 pb-3 border-b border-border/30 flex items-start justify-between gap-2">
<div className="flex items-start gap-3 min-w-0">
<div className="w-8 h-8 rounded-lg flex items-center justify-center flex-shrink-0 mt-0.5 bg-[hsl(210,40%,55%)]/15 border border-[hsl(210,40%,55%)]/25">
<span className="text-sm" style={{ color: "hsl(210,40%,55%)" }}>
{{ "webhook": "\u26A1", "timer": "\u23F1", "api": "\u2192", "event": "\u223F" }[resolvedSelectedNode.triggerType || ""] || "\u26A1"}
</span>
</div>
<div className="min-w-0">
<h3 className="text-sm font-semibold text-foreground leading-tight">{resolvedSelectedNode.label}</h3>
<p className="text-[11px] text-muted-foreground mt-0.5 capitalize flex items-center gap-1.5">
{resolvedSelectedNode.triggerType} trigger
<span className={`inline-block w-1.5 h-1.5 rounded-full ${
resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete"
? "bg-emerald-400" : "bg-muted-foreground/40"
}`} />
<span className={`text-[10px] ${
resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete"
? "text-emerald-400" : "text-muted-foreground/60"
}`}>
{resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete" ? "active" : "inactive"}
</span>
</p>
</div>
</div>
<button onClick={() => setSelectedNode(null)} className="p-1 rounded-md text-muted-foreground hover:text-foreground hover:bg-muted/50 transition-colors flex-shrink-0">
@@ -2624,7 +2959,7 @@ export default function Workspace() {
</div>
<div className="px-4 py-4 flex flex-col gap-3">
{(() => {
const tc = resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined;
const cron = tc?.cron as string | undefined;
const interval = tc?.interval_minutes as number | undefined;
const eventTypes = tc?.event_types as string[] | undefined;
@@ -2645,7 +2980,7 @@ export default function Workspace() {
) : null;
})()}
{(() => {
const nfi = (resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined)?.next_fire_in as number | undefined;
return nfi != null ? (
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Next run</p>
@@ -2655,25 +2990,92 @@ export default function Workspace() {
</div>
) : null;
})()}
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Task</p>
<textarea
value={triggerTaskDraft}
onChange={(e) => setTriggerTaskDraft(e.target.value)}
placeholder="Describe what the worker should do when this trigger fires..."
className="w-full text-xs text-foreground/80 bg-muted/30 rounded-lg px-3 py-2 border border-border/20 resize-none min-h-[60px] font-mono focus:outline-none focus:border-primary/40"
rows={3}
/>
{(() => {
const currentTask = (resolvedSelectedNode.triggerConfig as Record<string, unknown> | undefined)?.task as string || "";
const hasChanged = triggerTaskDraft !== currentTask;
if (!hasChanged) return null;
return (
<button
disabled={triggerTaskSaving}
onClick={async () => {
const sessionId = activeAgentState?.sessionId;
const triggerId = resolvedSelectedNode.id.replace("__trigger_", "");
if (!sessionId) return;
setTriggerTaskSaving(true);
try {
await sessionsApi.updateTriggerTask(sessionId, triggerId, triggerTaskDraft);
} finally {
setTriggerTaskSaving(false);
}
}}
className="mt-1.5 w-full text-[11px] px-3 py-1.5 rounded-lg border border-primary/30 text-primary hover:bg-primary/10 transition-colors disabled:opacity-50"
>
{triggerTaskSaving ? "Saving..." : "Save Task"}
</button>
);
})()}
{!triggerTaskDraft && (
<p className="text-[10px] text-amber-400/80 mt-1">A task is required before enabling this trigger.</p>
)}
</div>
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Fires into</p>
<p className="text-xs text-foreground/80 font-mono bg-muted/30 rounded-lg px-3 py-2 border border-border/20">
{resolvedSelectedNode.next?.[0]?.split("-").map(w => w.charAt(0).toUpperCase() + w.slice(1)).join(" ") || "—"}
</p>
</div>
{activeAgentState?.queenPhase !== "building" && (() => {
const triggerIsActive = resolvedSelectedNode.status === "running" || resolvedSelectedNode.status === "complete";
const triggerId = resolvedSelectedNode.id.replace("__trigger_", "");
const taskMissing = !triggerTaskDraft;
return (
<div className="pt-1">
<button
disabled={!triggerIsActive && taskMissing}
onClick={async () => {
const sessionId = activeAgentState?.sessionId;
if (!sessionId) return;
const action = triggerIsActive ? "Disable" : "Enable";
await executionApi.chat(sessionId, `${action} trigger ${triggerId}`);
}}
className={`w-full text-xs px-3 py-2 rounded-lg border transition-colors ${
triggerIsActive
? "border-red-500/30 text-red-400 hover:bg-red-500/10"
: taskMissing
? "border-border/30 text-muted-foreground/40 cursor-not-allowed"
: "border-emerald-500/30 text-emerald-400 hover:bg-emerald-500/10"
}`}
>
{triggerIsActive ? "Disable Trigger" : "Enable Trigger"}
</button>
{!triggerIsActive && taskMissing && (
<p className="text-[10px] text-muted-foreground/50 mt-1 text-center">Configure a task first</p>
)}
</div>
);
})()}
</div>
</div>
) : (
<NodeDetailPanel
-node={selectedNode}
-nodeSpec={activeAgentState?.nodeSpecs.find(n => n.id === selectedNode.id) ?? null}
+node={resolvedSelectedNode}
+nodeSpec={activeAgentState?.nodeSpecs.find(n => n.id === resolvedSelectedNode.id) ?? null}
allNodeSpecs={activeAgentState?.nodeSpecs}
subagentReports={activeAgentState?.subagentReports}
sessionId={activeAgentState?.sessionId || undefined}
graphId={activeAgentState?.graphId || undefined}
workerSessionId={null}
-nodeLogs={activeAgentState?.nodeLogs[selectedNode.id] || []}
-actionPlan={activeAgentState?.nodeActionPlans[selectedNode.id]}
+nodeLogs={activeAgentState?.nodeLogs[resolvedSelectedNode.id] || []}
+actionPlan={activeAgentState?.nodeActionPlans[resolvedSelectedNode.id]}
onClose={() => setSelectedNode(null)}
/>
)}
@@ -50,7 +50,7 @@ async def test_worker_handoff_injects_formatted_request_into_queen() -> None:
@pytest.mark.asyncio
-async def test_worker_handoff_ignores_queen_and_judge_streams() -> None:
+async def test_worker_handoff_ignores_queen_stream() -> None:
bus = EventBus()
manager = SessionManager()
session = _make_session(bus)
@@ -63,11 +63,6 @@ async def test_worker_handoff_ignores_queen_and_judge_streams() -> None:
node_id="queen",
reason="should be ignored",
)
-await bus.emit_escalation_requested(
-stream_id="judge",
-node_id="judge",
-reason="should be ignored",
-)
assert queen_node.inject_event.await_count == 0
+261
@@ -0,0 +1,261 @@
"""Tests for queen-level trigger system.
Verifies that:
- Timer triggers fire inject_trigger() on the queen node
- Webhook triggers fire inject_trigger() via EventBus WEBHOOK_RECEIVED
- When the queen node is unavailable, the trigger is skipped silently
- When worker_runtime is None, the trigger is discarded (gating)
- remove_trigger cleans up webhook subscription
- run_agent_with_input is in _QUEEN_RUNNING_TOOLS
- System prompts reference run_agent_with_input, not start_worker()
"""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from framework.runtime.event_bus import EventBus
from framework.runtime.triggers import TriggerDefinition
from framework.server.session_manager import Session
def _make_session(event_bus: EventBus, session_id: str = "session_trigger_test") -> Session:
return Session(id=session_id, event_bus=event_bus, llm=object(), loaded_at=0.0)
def _make_executor(queen_node) -> SimpleNamespace:
return SimpleNamespace(node_registry={"queen": queen_node})
@pytest.mark.asyncio
async def test_interval_timer_fires_inject_trigger_on_queen_node() -> None:
"""Timer with interval_minutes fires inject_trigger() on the queen node."""
from framework.graph.event_loop_node import TriggerEvent
from framework.tools.queen_lifecycle_tools import _start_trigger_timer
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object() # non-None → worker is loaded
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="test-timer",
trigger_type="timer",
trigger_config={"interval_minutes": 0.001}, # ~60ms
task="run it",
)
await _start_trigger_timer(session, "test-timer", tdef)
# Let the timer fire at least once
await asyncio.sleep(0.15)
# Cancel the background task
task = session.active_timer_tasks.get("test-timer")
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert queen_node.inject_trigger.await_count >= 1
# Inspect the TriggerEvent passed to inject_trigger
call_args = queen_node.inject_trigger.await_args_list[0]
trigger: TriggerEvent = call_args.args[0]
assert trigger.trigger_type == "timer"
assert trigger.source_id == "test-timer"
assert trigger.payload.get("task") == "run it"
@pytest.mark.asyncio
async def test_timer_skipped_when_queen_node_unavailable() -> None:
"""No inject_trigger call and no exception when queen executor is not set."""
from framework.tools.queen_lifecycle_tools import _start_trigger_timer
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object()
session.queen_executor = None # queen not ready
tdef = TriggerDefinition(
id="no-queen-timer",
trigger_type="timer",
trigger_config={"interval_minutes": 0.001},
task="should not fire",
)
await _start_trigger_timer(session, "no-queen-timer", tdef)
await asyncio.sleep(0.15)
task = session.active_timer_tasks.get("no-queen-timer")
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# No exception raised, nothing to assert beyond completion
@pytest.mark.asyncio
async def test_webhook_trigger_fires_inject_trigger() -> None:
"""WEBHOOK_RECEIVED on EventBus → inject_trigger() on the queen node."""
from framework.graph.event_loop_node import TriggerEvent
from framework.tools.queen_lifecycle_tools import _start_trigger_webhook
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object()
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="test-webhook",
trigger_type="webhook",
trigger_config={"path": "/hooks/test", "methods": ["POST"]},
task="process it",
)
# Patch WebhookServer to avoid binding a real port
mock_server = MagicMock()
mock_server.is_running = False
mock_server.add_route = MagicMock()
mock_server.start = AsyncMock()
with patch("framework.runtime.webhook_server.WebhookServer", return_value=mock_server):
with patch("framework.runtime.webhook_server.WebhookServerConfig"):
await _start_trigger_webhook(session, "test-webhook", tdef)
# Simulate an incoming webhook event on the EventBus
await bus.emit_webhook_received(
source_id="test-webhook",
path="/hooks/test",
method="POST",
headers={},
payload={"event": "push"},
)
await asyncio.sleep(0.05) # let handler run
assert queen_node.inject_trigger.await_count == 1
trigger: TriggerEvent = queen_node.inject_trigger.await_args_list[0].args[0]
assert trigger.trigger_type == "webhook"
assert trigger.source_id == "test-webhook"
assert trigger.payload["method"] == "POST"
assert trigger.payload["path"] == "/hooks/test"
assert trigger.payload["task"] == "process it"
assert trigger.payload["payload"] == {"event": "push"}
@pytest.mark.asyncio
async def test_webhook_trigger_discarded_when_no_worker() -> None:
"""inject_trigger is NOT called when no worker is loaded."""
from framework.tools.queen_lifecycle_tools import _start_trigger_webhook
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = None # no worker
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="no-worker-webhook",
trigger_type="webhook",
trigger_config={"path": "/hooks/noop", "methods": ["POST"]},
task="should not fire",
)
mock_server = MagicMock()
mock_server.is_running = False
mock_server.add_route = MagicMock()
mock_server.start = AsyncMock()
with patch("framework.runtime.webhook_server.WebhookServer", return_value=mock_server):
with patch("framework.runtime.webhook_server.WebhookServerConfig"):
await _start_trigger_webhook(session, "no-worker-webhook", tdef)
await bus.emit_webhook_received(
source_id="no-worker-webhook",
path="/hooks/noop",
method="POST",
headers={},
payload={},
)
await asyncio.sleep(0.05)
assert queen_node.inject_trigger.await_count == 0
@pytest.mark.asyncio
async def test_remove_trigger_cleans_up_webhook_subscription() -> None:
"""After remove_trigger(), WEBHOOK_RECEIVED no longer calls inject_trigger."""
from framework.tools.queen_lifecycle_tools import _start_trigger_webhook
bus = EventBus()
session = _make_session(bus)
session.worker_runtime = object()
queen_node = SimpleNamespace(inject_trigger=AsyncMock())
session.queen_executor = _make_executor(queen_node)
tdef = TriggerDefinition(
id="removable-webhook",
trigger_type="webhook",
trigger_config={"path": "/hooks/removable", "methods": ["POST"]},
task="run it",
)
mock_server = MagicMock()
mock_server.is_running = False
mock_server.add_route = MagicMock()
mock_server.start = AsyncMock()
with patch("framework.runtime.webhook_server.WebhookServer", return_value=mock_server):
with patch("framework.runtime.webhook_server.WebhookServerConfig"):
await _start_trigger_webhook(session, "removable-webhook", tdef)
# Manually unsubscribe (mirrors what remove_trigger does)
sub_id = session.active_webhook_subs.pop("removable-webhook", None)
assert sub_id is not None
bus.unsubscribe(sub_id)
# Now fire — should NOT reach queen
await bus.emit_webhook_received(
source_id="removable-webhook",
path="/hooks/removable",
method="POST",
headers={},
payload={},
)
await asyncio.sleep(0.05)
assert queen_node.inject_trigger.await_count == 0
assert "removable-webhook" not in session.active_webhook_subs
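The cleanup contract above boils down to a plain subscribe/unsubscribe pattern. A minimal sketch (the `TinyBus` class is illustrative, not the framework's `EventBus` API) of why unsubscribing stops delivery:

```python
# Illustrative pub/sub sketch: after unsubscribe, an emitted webhook event
# no longer reaches the handler, mirroring the remove_trigger test above.
import asyncio


class TinyBus:
    def __init__(self) -> None:
        self._subs: dict[int, object] = {}
        self._next_id = 0

    def subscribe(self, handler) -> int:
        self._next_id += 1
        self._subs[self._next_id] = handler
        return self._next_id  # caller keeps this id for later cleanup

    def unsubscribe(self, sub_id: int) -> None:
        self._subs.pop(sub_id, None)

    async def emit(self, event: dict) -> None:
        for handler in list(self._subs.values()):
            await handler(event)


async def main() -> list:
    received: list = []
    bus = TinyBus()

    async def on_webhook(event: dict) -> None:
        if event["source_id"] == "test-webhook":
            received.append(event)

    sub_id = bus.subscribe(on_webhook)
    await bus.emit({"source_id": "test-webhook", "path": "/hooks/test"})
    bus.unsubscribe(sub_id)
    # Second emit must not be delivered: the subscription is gone.
    await bus.emit({"source_id": "test-webhook", "path": "/hooks/test"})
    return received


received = asyncio.run(main())
```

Storing the subscription id per trigger (as `session.active_webhook_subs` does) is what lets `remove_trigger` tear down exactly one webhook without disturbing others.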
def test_run_agent_with_input_in_running_tools() -> None:
"""run_agent_with_input must be available to the queen in RUNNING phase."""
from framework.agents.queen.nodes import _QUEEN_RUNNING_TOOLS
assert "run_agent_with_input" in _QUEEN_RUNNING_TOOLS
def test_system_prompt_uses_correct_tool_name() -> None:
"""Trigger handling rules must reference run_agent_with_input, not start_worker()."""
from framework.agents.queen.nodes import (
_queen_behavior_running,
_queen_behavior_staging,
)
assert "run_agent_with_input" in _queen_behavior_running
assert "start_worker()" not in _queen_behavior_running
assert "run_agent_with_input" in _queen_behavior_staging
assert "start_worker()" not in _queen_behavior_staging
+63 -2
@@ -1,6 +1,6 @@
-# Integration Bounty Program
+# Bounty Program
-Earn XP, Discord roles, and money by testing, documenting, and building integrations for the Aden agent framework.
+Earn XP, Discord roles, and money by contributing to the Aden agent framework — from quick fixes to major features, plus integration testing and development.
## Why Contribute?
@@ -33,6 +33,10 @@ Lurkr auto-assigns the first two roles. Core Contributor requires sustained, qua
## Bounty Types
### Integration Bounties
Focused on the tool ecosystem — testing, documenting, and building integrations.
| Type | Label | Points | What You Do |
| --------------------- | ----------------- | ------ | -------------------------------------------------------------------------- |
| **Test a tool** | `bounty:test` | 20 | Test with a real API key, submit a report with logs |
@@ -42,6 +46,47 @@ Lurkr auto-assigns the first two roles. Core Contributor requires sustained, qua
Promoting a tool from unverified to verified is the final step — submit a PR moving it from `_register_unverified()` to `_register_verified()` after the [promotion checklist](promotion-checklist.md) is complete.
### Standard Bounties
General contributions to the framework, docs, tests, and infrastructure — not tied to a specific integration.
| Size | Label | Points | Scope |
| ------------ | ------------------ | ------ | ---------------------------------------------------------------------------------- |
| **Small** | `bounty:small` | 10 | Typo fixes, broken links, error message improvements, confirm/reproduce bug reports |
| **Medium** | `bounty:medium` | 30 | Bug fixes, new or improved unit tests, how-to guides, CLI UX improvements |
| **Large** | `bounty:large` | 75 | New features, performance optimizations with benchmarks, architecture docs |
| **Extreme** | `bounty:extreme` | 150 | Major subsystem work, security audits, cross-cutting refactors, new core capabilities |
#### Examples by size
**Small (10 pts):**
- Fix typos or broken links in documentation
- Improve an error message to include actionable guidance
- Add missing type annotations to a module
- Reproduce and confirm an open bug report with environment details
- Fix linting or CI warnings
**Medium (30 pts):**
- Fix a non-critical bug with a regression test
- Write a how-to guide or tutorial for a common workflow
- Add or significantly improve test coverage for a core module
- Improve CLI help text, argument validation, or UX
- Add structured logging or observability to a module
**Large (75 pts):**
- Implement a new user-facing feature end to end
- Performance optimization with before/after benchmarks
- Build a new CLI command or subcommand
- Write comprehensive architecture documentation for a subsystem
- Add a new credential adapter type
**Extreme (150 pts):**
- Design and implement a major subsystem (e.g., plugin system, caching layer)
- Security audit of a core module with findings and fixes
- Major refactor of core architecture (must have maintainer pre-approval)
- Build a complete example application or reference implementation
- End-to-end testing framework for agent workflows
## Quality Gates
- **PRs** must be merged by a maintainer (not self-merged)
@@ -52,12 +97,28 @@ Promoting a tool from unverified to verified is the final step — submit a PR m
## Labels
### Integration bounty labels
| Label | Color | Meaning |
| ------------------- | ------------------ | --------------------------------------- |
| `bounty:test` | `#1D76DB` (blue) | Test a tool with a real API key |
| `bounty:docs` | `#FBCA04` (yellow) | Write or improve documentation |
| `bounty:code` | `#D93F0B` (orange) | Health checker, bug fix, or improvement |
| `bounty:new-tool` | `#6F42C1` (purple) | Build a new integration from scratch |
### Standard bounty labels
| Label | Color | Meaning |
| ------------------- | ------------------ | -------------------------------------------------- |
| `bounty:small` | `#C2E0C6` (green) | Quick fix — typos, links, error messages |
| `bounty:medium` | `#0E8A16` (green) | Bug fix, tests, guides, CLI improvements |
| `bounty:large` | `#B60205` (red) | New feature, perf work, architecture docs |
| `bounty:extreme` | `#000000` (black) | Major subsystem, security audit, core refactor |
### Difficulty labels
| Label | Color | Meaning |
| ------------------- | ------------------ | --------------------------------------- |
| `difficulty:easy` | `#BFD4F2` | Good first contribution |
| `difficulty:medium` | `#D4C5F9` | Requires some familiarity |
| `difficulty:hard` | `#F9D0C4` | Significant effort or expertise needed |
+66 -6
@@ -1,6 +1,6 @@
-# Contributor Guide — Integration Bounty Program
+# Contributor Guide — Bounty Program
-Earn XP, Discord roles, and eventually real money by testing and building integrations for the Aden agent framework.
+Earn XP, Discord roles, and eventually real money by contributing to the Aden agent framework — from quick fixes to major features and integration work.
## Getting Started
@@ -30,7 +30,13 @@ XP comes from GitHub bounties (auto-pushed on PR merge) and Discord activity in
## Bounty Types
-### Test a Tool (20 pts)
+There are two categories: **integration bounties** (tool-specific) and **standard bounties** (general contributions).
+---
+### Integration Bounties
+#### Test a Tool (20 pts)
Test an unverified tool with a real API key and report what happens.
@@ -41,7 +47,7 @@ Test an unverified tool with a real API key and report what happens.
Report both successes and failures. Finding bugs is valuable.
-### Write Docs (20 pts)
+#### Write Docs (20 pts)
Write a README for a tool that's missing one.
@@ -52,7 +58,7 @@ Write a README for a tool that's missing one.
Function names and API URLs must match reality — no AI hallucinations.
-### Code Contribution (30 pts)
+#### Code Contribution (30 pts)
Add a health checker, fix a bug, or improve an integration.
@@ -66,7 +72,7 @@ Add a health checker, fix a bug, or improve an integration.
1. Find a bug during testing, file an issue
2. Fix it in a PR with a test covering the bug
-### New Integration (75 pts)
+#### New Integration (75 pts)
Build a complete integration from scratch.
@@ -77,6 +83,60 @@ Build a complete integration from scratch.
Expect multiple review rounds.
---
### Standard Bounties
General contributions to the framework — not tied to a specific integration. Sized by effort and impact.
#### Small (10 pts)
Quick, focused fixes. Great for first-time contributors.
- Fix typos or broken links in documentation
- Improve an error message to include actionable guidance
- Add missing type annotations to a module
- Reproduce and confirm a bug report with environment details
- Fix linting or CI warnings
**How:** Open a PR with the fix. Tag with `bounty:small`.
#### Medium (30 pts)
Meaningful improvements that require reading and understanding existing code.
- Fix a non-critical bug with a regression test
- Write a how-to guide or tutorial
- Add or significantly improve test coverage for a core module
- Improve CLI help text, argument validation, or UX
- Add structured logging or observability to a module
**How:** Claim the issue first. Submit a PR with tests where applicable. Tag with `bounty:medium`.
#### Large (75 pts)
Significant work that adds real capability or improves the project substantially.
- Implement a new user-facing feature end to end
- Performance optimization with before/after benchmarks
- Build a new CLI command or subcommand
- Write comprehensive architecture documentation for a subsystem
- Add a new credential adapter type
**How:** Claim the issue and discuss your approach in the issue before starting. Submit a PR. Tag with `bounty:large`.
#### Extreme (150 pts)
Major contributions that shape the project's direction. Requires maintainer pre-approval.
- Design and implement a major subsystem (e.g., plugin system, caching layer)
- Security audit of a core module with findings and fixes
- Major refactor of core architecture
- Build a complete example application or reference implementation
- End-to-end testing framework for agent workflows
**How:** Comment on the issue with a design proposal. Wait for maintainer approval before starting work. Tag with `bounty:extreme`.
## Rules
1. **Claim before you start** — comment on the issue, wait for assignment
+37 -1
@@ -27,7 +27,7 @@ When someone comments "I'd like to work on this":
5. Merge — the GitHub Action auto-awards XP and posts to Discord
6. Close the linked bounty issue
-### Quality Gates
+### Quality Gates — Integration Bounties
**`bounty:docs`:**
- [ ] Follows the [tool README template](templates/tool-readme-template.md)
@@ -51,6 +51,31 @@ When someone comments "I'd like to work on this":
- [ ] `make check && make test` passes
- [ ] Registered in `_register_unverified()` (not verified)
### Quality Gates — Standard Bounties
**`bounty:small`:**
- [ ] Change is correct and doesn't introduce regressions
- [ ] CI passes
- [ ] Scope matches "small" — not padded into a bigger change
**`bounty:medium`:**
- [ ] CI passes
- [ ] Bug fixes include a regression test
- [ ] Docs/guides are accurate and follow existing style
- [ ] Not AI-generated without verification
**`bounty:large`:**
- [ ] Design was discussed in the issue before implementation
- [ ] CI passes, new tests cover the change
- [ ] Benchmarks included for performance work (before/after)
- [ ] Architecture docs reviewed by a second maintainer
**`bounty:extreme`:**
- [ ] Maintainer pre-approved the design proposal before work began
- [ ] CI passes, comprehensive test coverage
- [ ] Documentation updated to reflect the change
- [ ] Reviewed by at least two maintainers
### Rejecting Submissions
1. Leave specific, constructive feedback
@@ -78,6 +103,8 @@ If a Core Contributor is inactive 8+ weeks, reach out privately first, then remo
Post dollar values in `#bounty-payouts` (Core Contributors only):
### Integration bounties
| Bounty Type | Dollar Range |
|-------------|-------------|
| `bounty:test` | $10–30 |
@@ -85,6 +112,15 @@ Post dollar values in `#bounty-payouts` (Core Contributors only):
| `bounty:code` | $20–50 |
| `bounty:new-tool` | $50–150 |
### Standard bounties
| Bounty Type | Dollar Range |
|-------------|-------------|
| `bounty:small` | $5–15 |
| `bounty:medium` | $20–50 |
| `bounty:large` | $50–150 |
| `bounty:extreme` | $150–500 |
**Payout:** PR merged → verify quality → record in `#bounty-payouts` → process payment.
XP is always awarded regardless of budget. Money is a bonus layer.
+1 -1
@@ -14,7 +14,7 @@ Complete setup from zero to running. Estimated time: 30 minutes.
./scripts/setup-bounty-labels.sh
```
-This creates 7 labels: 4 bounty types (`bounty:test`, `bounty:docs`, `bounty:code`, `bounty:new-tool`) and 3 difficulty levels (`difficulty:easy`, `difficulty:medium`, `difficulty:hard`).
+This creates 11 labels: 4 integration bounty types (`bounty:test`, `bounty:docs`, `bounty:code`, `bounty:new-tool`), 4 standard bounty sizes (`bounty:small`, `bounty:medium`, `bounty:large`, `bounty:extreme`), and 3 difficulty levels (`difficulty:easy`, `difficulty:medium`, `difficulty:hard`).
## Step 2: Create Discord Channels (3 min)
+580
@@ -0,0 +1,580 @@
# MCP Server Registry — Product & Business Requirements Document
**Status**: Draft v2
**Last updated**: 2026-03-13
**Authors**: Timothy
**Reviewers**: Platform, Product, OSS/Community, Security
---
## 1. Executive Summary
This document proposes an **MCP Server Registry** system that enables open-source contributors and Hive users to discover, publish, install, and manage MCP (Model Context Protocol) servers for use with Hive agents.
Today, MCP server configuration is static, duplicated across agents, and limited to servers that Hive spawns as subprocesses. This makes it impractical for users who run their own MCP servers on the same host, and impossible for the community to contribute standalone MCP integrations without modifying Hive internals.
The registry consists of three components:
1. **A public GitHub repository** (`hive-mcp-registry`) — a curated index where contributors submit MCP server entries via pull request
2. **Local registry tooling** — CLI commands and a `~/.hive/mcp_registry/` directory for installing, managing, and connecting to MCP servers
3. **Framework integration** — changes to Hive's `ToolRegistry`, `MCPClient`, and agent runner so agents can flexibly select which registry servers they need
---
## 2. Problem Statement
### 2.1 Current State
- Each Hive agent has a static `mcp_servers.json` file that hardcodes MCP server connection details.
- All 150+ tools live in a single monolithic `mcp_server.py` — contributors add tools to this one server.
- There is no mechanism for standalone MCP servers (e.g., a Jira MCP, a Notion MCP, or a custom database MCP) to be discovered or used by Hive agents.
- Each agent spawns its own MCP subprocess — no connection sharing across agents.
- Only `stdio` and basic `http` transports are supported. No unix sockets, no SSE, no reconnection.
- External MCP servers already running on the host cannot be easily registered.
### 2.2 Who Is Affected
| Persona | Pain Point |
|---|---|
| **OSS contributor** | Wants to publish a standalone MCP server for the Hive ecosystem but has no pathway to do so without modifying Hive core |
| **Self-hosted user** | Runs multiple MCP servers on the same host (Slack, GitHub, database tools) and wants Hive agents to discover them |
| **Agent builder** | Copies the same `mcp_servers.json` boilerplate across every agent; no way to say "use whatever the user has installed" |
| **Platform team** | Cannot manage MCP servers centrally; each agent manages its own connections independently |
### 2.3 Impact of Not Solving
- The Hive MCP ecosystem remains closed — growth depends entirely on tools being added to the monolithic server.
- Users with existing MCP infrastructure (from Claude Desktop, Cursor, or other MCP-compatible tools) cannot leverage it with Hive.
- Resource waste from duplicate subprocess spawning across agents.
- No path to community-contributed integrations beyond the core tool set.
---
## 3. Goals & Success Criteria
### 3.1 Primary Goals
| # | Goal | Metric |
|---|---|---|
| G1 | A contributor can register a new MCP server in under 5 minutes | Time from fork to PR submission |
| G2 | A user can install and use a registry MCP server in under 2 minutes | Time from `hive mcp install X` to first tool call |
| G3 | Agents can dynamically select MCP servers by name or tag without hardcoding configs | Agents use `mcp_registry.json` selectors instead of full server configs |
| G4 | Multiple agents share MCP connections instead of duplicating them | One subprocess/connection per unique server, not per agent |
| G5 | External MCP servers already running on the host can be registered with a single command | `hive mcp add --name X --url http://...` works end-to-end |
| G6 | Zero breaking changes to existing agent configurations | All current `mcp_servers.json` files continue to work unchanged |
### 3.2 Developer Success Goals
| # | Goal | Metric |
|---|---|---|
| G7 | First-install success rate exceeds 90% | Successful `hive mcp install` / total attempts (tracked via CLI telemetry opt-in) |
| G8 | First-tool-call success rate exceeds 85% after install | Successful tool invocation within 5 minutes of install |
| G9 | Users can self-diagnose and resolve config/auth issues without filing support tickets | Median time from error to resolution <5 minutes; support ticket volume per server <1/month |
| G10 | Registry entries remain healthy over time | % of entries passing automated health validation at 30/60/90 days |
| G11 | Server upgrades do not silently break agents | Zero undetected tool-signature changes on upgrade |
### 3.3 Non-Goals (Explicit Exclusions)
- **Billing or monetization** — the registry is free and open-source.
- **Hosting MCP servers** — the registry only stores metadata; actual servers are installed/run by users.
- **Replacing `mcp_servers.json`** — the static config remains for backward compatibility and offline use.
- **Runtime agent-to-agent MCP sharing** — this is about discovery and connection, not inter-agent protocol.
- **Decomposing the monolithic `mcp_server.py`** — this is a future phase, not part of the initial build.
---
## 4. User Stories
### 4.1 Contributor: Publishing an MCP Server
> As an OSS contributor who has built a Jira MCP server, I want to register it in a public registry so that any Hive user can install and use it without modifying Hive code.
**Acceptance criteria:**
- `hive mcp init` scaffolds a manifest with my server's details pre-filled from introspection.
- `hive mcp validate ./manifest.json` passes locally before I open a PR.
- `hive mcp test ./manifest.json` starts my server, lists tools, calls a health check, and reports pass/fail.
- CI validates my manifest automatically (schema, naming, required fields, package existence).
- After merge, the server appears in `hive mcp search` for all users.
### 4.2 User: Installing an MCP Server from the Registry
> As a Hive user, I want to install a community MCP server and have my agents use it immediately.
**Acceptance criteria:**
- `hive mcp install jira` fetches the manifest and configures the server locally.
- If credentials are required, the CLI prompts me: "Jira requires JIRA_API_TOKEN (get one at https://...). Enter value:"
- `hive mcp health jira` confirms the server is reachable and tools are discoverable.
- My queen agent (with `auto_discover: true`) automatically picks up the new server's tools.
- `hive mcp info jira` shows trust tier, last health check, installed version, and loaded tools.
### 4.3 User: Registering a Local/Running MCP Server
> As a user running a custom database MCP server on `localhost:9090`, I want Hive agents to use it without publishing it to any public registry.
**Acceptance criteria:**
- `hive mcp add --name my-db --transport http --url http://localhost:9090` registers it.
- The server appears in `hive mcp list` and is available to agents that include it.
- If the server goes down, Hive logs a warning with actionable next steps and retries on next tool call.
### 4.4 Agent Builder: Selecting MCP Servers for a Worker
> As an agent builder, I want my worker agent to use specific MCP servers (e.g., Slack + Jira) without hardcoding connection details.
**Acceptance criteria:**
- I create `mcp_registry.json` in my agent directory with `{"include": ["slack", "jira"]}`.
- At runtime, the agent automatically connects to whatever Slack and Jira servers the user has installed.
- If a requested server isn't installed, startup logs explain: "Server 'jira' requested by mcp_registry.json but not installed. Run: hive mcp install jira"
### 4.5 Queen: Auto-Discovering Available MCP Servers
> As the queen agent, I want access to installed MCP servers so I can delegate tasks that require any tool.
**Acceptance criteria:**
- Queen's `mcp_registry.json` uses `{"profile": "all"}` to load all enabled servers.
- Startup logs list every loaded server and its tool count: "Loaded 3 registry servers: jira (4 tools), slack (6 tools), my-db (2 tools)"
- If tool names collide across servers, the resolution is deterministic and logged.
- Queen respects a configurable max tool budget to avoid prompt overload.
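Concretely, the two selector shapes described in 4.4 and 4.5 might look like the following (only `include` and `profile` appear in the stories above; `max_tools` is a hypothetical name for the configurable tool budget):

A worker that pins specific servers:

```json
{"include": ["slack", "jira"]}
```

A queen that loads everything the user has installed, capped by a tool budget:

```json
{"profile": "all", "max_tools": 40}
```

The exact field names are illustrative, not the normative `mcp_registry.json` schema.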
### 4.6 User: Diagnosing a Broken MCP Server
> As a user whose agent suddenly can't call Jira tools, I want to quickly find and fix the problem.
**Acceptance criteria:**
- `hive mcp doctor` checks all installed servers and reports: connection status, credential validity, tool discovery result, last error.
- `hive mcp doctor jira` gives detailed diagnostics: "jira: UNHEALTHY. Transport: stdio. Error: Process exited with code 1. Stderr: 'JIRA_API_TOKEN not set'. Fix: hive mcp config jira --set JIRA_API_TOKEN=your-token"
- `hive mcp inspect jira` shows the resolved config, override chain, and which agents include it.
- `hive mcp why-not jira --agent exports/my-agent` explains why a server was or was not loaded for an agent.
---
## 5. Requirements
### 5.1 Functional Requirements
#### 5.1.1 Registry Repository
| ID | Requirement | Priority |
|---|---|---|
| FR-1 | The registry is a public GitHub repo with a defined directory structure for server entries | P0 |
| FR-2 | Each server entry is a `manifest.json` file conforming to a JSON Schema | P0 |
| FR-3 | CI validates manifests on every PR (schema, naming, uniqueness, required fields) | P0 |
| FR-4 | A flat index (`registry_index.json`) is auto-generated on merge for client consumption | P0 |
| FR-5 | A `_template/` directory provides a starter manifest + README for contributors | P0 |
| FR-6 | `CONTRIBUTING.md` documents the 5-minute submission process with annotated examples for each transport type (stdio, http, unix, sse) | P0 |
| FR-7 | CI checks that `install.pip` packages exist on PyPI (if specified) | P1 |
| FR-8 | Tags follow a controlled taxonomy with new tags requiring maintainer approval | P1 |
| FR-9 | Canonical example manifests are provided for each transport type in `registry/_examples/` | P0 |
#### 5.1.2 Manifest Schema
The manifest has a **portable base layer** (framework-agnostic, usable by any MCP client) and an optional **hive extension block** (Hive-specific ergonomics).
| ID | Requirement | Priority |
|---|---|---|
| FR-10 | Manifest base includes: name, display_name, version, description, author, repository, license | P0 |
| FR-11 | Manifest declares supported transports (stdio, http, unix, sse) with default | P0 |
| FR-12 | Manifest includes install instructions (pip package name, docker image, npm package) | P0 |
| FR-13 | Manifest lists tool names and descriptions (for pre-connect filtering) | P0 |
| FR-14 | Manifest declares credential requirements (env_var, description, help_url, required flag) | P0 |
| FR-15 | Manifest includes tags and categories for discovery | P1 |
| FR-16 | Manifest supports template variables (`{port}`, `{socket_path}`, `{name}`) in commands | P1 |
| FR-17 | Manifest includes `hive` extension block for Hive-specific metadata (see 5.1.8) | P1 |
#### 5.1.3 Manifest Trust & Quality Metadata
| ID | Requirement | Priority |
|---|---|---|
| FR-80 | Manifest includes `status` field: `official`, `verified`, or `community` | P0 |
| FR-81 | Manifest includes `maintainer` contact (email or GitHub handle) | P0 |
| FR-82 | Manifest includes `docs_url` pointing to server documentation | P1 |
| FR-83 | Manifest includes `example_agent_url` linking to an example agent using this server | P2 |
| FR-84 | Manifest includes `supported_os` list (e.g., `["linux", "macos", "windows"]`) | P1 |
| FR-85 | Manifest includes `deprecated` boolean and `deprecated_by` field for superseded entries | P1 |
| FR-86 | Registry index includes `last_validated_at` timestamp per entry (from automated CI health runs) | P1 |
#### 5.1.4 Local Registry
| ID | Requirement | Priority |
|---|---|---|
| FR-20 | `~/.hive/mcp_registry/installed.json` tracks all installed/registered servers | P0 |
| FR-21 | Servers can be sourced from the remote registry (`"source": "registry"`) or local (`"source": "local"`) | P0 |
| FR-22 | Each installed server has: transport preference, enabled/disabled state, and env/header overrides | P0 |
| FR-23 | The remote registry index is cached locally with configurable refresh interval | P1 |
| FR-24 | Each installed server tracks operational state: `last_health_check_at`, `last_health_status`, `last_error`, `last_used_at`, `resolved_package_version` | P1 |
| FR-25 | Each installed server supports `pinned: true` to prevent auto-update and `auto_update: true` for automatic version tracking | P1 |
#### 5.1.5 CLI Commands — Management
| ID | Requirement | Priority |
|---|---|---|
| FR-30 | `hive mcp install <name> [--version X]` — install from registry, optionally pin version | P0 |
| FR-31 | `hive mcp add --name X --transport T --url U` — register a local server | P0 |
| FR-32 | `hive mcp add --from manifest.json` — register from a manifest file | P1 |
| FR-33 | `hive mcp remove <name>` — uninstall/unregister | P0 |
| FR-34 | `hive mcp list` — list installed servers with status, health, and trust tier | P0 |
| FR-35 | `hive mcp list --available` — list all servers in remote registry | P1 |
| FR-36 | `hive mcp search <query>` — search by name/tag/description/tool-name | P1 |
| FR-37 | `hive mcp enable/disable <name>` — toggle without removing | P0 |
| FR-38 | `hive mcp health [name]` — check server reachability and tool discovery | P1 |
| FR-39 | `hive mcp update [name]` — refresh index cache or update a specific server | P1 |
| FR-40 | `hive mcp config <name> --set KEY=VAL` — set credential/env overrides | P0 |
| FR-41 | `hive mcp info <name>` — show full details: trust tier, version, tools, health, which agents use it | P0 |
#### 5.1.6 CLI Commands — Contributor Tooling
| ID | Requirement | Priority |
|---|---|---|
| FR-42 | `hive mcp init [--server-url URL]` — scaffold a manifest; if URL provided, introspects server to pre-fill tools list | P0 |
| FR-43 | `hive mcp validate <path>` — validate a manifest against the JSON Schema locally | P0 |
| FR-44 | `hive mcp test <path>` — start the server per manifest config, list tools, run health check, report pass/fail | P1 |
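FR-43's local validation can be approximated with a structural check. The sketch below is deliberately simplified — the real `hive mcp validate` would validate against the published `manifest.schema.json` — and `REQUIRED_FIELDS` / `validate_manifest` are illustrative names, not framework API:

```python
REQUIRED_FIELDS = ["name", "display_name", "version", "description",
                   "author", "repository", "license", "transport", "install"]

def validate_manifest(manifest: dict) -> list[str]:
    """Return a list of human-readable validation errors (empty = valid)."""
    errors = [f"missing required field: {f}" for f in REQUIRED_FIELDS
              if f not in manifest]
    transport = manifest.get("transport", {})
    supported = transport.get("supported", [])
    known = {"stdio", "http", "unix", "sse"}
    errors += [f"unknown transport: {t}" for t in supported if t not in known]
    if supported and transport.get("default") not in supported:
        errors.append("transport.default must be one of transport.supported")
    return errors
```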
#### 5.1.7 CLI Commands — Diagnostics
| ID | Requirement | Priority |
|---|---|---|
| FR-45 | `hive mcp doctor [name]` — check all or one server: connection, credentials, tool discovery, last error; output actionable fix suggestions | P0 |
| FR-46 | `hive mcp inspect <name>` — show resolved config including override chain, transport details, and which agents include/exclude this server | P1 |
| FR-47 | `hive mcp why-not <name> --agent <path>` — explain why a server was or was not loaded for a specific agent's `mcp_registry.json` | P1 |
#### 5.1.8 Hive Extension Block in Manifest
The optional `hive` block in the manifest carries Hive-specific metadata that doesn't belong in the portable base:
| ID | Requirement | Priority |
|---|---|---|
| FR-90 | `hive.min_version` — minimum Hive version required | P1 |
| FR-91 | `hive.max_version` — maximum compatible Hive version (optional, for deprecation) | P2 |
| FR-92 | `hive.example_agent` — path or URL to an example agent using this server | P2 |
| FR-93 | `hive.profiles` — list of profile tags this server belongs to (e.g., `["core", "productivity", "developer"]`) | P1 |
| FR-94 | `hive.tool_namespace` — optional prefix for tool names to avoid collisions (e.g., `jira_`) | P1 |
#### 5.1.9 Agent Selection
| ID | Requirement | Priority |
|---|---|---|
| FR-50 | Agents can declare MCP server preferences in `mcp_registry.json` | P0 |
| FR-51 | Selection supports: explicit `include` list, `tags` matching, `exclude` blacklist | P0 |
| FR-52 | `profile` field loads servers matching a named profile (e.g., `"all"`, `"core"`, `"productivity"`) | P0 |
| FR-53 | If `mcp_registry.json` does not exist, no registry servers are loaded (backward compatible) | P0 |
| FR-54 | Missing requested servers produce warnings with actionable install instructions, not errors | P0 |
| FR-55 | Agent startup logs a summary of loaded/skipped registry servers with reasons | P0 |
| FR-56 | `max_tools` field caps total tools loaded from registry servers (prevents prompt overload) | P1 |
#### 5.1.10 Tool Resolution & Namespacing
| ID | Requirement | Priority |
|---|---|---|
| FR-100 | When multiple servers expose a tool with the same name, the first server in include-order wins (deterministic) | P0 |
| FR-101 | Tool collisions are logged at startup: "Tool 'search' from 'brave-search' shadowed by 'google-search' (loaded first)" | P0 |
| FR-102 | If a server declares `hive.tool_namespace`, its tools are prefixed: `jira_create_issue` instead of `create_issue` | P1 |
| FR-103 | `hive mcp inspect <name>` shows which tools are active vs shadowed | P1 |
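The first-wins rule (FR-100), collision logging (FR-101), and optional namespacing (FR-102) compose into a single deterministic pass. A sketch under the assumption that servers arrive already sorted in load order; `resolve_tools` is a hypothetical helper name:

```python
def resolve_tools(servers: list[dict]) -> dict[str, str]:
    """Map each active tool name to its owning server.

    First server in include-order wins (FR-100); shadowed tools are
    logged (FR-101); hive.tool_namespace prefixes names (FR-102).
    """
    active: dict[str, str] = {}
    for server in servers:  # already in deterministic load order
        ns = server.get("hive", {}).get("tool_namespace")
        for tool in server["tools"]:
            name = f"{ns}_{tool}" if ns else tool
            if name in active:
                print(f"Tool '{name}' from '{server['name']}' shadowed by "
                      f"'{active[name]}' (loaded first)")
            else:
                active[name] = server["name"]
    return active
```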
#### 5.1.11 Connection Management
| ID | Requirement | Priority |
|---|---|---|
| FR-60 | A process-level connection manager shares MCP connections across agents | P1 |
| FR-61 | Connections are reference-counted — disconnected when no agent uses them | P1 |
| FR-62 | HTTP/unix/SSE connections retry once on failure before raising an error | P1 |
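The reference-counting behavior (FR-60/FR-61) can be sketched as below. The `connect`/`close` callables stand in for real `MCPClient` setup and teardown, and the class shape is illustrative, not the actual implementation:

```python
import threading

class MCPConnectionManager:
    """Process-level pool sharing one connection per server name."""

    def __init__(self, connect, close):
        self._lock = threading.Lock()          # thread-safe per NFR-3
        self._conns: dict[str, object] = {}
        self._refs: dict[str, int] = {}
        self._connect, self._close = connect, close

    def acquire(self, name: str):
        with self._lock:
            if name not in self._conns:
                self._conns[name] = self._connect(name)
            self._refs[name] = self._refs.get(name, 0) + 1
            return self._conns[name]

    def release(self, name: str) -> None:
        with self._lock:
            self._refs[name] -= 1
            if self._refs[name] == 0:          # last agent done: disconnect
                self._close(self._conns.pop(name))
                del self._refs[name]
```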
#### 5.1.12 Transport Extensions
| ID | Requirement | Priority |
|---|---|---|
| FR-70 | `MCPClient` supports unix socket transport via `httpx` UDS | P1 |
| FR-71 | `MCPClient` supports SSE transport via the official MCP Python SDK | P1 |
| FR-72 | `MCPServerConfig` includes `socket_path` field for unix transport | P1 |
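FR-70's unix transport routes HTTP over a unix domain socket; with `httpx` this is done via `httpx.HTTPTransport(uds=socket_path)`. The stdlib sketch below illustrates only the underlying plumbing with a toy JSON-RPC round-trip over a socket file; the wire format and all names here are simplified for illustration:

```python
import json, os, socket, tempfile, threading

sock_path = os.path.join(tempfile.mkdtemp(), "mcp-demo.sock")
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(sock_path)
server.listen(1)

def serve_once() -> None:
    # Answer a single toy JSON-RPC request, standing in for an MCP server.
    conn, _ = server.accept()
    with conn:
        request = json.loads(conn.recv(65536))
        reply = {"jsonrpc": "2.0", "id": request["id"], "result": {"tools": []}}
        conn.sendall(json.dumps(reply).encode())

t = threading.Thread(target=serve_once)
t.start()

with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
    client.connect(sock_path)
    client.sendall(json.dumps(
        {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}).encode())
    response = json.loads(client.recv(65536))

t.join()
server.close()
print(response["result"])  # {'tools': []}
```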
### 5.2 Version Compatibility & Upgrade Safety
| ID | Requirement | Priority |
|---|---|---|
| VC-1 | Manifest includes `version` (semver) for the registry entry and `mcp_protocol_version` for the MCP spec | P0 |
| VC-2 | Manifest `hive` block includes optional `min_version` / `max_version` constraints | P1 |
| VC-3 | `hive mcp install` installs latest by default; `--version X` pins a specific version | P0 |
| VC-4 | `installed.json` records `resolved_package_version` (actual pip/npm version installed) | P1 |
| VC-5 | `hive mcp update <name>` compares old and new tool lists; warns if tools were removed or signatures changed | P1 |
| VC-6 | Agents can pin a resolved server version in `mcp_registry.json` via `"versions": {"jira": "1.2.0"}` | P2 |
| VC-7 | If a pinned version is no longer available, the agent logs an error with rollback instructions | P2 |
| VC-8 | `hive mcp update --dry-run` shows what would change without applying | P1 |
| VC-9 | Tool names and parameter schemas from the manifest constitute a compatibility contract; breaking changes require a major version bump | P1 |
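VC-5's breaking-change warning reduces to a set diff over the manifest tool lists of the old and new versions. A sketch, with `diff_tools` as an assumed helper name:

```python
def diff_tools(old: list[dict], new: list[dict]) -> dict[str, list[str]]:
    """Compare manifest tool lists across versions (VC-5).

    Removals and changed entries are potential breaking changes that
    `hive mcp update` would warn about before applying.
    """
    old_by_name = {t["name"]: t for t in old}
    new_by_name = {t["name"]: t for t in new}
    return {
        "removed": sorted(old_by_name.keys() - new_by_name.keys()),
        "added": sorted(new_by_name.keys() - old_by_name.keys()),
        "changed": sorted(n for n in old_by_name.keys() & new_by_name.keys()
                          if old_by_name[n] != new_by_name[n]),
    }
```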
### 5.3 Failure Handling & Diagnostics
| ID | Requirement | Priority |
|---|---|---|
| DX-1 | All MCP errors use structured error codes (e.g., `MCP_INSTALL_FAILED`, `MCP_AUTH_MISSING`, `MCP_CONNECT_TIMEOUT`, `MCP_TOOL_NOT_FOUND`, `MCP_PROTOCOL_MISMATCH`) | P0 |
| DX-2 | Every error message includes: what failed, why, and a suggested fix command | P0 |
| DX-3 | `hive mcp doctor` checks: connection, credentials (are required env vars set?), tool discovery, protocol version compatibility, Hive version compatibility | P0 |
| DX-4 | Agent startup emits a structured log line per registry server: `{server, status, tools_loaded, skipped_reason}` | P0 |
| DX-5 | Failed tool calls from registry servers include the server name and transport in the error context | P1 |
| DX-6 | `hive mcp doctor` output is machine-parseable (JSON with `--json` flag) for CI/automation | P2 |
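DX-1 and DX-2 together suggest an exception type that carries the code, the cause, and a suggested fix. A possible shape — illustrative, not the actual framework class:

```python
class MCPError(Exception):
    """Structured MCP error: code, what failed, why, and a fix command."""

    def __init__(self, code: str, what: str, why: str, fix: str):
        self.code, self.what, self.why, self.fix = code, what, why, fix
        super().__init__(f"[{code}] {what}: {why}. Fix: {fix}")

# Example mirroring the doctor output for a missing credential
err = MCPError(
    "MCP_AUTH_MISSING",
    "jira server failed health check",
    "required env var JIRA_API_TOKEN is not set",
    "hive mcp config jira --set JIRA_API_TOKEN=your-token",
)
```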
### 5.4 Non-Functional Requirements
| ID | Requirement | Priority |
|---|---|---|
| NFR-1 | Registry index fetch must complete in <5s on typical internet connections | P1 |
| NFR-2 | Installing a server from registry must not require a Hive restart | P0 |
| NFR-3 | Connection manager must be thread-safe (multiple agents in same process) | P0 |
| NFR-4 | All new code must have unit test coverage | P0 |
| NFR-5 | Registry repo CI must run in <60s | P1 |
| NFR-6 | Manifest base schema must be framework-agnostic (usable by non-Hive MCP clients); Hive-specific fields live in the `hive` extension block | P1 |
| NFR-7 | `hive mcp install` prints a security notice on first use: "Registry servers run code on your machine. Only install servers you trust." | P0 |
---
## 6. Architecture Overview
```
┌──────────────────────────────────┐
│ hive-mcp-registry (GitHub)       │
│                                  │
│ registry/servers/jira/manifest   │
│ registry/servers/slack/manifest  │
│ ...                              │
│ registry_index.json (auto-built) │
└────────────────┬─────────────────┘
                 │ hive mcp update
                 │ (fetches index)
                 ▼
┌────────────────────────────────────────────────────────┐
│ ~/.hive/mcp_registry/                                  │
│                                                        │
│ installed.json     config.json   cache/                │
│ (jira, slack,      (preferences)   registry_index.json │
│  my-custom-db)                     (cached remote)     │
└───────────────────────────┬────────────────────────────┘
            ┌───────────────┼───────────────┐
            │               │               │
            ▼               ▼               ▼
     ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
     │ Queen Agent  │ │ Worker Agent │ │ hive mcp CLI │
     │              │ │              │ │              │
     │ mcp_registry │ │ mcp_registry │ │ install      │
     │ .json:       │ │ .json:       │ │ add / remove │
     │ profile: all │ │ include:     │ │ doctor       │
     │              │ │   [jira]     │ │ init / test  │
     └──────┬───────┘ └─────┬────────┘ └──────────────┘
            │               │
            ▼               ▼
     ┌──────────────────────────────────┐
     │ MCPConnectionManager             │
     │ (process singleton)              │
     │                                  │
     │ jira  → MCPClient (stdio, rc=2)  │
     │ slack → MCPClient (http,  rc=1)  │
     │ my-db → MCPClient (unix,  rc=1)  │
     └──────────────────────────────────┘
          │           │            │
          ▼           ▼            ▼
     ┌──────────┐ ┌────────┐ ┌────────────┐
     │ Jira MCP │ │ Slack  │ │ Custom DB  │
     │ (stdio)  │ │ MCP    │ │ MCP (unix  │
     │          │ │ (http) │ │  socket)   │
     └──────────┘ └────────┘ └────────────┘
```
### Component Responsibilities
| Component | Responsibility |
|---|---|
| **hive-mcp-registry** (GitHub repo) | Curated index of MCP server manifests; CI validates PRs; automated health checks |
| **~/.hive/mcp_registry/** | Local state: installed servers, cached index, user config, operational telemetry |
| **MCPRegistry** (Python module) | Core logic: install, remove, search, resolve for agent, doctor |
| **MCPConnectionManager** | Process-level connection pool with refcounting |
| **MCPClient** (extended) | Adds unix socket, SSE transports; retry on failure |
| **ToolRegistry** (extended) | New `load_registry_servers()` method with collision handling |
| **AgentRunner** (extended) | Loads `mcp_registry.json` alongside `mcp_servers.json`; logs resolution summary |
| **hive mcp CLI** | User-facing commands for management, diagnostics, and contributor tooling |
---
## 7. Data Models
### 7.1 Registry Manifest (`manifest.json`)
```json
{
"$schema": "https://raw.githubusercontent.com/aden-hive/hive-mcp-registry/main/schema/manifest.schema.json",
"name": "jira",
"display_name": "Jira MCP Server",
"version": "1.2.0",
"description": "Interact with Jira issues, boards, and sprints",
"author": {"name": "Jane Contributor", "github": "janedev", "url": "https://github.com/janedev"},
"maintainer": {"github": "janedev", "email": "jane@example.com"},
"repository": "https://github.com/janedev/jira-mcp-server",
"license": "MIT",
"status": "community",
"docs_url": "https://github.com/janedev/jira-mcp-server/blob/main/README.md",
"supported_os": ["linux", "macos", "windows"],
"deprecated": false,
"transport": {"supported": ["stdio", "http"], "default": "stdio"},
"install": {"pip": "jira-mcp-server", "docker": "ghcr.io/janedev/jira-mcp-server:latest", "npm": null},
"stdio": {"command": "uvx", "args": ["jira-mcp-server", "--stdio"]},
"http": {"default_port": 4010, "health_path": "/health", "command": "uvx", "args": ["jira-mcp-server", "--http", "--port", "{port}"]},
"unix": {"socket_template": "/tmp/mcp-{name}.sock", "command": "uvx", "args": ["jira-mcp-server", "--unix", "{socket_path}"]},
"tools": [
{"name": "jira_create_issue", "description": "Create a new Jira issue"},
{"name": "jira_search", "description": "Search Jira issues with JQL"},
{"name": "jira_update_issue", "description": "Update an existing issue"},
{"name": "jira_list_boards", "description": "List all Jira boards"}
],
"credentials": [
{"id": "jira_api_token", "env_var": "JIRA_API_TOKEN", "description": "Jira API token", "help_url": "https://id.atlassian.com/manage-profile/security/api-tokens", "required": true},
{"id": "jira_domain", "env_var": "JIRA_DOMAIN", "description": "Your Jira domain (e.g., mycompany.atlassian.net)", "required": true}
],
"tags": ["project-management", "atlassian", "issue-tracking"],
"categories": ["productivity"],
"mcp_protocol_version": "2024-11-05",
"hive": {
"min_version": "0.5.0",
"max_version": null,
"profiles": ["productivity", "developer"],
"tool_namespace": "jira",
"example_agent": "https://github.com/janedev/jira-mcp-server/tree/main/examples/hive-agent"
}
}
```
**Schema layering**:
- Everything outside `hive` is the **portable base** — usable by any MCP client.
- The `hive` block carries Hive-specific compatibility, profiles, namespacing, and examples.
### 7.2 Agent Selection (`mcp_registry.json`)
```json
{
"include": ["jira", "slack"],
"tags": ["crm"],
"exclude": ["github"],
"profile": "productivity",
"max_tools": 50,
"versions": {
"jira": "1.2.0"
}
}
```
**Selection precedence** (deterministic):
1. `profile` expands to a set of server names (union with `include` + `tags` matches).
2. `include` adds explicit servers.
3. `tags` adds servers whose tags overlap.
4. `exclude` removes from the final set (always wins).
5. Servers are loaded in `include`-order first, then alphabetically for tag/profile matches.
6. Tool collisions resolved by load order: first server wins.
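The precedence above can be sketched as a single resolution function. Names are illustrative, and special profile values like `"all"` are omitted for brevity:

```python
def select_servers(registry: dict[str, dict], config: dict) -> list[str]:
    """Resolve an mcp_registry.json config into a deterministic load order."""
    include = config.get("include", [])
    tags = set(config.get("tags", []))
    profile = config.get("profile")
    exclude = set(config.get("exclude", []))

    matched = set(include)
    for name, manifest in registry.items():
        if tags & set(manifest.get("tags", [])):          # tag overlap
            matched.add(name)
        if profile in manifest.get("hive", {}).get("profiles", []):
            matched.add(name)
    matched -= exclude                                    # exclude always wins

    ordered = [n for n in include if n in matched]        # include-order first
    ordered += sorted(matched - set(ordered))             # then alphabetical
    return ordered
```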
### 7.3 Installed Server Entry (`installed.json` → `servers.<name>`)
```json
{
"source": "registry",
"manifest_version": "1.2.0",
"manifest": {},
"installed_at": "2026-03-13T10:00:00Z",
"installed_by": "hive mcp install",
"transport": "stdio",
"enabled": true,
"pinned": false,
"auto_update": false,
"resolved_package_version": "1.2.0",
"overrides": {"env": {"JIRA_DOMAIN": "mycompany.atlassian.net"}, "headers": {}},
"last_health_check_at": "2026-03-13T12:00:00Z",
"last_health_status": "healthy",
"last_error": null,
"last_used_at": "2026-03-13T11:30:00Z",
"last_validated_with_hive_version": "0.6.0"
}
```
---
## 8. Risks & Mitigations
| Risk | Impact | Likelihood | Mitigation |
|---|---|---|---|
| Low contributor adoption — nobody submits servers | Registry is empty, no value delivered | Medium | Seed with 5-10 popular MCP servers; `hive mcp init` makes submission trivial; canonical examples for every transport |
| High support burden from low-quality entries | Users install broken servers, file tickets against Hive | Medium | Trust tiers (official/verified/community); automated health checks in registry CI; `hive mcp doctor` for self-service debugging; quality gates beyond schema validation |
| Malicious MCP server in registry | User installs server that exfiltrates data or executes harmful code | Low | Maintainer review on all PRs; security notice on first install; servers run in user's trust boundary; verified tier requires code audit |
| Breaking changes to manifest schema | Existing manifests become invalid | Low | Schema versioning with `$schema` URL; CI validates backward compatibility; migration scripts |
| Server upgrades silently break agents | Tool signatures change, agents fail at runtime | Medium | `hive mcp update` diffs tool lists and warns on breaking changes; version pinning in `mcp_registry.json`; `--dry-run` flag |
| Connection manager concurrency bugs | Tool calls fail or deadlock under load | Medium | Thorough unit tests; reuse existing thread-safety patterns from `MCPClient._stdio_call_lock` |
| Registry index URL becomes unavailable | Users can't install new servers | Low | Local cache with TTL; fallback to last-known-good index; registry is a static file (cheap to host/mirror) |
| Name squatting in registry | Bad actors claim popular names | Low | Maintainer review on all PRs; naming guidelines in CONTRIBUTING.md |
| Auto-discover overloads agents with too many tools | Prompt bloat, confused tool selection, slower responses | Medium | `max_tools` cap in `mcp_registry.json`; profiles instead of blanket auto-discover; startup log shows tool count |
| Tool name collisions across servers | Wrong server handles a tool call | Medium | Deterministic load-order resolution; startup collision logging; optional tool namespacing via `hive.tool_namespace` |
---
## 9. Backward Compatibility
This system is **fully additive**:
- Existing `mcp_servers.json` files continue to work unchanged.
- Agents without `mcp_registry.json` load zero registry servers.
- The `MCPConnectionManager` is only used for registry-sourced connections; existing direct `MCPClient` usage is untouched.
- New CLI commands (`hive mcp ...`) don't conflict with existing commands.
- No existing files are modified in a breaking way.
- `mcp_servers.json` tools always take precedence over registry tools (they load first).
---
## 10. Documentation & Examples Strategy
Documentation is a first-class deliverable, not an afterthought. The following are required for launch:
| Doc | Audience | Deliverable |
|---|---|---|
| "Publish your first MCP server" | Contributors | Step-by-step guide from zero to merged registry entry, with screenshots |
| "Install and use your first registry server" | Users | Guide from `hive mcp install` to agent tool call |
| "Migration from mcp_servers.json" | Existing users | How to move static configs to registry-based selection |
| "Troubleshooting MCP servers" | Users | Common errors, `doctor` output examples, fix recipes |
| Manifest cookbook | Contributors | Annotated examples for stdio, http, unix, sse, multi-credential, no-credential |
| Example agents | Agent builders | 2-3 sample agents using `mcp_registry.json` with different selection strategies |
---
## 11. Phased Delivery
| Phase | Scope | Depends On |
|---|---|---|
| **Phase 1: Foundation** | MCPClient transport extensions (unix, SSE, retry); MCPConnectionManager; MCPRegistry module; CLI management commands; ToolRegistry `load_registry_servers()` with collision handling; AgentRunner `mcp_registry.json` loading with startup logging; structured error codes | -- |
| **Phase 2: Developer Tooling** | `hive mcp init`, `validate`, `test` (contributor flow); `doctor`, `inspect`, `why-not` (diagnostics); version pinning and `update --dry-run` | Phase 1 |
| **Phase 3: Registry Repo** | Create `hive-mcp-registry` GitHub repo with schema, validation CI, template, examples, CONTRIBUTING.md; seed with reference entries for built-in servers; automated health check CI | Phase 1 |
| **Phase 4: Docs & Launch** | All documentation deliverables from section 10; example agents; announcement | Phase 2, 3 |
| **Phase 5: Community Growth** | Trust tier promotion process; curated starter packs; popular/trending signals in registry | Phase 4 |
| **Phase 6: Monolith Decomposition** (future) | Extract tool groups from `mcp_server.py` into standalone servers; each becomes a registry entry | Phase 5 |
---
## 12. Open Questions
| # | Question | Owner | Status |
|---|---|---|---|
| Q1 | Should the registry repo live under `aden-hive` org or a new `hive-mcp` org? | Platform team | Open |
| Q2 | Should `hive mcp install` auto-prompt for required credentials interactively? | UX | Open |
| Q3 | Should the connection manager have a configurable max concurrent connections limit? | Engineering | Open |
| Q4 | Should we support a `docker` transport (Hive manages container lifecycle)? | Engineering | Open |
| Q5 | What is the process for promoting a `community` entry to `verified`? (e.g., code audit, usage threshold, maintainer SLA) | Platform + Security | Open |
| Q6 | Should the registry support private/enterprise indexes (e.g., `hive mcp config --index-url https://internal/...`)? | Platform | Open |
| Q7 | Should `hive mcp doctor` report telemetry (opt-in) to help identify systemic issues? | Product + Privacy | Open |
| Q8 | How should we handle MCP servers that require OAuth flows (not just static API keys)? | Engineering | Open |
---
## 13. Stakeholder Sign-Off
| Role | Name | Status |
|---|---|---|
| Engineering Lead | | Pending |
| Product | | Pending |
| OSS / Community | | Pending |
| Security | | Pending |
| Developer Experience | | Pending |
# Local credential parity: aliases, identity, status, and credential tester integration
## Summary
Gives local API key credentials (Brave Search, GitHub, Exa, Stripe, etc.) the same feature set as Aden OAuth credentials: named aliases, identity metadata, status tracking, CRUD management, and full visibility in the credential tester.
Fixes a bug where credentials configured with the existing `store_credential` MCP tool were invisible in the credential tester account picker.
---
## Changes
### New: `core/framework/credentials/local/`
**`models.py`** — `LocalAccountInfo` dataclass mirroring `AdenIntegrationInfo`:
- Fields: `credential_id`, `alias`, `status` (`active` / `failed` / `unknown`), `identity`, `last_validated`, `created_at`
- `storage_id` property returns `"{credential_id}/{alias}"` (e.g. `brave_search/work`)
- `to_account_dict()` returns same shape as Aden account dicts — feeds account picker without changes
**`registry.py`** — `LocalCredentialRegistry`, the core engine:
- `save_account(credential_id, alias, api_key)` — runs health check, extracts identity, stores at `{credential_id}/{alias}` in `EncryptedFileStorage`
- `list_accounts(credential_id=None)` — reads all `{x}/{y}` entries from storage
- `get_key(credential_id, alias)` — returns raw secret
- `delete_account(credential_id, alias)` — removes entry
- `validate_account(credential_id, alias)` — re-runs health check, updates `_status` and `last_refreshed` in-place
- `default()` classmethod — uses `~/.hive/credentials`
Storage convention: `{credential_id}/{alias}` as `CredentialObject.id`. Legacy flat entries (`brave_search`, no slash) continue to work — env var fallback is unchanged.
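The `{credential_id}/{alias}` convention, including the legacy flat fallback, can be captured in one small helper. This is a sketch; the real parsing lives inside `LocalCredentialRegistry`:

```python
def parse_storage_id(storage_id: str) -> tuple[str, str]:
    """Split a CredentialObject id into (credential_id, alias).

    Legacy flat entries (no slash, e.g. "brave_search") map to the
    implicit "default" alias so old stores keep working.
    """
    if "/" in storage_id:
        credential_id, alias = storage_id.split("/", 1)
        return credential_id, alias
    return storage_id, "default"
```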
---
### Modified: `tools/src/aden_tools/credentials/store_adapter.py`
- `get(name, account=None)` — added `account=` param for per-call routing to a named local account; mirrors Aden `account=` routing
- `activate_local_account(credential_id, alias)` — injects a named account's key into `os.environ[spec.env_var]` for session-level activation
- `list_local_accounts(credential_id=None)` — delegates to `LocalCredentialRegistry`
---
### Modified: `core/framework/credentials/__init__.py`
Exports `LocalAccountInfo` and `LocalCredentialRegistry`.
---
### Modified: `core/framework/agents/credential_tester/agent.py`
Full rewrite of account listing and configuration:
- `_list_aden_accounts()` — extracted from old `list_connected_accounts()`
- `_list_local_accounts()` — uses `LocalCredentialRegistry`
- `_list_env_fallback_accounts()` — detects credentials configured via env var **or** in old flat encrypted format; fixes the invisible-credential bug
- `list_connected_accounts()` — combines all three, deduplicates
- `configure_for_account()` — branches on `source` field:
- `"aden"` → adds `get_account_info` tool, prompts with `account="alias"`
- `"local"` → calls `_activate_local_account()`, prompt has no `account=` param
- `_activate_local_account()` — handles three cases: named registry entry, old flat encrypted entry, env var already set; also handles grouped credentials (e.g. `google_custom_search` sets both `GOOGLE_API_KEY` and `GOOGLE_CSE_ID`)
- `get_tools_for_provider()` — fixed to match both `credential_id` AND `credential_group`
---
### Modified: `core/framework/builder/package_generator.py`
- `store_credential(name, value, alias="default", ...)` — added `alias` param; now delegates to `LocalCredentialRegistry.save_account()` with auto health check; returns `status` and `identity`
- `list_stored_credentials()` — delegates to `LocalCredentialRegistry.list_accounts()`; returns `credential_id`, `alias`, `status`, `identity`, `last_validated`
- `delete_stored_credential(name, alias="default")` — added `alias` param
- `validate_credential(name, alias="default")` — **new tool** — re-runs health check via `LocalCredentialRegistry.validate_account()`, returns updated status and identity
---
### Modified: `core/framework/tui/screens/account_selection.py`
- Aden accounts rendered first, local accounts second
- Local accounts display a `[local]` badge
- Identity label shows email, username, or workspace when available
---
### New: `core/framework/tui/screens/add_local_credential.py`
Two-phase modal for adding a named local API key:
1. **Type selection** — filtered list of all `direct_api_key_supported=True` credentials
2. **Form** — alias input + password input → "Test & Save" runs health check inline, shows identity result, auto-dismisses on success
Exported from `core/framework/tui/screens/__init__.py`.
---
## Bug fix
**Credential tester not showing configured credentials** (e.g. Brave Search stored via `store_credential`):
- `_list_env_fallback_accounts()` previously used `CredentialStoreAdapter.with_env_storage()`, which only checked `os.environ`. Credentials stored in `EncryptedFileStorage` with the old flat format (`brave_search`, no slash) were invisible.
- `_activate_local_account()` early-returned when `alias == "default"`, assuming the env var was already set. Old flat encrypted credentials are not in `os.environ`.
**Fix**: `_list_env_fallback_accounts()` now also reads `EncryptedFileStorage.list_all()` and treats any flat entry (no `/`) as configured. `_activate_local_account()` now falls through to load from the flat encrypted entry when the env var is not set and the registry has no named entry.
---
## Test plan
- [ ] `store_credential("brave_search", "BSA-xxx", alias="work")` → health check runs, identity shown, stored as `brave_search/work`
- [ ] `list_stored_credentials()` → shows `credential_id`, `alias`, `status`, `identity`, `last_validated`
- [ ] `validate_credential("brave_search", "work")` → re-runs health check, updates status
- [ ] `delete_stored_credential("brave_search", alias="work")` → removes entry
- [ ] Credential tester account picker shows local accounts with `[local]` badge alongside Aden accounts
- [ ] Selecting a local account activates the key and tools work without `account=` param
- [ ] Selecting a legacy flat credential (stored before this PR) activates it correctly
- [ ] `AddLocalCredentialScreen` — select type, enter alias + key, health check runs inline, screen closes on success
- [ ] `CredentialStoreAdapter.get("brave_search", account="work")` returns key from registry
# feat(queen): Hive Queen Bee — native agent-building agent
## Summary
Introduces **Hive Coder** (codename "Queen Bee"), a framework-native coding agent that builds complete Hive agent packages from natural language descriptions. This is a single-node, forever-alive agent inspired by opencode's `while(true)` loop — one continuous conversation handles the full lifecycle: understand, qualify, design, implement, verify, and iterate.
The agent is deeply integrated with the framework: it can discover available MCP tools at runtime, inspect sessions and checkpoints of agents it builds, run their test suites, and self-verify its own output. It ships with a dedicated MCP tools server (`coder_tools_server.py`) providing rich file I/O, fuzzy-match editing, git snapshots, and shell execution — all scoped to a configurable project root.
## What's included
### New: `hive_coder` agent (`core/framework/agents/hive_coder/`)
- **`agent.py`** — Goal with 4 success criteria and 4 constraints, single-node graph, `HiveCoderAgent` class with full runtime lifecycle (start/stop/trigger_and_wait)
- **`nodes/__init__.py`** — Single `coder` EventLoopNode with a comprehensive system prompt covering coding mandates, tool discovery, meta-agent capabilities, node count rules, implementation templates, and a 6-phase workflow
- **`config.py`** — RuntimeConfig with auto-detection of preferred model from `~/.hive/configuration.json`
- **`__main__.py`** — Click CLI with `run`, `tui`, `info`, `validate`, and `shell` subcommands
- **`reference/`** — Framework guide, file templates, and anti-patterns docs embedded as agent reference material
### New: Coder Tools MCP Server (`tools/coder_tools_server.py`)
- 1500-line MCP server providing 13 tools: `read_file`, `write_file`, `edit_file` (with opencode-style 9-strategy fuzzy matching), `list_directory`, `search_files`, `run_command`, `undo_changes`, `discover_mcp_tools`, `list_agents`, `list_agent_sessions`, `list_agent_checkpoints`, `get_agent_checkpoint`, `run_agent_tests`
- Path-scoped security: all file operations sandboxed to project root
- Git-based undo: automatic snapshots before writes with `undo_changes` rollback
### Framework changes
- **`hive code` CLI command** — Direct launch shortcut for Hive Coder via `cmd_code` in `runner/cli.py`
- **`hive tui` updated** — Now discovers framework agents alongside exports/ and examples/
- **Cron timer support** — `AgentRuntime` now supports cron expressions (`croniter`) in addition to fixed-interval timers for async entry points
- **Datetime in system prompts** — `prompt_composer._with_datetime()` appends current datetime to all composed system prompts; EventLoopNode also applies it for isolated conversations
- **`max_node_visits` default → 0** — Changed from 1 to 0 (unbounded) across `NodeSpec` and executor, matching the forever-alive pattern as the standard default
- **TUI graph view** — Timer display updated to show cron expressions and hours in countdown
- **CredentialError handling** — `_setup()` calls in TUI launch paths now catch and display credential errors gracefully
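The cron timer semantics can be illustrated with a toy matcher. The framework itself delegates to `croniter`; this sketch only handles `*`, `*/n`, and literal fields:

```python
from datetime import datetime

def cron_matches(expr: str, when: datetime) -> bool:
    """Check a 5-field cron expression against a datetime (toy matcher)."""
    def field_ok(field: str, value: int) -> bool:
        if field == "*":
            return True
        if field.startswith("*/"):                 # step values, e.g. */5
            return value % int(field[2:]) == 0
        return value == int(field)                 # literal value

    minute, hour, dom, month, dow = expr.split()
    checks = [(minute, when.minute), (hour, when.hour), (dom, when.day),
              (month, when.month), (dow, when.isoweekday() % 7)]  # Sunday=0
    return all(field_ok(f, v) for f, v in checks)
```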
### Tests
- New `test_agent_runtime.py` tests for cron-based timer scheduling
## Architecture
```
User ──▶ [coder] (EventLoopNode, client_facing, forever-alive)
│ Tools: coder_tools_server.py (file I/O, shell, git)
│ + meta-agent tools (discover, inspect, test)
└──▶ loops continuously until user exits
```
Single node. No edges. No terminal nodes. The agent stays alive and handles multiple build requests in one session — context accumulates across interactions.
## Test plan
- [ ] `hive code` launches Hive Coder TUI successfully
- [ ] `hive tui` shows "Framework Agents" as a source option
- [ ] Agent can discover tools via `discover_mcp_tools()`
- [ ] Agent generates a valid agent package from a natural language request
- [ ] Generated packages pass `AgentRunner.load()` validation
- [ ] Cron timer tests pass (`test_agent_runtime.py`)
- [ ] Existing tests unaffected by `max_node_visits` default change
@@ -12,7 +12,6 @@ from .agent import (
nodes,
edges,
loop_config,
async_entry_points,
entry_node,
entry_points,
pause_nodes,
@@ -31,7 +30,6 @@ __all__ = [
"nodes",
"edges",
"loop_config",
"async_entry_points",
"entry_node",
"entry_points",
"pause_nodes",
@@ -4,7 +4,7 @@ from pathlib import Path
from framework.graph import EdgeCondition, EdgeSpec, Goal, SuccessCriterion, Constraint
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
@@ -152,17 +152,6 @@ edges = [
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
async_entry_points = [
AsyncEntryPointSpec(
id="email-timer",
name="Scheduled Inbox Check",
entry_node="fetch-emails",
trigger_type="timer",
trigger_config={"interval_minutes": 5},
isolation_level="shared",
max_concurrent=1,
),
]
pause_nodes = []
terminal_nodes = []
loop_config = {
@@ -224,7 +213,6 @@ class EmailInboxManagementAgent:
loop_config=loop_config,
conversation_mode=conversation_mode,
identity_prompt=identity_prompt,
async_entry_points=async_entry_points,
)
def _setup(self, mock_mode=False) -> None:
@@ -275,16 +263,6 @@ class EmailInboxManagementAgent:
trigger_type="manual",
isolation_level="shared",
),
# Timer-driven entry point
EntryPointSpec(
id="email-timer",
name="Scheduled Inbox Check",
entry_node="fetch-emails",
trigger_type="timer",
trigger_config={"interval_minutes": 5},
isolation_level="shared",
max_concurrent=1,
),
]
self._agent_runtime = create_agent_runtime(
@@ -360,10 +338,6 @@ class EmailInboxManagementAgent:
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
"async_entry_points": [
{"id": ep.id, "name": ep.name, "entry_node": ep.entry_node}
for ep in async_entry_points
],
}
def validate(self):
@@ -391,13 +365,6 @@ class EmailInboxManagementAgent:
f"Entry point '{ep_id}' references unknown node '{node_id}'"
)
# Validate async entry points
for ep in async_entry_points:
if ep.entry_node not in node_ids:
errors.append(
f"Async entry point '{ep.id}' references unknown node '{ep.entry_node}'"
)
return {
"valid": len(errors) == 0,
"errors": errors,
@@ -0,0 +1,11 @@
[
{
"id": "email-timer",
"name": "Scheduled Inbox Check",
"trigger_type": "timer",
"trigger_config": {
"interval_minutes": 5
},
"task": "Fetch and process inbox emails according to the user's rules"
}
]
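A hedged sketch of how a triggers file like the one above might be loaded and sanity-checked — the function and constant names here are illustrative, not taken from the codebase:

```python
import json

KNOWN_TRIGGER_TYPES = {"timer", "manual"}  # illustrative subset

def load_triggers(raw: str) -> list[dict]:
    """Parse a triggers JSON document and reject unknown trigger types."""
    triggers = json.loads(raw)
    for trigger in triggers:
        if trigger["trigger_type"] not in KNOWN_TRIGGER_TYPES:
            raise ValueError(f"unknown trigger_type: {trigger['trigger_type']!r}")
    return triggers
```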
@@ -810,26 +810,26 @@ $DefaultModels = @{
# Model choices: array of hashtables per provider
$ModelChoices = @{
anthropic = @(
@{ Id = "claude-haiku-4-5-20251001"; Label = "Haiku 4.5 - Fast + cheap (recommended)"; MaxTokens = 8192 },
@{ Id = "claude-sonnet-4-20250514"; Label = "Sonnet 4 - Fast + capable"; MaxTokens = 8192 },
@{ Id = "claude-sonnet-4-5-20250929"; Label = "Sonnet 4.5 - Best balance"; MaxTokens = 16384 },
@{ Id = "claude-opus-4-6"; Label = "Opus 4.6 - Most capable"; MaxTokens = 32768 }
@{ Id = "claude-haiku-4-5-20251001"; Label = "Haiku 4.5 - Fast + cheap (recommended)"; MaxTokens = 8192; MaxContextTokens = 180000 },
@{ Id = "claude-sonnet-4-20250514"; Label = "Sonnet 4 - Fast + capable"; MaxTokens = 8192; MaxContextTokens = 180000 },
@{ Id = "claude-sonnet-4-5-20250929"; Label = "Sonnet 4.5 - Best balance"; MaxTokens = 16384; MaxContextTokens = 180000 },
@{ Id = "claude-opus-4-6"; Label = "Opus 4.6 - Most capable"; MaxTokens = 32768; MaxContextTokens = 180000 }
)
openai = @(
@{ Id = "gpt-5-mini"; Label = "GPT-5 Mini - Fast + cheap (recommended)"; MaxTokens = 16384 },
@{ Id = "gpt-5.2"; Label = "GPT-5.2 - Most capable"; MaxTokens = 16384 }
@{ Id = "gpt-5-mini"; Label = "GPT-5 Mini - Fast + cheap (recommended)"; MaxTokens = 16384; MaxContextTokens = 120000 },
@{ Id = "gpt-5.2"; Label = "GPT-5.2 - Most capable"; MaxTokens = 16384; MaxContextTokens = 120000 }
)
gemini = @(
@{ Id = "gemini-3-flash-preview"; Label = "Gemini 3 Flash - Fast (recommended)"; MaxTokens = 8192 },
@{ Id = "gemini-3.1-pro-preview"; Label = "Gemini 3.1 Pro - Best quality"; MaxTokens = 8192 }
@{ Id = "gemini-3-flash-preview"; Label = "Gemini 3 Flash - Fast (recommended)"; MaxTokens = 8192; MaxContextTokens = 900000 },
@{ Id = "gemini-3.1-pro-preview"; Label = "Gemini 3.1 Pro - Best quality"; MaxTokens = 8192; MaxContextTokens = 900000 }
)
groq = @(
@{ Id = "moonshotai/kimi-k2-instruct-0905"; Label = "Kimi K2 - Best quality (recommended)"; MaxTokens = 8192 },
@{ Id = "openai/gpt-oss-120b"; Label = "GPT-OSS 120B - Fast reasoning"; MaxTokens = 8192 }
@{ Id = "moonshotai/kimi-k2-instruct-0905"; Label = "Kimi K2 - Best quality (recommended)"; MaxTokens = 8192; MaxContextTokens = 120000 },
@{ Id = "openai/gpt-oss-120b"; Label = "GPT-OSS 120B - Fast reasoning"; MaxTokens = 8192; MaxContextTokens = 120000 }
)
cerebras = @(
@{ Id = "zai-glm-4.7"; Label = "ZAI-GLM 4.7 - Best quality (recommended)"; MaxTokens = 8192 },
@{ Id = "qwen3-235b-a22b-instruct-2507"; Label = "Qwen3 235B - Frontier reasoning"; MaxTokens = 8192 }
@{ Id = "zai-glm-4.7"; Label = "ZAI-GLM 4.7 - Best quality (recommended)"; MaxTokens = 8192; MaxContextTokens = 120000 },
@{ Id = "qwen3-235b-a22b-instruct-2507"; Label = "Qwen3 235B - Frontier reasoning"; MaxTokens = 8192; MaxContextTokens = 120000 }
)
}
@@ -838,10 +838,10 @@ function Get-ModelSelection {
$choices = $ModelChoices[$ProviderId]
if (-not $choices -or $choices.Count -eq 0) {
return @{ Model = $DefaultModels[$ProviderId]; MaxTokens = 8192 }
return @{ Model = $DefaultModels[$ProviderId]; MaxTokens = 8192; MaxContextTokens = 120000 }
}
if ($choices.Count -eq 1) {
return @{ Model = $choices[0].Id; MaxTokens = $choices[0].MaxTokens }
return @{ Model = $choices[0].Id; MaxTokens = $choices[0].MaxTokens; MaxContextTokens = $choices[0].MaxContextTokens }
}
# Find default index from previous model (if same provider)
@@ -874,7 +874,7 @@ function Get-ModelSelection {
$sel = $choices[$num - 1]
Write-Host ""
Write-Ok "Model: $($sel.Id)"
return @{ Model = $sel.Id; MaxTokens = $sel.MaxTokens }
return @{ Model = $sel.Id; MaxTokens = $sel.MaxTokens; MaxContextTokens = $sel.MaxContextTokens }
}
}
Write-Color -Text "Invalid choice. Please enter 1-$($choices.Count)" -Color Red
@@ -891,11 +891,12 @@ Write-Step -Number "" -Text "Configuring LLM provider..."
$HiveConfigDir = Join-Path $env:USERPROFILE ".hive"
$HiveConfigFile = Join-Path $HiveConfigDir "configuration.json"
$SelectedProviderId = ""
$SelectedEnvVar = ""
$SelectedModel = ""
$SelectedMaxTokens = 8192
$SubscriptionMode = ""
$SelectedProviderId = ""
$SelectedEnvVar = ""
$SelectedModel = ""
$SelectedMaxTokens = 8192
$SelectedMaxContextTokens = 120000
$SubscriptionMode = ""
# ── Credential detection (silent — just set flags) ───────────
$ClaudeCredDetected = $false
@@ -1071,20 +1072,22 @@ switch ($num) {
Write-Host ""
exit 1
}
$SubscriptionMode = "claude_code"
$SelectedProviderId = "anthropic"
$SelectedModel = "claude-opus-4-6"
$SelectedMaxTokens = 32768
$SubscriptionMode = "claude_code"
$SelectedProviderId = "anthropic"
$SelectedModel = "claude-opus-4-6"
$SelectedMaxTokens = 32768
$SelectedMaxContextTokens = 180000
Write-Host ""
Write-Ok "Using Claude Code subscription"
}
2 {
# ZAI Code Subscription
$SubscriptionMode = "zai_code"
$SelectedProviderId = "openai"
$SelectedEnvVar = "ZAI_API_KEY"
$SelectedModel = "glm-5"
$SelectedMaxTokens = 32768
$SubscriptionMode = "zai_code"
$SelectedProviderId = "openai"
$SelectedEnvVar = "ZAI_API_KEY"
$SelectedModel = "glm-5"
$SelectedMaxTokens = 32768
$SelectedMaxContextTokens = 120000
Write-Host ""
Write-Ok "Using ZAI Code subscription"
Write-Color -Text " Model: glm-5 | API: api.z.ai" -Color DarkGray
@@ -1113,21 +1116,23 @@ switch ($num) {
}
}
if ($CodexCredDetected) {
$SubscriptionMode = "codex"
$SelectedProviderId = "openai"
$SelectedModel = "gpt-5.3-codex"
$SelectedMaxTokens = 16384
$SubscriptionMode = "codex"
$SelectedProviderId = "openai"
$SelectedModel = "gpt-5.3-codex"
$SelectedMaxTokens = 16384
$SelectedMaxContextTokens = 120000
Write-Host ""
Write-Ok "Using OpenAI Codex subscription"
}
}
4 {
# Kimi Code Subscription
$SubscriptionMode = "kimi_code"
$SelectedProviderId = "kimi"
$SelectedEnvVar = "KIMI_API_KEY"
$SelectedModel = "kimi-k2.5"
$SelectedMaxTokens = 32768
$SubscriptionMode = "kimi_code"
$SelectedProviderId = "kimi"
$SelectedEnvVar = "KIMI_API_KEY"
$SelectedModel = "kimi-k2.5"
$SelectedMaxTokens = 32768
$SelectedMaxContextTokens = 120000
Write-Host ""
Write-Ok "Using Kimi Code subscription"
Write-Color -Text " Model: kimi-k2.5 | API: api.kimi.com/coding" -Color DarkGray
@@ -1349,8 +1354,9 @@ if ($SubscriptionMode -eq "kimi_code") {
# Prompt for model if not already selected (manual provider path)
if ($SelectedProviderId -and -not $SelectedModel) {
$modelSel = Get-ModelSelection $SelectedProviderId
$SelectedModel = $modelSel.Model
$SelectedMaxTokens = $modelSel.MaxTokens
$SelectedModel = $modelSel.Model
$SelectedMaxTokens = $modelSel.MaxTokens
$SelectedMaxContextTokens = $modelSel.MaxContextTokens
}
# Save configuration
@@ -1367,9 +1373,10 @@ if ($SelectedProviderId) {
$config = @{
llm = @{
provider = $SelectedProviderId
model = $SelectedModel
max_tokens = $SelectedMaxTokens
provider = $SelectedProviderId
model = $SelectedModel
max_tokens = $SelectedMaxTokens
max_context_tokens = $SelectedMaxContextTokens
}
created_at = (Get-Date).ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss+00:00")
}
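With the new field, the saved `configuration.json` would look roughly like this — values correspond to one possible selection (Claude Code subscription), and exact formatting depends on `ConvertTo-Json`:

```json
{
  "llm": {
    "provider": "anthropic",
    "model": "claude-opus-4-6",
    "max_tokens": 32768,
    "max_context_tokens": 180000
  },
  "created_at": "2026-03-13T00:00:00+00:00"
}
```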
@@ -68,10 +68,16 @@ interface LeaderboardEntry {
// ---------------------------------------------------------------------------
const POINTS: Record<string, number> = {
// Integration bounties
"bounty:test": 20,
"bounty:docs": 20,
"bounty:code": 30,
"bounty:new-tool": 75,
// Standard bounties
"bounty:small": 10,
"bounty:medium": 30,
"bounty:large": 75,
"bounty:extreme": 150,
};
// ---------------------------------------------------------------------------
@@ -276,6 +282,10 @@ function formatBountyNotification(bounty: BountyResult): string {
docs: "\u{1F4DD}",
code: "\u{1F527}",
"new-tool": "\u{2B50}",
small: "\u{1F4A1}",
medium: "\u{1F6E0}",
large: "\u{1F680}",
extreme: "\u{1F525}",
};
const emoji = typeEmoji[bounty.bountyType] ?? "\u{1F3AF}";
@@ -301,7 +311,7 @@ function formatLeaderboard(entries: LeaderboardEntry[]): string {
const medals = ["\u{1F947}", "\u{1F948}", "\u{1F949}"];
let msg = "**\u{1F3C6} Integration Bounty Leaderboard**\n\n";
let msg = "**\u{1F3C6} Bounty Leaderboard**\n\n";
for (let i = 0; i < top10.length; i++) {
const entry = top10[i];
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Creates GitHub labels for the Integration Bounty Program.
# Creates GitHub labels for the Bounty Program.
# Usage: ./scripts/setup-bounty-labels.sh [owner/repo]
# Requires: gh CLI authenticated
@@ -9,12 +9,18 @@ REPO="${1:-adenhq/hive}"
echo "Setting up bounty labels for $REPO..."
# Bounty type labels
# Integration bounty labels
gh label create "bounty:test" --repo "$REPO" --color "1D76DB" --description "Bounty: test a tool with real API key (20 pts)" --force
gh label create "bounty:docs" --repo "$REPO" --color "FBCA04" --description "Bounty: write or improve documentation (20 pts)" --force
gh label create "bounty:code" --repo "$REPO" --color "D93F0B" --description "Bounty: health checker, bug fix, or improvement (30 pts)" --force
gh label create "bounty:new-tool" --repo "$REPO" --color "6F42C1" --description "Bounty: build a new integration from scratch (75 pts)" --force
# Standard bounty labels
gh label create "bounty:small" --repo "$REPO" --color "C2E0C6" --description "Bounty: quick fix — typos, links, error messages (10 pts)" --force
gh label create "bounty:medium" --repo "$REPO" --color "0E8A16" --description "Bounty: bug fix, tests, guides, CLI improvements (30 pts)" --force
gh label create "bounty:large" --repo "$REPO" --color "B60205" --description "Bounty: new feature, perf work, architecture docs (75 pts)" --force
gh label create "bounty:extreme" --repo "$REPO" --color "000000" --description "Bounty: major subsystem, security audit, core refactor (150 pts)" --force
# Difficulty labels
gh label create "difficulty:easy" --repo "$REPO" --color "BFD4F2" --description "Good first contribution" --force
gh label create "difficulty:medium" --repo "$REPO" --color "D4C5F9" --description "Requires some familiarity" --force
@@ -56,7 +56,8 @@ def register_advanced_tools(mcp: FastMCP) -> None:
return {"ok": True, "action": "wait", "condition": "selector", "selector": selector}
elif text:
await page.wait_for_function(
f"document.body.innerText.includes('{text}')",
"(text) => document.body.innerText.includes(text)",
arg=text,
timeout=timeout_ms,
)
return {"ok": True, "action": "wait", "condition": "text", "text": text}
@@ -0,0 +1,42 @@
"""Tests for browser advanced tools."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastmcp import FastMCP
from gcu.browser.tools.advanced import register_advanced_tools
@pytest.fixture
def mcp() -> FastMCP:
"""Create a fresh FastMCP instance for testing."""
return FastMCP("test-browser-advanced")
@pytest.fixture
def browser_wait_fn(mcp):
"""Register browser tools and return the browser_wait function."""
register_advanced_tools(mcp)
return mcp._tool_manager._tools["browser_wait"].fn
@pytest.mark.asyncio
async def test_browser_wait_passes_text_as_function_argument(browser_wait_fn):
"""Quoted and multiline text should be passed as data, not JS source."""
text = "O'Reilly\nMedia"
page = MagicMock()
page.wait_for_function = AsyncMock()
session = MagicMock()
session.get_page.return_value = page
with patch("gcu.browser.tools.advanced.get_session", return_value=session):
result = await browser_wait_fn(text=text, timeout_ms=1234)
assert result == {"ok": True, "action": "wait", "condition": "text", "text": text}
page.wait_for_function.assert_awaited_once_with(
"(text) => document.body.innerText.includes(text)",
arg=text,
timeout=1234,
)