Compare commits

..

46 Commits

Author SHA1 Message Date
Timothy 65aa5629e8 chore: fix lint 2026-03-04 08:34:01 -08:00
Omar Shareef 7193d09bed formatting warning fix 2026-03-04 16:43:46 +02:00
Omar Shareef 49f8fae0b4 fix: systematically enforce UTF-8 encoding across tools and core to fix Windows charmap decode errors 2026-03-04 16:04:53 +02:00
Omar Shareef e1a490756e fix: systematically enforce UTF-8 encoding across tools and core to fix Windows charmap decode errors 2026-03-04 15:58:03 +02:00
Omar Shareef 91bfaf36e3 fix(core): add utf-8 encoding to backend open calls
This fixes a charmap decoding error on Windows when opening agent files without explicitly specifying the encoding.
2026-03-04 13:32:59 +02:00
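The pattern applied across these UTF-8 commits: on Windows, `open()` without an explicit `encoding` falls back to the locale codec (often cp1252, reported as "charmap"), which raises `UnicodeDecodeError` on UTF-8 content. A minimal sketch of the before/after (file path is illustrative):

```python
import tempfile
from pathlib import Path

path = Path(tempfile.mkdtemp()) / "agent.md"
path.write_bytes("café notes".encode("utf-8"))

# Before: open(path).read() uses locale.getpreferredencoding();
# on Windows this is often cp1252 ("charmap") and fails on UTF-8 bytes.

# After: the encoding is pinned explicitly, so reads behave the same
# on every platform.
with open(path, encoding="utf-8") as f:
    text = f.read()

print(text)
```

The same fix appears throughout the diffs below for `Path.read_text()`, `Path.write_text()`, and `json.load()`/`json.dump()` call sites.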
RichardTang-Aden 7b98a6613a Merge pull request #5656 from aden-hive/feature/queen-worker-comm

Feature/queen worker comm
2026-03-02 22:50:13 -08:00
Richard Tang 26481e27a6 fix: fix tests and lint 2026-03-02 22:46:38 -08:00
Richard Tang bb227b3d73 chore: ruff lint 2026-03-02 21:30:07 -08:00
Richard Tang 8a0cf5e0ae Merge remote-tracking branch 'origin/feature/queen-worker-comm' into feature/queen-worker-comm 2026-03-02 21:27:22 -08:00
Timothy 69218d5699 chore: lint codes 2026-03-02 20:16:34 -08:00
Timothy 7d1433af21 fix: queen agent flakiness 2026-03-02 19:57:18 -08:00
Richard Tang 0bfbf1e9c5 fix: unused /hive-credentials prompts in the validation 2026-03-02 19:53:57 -08:00
Richard Tang 1ca4f5b22b refactor: update the preload_validation logics 2026-03-02 19:46:50 -08:00
Richard Tang 0984e4c1e8 feat: add gcu subagent validation and refactor the prestart validation steps 2026-03-02 18:35:25 -08:00
Sarthak Karode 4cbf5a7434 feat(core): add pytest framework testing integration with helpful error messages (#5485) 2026-03-03 10:01:33 +08:00
Hundao b33178c5be fix(graph): move auto-block grace period check before _await_user_input (#5672)
The grace period logic for client-facing auto-blocks was placed after
_await_user_input(), which blocks forever since no inject_event is
scheduled for text-only turns. This caused test_text_after_user_input_goes_to_judge
to hang indefinitely, blocking CI framework tests.

Move the grace period check before the blocking call so that within
the grace window, auto-blocks with missing outputs skip blocking
entirely and continue to the next LLM turn for judge RETRY pressure.

Also adds an _auto_missing check: nodes with no missing outputs
(e.g. queen monitoring with output_keys=[]) should still block
as their text-only output is legitimate conversation.

Fixes #5633
2026-03-03 09:39:14 +08:00
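The ordering fix in this commit can be distilled into a small decision function (names are illustrative, not the framework's API): the grace check must run before the blocking wait, because for a text-only turn no inject_event will ever wake the waiter.

```python
def handle_text_only_turn(missing_outputs, streak, grace_turns):
    """Decide a text-only turn's fate for a client-facing auto-block.

    Returns "continue" (skip blocking so the judge can apply RETRY
    pressure) or "block" (wait for user input).
    """
    # Grace check FIRST: if required outputs are missing and we are
    # within the grace window, do not block -- nothing is scheduled to
    # unblock a text-only turn, so blocking here would hang forever.
    if missing_outputs and streak <= grace_turns:
        return "continue"
    # No missing outputs (e.g. queen monitoring with output_keys=[])
    # means the text-only output is legitimate conversation: block.
    return "block"

print(handle_text_only_turn(["report"], streak=1, grace_turns=3))  # continue
print(handle_text_only_turn([], streak=1, grace_turns=3))          # block
```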
Richard Tang dc6a336c60 fix: removed the unused build_capability_summary 2026-03-02 16:26:47 -08:00
Richard Tang b855336448 chore: ruff format issue 2026-03-02 15:47:30 -08:00
Richard Tang de021977fd Merge remote-tracking branch 'origin/main' into feature/queen-worker-comm 2026-03-02 15:39:15 -08:00
Timothy cd2b3fcd16 Merge branch 'feature/new-inbox-management-agent' into feature/queen-worker-comm 2026-03-02 14:46:14 -08:00
Timothy b64024ede5 fix: gcu error log throwing 2026-03-02 14:45:57 -08:00
bryan a280d23113 fix: removing escalate to coder from worker tools 2026-03-02 12:02:35 -08:00
Timothy 41785abdba fix: rephrasing 2026-03-02 11:54:22 -08:00
Timothy de494c7e55 Merge branch 'feature/queen-worker-comm' into feature/new-inbox-management-agent 2026-03-02 11:44:08 -08:00
Timothy 5fa0903ea8 fix: teach email agent to search emails 2026-03-02 11:43:40 -08:00
Timothy 7bd99fe074 fix: email inbox management agent 2026-03-02 11:01:21 -08:00
bryan c838e1ca6d feat: agent building animation 2026-03-02 10:54:57 -08:00
bryan f475923353 feat: subagents populate node panel 2026-03-02 09:59:24 -08:00
Timothy 43f43c92e3 Merge branch 'feature/queen-worker-comm' into feature/new-inbox-management-agent 2026-03-02 09:40:55 -08:00
Timothy 5463134322 fix: inbox management template v2 2026-03-02 09:40:36 -08:00
Timothy 3fbb392103 fix: add credentials to queen lifecycle tools 2026-03-02 09:39:38 -08:00
RichardTang-Aden a162da17e1 Merge pull request #5639 from RichardTang-Aden/main
feat: support Gemini 3.1 pro
2026-03-02 09:24:27 -08:00
Richard Tang b565134d57 chore: fix the ruff lint 2026-03-02 09:23:02 -08:00
Richard Tang 3aafc89912 feat: support Gemini 3.1 pro 2026-03-02 09:20:48 -08:00
bryan 93449f92fe fix: clear build cache in quickstart 2026-03-02 09:00:48 -08:00
Bryan @ Aden d766e68d42 Merge pull request #5494 from Antiarin/security/harden-validate-agent-path
[Bug][Security]: agent_path accepts arbitrary filesystem paths with no validation
2026-03-02 16:57:51 +00:00
Hundao 1d8b1f9774 fix: enforce 0600 permissions on OAuth token files (#5631)
* fix: enforce 0600 permissions on OAuth token files

Credential files were written with default umask permissions.
Use os.open with explicit 0o600 mode to ensure token files
are always owner-read/write only, regardless of umask.

Fixes #5530

* style: fix line too long in checkpoint_store.py
2026-03-02 18:30:40 +08:00
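The fix shown in the corresponding diff below replaces a plain `open(..., "w")` with `os.open()` and an explicit `0o600` mode, so the file's permission bits are set atomically at creation rather than depending on the process umask. A self-contained sketch (the path is illustrative):

```python
import json
import os
import stat
import tempfile

token_path = os.path.join(tempfile.mkdtemp(), "auth.json")

# O_CREAT with mode 0o600 applies owner-only permissions at creation
# time; creating first and chmod()-ing afterwards would leave a window
# where the file could be world-readable.
fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w") as f:
    json.dump({"access_token": "dummy"}, f)

mode = stat.S_IMODE(os.stat(token_path).st_mode)
print(oct(mode))
```

Note that `0o600` has no group/other bits, so a typical umask cannot widen it; on Windows the POSIX mode bits are only partially honored.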
Rajneesh Chaudhary 5ea9abae83 fix(core): prevent sse critical event queue from blocking event bus (#5533) (#5536)
Disconnects slow clients instead of blocking the publisher task.

Signed-off-by: Rajneesh180 <rajneeshrehsaan48@gmail.com>
2026-03-02 17:57:52 +08:00
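The commit's approach of disconnecting slow SSE clients rather than letting their full queues backpressure the publisher can be sketched with a bounded asyncio queue (names are illustrative, not the project's API):

```python
import asyncio

class Subscriber:
    def __init__(self, maxsize: int = 8):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.connected = True

async def publish(subscribers, event: str) -> None:
    # put_nowait never blocks the publisher task; a full queue means
    # the client cannot keep up, so it is disconnected instead of
    # stalling every other subscriber on the event bus.
    for sub in subscribers:
        if not sub.connected:
            continue
        try:
            sub.queue.put_nowait(event)
        except asyncio.QueueFull:
            sub.connected = False

async def demo() -> dict:
    fast, slow = Subscriber(), Subscriber(maxsize=1)
    await publish([fast, slow], "event-1")
    await publish([fast, slow], "event-2")  # slow client's queue is full
    return {"fast": fast.connected, "slow": slow.connected}

status = asyncio.run(demo())
print(status)  # {'fast': True, 'slow': False}
```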
ArshpreetSingh04 15957499c5 docs(core): fix outdated goal-agent path reference in README (#5629)
Update the MCP client configuration example in core/README.md to replace the outdated `goal-agent` path with the correct `hive/core` path.

Fixes #5628
2026-03-02 17:07:25 +08:00
Timothy 0b50d9e874 fix: block idle event 2026-03-01 21:01:59 -08:00
Timothy a1e54922bd fix: timer count down update 2026-03-01 20:22:46 -08:00
Timothy 63c0ca34ea Merge branch 'feature/agent-runtime-idling' into feature/queen-worker-comm 2026-03-01 20:14:46 -08:00
Timothy 8cac49cd91 feat: frontend display of scheduler count down 2026-03-01 20:13:21 -08:00
RichardTang-Aden d562670425 Merge pull request #5501 from aden-hive/feat/open-hive
Feat: v6 windows compatibility support
2026-02-27 19:58:48 -08:00
RichardTang-Aden 4ba950f155 Merge pull request #5499 from aden-hive/feat/open-hive
feat: tool call revamp, Intercom & GA integrations, credential improvements
2026-02-27 19:41:11 -08:00
Antiarin e5b6c8581a feat: implement agent path validation and restrict loading to allowed directories 2026-02-28 02:56:31 +05:30
78 changed files with 3062 additions and 734 deletions
+1
@@ -78,3 +78,4 @@ core/tests/*dumps/*
screenshots/*
.gemini/*
+1 -1
@@ -64,7 +64,7 @@ To use the agent builder with Claude Desktop or other MCP clients, add this to y
"agent-builder": {
"command": "python",
"args": ["-m", "framework.mcp.agent_builder_server"],
"cwd": "/path/to/goal-agent"
"cwd": "/path/to/hive/core"
}
}
}
+4 -2
@@ -15,6 +15,7 @@ import base64
import hashlib
import http.server
import json
import os
import platform
import secrets
import subprocess
@@ -150,8 +151,9 @@ def save_credentials(token_data: dict, account_id: str) -> None:
if "id_token" in token_data:
auth_data["tokens"]["id_token"] = token_data["id_token"]
CODEX_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(CODEX_AUTH_FILE, "w") as f:
CODEX_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
fd = os.open(CODEX_AUTH_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w") as f:
json.dump(auth_data, f, indent=2)
+1 -1
@@ -10,7 +10,7 @@ def _load_preferred_model() -> str:
config_path = Path.home() / ".hive" / "configuration.json"
if config_path.exists():
try:
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
config = json.load(f)
llm = config.get("llm", {})
if llm.get("provider") and llm.get("model"):
@@ -7,11 +7,11 @@ from framework.graph import NodeSpec
# Load reference docs at import time so they're always in the system prompt.
# No voluntary read_file() calls needed — the LLM gets everything upfront.
_ref_dir = Path(__file__).parent.parent / "reference"
_framework_guide = (_ref_dir / "framework_guide.md").read_text()
_file_templates = (_ref_dir / "file_templates.md").read_text()
_anti_patterns = (_ref_dir / "anti_patterns.md").read_text()
_framework_guide = (_ref_dir / "framework_guide.md").read_text(encoding="utf-8")
_file_templates = (_ref_dir / "file_templates.md").read_text(encoding="utf-8")
_anti_patterns = (_ref_dir / "anti_patterns.md").read_text(encoding="utf-8")
_gcu_guide_path = _ref_dir / "gcu_guide.md"
_gcu_guide = _gcu_guide_path.read_text() if _gcu_guide_path.exists() else ""
_gcu_guide = _gcu_guide_path.read_text(encoding="utf-8") if _gcu_guide_path.exists() else ""
def _is_gcu_enabled() -> bool:
@@ -410,7 +410,10 @@ If list_agent_tools() shows these don't exist, use alternatives \
**Node rules**:
- **2-4 nodes MAX.** Never exceed 4. Merge thin nodes aggressively.
- A node with 0 tools is NOT a real node — merge it.
- node_type always "event_loop"
- node_type "event_loop" for all regular graph nodes. Use "gcu" ONLY for
browser automation subagents (see GCU appendix). GCU nodes MUST be in a
parent node's sub_agents list, NEVER connected via edges, and NEVER used
as entry/terminal nodes.
- max_node_visits default is 0 (unbounded) — correct for forever-alive. \
Only set >0 in one-shot agents with bounded feedback loops.
- Feedback inputs: nullable_output_keys
@@ -558,6 +561,11 @@ critical issue. Use sparingly.
this session. If a worker is already loaded, it is automatically unloaded \
first. Call after building and validating an agent to make it available \
immediately.
## Credentials
- list_credentials(credential_id?) — List all authorized credentials in the \
local store. Returns IDs, aliases, status, and identity metadata (never \
secrets). Optionally filter by credential_id.
"""
_queen_behavior = """
@@ -608,14 +616,29 @@ If NO worker is loaded, say so and offer to build one.
- For tasks matching the worker's goal, call start_worker(task).
- For everything else, do it directly.
## When the user clicks Run (external event notification)
When you receive an event that the user clicked Run:
- If the worker started successfully, briefly acknowledge it — do NOT \
repeat the full status. The user can see the graph is running.
- If the worker failed to start (credential or structural error), \
explain the problem clearly and help fix it. For credential errors, \
guide the user to set up the missing credentials. For structural \
issues, offer to fix the agent graph directly.
## When worker is running:
- If the user asks about progress, call get_worker_status().
- If the user asks about progress, call get_worker_status() ONCE and \
report the result. Do NOT poll in a loop.
- NEVER call get_worker_status() repeatedly without user input in between. \
The worker will surface results through client-facing nodes. You do not \
need to monitor it. One check per user request is enough.
- If the user has a concern or instruction for the worker, call \
inject_worker_message(content) to relay it.
- You can still do coding tasks directly while the worker runs.
- If an escalation ticket arrives from the judge, assess severity:
- Low/transient: acknowledge silently, do not disturb the user.
- High/critical: notify the user with a brief analysis and suggested action.
- After starting the worker or checking its status, WAIT for the user's \
next message. Do not take autonomous actions unless the user asks.
## When worker asks user a question:
- The system will route the user's response directly to the worker. \
@@ -797,6 +820,8 @@ queen_node = NodeSpec(
"notify_operator",
# Agent loading
"load_built_agent",
# Credentials
"list_credentials",
],
system_prompt=(
"You are the Queen — the user's primary interface. You are a coding agent "
@@ -822,6 +847,8 @@ ALL_QUEEN_TOOLS = _SHARED_TOOLS + [
"notify_operator",
# Agent loading
"load_built_agent",
# Credentials
"list_credentials",
]
__all__ = [
@@ -107,3 +107,5 @@ def test_research_routes_back_to_interact(self):
24. **Not using auto_responder for client-facing nodes** — Tests with client-facing nodes hang without an auto-responder that injects input. But note: even WITH auto_responder, forever-alive agents still hang because the graph never terminates. Auto-responder only helps for agents with terminal nodes.
25. **Manually wiring browser tools on event_loop nodes** — If the agent needs browser automation, use `node_type="gcu"` which auto-includes all browser tools and prepends best-practices guidance. Do NOT manually list browser tool names on event_loop nodes — they may not exist in the MCP server or may be incomplete. See the GCU Guide appendix.
26. **Using GCU nodes as regular graph nodes** — GCU nodes (`node_type="gcu"`) are exclusively subagents. They must ONLY appear in a parent node's `sub_agents=["gcu-node-id"]` list and be invoked via `delegate_to_sub_agent()`. They must NEVER be connected via edges, used as entry nodes, or used as terminal nodes. If a GCU node appears as an edge source or target, the graph will fail pre-load validation.
+2 -2
@@ -660,7 +660,7 @@ class GraphBuilder:
# Generate Python code
code = self._generate_code(graph)
Path(path).write_text(code)
Path(path).write_text(code, encoding="utf-8")
self.session.phase = BuildPhase.EXPORTED
self._save_session()
@@ -754,7 +754,7 @@ class GraphBuilder:
"""Save session to disk."""
self.session.updated_at = datetime.now()
path = self.storage_path / f"{self.session.id}.json"
path.write_text(self.session.model_dump_json(indent=2))
path.write_text(self.session.model_dump_json(indent=2), encoding="utf-8")
def _load_session(self, session_id: str) -> BuildSession:
"""Load session from disk."""
+1 -1
@@ -69,7 +69,7 @@ def save_credential_key(key: str) -> Path:
# Restrict the secrets directory itself
path.parent.chmod(stat.S_IRWXU) # 0o700
path.write_text(key)
path.write_text(key, encoding="utf-8")
path.chmod(stat.S_IRUSR | stat.S_IWUSR) # 0o600
os.environ[CREDENTIAL_KEY_ENV_VAR] = key
+1 -1
@@ -568,7 +568,7 @@ def _load_nodes_from_python_agent(agent_path: Path) -> list:
def _load_nodes_from_json_agent(agent_json: Path) -> list:
"""Load nodes from a JSON-based agent."""
try:
with open(agent_json) as f:
with open(agent_json, encoding="utf-8") as f:
data = json.load(f)
from framework.graph import NodeSpec
+3 -3
@@ -227,7 +227,7 @@ class EncryptedFileStorage(CredentialStorage):
index_path = self.base_path / "metadata" / "index.json"
if not index_path.exists():
return []
with open(index_path) as f:
with open(index_path, encoding="utf-8") as f:
index = json.load(f)
return list(index.get("credentials", {}).keys())
@@ -268,7 +268,7 @@ class EncryptedFileStorage(CredentialStorage):
index_path = self.base_path / "metadata" / "index.json"
if index_path.exists():
with open(index_path) as f:
with open(index_path, encoding="utf-8") as f:
index = json.load(f)
else:
index = {"credentials": {}, "version": "1.0"}
@@ -283,7 +283,7 @@ class EncryptedFileStorage(CredentialStorage):
index["last_modified"] = datetime.now(UTC).isoformat()
with open(index_path, "w") as f:
with open(index_path, "w", encoding="utf-8") as f:
json.dump(index, f, indent=2)
+1 -5
@@ -159,11 +159,7 @@ class CredentialValidationResult:
f" {c.env_var} for {_label(c)}"
f"\n Connect this integration at hive.adenhq.com first."
)
lines.append(
"\nTo fix: run /hive-credentials in Claude Code."
"\nIf you've already set up credentials, "
"restart your terminal to load them."
)
lines.append("\nIf you've already set up credentials, restart your terminal to load them.")
return "\n".join(lines)
+26 -5
@@ -107,17 +107,38 @@ _TC_ARG_LIMIT = 200 # max chars per tool_call argument after compaction
def _compact_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Truncate tool_call arguments to save context tokens during compaction.
Preserves ``id``, ``type``, and ``function.name`` exactly. Truncates
``function.arguments`` (a JSON string) to at most ``_TC_ARG_LIMIT`` chars
so that large payloads (e.g. set_output with full findings) don't survive
compaction and defeat the purpose of context reduction.
Preserves ``id``, ``type``, and ``function.name`` exactly. When arguments
exceed ``_TC_ARG_LIMIT``, replaces the full JSON string with a compact
**valid** JSON summary. The Anthropic API parses tool_call arguments and
rejects requests with malformed JSON (e.g. unterminated strings), so we
must never produce broken JSON here.
"""
compact = []
for tc in tool_calls:
func = tc.get("function", {})
args = func.get("arguments", "")
if len(args) > _TC_ARG_LIMIT:
args = args[:_TC_ARG_LIMIT] + "…[truncated]"
# Build a valid JSON summary instead of slicing mid-string.
# Try to extract top-level keys for a meaningful preview.
try:
parsed = json.loads(args)
if isinstance(parsed, dict):
# Preserve key names, truncate values
summary_parts = []
for k, v in parsed.items():
v_str = str(v)
if len(v_str) > 60:
v_str = v_str[:60] + "..."
summary_parts.append(f"{k}={v_str}")
summary = ", ".join(summary_parts)
if len(summary) > _TC_ARG_LIMIT:
summary = summary[:_TC_ARG_LIMIT] + "..."
args = json.dumps({"_compacted": summary})
else:
args = json.dumps({"_compacted": str(parsed)[:_TC_ARG_LIMIT]})
except (json.JSONDecodeError, TypeError):
# Args were already invalid JSON — wrap the preview safely
args = json.dumps({"_compacted": args[:_TC_ARG_LIMIT]})
compact.append(
{
"id": tc.get("id", ""),
+48 -39
@@ -338,6 +338,10 @@ class AsyncEntryPointSpec(BaseModel):
max_concurrent: int = Field(
default=10, description="Maximum concurrent executions for this entry point"
)
max_resurrections: int = Field(
default=3,
description="Auto-restart on non-fatal failure (0 to disable)",
)
model_config = {"extra": "allow"}
@@ -503,45 +507,6 @@ class GraphSpec(BaseModel):
"""Get all edges entering a node."""
return [e for e in self.edges if e.target == node_id]
def build_capability_summary(self, from_node_id: str) -> str:
"""Build a summary of the agent's downstream workflow phases and tools.
Walks the graph from *from_node_id* and collects all reachable nodes
(excluding the starting node itself) so that client-facing entry nodes
can inform the user about what the overall agent is capable of.
Returns:
A formatted string listing each downstream node's name,
description, and tools — or an empty string when there are
no downstream nodes.
"""
reachable: list[Any] = []
visited: set[str] = set()
queue = [from_node_id]
while queue:
nid = queue.pop()
if nid in visited:
continue
visited.add(nid)
node = self.get_node(nid)
if node and nid != from_node_id:
reachable.append(node)
for edge in self.get_outgoing_edges(nid):
queue.append(edge.target)
if not reachable:
return ""
lines = [
"## Agent Capabilities",
"This agent has the following workflow phases and tools:",
]
for node in reachable:
tool_str = f" (tools: {', '.join(node.tools)})" if node.tools else ""
lines.append(f"- {node.name}: {node.description}{tool_str}")
return "\n".join(lines)
def detect_fan_out_nodes(self) -> dict[str, list[str]]:
"""
Detect nodes that fan-out to multiple targets.
@@ -741,4 +706,48 @@ class GraphSpec(BaseModel):
else:
seen_keys[key] = node_id
# GCU nodes must only be used as subagents
gcu_node_ids = {n.id for n in self.nodes if n.node_type == "gcu"}
if gcu_node_ids:
# GCU nodes must not be entry nodes
if self.entry_node in gcu_node_ids:
errors.append(
f"GCU node '{self.entry_node}' is used as entry node. "
"GCU nodes must only be used as subagents via delegate_to_sub_agent()."
)
# GCU nodes must not be terminal nodes
for term in self.terminal_nodes:
if term in gcu_node_ids:
errors.append(
f"GCU node '{term}' is used as terminal node. "
"GCU nodes must only be used as subagents."
)
# GCU nodes must not be connected via edges
for edge in self.edges:
if edge.source in gcu_node_ids:
errors.append(
f"GCU node '{edge.source}' is used as edge source (edge '{edge.id}'). "
"GCU nodes must only be used as subagents, not connected via edges."
)
if edge.target in gcu_node_ids:
errors.append(
f"GCU node '{edge.target}' is used as edge target (edge '{edge.id}'). "
"GCU nodes must only be used as subagents, not connected via edges."
)
# GCU nodes must be referenced in at least one parent's sub_agents
referenced_subagents = set()
for node in self.nodes:
for sa_id in node.sub_agents or []:
referenced_subagents.add(sa_id)
orphaned = gcu_node_ids - referenced_subagents
for nid in orphaned:
errors.append(
f"GCU node '{nid}' is not referenced in any node's sub_agents list. "
"GCU nodes must be declared as subagents of a parent node."
)
return errors
+152 -117
@@ -17,8 +17,8 @@ import logging
import re
import time
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal, Protocol, runtime_checkable
@@ -57,7 +57,7 @@ class _EscalationReceiver:
self._event = asyncio.Event()
self._response: str | None = None
async def inject_event(self, content: str) -> None:
async def inject_event(self, content: str, *, is_client_input: bool = False) -> None:
"""Called by ExecutionStream.inject_input() when the user responds."""
self._response = content
self._event.set()
@@ -470,7 +470,6 @@ class EventLoopNode(NodeProtocol):
if ctx.node_spec.client_facing and not ctx.event_triggered:
if stream_id != "queen":
tools.append(self._build_ask_user_tool())
tools.append(self._build_escalate_tool())
# Add delegate_to_sub_agent tool if:
# - Node has sub_agents defined
@@ -512,6 +511,7 @@ class EventLoopNode(NodeProtocol):
# 5. Stall / doom loop detection state (restored from cursor if resuming)
recent_responses: list[str] = _restored_recent_responses
recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
_consecutive_empty_turns: int = 0
# 6. Main loop
for iteration in range(start_iteration, self._config.max_iterations):
@@ -650,6 +650,22 @@ class EventLoopNode(NodeProtocol):
error=str(e)[:500],
execution_id=execution_id,
)
# For malformed tool call errors, inject feedback into
# the conversation before retrying. Retrying with the
# same messages is futile — the LLM will reproduce the
# same truncated JSON. The nudge tells it to shorten
# its arguments.
error_str = str(e).lower()
if "failed to parse tool call" in error_str:
await conversation.add_user_message(
"[System: Your previous tool call had malformed "
"JSON arguments (likely truncated). Keep your "
"tool call arguments shorter and simpler. Do NOT "
"repeat the same long argument — summarize or "
"split into multiple calls.]"
)
await asyncio.sleep(delay)
continue # retry same iteration
@@ -775,6 +791,57 @@ class EventLoopNode(NodeProtocol):
latency_ms=latency_ms,
conversation=conversation if _is_continuous else None,
)
else:
# Ghost empty stream: LLM returned nothing and outputs
# are still missing. The conversation hasn't changed, so
# repeating the same call will produce the same empty
# result. Inject a nudge to break the cycle.
_consecutive_empty_turns += 1
logger.warning(
"[%s] iter=%d: empty response with missing outputs %s (consecutive=%d)",
node_id,
iteration,
missing,
_consecutive_empty_turns,
)
if _consecutive_empty_turns >= self._config.stall_detection_threshold:
# Persistent ghost stream — fail the node.
error_msg = (
f"Ghost empty stream: {_consecutive_empty_turns} "
f"consecutive empty responses with missing "
f"outputs {missing}"
)
latency_ms = int((time.time() - start_time) * 1000)
if ctx.runtime_logger:
ctx.runtime_logger.log_node_complete(
node_id=node_id,
node_name=ctx.node_spec.name,
node_type="event_loop",
success=False,
error=error_msg,
total_steps=iteration + 1,
tokens_used=total_input_tokens + total_output_tokens,
input_tokens=total_input_tokens,
output_tokens=total_output_tokens,
latency_ms=latency_ms,
exit_status="ghost_stream",
accept_count=_accept_count,
retry_count=_retry_count,
escalate_count=_escalate_count,
continue_count=_continue_count,
)
raise RuntimeError(error_msg)
# First nudge — inject a system message to break the
# empty-response cycle.
await conversation.add_user_message(
"[System: Your response was empty. You have required "
f"outputs that are not yet set: {missing}. Review "
"your task and call the appropriate tools to make "
"progress.]"
)
continue
else:
_consecutive_empty_turns = 0
# 6f. Stall detection
recent_responses.append(assistant_text)
@@ -836,7 +903,7 @@ class EventLoopNode(NodeProtocol):
mcp_tool_calls = [
tc
for tc in logged_tool_calls
if tc.get("tool_name") not in ("set_output", "ask_user", "escalate_to_coder")
if tc.get("tool_name") not in ("set_output", "ask_user")
]
if mcp_tool_calls:
fps = self._fingerprint_tool_calls(mcp_tool_calls)
@@ -889,11 +956,13 @@ class EventLoopNode(NodeProtocol):
# (a) Explicit ask_user() — blocks, then skips judge (6i).
# The LLM intentionally asked a question; judging before the
# user answers would inject confusing "missing outputs"
# feedback.
# (b) Auto-block — a text-only turn (no real tools, no
# set_output) from a client-facing node. Blocks for the
# user's response, then falls through to judge so models
# stuck in a clarification loop get RETRY feedback.
# feedback. Works for all client-facing nodes.
# (b) Auto-block (queen only) — a text-only turn (no real
# tools, no set_output) from the queen node. Blocks for
# the user's response, then falls through to judge so
# models stuck in a clarification loop get RETRY feedback.
# Workers are autonomous and don't auto-block — they use
# ask_user() explicitly when they need input.
#
# Turns that include tool calls or set_output are *work*, not
# conversation — they flow through without blocking.
@@ -904,13 +973,66 @@ class EventLoopNode(NodeProtocol):
if user_input_requested:
_cf_block = True
_cf_prompt = ask_user_prompt
elif assistant_text and not real_tool_results and not outputs_set:
# Text-only response from client-facing node — this is
# addressed to the user. Always block for their reply.
elif (
stream_id == "queen"
and assistant_text
and not real_tool_results
and not outputs_set
):
# Auto-block: only for the queen (conversational node).
# Workers are autonomous — they block only on explicit
# ask_user(). Text-only turns from workers are narration,
# not questions addressed to the user.
_cf_block = True
_cf_auto = True
if _cf_block:
# Auto-block grace: when required outputs are still
# missing and we're within the grace period, skip
# blocking and continue to the next LLM turn so the
# judge can apply RETRY pressure on lazy models.
# Without this, _await_user_input() would block
# forever since no inject_event is coming.
#
# When no outputs are missing (e.g. queen monitoring
# with output_keys=[]), text-only is legitimate
# conversation and should always block.
if _cf_auto:
_auto_missing = (
self._get_missing_output_keys(
accumulator,
ctx.node_spec.output_keys,
ctx.node_spec.nullable_output_keys,
)
if accumulator is not None
else True
)
if _auto_missing:
_cf_text_only_streak += 1
if _cf_text_only_streak <= self._config.cf_grace_turns:
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback=(
"Auto-block grace"
f" ({_cf_text_only_streak}"
f"/{self._config.cf_grace_turns})"
),
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
continue
# Beyond grace — block below, then fall
# through to judge
if self._shutdown:
await self._publish_loop_completed(
stream_id, node_id, iteration + 1, execution_id
@@ -1013,18 +1135,11 @@ class EventLoopNode(NodeProtocol):
# -- Judge-skip decision after client-facing blocking --
#
# Explicit ask_user: skip judge while the agent is still
# gathering information from the user. BUT if all required
# outputs have already been set, don't skip — fall through to
# the judge so it can accept the completed node.
#
# Auto-block (text-only, no tools): skip judge within a
# grace period of N consecutive text-only turns. Normal
# conversations are 1-3 exchanges before set_output.
# After the grace period, fall through to judge so models
# stuck in a clarification loop get RETRY pressure.
# Explicit ask_user: skip judge while the agent is
# still gathering information from the user. BUT if
# all required outputs have already been set, don't
# skip -- fall through to the judge so it can accept.
if not _cf_auto:
# Explicit ask_user: skip judge only if outputs are incomplete
_missing = (
self._get_missing_output_keys(
accumulator,
@@ -1045,7 +1160,7 @@ class EventLoopNode(NodeProtocol):
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback="Blocked for ask_user input (skip judge)",
verdict_feedback=("Blocked for ask_user input (skip judge)"),
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
@@ -1053,31 +1168,9 @@ class EventLoopNode(NodeProtocol):
latency_ms=iter_latency_ms,
)
continue
# All outputs set — fall through to judge for acceptance
# All outputs set -- fall through to judge
# Auto-block: apply grace period
_cf_text_only_streak += 1
if _cf_text_only_streak <= self._config.cf_grace_turns:
_continue_count += 1
if ctx.runtime_logger:
iter_latency_ms = int((time.time() - iter_start) * 1000)
ctx.runtime_logger.log_step(
node_id=node_id,
node_type="event_loop",
step_index=iteration,
verdict="CONTINUE",
verdict_feedback=(
f"Auto-block grace ({_cf_text_only_streak}"
f"/{self._config.cf_grace_turns})"
),
tool_calls=logged_tool_calls,
llm_text=assistant_text,
input_tokens=turn_tokens.get("input", 0),
output_tokens=turn_tokens.get("output", 0),
latency_ms=iter_latency_ms,
)
continue
# Beyond grace period — fall through to judge (6i)
# Auto-block beyond grace -- fall through to judge (6i)
# 6i. Judge evaluation
should_judge = (
@@ -1591,7 +1684,9 @@ class EventLoopNode(NodeProtocol):
# Phase 1: triage — handle framework tools immediately,
# queue real tools and subagents for parallel execution.
results_by_id: dict[str, ToolResult] = {}
timing_by_id: dict[str, dict[str, Any]] = {} # tool_use_id -> {start_timestamp, duration_s}
timing_by_id: dict[
str, dict[str, Any]
] = {} # tool_use_id -> {start_timestamp, duration_s}
pending_real: list[ToolCallEvent] = []
pending_subagent: list[ToolCallEvent] = []
@@ -1620,7 +1715,7 @@ class EventLoopNode(NodeProtocol):
if tc.tool_name == "set_output":
# --- Framework-level set_output handling ---
_tc_start = time.time()
_tc_ts = datetime.now(timezone.utc).isoformat()
_tc_ts = datetime.now(UTC).isoformat()
result = self._handle_set_output(tc.tool_input, ctx.node_spec.output_keys)
result = ToolResult(
tool_use_id=tc.tool_use_id,
@@ -1678,26 +1773,6 @@ class EventLoopNode(NodeProtocol):
)
results_by_id[tc.tool_use_id] = result
elif tc.tool_name == "escalate_to_coder":
# --- Framework-level escalation handling ---
if self._event_bus:
await self._event_bus.emit_escalation_requested(
stream_id=stream_id,
node_id=node_id,
reason=tc.tool_input.get("reason", ""),
context=tc.tool_input.get("context", ""),
execution_id=ctx.execution_id,
)
# Block like ask_user — the TUI loads the coder,
# and /back injects a message to unblock us.
user_input_requested = True
result = ToolResult(
tool_use_id=tc.tool_use_id,
content="Escalating to Hive Coder. You will resume when done.",
is_error=False,
)
results_by_id[tc.tool_use_id] = result
elif tc.tool_name == "delegate_to_sub_agent":
# --- Framework-level subagent delegation ---
# Queue for parallel execution in Phase 2
@@ -1775,7 +1850,7 @@ class EventLoopNode(NodeProtocol):
) -> tuple[ToolResult | BaseException, str, float]:
"""Execute a tool and return (result, start_iso, duration_s)."""
_s = time.time()
_iso = datetime.now(timezone.utc).isoformat()
_iso = datetime.now(UTC).isoformat()
try:
_r = await self._execute_tool(_tc)
except BaseException as _exc:
@@ -1796,7 +1871,7 @@ class EventLoopNode(NodeProtocol):
for tc, entry in zip(pending_real, timed_results, strict=True):
if isinstance(entry, BaseException):
raw = entry
_start_iso = datetime.now(timezone.utc).isoformat()
_start_iso = datetime.now(UTC).isoformat()
_dur_s = 0
else:
raw, _start_iso, _dur_s = entry
@@ -1822,7 +1897,7 @@ class EventLoopNode(NodeProtocol):
_tc: ToolCallEvent,
) -> tuple[ToolResult | BaseException, str, float]:
_s = time.time()
_iso = datetime.now(timezone.utc).isoformat()
_iso = datetime.now(UTC).isoformat()
try:
_r = await self._execute_subagent(
_ctx,
@@ -1841,7 +1916,7 @@ class EventLoopNode(NodeProtocol):
for tc, entry in zip(pending_subagent, subagent_timed, strict=True):
if isinstance(entry, BaseException):
raw = entry
_start_iso = datetime.now(timezone.utc).isoformat()
_start_iso = datetime.now(UTC).isoformat()
_dur_s = 0
else:
raw, _start_iso, _dur_s = entry
@@ -1891,7 +1966,6 @@ class EventLoopNode(NodeProtocol):
if tc.tool_name not in (
"set_output",
"ask_user",
"escalate_to_coder",
"delegate_to_sub_agent",
"report_to_parent",
):
@@ -2043,46 +2117,6 @@ class EventLoopNode(NodeProtocol):
},
)
def _build_escalate_tool(self) -> Tool:
"""Build the synthetic escalate_to_coder tool.
Client-facing nodes call this when the user's request requires
capabilities beyond the current agent (code changes, feature
expansion, debugging). The TUI intercepts the event and loads
hive_coder in the foreground.
"""
return Tool(
name="escalate_to_coder",
description=(
"Call this tool when the user requests something you "
"cannot handle — a code change, feature expansion, bug "
"fix, or framework-level modification. This will bring "
"in Hive Coder, a coding agent that can read and write "
"files. Provide a clear reason and relevant context so "
"the coder can pick up where you left off."
),
parameters={
"type": "object",
"properties": {
"reason": {
"type": "string",
"description": (
"Why you are escalating (what the user needs that you cannot do)."
),
},
"context": {
"type": "string",
"description": (
"Relevant context: what you discussed, "
"what files are involved, what the user "
"wants changed."
),
},
},
"required": ["reason"],
},
)
def _build_set_output_tool(self, output_keys: list[str] | None) -> Tool | None:
"""Build the synthetic set_output tool for explicit output declaration."""
if not output_keys:
@@ -2553,6 +2587,7 @@ class EventLoopNode(NodeProtocol):
"service unavailable",
"bad gateway",
"overloaded",
"failed to parse tool call",
]
return any(kw in error_str for kw in transient_keywords)
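The hunk above adds "failed to parse tool call" to the transient-keyword list. A minimal sketch of this substring-based classification, with the function name and keyword subset assumed rather than taken from the source:

```python
# Hypothetical sketch of the keyword-based transient-error check shown in
# the hunk above. The function name and keyword list are assumptions.
TRANSIENT_KEYWORDS = [
    "service unavailable",
    "bad gateway",
    "overloaded",
    "failed to parse tool call",
]

def is_transient(error) -> bool:
    # Case-insensitive substring match against known transient signatures.
    error_str = str(error).lower()
    return any(kw in error_str for kw in TRANSIENT_KEYWORDS)
```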
@@ -3775,7 +3810,7 @@ class EventLoopNode(NodeProtocol):
except Exception as e:
logger.exception(
"\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\n!" * 60 + "\nError: %s\n" + "!" * 60,
"\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\nError: %s\n" + "!" * 60,
agent_id,
str(e),
)
@@ -4083,7 +4118,7 @@ class EventLoopNode(NodeProtocol):
except Exception as e:
logger.exception(
"\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\n!" * 60 + "\nError: %s\n" + "!" * 60,
"\n" + "!" * 60 + "\n❌ SUBAGENT '%s' FAILED\nError: %s\n" + "!" * 60,
agent_id,
str(e),
)
+6 -10
View File
@@ -193,6 +193,9 @@ class GraphExecutor:
# Pause/resume control
self._pause_requested = asyncio.Event()
# Track the currently executing node for external injection routing
self.current_node_id: str | None = None
def _write_progress(
self,
current_node: str,
@@ -697,6 +700,9 @@ class GraphExecutor:
# Execute this node, then pause
# (We'll check again after execution and save state)
# Expose current node for external injection routing
self.current_node_id = current_node_id
self.logger.info(f"\n▶ Step {steps}: {node_spec.name} ({node_spec.node_type})")
self.logger.info(f" Inputs: {node_spec.input_keys}")
self.logger.info(f" Outputs: {node_spec.output_keys}")
@@ -1618,17 +1624,7 @@ class GraphExecutor:
node_tool_names=node_spec.tools,
)
# Build goal context, enriched with capability summary for
# client-facing nodes so the LLM knows what the full agent can do.
goal_context = goal.to_prompt_context()
if graph and node_spec.client_facing:
capability_summary = graph.build_capability_summary(graph.entry_node)
if capability_summary:
goal_context = (
f"{goal_context}\n\n{capability_summary}"
if goal_context
else capability_summary
)
return NodeContext(
runtime=self.runtime,
+29 -31
View File
@@ -170,7 +170,7 @@ def _dump_failed_request(
"temperature": kwargs.get("temperature"),
}
with open(filepath, "w") as f:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(dump_data, f, indent=2, default=str)
return str(filepath)
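The `encoding="utf-8"` additions in this file (and throughout the diff) address the Windows charmap errors mentioned in the commit messages: without an explicit encoding, `open()` falls back to the platform's locale encoding (often cp1252 on Windows), which cannot round-trip arbitrary Unicode. A self-contained illustration:

```python
import json
import tempfile
from pathlib import Path

# Without encoding="utf-8", open() uses the locale encoding (cp1252 on
# many Windows setups) and non-ASCII content raises charmap errors.
path = Path(tempfile.mkdtemp()) / "dump.json"
data = {"note": "emoji and accents: ✓ café"}

with open(path, "w", encoding="utf-8") as f:
    json.dump(data, f, indent=2, default=str)

with open(path, encoding="utf-8") as f:
    loaded = json.load(f)
```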
@@ -237,6 +237,11 @@ def _is_stream_transient_error(exc: BaseException) -> bool:
Transient errors (recoverable=True): network issues, server errors, timeouts.
Permanent errors (recoverable=False): auth, bad request, context window, etc.
NOTE: "Failed to parse tool call arguments" (malformed LLM output) is NOT
transient at the stream level: retrying with the same messages produces the
same malformed output. This error is handled at the EventLoopNode level
where the conversation can be modified before retrying.
"""
try:
from litellm.exceptions import (
@@ -917,30 +922,6 @@ class LiteLLMProvider(LLMProvider):
# and we skip the retry path — nothing was yielded in vain.)
has_content = accumulated_text or tool_calls_acc
if not has_content:
# If the conversation ends with an assistant or tool
# message, an empty stream is expected — the LLM has
# nothing new to say. Don't burn retries on this;
# let the caller (EventLoopNode) decide what to do.
# Typical case: client_facing node where the LLM set
# all outputs via set_output tool calls, and the tool
# results are the last messages.
last_role = next(
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
None,
)
if last_role in ("assistant", "tool"):
logger.warning(
"[stream] %s returned empty stream after %s message "
"(no text, no tool calls). Treating as a no-op turn. "
"If this repeats, the agent may be stuck — check for "
"ghost empty assistant messages in conversation history.",
self.model,
last_role,
)
for event in tail_events:
yield event
return
# finish_reason=length means the model exhausted
# max_tokens before producing content. Retrying with
# the same max_tokens will never help.
@@ -958,10 +939,16 @@ class LiteLLMProvider(LLMProvider):
yield event
return
# Empty stream after a user message — use short fixed
# retries, not the rate-limit backoff. This is likely
# a deterministic conversation-structure issue, so long
# exponential waits don't help.
# Empty stream — always retry regardless of last message
# role. Ghost empty streams after tool results are NOT
# expected no-ops; they create infinite loops when the
# conversation doesn't change between iterations.
# After retries, return the empty result and let the
# caller (EventLoopNode) decide how to handle it.
last_role = next(
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
None,
)
if attempt < EMPTY_STREAM_MAX_RETRIES:
token_count, token_method = _estimate_tokens(
self.model,
@@ -974,7 +961,8 @@ class LiteLLMProvider(LLMProvider):
attempt=attempt,
)
logger.warning(
f"[stream-retry] {self.model} returned empty stream "
f"[stream-retry] {self.model} returned empty stream "
f"after {last_role} message — "
f"~{token_count} tokens ({token_method}). "
f"Request dumped to: {dump_path}. "
f"Retrying in {EMPTY_STREAM_RETRY_DELAY}s "
@@ -983,7 +971,17 @@ class LiteLLMProvider(LLMProvider):
await asyncio.sleep(EMPTY_STREAM_RETRY_DELAY)
continue
# Success (or final attempt) — flush remaining events.
# All retries exhausted — log and return the empty
# result. EventLoopNode's empty response guard will
# accept if all outputs are set, or handle the ghost
# stream case if outputs are still missing.
logger.error(
f"[stream] {self.model} returned empty stream after "
f"{EMPTY_STREAM_MAX_RETRIES} retries "
f"(last_role={last_role}). Returning empty result."
)
# Success (or empty after exhausted retries) — flush events.
for event in tail_events:
yield event
return
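The retry behavior described in the comments above — fixed short delays for empty streams, then returning the empty result to the caller instead of raising — can be sketched as a standalone loop. Constant values here are assumptions, not the repo's actual settings:

```python
import asyncio

EMPTY_STREAM_MAX_RETRIES = 2   # assumed value for this sketch
EMPTY_STREAM_RETRY_DELAY = 0   # zero so the sketch runs instantly

async def stream_with_retries(produce):
    """Retry empty results a fixed number of times, then hand the empty
    result back to the caller (mirrors the new always-retry behavior)."""
    for attempt in range(EMPTY_STREAM_MAX_RETRIES + 1):
        events = await produce(attempt)
        if events:                       # got content: success
            return events, attempt
        if attempt < EMPTY_STREAM_MAX_RETRIES:
            await asyncio.sleep(EMPTY_STREAM_RETRY_DELAY)
            continue
        return [], attempt               # retries exhausted: empty result

async def _demo():
    # Empty twice, then yields one event on the third call.
    async def produce(attempt):
        return ["text"] if attempt == 2 else []
    return await stream_with_retries(produce)

events, attempts = asyncio.run(_demo())
```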
+57 -10
View File
@@ -10,6 +10,7 @@ Usage:
import json
import logging
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
@@ -161,7 +162,7 @@ def _load_session(session_id: str) -> BuildSession:
if not session_file.exists():
raise ValueError(f"Session '{session_id}' not found")
with open(session_file) as f:
with open(session_file, encoding="utf-8") as f:
data = json.load(f)
return BuildSession.from_dict(data)
@@ -173,7 +174,7 @@ def _load_active_session() -> BuildSession | None:
return None
try:
with open(ACTIVE_SESSION_FILE) as f:
with open(ACTIVE_SESSION_FILE, encoding="utf-8") as f:
session_id = f.read().strip()
if session_id:
@@ -227,7 +228,7 @@ def list_sessions() -> str:
if SESSIONS_DIR.exists():
for session_file in SESSIONS_DIR.glob("*.json"):
try:
with open(session_file) as f:
with open(session_file, encoding="utf-8") as f:
data = json.load(f)
sessions.append(
{
@@ -247,7 +248,7 @@ def list_sessions() -> str:
active_id = None
if ACTIVE_SESSION_FILE.exists():
try:
with open(ACTIVE_SESSION_FILE) as f:
with open(ACTIVE_SESSION_FILE, encoding="utf-8") as f:
active_id = f.read().strip()
except Exception:
pass
@@ -309,7 +310,7 @@ def delete_session(session_id: Annotated[str, "ID of the session to delete"]) ->
_session = None
if ACTIVE_SESSION_FILE.exists():
with open(ACTIVE_SESSION_FILE) as f:
with open(ACTIVE_SESSION_FILE, encoding="utf-8") as f:
active_id = f.read().strip()
if active_id == session_id:
ACTIVE_SESSION_FILE.unlink()
@@ -562,16 +563,29 @@ def _validate_agent_path(agent_path: str) -> tuple[Path | None, str | None]:
path = Path(agent_path)
# Resolve relative paths against project root (not MCP server's cwd)
if not path.is_absolute() and not path.exists():
resolved = _PROJECT_ROOT / path
if resolved.exists():
path = resolved
if not path.is_absolute():
path = _PROJECT_ROOT / path
# Restrict to allowed directories BEFORE checking existence to prevent
# leaking whether arbitrary filesystem paths exist on disk.
from framework.server.app import validate_agent_path
try:
path = validate_agent_path(path)
except ValueError:
return None, json.dumps(
{
"success": False,
"error": "agent_path must be inside an allowed directory "
"(exports/, examples/, or ~/.hive/agents/)",
}
)
if not path.exists():
return None, json.dumps(
{
"success": False,
"error": f"Agent path not found: {path}",
"error": f"Agent path not found: {agent_path}",
"hint": "Run export_graph to create an agent in exports/ first",
}
)
@@ -2795,6 +2809,21 @@ def run_tests(
import re
import subprocess
# Guard: pytest must be available as a subprocess command.
# Install with: pip install 'framework[testing]'
if shutil.which("pytest") is None:
return json.dumps(
{
"goal_id": goal_id,
"error": (
"pytest is not installed or not on PATH. "
"Hive's test runner requires pytest at runtime. "
"Install it with: pip install 'framework[testing]' "
"or: uv pip install 'framework[testing]'"
),
}
)
path, err = _validate_agent_path(agent_path)
if err:
return err
@@ -2865,6 +2894,7 @@ def run_tests(
try:
result = subprocess.run(
cmd,
encoding="utf-8",
capture_output=True,
text=True,
timeout=600, # 10 minute timeout
@@ -2988,6 +3018,22 @@ def debug_test(
import re
import subprocess
# Guard: pytest must be available as a subprocess command.
# Install with: pip install 'framework[testing]'
if shutil.which("pytest") is None:
return json.dumps(
{
"goal_id": goal_id,
"test_name": test_name,
"error": (
"pytest is not installed or not on PATH. "
"Hive's test runner requires pytest at runtime. "
"Install it with: pip install 'framework[testing]' "
"or: uv pip install 'framework[testing]'"
),
}
)
# Derive agent_path from session if not provided
if not agent_path and _session:
agent_path = f"exports/{_session.name}"
@@ -3040,6 +3086,7 @@ def debug_test(
try:
result = subprocess.run(
cmd,
encoding="utf-8",
capture_output=True,
text=True,
timeout=120, # 2 minute timeout for single test
+16 -6
View File
@@ -517,7 +517,7 @@ def cmd_run(args: argparse.Namespace) -> int:
return 1
elif args.input_file:
try:
with open(args.input_file) as f:
with open(args.input_file, encoding="utf-8") as f:
context = json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Error reading input file: {e}", file=sys.stderr)
@@ -659,7 +659,7 @@ def cmd_run(args: argparse.Namespace) -> int:
# Output results
if args.output:
with open(args.output, "w") as f:
with open(args.output, "w", encoding="utf-8") as f:
json.dump(output, f, indent=2, default=str)
if not args.quiet:
print(f"Results written to {args.output}")
@@ -1517,7 +1517,7 @@ def _extract_python_agent_metadata(agent_path: Path) -> tuple[str, str]:
return fallback_name, fallback_desc
try:
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
tree = ast.parse(f.read())
# Find AgentMetadata class definition
@@ -1932,10 +1932,18 @@ def _open_browser(url: str) -> None:
try:
if sys.platform == "darwin":
subprocess.Popen(["open", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.Popen(
["open", url],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding="utf-8",
)
elif sys.platform == "linux":
subprocess.Popen(
["xdg-open", url], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
["xdg-open", url],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
encoding="utf-8",
)
except Exception:
pass # Best-effort — don't crash if browser can't open
@@ -1980,12 +1988,14 @@ def _build_frontend() -> bool:
# Ensure deps are installed
subprocess.run(
["npm", "install", "--no-fund", "--no-audit"],
encoding="utf-8",
cwd=frontend_dir,
check=True,
capture_output=True,
)
subprocess.run(
["npm", "run", "build"],
encoding="utf-8",
cwd=frontend_dir,
check=True,
capture_output=True,
@@ -2033,7 +2043,7 @@ def cmd_serve(args: argparse.Namespace) -> int:
print(f"Error loading {agent_path}: {e}")
# Start server using AppRunner/TCPSite (same pattern as webhook_server.py)
runner = web.AppRunner(app)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
site = web.TCPSite(runner, args.host, args.port)
await site.start()
+185
View File
@@ -0,0 +1,185 @@
"""Pre-load validation for agent graphs.
Runs structural and credential checks before MCP servers are spawned.
Fails fast with actionable error messages.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from framework.graph.edge import GraphSpec
from framework.graph.node import NodeSpec
logger = logging.getLogger(__name__)
class PreloadValidationError(Exception):
"""Raised when pre-load validation fails."""
def __init__(self, errors: list[str]):
self.errors = errors
msg = "Pre-load validation failed:\n" + "\n".join(f" - {e}" for e in errors)
super().__init__(msg)
@dataclass
class PreloadResult:
"""Result of pre-load validation."""
valid: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def validate_graph_structure(graph: GraphSpec) -> list[str]:
"""Run graph structural validation (includes GCU subagent-only checks).
Delegates to GraphSpec.validate() which checks entry/terminal nodes,
edge references, reachability, fan-out rules, and GCU constraints.
"""
return graph.validate()
def validate_credentials(
nodes: list[NodeSpec],
*,
interactive: bool = True,
skip: bool = False,
) -> None:
"""Validate agent credentials.
Calls ``validate_agent_credentials`` which performs two-phase validation:
1. Presence check (env var, encrypted store, Aden sync)
2. Health check (lightweight HTTP call to verify the key works)
On failure raises ``CredentialError`` with ``validation_result`` and
``failed_cred_names`` attributes preserved from the upstream check.
In interactive mode (CLI with TTY), attempts recovery via the
credential setup flow before re-raising.
"""
if skip:
return
from framework.credentials.validation import validate_agent_credentials
if not interactive:
# Non-interactive: let CredentialError propagate with full context.
# validate_agent_credentials attaches .validation_result and
# .failed_cred_names to the exception automatically.
validate_agent_credentials(nodes)
return
import sys
from framework.credentials.models import CredentialError
try:
validate_agent_credentials(nodes)
except CredentialError as e:
if not sys.stdin.isatty():
raise
print(f"\n{e}", file=sys.stderr)
from framework.credentials.validation import build_setup_session_from_error
session = build_setup_session_from_error(e, nodes=nodes)
if not session.missing:
raise
result = session.run_interactive()
if not result.success:
# Preserve the original validation_result so callers can
# inspect which credentials are still missing.
exc = CredentialError(
"Credential setup incomplete. Run again after configuring the required credentials."
)
if hasattr(e, "validation_result"):
exc.validation_result = e.validation_result # type: ignore[attr-defined]
if hasattr(e, "failed_cred_names"):
exc.failed_cred_names = e.failed_cred_names # type: ignore[attr-defined]
raise exc from None
# Re-validate after successful setup — this will raise if still broken,
# with fresh validation_result attached to the new exception.
validate_agent_credentials(nodes)
def credential_errors_to_json(exc: Exception) -> dict:
"""Extract structured credential failure details from a CredentialError.
Returns a dict suitable for JSON serialization with enough detail for
the queen to report actionable guidance to the user. Falls back to
``str(exc)`` when rich metadata is not available.
"""
result = getattr(exc, "validation_result", None)
if result is None:
return {
"error": "credentials_required",
"message": str(exc),
}
failed = result.failed
missing = []
for c in failed:
if c.available:
status = "invalid"
elif c.aden_not_connected:
status = "aden_not_connected"
else:
status = "missing"
entry: dict = {
"credential": c.credential_name,
"env_var": c.env_var,
"status": status,
}
if c.tools:
entry["tools"] = c.tools
if c.node_types:
entry["node_types"] = c.node_types
if c.help_url:
entry["help_url"] = c.help_url
if c.validation_message:
entry["validation_message"] = c.validation_message
missing.append(entry)
return {
"error": "credentials_required",
"message": str(exc),
"missing_credentials": missing,
}
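The per-credential status assignment above (invalid vs. aden_not_connected vs. missing) can be isolated into a small sketch; the dataclass fields mirror the attributes read in the loop (`available`, `aden_not_connected`) and are assumptions about the upstream model:

```python
from dataclasses import dataclass

# Stand-in for the failed-credential records iterated above; field names
# are assumptions based on the attribute accesses in the source.
@dataclass
class FailedCred:
    credential_name: str
    available: bool = False
    aden_not_connected: bool = False

def classify(c: FailedCred) -> str:
    if c.available:
        return "invalid"          # present but failed the health check
    if c.aden_not_connected:
        return "aden_not_connected"
    return "missing"
```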
def run_preload_validation(
graph: GraphSpec,
*,
interactive: bool = True,
skip_credential_validation: bool = False,
) -> PreloadResult:
"""Run all pre-load validations.
Order:
1. Graph structure (includes GCU subagent-only checks): non-recoverable
2. Credentials: potentially recoverable via interactive setup
Raises PreloadValidationError for structural issues.
Raises CredentialError for credential issues.
"""
# 1. Structural validation (calls graph.validate() which includes GCU checks)
graph_errors = validate_graph_structure(graph)
if graph_errors:
raise PreloadValidationError(graph_errors)
# 2. Credential validation
validate_credentials(
graph.nodes,
interactive=interactive,
skip=skip_credential_validation,
)
return PreloadResult(valid=True)
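The ordering documented in `run_preload_validation` — structural errors abort before any credential work runs — can be sketched with the real work swapped for callables. Names are simplified; the actual function delegates to `graph.validate()` and `validate_agent_credentials`:

```python
# Minimal sketch of run_preload_validation's ordering: structural errors
# raise before credential validation is ever invoked.
class PreloadValidationError(Exception):
    def __init__(self, errors):
        self.errors = errors
        super().__init__(
            "Pre-load validation failed:\n" + "\n".join(f"  - {e}" for e in errors)
        )

def run_preload(structure_errors, check_credentials):
    if structure_errors:                 # 1. structure: non-recoverable
        raise PreloadValidationError(structure_errors)
    check_credentials()                  # 2. credentials: may raise
    return True
```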
+19 -65
View File
@@ -12,7 +12,6 @@ from typing import TYPE_CHECKING, Any
from framework.config import get_hive_config, get_preferred_model
from framework.credentials.validation import (
ensure_credential_key_env as _ensure_credential_key_env,
validate_agent_credentials,
)
from framework.graph import Goal
from framework.graph.edge import (
@@ -25,6 +24,7 @@ from framework.graph.edge import (
from framework.graph.executor import ExecutionResult
from framework.graph.node import NodeSpec
from framework.llm.provider import LLMProvider, Tool
from framework.runner.preload_validation import run_preload_validation
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
@@ -96,7 +96,7 @@ def _save_refreshed_credentials(token_data: dict) -> None:
return
try:
with open(CLAUDE_CREDENTIALS_FILE) as f:
with open(CLAUDE_CREDENTIALS_FILE, encoding="utf-8") as f:
creds = json.load(f)
oauth = creds.get("claudeAiOauth", {})
@@ -107,7 +107,7 @@ def _save_refreshed_credentials(token_data: dict) -> None:
oauth["expiresAt"] = int((time.time() + token_data["expires_in"]) * 1000)
creds["claudeAiOauth"] = oauth
with open(CLAUDE_CREDENTIALS_FILE, "w") as f:
with open(CLAUDE_CREDENTIALS_FILE, "w", encoding="utf-8") as f:
json.dump(creds, f, indent=2)
logger.debug("Claude Code credentials refreshed successfully")
except (json.JSONDecodeError, OSError, KeyError) as exc:
@@ -132,7 +132,7 @@ def get_claude_code_token() -> str | None:
return None
try:
with open(CLAUDE_CREDENTIALS_FILE) as f:
with open(CLAUDE_CREDENTIALS_FILE, encoding="utf-8") as f:
creds = json.load(f)
except (json.JSONDecodeError, OSError):
return None
@@ -212,7 +212,7 @@ def _read_codex_keychain() -> dict | None:
"-w",
],
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)
if result.returncode != 0:
@@ -231,7 +231,7 @@ def _read_codex_auth_file() -> dict | None:
if not CODEX_AUTH_FILE.exists():
return None
try:
with open(CODEX_AUTH_FILE) as f:
with open(CODEX_AUTH_FILE, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return None
@@ -322,8 +322,9 @@ def _save_refreshed_codex_credentials(auth_data: dict, token_data: dict) -> None
auth_data["tokens"] = tokens
auth_data["last_refresh"] = datetime.now(UTC).isoformat()
CODEX_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(CODEX_AUTH_FILE, "w") as f:
CODEX_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True, mode=0o700)
fd = os.open(CODEX_AUTH_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(auth_data, f, indent=2)
logger.debug("Codex credentials refreshed successfully")
except (OSError, KeyError) as exc:
@@ -678,9 +679,13 @@ class AgentRunner:
self._agent_runtime: AgentRuntime | None = None
self._uses_async_entry_points = self.graph.has_async_entry_points()
# Validate credentials before spawning MCP servers.
# Pre-load validation: structural checks + credentials.
# Fails fast with actionable guidance — no MCP noise on screen.
self._validate_credentials()
run_preload_validation(
self.graph,
interactive=self._interactive,
skip_credential_validation=self.skip_credential_validation,
)
# Auto-discover tools from tools.py
tools_path = agent_path / "tools.py"
@@ -697,54 +702,6 @@ class AgentRunner:
if mcp_config_path.exists():
self._load_mcp_servers_from_config(mcp_config_path)
def _validate_credentials(self) -> None:
"""Check that required credentials are available before spawning MCP servers.
If ``interactive`` is True and stdin is a TTY, automatically launches
the interactive credential setup flow so the user can fix the issue
in-place. Re-validates after setup succeeds.
When ``interactive`` is False (e.g. TUI callers), the CredentialError
propagates immediately so the caller can handle it with its own UI.
"""
if self.skip_credential_validation:
return
if not self._interactive:
# Let the CredentialError propagate — caller handles UI.
validate_agent_credentials(self.graph.nodes)
return
import sys
from framework.credentials.models import CredentialError
try:
validate_agent_credentials(self.graph.nodes)
return # All good
except CredentialError as e:
if not sys.stdin.isatty():
raise
# Interactive: show the error then enter credential setup
print(f"\n{e}", file=sys.stderr)
from framework.credentials.validation import build_setup_session_from_error
session = build_setup_session_from_error(e, nodes=self.graph.nodes)
if not session.missing:
raise
result = session.run_interactive()
if not result.success:
raise CredentialError(
"Credential setup incomplete. "
"Run again after configuring the required credentials."
) from None
# Re-validate after setup
validate_agent_credentials(self.graph.nodes)
@staticmethod
def _import_agent_module(agent_path: Path):
"""Import an agent package from its directory path.
@@ -906,7 +863,7 @@ class AgentRunner:
if not agent_json_path.exists():
raise FileNotFoundError(f"No agent.py or agent.json found in {agent_path}")
with open(agent_json_path) as f:
with open(agent_json_path, encoding="utf-8") as f:
graph, goal = load_agent_export(f.read())
return cls(
@@ -1169,9 +1126,7 @@ class AgentRunner:
node.tools.append(tool_name)
# For event_loop/gcu nodes: auto-register file tools MCP server, then expand tool lists
has_loop_nodes = any(
node.node_type in ("event_loop", "gcu") for node in self.graph.nodes
)
has_loop_nodes = any(node.node_type in ("event_loop", "gcu") for node in self.graph.nodes)
if has_loop_nodes:
from framework.graph.files import FILES_MCP_SERVER_CONFIG, FILES_MCP_SERVER_NAME
@@ -1182,9 +1137,7 @@ class AgentRunner:
if cwd and not Path(cwd).is_absolute():
files_config["cwd"] = str((self.agent_path / cwd).resolve())
self._tool_registry.register_mcp_server(files_config)
files_tool_names = self._tool_registry.get_server_tool_names(
FILES_MCP_SERVER_NAME
)
files_tool_names = self._tool_registry.get_server_tool_names(FILES_MCP_SERVER_NAME)
if files_tool_names:
for node in self.graph.nodes:
@@ -1321,6 +1274,7 @@ class AgentRunner:
isolation_level=async_ep.isolation_level,
priority=async_ep.priority,
max_concurrent=async_ep.max_concurrent,
max_resurrections=async_ep.max_resurrections,
)
entry_points.append(ep)
+1 -1
View File
@@ -340,7 +340,7 @@ class ToolRegistry:
self._mcp_config_path = Path(config_path)
try:
with open(config_path) as f:
with open(config_path, encoding="utf-8") as f:
config = json.load(f)
except Exception as e:
logger.warning(f"Failed to load MCP config from {config_path}: {e}")
+96 -31
View File
@@ -412,7 +412,9 @@ class AgentRuntime:
continue
def _make_cron_timer(
entry_point_id: str, expr: str, immediate: bool,
entry_point_id: str,
expr: str,
immediate: bool,
idle_timeout: float = 300,
):
async def _cron_loop():
@@ -445,21 +447,29 @@ class AgentRuntime:
await asyncio.sleep(max(0, sleep_secs))
continue
# Gate: skip tick if agent is actively working.
# Gate: skip tick if ANY stream is actively working.
# If the execution is idle (no LLM/tool activity
# beyond idle_timeout) let the timer proceed —
# execute() will cancel the stale execution.
_stream = self._streams.get(entry_point_id)
if (
_stream
and _stream.active_execution_ids
and _stream.agent_idle_seconds < idle_timeout
):
logger.debug(
"Cron '%s': agent actively working (idle %.0fs < %ds), skipping tick",
_any_active = False
_min_idle = float("inf")
for _s in self._streams.values():
if _s.active_execution_ids:
_any_active = True
_idle = _s.agent_idle_seconds
if _idle < _min_idle:
_min_idle = _idle
logger.info(
"Cron '%s': gate — active=%s, idle=%.1fs, timeout=%ds",
entry_point_id,
_any_active,
_min_idle,
idle_timeout,
)
if _any_active and _min_idle < idle_timeout:
logger.info(
"Cron '%s': agent actively working, skipping tick",
entry_point_id,
_stream.agent_idle_seconds,
idle_timeout,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + sleep_secs
@@ -530,7 +540,9 @@ class AgentRuntime:
task = asyncio.create_task(
_make_cron_timer(
ep_id, cron_expr, run_immediately,
ep_id,
cron_expr,
run_immediately,
idle_timeout=tc.get("idle_timeout_seconds", 300),
)()
)
@@ -545,7 +557,9 @@ class AgentRuntime:
elif interval and interval > 0:
# Fixed interval mode (original behavior)
def _make_timer(
entry_point_id: str, mins: float, immediate: bool,
entry_point_id: str,
mins: float,
immediate: bool,
idle_timeout: float = 300,
):
async def _timer_loop():
@@ -570,17 +584,26 @@ class AgentRuntime:
continue
# Gate: skip tick if agent is actively working.
_stream = self._streams.get(entry_point_id)
if (
_stream
and _stream.active_execution_ids
and _stream.agent_idle_seconds < idle_timeout
):
logger.debug(
"Timer '%s': agent actively working (idle %.0fs < %ds), skipping tick",
# Gate: skip tick if ANY stream is actively working.
_any_active = False
_min_idle = float("inf")
for _s in self._streams.values():
if _s.active_execution_ids:
_any_active = True
_idle = _s.agent_idle_seconds
if _idle < _min_idle:
_min_idle = _idle
logger.info(
"Timer '%s': gate — active=%s, idle=%.1fs, timeout=%ds",
entry_point_id,
_any_active,
_min_idle,
idle_timeout,
)
if _any_active and _min_idle < idle_timeout:
logger.info(
"Timer '%s': agent actively working, skipping tick",
entry_point_id,
_stream.agent_idle_seconds,
idle_timeout,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + interval_secs
@@ -647,7 +670,9 @@ class AgentRuntime:
task = asyncio.create_task(
_make_timer(
ep_id, interval, run_immediately,
ep_id,
interval,
run_immediately,
idle_timeout=tc.get("idle_timeout_seconds", 300),
)()
)
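The broadened gate in the cron and timer loops above — previously only the timer's own stream was checked, now a tick is skipped when any stream is active and the freshest one is under the idle timeout — reduces to a small predicate. Stream objects here are dict stand-ins for `ExecutionStream`:

```python
# Sketch of the timer/cron gate: skip the tick when ANY stream has
# active executions and the most-recently-active stream's idle time is
# below the timeout. Dicts stand in for ExecutionStream.
def should_skip_tick(streams, idle_timeout: float = 300) -> bool:
    any_active = False
    min_idle = float("inf")
    for s in streams:
        if s["active_execution_ids"]:
            any_active = True
            min_idle = min(min_idle, s["agent_idle_seconds"])
    return any_active and min_idle < idle_timeout
```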
@@ -990,6 +1015,7 @@ class AgentRuntime:
local_ep: str,
mins: float,
immediate: bool,
idle_timeout: float = 300,
):
async def _timer_loop():
interval_secs = mins * 60
@@ -1019,12 +1045,28 @@ class AgentRuntime:
await asyncio.sleep(interval_secs)
continue
# Gate: skip tick if previous execution still running
# Gate: skip tick if ANY stream in this graph is actively working.
_reg = self._graphs.get(gid)
_stream = _reg.streams.get(local_ep) if _reg else None
if _stream and _stream.active_execution_ids:
logger.debug(
"Timer '%s::%s': execution already in progress, skipping tick",
_any_active = False
_min_idle = float("inf")
if _reg:
for _sid, _s in _reg.streams.items():
if _s.active_execution_ids:
_any_active = True
_idle = _s.agent_idle_seconds
if _idle < _min_idle:
_min_idle = _idle
logger.info(
"Timer '%s::%s': gate — active=%s, idle=%.1fs, timeout=%ds",
gid,
local_ep,
_any_active,
_min_idle,
idle_timeout,
)
if _any_active and _min_idle < idle_timeout:
logger.info(
"Timer '%s::%s': agent actively working, skipping tick",
gid,
local_ep,
)
@@ -1095,7 +1137,13 @@ class AgentRuntime:
return _timer_loop
task = asyncio.create_task(
_make_timer(graph_id, ep_id, interval, run_immediately)()
_make_timer(
graph_id,
ep_id,
interval,
run_immediately,
idle_timeout=tc.get("idle_timeout_seconds", 300),
)()
)
timer_tasks.append(task)
logger.info("Timer task created for '%s::%s': %s", graph_id, ep_id, task)
@@ -1412,6 +1460,23 @@ class AgentRuntime:
# Fallback: primary graph
return list(self._entry_points.values())
def get_timer_next_fire_in(self, entry_point_id: str) -> float | None:
"""Return seconds until the next timer fire for *entry_point_id*.
Checks the primary graph's ``_timer_next_fire`` dict as well as
all registered secondary graphs. Returns ``None`` when no fire
time is recorded (e.g. the timer is currently executing or the
entry point is not a timer).
"""
mono = self._timer_next_fire.get(entry_point_id)
if mono is not None:
return max(0.0, mono - time.monotonic())
for reg in self._graphs.values():
mono = reg.timer_next_fire.get(entry_point_id)
if mono is not None:
return max(0.0, mono - time.monotonic())
return None
def get_stream(self, entry_point_id: str) -> ExecutionStream | None:
"""Get a specific execution stream."""
return self._streams.get(entry_point_id)
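The new `get_timer_next_fire_in` stores fire times as monotonic timestamps, so "seconds until fire" is the clamped difference from `time.monotonic()`; a past fire time clamps to 0.0. A standalone sketch of that core computation:

```python
import time

# Sketch of the next-fire lookup: monotonic timestamps are converted to
# a non-negative "seconds from now" value, or None when unrecorded
# (e.g. the timer is currently executing or the id is not a timer).
def next_fire_in(timer_next_fire: dict, entry_point_id: str):
    mono = timer_next_fire.get(entry_point_id)
    if mono is None:
        return None
    return max(0.0, mono - time.monotonic())
```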
+3
View File
@@ -130,6 +130,9 @@ class EventType(StrEnum):
WORKER_ESCALATION_TICKET = "worker_escalation_ticket"
QUEEN_INTERVENTION_REQUESTED = "queen_intervention_requested"
# Execution resurrection (auto-restart on non-fatal failure)
EXECUTION_RESURRECTED = "execution_resurrected"
# Worker lifecycle (session manager → frontend)
WORKER_LOADED = "worker_loaded"
CREDENTIALS_REQUIRED = "credentials_required"
+143 -44
View File
@@ -108,6 +108,7 @@ class EntryPointSpec:
isolation_level: str = "shared" # "isolated" | "shared" | "synchronized"
priority: int = 0
max_concurrent: int = 10 # Max concurrent executions for this entry point
max_resurrections: int = 3 # Auto-restart on non-fatal failure (0 to disable)
def get_isolation_level(self) -> IsolationLevel:
"""Convert string isolation level to enum."""
@@ -250,9 +251,7 @@ class ExecutionStream:
# Graph-scoped event bus (stamps graph_id on published events)
# Always wrap in GraphScopedEventBus so we can track last_activity_time.
if self._event_bus:
self._scoped_event_bus = GraphScopedEventBus(
self._event_bus, self.graph_id or ""
)
self._scoped_event_bus = GraphScopedEventBus(self._event_bus, self.graph_id or "")
else:
self._scoped_event_bus = None
@@ -326,13 +325,21 @@ class ExecutionStream:
"""Return nodes that support message injection (have ``inject_event``).
Each entry is ``{"node_id": ..., "execution_id": ...}``.
The currently executing node is placed first so that
``inject_worker_message`` targets the active node, not a stale one.
"""
injectable: list[dict[str, str]] = []
current_first: list[dict[str, str]] = []
for exec_id, executor in self._active_executors.items():
current = getattr(executor, "current_node_id", None)
for node_id, node in executor.node_registry.items():
if hasattr(node, "inject_event"):
injectable.append({"node_id": node_id, "execution_id": exec_id})
return injectable
entry = {"node_id": node_id, "execution_id": exec_id}
if node_id == current:
current_first.append(entry)
else:
injectable.append(entry)
return current_first + injectable
def _record_execution_result(self, execution_id: str, result: ExecutionResult) -> None:
"""Record a completed execution result with retention pruning."""
@@ -447,7 +454,8 @@ class ExecutionStream:
for eid in active:
logger.info(
"Cancelling running execution %s on stream '%s' before starting new one",
eid, self.stream_id,
eid,
self.stream_id,
)
executor = self._active_executors.get(eid)
if executor:
@@ -503,8 +511,37 @@ class ExecutionStream:
logger.debug(f"Queued execution {execution_id} for stream {self.stream_id}")
return execution_id
# Errors that indicate a fundamental configuration or environment problem.
# Resurrecting after these is pointless — the same error will recur.
_FATAL_ERROR_PATTERNS: tuple[str, ...] = (
"credential",
"authentication",
"unauthorized",
"forbidden",
"api key",
"import error",
"module not found",
"no module named",
"permission denied",
"invalid api",
"configuration error",
)
@classmethod
def _is_fatal_error(cls, error: str | None) -> bool:
"""Return True if the error is life-threatening (no point resurrecting)."""
if not error:
return False
error_lower = error.lower()
return any(pat in error_lower for pat in cls._FATAL_ERROR_PATTERNS)
async def _run_execution(self, ctx: ExecutionContext) -> None:
"""Run a single execution within the stream."""
"""Run a single execution within the stream.
Supports automatic resurrection: when the execution fails with a
non-fatal error, it restarts from the failed node up to
``entry_spec.max_resurrections`` times (default 3).
"""
execution_id = ctx.id
# When sharing a session with another entry point (resume_session_id),
@@ -512,6 +549,11 @@ class ExecutionStream:
# owns the state.json and _write_progress() keeps memory up-to-date.
_is_shared_session = bool(ctx.session_state and ctx.session_state.get("resume_session_id"))
max_resurrections = self.entry_spec.max_resurrections
_resurrection_count = 0
_current_session_state = ctx.session_state
_current_input_data = ctx.input_data
# Acquire semaphore to limit concurrency
async with self._semaphore:
ctx.status = "running"
@@ -552,12 +594,6 @@ class ExecutionStream:
store=self._runtime_log_store, agent_id=self.graph.id
)
# Create executor for this execution.
# Each execution gets its own storage under sessions/{exec_id}/
# so conversations, spillover, and data files are all scoped
# to this execution. The executor sets data_dir via execution
# context (contextvars) so data tools and spillover share the
# same session-scoped directory.
# Derive storage from session_store (graph-specific for secondary
# graphs) so that all files — conversations, state, checkpoints,
# data — land under the graph's own sessions/ directory, not the
@@ -566,43 +602,106 @@ class ExecutionStream:
exec_storage = self._session_store.sessions_dir / execution_id
else:
exec_storage = self._storage.base_path / "sessions" / execution_id
executor = GraphExecutor(
runtime=runtime_adapter,
llm=self._llm,
tools=self._tools,
tool_executor=self._tool_executor,
event_bus=self._scoped_event_bus,
stream_id=self.stream_id,
execution_id=execution_id,
storage_path=exec_storage,
runtime_logger=runtime_logger,
loop_config=self.graph.loop_config,
accounts_prompt=self._accounts_prompt,
accounts_data=self._accounts_data,
tool_provider_map=self._tool_provider_map,
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
# Write initial session state
if not _is_shared_session:
await self._write_session_state(execution_id, ctx)
# Create modified graph with entry point
# We need to override the entry_node to use our entry point
modified_graph = self._create_modified_graph()
# Execute
result = await executor.execute(
graph=modified_graph,
goal=self.goal,
input_data=ctx.input_data,
session_state=ctx.session_state,
checkpoint_config=self._checkpoint_config,
)
# Write initial session state
if not _is_shared_session:
await self._write_session_state(execution_id, ctx)
# Clean up executor reference
self._active_executors.pop(execution_id, None)
# --- Resurrection loop ---
# Each iteration creates a fresh executor. On non-fatal failure,
# the executor's session_state (memory + resume_from) carries
# forward so the next attempt resumes at the failed node.
while True:
# Create executor for this execution.
# Each execution gets its own storage under sessions/{exec_id}/
# so conversations, spillover, and data files are all scoped
# to this execution. The executor sets data_dir via execution
# context (contextvars) so data tools and spillover share the
# same session-scoped directory.
executor = GraphExecutor(
runtime=runtime_adapter,
llm=self._llm,
tools=self._tools,
tool_executor=self._tool_executor,
event_bus=self._scoped_event_bus,
stream_id=self.stream_id,
execution_id=execution_id,
storage_path=exec_storage,
runtime_logger=runtime_logger,
loop_config=self.graph.loop_config,
accounts_prompt=self._accounts_prompt,
accounts_data=self._accounts_data,
tool_provider_map=self._tool_provider_map,
)
# Track executor so inject_input() can reach EventLoopNode instances
self._active_executors[execution_id] = executor
# Execute
result = await executor.execute(
graph=modified_graph,
goal=self.goal,
input_data=_current_input_data,
session_state=_current_session_state,
checkpoint_config=self._checkpoint_config,
)
# Clean up executor reference
self._active_executors.pop(execution_id, None)
# Check if resurrection is appropriate
if (
not result.success
and not result.paused_at
and _resurrection_count < max_resurrections
and result.session_state
and not self._is_fatal_error(result.error)
):
_resurrection_count += 1
logger.warning(
"Execution %s failed (%s) — resurrecting (%d/%d) from node '%s'",
execution_id,
(result.error or "unknown")[:200],
_resurrection_count,
max_resurrections,
result.session_state.get("resume_from", "?"),
)
# Emit resurrection event
if self._scoped_event_bus:
from framework.runtime.event_bus import AgentEvent, EventType
await self._scoped_event_bus.publish(
AgentEvent(
type=EventType.EXECUTION_RESURRECTED,
stream_id=self.stream_id,
execution_id=execution_id,
data={
"attempt": _resurrection_count,
"max_resurrections": max_resurrections,
"error": (result.error or "")[:500],
"resume_from": result.session_state.get("resume_from"),
},
)
)
# Resume from the failed node with preserved memory
_current_session_state = {
**result.session_state,
"resume_session_id": execution_id,
}
# On resurrection, input_data is already in memory —
# pass empty so we don't overwrite intermediate results.
_current_input_data = {}
# Brief cooldown before resurrection
await asyncio.sleep(2.0)
continue
break # success, fatal failure, or resurrections exhausted
# Store result with retention
self._record_execution_result(execution_id, result)
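The resurrection loop above can be reduced to a small reusable shape: re-run on non-fatal failure, carry the session state forward, stop on success, fatal error, or attempt exhaustion. A hypothetical standalone version (the `run` callable and its `(success, error, state)` return shape are assumptions for this sketch, not the framework's signature):

```python
import asyncio


async def run_with_resurrection(run, max_resurrections: int = 3,
                                is_fatal=lambda err: False,
                                cooldown: float = 0.0):
    """Re-run `run(state)` after non-fatal failures, carrying state forward.

    `run` returns (success, error, session_state); the state from a failed
    attempt seeds the next one so execution resumes rather than restarts.
    """
    state: dict = {}
    attempts = 0
    while True:
        ok, error, state = await run(state)
        if ok or is_fatal(error) or attempts >= max_resurrections:
            return ok, error, attempts
        attempts += 1  # resurrect: resume with the carried-forward state
        await asyncio.sleep(cooldown)  # brief cooldown before retrying


calls = {"n": 0}


async def flaky(state):
    # Fails twice with a transient error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        return False, "transient network error", state
    return True, None, state


ok, error, attempts = asyncio.run(run_with_resurrection(flaky))
```

The production version adds two details this sketch omits: it skips resurrection when the execution is merely paused (`result.paused_at`), and it clears `input_data` on retry so carried-forward memory is not overwritten.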
+47 -1
@@ -11,6 +11,52 @@ from framework.server.session_manager import Session, SessionManager
logger = logging.getLogger(__name__)
# Anchor to the repository root so allowed roots are independent of CWD.
# app.py lives at core/framework/server/app.py, so four .parent calls
# reach the repo root where exports/ and examples/ live.
_REPO_ROOT = Path(__file__).resolve().parent.parent.parent.parent
_ALLOWED_AGENT_ROOTS: tuple[Path, ...] | None = None
def _get_allowed_agent_roots() -> tuple[Path, ...]:
"""Return resolved allowed root directories for agent loading.
Roots are anchored to the repository root (derived from ``__file__``)
so the allowlist is correct regardless of the process's working
directory.
"""
global _ALLOWED_AGENT_ROOTS
if _ALLOWED_AGENT_ROOTS is None:
_ALLOWED_AGENT_ROOTS = (
(_REPO_ROOT / "exports").resolve(),
(_REPO_ROOT / "examples").resolve(),
(Path.home() / ".hive" / "agents").resolve(),
)
return _ALLOWED_AGENT_ROOTS
def validate_agent_path(agent_path: str | Path) -> Path:
"""Validate that an agent path resolves inside an allowed directory.
Prevents arbitrary code execution via ``importlib.import_module`` by
restricting agent loading to known safe directories: ``exports/``,
``examples/``, and ``~/.hive/agents/``.
Returns the resolved ``Path`` on success.
Raises:
ValueError: If the path is outside all allowed roots.
"""
resolved = Path(agent_path).expanduser().resolve()
for root in _get_allowed_agent_roots():
if resolved.is_relative_to(root) and resolved != root:
return resolved
raise ValueError(
"agent_path must be inside an allowed directory (exports/, examples/, or ~/.hive/agents/)"
)
def safe_path_segment(value: str) -> str:
"""Validate a URL path parameter is a safe filesystem name.
@@ -18,7 +64,7 @@ def safe_path_segment(value: str) -> str:
traversal sequences. aiohttp decodes ``%2F`` inside route params,
so a raw ``{session_id}`` can contain ``/`` or ``..`` after decoding.
"""
if "/" in value or "\\" in value or ".." in value:
if not value or value == "." or "/" in value or "\\" in value or ".." in value:
raise web.HTTPBadRequest(reason="Invalid path parameter")
return value
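The hardened segment check now also rejects empty strings and `"."`, both of which survive URL decoding and would otherwise resolve to the sessions directory itself. A standalone version of the same predicate:

```python
def safe_segment(value: str) -> str:
    # Reject empty, current-dir, separators, and traversal. aiohttp decodes
    # %2F inside route params, so the raw segment can contain "/" or "..".
    if not value or value == "." or "/" in value or "\\" in value or ".." in value:
        raise ValueError("invalid path segment")
    return value
```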
@@ -8,6 +8,7 @@ from pydantic import SecretStr
from framework.credentials.models import CredentialKey, CredentialObject
from framework.credentials.store import CredentialStore
from framework.server.app import validate_agent_path
logger = logging.getLogger(__name__)
@@ -128,6 +129,11 @@ async def handle_check_agent(request: web.Request) -> web.Response:
if not agent_path:
return web.json_response({"error": "agent_path is required"}, status=400)
try:
agent_path = str(validate_agent_path(agent_path))
except ValueError as e:
return web.json_response({"error": str(e)}, status=400)
try:
from framework.credentials.setup import load_agent_nodes
from framework.credentials.validation import (
+18 -2
@@ -37,6 +37,7 @@ DEFAULT_EVENT_TYPES = [
EventType.CONTEXT_COMPACTED,
EventType.WORKER_LOADED,
EventType.CREDENTIALS_REQUIRED,
EventType.SUBAGENT_REPORT,
]
# Keepalive interval in seconds
@@ -92,11 +93,23 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
"worker_loaded",
}
client_disconnected = asyncio.Event()
async def on_event(event) -> None:
"""Push event dict into queue; drop non-critical events if full."""
if client_disconnected.is_set():
return
evt_dict = event.to_dict()
if evt_dict.get("type") in _CRITICAL_EVENTS:
await queue.put(evt_dict) # block rather than drop
try:
queue.put_nowait(evt_dict)
except asyncio.QueueFull:
logger.warning(
"SSE client queue full on critical event; disconnecting session='%s'",
session.id,
)
client_disconnected.set()
else:
try:
queue.put_nowait(evt_dict)
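The change above replaces a blocking `await queue.put()` for critical events with a non-blocking policy: a full queue silently drops non-critical events, but a critical event that cannot be delivered marks the client disconnected rather than stalling the event bus. A condensed sketch of that policy (event-type names follow the diff; the helper is illustrative):

```python
import asyncio

CRITICAL = {"credentials_required", "worker_loaded"}


def enqueue_event(queue: asyncio.Queue, event: dict,
                  disconnected: asyncio.Event) -> None:
    """Non-blocking enqueue with a drop-vs-disconnect policy."""
    if disconnected.is_set():
        return
    try:
        queue.put_nowait(event)
    except asyncio.QueueFull:
        if event.get("type") in CRITICAL:
            # A slow client must not silently miss critical events,
            # so disconnect it instead of blocking the publisher.
            disconnected.set()
        # Non-critical events are simply dropped.


async def demo() -> tuple[bool, int]:
    q: asyncio.Queue = asyncio.Queue(maxsize=1)
    disconnected = asyncio.Event()
    enqueue_event(q, {"type": "log"}, disconnected)
    enqueue_event(q, {"type": "log"}, disconnected)  # queue full: dropped
    dropped_quietly = not disconnected.is_set()
    enqueue_event(q, {"type": "credentials_required"}, disconnected)
    return dropped_quietly and disconnected.is_set(), q.qsize()


policy_ok, queued = asyncio.run(demo())
```

The design choice here is deliberate: blocking the publisher on a slow SSE client would back-pressure the whole event bus, which is why the previous `await queue.put()` was the bug being fixed.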
@@ -120,7 +133,7 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
event_count = 0
close_reason = "unknown"
try:
while True:
while not client_disconnected.is_set():
try:
data = await asyncio.wait_for(queue.get(), timeout=KEEPALIVE_INTERVAL)
await sse.send_event(data)
@@ -137,6 +150,9 @@ async def handle_events(request: web.Request) -> web.StreamResponse:
except Exception as exc:
close_reason = f"error: {exc}"
break
if client_disconnected.is_set() and close_reason == "unknown":
close_reason = "slow_client"
except asyncio.CancelledError:
close_reason = "cancelled"
finally:
+7
@@ -45,6 +45,7 @@ def _node_to_dict(node) -> dict:
"client_facing": node.client_facing,
"success_criteria": node.success_criteria,
"system_prompt": node.system_prompt or "",
"sub_agents": node.sub_agents,
}
@@ -99,6 +100,7 @@ async def handle_list_nodes(request: web.Request) -> web.Response:
{"source": e.source, "target": e.target, "condition": e.condition, "priority": e.priority}
for e in graph.edges
]
rt = session.worker_runtime
entry_points = [
{
"id": ep.id,
@@ -106,6 +108,11 @@ async def handle_list_nodes(request: web.Request) -> web.Response:
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"trigger_config": ep.trigger_config,
**(
{"next_fire_in": nf}
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
else {}
),
}
for ep in reg.entry_points.values()
]
+40 -9
@@ -30,7 +30,12 @@ from pathlib import Path
from aiohttp import web
from framework.server.app import resolve_session, safe_path_segment, sessions_dir
from framework.server.app import (
resolve_session,
safe_path_segment,
sessions_dir,
validate_agent_path,
)
from framework.server.session_manager import SessionManager
logger = logging.getLogger(__name__)
@@ -118,6 +123,12 @@ async def handle_create_session(request: web.Request) -> web.Response:
model = body.get("model")
initial_prompt = body.get("initial_prompt")
if agent_path:
try:
agent_path = str(validate_agent_path(agent_path))
except ValueError as e:
return web.json_response({"error": str(e)}, status=400)
try:
if agent_path:
# One-step: create session + load worker
@@ -143,14 +154,17 @@ async def handle_create_session(request: web.Request) -> web.Response:
status=409,
)
return web.json_response({"error": msg}, status=409)
except FileNotFoundError as e:
return web.json_response({"error": str(e)}, status=404)
except FileNotFoundError:
return web.json_response(
{"error": f"Agent not found: {agent_path or 'no path'}"},
status=404,
)
except Exception as e:
resp = _credential_error_response(e, agent_path)
if resp is not None:
return resp
logger.exception("Error creating session: %s", e)
return web.json_response({"error": str(e)}, status=500)
return web.json_response({"error": "Internal server error"}, status=500)
return web.json_response(_session_to_live_dict(session), status=201)
@@ -182,6 +196,7 @@ async def handle_get_live_session(request: web.Request) -> web.Response:
data = _session_to_live_dict(session)
if session.worker_runtime:
rt = session.worker_runtime
data["entry_points"] = [
{
"id": ep.id,
@@ -189,8 +204,13 @@ async def handle_get_live_session(request: web.Request) -> web.Response:
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"trigger_config": ep.trigger_config,
**(
{"next_fire_in": nf}
if (nf := rt.get_timer_next_fire_in(ep.id)) is not None
else {}
),
}
for ep in session.worker_runtime.get_entry_points()
for ep in rt.get_entry_points()
]
data["graphs"] = session.worker_runtime.list_graphs()
@@ -230,6 +250,11 @@ async def handle_load_worker(request: web.Request) -> web.Response:
if not agent_path:
return web.json_response({"error": "agent_path is required"}, status=400)
try:
agent_path = str(validate_agent_path(agent_path))
except ValueError as e:
return web.json_response({"error": str(e)}, status=400)
worker_id = body.get("worker_id")
model = body.get("model")
@@ -242,14 +267,14 @@ async def handle_load_worker(request: web.Request) -> web.Response:
)
except ValueError as e:
return web.json_response({"error": str(e)}, status=409)
except FileNotFoundError as e:
return web.json_response({"error": str(e)}, status=404)
except FileNotFoundError:
return web.json_response({"error": f"Agent not found: {agent_path}"}, status=404)
except Exception as e:
resp = _credential_error_response(e, agent_path)
if resp is not None:
return resp
logger.exception("Error loading worker: %s", e)
return web.json_response({"error": str(e)}, status=500)
return web.json_response({"error": "Internal server error"}, status=500)
return web.json_response(_session_to_live_dict(session))
@@ -308,7 +333,8 @@ async def handle_session_entry_points(request: web.Request) -> web.Response:
status=404,
)
eps = session.worker_runtime.get_entry_points() if session.worker_runtime else []
rt = session.worker_runtime
eps = rt.get_entry_points() if rt else []
return web.json_response(
{
"entry_points": [
@@ -318,6 +344,11 @@ async def handle_session_entry_points(request: web.Request) -> web.Response:
"entry_node": ep.entry_node,
"trigger_type": ep.trigger_type,
"trigger_config": ep.trigger_config,
**(
{"next_fire_in": nf}
if rt and (nf := rt.get_timer_next_fire_in(ep.id)) is not None
else {}
),
}
for ep in eps
]
+3 -1
@@ -123,7 +123,9 @@ class CheckpointStore:
return None
try:
return CheckpointIndex.model_validate_json(self.index_path.read_text(encoding="utf-8"))
return CheckpointIndex.model_validate_json(
self.index_path.read_text(encoding="utf-8")
)
except Exception as e:
logger.error(f"Failed to load checkpoint index: {e}")
return None
+2 -2
@@ -270,10 +270,10 @@ def _edit_test_code(code: str) -> str:
try:
# Open editor
subprocess.run([editor, temp_path], check=True)
subprocess.run([editor, temp_path], check=True, encoding="utf-8")
# Read edited code
with open(temp_path) as f:
with open(temp_path, encoding="utf-8") as f:
return f.read()
except subprocess.CalledProcessError:
print("Editor failed, keeping original code")
+32 -1
@@ -11,10 +11,35 @@ Provides commands:
import argparse
import ast
import os
import shutil
import subprocess
import sys
from pathlib import Path
def _check_pytest_available() -> bool:
"""Check if pytest is available as a runnable command.
Returns True if pytest is found, otherwise prints an error message
with install instructions and returns False.
"""
if shutil.which("pytest") is None:
print(
"Error: pytest is not installed or not on PATH.\n"
"Hive's testing commands require pytest at runtime.\n"
"Install it with:\n"
"\n"
" pip install 'framework[testing]'\n"
"\n"
"or if using uv:\n"
"\n"
" uv pip install 'framework[testing]'",
file=sys.stderr,
)
return False
return True
def register_testing_commands(subparsers: argparse._SubParsersAction) -> None:
"""Register testing CLI commands."""
@@ -105,6 +130,9 @@ def register_testing_commands(subparsers: argparse._SubParsersAction) -> None:
def cmd_test_run(args: argparse.Namespace) -> int:
"""Run tests for an agent using pytest subprocess."""
if not _check_pytest_available():
return 1
agent_path = Path(args.agent_path)
tests_dir = agent_path / "tests"
@@ -162,6 +190,7 @@ def cmd_test_run(args: argparse.Namespace) -> int:
try:
result = subprocess.run(
cmd,
encoding="utf-8",
env=env,
timeout=600, # 10 minute timeout
)
@@ -177,7 +206,8 @@ def cmd_test_run(args: argparse.Namespace) -> int:
def cmd_test_debug(args: argparse.Namespace) -> int:
"""Debug a failed test by re-running with verbose output."""
import subprocess
if not _check_pytest_available():
return 1
agent_path = Path(args.agent_path)
test_name = args.test_name
@@ -219,6 +249,7 @@ def cmd_test_debug(args: argparse.Namespace) -> int:
try:
result = subprocess.run(
cmd,
encoding="utf-8",
env=env,
timeout=120, # 2 minute timeout for single test
)
+181 -64
@@ -41,8 +41,9 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any
from framework.credentials.models import CredentialError
from framework.credentials.validation import validate_agent_credentials
from framework.runner.preload_validation import credential_errors_to_json, validate_credentials
from framework.runtime.event_bus import AgentEvent, EventType
from framework.server.app import validate_agent_path
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
@@ -184,12 +185,15 @@ def register_queen_lifecycle_tools(
cred_error: CredentialError | None = None
try:
await loop.run_in_executor(
None, lambda: validate_agent_credentials(runtime.graph.nodes)
None,
lambda: validate_credentials(
runtime.graph.nodes,
interactive=False,
skip=False,
),
)
except CredentialError as e:
cred_error = e
except Exception as e:
logger.warning("Credential validation failed: %s", e)
runner = getattr(session, "runner", None)
if runner:
@@ -240,6 +244,11 @@ def register_queen_lifecycle_tools(
}
)
except CredentialError as e:
# Build structured error with per-credential details so the
# queen can report exactly what's missing and how to fix it.
error_payload = credential_errors_to_json(e)
error_payload["agent_path"] = str(getattr(session, "worker_path", "") or "")
# Emit SSE event so the frontend opens the credentials modal
bus = getattr(session, "event_bus", None)
if bus is not None:
@@ -247,14 +256,10 @@ def register_queen_lifecycle_tools(
AgentEvent(
type=EventType.CREDENTIALS_REQUIRED,
stream_id="queen",
data={
"error": "credentials_required",
"message": str(e),
"agent_path": str(getattr(session, "worker_path", "") or ""),
},
data=error_payload,
)
)
return json.dumps({"error": "credentials_required", "message": str(e)})
return json.dumps(error_payload)
except Exception as e:
return json.dumps({"error": f"Failed to start worker: {e}"})
@@ -344,6 +349,9 @@ def register_queen_lifecycle_tools(
"""Get the session's event bus for querying history."""
return getattr(session, "event_bus", None)
_status_last_called: dict[str, float] = {} # {"ts": monotonic time}
_STATUS_COOLDOWN = 30.0 # seconds between full status checks
async def get_worker_status(last_n: int = 20) -> str:
"""Comprehensive worker status: state, execution details, and recent activity.
@@ -359,6 +367,24 @@ def register_queen_lifecycle_tools(
Args:
last_n: Number of recent events to include per category (default 20).
"""
import time as _time
now = _time.monotonic()
last = _status_last_called.get("ts", 0.0)
if now - last < _STATUS_COOLDOWN:
remaining = int(_STATUS_COOLDOWN - (now - last))
return json.dumps(
{
"status": "cooldown",
"message": (
f"Status was checked {int(now - last)}s ago. "
f"Wait {remaining}s before checking again. "
"Do NOT call this tool in a loop — wait for user input instead."
),
}
)
_status_last_called["ts"] = now
runtime = _get_runtime()
if runtime is None:
return json.dumps({"status": "not_loaded", "message": "No worker loaded."})
@@ -414,9 +440,7 @@ def register_queen_lifecycle_tools(
try:
# Pending user question (from ask_user tool)
if result.get("status") == "waiting_for_input":
input_events = bus.get_history(
event_type=EventType.CLIENT_INPUT_REQUESTED, limit=1
)
input_events = bus.get_history(event_type=EventType.CLIENT_INPUT_REQUESTED, limit=1)
if input_events:
prompt = input_events[0].data.get("prompt", "")
if prompt:
@@ -434,22 +458,17 @@ def register_queen_lifecycle_tools(
result["current_iteration"] = iter_events[0].data.get("iteration")
# Running tool calls (started but not yet completed)
tool_started = bus.get_history(
event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2
)
tool_started = bus.get_history(event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2)
tool_completed = bus.get_history(
event_type=EventType.TOOL_CALL_COMPLETED, limit=last_n * 2
)
completed_ids = {
evt.data.get("tool_use_id")
for evt in tool_completed
if evt.data.get("tool_use_id")
evt.data.get("tool_use_id") for evt in tool_completed if evt.data.get("tool_use_id")
}
running = [
evt
for evt in tool_started
if evt.data.get("tool_use_id")
and evt.data.get("tool_use_id") not in completed_ids
if evt.data.get("tool_use_id") and evt.data.get("tool_use_id") not in completed_ids
]
if running:
result["running_tools"] = [
@@ -505,19 +524,23 @@ def register_queen_lifecycle_tools(
doom_loops = bus.get_history(event_type=EventType.NODE_TOOL_DOOM_LOOP, limit=5)
issues = []
for evt in stalls:
issues.append({
"type": "stall",
"node": evt.node_id,
"reason": evt.data.get("reason", "")[:200],
"time": evt.timestamp.isoformat(),
})
issues.append(
{
"type": "stall",
"node": evt.node_id,
"reason": evt.data.get("reason", "")[:200],
"time": evt.timestamp.isoformat(),
}
)
for evt in doom_loops:
issues.append({
"type": "tool_doom_loop",
"node": evt.node_id,
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
})
issues.append(
{
"type": "tool_doom_loop",
"node": evt.node_id,
"description": evt.data.get("description", "")[:200],
"time": evt.timestamp.isoformat(),
}
)
if issues:
result["issues"] = issues
@@ -542,16 +565,10 @@ def register_queen_lifecycle_tools(
pass
# Token summary
llm_events = bus.get_history(
event_type=EventType.LLM_TURN_COMPLETE, limit=200
)
llm_events = bus.get_history(event_type=EventType.LLM_TURN_COMPLETE, limit=200)
if llm_events:
total_in = sum(
evt.data.get("input_tokens", 0) or 0 for evt in llm_events
)
total_out = sum(
evt.data.get("output_tokens", 0) or 0 for evt in llm_events
)
total_in = sum(evt.data.get("input_tokens", 0) or 0 for evt in llm_events)
total_out = sum(evt.data.get("output_tokens", 0) or 0 for evt in llm_events)
result["token_summary"] = {
"llm_turns": len(llm_events),
"input_tokens": total_in,
@@ -560,27 +577,27 @@ def register_queen_lifecycle_tools(
}
# Execution completions/failures
exec_completed = bus.get_history(
event_type=EventType.EXECUTION_COMPLETED, limit=5
)
exec_failed = bus.get_history(
event_type=EventType.EXECUTION_FAILED, limit=5
)
exec_completed = bus.get_history(event_type=EventType.EXECUTION_COMPLETED, limit=5)
exec_failed = bus.get_history(event_type=EventType.EXECUTION_FAILED, limit=5)
if exec_completed or exec_failed:
result["execution_outcomes"] = []
for evt in exec_completed:
result["execution_outcomes"].append({
"outcome": "completed",
"execution_id": evt.execution_id,
"time": evt.timestamp.isoformat(),
})
result["execution_outcomes"].append(
{
"outcome": "completed",
"execution_id": evt.execution_id,
"time": evt.timestamp.isoformat(),
}
)
for evt in exec_failed:
result["execution_outcomes"].append({
"outcome": "failed",
"execution_id": evt.execution_id,
"error": evt.data.get("error", "")[:200],
"time": evt.timestamp.isoformat(),
})
result["execution_outcomes"].append(
{
"outcome": "failed",
"execution_id": evt.execution_id,
"error": evt.data.get("error", "")[:200],
"time": evt.timestamp.isoformat(),
}
)
except Exception:
pass # Non-critical enrichment
@@ -606,9 +623,7 @@ def register_queen_lifecycle_tools(
"required": [],
},
)
registry.register(
"get_worker_status", _status_tool, lambda inputs: get_worker_status(**inputs)
)
registry.register("get_worker_status", _status_tool, lambda inputs: get_worker_status(**inputs))
tools_registered += 1
# --- inject_worker_message ------------------------------------------------
@@ -672,6 +687,105 @@ def register_queen_lifecycle_tools(
)
tools_registered += 1
# --- list_credentials -----------------------------------------------------
async def list_credentials(credential_id: str = "") -> str:
"""List all authorized credentials (Aden OAuth + local encrypted store).
Returns credential IDs, aliases, status, and identity metadata.
Never returns secret values. Optionally filter by credential_id.
"""
try:
# Primary: CredentialStoreAdapter sees both Aden OAuth and local accounts
from aden_tools.credentials import CredentialStoreAdapter
store = CredentialStoreAdapter.default()
all_accounts = store.get_all_account_info()
# Filter by credential_id / provider if requested
if credential_id:
all_accounts = [
a
for a in all_accounts
if a.get("credential_id", "").startswith(credential_id)
or a.get("provider", "") == credential_id
]
return json.dumps(
{
"count": len(all_accounts),
"credentials": all_accounts,
},
default=str,
)
except ImportError:
pass
except Exception as e:
return json.dumps({"error": f"Failed to list credentials: {e}"})
# Fallback: local encrypted store only
try:
from framework.credentials.local.registry import LocalCredentialRegistry
registry = LocalCredentialRegistry.default()
accounts = registry.list_accounts(
credential_id=credential_id or None,
)
credentials = []
for info in accounts:
entry: dict[str, Any] = {
"credential_id": info.credential_id,
"alias": info.alias,
"storage_id": info.storage_id,
"status": info.status,
"created_at": info.created_at.isoformat() if info.created_at else None,
"last_validated": (
info.last_validated.isoformat() if info.last_validated else None
),
}
identity = info.identity.to_dict()
if identity:
entry["identity"] = identity
credentials.append(entry)
return json.dumps(
{
"count": len(credentials),
"credentials": credentials,
"location": "~/.hive/credentials",
},
default=str,
)
except Exception as e:
return json.dumps({"error": f"Failed to list credentials: {e}"})
_list_creds_tool = Tool(
name="list_credentials",
description=(
"List all authorized credentials in the local store. Returns credential IDs, "
"aliases, status (active/failed/unknown), and identity metadata — never secret "
"values. Optionally filter by credential_id (e.g. 'brave_search')."
),
parameters={
"type": "object",
"properties": {
"credential_id": {
"type": "string",
"description": (
"Filter to a specific credential type (e.g. 'brave_search'). "
"Omit to list all credentials."
),
},
},
"required": [],
},
)
registry.register(
"list_credentials", _list_creds_tool, lambda inputs: list_credentials(**inputs)
)
tools_registered += 1
# --- load_built_agent (server context only) --------------------------------
if session_manager is not None and manager_session_id is not None:
@@ -691,9 +805,12 @@ def register_queen_lifecycle_tools(
logger.error("Failed to unload existing worker: %s", e, exc_info=True)
return json.dumps({"error": f"Failed to unload existing worker: {e}"})
resolved_path = Path(agent_path).resolve()
try:
resolved_path = validate_agent_path(agent_path)
except ValueError as e:
return json.dumps({"error": str(e)})
if not resolved_path.exists():
return json.dumps({"error": f"Agent path does not exist: {resolved_path}"})
return json.dumps({"error": f"Agent path does not exist: {agent_path}"})
try:
updated_session = await session_manager.load_worker(
+6 -3
@@ -18,7 +18,6 @@ from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -48,10 +47,14 @@ def register_graph_tools(registry: ToolRegistry, runtime: AgentRuntime) -> int:
"""
from framework.runner.runner import AgentRunner
from framework.runtime.execution_stream import EntryPointSpec
from framework.server.app import validate_agent_path
path = Path(agent_path).resolve()
try:
path = validate_agent_path(agent_path)
except ValueError as e:
return json.dumps({"error": str(e)})
if not path.exists():
return json.dumps({"error": f"Agent path does not exist: {path}"})
return json.dumps({"error": f"Agent path does not exist: {agent_path}"})
try:
runner = AgentRunner.load(path)
+1 -1
@@ -256,7 +256,7 @@ class AdenTUI(App):
"""Override to use native `open` for file:// URLs on macOS."""
if url.startswith("file://") and platform.system() == "Darwin":
path = url.removeprefix("file://")
subprocess.Popen(["open", path])
subprocess.Popen(["open", path], encoding="utf-8")
else:
super().open_url(url, new_tab=new_tab)
+7 -11
@@ -488,7 +488,7 @@ class ChatRepl(Vertical):
if not state_file.exists():
continue
with open(state_file) as f:
with open(state_file, encoding="utf-8") as f:
state = json.load(f)
status = state.get("status", "").lower()
@@ -547,7 +547,7 @@ class ChatRepl(Vertical):
# Read session state
try:
with open(state_file) as f:
with open(state_file, encoding="utf-8") as f:
state = json.load(f)
# Track this session for /resume <number> lookup
@@ -599,7 +599,7 @@ class ChatRepl(Vertical):
try:
import json
with open(state_file) as f:
with open(state_file, encoding="utf-8") as f:
state = json.load(f)
# Basic info
@@ -640,7 +640,7 @@ class ChatRepl(Vertical):
# Load and show checkpoints
for i, cp_file in enumerate(checkpoint_files[-5:], 1): # Last 5
try:
with open(cp_file) as f:
with open(cp_file, encoding="utf-8") as f:
cp_data = json.load(f)
cp_id = cp_data.get("checkpoint_id", cp_file.stem)
@@ -687,7 +687,7 @@ class ChatRepl(Vertical):
import json
with open(state_file) as f:
with open(state_file, encoding="utf-8") as f:
state = json.load(f)
# Resume from session state (not checkpoint)
@@ -1112,7 +1112,7 @@ class ChatRepl(Vertical):
continue
try:
with open(state_file) as f:
with open(state_file, encoding="utf-8") as f:
state = json.load(f)
status = state.get("status", "").lower()
@@ -1460,10 +1460,6 @@ class ChatRepl(Vertical):
indicator.update("Preparing question...")
return
if tool_name == "escalate_to_coder":
indicator.update("Escalating to coder...")
return
# Update indicator to show tool activity
indicator.update(f"Using tool: {tool_name}...")
@@ -1475,7 +1471,7 @@ class ChatRepl(Vertical):
def handle_tool_completed(self, tool_name: str, result: str, is_error: bool) -> None:
"""Handle a tool call completing."""
if tool_name in ("ask_user", "escalate_to_coder"):
if tool_name == "ask_user":
return
result_str = str(result)
@@ -38,6 +38,7 @@ def _linux_file_dialog() -> subprocess.CompletedProcess | None:
"--title=Select a PDF file",
"--file-filter=PDF files (*.pdf)|*.pdf",
],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
@@ -54,6 +55,7 @@ def _linux_file_dialog() -> subprocess.CompletedProcess | None:
".",
"PDF files (*.pdf)",
],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
@@ -79,6 +81,7 @@ def _pick_pdf_subprocess() -> Path | None:
'POSIX path of (choose file of type {"com.adobe.pdf"} '
'with prompt "Select a PDF file")',
],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
@@ -93,6 +96,7 @@ def _pick_pdf_subprocess() -> Path | None:
)
result = subprocess.run(
["powershell", "-NoProfile", "-Command", ps_script],
encoding="utf-8",
capture_output=True,
text=True,
timeout=300,
@@ -199,10 +199,11 @@ def _copy_to_clipboard(text: str) -> None:
"""Copy text to system clipboard using platform-native tools."""
try:
if sys.platform == "darwin":
subprocess.run(["pbcopy"], input=text.encode(), check=True, timeout=5)
subprocess.run(["pbcopy"], encoding="utf-8", input=text, check=True, timeout=5)
elif sys.platform == "win32":
subprocess.run(
["clip.exe"],
input=text.encode("utf-16le"),
check=True,
timeout=5,
@@ -211,6 +212,7 @@ def _copy_to_clipboard(text: str) -> None:
try:
subprocess.run(
["xclip", "-selection", "clipboard"],
encoding="utf-8",
input=text,
check=True,
timeout=5,
@@ -218,6 +220,7 @@ def _copy_to_clipboard(text: str) -> None:
except (subprocess.SubprocessError, FileNotFoundError):
subprocess.run(
["xsel", "--clipboard", "--input"],
encoding="utf-8",
input=text,
check=True,
timeout=5,
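The encoding changes above all address one failure mode: on Windows, text-mode `open()` (and text-mode subprocess pipes) default to the locale code page (often cp1252), so reading UTF-8 session files containing non-ASCII characters can raise `UnicodeDecodeError: 'charmap' codec can't decode...`. A minimal sketch of the pattern being enforced (the file path and contents here are made up for illustration):

```python
import json
import tempfile
from pathlib import Path

# Write a state file containing a non-ASCII character, as the agent
# session files touched in this diff may contain.
state = {"status": "complete", "note": "café"}
path = Path(tempfile.mkdtemp()) / "state.json"
path.write_text(json.dumps(state), encoding="utf-8")

# Without an explicit encoding, open() uses the locale's preferred
# encoding (cp1252 on many Windows setups), which can fail on UTF-8
# bytes. Passing encoding="utf-8" makes the read deterministic on
# every platform.
with open(path, encoding="utf-8") as f:
    loaded = json.load(f)

assert loaded["note"] == "café"
```

Note that `encoding=` only makes sense for text-mode I/O; when a subprocess genuinely needs bytes on stdin (as `clip.exe` does with UTF-16LE), the call should pass bytes without an `encoding` argument instead.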
+5 -1
@@ -27,6 +27,8 @@ export interface EntryPoint {
entry_node: string;
trigger_type: string;
trigger_config?: Record<string, unknown>;
/** Seconds until the next timer fire (only present for timer entry points). */
next_fire_in?: number;
}
export interface DiscoverEntry {
@@ -153,6 +155,7 @@ export interface NodeSpec {
client_facing: boolean;
success_criteria: string | null;
system_prompt: string;
sub_agents?: string[];
// Runtime enrichment (when session_id provided)
visit_count?: number;
has_failures?: boolean;
@@ -267,7 +270,8 @@ export type EventTypeName =
| "custom"
| "escalation_requested"
| "worker_loaded"
| "credentials_required";
| "credentials_required"
| "subagent_report";
export interface AgentEvent {
type: EventTypeName;
+43 -4
@@ -30,6 +30,7 @@ interface AgentGraphProps {
onPause?: () => void;
version?: string;
runState?: RunState;
building?: boolean;
}
// --- Extracted RunButton so hover state survives parent re-renders ---
@@ -144,7 +145,7 @@ function truncateLabel(label: string, availablePx: number, fontSize: number): st
return label.slice(0, Math.max(maxChars - 1, 1)) + "\u2026";
}
export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, onPause, version, runState: externalRunState }: AgentGraphProps) {
export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, onPause, version, runState: externalRunState, building }: AgentGraphProps) {
const [localRunState, setLocalRunState] = useState<RunState>("idle");
const runState = externalRunState ?? localRunState;
const runBtnRef = useRef<HTMLButtonElement>(null);
@@ -279,7 +280,14 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
<RunButton runState={runState} disabled={nodes.length === 0} onRun={handleRun} onPause={onPause ?? (() => {})} btnRef={runBtnRef} />
</div>
<div className="flex-1 flex items-center justify-center px-5">
<p className="text-xs text-muted-foreground/60 text-center italic">No pipeline configured yet.<br/>Chat with the Queen to get started.</p>
{building ? (
<div className="flex flex-col items-center gap-3">
<Loader2 className="w-6 h-6 animate-spin text-primary/60" />
<p className="text-xs text-muted-foreground/80 text-center">Building agent...</p>
</div>
) : (
<p className="text-xs text-muted-foreground/60 text-center italic">No pipeline configured yet.<br/>Chat with the Queen to get started.</p>
)}
</div>
</div>
);
@@ -407,6 +415,18 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
const triggerFontSize = nodeW < 140 ? 10.5 : 11.5;
const triggerAvailW = nodeW - 38;
const triggerDisplayLabel = truncateLabel(node.label, triggerAvailW, triggerFontSize);
const nextFireIn = node.triggerConfig?.next_fire_in as number | undefined;
// Format countdown for display below node
let countdownLabel: string | null = null;
if (nextFireIn != null && nextFireIn > 0) {
const h = Math.floor(nextFireIn / 3600);
const m = Math.floor((nextFireIn % 3600) / 60);
const s = Math.floor(nextFireIn % 60);
countdownLabel = h > 0
? `next in ${h}h ${String(m).padStart(2, "0")}m`
: `next in ${m}m ${String(s).padStart(2, "0")}s`;
}
return (
<g key={node.id} onClick={() => onNodeClick?.(node)} style={{ cursor: onNodeClick ? "pointer" : "default" }}>
@@ -442,6 +462,17 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
>
{triggerDisplayLabel}
</text>
{/* Countdown label below node */}
{countdownLabel && (
<text
x={pos.x + nodeW / 2} y={pos.y + NODE_H + 13}
fill="hsl(210,30%,50%)" fontSize={9.5}
textAnchor="middle" fontStyle="italic" opacity={0.7}
>
{countdownLabel}
</text>
)}
</g>
);
};
@@ -568,18 +599,26 @@ export default function AgentGraph({ nodes, title: _title, onNodeClick, onRun, o
</div>
{/* Graph */}
<div className="flex-1 overflow-y-auto overflow-x-hidden px-3 pb-5">
<div className="flex-1 overflow-y-auto overflow-x-hidden px-3 pb-5 relative">
<svg
width={svgWidth}
height={svgHeight}
viewBox={`0 0 ${svgWidth} ${svgHeight}`}
className="select-none"
className={`select-none${building ? " opacity-30" : ""}`}
style={{ fontFamily: "'Inter', system-ui, sans-serif" }}
>
{forwardEdges.map((e, i) => renderForwardEdge(e, i))}
{backEdges.map((e, i) => renderBackEdge(e, i))}
{nodes.map((n, i) => renderNode(n, i))}
</svg>
{building && (
<div className="absolute inset-0 flex items-center justify-center">
<div className="flex flex-col items-center gap-3">
<Loader2 className="w-6 h-6 animate-spin text-primary/60" />
<p className="text-xs text-muted-foreground/80">Rebuilding agent...</p>
</div>
</div>
)}
</div>
</div>
);
@@ -20,9 +20,19 @@ interface ToolCredential {
value?: string;
}
export interface SubagentReport {
subagent_id: string;
message: string;
data?: Record<string, unknown>;
timestamp: string;
status?: "running" | "complete" | "error";
}
interface NodeDetailPanelProps {
node: GraphNode | null;
nodeSpec?: NodeSpec | null;
allNodeSpecs?: NodeSpec[];
subagentReports?: SubagentReport[];
sessionId?: string;
graphId?: string;
workerSessionId?: string | null;
@@ -195,10 +205,96 @@ function SystemPromptTab({ systemPrompt }: { systemPrompt?: string }) {
);
}
function SubagentsTab() {
function SubagentStatusBadge({ status }: { status?: "running" | "complete" | "error" }) {
if (!status) return null;
if (status === "running") {
return (
<span className="ml-auto flex items-center gap-1 text-[10px] font-medium flex-shrink-0" style={{ color: "hsl(45,95%,58%)" }}>
<span className="relative flex h-1.5 w-1.5">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full opacity-75" style={{ backgroundColor: "hsl(45,95%,58%)" }} />
<span className="relative inline-flex rounded-full h-1.5 w-1.5" style={{ backgroundColor: "hsl(45,95%,58%)" }} />
</span>
Running
</span>
);
}
if (status === "complete") {
return (
<span className="ml-auto flex items-center gap-1 text-[10px] font-medium flex-shrink-0" style={{ color: "hsl(43,70%,45%)" }}>
<CheckCircle2 className="w-3 h-3" />
Complete
</span>
);
}
return (
<div className="flex-1 flex items-center justify-center">
<p className="text-xs text-muted-foreground/60 italic text-center">No subagents assigned to this node.</p>
<span className="ml-auto flex items-center gap-1 text-[10px] font-medium flex-shrink-0" style={{ color: "hsl(0,65%,55%)" }}>
<AlertCircle className="w-3 h-3" />
Failed
</span>
);
}
function SubagentsTab({ subAgentIds, allNodeSpecs, subagentReports }: { subAgentIds: string[]; allNodeSpecs: NodeSpec[]; subagentReports: SubagentReport[] }) {
if (subAgentIds.length === 0) {
return (
<div className="flex-1 flex items-center justify-center">
<p className="text-xs text-muted-foreground/60 italic text-center">No subagents assigned to this node.</p>
</div>
);
}
return (
<div className="space-y-3">
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1">Sub-agents ({subAgentIds.length})</p>
{subAgentIds.map(saId => {
const spec = allNodeSpecs.find(n => n.id === saId);
const reports = subagentReports.filter(r => r.subagent_id === saId);
// Derive status from latest report that has a status field
const latestStatus = [...reports].reverse().find(r => r.status)?.status;
// Progress messages are reports without a status field (from report_to_parent)
const progressReports = reports.filter(r => !r.status);
return (
<div key={saId} className="rounded-xl border border-border/20 overflow-hidden">
<div className="p-3 bg-muted/30">
<div className="flex items-center gap-2 mb-1">
<Bot className="w-3.5 h-3.5 text-primary/70 flex-shrink-0" />
<span className="text-xs font-medium text-foreground truncate">{spec?.name || saId}</span>
<SubagentStatusBadge status={latestStatus} />
</div>
{spec?.description && (
<p className="text-[11px] text-muted-foreground leading-relaxed mt-1">{spec.description}</p>
)}
</div>
{/* Static info: tools + output keys */}
<div className="px-3 py-2 border-t border-border/15 bg-muted/15">
{spec?.tools && spec.tools.length > 0 && (
<div className="mb-1.5">
<span className="text-[10px] text-muted-foreground font-medium">Tools: </span>
<span className="text-[10px] text-foreground/70">{spec.tools.join(", ")}</span>
</div>
)}
{spec?.output_keys && spec.output_keys.length > 0 && (
<div>
<span className="text-[10px] text-muted-foreground font-medium">Outputs: </span>
<span className="text-[10px] text-foreground/70 font-mono">{spec.output_keys.join(", ")}</span>
</div>
)}
</div>
{/* Live progress reports (from report_to_parent) */}
{progressReports.length > 0 && (
<div className="px-3 py-2 border-t border-border/15 bg-background/60">
<p className="text-[10px] text-muted-foreground font-medium mb-1">Reports ({progressReports.length})</p>
{progressReports.map((r, i) => (
<div key={i} className="text-[10.5px] text-foreground/70 leading-relaxed py-0.5">{r.message}</div>
))}
</div>
)}
</div>
);
})}
</div>
);
}
@@ -213,7 +309,7 @@ const tabs: { id: Tab; label: string; Icon: React.FC<{ className?: string }> }[]
{ id: "subagents", label: "Subagents", Icon: ({ className }) => <Bot className={className} /> },
];
export default function NodeDetailPanel({ node, nodeSpec, sessionId, graphId, workerSessionId, nodeLogs, actionPlan, onClose }: NodeDetailPanelProps) {
export default function NodeDetailPanel({ node, nodeSpec, allNodeSpecs, subagentReports, sessionId, graphId, workerSessionId, nodeLogs, actionPlan, onClose }: NodeDetailPanelProps) {
const [activeTab, setActiveTab] = useState<Tab>("overview");
const [realTools, setRealTools] = useState<ToolInfo[] | null>(null);
const [realCriteria, setRealCriteria] = useState<NodeCriteria | null>(null);
@@ -295,7 +391,7 @@ export default function NodeDetailPanel({ node, nodeSpec, sessionId, graphId, wo
{/* Tab bar */}
<div className="flex border-b border-border/30 flex-shrink-0 px-2 pt-1 overflow-x-auto scrollbar-hide">
{tabs.map(tab => (
{tabs.filter(t => t.id !== "subagents" || (nodeSpec?.sub_agents && nodeSpec.sub_agents.length > 0)).map(tab => (
<button
key={tab.id}
onClick={() => setActiveTab(tab.id)}
@@ -397,8 +493,12 @@ export default function NodeDetailPanel({ node, nodeSpec, sessionId, graphId, wo
<SystemPromptTab systemPrompt={nodeSpec?.system_prompt} />
)}
{activeTab === "subagents" && (
<SubagentsTab />
{activeTab === "subagents" && nodeSpec?.sub_agents && (
<SubagentsTab
subAgentIds={nodeSpec.sub_agents}
allNodeSpecs={allNodeSpecs || []}
subagentReports={subagentReports || []}
/>
)}
</div>
</div>
+25 -3
@@ -12,8 +12,27 @@ import type { GraphNode, NodeStatus } from "@/components/AgentGraph";
* 4. Map session enrichment fields to NodeStatus
*/
export function topologyToGraphNodes(topology: GraphTopology): GraphNode[] {
const { nodes, edges, entry_node, entry_points } = topology;
if (nodes.length === 0) return [];
const { nodes: allNodes, edges, entry_node, entry_points } = topology;
if (allNodes.length === 0) return [];
// Filter out subagent-only nodes (referenced in sub_agents but not in any edge)
const subagentIds = new Set<string>();
for (const n of allNodes) {
for (const sa of n.sub_agents ?? []) {
subagentIds.add(sa);
}
}
const edgeParticipants = new Set<string>();
for (const e of edges) {
edgeParticipants.add(e.source);
edgeParticipants.add(e.target);
}
const nodes = allNodes.filter(
(n) =>
!subagentIds.has(n.id) ||
edgeParticipants.has(n.id) ||
n.id === entry_node,
);
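The filter above hides a node only when it is subagent-only: referenced in some node's `sub_agents`, absent from every edge, and not the entry node. The same rule in a small Python sketch (the plain-dict node/edge shapes are assumptions for illustration, not the real topology types):

```python
def visible_nodes(nodes, edges, entry_node):
    # Ids referenced as subagents of some other node.
    subagent_ids = {sa for n in nodes for sa in n.get("sub_agents", [])}
    # Ids that appear on either end of an edge.
    edge_participants = {e["source"] for e in edges} | {e["target"] for e in edges}
    # Keep a node unless it is subagent-only.
    return [
        n for n in nodes
        if n["id"] not in subagent_ids
        or n["id"] in edge_participants
        or n["id"] == entry_node
    ]

nodes = [
    {"id": "plan", "sub_agents": ["scraper"]},
    {"id": "scraper"},  # subagent-only: filtered out of the graph view
    {"id": "report"},
]
edges = [{"source": "plan", "target": "report"}]
print([n["id"] for n in visible_nodes(nodes, edges, "plan")])  # → ['plan', 'report']
```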
// --- Synthesize trigger nodes for non-manual entry points ---
const schedulerEntryPoints = (entry_points || []).filter(
@@ -29,7 +48,10 @@ export function topologyToGraphNodes(topology: GraphTopology): GraphNode[] {
status: "pending",
nodeType: "trigger",
triggerType: ep.trigger_type,
triggerConfig: ep.trigger_config,
triggerConfig: {
...ep.trigger_config,
...(ep.next_fire_in != null ? { next_fire_in: ep.next_fire_in } : {}),
},
next: [ep.entry_node],
});
}
+191 -6
@@ -20,6 +20,37 @@ import { ApiError } from "@/api/client";
const makeId = () => Math.random().toString(36).slice(2, 9);
/** Format seconds into a compact countdown string. */
function formatCountdown(totalSecs: number): string {
const h = Math.floor(totalSecs / 3600);
const m = Math.floor((totalSecs % 3600) / 60);
const s = Math.floor(totalSecs % 60);
if (h > 0) return `${h}h ${String(m).padStart(2, "0")}m ${String(s).padStart(2, "0")}s`;
return `${m}m ${String(s).padStart(2, "0")}s`;
}
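`formatCountdown` above can be mirrored in Python to check its edge cases (the hour threshold and the zero-padding of minutes and seconds); the function name here is just for this sketch:

```python
def format_countdown(total_secs: int) -> str:
    # Mirror of the TS helper: hours shown only when nonzero,
    # minutes/seconds zero-padded to two digits where padded in the TS.
    h, rem = divmod(int(total_secs), 3600)
    m, s = divmod(rem, 60)
    if h > 0:
        return f"{h}h {m:02d}m {s:02d}s"
    return f"{m}m {s:02d}s"

print(format_countdown(3725))  # → 1h 02m 05s
print(format_countdown(90))    # → 1m 30s
```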
/** Live countdown from an initial seconds value, ticking every second. */
function TimerCountdown({ initialSeconds }: { initialSeconds: number }) {
const [remaining, setRemaining] = useState(Math.max(0, Math.round(initialSeconds)));
const startRef = useRef({ wallTime: Date.now(), initial: Math.max(0, Math.round(initialSeconds)) });
useEffect(() => {
startRef.current = { wallTime: Date.now(), initial: Math.max(0, Math.round(initialSeconds)) };
setRemaining(Math.max(0, Math.round(initialSeconds)));
}, [initialSeconds]);
useEffect(() => {
const id = setInterval(() => {
const elapsed = (Date.now() - startRef.current.wallTime) / 1000;
setRemaining(Math.max(0, Math.round(startRef.current.initial - elapsed)));
}, 1000);
return () => clearInterval(id);
}, []);
if (remaining <= 0) return <span className="text-amber-400/80">firing...</span>;
return <span>{formatCountdown(remaining)}</span>;
}
// --- Session types ---
interface Session {
id: string;
@@ -208,10 +239,12 @@ interface AgentBackendState {
awaitingInput: boolean;
/** The message ID of the current worker input request (for inline reply box) */
workerInputMessageId: string | null;
queenBuilding: boolean;
workerRunState: "idle" | "deploying" | "running";
currentExecutionId: string | null;
nodeLogs: Record<string, string[]>;
nodeActionPlans: Record<string, string>;
subagentReports: { subagent_id: string; message: string; data?: Record<string, unknown>; timestamp: string }[];
isTyping: boolean;
isStreaming: boolean;
llmSnapshots: Record<string, string>;
@@ -230,10 +263,12 @@ function defaultAgentState(): AgentBackendState {
nodeSpecs: [],
awaitingInput: false,
workerInputMessageId: null,
queenBuilding: false,
workerRunState: "idle",
currentExecutionId: null,
nodeLogs: {},
nodeActionPlans: {},
subagentReports: [],
isTyping: false,
isStreaming: false,
llmSnapshots: {},
@@ -669,12 +704,14 @@ export default function Workspace() {
}, [sessionsByAgent, agentStates, loadAgentForType, updateAgentState]);
// --- Fetch graph topology when a session becomes ready ---
const fetchGraphForAgent = useCallback(async (agentType: string, sessionId: string) => {
const fetchGraphForAgent = useCallback(async (agentType: string, sessionId: string, knownGraphId?: string) => {
try {
const { graphs } = await sessionsApi.graphs(sessionId);
if (!graphs.length) return;
const graphId = graphs[0];
let graphId = knownGraphId;
if (!graphId) {
const { graphs } = await sessionsApi.graphs(sessionId);
if (!graphs.length) return;
graphId = graphs[0];
}
const topology = await graphsApi.nodes(sessionId, graphId);
updateAgentState(agentType, { graphId, nodeSpecs: topology.nodes });
@@ -711,6 +748,51 @@ export default function Workspace() {
}
}, [agentStates, fetchGraphForAgent]);
// Poll entry points every second for agents with timers to keep
// next_fire_in countdowns fresh without re-fetching the full topology.
useEffect(() => {
const id = setInterval(async () => {
for (const [agentType, sessions] of Object.entries(sessionsByAgent)) {
const session = sessions[0];
if (!session) continue;
const timerNodes = session.graphNodes.filter(
(n) => n.nodeType === "trigger" && n.triggerType === "timer",
);
if (timerNodes.length === 0) continue;
const state = agentStates[agentType];
if (!state?.sessionId) continue;
try {
const { entry_points } = await sessionsApi.entryPoints(state.sessionId);
const fireMap = new Map<string, number>();
for (const ep of entry_points) {
if (ep.next_fire_in != null) {
fireMap.set(`__trigger_${ep.id}`, ep.next_fire_in);
}
}
if (fireMap.size === 0) continue;
setSessionsByAgent((prev) => {
const ss = prev[agentType];
if (!ss?.length) return prev;
const updated = ss[0].graphNodes.map((n) => {
const nfi = fireMap.get(n.id);
if (nfi == null || n.nodeType !== "trigger") return n;
return { ...n, triggerConfig: { ...n.triggerConfig, next_fire_in: nfi } };
});
// Skip update if nothing changed
if (updated.every((n, idx) => n === ss[0].graphNodes[idx])) return prev;
return {
...prev,
[agentType]: ss.map((s, i) => (i === 0 ? { ...s, graphNodes: updated } : s)),
};
});
} catch {
// Entry points fetch failed — skip this tick
}
}
}, 1_000);
return () => clearInterval(id);
}, [sessionsByAgent, agentStates]);
// --- Graph node status helpers (now accept agentType) ---
const updateGraphNodeStatus = useCallback(
(agentType: string, nodeId: string, status: NodeStatus, extra?: Partial<GraphNode>) => {
@@ -910,6 +992,7 @@ export default function Workspace() {
workerRunState: "running",
currentExecutionId: event.execution_id || agentStates[agentType]?.currentExecutionId || null,
nodeLogs: {},
subagentReports: [],
llmSnapshots: {},
activeToolCalls: {},
});
@@ -938,6 +1021,11 @@ export default function Workspace() {
llmSnapshots: {},
});
markAllNodesAs(agentType, ["running", "looping"], "complete");
// Re-fetch graph topology so timer countdowns refresh
const sid = agentStates[agentType]?.sessionId;
const gid = agentStates[agentType]?.graphId;
if (sid) fetchGraphForAgent(agentType, sid, gid || undefined);
}
break;
@@ -978,7 +1066,7 @@ export default function Workspace() {
if (event.type === "client_input_requested") {
console.log('[CLIENT_INPUT_REQ] stream_id:', streamId, 'isQueen:', isQueen, 'node_id:', event.node_id, 'prompt:', (event.data?.prompt as string)?.slice(0, 80), 'agentType:', agentType);
if (isQueen) {
updateAgentState(agentType, { awaitingInput: true, isTyping: false, isStreaming: false });
updateAgentState(agentType, { awaitingInput: true, isTyping: false, isStreaming: false, queenBuilding: false });
} else {
// Worker input request.
// If the prompt is non-empty (explicit ask_user), create a visible
@@ -1096,6 +1184,15 @@ export default function Workspace() {
case "tool_call_started": {
console.log('[TOOL_PILL] tool_call_started received:', { isQueen, nodeId: event.node_id, streamId: event.stream_id, agentType, executionId: event.execution_id, toolName: event.data?.tool_name });
// Detect queen building: when the queen starts writing/editing files, she's building an agent
if (isQueen) {
const tn = (event.data?.tool_name as string) || "";
if (tn === "write_file" || tn === "edit_file") {
updateAgentState(agentType, { queenBuilding: true });
}
}
if (event.node_id) {
if (!isQueen) {
const pendingText = agentStates[agentType]?.llmSnapshots[event.node_id];
@@ -1109,6 +1206,28 @@ export default function Workspace() {
});
}
appendNodeLog(agentType, event.node_id, `${ts} INFO Calling ${(event.data?.tool_name as string) || "unknown"}(${event.data?.tool_input ? truncate(JSON.stringify(event.data.tool_input), 200) : ""})`);
// Track subagent delegation start
if ((event.data?.tool_name as string) === "delegate_to_sub_agent") {
const saInput = event.data?.tool_input as Record<string, unknown> | undefined;
const saId = (saInput?.agent_id as string) || "";
if (saId) {
setAgentStates(prev => {
const state = prev[agentType];
if (!state) return prev;
return {
...prev,
[agentType]: {
...state,
subagentReports: [
...state.subagentReports,
{ subagent_id: saId, message: "Delegating...", timestamp: event.timestamp, status: "running" as const },
],
},
};
});
}
}
}
const toolName = (event.data?.tool_name as string) || "unknown";
@@ -1158,6 +1277,31 @@ export default function Workspace() {
appendNodeLog(agentType, event.node_id, `${ts} INFO ${toolName} done${resultStr}`);
}
// Track subagent delegation completion
if (toolName === "delegate_to_sub_agent" && result) {
try {
const parsed = JSON.parse(result);
const saId = (parsed?.metadata?.agent_id as string) || "";
const success = parsed?.metadata?.success as boolean;
if (saId) {
setAgentStates(prev => {
const state = prev[agentType];
if (!state) return prev;
return {
...prev,
[agentType]: {
...state,
subagentReports: [
...state.subagentReports,
{ subagent_id: saId, message: success ? "Completed" : "Failed", timestamp: event.timestamp, status: success ? "complete" as const : "error" as const },
],
},
};
});
}
} catch { /* ignore parse errors */ }
}
// Mark tool as done and update activity row
const sid = event.stream_id;
setAgentStates(prev => {
@@ -1198,6 +1342,32 @@ export default function Workspace() {
}
break;
case "subagent_report": {
if (!isQueen && event.node_id) {
const subagentId = (event.data?.subagent_id as string) || "";
const message = (event.data?.message as string) || "";
const data = event.data?.data as Record<string, unknown> | undefined;
// Extract parent node ID from "parentNodeId:subagent:agentId" format
const parentNodeId = event.node_id.split(":subagent:")[0] || event.node_id;
appendNodeLog(agentType, parentNodeId, `${ts} INFO [Subagent:${subagentId}] ${truncate(message, 200)}`);
setAgentStates(prev => {
const state = prev[agentType];
if (!state) return prev;
return {
...prev,
[agentType]: {
...state,
subagentReports: [
...state.subagentReports,
{ subagent_id: subagentId, message, data, timestamp: event.timestamp },
],
},
};
});
}
break;
}
case "node_stalled":
if (!isQueen && event.node_id) {
const reason = (event.data?.reason as string) || "unknown";
@@ -1270,6 +1440,7 @@ export default function Workspace() {
// Update agent state: new display name, reset graph so topology refetch triggers
updateAgentState(agentType, {
displayName,
queenBuilding: false,
workerRunState: "idle",
graphId: null,
nodeSpecs: [],
@@ -1596,6 +1767,7 @@ export default function Workspace() {
onRun={handleRun}
onPause={handlePause}
runState={activeAgentState?.workerRunState ?? "idle"}
building={activeAgentState?.queenBuilding ?? false}
/>
</div>
</div>
@@ -1712,6 +1884,17 @@ export default function Workspace() {
</div>
) : null;
})()}
{(() => {
const nfi = (selectedNode.triggerConfig as Record<string, unknown> | undefined)?.next_fire_in as number | undefined;
return nfi != null ? (
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Next run</p>
<p className="text-xs text-foreground/80 font-mono bg-muted/30 rounded-lg px-3 py-2 border border-border/20">
<TimerCountdown initialSeconds={nfi} />
</p>
</div>
) : null;
})()}
<div>
<p className="text-[10px] font-medium text-muted-foreground uppercase tracking-wider mb-1.5">Fires into</p>
<p className="text-xs text-foreground/80 font-mono bg-muted/30 rounded-lg px-3 py-2 border border-border/20">
@@ -1724,6 +1907,8 @@ export default function Workspace() {
<NodeDetailPanel
node={selectedNode}
nodeSpec={activeAgentState?.nodeSpecs.find(n => n.id === selectedNode.id) ?? null}
allNodeSpecs={activeAgentState?.nodeSpecs}
subagentReports={activeAgentState?.subagentReports}
sessionId={activeAgentState?.sessionId || undefined}
graphId={activeAgentState?.graphId || undefined}
workerSessionId={null}
+12 -4
@@ -12,9 +12,6 @@ dependencies = [
"mcp>=1.0.0",
"fastmcp>=2.0.0",
"textual>=1.0.0",
"pytest>=8.0",
"pytest-asyncio>=0.23",
"pytest-xdist>=3.0",
"tools",
]
@@ -22,6 +19,11 @@ dependencies = [
tui = ["textual>=0.75.0"]
webhook = ["aiohttp>=3.9.0"]
server = ["aiohttp>=3.9.0"]
testing = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"pytest-xdist>=3.0",
]
[project.scripts]
hive = "framework.cli:main"
@@ -63,4 +65,10 @@ lint.isort.section-order = [
]
[dependency-groups]
dev = ["ty>=0.0.13", "ruff>=0.14.14"]
dev = [
"ty>=0.0.13",
"ruff>=0.14.14",
"pytest>=8.0",
"pytest-asyncio>=0.23",
"pytest-xdist>=3.0",
]
+10 -3
@@ -53,7 +53,13 @@ def log_error(message: str):
def run_command(cmd: list, error_msg: str) -> bool:
"""Run a command and return success status."""
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
subprocess.run(
cmd,
check=True,
capture_output=True,
text=True,
encoding="utf-8",
)
return True
except subprocess.CalledProcessError as e:
log_error(error_msg)
@@ -97,7 +103,7 @@ def main():
if mcp_config_path.exists():
log_success("MCP configuration found at .mcp.json")
logger.info("Configuration:")
with open(mcp_config_path) as f:
with open(mcp_config_path, encoding="utf-8") as f:
config = json.load(f)
logger.info(json.dumps(config, indent=2))
else:
@@ -114,7 +120,7 @@ def main():
}
}
with open(mcp_config_path, "w") as f:
with open(mcp_config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2)
log_success("Created .mcp.json")
@@ -129,6 +135,7 @@ def main():
check=True,
capture_output=True,
text=True,
encoding="utf-8",
)
log_success("MCP server module verified")
except subprocess.CalledProcessError as e:
+5
@@ -68,6 +68,7 @@ class TestFrameworkModule:
[sys.executable, "-m", "framework", "--help"],
capture_output=True,
text=True,
encoding="utf-8",
cwd=str(project_root / "core"),
)
assert result.returncode == 0
@@ -79,6 +80,7 @@ class TestFrameworkModule:
[sys.executable, "-m", "framework", "list", "--help"],
capture_output=True,
text=True,
encoding="utf-8",
cwd=str(project_root / "core"),
)
assert result.returncode == 0
@@ -104,6 +106,7 @@ class TestHiveEntryPoint:
["hive", "--help"],
capture_output=True,
text=True,
encoding="utf-8",
)
assert result.returncode == 0
assert "run" in result.stdout.lower()
@@ -115,6 +118,7 @@ class TestHiveEntryPoint:
["hive", "list", "--help"],
capture_output=True,
text=True,
encoding="utf-8",
)
assert result.returncode == 0
@@ -124,5 +128,6 @@ class TestHiveEntryPoint:
["hive", "run", "nonexistent_agent_xyz"],
capture_output=True,
text=True,
encoding="utf-8",
)
assert result.returncode != 0
+8 -5
@@ -47,8 +47,11 @@ class DummyLLMProvider(LLMProvider):
) -> AsyncIterator[StreamEvent]:
self._call_count += 1
if self._call_count == 1:
# First call: set the output via tool call
# Each execution takes 2 LLM calls:
# - Odd calls (1, 3, 5, ...): set output via tool call
# - Even calls (2, 4, 6, ...): finish with text
if self._call_count % 2 == 1:
# First call of each execution: set the output via tool call
yield ToolCallEvent(
tool_use_id=f"tc_{self._call_count}",
tool_name="set_output",
@@ -56,7 +59,7 @@ class DummyLLMProvider(LLMProvider):
)
yield FinishEvent(stop_reason="tool_use", input_tokens=10, output_tokens=10)
else:
# Subsequent calls: just finish with text
# Second call of each execution: finish with text
yield TextDeltaEvent(content="Done.", snapshot="Done.")
yield FinishEvent(stop_reason="end_turn", input_tokens=5, output_tokens=5)
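The `DummyLLMProvider` change generalizes the one-shot branch (call 1 versus everything after) into a repeating two-call cycle keyed on call-count parity, so the stub survives repeated executions in the same session. The parity logic in isolation (a simplified stand-in, not the real provider class):

```python
class DummyCallCycle:
    """Alternates tool-call and finish responses on successive calls."""

    def __init__(self) -> None:
        self.call_count = 0

    def next_kind(self) -> str:
        self.call_count += 1
        # Odd calls (1, 3, 5, ...) emit the set_output tool call;
        # even calls (2, 4, 6, ...) emit the final text turn.
        return "tool_use" if self.call_count % 2 == 1 else "end_turn"

cycle = DummyCallCycle()
print([cycle.next_kind() for _ in range(4)])  # → ['tool_use', 'end_turn', 'tool_use', 'end_turn']
```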
@@ -229,7 +232,7 @@ async def test_shared_session_reuses_directory_and_memory(tmp_path):
# Verify primary session's state.json exists and has the primary entry_point
primary_state_path = tmp_path / "sessions" / primary_exec_id / "state.json"
assert primary_state_path.exists()
primary_state = json.loads(primary_state_path.read_text())
primary_state = json.loads(primary_state_path.read_text(encoding="utf-8"))
assert primary_state["entry_point"] == "primary"
# Async stream — simulates a webhook entry point sharing the session
@@ -272,7 +275,7 @@ async def test_shared_session_reuses_directory_and_memory(tmp_path):
# State.json should NOT have been overwritten by the async execution
# (it should still show the primary entry point)
final_state = json.loads(primary_state_path.read_text())
final_state = json.loads(primary_state_path.read_text(encoding="utf-8"))
assert final_state["entry_point"] == "primary"
# Verify only ONE session directory exists (not two)
+2 -2
@@ -184,7 +184,7 @@ class TestPathTraversalWithActualFiles:
# Create a secret file outside storage
secret_file = tmpdir_path / "secret.txt"
secret_file.write_text("SENSITIVE_DATA")
secret_file.write_text("SENSITIVE_DATA", encoding="utf-8")
storage = FileStorage(storage_dir)
@@ -193,7 +193,7 @@ class TestPathTraversalWithActualFiles:
storage.get_runs_by_goal("../secret")
# Verify the secret file was not accessed (still contains original data)
assert secret_file.read_text() == "SENSITIVE_DATA"
assert secret_file.read_text(encoding="utf-8") == "SENSITIVE_DATA"
def test_cannot_write_outside_storage(self):
"""Verify that we can't write files outside storage directory."""
+5 -2
@@ -353,7 +353,9 @@ class TestRuntimeLogger:
# Verify the file exists and has one line
jsonl_path = tmp_path / "logs" / "sessions" / run_id / "logs" / "tool_logs.jsonl"
assert jsonl_path.exists()
lines = [line for line in jsonl_path.read_text().strip().split("\n") if line]
lines = [
line for line in jsonl_path.read_text(encoding="utf-8").strip().split("\n") if line
]
assert len(lines) == 1
data = json.loads(lines[0])
@@ -376,7 +378,8 @@ class TestRuntimeLogger:
jsonl_path = tmp_path / "logs" / "sessions" / run_id / "logs" / "details.jsonl"
assert jsonl_path.exists()
lines = [line for line in jsonl_path.read_text().strip().split("\n") if line]
content = jsonl_path.read_text(encoding="utf-8").strip()
lines = [line for line in content.split("\n") if line]
assert len(lines) == 1
data = json.loads(lines[0])
+1 -1
@@ -98,7 +98,7 @@ class TestFileStorageRunOperations:
assert run_file.exists()
# Verify it's valid JSON
with open(run_file) as f:
with open(run_file, encoding="utf-8") as f:
data = json.load(f)
assert data["id"] == "my_run"
+368
@@ -0,0 +1,368 @@
"""Tests for validate_agent_path() and _get_allowed_agent_roots().
Verifies the allowlist-based path validation that prevents arbitrary code
execution via importlib.import_module() (Issue #5471).
"""
from pathlib import Path
from unittest.mock import patch
import pytest
from aiohttp.test_utils import TestClient, TestServer
from framework.server.app import (
_get_allowed_agent_roots,
create_app,
validate_agent_path,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _reset_allowed_roots():
"""Reset the cached _ALLOWED_AGENT_ROOTS so tests start fresh."""
import framework.server.app as app_module
app_module._ALLOWED_AGENT_ROOTS = None
# ---------------------------------------------------------------------------
# _get_allowed_agent_roots
# ---------------------------------------------------------------------------
class TestGetAllowedAgentRoots:
def setup_method(self):
_reset_allowed_roots()
def teardown_method(self):
_reset_allowed_roots()
def test_returns_tuple(self):
roots = _get_allowed_agent_roots()
assert isinstance(roots, tuple), f"Expected tuple, got {type(roots).__name__}"
def test_contains_three_roots(self):
roots = _get_allowed_agent_roots()
assert len(roots) == 3
def test_cached_on_repeated_calls(self):
first = _get_allowed_agent_roots()
second = _get_allowed_agent_roots()
assert first is second
def test_roots_are_resolved_paths(self):
for root in _get_allowed_agent_roots():
assert root.is_absolute()
# A resolved path has no '..' components
assert ".." not in root.parts
def test_roots_anchored_to_repo_not_cwd(self):
"""exports/ and examples/ should be relative to the repo root
(derived from __file__), not the process CWD."""
from framework.server.app import _REPO_ROOT
roots = _get_allowed_agent_roots()
exports_root, examples_root = roots[0], roots[1]
assert exports_root == (_REPO_ROOT / "exports").resolve()
assert examples_root == (_REPO_ROOT / "examples").resolve()
# ---------------------------------------------------------------------------
# validate_agent_path: positive cases (should return resolved Path)
# ---------------------------------------------------------------------------
class TestValidateAgentPathPositive:
def setup_method(self):
_reset_allowed_roots()
def teardown_method(self):
_reset_allowed_roots()
def test_path_inside_exports(self, tmp_path):
with patch("framework.server.app._ALLOWED_AGENT_ROOTS", None):
import framework.server.app as app_module
agent_dir = tmp_path / "my_agent"
agent_dir.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (tmp_path,)
result = validate_agent_path(str(agent_dir))
assert result == agent_dir.resolve()
def test_path_inside_examples(self, tmp_path):
import framework.server.app as app_module
examples_root = tmp_path / "examples"
examples_root.mkdir()
agent_dir = examples_root / "some_agent"
agent_dir.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (examples_root,)
result = validate_agent_path(str(agent_dir))
assert result == agent_dir.resolve()
def test_path_inside_hive_agents(self, tmp_path):
import framework.server.app as app_module
hive_root = tmp_path / ".hive" / "agents"
hive_root.mkdir(parents=True)
agent_dir = hive_root / "my_agent"
agent_dir.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (hive_root,)
result = validate_agent_path(str(agent_dir))
assert result == agent_dir.resolve()
def test_returns_path_object(self, tmp_path):
import framework.server.app as app_module
agent_dir = tmp_path / "agent"
agent_dir.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (tmp_path,)
result = validate_agent_path(str(agent_dir))
assert isinstance(result, Path)
# ---------------------------------------------------------------------------
# validate_agent_path: negative cases (should raise ValueError)
# ---------------------------------------------------------------------------
class TestValidateAgentPathNegative:
def setup_method(self):
_reset_allowed_roots()
def teardown_method(self):
_reset_allowed_roots()
def _set_roots(self, tmp_path):
import framework.server.app as app_module
exports = tmp_path / "exports"
exports.mkdir(exist_ok=True)
app_module._ALLOWED_AGENT_ROOTS = (exports,)
def test_absolute_path_outside_roots(self, tmp_path):
self._set_roots(tmp_path)
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path("/tmp/evil")
def test_traversal_escape(self, tmp_path):
self._set_roots(tmp_path)
exports = tmp_path / "exports"
traversal = str(exports / ".." / ".." / "tmp" / "evil")
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path(traversal)
def test_sibling_directory_name(self, tmp_path):
self._set_roots(tmp_path)
# "exports-evil" is NOT a child of "exports"
sibling = tmp_path / "exports-evil" / "agent"
sibling.mkdir(parents=True)
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path(str(sibling))
def test_empty_string(self, tmp_path):
self._set_roots(tmp_path)
# Empty string resolves to CWD, which is outside the allowed roots
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path("")
def test_home_directory(self, tmp_path):
self._set_roots(tmp_path)
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path("~")
def test_root(self, tmp_path):
self._set_roots(tmp_path)
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path("/")
def test_null_byte(self, tmp_path):
"""Null bytes in paths must be rejected (pathlib raises ValueError)."""
self._set_roots(tmp_path)
with pytest.raises(ValueError):
validate_agent_path("exports/\x00evil")
def test_symlink_escape(self, tmp_path):
"""A symlink inside an allowed root pointing outside must be rejected."""
import framework.server.app as app_module
allowed = tmp_path / "exports"
allowed.mkdir()
outside = tmp_path / "outside"
outside.mkdir()
link = allowed / "sneaky"
link.symlink_to(outside)
app_module._ALLOWED_AGENT_ROOTS = (allowed,)
# The symlink resolves to outside the allowed root
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path(str(link))
def test_root_itself_rejected(self, tmp_path):
"""Passing the exact root directory itself should be rejected."""
import framework.server.app as app_module
allowed = tmp_path / "exports"
allowed.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (allowed,)
with pytest.raises(ValueError, match="allowed directory"):
validate_agent_path(str(allowed))
def test_tilde_expansion(self, tmp_path, monkeypatch):
"""Paths with ~ prefix should be expanded via expanduser()."""
import framework.server.app as app_module
# Set both HOME (POSIX) and USERPROFILE (Windows) so
# Path.expanduser() resolves ~ to tmp_path on all platforms.
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("USERPROFILE", str(tmp_path))
hive_agents = tmp_path / ".hive" / "agents"
hive_agents.mkdir(parents=True)
agent_dir = hive_agents / "my_agent"
agent_dir.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (hive_agents,)
result = validate_agent_path("~/.hive/agents/my_agent")
assert result == agent_dir.resolve()
# ---------------------------------------------------------------------------
# _ALLOWED_AGENT_ROOTS immutability
# ---------------------------------------------------------------------------
class TestAllowedRootsImmutability:
def setup_method(self):
_reset_allowed_roots()
def teardown_method(self):
_reset_allowed_roots()
def test_is_tuple_not_list(self):
roots = _get_allowed_agent_roots()
assert isinstance(roots, tuple), "Should be tuple to prevent mutation"
assert not isinstance(roots, list)
# ---------------------------------------------------------------------------
# Integration tests: HTTP endpoints reject malicious paths
# ---------------------------------------------------------------------------
class TestHTTPEndpointsRejectMaliciousPaths:
"""Test that HTTP route handlers return 400 for paths outside allowed roots."""
@pytest.mark.asyncio
async def test_create_session_rejects_outside_path(self, tmp_path):
import framework.server.app as app_module
exports = tmp_path / "exports"
exports.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (exports,)
try:
app = create_app()
async with TestClient(TestServer(app)) as client:
resp = await client.post(
"/api/sessions",
json={"agent_path": "/tmp/evil"},
)
assert resp.status == 400
body = await resp.json()
assert "allowed directory" in body["error"]
finally:
_reset_allowed_roots()
@pytest.mark.asyncio
async def test_create_session_rejects_traversal(self, tmp_path):
import framework.server.app as app_module
exports = tmp_path / "exports"
exports.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (exports,)
try:
app = create_app()
async with TestClient(TestServer(app)) as client:
resp = await client.post(
"/api/sessions",
json={"agent_path": "exports/../../tmp/evil"},
)
assert resp.status == 400
body = await resp.json()
assert "allowed directory" in body["error"]
finally:
_reset_allowed_roots()
@pytest.mark.asyncio
async def test_load_worker_rejects_outside_path(self, tmp_path):
import framework.server.app as app_module
exports = tmp_path / "exports"
exports.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (exports,)
try:
app = create_app()
async with TestClient(TestServer(app)) as client:
# First create a queen-only session
create_resp = await client.post("/api/sessions", json={})
if create_resp.status != 201:
pytest.skip(f"Cannot create queen-only session (status={create_resp.status})")
session_id = (await create_resp.json())["session_id"]
resp = await client.post(
f"/api/sessions/{session_id}/worker",
json={"agent_path": "/tmp/evil"},
)
assert resp.status == 400
body = await resp.json()
assert "allowed directory" in body["error"]
finally:
_reset_allowed_roots()
@pytest.mark.asyncio
async def test_check_agent_credentials_rejects_traversal(self, tmp_path):
import framework.server.app as app_module
exports = tmp_path / "exports"
exports.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (exports,)
try:
app = create_app()
async with TestClient(TestServer(app)) as client:
resp = await client.post(
"/api/credentials/check-agent",
json={"agent_path": "exports/../../etc/passwd"},
)
assert resp.status == 400
body = await resp.json()
assert "allowed directory" in body["error"]
finally:
_reset_allowed_roots()
@pytest.mark.asyncio
async def test_error_message_does_not_leak_resolved_path(self, tmp_path):
import framework.server.app as app_module
exports = tmp_path / "exports"
exports.mkdir()
app_module._ALLOWED_AGENT_ROOTS = (exports,)
try:
app = create_app()
async with TestClient(TestServer(app)) as client:
resp = await client.post(
"/api/sessions",
json={"agent_path": "/tmp/evil"},
)
body = await resp.json()
# The error message should not contain the resolved absolute path
# It should use the generic allowlist message
assert "/tmp/evil" not in body["error"]
assert "allowed directory" in body["error"]
finally:
_reset_allowed_roots()
if __name__ == "__main__":
pytest.main([__file__, "-v"])
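The validation these tests exercise can be sketched as follows. The real `validate_agent_path()` reads its roots from module state via `_get_allowed_agent_roots()`; this standalone version takes them as a parameter, and the function body is an illustration of the behavior the tests assert, not the actual implementation.

```python
from pathlib import Path

def validate_agent_path(raw: str, allowed_roots: tuple[Path, ...]) -> Path:
    """Allowlist check: only strict children of an allowed root pass."""
    # expanduser() handles "~"; resolve() collapses ".." components and
    # follows symlinks, which is what defeats the traversal and symlink
    # escapes covered by the tests above.
    resolved = Path(raw).expanduser().resolve()
    for root in allowed_roots:
        # The root itself is rejected (test_root_itself_rejected).
        if resolved != root and root in resolved.parents:
            return resolved
    raise ValueError("agent path is not inside an allowed directory")
```

Note the error message deliberately omits the resolved path, matching `test_error_message_does_not_leak_resolved_path`.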
+14 -3
@@ -71,6 +71,7 @@ def main():
capture_output=True,
text=True,
check=True,
+encoding="utf-8",
)
framework_path = result.stdout.strip()
success(f"installed at {framework_path}")
@@ -84,7 +85,12 @@ def main():
missing_deps = []
for dep in ["mcp", "fastmcp"]:
try:
-subprocess.run([sys.executable, "-c", f"import {dep}"], capture_output=True, check=True)
+subprocess.run(
+    [sys.executable, "-c", f"import {dep}"],
+    capture_output=True,
+    check=True,
+    encoding="utf-8",
+)
except subprocess.CalledProcessError:
missing_deps.append(dep)
@@ -103,6 +109,7 @@ def main():
capture_output=True,
text=True,
check=True,
+encoding="utf-8",
)
success("loads successfully")
except subprocess.CalledProcessError as e:
@@ -115,7 +122,7 @@ def main():
mcp_config = script_dir / ".mcp.json"
if mcp_config.exists():
try:
-with open(mcp_config) as f:
+with open(mcp_config, encoding="utf-8") as f:
config = json.load(f)
if "mcpServers" in config and "agent-builder" in config["mcpServers"]:
@@ -149,7 +156,10 @@ def main():
for module in modules_to_check:
try:
subprocess.run(
-[sys.executable, "-c", f"import {module}"], capture_output=True, check=True
+[sys.executable, "-c", f"import {module}"],
+capture_output=True,
+check=True,
+encoding="utf-8",
)
except subprocess.CalledProcessError:
failed_modules.append(module)
@@ -174,6 +184,7 @@ def main():
text=True,
check=True,
timeout=5,
+encoding="utf-8",
)
if "OK" in result.stdout:
success("server can start")
+600
@@ -0,0 +1,600 @@
FULL CALL PATH: FRONTEND SESSION START TO AGENT EXECUTION
===================================================================
STEP 1: FRONTEND HTTP REQUEST (API ENTRY POINT)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/server/routes_sessions.py
ENDPOINT: POST /api/sessions (line 103)
FUNCTION: async def handle_create_session(request: web.Request) -> web.Response
- Accepts optional "agent_path" in request body
- If agent_path provided: calls manager.create_session_with_worker()
- If no agent_path: calls manager.create_session()
- Returns 201 with session details
CALL CHAIN:
handle_create_session (line 103)
├─ validate_agent_path(agent_path) [line 128]
├─ manager.create_session_with_worker() [line 135] OR manager.create_session() [line 143]
└─ _session_to_live_dict(session) [line 169]
===================================================================
STEP 2: SESSION CREATION (MANAGER LAYER)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/server/session_manager.py
FLOW A: Create Session with Worker (Single Step)
─────────────────────────────────────────────────
FUNCTION: async def create_session_with_worker() (line 128)
- Creates session infrastructure (EventBus, LLM)
- Loads worker agent
- Starts queen
CALL SEQUENCE:
create_session_with_worker (line 128)
├─ _create_session_core(model=model) [line 150]
│ │ Creates RuntimeConfig, LiteLLMProvider, EventBus
│ │ Creates Session dataclass with event_bus and llm
│ │ Stores in self._sessions[resolved_id]
│ └─ returns Session object
├─ _load_worker_core(session, agent_path, worker_id) [line 153]
│ │ Loads AgentRunner (blocking I/O via executor)
│ │ Calls runner._setup(event_bus=session.event_bus)
│ │ Starts worker_runtime if not already running
│ │ Cleans up stale sessions on disk
│ │ Updates session.runner, session.worker_runtime, etc.
│ └─ returns None (modifies session in-place)
├─ build_worker_profile(session.worker_runtime) [line 162]
│ └─ returns worker identity string for queen
└─ _start_queen(session, worker_identity) [line 166]
(See STEP 3 below)
FLOW B: Create Queen-Only Session
─────────────────────────────────
FUNCTION: async def create_session() (line 109)
CALL SEQUENCE:
create_session (line 109)
├─ _create_session_core(session_id, model) [line 120]
│ └─ (same as above)
└─ _start_queen(session, worker_identity=None) [line 123]
(See STEP 3 below)
===================================================================
STEP 3: WORKER AGENT LOADING (AGENT RUNNER LAYER)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runner/runner.py
FUNCTION: AgentRunner.load() (line 789) - Static method
CALLED BY: _load_worker_core() via loop.run_in_executor() (line 213-220)
LOAD SEQUENCE:
load(agent_path, model, interactive, skip_credential_validation) (line 789)
├─ Tries agent.py path first:
│ └─ agent_py = agent_path / "agent.py"
│ ├─ _import_agent_module(agent_path) [line 823]
│ │ (Dynamically imports agent Python module)
│ │
│ ├─ Extract goal, nodes, edges from module [line 825-827]
│ ├─ Build GraphSpec from module variables [line 854-876]
│ └─ return AgentRunner(...) [line 889]
└─ Fallback to agent.json if no agent.py:
└─ load_agent_export(agent_json_path) [line 911]
└─ return AgentRunner(...) [line 913]
RETURN: AgentRunner instance (NOT YET STARTED)
AgentRunner.__init__() (line 609) - Constructor
├─ Stores graph, goal, model, storage_path
├─ _validate_credentials() [line 684]
│ (Checks required credentials are available)
├─ Auto-discover tools from tools.py [line 687-689]
│ └─ _tool_registry.discover_from_module(tools_path)
└─ Auto-discover MCP servers from mcp_servers.json [line 697-699]
└─ _load_mcp_servers_from_config(mcp_config_path)
NOTE: __init__ does NOT call _setup() yet — that happens later.
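The agent.py-first, agent.json-fallback selection can be sketched as below; the function name and return shape are illustrative, not the real `AgentRunner.load()` API.

```python
from pathlib import Path

def resolve_agent_source(agent_path: str) -> tuple[str, Path]:
    """Illustrative sketch of load()'s source selection order."""
    root = Path(agent_path)
    agent_py = root / "agent.py"
    if agent_py.exists():
        # The real code imports the module and builds a GraphSpec from its
        # goal/nodes/edges variables.
        return ("python-module", agent_py)
    agent_json = root / "agent.json"
    if agent_json.exists():
        # The real code calls load_agent_export() on the JSON file.
        return ("json-export", agent_json)
    raise FileNotFoundError(f"{root} contains neither agent.py nor agent.json")
```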
===================================================================
STEP 4: WORKER RUNTIME SETUP (AFTER LOAD)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runner/runner.py
FUNCTION: runner._setup(event_bus=None) (line 1012)
CALLED BY: _load_worker_core() via loop.run_in_executor() (line 225-227)
SETUP SEQUENCE:
_setup(event_bus=session.event_bus) (line 1012)
├─ Configure logging [line 1015-1017]
│ └─ configure_logging(level="INFO", format="auto")
├─ Create LLM provider [line 1031-1145]
│ ├─ Check for mock mode → MockLLMProvider
│ ├─ Check for Claude Code subscription → LiteLLMProvider with OAuth
│ ├─ Check for Codex subscription → LiteLLMProvider with Codex API
│ ├─ Fallback to environment variables or credential store
│ └─ self._llm = <LLMProvider instance>
├─ Auto-register GCU MCP server if needed [line 1148-1170]
├─ Auto-register file tools MCP server [line 1173-1192]
├─ Get all tools from registry [line 1195-1196]
│ └─ tools = list(self._tool_registry.get_tools().values())
└─ _setup_agent_runtime(tools, tool_executor, accounts_prompt, event_bus) [line 1215]
(See STEP 5 below)
===================================================================
STEP 5: AGENT RUNTIME CREATION (CORE RUNTIME INSTANTIATION)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runner/runner.py
(method _setup_agent_runtime, line 1299)
& /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
(function create_agent_runtime, line 1642)
FUNCTION: runner._setup_agent_runtime() (line 1299)
CALLED BY: runner._setup() [line 1215]
SETUP SEQUENCE:
_setup_agent_runtime(tools, tool_executor, accounts_prompt, event_bus) (line 1299)
├─ Convert AsyncEntryPointSpec to EntryPointSpec [line 1310-1323]
├─ Create primary entry point for entry_node [line 1328-1338]
├─ Create RuntimeLogStore [line 1341]
├─ Create CheckpointConfig [line 1346-1352]
│ (Enables checkpointing by default for resumable sessions)
└─ create_agent_runtime(
graph=self.graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_points,
llm=self._llm,
tools=tools,
tool_executor=tool_executor,
runtime_log_store=log_store,
checkpoint_config=checkpoint_config,
event_bus=event_bus,
) [line 1364]
NEXT: create_agent_runtime() in agent_runtime.py
FUNCTION: create_agent_runtime() (line 1642)
CREATION SEQUENCE:
create_agent_runtime(...) (line 1642)
├─ Auto-create RuntimeLogStore if needed [line 1689-1694]
├─ Create AgentRuntime instance [line 1696]
│ └─ runtime = AgentRuntime(
│ graph=graph,
│ goal=goal,
│ storage_path=storage_path,
│ llm=llm,
│ tools=tools,
│ tool_executor=tool_executor,
│ runtime_log_store=runtime_log_store,
│ checkpoint_config=checkpoint_config,
│ event_bus=event_bus, # <-- SHARED WITH QUEEN/JUDGE
│ ) [line 1696]
├─ Register each entry point [line 1713-1714]
│ └─ runtime.register_entry_point(spec) for each spec
└─ return runtime [line 1716]
RETURN: AgentRuntime instance (NOT YET STARTED)
===================================================================
STEP 6: AGENT RUNTIME INITIALIZATION (RUNTIME CLASS)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
FUNCTION: AgentRuntime.__init__() (line 118)
INITIALIZATION:
AgentRuntime.__init__(...) (line 118)
├─ Initialize storage (ConcurrentStorage) [line 175-179]
├─ Initialize SessionStore for unified sessions [line 182]
├─ Initialize shared components:
│ ├─ SharedStateManager [line 185]
│ ├─ EventBus (or use shared one) [line 186]
│ └─ OutcomeAggregator [line 187]
├─ Store LLM, tools, tool_executor [line 190-195]
├─ Initialize entry points dict [line 198]
├─ Initialize execution streams dict [line 199]
└─ Set state to NOT running [line 211: self._running = False]
RETURN: Unstarted AgentRuntime instance
NEXT: register_entry_point() for each entry point
FUNCTION: AgentRuntime.register_entry_point() (line 218)
├─ Validate entry node exists [line 236-237]
└─ Store spec in self._entry_points[spec.id] [line 239]
===================================================================
STEP 7: QUEEN STARTUP (CONCURRENT WITH WORKER)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/server/session_manager.py
FUNCTION: _start_queen() (line 394)
CALLED BY: create_session() OR create_session_with_worker()
QUEEN STARTUP SEQUENCE:
_start_queen(session, worker_identity, initial_prompt) (line 394)
├─ Create queen directory [line 410-411]
│ └─ ~/.hive/queen/session/{session.id}/
├─ Register MCP coding tools [line 414-424]
│ └─ Load from hive_coder/mcp_servers.json
├─ Register lifecycle tools [line 428-436]
│ └─ register_queen_lifecycle_tools()
├─ Register worker monitoring tools if worker exists [line 438-448]
│ └─ register_worker_monitoring_tools()
├─ Build queen graph with adjusted prompt [line 454-478]
│ ├─ Add worker_identity to system prompt
│ └─ Filter tools to available ones
├─ Create queen executor task [line 482-519]
│ └─ async def _queen_loop():
│ ├─ Create GraphExecutor [line 484]
│ ├─ Call executor.execute(graph=queen_graph, goal=queen_goal, ...) [line 501]
│ └─ (Queen stays alive forever unless error)
└─ session.queen_task = asyncio.create_task(_queen_loop()) [line 519]
RESULT: Queen task starts in background, never awaited
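The fire-and-forget pattern in step 7 (a task created, stored, and never awaited) looks like this in miniature; the loop body is a placeholder for the real `executor.execute(...)` call:

```python
import asyncio

async def _queen_loop(stop: asyncio.Event) -> None:
    # The real queen runs executor.execute(...) and stays alive for the
    # whole session; here a polling sleep stands in for that work.
    while not stop.is_set():
        await asyncio.sleep(0.01)

async def start_session() -> None:
    stop = asyncio.Event()
    # Created and stored but never awaited -- like session.queen_task.
    queen_task = asyncio.create_task(_queen_loop(stop))
    await asyncio.sleep(0.05)      # the session keeps serving requests
    assert not queen_task.done()   # queen is still running in the background
    stop.set()                     # sketch-only: give it a clean shutdown
    await queen_task

asyncio.run(start_session())
```

Holding a reference to the task (as the session does) also prevents it from being garbage-collected mid-flight.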
===================================================================
STEP 8: WORKER RUNTIME START
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
FUNCTION: AgentRuntime.start() (line 263)
CALLED BY: _load_worker_core() [line 234 in session_manager.py]
START SEQUENCE:
await runtime.start() (line 263)
├─ Mark as running [line 266: self._running = True]
├─ Create ExecutionStream for each registered entry point [loop in start()]
│ └─ stream = ExecutionStream(
│ stream_id=entry_point.id,
│ entry_spec=entry_point_spec,
│ graph=self.graph,
│ goal=self.goal,
│ state_manager=self._state_manager,
│ storage=self._storage,
│ outcome_aggregator=self._outcome_aggregator,
│ event_bus=self._event_bus, # <-- SHARED
│ llm=self._llm,
│ tools=self._tools,
│ tool_executor=self._tool_executor,
│ )
├─ Start each stream [await stream.start() for each stream]
├─ Setup webhook server if configured [line ~350]
├─ Register event-driven entry points (timers, webhooks) [line ~400]
└─ self._running = True [line 266]
RESULT: AgentRuntime ready to execute
===================================================================
STEP 9: TRIGGER EXECUTION (MANUAL VIA ENTRY POINT)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py
FUNCTION: async def trigger() (line 790)
CALLED BY: Frontend API, timers, webhooks, manual calls
TRIGGER SEQUENCE:
await runtime.trigger(entry_point_id, input_data, session_state) (line 790)
├─ Verify runtime is running [line 818]
├─ Resolve stream for entry point [line 821]
│ └─ stream = self._resolve_stream(entry_point_id)
└─ return await stream.execute(input_data, correlation_id, session_state) [line 825]
(See STEP 10 below)
RETURNS: execution_id (non-blocking)
===================================================================
STEP 10: EXECUTION STREAM MANAGEMENT
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/execution_stream.py
FUNCTION: ExecutionStream.execute() (line 426)
CALLED BY: AgentRuntime.trigger() [line 825]
EXECUTE SEQUENCE:
await stream.execute(input_data, correlation_id, session_state) (line 426)
├─ Verify stream is running [line 445]
├─ Cancel any existing running executions [line 453-467]
│ (Only one execution per stream at a time)
├─ Generate execution_id [line 473-487]
│ ├─ If resuming: use resume_session_id [line 474]
│ ├─ Otherwise: generate from SessionStore [line 476]
│ └─ Format: session_{timestamp}_{uuid}
├─ Create ExecutionContext [line 493]
│ └─ ctx = ExecutionContext(
│ id=execution_id,
│ correlation_id=correlation_id,
│ stream_id=stream_id,
│ input_data=input_data,
│ session_state=session_state,
│ )
├─ Store context in self._active_executions [line 504]
├─ Create completion event [line 505]
├─ Start async execution task [line 508]
│ └─ task = asyncio.create_task(self._run_execution(ctx))
└─ return execution_id [line 512] (non-blocking)
RESULT: Execution queued, _run_execution() runs in background
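The non-blocking contract of step 10 (schedule the work, hand back an id, signal a completion event later) reduces to this toy version; class and attribute names mirror the outline but the bodies are stand-ins:

```python
import asyncio
import time
import uuid

class ExecutionStream:
    """Toy version of the non-blocking execute() contract above."""

    def __init__(self) -> None:
        self._active: dict[str, asyncio.Task] = {}
        self._completion_events: dict[str, asyncio.Event] = {}

    async def _run_execution(self, execution_id: str) -> None:
        await asyncio.sleep(0.01)                    # graph execution stand-in
        self._completion_events[execution_id].set()  # step 11's final act

    async def execute(self, input_data: dict) -> str:
        # Id format mirrors the outline: session_{timestamp}_{uuid}.
        execution_id = f"session_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        self._completion_events[execution_id] = asyncio.Event()
        # Schedule the work and return immediately; callers wait on the
        # completion event instead of blocking here.
        self._active[execution_id] = asyncio.create_task(
            self._run_execution(execution_id)
        )
        return execution_id

async def demo() -> None:
    stream = ExecutionStream()
    eid = await stream.execute({"goal": "demo"})
    assert eid.startswith("session_")       # returned before completion
    await stream._completion_events[eid].wait()

asyncio.run(demo())
```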
===================================================================
STEP 11: EXECUTION RUNNER (BACKGROUND TASK)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/runtime/execution_stream.py
FUNCTION: ExecutionStream._run_execution() (line 538)
CALLED BY: asyncio.create_task() [line 508]
RUNS IN BACKGROUND: Yes, non-blocking
EXECUTION SEQUENCE:
await _run_execution(ctx) (line 538)
├─ Acquire semaphore for concurrency control [line 558]
├─ Mark status as "running" [line 559]
├─ Create execution-scoped memory [line 572-576]
│ └─ self._state_manager.create_memory(execution_id, stream_id, isolation)
├─ Start runtime adapter [line 579-586]
│ └─ runtime_adapter.start_run(goal_id, goal_description, input_data)
├─ Create RuntimeLogger [line 589-595]
├─ Determine storage location [line 601-604]
│ └─ exec_storage = self._session_store.sessions_dir / execution_id
├─ Write initial session state [line 611-612]
├─ RESURRECTION LOOP [line 618]
│ └─ while True:
│ ├─ Create GraphExecutor [line 625-639]
│ │ └─ executor = GraphExecutor(
│ │ runtime=runtime_adapter,
│ │ llm=self._llm,
│ │ tools=self._tools,
│ │ tool_executor=self._tool_executor,
│ │ event_bus=self._scoped_event_bus, # <-- SHARED
│ │ storage_path=exec_storage,
│ │ checkpoint_config=self._checkpoint_config,
│ │ )
│ │
│ ├─ Execute graph [line 644]
│ │ └─ result = await executor.execute(
│ │ graph=modified_graph,
│ │ goal=self.goal,
│ │ input_data=_current_input_data,
│ │ session_state=_current_session_state,
│ │ checkpoint_config=self._checkpoint_config,
│ │ )
│ │
│ └─ Check for resurrection [line 656-707]
│ (On non-fatal error, retry from failed node)
├─ Record result [line 710]
│ └─ self._record_execution_result(execution_id, result)
├─ Emit completion event [line 730-754]
│ ├─ execution_completed (if success)
│ ├─ execution_paused (if paused)
│ └─ execution_failed (if error)
└─ Mark completion event [line 774]
└─ self._completion_events[execution_id].set()
RESULT: Execution complete, event emitted, task ends
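The resurrection loop in step 11 is, stripped to its core, a retry-on-non-fatal-error loop; this sketch is illustrative (the real loop rebuilds a fresh GraphExecutor and resumes from the failed node rather than re-running a callable):

```python
def run_with_resurrection(execute_once, max_attempts: int = 3):
    """Retry after a non-fatal failure instead of giving up."""
    attempt = 0
    while True:
        attempt += 1
        try:
            # Real code rebuilds the executor here and resumes execution.
            return execute_once(attempt)
        except RuntimeError:
            # Treated as non-fatal; a fatal error type would re-raise.
            if attempt >= max_attempts:
                raise

calls = []
def flaky(attempt: int) -> str:
    calls.append(attempt)
    if attempt < 3:
        raise RuntimeError("transient node failure")
    return "ok"

assert run_with_resurrection(flaky) == "ok"
assert calls == [1, 2, 3]
```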
===================================================================
STEP 12: GRAPH EXECUTION (THE ACTUAL AGENT LOGIC)
===================================================================
FILE: /Users/timothy/repo/hive/core/framework/graph/executor.py
FUNCTION: GraphExecutor.execute() (line 289)
CALLED BY: ExecutionStream._run_execution() [line 644]
RUNS IN BACKGROUND: Yes, as part of _run_execution task
EXECUTION SEQUENCE:
await executor.execute(graph, goal, input_data, session_state, checkpoint_config) (line 289)
├─ Validate graph [line 312-318]
├─ Validate tool availability [line 320-332]
├─ Initialize SharedMemory for session [line 335]
├─ Restore session state if resuming [line 353-369]
│ └─ Load memory from previous session
├─ Restore checkpoints if available [line 412-463]
├─ Determine entry point (normal or resume) [line 464-492]
├─ Start run in observability system [line 567-579]
├─ MAIN EXECUTION LOOP [line 596]
│ └─ while steps < graph.max_steps:
│ │
│ ├─ Check for pause requests [line 599-636]
│ │
│ ├─ Get current node spec [line 648-650]
│ │ └─ node_spec = graph.get_node(current_node_id)
│ │
│ ├─ Enforce max_node_visits [line 652-678]
│ │
│ ├─ Append node to execution path [line 680]
│ │
│ ├─ Clear stale nullable outputs [line 682-695]
│ │
│ ├─ Create node context [line 730-745]
│ │ └─ ctx = self._build_context(node_spec, memory, goal, ...)
│ │
│ ├─ Get/create node implementation [line 760]
│ │ └─ node_impl = self._get_node_implementation(node_spec, ...)
│ │
│ ├─ Validate inputs [line 762-769]
│ │
│ ├─ Create checkpoints [line 771-790]
│ │
│ ├─ EXECUTE NODE [line 800-802]
│ │ └─ result = await node_impl.execute(ctx)
│ │ (Executes LLM call, tool calls, or other logic)
│ │
│ ├─ Handle success [line 825-876]
│ │ ├─ Validate output [line 836-850]
│ │ └─ Write to memory [line 874-876]
│ │
│ ├─ Handle failure and retries [line 884-934]
│ │ ├─ Track retry count [line 886-888]
│ │ ├─ Check max_retries [line 906-934]
│ │ └─ Sleep with exponential backoff before retry
│ │
│ ├─ Update progress in state.json [line 941]
│ │ └─ self._write_progress(current_node_id, path, memory, ...)
│ │
│ ├─ FOLLOW EDGES [line 942+]
│ │ └─ next_node = await self._follow_edges(
│ │ graph, goal, current_node_id,
│ │ node_spec, result, memory
│ │ )
│ │ Evaluates conditional edges, determines next node
│ │
│ └─ Transition to next node [line steps += 1]
│ (Loop continues with next node)
├─ Handle timeout/max_steps [line 596: while steps < graph.max_steps]
└─ Return ExecutionResult [line 1100+]
└─ ExecutionResult(
success=success,
output=final_output,
error=error_message,
paused_at=paused_node_id,
session_state={memory, path, ...},
)
RESULT: ExecutionResult returned to ExecutionStream._run_execution()
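Stripped of retries, checkpoints, pause handling, and conditional edges, the step-12 loop reduces to the sketch below (all names illustrative): run the current node, write its result to shared memory, follow an edge to the next node until there is none or the step budget runs out.

```python
def execute_graph(nodes, edges, entry, memory, max_steps=25):
    """Condensed sketch of the main execution loop."""
    current, path, steps = entry, [], 0
    while current is not None and steps < max_steps:
        path.append(current)             # append node to execution path
        result = nodes[current](memory)  # EXECUTE NODE
        memory[current] = result         # handle success: write to memory
        current = edges.get(current)     # FOLLOW EDGES (unconditional here)
        steps += 1
    return {"success": current is None, "path": path, "memory": memory}

nodes = {
    "intake": lambda m: {"rules": "star newsletters"},
    "triage": lambda m: f"applied: {m['intake']['rules']}",
}
edges = {"intake": "triage"}
result = execute_graph(nodes, edges, "intake", {})
assert result["success"] and result["path"] == ["intake", "triage"]
```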
===================================================================
DATA FLOW SUMMARY
===================================================================
Shared Component: EventBus
├─ Created in Session (line 95 in session_manager.py)
├─ Passed to AgentRuntime.__init__ (line 186 in agent_runtime.py)
├─ Stored and used by ExecutionStream (line 219 in execution_stream.py)
├─ Wrapped as GraphScopedEventBus (line 254 in execution_stream.py)
├─ Passed to GraphExecutor (line 630 in execution_stream.py)
└─ Used for event publishing during execution
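A minimal pub/sub bus with this "create once, share everywhere" shape (not the real EventBus API) shows why one instance must be threaded through queen, runtime, streams, and executor alike:

```python
from collections import defaultdict

class EventBus:
    """Minimal pub/sub sketch of the shared-bus pattern above."""

    def __init__(self) -> None:
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: dict) -> None:
        for handler in self._subscribers[topic]:
            handler(payload)

# Because every layer holds the same instance, a completion published deep
# inside graph execution reaches listeners registered at the session level.
bus = EventBus()
seen = []
bus.subscribe("execution_completed", seen.append)
bus.publish("execution_completed", {"execution_id": "session_123"})
assert seen == [{"execution_id": "session_123"}]
```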
Shared Component: LLM Provider
├─ Created in Session._create_session_core() (line 89-94 in session_manager.py)
├─ Passed to AgentRuntime.__init__ (line 123 in agent_runtime.py)
├─ Stored and used by ExecutionStream (line 220 in execution_stream.py)
├─ Passed to GraphExecutor (line 627 in execution_stream.py)
└─ Used by node implementations for LLM calls
Memory Flow:
├─ Each execution has ExecutionContext with input_data
├─ SharedMemory created per execution (line 572-576 in execution_stream.py)
├─ Session state restored if resuming (line 354-369 in executor.py)
├─ Each node reads from memory via input_keys
├─ Each node writes to memory via output_keys
├─ Memory checkpoints created for resumability
└─ Final memory returned in ExecutionResult
===================================================================
KEY FILE PATHS AND LINE NUMBERS
===================================================================
1. API Entry: /Users/timothy/repo/hive/core/framework/server/routes_sessions.py:103
2. Session Manager: /Users/timothy/repo/hive/core/framework/server/session_manager.py:128
3. Agent Runner Load: /Users/timothy/repo/hive/core/framework/runner/runner.py:789
4. Agent Runner Setup: /Users/timothy/repo/hive/core/framework/runner/runner.py:1012
5. Runtime Creation: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py:1642
6. Runtime Class: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py:66
7. Trigger Method: /Users/timothy/repo/hive/core/framework/runtime/agent_runtime.py:790
8. Execution Stream: /Users/timothy/repo/hive/core/framework/runtime/execution_stream.py:134
9. Graph Executor: /Users/timothy/repo/hive/core/framework/graph/executor.py:102
10. Main Loop: /Users/timothy/repo/hive/core/framework/graph/executor.py:596
@@ -46,7 +46,7 @@ def cli():
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(rules, max_emails, mock, quiet, verbose, debug):
-"""Execute inbox triage with the given rules."""
+"""Execute inbox management with the given rules."""
if not quiet:
setup_logging(verbose=verbose, debug=debug)
@@ -3,7 +3,7 @@
"id": "email_inbox_management",
"name": "Email Inbox Management",
"version": "1.0.0",
"description": "Manage Gmail inbox emails autonomously using user-defined free-text rules. For every five minutes, fetch inbox emails (configurable batch size, default 100), apply the user's rules to each email, and execute the appropriate Gmail actions \u2014 trash, mark as spam, mark important, mark read/unread, star, and more."
"description": "Manage Gmail inbox emails autonomously using user-defined free-text rules. Every five minutes, fetch inbox emails (configurable page size, default 100), loop through ALL emails by paginating, apply the user's rules to each email, and execute the appropriate Gmail actions \u2014 trash, mark as spam, mark important, mark read/unread, star, draft replies, create/apply custom labels, and more."
},
"graph": {
"id": "email-inbox-management-graph",
@@ -16,7 +16,7 @@
"pause_nodes": [],
"terminal_nodes": [],
"conversation_mode": "continuous",
"identity_prompt": "You are an email inbox management assistant. You help users manage their Gmail inbox by applying free-text rules to emails \u2014 trash, mark as spam, mark important, mark read/unread, star, and more.",
"identity_prompt": "You are an email inbox management assistant. You help users manage their Gmail inbox by applying free-text rules to emails \u2014 trash, mark as spam, mark important, mark read/unread, star, draft replies, create/apply custom labels, and more.",
"nodes": [
{
"id": "intake",
@@ -29,13 +29,14 @@
],
"output_keys": [
"rules",
"max_emails"
"max_emails",
"query"
],
"nullable_output_keys": [],
"nullable_output_keys": ["query"],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are an email inbox management assistant. The user has provided rules for managing their emails.\n\n**STEP 1 \u2014 Respond to the user (text only, NO tool calls):**\n\nRead the user's rules from the input context. Present a clear summary of what you will do with their emails based on their rules.\n\nThe following Gmail actions are available \u2014 map the user's rules to whichever apply:\n- **Trash** emails\n- **Mark as spam**\n- **Mark as important** / unmark important\n- **Mark as read** / mark as unread\n- **Star** / unstar emails\n- **Add/remove Gmail labels** (INBOX, UNREAD, IMPORTANT, STARRED, SPAM, CATEGORY_PERSONAL, CATEGORY_SOCIAL, CATEGORY_PROMOTIONS, CATEGORY_UPDATES, CATEGORY_FORUMS)\n\nPresent the rules back to the user in plain language. Do NOT refuse rules \u2014 if the user asks for any of the above actions, confirm you will do it.\n\nAlso confirm the batch size (max_emails). If max_emails is not provided, default to 100.\n\nAsk the user to confirm: \"Does this look right? I'll proceed once you confirm.\"\n\n**STEP 2 \u2014 After the user confirms, call set_output:**\n\n- set_output(\"rules\", <the confirmed rules as a clear text description>)\n- set_output(\"max_emails\", <the confirmed max_emails as a string number, e.g. \"100\">)",
"tools": [],
"system_prompt": "You are an email inbox management assistant. The user has provided rules for managing their emails.\n\n**RULES ARE ADDITIVE.** If existing rules are already present in context from a previous cycle, present ALL of them (old + new). The user can add, modify, or remove rules. When calling set_output(\"rules\", ...), include ALL active rules \u2014 old and new combined.\n\n**STEP 1 \u2014 Respond to the user (text only, NO tool calls):**\n\nRead the user's rules from the input context. Present a clear summary of what you will do with their emails based on their rules.\n\nThe following Gmail actions are available \u2014 map the user's rules to whichever apply:\n- **Trash** emails\n- **Mark as spam**\n- **Mark as important** / unmark important\n- **Mark as read** / mark as unread\n- **Star** / unstar emails\n- **Add/remove Gmail labels** (INBOX, UNREAD, IMPORTANT, STARRED, SPAM, CATEGORY_PERSONAL, CATEGORY_SOCIAL, CATEGORY_PROMOTIONS, CATEGORY_UPDATES, CATEGORY_FORUMS)\n- **Draft replies** \u2014 create draft reply emails (never sent automatically)\n- **Create/apply custom labels** \u2014 create new Gmail labels and apply them to emails\n\nPresent the rules back to the user in plain language. Do NOT refuse rules \u2014 if the user asks for any of the above actions, confirm you will do it.\n\nAlso confirm the page size (max_emails). If max_emails is not provided, default to 100.\nNote: max_emails is the page size per fetch cycle. The agent will loop through ALL inbox emails by fetching max_emails at a time until no more remain.\n\nAsk the user to confirm: \"Does this look right? I'll proceed once you confirm.\"\n\n**STEP 2 \u2014 Show existing labels (tool call):**\n\nCall gmail_list_labels() to show the user their current Gmail labels. 
This helps them reference existing labels or decide whether new custom labels are needed for their rules.\n\n**STEP 3 \u2014 After the user confirms, call set_output:**\n\n- set_output(\"rules\", <ALL active rules as a clear text description>)\n- set_output(\"max_emails\", <the confirmed max_emails as a string number, e.g. \"100\">)\n- set_output(\"query\", <Gmail search query if the user wants to target specific emails>)\n\n**TARGETED QUERY (optional):**\n\nIf the user's rules target specific emails (e.g. \"delete all emails from newsletters@example.com\"), build a Gmail search query to fetch ONLY matching emails instead of the entire inbox. This is much faster and more efficient.\n\nGmail search query syntax:\n- `from:sender@example.com` \u2014 from a specific sender\n- `to:recipient@example.com` \u2014 to a specific recipient\n- `subject:keyword` \u2014 subject contains keyword\n- `is:unread` / `is:read` \u2014 read status\n- `is:starred` / `is:important` \u2014 flags\n- `has:attachment` \u2014 has attachments\n- `filename:pdf` \u2014 attachment filename\n- `label:LABEL_NAME` \u2014 has a specific label\n- `category:promotions` / `category:social` / `category:updates` \u2014 Gmail categories\n- `newer_than:7d` / `older_than:30d` \u2014 relative time (d=days, m=months, y=years)\n- `after:2024/01/01` / `before:2024/12/31` \u2014 absolute dates\n- Combine with spaces (AND): `from:boss@co.com subject:urgent`\n- OR operator: `from:alice OR from:bob`\n- NOT / exclude: `-from:noreply@example.com` or `NOT from:noreply`\n- Grouping: `{from:alice from:bob}` (same as OR)\n\nExamples:\n- User says \"trash all promotional emails\" \u2192 query: `category:promotions`\n- User says \"star emails from my boss jane@co.com\" \u2192 query: `from:jane@co.com`\n- User says \"mark unread emails older than a week as read\" \u2192 query: `is:unread older_than:7d`\n- User says \"apply rules to all inbox emails\" \u2192 no query needed (default: `label:INBOX`)\n\nIf the rules apply 
broadly to ALL emails, do NOT set a query \u2014 the default `label:INBOX` will be used. Only set a query when it would meaningfully narrow the search.",
"tools": ["gmail_list_labels"],
"model": null,
"function": null,
"routes": {},
@@ -50,24 +51,25 @@
{
"id": "fetch-emails",
"name": "Fetch Emails",
"description": "Fetch emails from the Gmail inbox up to the configured batch limit. Supports pagination for continuous mode \u2014 can fetch the next batch of emails beyond what was already processed.",
"description": "Fetch one page of emails from the Gmail inbox. Returns the emails filename and a next_page_token for pagination. The graph loops back here if more pages remain.",
"node_type": "event_loop",
"input_keys": [
"rules",
"max_emails"
"max_emails",
"next_page_token",
"last_processed_timestamp",
"query"
],
"output_keys": [
"emails"
"emails",
"next_page_token"
],
"nullable_output_keys": [],
"nullable_output_keys": ["next_page_token"],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a data pipeline step. Your job is to fetch emails from Gmail and write them to emails.jsonl.\n\n**FIRST-TIME FETCH (default path):**\n1. Read \"max_emails\" from input context.\n2. Call bulk_fetch_emails(max_emails=<value>).\n3. The tool returns {\"filename\": \"emails.jsonl\"}.\n4. Call set_output(\"emails\", \"emails.jsonl\").\n\n**NEXT-BATCH FETCH (when user asks for \"the next N\" emails):**\nThe user wants emails BEYOND what was already fetched. Use pagination:\n1. Call gmail_list_messages(query=\"label:INBOX\", max_results=<previous + new count>) to get message IDs. Use page_token if needed to paginate past already-fetched emails.\n2. Identify message IDs NOT in the previous batch (you remember them from continuous conversation).\n3. Call gmail_batch_get_messages(message_ids=<new_ids>, format=\"metadata\") for full metadata.\n4. For each message in the result, call append_data(filename=\"emails.jsonl\", data=<JSON: {id, subject, from, to, date, snippet, labels}>).\n5. Call set_output(\"emails\", \"emails.jsonl\").\n\n**TOOLS:**\n- bulk_fetch_emails(max_emails) \u2014 Bulk fetch from inbox, writes emails.jsonl. Use for first fetch.\n- gmail_list_messages(query, max_results, page_token) \u2014 List message IDs with pagination. Returns {messages, next_page_token}.\n- gmail_batch_get_messages(message_ids, format) \u2014 Fetch metadata for specific IDs (max 50 per call).\n- append_data(filename, data) \u2014 Append a line to a JSONL file.\n\nDo NOT add commentary or explanation. Execute the appropriate path and call set_output when done.",
"system_prompt": "You are a data pipeline step. Your job is to fetch ONE PAGE of emails from Gmail.\n\n**INSTRUCTIONS:**\n1. Read \"max_emails\", \"next_page_token\", \"last_processed_timestamp\", and \"query\" from input context.\n2. Call bulk_fetch_emails with:\n - max_emails=<max_emails value, default \"100\">\n - page_token=<next_page_token value, if present and non-empty>\n - after_timestamp=<last_processed_timestamp value, if present and non-empty>\n - query=<query value, if present and non-empty; omit to default to \"label:INBOX\">\n3. The tool returns {\"filename\": \"emails.jsonl\", \"count\": N, \"next_page_token\": \"<token or null>\"}.\n4. Call set_output(\"emails\", \"emails.jsonl\").\n5. Call set_output(\"next_page_token\", <the next_page_token from the tool result, or \"\" if null>).\n\n**IMPORTANT:** The graph will automatically loop back to this node if next_page_token is non-empty.\nYou only need to fetch ONE page per visit. Do NOT loop internally.\n\nDo NOT add commentary or explanation. Execute the steps and call set_output when done.",
"tools": [
"bulk_fetch_emails",
"gmail_list_messages",
"gmail_batch_get_messages",
"append_data"
"bulk_fetch_emails"
],
"model": null,
"function": null,
@@ -95,11 +97,14 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are an email inbox management assistant. Apply the user's rules to their emails and execute Gmail actions.\n\n**YOUR TOOLS:**\n- load_data(filename, offset_bytes, limit_bytes) \u2014 Read emails from a local file using byte-based pagination. This is how you access the emails.\n- append_data(filename, data) \u2014 Append a line to a file. Use this to record actions taken.\n- gmail_batch_modify_messages(message_ids, add_labels, remove_labels) \u2014 Modify Gmail labels in batch. ALWAYS prefer this.\n- gmail_modify_message(message_id, add_labels, remove_labels) \u2014 Modify a single message's labels.\n- gmail_trash_message(message_id) \u2014 Move a message to trash. No batch version; call per email.\n- set_output(key, value) \u2014 Set an output value. Call ONLY after all actions are executed.\n\n**CONTEXT:**\n- \"rules\" = the user's rule to apply (e.g. \"mark all as unread\")\n- \"emails\" = a filename (e.g. \"emails.jsonl\") containing the fetched emails as JSONL. Each line has: id, subject, from, to, date, snippet, labels.\n\n**STEP 1 \u2014 LOAD EMAILS (your first tool call MUST be load_data):**\nCall load_data(filename=<the \"emails\" value from context>, limit_bytes=10000) to read the email data.\n- Each call reads ~10KB of data (automatically rounded to safe UTF-8 boundaries).\n- Parse the content as JSONL: split by \\n, then JSON.parse each line to get email objects.\n- If has_more=true, load more pages with load_data(filename=..., offset_bytes=<next_offset_bytes>) until all emails are loaded.\n- The result includes next_offset_bytes \u2014 use this for the next call's offset_bytes parameter.\n\n**STEP 2 \u2014 DETERMINE STRATEGY:**\n- **Blanket rule** (same action for ALL emails, e.g. 
\"mark all as unread\"): Collect all message IDs, then execute ONE gmail_batch_modify_messages call.\n- **Classification rule** (different actions for different emails): Classify each email, group by action, execute batch operations per group.\n\n**STEP 3 \u2014 EXECUTE ACTIONS:**\nCall the appropriate Gmail tool(s) with the real message IDs from the loaded emails. Then record each action:\n- append_data(filename=\"actions.jsonl\", data=<JSON of {email_id, subject, from, action}>)\n\n**STEP 4 \u2014 FINISH:**\nAfter ALL actions are executed, call set_output(\"actions_taken\", \"actions.jsonl\").\n\n**GMAIL LABEL REFERENCE:**\n- MARK AS UNREAD \u2014 add_labels=[\"UNREAD\"]\n- MARK AS READ \u2014 remove_labels=[\"UNREAD\"]\n- MARK IMPORTANT \u2014 add_labels=[\"IMPORTANT\"]\n- REMOVE IMPORTANT \u2014 remove_labels=[\"IMPORTANT\"]\n- STAR \u2014 add_labels=[\"STARRED\"]\n- UNSTAR \u2014 remove_labels=[\"STARRED\"]\n- ARCHIVE \u2014 remove_labels=[\"INBOX\"]\n- MARK AS SPAM \u2014 add_labels=[\"SPAM\"], remove_labels=[\"INBOX\"]\n- TRASH \u2014 use gmail_trash_message(message_id) per email\n\n**CRITICAL RULES:**\n- Your FIRST tool call MUST be load_data. Do NOT skip this.\n- You MUST call Gmail tools to execute real actions. Do NOT just report what should be done.\n- Do NOT call set_output until all Gmail actions are executed.\n- Pass ONLY the filename \"actions.jsonl\" to set_output, NOT raw data.",
"system_prompt": "You are an email inbox management assistant. Apply the user's rules to their emails and execute Gmail actions.\n\n**YOUR TOOLS:**\n- load_data(filename, offset_bytes, limit_bytes) \u2014 Read emails from a local file using byte-based pagination. This is how you access the emails.\n- append_data(filename, data) \u2014 Append a line to a file. Use this to record actions taken.\n- gmail_batch_modify_messages(message_ids, add_labels, remove_labels) \u2014 Modify Gmail labels in batch. ALWAYS prefer this.\n- gmail_modify_message(message_id, add_labels, remove_labels) \u2014 Modify a single message's labels.\n- gmail_trash_message(message_id) \u2014 Move a message to trash. No batch version; call per email.\n- gmail_create_draft(to, subject, body) \u2014 Create a draft reply. NEVER sends automatically.\n- gmail_create_label(name) \u2014 Create a new Gmail label. Returns the label ID.\n- gmail_list_labels() \u2014 List all existing Gmail labels with their IDs.\n- set_output(key, value) \u2014 Set an output value. Call ONLY after all actions are executed.\n\n**CONTEXT:**\n- \"rules\" = the user's rule to apply (e.g. \"mark all as unread\")\n- \"emails\" = a filename (e.g. \"emails.jsonl\") containing the fetched emails as JSONL. 
Each line has: id, subject, from, to, date, snippet, labels.\n\n**STEP 1 \u2014 LOAD EMAILS (your first tool call MUST be load_data):**\nCall load_data(filename=<the \"emails\" value from context>, limit_bytes=10000) to read the email data.\n- Each call reads ~10KB of data (automatically rounded to safe UTF-8 boundaries).\n- Parse the content as JSONL: split by \\n, then JSON.parse each line to get email objects.\n- If has_more=true, load more pages with load_data(filename=..., offset_bytes=<next_offset_bytes>) until all emails are loaded.\n- The result includes next_offset_bytes \u2014 use this for the next call's offset_bytes parameter.\n\n**STEP 2 \u2014 DETERMINE STRATEGY:**\n- **Blanket rule** (same action for ALL emails, e.g. \"mark all as unread\"): Collect all message IDs, then execute ONE gmail_batch_modify_messages call.\n- **Classification rule** (different actions for different emails): Classify each email, group by action, execute batch operations per group.\n\n**STEP 3 \u2014 EXECUTE ACTIONS:**\nCall the appropriate Gmail tool(s) with the real message IDs from the loaded emails. Then record each action:\n- append_data(filename=\"actions.jsonl\", data=<JSON of {email_id, subject, from, action}>)\n\n**STEP 4 \u2014 FINISH:**\nAfter ALL actions are executed, call set_output(\"actions_taken\", \"actions.jsonl\").\n\n**GMAIL LABEL REFERENCE:**\n- MARK AS UNREAD \u2014 add_labels=[\"UNREAD\"]\n- MARK AS READ \u2014 remove_labels=[\"UNREAD\"]\n- MARK IMPORTANT \u2014 add_labels=[\"IMPORTANT\"]\n- REMOVE IMPORTANT \u2014 remove_labels=[\"IMPORTANT\"]\n- STAR \u2014 add_labels=[\"STARRED\"]\n- UNSTAR \u2014 remove_labels=[\"STARRED\"]\n- ARCHIVE \u2014 remove_labels=[\"INBOX\"]\n- MARK AS SPAM \u2014 add_labels=[\"SPAM\"], remove_labels=[\"INBOX\"]\n- TRASH \u2014 use gmail_trash_message(message_id) per email\n- DRAFT REPLY \u2014 use gmail_create_draft(to=<sender>, subject=\"Re: <subject>\", body=<contextual reply based on email content>). 
Creates a draft only, never sends.\n- CREATE CUSTOM LABEL \u2014 use gmail_create_label(name=<label_name>) to create, then apply via gmail_modify_message with add_labels=[<label_id>]\n- APPLY CUSTOM LABEL \u2014 add_labels=[<label_id>] using the ID from gmail_create_label or gmail_list_labels\n\n**QUEEN RULE INJECTION:**\nIf a new rule appears in the conversation mid-processing (injected by the queen), apply it to the remaining unprocessed emails alongside the existing rules.\n\n**CRITICAL RULES:**\n- Your FIRST tool call MUST be load_data. Do NOT skip this.\n- You MUST call Gmail tools to execute real actions. Do NOT just report what should be done.\n- Do NOT call set_output until all Gmail actions are executed.\n- Pass ONLY the filename \"actions.jsonl\" to set_output, NOT raw data.\n- NEVER send emails. Only create drafts via gmail_create_draft.",
"tools": [
"gmail_trash_message",
"gmail_modify_message",
"gmail_batch_modify_messages",
"gmail_create_draft",
"gmail_create_label",
"gmail_list_labels",
"load_data",
"append_data"
],
@@ -120,17 +125,21 @@
"description": "Generate a summary report of all actions taken on the emails and present it to the user.",
"node_type": "event_loop",
"input_keys": [
"actions_taken"
"actions_taken",
"rules"
],
"output_keys": [
"summary_report"
"summary_report",
"rules",
"last_processed_timestamp"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are an email inbox management assistant. Your job is to generate a clear summary report of the actions taken on the user's emails, present it, and ask if they want to run another batch.\n\n**STEP 1 \u2014 Load actions and generate the report (tool calls first):**\n\nThe \"actions_taken\" value from context is a filename (e.g. \"actions.jsonl\"), NOT raw action data.\n- If it equals \"[]\", there are no actions \u2014 skip to STEP 2 with a message that no emails were processed.\n- Otherwise, call load_data(filename=<the actions_taken value>, limit_bytes=10000) to read the action records.\n- The file is in JSONL format: each line is one JSON object with: email_id, subject, from, action.\n- If load_data returns has_more=true, call it again with offset_bytes=<next_offset_bytes> to get more records.\n- Read ALL records before generating the report.\n\n**STEP 2 \u2014 Present the report to the user (text only, NO tool calls):**\n\nPresent a clean, readable summary:\n\n1. **Overview** \u2014 Total emails processed, breakdown by action type.\n2. **By Action** \u2014 Group emails by action taken. For each action group, list the emails with subject and sender.\n3. **No Action Taken** \u2014 Any emails that didn't match any rules (if applicable).\n\nThen ask: \"Would you like to run another inbox triage with new rules?\"\n\n**STEP 3 \u2014 After the user responds, call set_output:**\n- set_output(\"summary_report\", <the formatted report text>)",
"system_prompt": "You are an email inbox management assistant. Your job is to generate a clear summary report of the actions taken on the user's emails, present it, and ask if they want to run another batch.\n\n**STEP 1 \u2014 Load actions and generate the report (tool calls first):**\n\nThe \"actions_taken\" value from context is a filename (e.g. \"actions.jsonl\"), NOT raw action data.\n- If it equals \"[]\", there are no actions \u2014 skip to STEP 2 with a message that no emails were processed.\n- Otherwise, call load_data(filename=<the actions_taken value>, limit_bytes=10000) to read the action records.\n- The file is in JSONL format: each line is one JSON object with: email_id, subject, from, action.\n- If load_data returns has_more=true, call it again with offset_bytes=<next_offset_bytes> to get more records.\n- Read ALL records before generating the report.\n\n**STEP 2 \u2014 Present the report to the user (text only, NO tool calls):**\n\nPresent a clean, readable summary:\n\n1. **Overview** \u2014 Total emails processed, breakdown by action type.\n2. **By Action** \u2014 Group emails by action taken. For each action group, list the emails with subject and sender.\n3. **No Action Taken** \u2014 Any emails that didn't match any rules (if applicable).\n\nThen ask: \"Would you like to run another inbox management cycle with new rules?\"\n\n**STEP 3 \u2014 After the user responds, call set_output to persist state:**\n- set_output(\"summary_report\", <the formatted report text>)\n- set_output(\"rules\", <the current rules from context \u2014 pass them through unchanged so they persist for the next cycle>)\n- Call get_current_timestamp() and set_output(\"last_processed_timestamp\", <the returned timestamp>)\n\nThis ensures the next timer cycle knows when emails were last processed and which rules to apply.",
"tools": [
"load_data"
"load_data",
"get_current_timestamp"
],
"model": null,
"function": null,
@@ -163,12 +172,21 @@
"priority": 1,
"input_mapping": {}
},
{
"id": "classify-to-fetch-loop",
"source": "classify-and-act",
"target": "fetch-emails",
"condition": "conditional",
"condition_expr": "str(next_page_token).strip() not in ('', 'None', 'null')",
"priority": 2,
"input_mapping": {}
},
{
"id": "classify-to-report",
"source": "classify-and-act",
"target": "report",
"condition": "on_success",
"condition_expr": null,
"condition": "conditional",
"condition_expr": "str(next_page_token).strip() in ('', 'None', 'null')",
"priority": 1,
"input_mapping": {}
},
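The two `condition_expr` strings above are complementary branches of the same predicate, so exactly one of the two edges fires per visit. A minimal sketch of how that expression evaluates (plain Python, matching the expression verbatim):

```python
def has_more_pages(next_page_token) -> bool:
    # Same predicate as the classify-to-fetch-loop condition_expr:
    # an empty string or a stringified null sentinel means "no more pages".
    return str(next_page_token).strip() not in ('', 'None', 'null')
```

A real token (e.g. `"tok_abc123"`) loops execution back to fetch-emails; `""`, `None`, `"None"`, and `"null"` all route to the report node instead.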
@@ -182,14 +200,14 @@
"input_mapping": {}
}
],
"max_steps": 100,
"max_steps": 500,
"max_retries_per_node": 3,
"description": "Manage Gmail inbox emails autonomously using user-defined free-text rules. For every five minutes, fetch inbox emails (configurable batch size, default 100), apply the user's rules to each email, and execute the appropriate Gmail actions \u2014 trash, mark as spam, mark important, mark read/unread, star, and more."
"description": "Manage Gmail inbox emails autonomously using user-defined free-text rules. Every five minutes, fetch inbox emails (configurable page size, default 100), loop through ALL emails by paginating, apply the user's rules to each email, and execute the appropriate Gmail actions \u2014 trash, mark as spam, mark important, mark read/unread, star, draft replies, create/apply custom labels, and more."
},
"goal": {
"id": "email-inbox-management",
"name": "Email Inbox Management",
"description": "Manage Gmail inbox emails autonomously using user-defined free-text rules. For every five minutes, fetch inbox emails (configurable batch size, default 100), apply the user's rules to each email, and execute the appropriate Gmail actions \u2014 trash, mark as spam, mark important, mark read/unread, star, and more.",
"description": "Manage Gmail inbox emails autonomously using user-defined free-text rules. Every five minutes, fetch inbox emails (configurable page size, default 100), loop through ALL emails by paginating, apply the user's rules to each email, and execute the appropriate Gmail actions \u2014 trash, mark as spam, mark important, mark read/unread, star, draft replies, create/apply custom labels, and more.",
"status": "draft",
"success_criteria": [
{
@@ -197,7 +215,7 @@
"description": "Gmail actions are applied correctly to the right emails based on the user's rules",
"metric": "action_correctness",
"target": ">=95%",
"weight": 0.35,
"weight": 0.30,
"met": false
},
{
@@ -205,7 +223,7 @@
"description": "Produces a summary report showing what was done: how many emails were affected by each action type, with email subjects listed",
"metric": "report_completeness",
"target": "100%",
"weight": 0.3,
"weight": 0.25,
"met": false
},
{
@@ -213,14 +231,22 @@
"description": "All fetched emails up to the configured max are processed and acted upon; none are silently skipped",
"metric": "emails_processed_ratio",
"target": "100%",
"weight": 0.35,
"weight": 0.30,
"met": false
},
{
"id": "label-management",
"description": "Custom labels are created and applied correctly when rules require them",
"metric": "label_coverage",
"target": "100%",
"weight": 0.15,
"met": false
}
],
"constraints": [
{
"id": "respect-batch-limit",
"description": "Must not process more emails than the configured max_emails parameter",
"id": "process-all-emails",
"description": "Must loop through all inbox emails by paginating with max_emails as page size; no emails should be silently skipped",
"constraint_type": "hard",
"category": "operational",
"check": ""
@@ -231,6 +257,13 @@
"constraint_type": "hard",
"category": "safety",
"check": ""
},
{
"id": "draft-not-send",
"description": "Agent creates draft replies but NEVER sends them automatically",
"constraint_type": "hard",
"category": "safety",
"check": ""
}
],
"context": {},
@@ -243,16 +276,18 @@
},
"required_tools": [
"bulk_fetch_emails",
"gmail_list_messages",
"gmail_batch_get_messages",
"get_current_timestamp",
"gmail_trash_message",
"gmail_modify_message",
"gmail_batch_modify_messages",
"gmail_create_draft",
"gmail_create_label",
"gmail_list_labels",
"load_data",
"append_data"
],
"metadata": {
"node_count": 4,
"edge_count": 4
"edge_count": 5
}
}
}
@@ -2,7 +2,7 @@
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph import EdgeCondition, EdgeSpec, Goal, SuccessCriterion, Constraint
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
@@ -72,8 +72,11 @@ goal = Goal(
],
constraints=[
Constraint(
id="respect-batch-limit",
description="Must not process more emails than the configured max_emails parameter",
id="process-all-emails",
description=(
"Must loop through all inbox emails by paginating with max_emails as page size; "
"no emails should be silently skipped"
),
constraint_type="hard",
category="operational",
),
@@ -119,11 +122,22 @@ edges = [
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# Pagination loop: if next_page_token is non-empty, loop back to fetch
EdgeSpec(
id="classify-to-fetch-loop",
source="classify-and-act",
target="fetch-emails",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_page_token).strip() not in ('', 'None', 'null')",
priority=2,
),
# Exit to report when no more pages
EdgeSpec(
id="classify-to-report",
source="classify-and-act",
target="report",
condition=EdgeCondition.ON_SUCCESS,
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(next_page_token).strip() in ('', 'None', 'null')",
priority=1,
),
EdgeSpec(
@@ -15,10 +15,15 @@ intake_node = NodeSpec(
client_facing=True,
max_node_visits=0,
input_keys=["rules", "max_emails"],
output_keys=["rules", "max_emails"],
output_keys=["rules", "max_emails", "query"],
nullable_output_keys=["query"],
system_prompt="""\
You are an inbox management assistant. The user has provided rules for managing their emails.
**RULES ARE ADDITIVE.** If existing rules are already present in context from a previous cycle,
present ALL of them (old + new). The user can add, modify, or remove rules. When calling
set_output("rules", ...), include ALL active rules — old and new combined.
**STEP 1 — Respond to the user (text only, NO tool calls):**
Read the user's rules from the input context. Present a clear summary of what you will do with their emails based on their rules.
@@ -35,7 +40,9 @@ The following Gmail actions are available — map the user's rules to whichever
Present the rules back to the user in plain language. Do NOT refuse rules — if the user asks for any of the above actions, confirm you will do it.
Also confirm the batch size (max_emails). If max_emails is not provided, default to 100.
Also confirm the page size (max_emails). If max_emails is not provided, default to 100.
Note: max_emails is the page size per fetch cycle. The agent will loop through ALL inbox emails
by fetching max_emails at a time until no more remain.
Ask the user to confirm: "Does this look right? I'll proceed once you confirm."
@@ -45,8 +52,41 @@ Call gmail_list_labels() to show the user their current Gmail labels. This helps
**STEP 3 — After the user confirms, call set_output:**
- set_output("rules", <the confirmed rules as a clear text description>)
- set_output("rules", <ALL active rules as a clear text description>)
- set_output("max_emails", <the confirmed max_emails as a string number, e.g. "100">)
- set_output("query", <Gmail search query if the user wants to target specific emails>)
**TARGETED QUERY (optional):**
If the user's rules target specific emails (e.g. "delete all emails from newsletters@example.com"),
build a Gmail search query to fetch ONLY matching emails instead of the entire inbox. This is much
faster and more efficient.
Gmail search query syntax:
- `from:sender@example.com` — from a specific sender
- `to:recipient@example.com` — to a specific recipient
- `subject:keyword` — subject contains keyword
- `is:unread` / `is:read` — read status
- `is:starred` / `is:important` — flags
- `has:attachment` — has attachments
- `filename:pdf` — attachment filename
- `label:LABEL_NAME` — has a specific label
- `category:promotions` / `category:social` / `category:updates` — Gmail categories
- `newer_than:7d` / `older_than:30d` — relative time (d=days, m=months, y=years)
- `after:2024/01/01` / `before:2024/12/31` — absolute dates
- Combine with spaces (AND): `from:boss@co.com subject:urgent`
- OR operator: `from:alice OR from:bob`
- NOT / exclude: `-from:noreply@example.com` or `NOT from:noreply`
- Grouping: `{from:alice from:bob}` (same as OR)
Examples:
- User says "trash all promotional emails" → query: `category:promotions`
- User says "star emails from my boss jane@co.com" → query: `from:jane@co.com`
- User says "mark unread emails older than a week as read" → query: `is:unread older_than:7d`
- User says "apply rules to all inbox emails" → no query needed (default: `label:INBOX`)
If the rules apply broadly to ALL emails, do NOT set a query — the default `label:INBOX` will be used.
Only set a query when it would meaningfully narrow the search.
""",
tools=["gmail_list_labels"],
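The operators in the prompt above compose by plain string joining: a space is AND, a leading `-` excludes. A hypothetical helper for illustration only — the agent itself emits the query as free text, and `gmail_query` is not a real tool in this repo:

```python
def gmail_query(*terms, exclude=()):
    """Join Gmail search operators: spaces mean AND, a leading '-' negates a term."""
    parts = list(terms) + [f"-{t}" for t in exclude]
    return " ".join(parts)

q = gmail_query("from:jane@co.com", "is:unread", "older_than:7d",
                exclude=("from:noreply@example.com",))
# q == "from:jane@co.com is:unread older_than:7d -from:noreply@example.com"
```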
@@ -59,47 +99,43 @@ fetch_emails_node = NodeSpec(
id="fetch-emails",
name="Fetch Emails",
description=(
"Fetch emails from the Gmail inbox up to the configured batch limit. "
"Supports pagination for continuous mode — can fetch the next batch "
"of emails beyond what was already processed."
"Fetch one page of emails from the Gmail inbox. Returns the emails "
"filename and a next_page_token for pagination. The graph loops back "
"here if more pages remain."
),
node_type="event_loop",
client_facing=False,
max_node_visits=0,
input_keys=["rules", "max_emails"],
output_keys=["emails"],
input_keys=[
"rules",
"max_emails",
"next_page_token",
"last_processed_timestamp",
"query",
],
output_keys=["emails", "next_page_token"],
nullable_output_keys=["next_page_token"],
system_prompt="""\
You are a data pipeline step. Your job is to fetch emails from Gmail and write them to emails.jsonl.
You are a data pipeline step. Your job is to fetch ONE PAGE of emails from Gmail.
**FIRST-TIME FETCH (default path):**
1. Read "max_emails" and "rules" from input context.
2. Call bulk_fetch_emails(max_emails=<value>).
3. The tool returns {"filename": "emails.jsonl"}.
**INSTRUCTIONS:**
1. Read "max_emails", "next_page_token", "last_processed_timestamp", and "query" from input context.
2. Call bulk_fetch_emails with:
- max_emails=<max_emails value, default "100">
- page_token=<next_page_token value, if present and non-empty>
- after_timestamp=<last_processed_timestamp value, if present and non-empty>
- query=<query value, if present and non-empty; omit to default to "label:INBOX">
3. The tool returns {"filename": "emails.jsonl", "count": N, "next_page_token": "<token or null>"}.
4. Call set_output("emails", "emails.jsonl").
5. Call set_output("next_page_token", <the next_page_token from the tool result, or "" if null>).
**NEXT-BATCH FETCH (when user asks for "the next N" emails):**
The user wants emails BEYOND what was already fetched. Use pagination:
1. Call gmail_list_messages(query="label:INBOX", max_results=<previous + new count>).
Use page_token if needed to paginate past already-fetched emails.
2. Identify message IDs NOT in the previous batch.
3. Call gmail_batch_get_messages(message_ids=<new_ids>, format="metadata").
4. For each message, call append_data(filename="emails.jsonl",
data=<JSON: {id, subject, from, to, date, snippet, labels}>).
5. Call set_output("emails", "emails.jsonl").
**IMPORTANT:** The graph will automatically loop back to this node if next_page_token is non-empty.
You only need to fetch ONE page per visit. Do NOT loop internally.
**TOOLS:**
- bulk_fetch_emails(max_emails) - Bulk fetch from inbox, writes emails.jsonl.
- gmail_list_messages(query, max_results, page_token) - List message IDs.
- gmail_batch_get_messages(message_ids, format) - Fetch metadata (max 50/call).
- append_data(filename, data) - Append a line to a JSONL file.
Do NOT add commentary or explanation. Execute the appropriate path and call set_output when done.
Do NOT add commentary or explanation. Execute the steps and call set_output when done.
""",
tools=[
"bulk_fetch_emails",
"gmail_list_messages",
"gmail_batch_get_messages",
"append_data",
],
)
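The one-page-per-visit contract described above (fetch a single page, expose `next_page_token`, let the graph loop back while the token is non-empty) can be sketched as a plain driver loop. This is an illustrative sketch, not framework code: `fetch_page` is a stand-in for the real `bulk_fetch_emails` tool call, and only the result shape `{filename, count, next_page_token}` is taken from the diff.

```python
def paginate(fetch_page, max_pages=10):
    """Drive a page-at-a-time fetcher the way the graph loops this node.

    fetch_page(page_token) must return a dict shaped like the
    bulk_fetch_emails result: {"filename", "count", "next_page_token"}.
    """
    token = ""
    total = 0
    for _ in range(max_pages):
        result = fetch_page(token)
        total += result["count"]
        token = result["next_page_token"] or ""
        if not token:  # the graph stops revisiting the node when the token is empty
            break
    return total


# Stub fetcher (hypothetical data): three pages of 2 emails each, then no token.
pages = {"": ("p1", 2), "p1": ("p2", 2), "p2": (None, 2)}

def fake_fetch(token):
    nxt, count = pages[token]
    return {"filename": "emails.jsonl", "count": count, "next_page_token": nxt}

print(paginate(fake_fetch))  # 6
```

The `max_pages` bound plays the same safety role as the graph's visit limits: even a buggy fetcher that always returns a token cannot loop forever.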
@@ -172,6 +208,10 @@ Each turn, process exactly ONE chunk: load → classify → act → record. Then
- CREATE CUSTOM LABEL - use gmail_create_label(name=<label_name>) to create, then apply via gmail_modify_message with add_labels=[<label_id>]
- APPLY CUSTOM LABEL - add_labels=[<label_id>] using the ID from gmail_create_label or gmail_list_labels
**QUEEN RULE INJECTION:**
If a new rule appears in the conversation mid-processing (injected by the queen),
apply it to the remaining unprocessed emails alongside the existing rules.
**CRITICAL RULES:**
- Your FIRST tool call MUST be load_data. Do NOT skip this.
- You MUST call Gmail tools to execute real actions. Do NOT just report what should be done.
@@ -200,8 +240,8 @@ report_node = NodeSpec(
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["actions_taken"],
output_keys=["summary_report"],
input_keys=["actions_taken", "rules"],
output_keys=["summary_report", "rules", "last_processed_timestamp"],
system_prompt="""\
You are an inbox management assistant. Your job is to generate a clear summary report of the actions taken on the user's emails, present it, and ask if they want to run another batch.
@@ -224,12 +264,16 @@ Present a clean, readable summary:
3. **No Action Taken** - Any emails that didn't match any rules (if applicable).
Then ask: "Would you like to run another inbox triage with new rules?"
Then ask: "Would you like to run another inbox management cycle with new rules?"
**STEP 3 - After the user responds, call set_output:**
**STEP 3 - After the user responds, call set_output to persist state:**
- set_output("summary_report", <the formatted report text>)
- set_output("rules", <the current rules from context - pass them through unchanged so they persist for the next cycle>)
- Call get_current_timestamp() and set_output("last_processed_timestamp", <the returned timestamp>)
This ensures the next timer cycle knows when emails were last processed and which rules to apply.
""",
tools=["load_data"],
tools=["load_data", "get_current_timestamp"],
)
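The report node's grouping of `actions_taken` into a per-action summary can be sketched as below. The JSONL record shape (`{"action", "subject"}`) is an assumption for illustration, since the exact format written by the processing node is not shown in this diff.

```python
import json
from collections import Counter

def summarize(jsonl_text: str) -> str:
    """Group action records into the per-action summary the prompt asks for."""
    records = [json.loads(line) for line in jsonl_text.splitlines() if line.strip()]
    counts = Counter(r["action"] for r in records)
    # One bullet per action type, sorted for stable output
    return "\n".join(f"- {action}: {n} email(s)" for action, n in sorted(counts.items()))


# Hypothetical actions_taken content
sample = "\n".join([
    json.dumps({"action": "archive", "subject": "Weekly digest"}),
    json.dumps({"action": "archive", "subject": "Sale ends soon"}),
    json.dumps({"action": "star", "subject": "Q3 planning"}),
])
print(summarize(sample))
# - archive: 2 email(s)
# - star: 1 email(s)
```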
__all__ = [
@@ -31,15 +31,31 @@ TOOLS = {
"bulk_fetch_emails": Tool(
name="bulk_fetch_emails",
description=(
"Fetch emails from the Gmail inbox and write them to a JSONL file. "
"Returns the filename of the written file."
"Fetch emails from Gmail and write them to a JSONL file. "
"Returns {filename, count, next_page_token}. Pass next_page_token "
"from a previous call to fetch the next page. "
"Supports Gmail search query syntax via the 'query' parameter."
),
parameters={
"type": "object",
"properties": {
"max_emails": {
"type": "string",
"description": "Maximum number of emails to fetch (default '100')",
"description": "Maximum number of emails to fetch in this page (default '100')",
},
"page_token": {
"type": "string",
"description": (
"Gmail API page token from a previous call's next_page_token. "
"Omit for the first page."
),
},
"after_timestamp": {
"type": "string",
"description": (
"Unix epoch seconds. Only fetch emails received after this time. "
"Used by timer cycles to skip already-processed emails."
),
},
"account": {
"type": "string",
@@ -48,10 +64,31 @@ TOOLS = {
"Required when multiple Google accounts are connected."
),
},
"query": {
"type": "string",
"description": (
"Gmail search query. Defaults to 'label:INBOX'. Supports full Gmail "
"search syntax: from:, to:, subject:, is:unread, is:starred, "
"has:attachment, label:, newer_than:, older_than:, category:, "
"filename:, and boolean operators (AND, OR, NOT, -, {}). "
"Examples: 'from:boss@example.com', 'subject:invoice is:unread', "
"'label:INBOX -from:noreply'. The after_timestamp parameter is "
"appended automatically if provided."
),
},
},
"required": [],
},
),
"get_current_timestamp": Tool(
name="get_current_timestamp",
description="Return the current Unix epoch timestamp in seconds.",
parameters={
"type": "object",
"properties": {},
"required": [],
},
),
}
@@ -122,44 +159,60 @@ def _parse_headers(headers: list[dict]) -> dict[str, str]:
# ---------------------------------------------------------------------------
def _bulk_fetch_emails(max_emails: str = "100", account: str = "") -> str:
"""Fetch inbox emails and write them to emails.jsonl.
def _bulk_fetch_emails(
max_emails: str = "100",
page_token: str = "",
after_timestamp: str = "",
account: str = "",
query: str = "",
) -> dict:
"""Fetch emails from Gmail and write them to emails.jsonl.
Uses synchronous httpx.Client since this runs as a tool call inside
an already-running async event loop.
Args:
max_emails: Maximum number of emails to fetch.
max_emails: Maximum number of emails to fetch in this page.
page_token: Gmail API page token for pagination. Omit for the first page.
after_timestamp: Unix epoch seconds - only fetch emails after this time.
account: Account alias (e.g. 'timothy-home') for multi-account routing.
query: Gmail search query. Defaults to 'label:INBOX'. Supports full
Gmail search syntax (from:, subject:, is:, label:, etc.).
Returns:
The filename "emails.jsonl" (written to session data_dir).
Dict with {filename, count, next_page_token}.
"""
max_count = int(max_emails) if max_emails else 100
access_token = _get_access_token(account)
data_dir = _get_data_dir()
Path(data_dir).mkdir(parents=True, exist_ok=True)
headers = {
http_headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
message_ids: list[str] = []
page_token: str | None = None
# Build Gmail query
gmail_query = query.strip() if query and query.strip() else "label:INBOX"
if after_timestamp and after_timestamp.strip():
gmail_query += f" after:{after_timestamp.strip()}"
with httpx.Client(headers=headers, timeout=30.0) as client:
message_ids: list[str] = []
current_page_token: str | None = page_token if page_token else None
next_page_token: str | None = None
with httpx.Client(headers=http_headers, timeout=30.0) as client:
# Phase 1: Collect message IDs (paginated, sequential)
while len(message_ids) < max_count:
remaining = max_count - len(message_ids)
page_size = min(remaining, 500)
params: dict[str, str | int] = {
"q": "label:INBOX",
"q": gmail_query,
"maxResults": page_size,
}
if page_token:
params["pageToken"] = page_token
if current_page_token:
params["pageToken"] = current_page_token
resp = client.get(f"{GMAIL_API_BASE}/messages", params=params)
if resp.status_code != 200:
@@ -177,14 +230,21 @@ def _bulk_fetch_emails(max_emails: str = "100", account: str = "") -> str:
break
message_ids.append(msg["id"])
page_token = data.get("nextPageToken")
if not page_token:
current_page_token = data.get("nextPageToken")
if not current_page_token:
break
# Expose the Gmail API's nextPageToken so the graph can loop
next_page_token = current_page_token
if not message_ids:
(Path(data_dir) / "emails.jsonl").write_text("", encoding="utf-8")
logger.info("No inbox emails found.")
return "emails.jsonl"
return {
"filename": "emails.jsonl",
"count": 0,
"next_page_token": None,
}
logger.info(f"Found {len(message_ids)} message IDs. Fetching metadata...")
@@ -236,16 +296,20 @@ def _bulk_fetch_emails(max_emails: str = "100", account: str = "") -> str:
f"(wrote {len(emails)} to emails.jsonl)"
)
# Phase 3: Write JSONL
# Phase 3: Append JSONL (append so pagination accumulates across pages)
output_path = Path(data_dir) / "emails.jsonl"
with open(output_path, "w", encoding="utf-8") as f:
with open(output_path, "a", encoding="utf-8") as f:
for email in emails:
f.write(json.dumps(email, ensure_ascii=False) + "\n")
logger.info(
f"Wrote {len(emails)} emails to emails.jsonl ({output_path.stat().st_size} bytes)"
)
return "emails.jsonl"
return {
"filename": "emails.jsonl",
"count": len(emails),
"next_page_token": next_page_token,
}
# ---------------------------------------------------------------------------
@@ -253,16 +317,25 @@ def _bulk_fetch_emails(max_emails: str = "100", account: str = "") -> str:
# ---------------------------------------------------------------------------
def _get_current_timestamp() -> dict:
"""Return current Unix epoch timestamp."""
return {"timestamp": str(int(time.time()))}
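A sketch of how these pieces combine: the epoch timestamp stored by `get_current_timestamp` feeds Gmail's `after:` operator on the next cycle. The helper below mirrors the query-building logic from `_bulk_fetch_emails` above; it is a standalone illustration, not the production function.

```python
def build_gmail_query(query: str = "", after_timestamp: str = "") -> str:
    # Mirrors _bulk_fetch_emails: default to label:INBOX, then append after:.
    gmail_query = query.strip() if query and query.strip() else "label:INBOX"
    if after_timestamp and after_timestamp.strip():
        gmail_query += f" after:{after_timestamp.strip()}"
    return gmail_query


print(build_gmail_query())                           # label:INBOX
print(build_gmail_query("is:unread", "1709500000"))  # is:unread after:1709500000
```

Gmail's `after:` accepts epoch seconds directly, which is why the tool returns seconds rather than a formatted date.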
def tool_executor(tool_use: ToolUse) -> ToolResult:
"""Dispatch tool calls to their implementations."""
if tool_use.name == "bulk_fetch_emails":
try:
max_emails = tool_use.input.get("max_emails", "100")
account = tool_use.input.get("account", "")
filename = _bulk_fetch_emails(max_emails=max_emails, account=account)
result = _bulk_fetch_emails(
max_emails=tool_use.input.get("max_emails", "100"),
page_token=tool_use.input.get("page_token", ""),
after_timestamp=tool_use.input.get("after_timestamp", ""),
account=tool_use.input.get("account", ""),
query=tool_use.input.get("query", ""),
)
return ToolResult(
tool_use_id=tool_use.id,
content=json.dumps({"filename": filename}),
content=json.dumps(result),
is_error=False,
)
except Exception as e:
@@ -272,6 +345,13 @@ def tool_executor(tool_use: ToolUse) -> ToolResult:
is_error=True,
)
if tool_use.name == "get_current_timestamp":
return ToolResult(
tool_use_id=tool_use.id,
content=json.dumps(_get_current_timestamp()),
is_error=False,
)
return ToolResult(
tool_use_id=tool_use.id,
content=json.dumps({"error": f"Unknown tool: {tool_use.name}"}),
@@ -161,7 +161,7 @@ Only include the jobs the user explicitly selected.
customize_node = NodeSpec(
id="customize",
name="Customize",
description="For each selected job, generate resume customization list and cold outreach email as HTML",
description="For each selected job, generate resume customization list and cold outreach email, and create Gmail drafts",
node_type="event_loop",
client_facing=True,
max_node_visits=1,
@@ -169,7 +169,7 @@ customize_node = NodeSpec(
output_keys=["application_materials"],
success_criteria=(
"Resume customization list and cold outreach email generated "
"for each selected job, saved as a single HTML file and opened for the user."
"for each selected job, saved as HTML, and Gmail drafts created in user's inbox."
),
system_prompt="""\
You are a career coach creating personalized application materials.
@@ -223,8 +223,8 @@ append_data(filename="application_materials.html", data="</body>\\n</html>")
```
**Step 4 - Serve the file:**
Call serve_file_to_user(filename="application_materials.html", open_in_browser=true)
Print the file_path from the result so the user can click it later.
Call serve_file_to_user(filename="application_materials.html")
Print the file_path from the result so the user can access it later.
**Step 5 - Create Gmail Drafts (in batches of 5):**
IMPORTANT: Do NOT create all drafts in one turn. Create at most 5 gmail_create_draft calls \
@@ -234,12 +234,17 @@ drafts, then create the remaining drafts in the next turn.
For each selected job, call gmail_create_draft with:
- to: hiring manager email if available, otherwise "hiring@company-domain.com"
- subject: the cold email subject line
- html: the cold email body as HTML
- body: the cold email body as plain text
- draft: true (create as draft, not send immediately)
If gmail_create_draft errors (e.g. credentials not configured), skip ALL remaining drafts and tell the user:
"Gmail drafts could not be created (Gmail not connected). You can copy the emails from the HTML report instead."
**Step 6 - Finish:**
Call set_output("application_materials", "Created application_materials.html with materials for {N} jobs")
**Step 6 - Confirm Gmail Drafts Created:**
After all drafts are created, tell the user: "Created {N} draft emails in your Gmail inbox. You can review and send them when ready."
**Step 7 - Finish:**
Call set_output("application_materials", "Created application_materials.html with materials for {N} jobs and {N} Gmail drafts")
**IMPORTANT:**
- Only suggest truthful resume changes - enhance presentation, never fabricate
@@ -494,6 +494,9 @@ if ($NodeAvailable) {
$null = & npm install --no-fund --no-audit 2>&1
if ($LASTEXITCODE -eq 0) {
Write-Ok "ok"
# Clean stale tsbuildinfo cache — tsc -b incremental builds fail
# silently when these are out of sync with source files
Get-ChildItem -Path $frontendDir -Filter "tsconfig*.tsbuildinfo" -ErrorAction SilentlyContinue | Remove-Item -Force
Write-Host " Building frontend... " -NoNewline
$null = & npm run build 2>&1
if ($LASTEXITCODE -eq 0) {
@@ -757,7 +760,7 @@ $ModelChoices = @{
)
gemini = @(
@{ Id = "gemini-3-flash-preview"; Label = "Gemini 3 Flash - Fast (recommended)"; MaxTokens = 8192 },
@{ Id = "gemini-3-pro-preview"; Label = "Gemini 3 Pro - Best quality"; MaxTokens = 8192 }
@{ Id = "gemini-3.1-pro-preview"; Label = "Gemini 3.1 Pro - Best quality"; MaxTokens = 8192 }
)
groq = @(
@{ Id = "moonshotai/kimi-k2-instruct-0905"; Label = "Kimi K2 - Best quality (recommended)"; MaxTokens = 8192 },
@@ -286,6 +286,9 @@ if [ "$NODE_AVAILABLE" = true ]; then
fi
if [ "$NODE_AVAILABLE" = true ]; then
# Clean stale tsbuildinfo cache — tsc -b incremental builds fail
# silently when these are out of sync with source files
rm -f "$FRONTEND_DIR"/tsconfig*.tsbuildinfo
echo -n " Building frontend... "
if (cd "$FRONTEND_DIR" && npm run build) > /dev/null 2>&1; then
echo -e "${GREEN}ok${NC}"
@@ -424,7 +427,7 @@ if [ "$USE_ASSOC_ARRAYS" = true ]; then
["openai:0"]="gpt-5.2"
["openai:1"]="gpt-5-mini"
["gemini:0"]="gemini-3-flash-preview"
["gemini:1"]="gemini-3-pro-preview"
["gemini:1"]="gemini-3.1-pro-preview"
["groq:0"]="moonshotai/kimi-k2-instruct-0905"
["groq:1"]="openai/gpt-oss-120b"
["cerebras:0"]="zai-glm-4.7"
@@ -439,7 +442,7 @@ if [ "$USE_ASSOC_ARRAYS" = true ]; then
["openai:0"]="GPT-5.2 - Most capable (recommended)"
["openai:1"]="GPT-5 Mini - Fast + cheap"
["gemini:0"]="Gemini 3 Flash - Fast (recommended)"
["gemini:1"]="Gemini 3 Pro - Best quality"
["gemini:1"]="Gemini 3.1 Pro - Best quality"
["groq:0"]="Kimi K2 - Best quality (recommended)"
["groq:1"]="GPT-OSS 120B - Fast reasoning"
["cerebras:0"]="ZAI-GLM 4.7 - Best quality (recommended)"
@@ -549,8 +552,8 @@ else
# Model choices per provider - flat parallel arrays with provider offsets
# Provider order: anthropic(4), openai(2), gemini(2), groq(2), cerebras(2)
MC_PROVIDERS=(anthropic anthropic anthropic anthropic openai openai gemini gemini groq groq cerebras cerebras)
MC_IDS=("claude-opus-4-6" "claude-sonnet-4-5-20250929" "claude-sonnet-4-20250514" "claude-haiku-4-5-20251001" "gpt-5.2" "gpt-5-mini" "gemini-3-flash-preview" "gemini-3-pro-preview" "moonshotai/kimi-k2-instruct-0905" "openai/gpt-oss-120b" "zai-glm-4.7" "qwen3-235b-a22b-instruct-2507")
MC_LABELS=("Opus 4.6 - Most capable (recommended)" "Sonnet 4.5 - Best balance" "Sonnet 4 - Fast + capable" "Haiku 4.5 - Fast + cheap" "GPT-5.2 - Most capable (recommended)" "GPT-5 Mini - Fast + cheap" "Gemini 3 Flash - Fast (recommended)" "Gemini 3 Pro - Best quality" "Kimi K2 - Best quality (recommended)" "GPT-OSS 120B - Fast reasoning" "ZAI-GLM 4.7 - Best quality (recommended)" "Qwen3 235B - Frontier reasoning")
MC_IDS=("claude-opus-4-6" "claude-sonnet-4-5-20250929" "claude-sonnet-4-20250514" "claude-haiku-4-5-20251001" "gpt-5.2" "gpt-5-mini" "gemini-3-flash-preview" "gemini-3.1-pro-preview" "moonshotai/kimi-k2-instruct-0905" "openai/gpt-oss-120b" "zai-glm-4.7" "qwen3-235b-a22b-instruct-2507")
MC_LABELS=("Opus 4.6 - Most capable (recommended)" "Sonnet 4.5 - Best balance" "Sonnet 4 - Fast + capable" "Haiku 4.5 - Fast + cheap" "GPT-5.2 - Most capable (recommended)" "GPT-5 Mini - Fast + cheap" "Gemini 3 Flash - Fast (recommended)" "Gemini 3.1 Pro - Best quality" "Kimi K2 - Best quality (recommended)" "GPT-OSS 120B - Fast reasoning" "ZAI-GLM 4.7 - Best quality (recommended)" "Qwen3 235B - Frontier reasoning")
MC_MAXTOKENS=(32768 16384 8192 8192 16384 16384 8192 8192 8192 8192 8192 8192)
# Helper: get number of model choices for a provider
@@ -20,6 +20,7 @@ def test_check_requirements():
[sys.executable, "scripts/check_requirements.py", "json", "sys", "os"],
capture_output=True,
text=True,
encoding="utf-8",
)
print(f"Exit code: {result.returncode}")
print(f"Output:\n{result.stdout}")
@@ -39,6 +40,7 @@ def test_check_requirements():
[sys.executable, "scripts/check_requirements.py", "json", "nonexistent_module"],
capture_output=True,
text=True,
encoding="utf-8",
)
print(f"Exit code: {result.returncode}")
print(f"Output:\n{result.stdout}")
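The `encoding="utf-8"` additions throughout this change matter because `subprocess.run(text=True)` without an explicit encoding decodes child output with `locale.getpreferredencoding()`, which on many Windows setups is a legacy code page (e.g. cp1252, the "charmap" codec); any non-ASCII byte then raises `UnicodeDecodeError`. A minimal demonstration of the safe pattern:

```python
import subprocess
import sys

# Child prints a non-ASCII character; decoding with an explicit UTF-8
# encoding is stable regardless of the parent's locale code page.
result = subprocess.run(
    [
        sys.executable,
        "-c",
        "import sys; sys.stdout.reconfigure(encoding='utf-8'); print('café')",
    ],
    capture_output=True,
    text=True,
    encoding="utf-8",  # without this, Windows may decode with cp1252/charmap
)
print(result.stdout.strip())  # café
```

The same reasoning applies to `open()`, `Path.read_text()`, and `Path.write_text()`, which fall back to the locale encoding when none is given.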
@@ -90,7 +90,13 @@ def _resolve_path(path: str) -> str:
def _snapshot_git(*args: str) -> str:
"""Run a git command with the snapshot GIT_DIR and PROJECT_ROOT worktree."""
cmd = ["git", "--git-dir", SNAPSHOT_DIR, "--work-tree", PROJECT_ROOT, *args]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
)
return result.stdout.strip()
@@ -104,6 +110,7 @@ def _ensure_snapshot_repo():
["git", "init", "--bare", SNAPSHOT_DIR],
capture_output=True,
timeout=10,
encoding="utf-8",
)
_snapshot_git("config", "core.autocrlf", "false")
@@ -152,6 +159,7 @@ def run_command(command: str, cwd: str = "", timeout: int = 120) -> str:
capture_output=True,
text=True,
timeout=timeout,
encoding="utf-8",
env={
**os.environ,
"PYTHONPATH": (
@@ -228,6 +236,7 @@ def undo_changes(path: str = "") -> str:
capture_output=True,
text=True,
timeout=10,
encoding="utf-8",
)
return f"Restored: {path}"
else:
@@ -453,7 +462,28 @@ def validate_agent_tools(agent_path: str) -> str:
Returns:
JSON with validation result: pass/fail, missing tools per node, available tools
"""
resolved = _resolve_path(agent_path)
try:
resolved = _resolve_path(agent_path)
except ValueError:
return json.dumps({"error": "Access denied: path is outside the project root."})
# Restrict to allowed directories to prevent arbitrary code execution
# via importlib.import_module() below.
try:
from framework.server.app import validate_agent_path
except ImportError:
return json.dumps({"error": "Cannot validate agent path: framework package not available"})
try:
resolved = str(validate_agent_path(resolved))
except ValueError:
return json.dumps(
{
"error": "agent_path must be inside an allowed directory "
"(exports/, examples/, or ~/.hive/agents/)"
}
)
if not os.path.isdir(resolved):
return json.dumps({"error": f"Agent directory not found: {agent_path}"})
@@ -1015,6 +1045,22 @@ def run_agent_tests(
# Parse test types
types_list = [t.strip() for t in test_types.split(",")]
# Guard: pytest must be available as a subprocess command.
# Install with: pip install 'framework[testing]'
import shutil
if shutil.which("pytest") is None:
return json.dumps(
{
"error": (
"pytest is not installed or not on PATH. "
"Hive's test runner requires pytest at runtime. "
"Install it with: pip install 'framework[testing]' "
"or: uv pip install 'framework[testing]'"
),
}
)
# Build pytest command
cmd = ["pytest"]
@@ -1052,6 +1098,7 @@ def run_agent_tests(
text=True,
timeout=120,
env=env,
encoding="utf-8",
)
except subprocess.TimeoutExpired:
return json.dumps(
@@ -48,9 +48,10 @@ if "--stdio" in sys.argv:
rich.console.Console.__init__ = _patched_console_init
from aden_tools.file_ops import register_file_tools # noqa: E402
from fastmcp import FastMCP # noqa: E402
from aden_tools.file_ops import register_file_tools # noqa: E402
mcp = FastMCP("files-tools")
register_file_tools(mcp)
@@ -80,8 +81,10 @@ def main() -> None:
args = parser.parse_args()
if not args.stdio:
logger.info("Registered 6 file tools: read_file, write_file, edit_file, "
"list_directory, search_files, run_command")
logger.info(
"Registered 6 file tools: read_file, write_file, edit_file, "
"list_directory, search_files, run_command"
)
if args.stdio:
mcp.run(transport="stdio")
@@ -40,6 +40,7 @@ def open_browser(url: str) -> tuple[bool, str]:
["open", url],
check=True,
capture_output=True,
encoding="utf-8",
)
return True, "Opened in browser"
@@ -50,6 +51,7 @@ def open_browser(url: str) -> tuple[bool, str]:
["xdg-open", url],
check=True,
capture_output=True,
encoding="utf-8",
)
return True, "Opened in browser"
except FileNotFoundError:
@@ -84,7 +84,7 @@ def check_env_var_in_shell_config(
if not config_path.exists():
return False, None
content = config_path.read_text()
content = config_path.read_text(encoding="utf-8")
# Look for export ENV_VAR=value or export ENV_VAR="value"
pattern = rf"^export\s+{re.escape(env_var)}=(.+)$"
@@ -130,7 +130,7 @@ def add_env_var_to_shell_config(
try:
if config_path.exists():
content = config_path.read_text()
content = config_path.read_text(encoding="utf-8")
# Check if already exists
pattern = rf"^export\s+{re.escape(env_var)}=.*$"
@@ -142,11 +142,11 @@ def add_env_var_to_shell_config(
content,
flags=re.MULTILINE,
)
config_path.write_text(new_content)
config_path.write_text(new_content, encoding="utf-8")
return True, str(config_path)
# Append to file
with open(config_path, "a") as f:
with open(config_path, "a", encoding="utf-8") as f:
f.write(f"\n# {comment}\n")
f.write(f"{export_line}\n")
@@ -178,7 +178,7 @@ def remove_env_var_from_shell_config(
return True, "Config file does not exist"
try:
content = config_path.read_text()
content = config_path.read_text(encoding="utf-8")
lines = content.split("\n")
new_lines = []
@@ -206,7 +206,7 @@ def remove_env_var_from_shell_config(
new_lines.append(line)
config_path.write_text("\n".join(new_lines))
config_path.write_text("\n".join(new_lines), encoding="utf-8")
return True, str(config_path)
except PermissionError:
@@ -473,7 +473,7 @@ def register_file_tools(
@mcp.tool()
def search_files(pattern: str, path: str = ".", include: str = "") -> str:
"""Search file contents using regex. Uses ripgrep when available, falls back to Python regex.
"""Search file contents using regex. Uses ripgrep if available.
Results sorted by file with line numbers.
@@ -501,7 +501,13 @@ def register_file_tools(
cmd.extend(["--glob", include])
cmd.append(resolved)
rg_result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
rg_result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
encoding="utf-8",
)
if rg_result.returncode <= 1:
output = rg_result.stdout.strip()
if not output:
@@ -538,9 +544,7 @@ def register_file_tools(
if include and not fnmatch.fnmatch(fname, include):
continue
fpath = os.path.join(root, fname)
display_path = (
os.path.relpath(fpath, project_root) if project_root else fpath
)
display_path = os.path.relpath(fpath, project_root) if project_root else fpath
try:
with open(fpath, encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f, 1):
@@ -48,7 +48,13 @@ def register_tools(mcp: FastMCP) -> None:
secure_cwd = session_root
result = subprocess.run(
command, shell=True, cwd=secure_cwd, capture_output=True, text=True, timeout=60
command,
shell=True,
cwd=secure_cwd,
capture_output=True,
text=True,
timeout=60,
encoding="utf-8",
)
return {
@@ -70,7 +70,7 @@ def allocate_port(profile: str, storage_path: Path | None = None) -> int:
# Check for stored port
if port_file and port_file.exists():
try:
stored_port = int(port_file.read_text().strip())
stored_port = int(port_file.read_text(encoding="utf-8").strip())
if CDP_PORT_MIN <= stored_port <= CDP_PORT_MAX:
if _is_port_available(stored_port):
_allocated_ports.add(stored_port)
@@ -87,7 +87,7 @@ def allocate_port(profile: str, storage_path: Path | None = None) -> int:
# Persist port assignment
if port_file:
try:
port_file.write_text(str(port))
port_file.write_text(str(port), encoding="utf-8")
except OSError as e:
logger.warning(f"Failed to save port to file: {e}")
return port
@@ -48,7 +48,7 @@ def _discover_tool_modules() -> list[tuple[str, str]]:
continue
if item.is_dir() and (item / "__init__.py").exists():
init_text = (item / "__init__.py").read_text()
init_text = (item / "__init__.py").read_text(encoding="utf-8")
if "register_tools" in init_text:
# Direct tool package (e.g., web_search_tool, email_tool)
@@ -59,7 +59,7 @@ def _discover_tool_modules() -> list[tuple[str, str]]:
if sub.name.startswith("_") or sub.name == "__pycache__":
continue
if sub.is_dir() and (sub / "__init__.py").exists():
sub_init_text = (sub / "__init__.py").read_text()
sub_init_text = (sub / "__init__.py").read_text(encoding="utf-8")
if "register_tools" in sub_init_text:
modules.append(
(
@@ -43,10 +43,7 @@ def ts():
def log(turn: int | str, action: str, result_summary: str, elapsed: float):
"""Pretty-print a turn log line."""
print(
f" [{ts()}] Turn {turn:>2} | {elapsed:5.1f}s "
f"| {action:<45} | {result_summary}"
)
print(f" [{ts()}] Turn {turn:>2} | {elapsed:5.1f}s | {action:<45} | {result_summary}")
async def reproduce_agent_session(session: BrowserSession):
@@ -49,7 +49,10 @@ def session_dir(tmp_path: Path) -> Path:
def basic_csv(session_dir: Path) -> Path:
"""Create a basic CSV file for testing."""
csv_file = session_dir / "basic.csv"
csv_file.write_text("name,age,city\nAlice,30,NYC\nBob,25,LA\nCharlie,35,Chicago\n")
csv_file.write_text(
"name,age,city\nAlice,30,NYC\nBob,25,LA\nCharlie,35,Chicago\n",
encoding="utf-8",
)
return csv_file
@@ -60,7 +63,7 @@ def large_csv(session_dir: Path) -> Path:
lines = ["id,value"]
for i in range(100):
lines.append(f"{i},{i * 10}")
csv_file.write_text("\n".join(lines) + "\n")
csv_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
return csv_file
@@ -68,7 +71,7 @@ def large_csv(session_dir: Path) -> Path:
def empty_csv(session_dir: Path) -> Path:
"""Create an empty CSV file (no content)."""
csv_file = session_dir / "empty.csv"
csv_file.write_text("")
csv_file.write_text("", encoding="utf-8")
return csv_file
@@ -76,7 +79,7 @@ def empty_csv(session_dir: Path) -> Path:
def headers_only_csv(session_dir: Path) -> Path:
"""Create a CSV file with only headers."""
csv_file = session_dir / "headers_only.csv"
csv_file.write_text("name,age,city\n")
csv_file.write_text("name,age,city\n", encoding="utf-8")
return csv_file
@@ -217,7 +220,7 @@ class TestCsvRead:
"""Return error for non-CSV file extension."""
# Create a text file
txt_file = session_dir / "data.txt"
txt_file.write_text("name,age\nAlice,30\n")
txt_file.write_text("name,age\nAlice,30\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = csv_tool_fn(
@@ -317,7 +320,8 @@ class TestCsvRead:
"""Read CSV with quoted fields containing commas."""
csv_file = session_dir / "quoted.csv"
csv_file.write_text(
'name,address,note\n"Smith, John","123 Main St, Apt 4","Hello, world"\n'
'name,address,note\n"Smith, John","123 Main St, Apt 4","Hello, world"\n',
encoding="utf-8",
)
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
@@ -385,7 +389,7 @@ class TestCsvWrite:
assert result["rows_written"] == 2
# Verify file content
content = (session_dir / "output.csv").read_text()
content = (session_dir / "output.csv").read_text(encoding="utf-8")
assert "name,age,city" in content
assert "Alice,30,NYC" in content
assert "Bob,25,LA" in content
@@ -449,7 +453,7 @@ class TestCsvWrite:
assert result["success"] is True
content = (session_dir / "output.csv").read_text()
content = (session_dir / "output.csv").read_text(encoding="utf-8")
assert "extra" not in content
assert "ignored" not in content
@@ -468,7 +472,7 @@ class TestCsvWrite:
assert result["success"] is True
assert result["rows_written"] == 0
content = (session_dir / "output.csv").read_text()
content = (session_dir / "output.csv").read_text(encoding="utf-8")
assert "name,age" in content
def test_write_unicode_content(self, csv_tools, session_dir, tmp_path):
@@ -511,7 +515,7 @@ class TestCsvWrite:
csv_file = session_dir / "data.csv"
assert csv_file.exists()
content = csv_file.read_text()
content = csv_file.read_text(encoding="utf-8")
assert "id,value" in content
assert "1,test1" in content
assert "2,test2" in content
@@ -579,7 +583,7 @@ class TestCsvAppend:
assert result["success"] is True
content = (session_dir / "basic.csv").read_text()
content = (session_dir / "basic.csv").read_text(encoding="utf-8")
assert "extra" not in content
assert "ignored" not in content
assert "David" in content
@@ -587,7 +591,7 @@ class TestCsvAppend:
def test_append_non_csv_extension_error(self, csv_tools, session_dir, tmp_path):
"""Return error for non-CSV file extension."""
txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n")
txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = csv_tools["csv_append"](
@@ -679,7 +683,7 @@ class TestCsvInfo:
def test_get_info_non_csv_extension_error(self, csv_tools, session_dir, tmp_path):
"""Return error for non-CSV file extension."""
txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n")
txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = csv_tools["csv_info"](
@@ -707,7 +711,8 @@ class TestCsvSql:
"2,MacBook,Electronics,1999,30\n"
"3,Coffee Mug,Kitchen,15,200\n"
"4,Headphones,Electronics,299,75\n"
"5,Water Bottle,Kitchen,25,150\n"
"5,Water Bottle,Kitchen,25,150\n",
encoding="utf-8",
)
return csv_file
@@ -280,7 +280,7 @@ class TestExcelRead:
"""Return error for non-Excel file extension."""
# Create a text file
txt_file = session_dir / "data.txt"
txt_file.write_text("name,age\nAlice,30\n")
txt_file.write_text("name,age\nAlice,30\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_read_fn(
@@ -602,7 +602,7 @@ class TestExcelAppend:
def test_append_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path):
"""Return error for non-Excel file extension."""
txt_file = session_dir / "data.txt"
txt_file.write_text("name\nAlice\n")
txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_tools["excel_append"](
@@ -672,7 +672,7 @@ class TestExcelInfo:
def test_get_info_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path):
"""Return error for non-Excel file extension."""
txt_file = session_dir / "data.txt"
-txt_file.write_text("name\nAlice\n")
+txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_tools["excel_info"](
@@ -735,7 +735,7 @@ class TestExcelSheetList:
def test_list_sheets_non_xlsx_extension_error(self, excel_tools, session_dir, tmp_path):
"""Return error for non-Excel file extension."""
txt_file = session_dir / "data.txt"
-txt_file.write_text("name\nAlice\n")
+txt_file.write_text("name\nAlice\n", encoding="utf-8")
with patch("aden_tools.tools.file_system_toolkits.security.WORKSPACES_DIR", str(tmp_path)):
result = excel_tools["excel_sheet_list"](
@@ -86,7 +86,7 @@ class TestViewFileTool:
def test_view_existing_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing an existing file returns content and metadata."""
test_file = tmp_path / "test.txt"
-test_file.write_text("Hello, World!")
+test_file.write_text("Hello, World!", encoding="utf-8")
result = view_file_fn(path="test.txt", **mock_workspace)
@@ -106,7 +106,7 @@ class TestViewFileTool:
"""Viewing a multiline file returns correct line count."""
test_file = tmp_path / "multiline.txt"
content = "Line 1\nLine 2\nLine 3\nLine 4\n"
-test_file.write_text(content)
+test_file.write_text(content, encoding="utf-8")
result = view_file_fn(path="multiline.txt", **mock_workspace)
@@ -117,7 +117,7 @@ class TestViewFileTool:
def test_view_empty_file(self, view_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Viewing an empty file returns empty content."""
test_file = tmp_path / "empty.txt"
-test_file.write_text("")
+test_file.write_text("", encoding="utf-8")
result = view_file_fn(path="empty.txt", **mock_workspace)
@@ -143,7 +143,7 @@ class TestViewFileTool:
nested = tmp_path / "nested" / "dir"
nested.mkdir(parents=True)
test_file = nested / "file.txt"
-test_file.write_text("nested content")
+test_file.write_text("nested content", encoding="utf-8")
result = view_file_fn(path="nested/dir/file.txt", **mock_workspace)
@@ -156,7 +156,7 @@ class TestViewFileTool:
"""Viewing a file with max_size truncates content when exceeding limit."""
test_file = tmp_path / "large.txt"
content = "x" * 1000
-test_file.write_text(content)
+test_file.write_text(content, encoding="utf-8")
result = view_file_fn(path="large.txt", max_size=100, **mock_workspace)
@@ -171,7 +171,7 @@ class TestViewFileTool:
):
"""Viewing a file with negative max_size returns error."""
test_file = tmp_path / "test.txt"
-test_file.write_text("content")
+test_file.write_text("content", encoding="utf-8")
result = view_file_fn(path="test.txt", max_size=-1, **mock_workspace)
@@ -196,7 +196,7 @@ class TestViewFileTool:
):
"""Viewing a file with invalid encoding returns error."""
test_file = tmp_path / "test.txt"
-test_file.write_text("content")
+test_file.write_text("content", encoding="utf-8")
result = view_file_fn(path="test.txt", encoding="invalid-encoding", **mock_workspace)
@@ -225,12 +225,12 @@ class TestWriteToFileTool:
# Verify file was created
created_file = tmp_path / "new_file.txt"
assert created_file.exists()
-assert created_file.read_text() == "Test content"
+assert created_file.read_text(encoding="utf-8") == "Test content"
def test_write_append_mode(self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path):
"""Writing with append=True appends to existing file."""
test_file = tmp_path / "append_test.txt"
-test_file.write_text("Line 1\n")
+test_file.write_text("Line 1\n", encoding="utf-8")
result = write_to_file_fn(
path="append_test.txt", content="Line 2\n", append=True, **mock_workspace
@@ -238,20 +238,20 @@ class TestWriteToFileTool:
assert result["success"] is True
assert result["mode"] == "appended"
-assert test_file.read_text() == "Line 1\nLine 2\n"
+assert test_file.read_text(encoding="utf-8") == "Line 1\nLine 2\n"
def test_write_overwrite_existing(
self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Writing to existing file overwrites it by default."""
test_file = tmp_path / "overwrite.txt"
-test_file.write_text("Original content")
+test_file.write_text("Original content", encoding="utf-8")
result = write_to_file_fn(path="overwrite.txt", content="New content", **mock_workspace)
assert result["success"] is True
assert result["mode"] == "written"
-assert test_file.read_text() == "New content"
+assert test_file.read_text(encoding="utf-8") == "New content"
def test_write_creates_parent_directories(
self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
@@ -262,7 +262,7 @@ class TestWriteToFileTool:
assert result["success"] is True
created_file = tmp_path / "nested" / "dir" / "file.txt"
assert created_file.exists()
-assert created_file.read_text() == "Test"
+assert created_file.read_text(encoding="utf-8") == "Test"
def test_write_empty_content(
self, write_to_file_fn, mock_workspace, mock_secure_path, tmp_path
@@ -274,7 +274,7 @@ class TestWriteToFileTool:
assert result["bytes_written"] == 0
created_file = tmp_path / "empty.txt"
assert created_file.exists()
-assert created_file.read_text() == ""
+assert created_file.read_text(encoding="utf-8") == ""
class TestListDirTool:
@@ -290,8 +290,8 @@ class TestListDirTool:
def test_list_directory(self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path):
"""Listing a directory returns all entries."""
# Create test files and directories
-(tmp_path / "file1.txt").write_text("content")
-(tmp_path / "file2.txt").write_text("content")
+(tmp_path / "file1.txt").write_text("content", encoding="utf-8")
+(tmp_path / "file2.txt").write_text("content", encoding="utf-8")
(tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace)
@@ -328,8 +328,8 @@ class TestListDirTool:
self, list_dir_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Listing a directory returns file sizes for files."""
-(tmp_path / "small.txt").write_text("hi")
-(tmp_path / "larger.txt").write_text("hello world")
+(tmp_path / "small.txt").write_text("hi", encoding="utf-8")
+(tmp_path / "larger.txt").write_text("hello world", encoding="utf-8")
(tmp_path / "subdir").mkdir()
result = list_dir_fn(path=".", **mock_workspace)
@@ -366,7 +366,7 @@ class TestReplaceFileContentTool:
):
"""Replacing content in a file works correctly."""
test_file = tmp_path / "replace_test.txt"
-test_file.write_text("Hello World! Hello again!")
+test_file.write_text("Hello World! Hello again!", encoding="utf-8")
result = replace_file_content_fn(
path="replace_test.txt", target="Hello", replacement="Hi", **mock_workspace
@@ -374,14 +374,14 @@ class TestReplaceFileContentTool:
assert result["success"] is True
assert result["occurrences_replaced"] == 2
-assert test_file.read_text() == "Hi World! Hi again!"
+assert test_file.read_text(encoding="utf-8") == "Hi World! Hi again!"
def test_replace_target_not_found(
self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Replacing non-existent target returns error."""
test_file = tmp_path / "test.txt"
-test_file.write_text("Hello World")
+test_file.write_text("Hello World", encoding="utf-8")
result = replace_file_content_fn(
path="test.txt", target="nonexistent", replacement="new", **mock_workspace
@@ -406,7 +406,7 @@ class TestReplaceFileContentTool:
):
"""Replacing content with single occurrence works correctly."""
test_file = tmp_path / "single.txt"
-test_file.write_text("Hello World")
+test_file.write_text("Hello World", encoding="utf-8")
result = replace_file_content_fn(
path="single.txt", target="Hello", replacement="Hi", **mock_workspace
@@ -414,14 +414,14 @@ class TestReplaceFileContentTool:
assert result["success"] is True
assert result["occurrences_replaced"] == 1
-assert test_file.read_text() == "Hi World"
+assert test_file.read_text(encoding="utf-8") == "Hi World"
def test_replace_multiline_content(
self, replace_file_content_fn, mock_workspace, mock_secure_path, tmp_path
):
"""Replacing content across multiple lines works correctly."""
test_file = tmp_path / "multiline.txt"
-test_file.write_text("Line 1\nTODO: fix this\nLine 3\nTODO: add tests\n")
+test_file.write_text("Line 1\nTODO: fix this\nLine 3\nTODO: add tests\n", encoding="utf-8")
result = replace_file_content_fn(
path="multiline.txt", target="TODO:", replacement="DONE:", **mock_workspace
@@ -429,7 +429,8 @@ class TestReplaceFileContentTool:
assert result["success"] is True
assert result["occurrences_replaced"] == 2
-assert test_file.read_text() == "Line 1\nDONE: fix this\nLine 3\nDONE: add tests\n"
+expected = "Line 1\nDONE: fix this\nLine 3\nDONE: add tests\n"
+assert test_file.read_text(encoding="utf-8") == expected
class TestGrepSearchTool:
@@ -447,7 +448,7 @@ class TestGrepSearchTool:
):
"""Searching a single file returns matches."""
test_file = tmp_path / "search_test.txt"
-test_file.write_text("Line 1\nLine 2 with pattern\nLine 3")
+test_file.write_text("Line 1\nLine 2 with pattern\nLine 3", encoding="utf-8")
result = grep_search_fn(path="search_test.txt", pattern="pattern", **mock_workspace)
@@ -462,7 +463,7 @@ class TestGrepSearchTool:
):
"""Searching with no matches returns empty list."""
test_file = tmp_path / "test.txt"
-test_file.write_text("Hello World")
+test_file.write_text("Hello World", encoding="utf-8")
result = grep_search_fn(path="test.txt", pattern="nonexistent", **mock_workspace)
@@ -475,13 +476,13 @@ class TestGrepSearchTool:
):
"""Searching directory non-recursively only searches immediate files."""
# Create files in root
-(tmp_path / "file1.txt").write_text("pattern here")
-(tmp_path / "file2.txt").write_text("no match here")
+(tmp_path / "file1.txt").write_text("pattern here", encoding="utf-8")
+(tmp_path / "file2.txt").write_text("no match here", encoding="utf-8")
# Create nested directory with file
nested = tmp_path / "nested"
nested.mkdir()
-(nested / "nested_file.txt").write_text("pattern in nested")
+(nested / "nested_file.txt").write_text("pattern in nested", encoding="utf-8")
result = grep_search_fn(path=".", pattern="pattern", recursive=False, **mock_workspace)
@@ -494,12 +495,12 @@ class TestGrepSearchTool:
):
"""Searching directory recursively finds matches in subdirectories."""
# Create files in root
-(tmp_path / "file1.txt").write_text("pattern here")
+(tmp_path / "file1.txt").write_text("pattern here", encoding="utf-8")
# Create nested directory with file
nested = tmp_path / "nested"
nested.mkdir()
-(nested / "nested_file.txt").write_text("pattern in nested")
+(nested / "nested_file.txt").write_text("pattern in nested", encoding="utf-8")
result = grep_search_fn(path=".", pattern="pattern", recursive=True, **mock_workspace)
@@ -512,7 +513,7 @@ class TestGrepSearchTool:
):
"""Searching with regex pattern finds complex matches."""
test_file = tmp_path / "regex_test.txt"
-test_file.write_text("foo123bar\nfoo456bar\nbaz789baz\n")
+test_file.write_text("foo123bar\nfoo456bar\nbaz789baz\n", encoding="utf-8")
result = grep_search_fn(path="regex_test.txt", pattern=r"foo\d+bar", **mock_workspace)
@@ -526,7 +527,7 @@ class TestGrepSearchTool:
):
"""Searching returns one match per line even with multiple occurrences."""
test_file = tmp_path / "multi_match.txt"
-test_file.write_text("hello hello hello\nworld\nhello again")
+test_file.write_text("hello hello hello\nworld\nhello again", encoding="utf-8")
result = grep_search_fn(path="multi_match.txt", pattern="hello", **mock_workspace)
@@ -573,7 +574,7 @@ class TestExecuteCommandTool:
):
"""Executing ls command lists files."""
# Create a test file
-(tmp_path / "testfile.txt").write_text("content")
+(tmp_path / "testfile.txt").write_text("content", encoding="utf-8")
result = execute_command_fn(command=f"ls {tmp_path}", **mock_workspace)
@@ -610,7 +611,7 @@ class TestApplyDiffTool:
def test_apply_diff_successful(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying a valid diff successfully modifies the file."""
test_file = tmp_path / "diff_test.txt"
-test_file.write_text("Hello World")
+test_file.write_text("Hello World", encoding="utf-8")
# Create a simple diff using diff_match_patch format
import diff_match_patch as dmp_module
@@ -624,13 +625,13 @@ class TestApplyDiffTool:
assert result["success"] is True
assert result["all_successful"] is True
assert result["patches_applied"] > 0
-assert test_file.read_text() == "Hello Universe"
+assert test_file.read_text(encoding="utf-8") == "Hello Universe"
def test_apply_diff_multiline(self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path):
"""Applying diff to multiline content works correctly."""
test_file = tmp_path / "multiline.txt"
original = "Line 1\nLine 2\nLine 3\n"
-test_file.write_text(original)
+test_file.write_text(original, encoding="utf-8")
import diff_match_patch as dmp_module
@@ -643,7 +644,7 @@ class TestApplyDiffTool:
assert result["success"] is True
assert result["all_successful"] is True
-assert test_file.read_text() == modified
+assert test_file.read_text(encoding="utf-8") == modified
def test_apply_diff_invalid_patch(
self, apply_diff_fn, mock_workspace, mock_secure_path, tmp_path
@@ -651,7 +652,7 @@ class TestApplyDiffTool:
"""Applying an invalid diff handles gracefully."""
test_file = tmp_path / "test.txt"
original_content = "Original content"
-test_file.write_text(original_content)
+test_file.write_text(original_content, encoding="utf-8")
# Invalid diff text
result = apply_diff_fn(path="test.txt", diff_text="invalid diff format", **mock_workspace)
@@ -660,7 +661,7 @@ class TestApplyDiffTool:
if "error" not in result:
assert result.get("patches_applied", 0) == 0
# File should remain unchanged
-assert test_file.read_text() == original_content
+assert test_file.read_text(encoding="utf-8") == original_content
class TestApplyPatchTool:
@@ -685,7 +686,7 @@ class TestApplyPatchTool:
):
"""Applying a valid patch successfully modifies the file."""
test_file = tmp_path / "patch_test.txt"
-test_file.write_text("Hello World")
+test_file.write_text("Hello World", encoding="utf-8")
# Create a simple patch using diff_match_patch format
import diff_match_patch as dmp_module
@@ -699,7 +700,7 @@ class TestApplyPatchTool:
assert result["success"] is True
assert result["all_successful"] is True
assert result["patches_applied"] > 0
-assert test_file.read_text() == "Hello Python"
+assert test_file.read_text(encoding="utf-8") == "Hello Python"
def test_apply_patch_multiline(
self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path
@@ -707,7 +708,7 @@ class TestApplyPatchTool:
"""Applying patch to multiline content works correctly."""
test_file = tmp_path / "multiline.txt"
original = "Line 1\nLine 2\nLine 3\n"
-test_file.write_text(original)
+test_file.write_text(original, encoding="utf-8")
import diff_match_patch as dmp_module
@@ -720,7 +721,7 @@ class TestApplyPatchTool:
assert result["success"] is True
assert result["all_successful"] is True
-assert test_file.read_text() == modified
+assert test_file.read_text(encoding="utf-8") == modified
def test_apply_patch_invalid_patch(
self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path
@@ -728,7 +729,7 @@ class TestApplyPatchTool:
"""Applying an invalid patch handles gracefully."""
test_file = tmp_path / "test.txt"
original_content = "Original content"
-test_file.write_text(original_content)
+test_file.write_text(original_content, encoding="utf-8")
# Invalid patch text
result = apply_patch_fn(
@@ -739,7 +740,7 @@ class TestApplyPatchTool:
if "error" not in result:
assert result.get("patches_applied", 0) == 0
# File should remain unchanged
-assert test_file.read_text() == original_content
+assert test_file.read_text(encoding="utf-8") == original_content
def test_apply_patch_multiple_changes(
self, apply_patch_fn, mock_workspace, mock_secure_path, tmp_path
@@ -747,7 +748,7 @@ class TestApplyPatchTool:
"""Applying patch with multiple changes works correctly."""
test_file = tmp_path / "complex.txt"
original = "Function foo() {\n return 42;\n}\n"
-test_file.write_text(original)
+test_file.write_text(original, encoding="utf-8")
import diff_match_patch as dmp_module
@@ -760,4 +761,4 @@ class TestApplyPatchTool:
assert result["success"] is True
assert result["all_successful"] is True
-assert test_file.read_text() == modified
+assert test_file.read_text(encoding="utf-8") == modified
@@ -28,7 +28,7 @@ class TestPdfReadTool:
def test_read_pdf_invalid_extension(self, pdf_read_fn, tmp_path: Path):
"""Reading non-PDF file returns error."""
txt_file = tmp_path / "test.txt"
-txt_file.write_text("not a pdf")
+txt_file.write_text("not a pdf", encoding="utf-8")
result = pdf_read_fn(file_path=str(txt_file))
@@ -47,7 +47,8 @@ def runtime_logs_dir(tmp_path: Path) -> Path:
"duration_ms": 3000,
"execution_quality": "clean",
}
-)
+),
+encoding="utf-8",
)
_write_jsonl(
run1_dir / "details.jsonl",
@@ -143,7 +144,8 @@ def runtime_logs_dir(tmp_path: Path) -> Path:
"duration_ms": 60000,
"execution_quality": "failed",
}
-)
+),
+encoding="utf-8",
)
_write_jsonl(
run2_dir / "details.jsonl",
@@ -238,7 +238,7 @@ class TestGetSecurePath:
# Create a target file and a symlink to it
target_file = session_dir / "target.txt"
-target_file.write_text("content")
+target_file.write_text("content", encoding="utf-8")
symlink_path = session_dir / "link_to_target"
symlink_path.symlink_to(target_file)
@@ -263,7 +263,7 @@ class TestGetSecurePath:
# Create a symlink inside session pointing outside
outside_target = self.workspaces_dir / "outside_file.txt"
-outside_target.write_text("sensitive data")
+outside_target.write_text("sensitive data", encoding="utf-8")
symlink_path = session_dir / "escape_link"
symlink_path.symlink_to(outside_target)
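The two symlink tests above check opposite outcomes: a link resolving inside the session directory is allowed, while one escaping to the workspaces root must be rejected. A minimal sketch of such a check, assuming the real `get_secure_path` does something equivalent (the helper name here is hypothetical):

```python
import tempfile
from pathlib import Path

def escapes_session(session_dir: Path, candidate: Path) -> bool:
    """Hypothetical check: True if `candidate` resolves outside `session_dir`."""
    resolved = candidate.resolve()  # follows symlinks to the real target
    return not resolved.is_relative_to(session_dir.resolve())

with tempfile.TemporaryDirectory() as d:
    workspaces = Path(d)
    session = workspaces / "session"
    session.mkdir()

    # A symlink inside the session pointing outside of it.
    outside = workspaces / "outside_file.txt"
    outside.write_text("sensitive data", encoding="utf-8")
    # Note: creating symlinks may require extra privileges on Windows.
    link = session / "escape_link"
    link.symlink_to(outside)

    # A regular file that stays inside the session.
    inside = session / "target.txt"
    inside.write_text("content", encoding="utf-8")

    escape_detected = escapes_session(session, link)
    inside_allowed = not escapes_session(session, inside)
```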
@@ -800,9 +800,6 @@ dependencies = [
{ name = "litellm" },
{ name = "mcp" },
{ name = "pydantic" },
-{ name = "pytest" },
-{ name = "pytest-asyncio" },
-{ name = "pytest-xdist" },
{ name = "textual" },
{ name = "tools" },
]
@@ -811,6 +808,11 @@ dependencies = [
server = [
{ name = "aiohttp" },
]
+testing = [
+{ name = "pytest" },
+{ name = "pytest-asyncio" },
+{ name = "pytest-xdist" },
+]
tui = [
{ name = "textual" },
]
@@ -820,6 +822,9 @@ webhook = [
[package.dev-dependencies]
dev = [
{ name = "pytest" },
+{ name = "pytest-asyncio" },
+{ name = "pytest-xdist" },
{ name = "ruff" },
{ name = "ty" },
]
@@ -834,17 +839,20 @@ requires-dist = [
{ name = "litellm", specifier = ">=1.81.0" },
{ name = "mcp", specifier = ">=1.0.0" },
{ name = "pydantic", specifier = ">=2.0" },
-{ name = "pytest", specifier = ">=8.0" },
-{ name = "pytest-asyncio", specifier = ">=0.23" },
-{ name = "pytest-xdist", specifier = ">=3.0" },
+{ name = "pytest", marker = "extra == 'testing'", specifier = ">=8.0" },
+{ name = "pytest-asyncio", marker = "extra == 'testing'", specifier = ">=0.23" },
+{ name = "pytest-xdist", marker = "extra == 'testing'", specifier = ">=3.0" },
-{ name = "textual", specifier = ">=1.0.0" },
+{ name = "textual", marker = "extra == 'tui'", specifier = ">=0.75.0" },
{ name = "tools", editable = "tools" },
]
-provides-extras = ["tui", "webhook", "server"]
+provides-extras = ["tui", "webhook", "server", "testing"]
[package.metadata.requires-dev]
dev = [
{ name = "pytest", specifier = ">=8.0" },
+{ name = "pytest-asyncio", specifier = ">=0.23" },
+{ name = "pytest-xdist", specifier = ">=3.0" },
{ name = "ruff", specifier = ">=0.14.14" },
{ name = "ty", specifier = ">=0.0.13" },
]