Merge branch 'main' into fix/deprecation

This commit is contained in:
Timothy @aden
2026-02-18 20:29:39 -08:00
committed by GitHub
26 changed files with 2075 additions and 347 deletions
+13
View File
@@ -59,6 +59,13 @@ from .provider import (
CredentialProvider,
StaticProvider,
)
from .setup import (
CredentialSetupSession,
MissingCredential,
SetupResult,
detect_missing_credentials_from_nodes,
run_credential_setup_cli,
)
from .storage import (
CompositeStorage,
CredentialStorage,
@@ -115,6 +122,12 @@ __all__ = [
# Validation
"ensure_credential_key_env",
"validate_agent_credentials",
# Interactive setup
"CredentialSetupSession",
"MissingCredential",
"SetupResult",
"detect_missing_credentials_from_nodes",
"run_credential_setup_cli",
# Aden sync (optional - requires httpx)
"AdenSyncProvider",
"AdenCredentialClient",
+745
View File
@@ -0,0 +1,745 @@
"""
Interactive credential setup for CLI applications.
Provides a modular, reusable credential setup flow that can be triggered
when validate_agent_credentials() fails. Works with both TUI and headless CLIs.
Usage:
from framework.credentials.setup import CredentialSetupSession
# From agent path
session = CredentialSetupSession.from_agent_path("exports/my-agent")
result = session.run_interactive()
# From nodes directly
session = CredentialSetupSession.from_nodes(nodes)
result = session.run_interactive()
# With custom I/O (for integration with other UIs)
session = CredentialSetupSession(
missing=missing_creds,
input_fn=my_input,
print_fn=my_print,
)
"""
from __future__ import annotations
import getpass
import json
import os
import sys
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from framework.graph import NodeSpec
# ANSI colors for terminal output
class Colors:
    """ANSI escape sequences for colored terminal output."""

    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    CYAN = "\033[0;36m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    NC = "\033[0m"  # No Color (reset)

    @classmethod
    def disable(cls):
        """Disable colors (for non-TTY output)."""
        # Blank out every escape sequence so formatted strings degrade cleanly.
        for attr in ("RED", "GREEN", "YELLOW", "BLUE", "CYAN", "BOLD", "DIM", "NC"):
            setattr(cls, attr, "")
@dataclass
class MissingCredential:
    """A credential that needs to be configured before the agent can run."""

    # Internal credential name (e.g., 'brave_search')
    credential_name: str
    # Environment variable name (e.g., 'BRAVE_SEARCH_API_KEY')
    env_var: str
    # Human-readable description
    description: str
    # URL where the user can obtain the credential
    help_url: str
    # Step-by-step instructions for getting an API key
    api_key_instructions: str
    # Tools that require this credential
    tools: list[str] = field(default_factory=list)
    # Node types that require this credential
    node_types: list[str] = field(default_factory=list)
    # Whether the Aden OAuth flow is supported
    aden_supported: bool = False
    # Whether direct API key entry is supported
    direct_api_key_supported: bool = True
    # Credential store ID
    credential_id: str = ""
    # Key name within the credential
    credential_key: str = "api_key"
@dataclass
class SetupResult:
    """Outcome of a credential setup session."""

    # Whether all required credentials were configured
    success: bool
    # Credentials that were successfully set up
    configured: list[str] = field(default_factory=list)
    # Credentials the user chose to skip
    skipped: list[str] = field(default_factory=list)
    # Any errors encountered
    errors: list[str] = field(default_factory=list)
class CredentialSetupSession:
    """
    Interactive credential setup session.

    Can be used by any CLI (runner, coding agent, etc.) to guide users
    through credential configuration when validation fails.

    Example:
        from framework.credentials.setup import CredentialSetupSession
        from framework.credentials.models import CredentialError

        try:
            validate_agent_credentials(nodes)
        except CredentialError:
            session = CredentialSetupSession.from_nodes(nodes)
            result = session.run_interactive()
            if result.success:
                # Retry - credentials are now configured
                validate_agent_credentials(nodes)
    """

    def __init__(
        self,
        missing: list[MissingCredential],
        input_fn: Callable[[str], str] | None = None,
        print_fn: Callable[[str], None] | None = None,
        password_fn: Callable[[str], str] | None = None,
    ):
        """
        Initialize the setup session.

        Args:
            missing: List of credentials that need setup
            input_fn: Custom input function (default: built-in input)
            print_fn: Custom print function (default: built-in print)
            password_fn: Custom password input function (default: getpass.getpass)
        """
        self.missing = missing
        self.input_fn = input_fn or input
        self.print_fn = print_fn or print
        self.password_fn = password_fn or getpass.getpass
        # Disable colors if not a TTY
        if not sys.stdout.isatty():
            Colors.disable()

    @classmethod
    def from_nodes(cls, nodes: list[NodeSpec]) -> CredentialSetupSession:
        """Create a setup session by detecting missing credentials from nodes."""
        missing = detect_missing_credentials_from_nodes(nodes)
        return cls(missing)

    @classmethod
    def from_agent_path(cls, agent_path: str | Path) -> CredentialSetupSession:
        """Create a setup session for an agent by path."""
        agent_path = Path(agent_path)
        # Load agent to get nodes
        agent_json = agent_path / "agent.json"
        agent_py = agent_path / "agent.py"
        nodes = []
        if agent_py.exists():
            # Python-based agent
            nodes = _load_nodes_from_python_agent(agent_path)
        elif agent_json.exists():
            # JSON-based agent
            nodes = _load_nodes_from_json_agent(agent_json)
        missing = detect_missing_credentials_from_nodes(nodes)
        return cls(missing)

    def run_interactive(self) -> SetupResult:
        """Run the interactive setup flow.

        Returns:
            SetupResult; success is True only when nothing was skipped
            and no errors occurred.
        """
        configured: list[str] = []
        skipped: list[str] = []
        errors: list[str] = []
        if not self.missing:
            self._print(f"\n{Colors.GREEN}✓ All credentials are already configured!{Colors.NC}\n")
            return SetupResult(success=True)
        self._print_header()
        # Ensure HIVE_CREDENTIAL_KEY is set before storing anything
        if not self._ensure_credential_key():
            return SetupResult(
                success=False,
                errors=["Failed to initialize credential store encryption key"],
            )
        for cred in self.missing:
            try:
                result = self._setup_single_credential(cred)
                if result:
                    configured.append(cred.credential_name)
                else:
                    skipped.append(cred.credential_name)
            except KeyboardInterrupt:
                # Ctrl-C mid-credential: count it as skipped and stop the flow.
                self._print(f"\n{Colors.YELLOW}Setup interrupted.{Colors.NC}")
                skipped.append(cred.credential_name)
                break
            except Exception as e:
                errors.append(f"{cred.credential_name}: {e}")
        self._print_summary(configured, skipped, errors)
        return SetupResult(
            success=len(errors) == 0 and len(skipped) == 0,
            configured=configured,
            skipped=skipped,
            errors=errors,
        )

    def _print(self, msg: str) -> None:
        """Print a message."""
        self.print_fn(msg)

    def _input(self, prompt: str) -> str:
        """Get input from user."""
        return self.input_fn(prompt)

    def _print_header(self) -> None:
        """Print the setup header."""
        self._print("")
        self._print(f"{Colors.YELLOW}{'=' * 60}{Colors.NC}")
        self._print(f"{Colors.BOLD} CREDENTIAL SETUP{Colors.NC}")
        self._print(f"{Colors.YELLOW}{'=' * 60}{Colors.NC}")
        self._print("")
        self._print(f" {len(self.missing)} credential(s) need to be configured:")
        for cred in self.missing:
            affected = cred.tools or cred.node_types
            # Indent list items under the heading above.
            self._print(f"  - {cred.env_var} ({', '.join(affected)})")
        self._print("")

    def _ensure_credential_key(self) -> bool:
        """Ensure HIVE_CREDENTIAL_KEY is available for encrypted storage.

        Resolution order: current environment, then the user's shell config,
        then a newly generated Fernet key (persisted to shell config).
        """
        if os.environ.get("HIVE_CREDENTIAL_KEY"):
            return True
        # Try to load from shell config
        try:
            from aden_tools.credentials.shell_config import check_env_var_in_shell_config

            found, value = check_env_var_in_shell_config("HIVE_CREDENTIAL_KEY")
            if found and value:
                os.environ["HIVE_CREDENTIAL_KEY"] = value
                return True
        except ImportError:
            pass
        # Generate a new key
        self._print(f"{Colors.YELLOW}Initializing credential store...{Colors.NC}")
        try:
            from cryptography.fernet import Fernet

            generated_key = Fernet.generate_key().decode()
            os.environ["HIVE_CREDENTIAL_KEY"] = generated_key
            # Save to shell config
            self._save_key_to_shell_config(generated_key)
            return True
        except Exception as e:
            self._print(f"{Colors.RED}Failed to initialize credential store: {e}{Colors.NC}")
            return False

    def _save_key_to_shell_config(self, key: str) -> None:
        """Save HIVE_CREDENTIAL_KEY to shell config (best effort)."""
        try:
            from aden_tools.credentials.shell_config import (
                add_env_var_to_shell_config,
            )

            success, config_path = add_env_var_to_shell_config(
                "HIVE_CREDENTIAL_KEY",
                key,
                comment="Encryption key for Hive credential store",
            )
            if success:
                self._print(f"{Colors.GREEN}✓ Encryption key saved to {config_path}{Colors.NC}")
        except Exception:
            # Fallback: just tell the user
            self._print("\n")
            self._print(
                f"{Colors.YELLOW}Add this to your shell config (~/.zshrc or ~/.bashrc):{Colors.NC}"
            )
            self._print(f'  export HIVE_CREDENTIAL_KEY="{key}"')

    def _setup_single_credential(self, cred: MissingCredential) -> bool:
        """Set up a single credential. Returns True if configured."""
        # FIX: was {'' * 60}, which renders an empty string instead of a
        # separator rule (cf. the '=' * 60 rules in the header).
        self._print(f"\n{Colors.CYAN}{'-' * 60}{Colors.NC}")
        self._print(f"{Colors.BOLD}Setting up: {cred.credential_name}{Colors.NC}")
        affected = cred.tools or cred.node_types
        self._print(f"{Colors.DIM}Required for: {', '.join(affected)}{Colors.NC}")
        if cred.description:
            self._print(f"{Colors.DIM}{cred.description}{Colors.NC}")
        self._print(f"{Colors.CYAN}{'-' * 60}{Colors.NC}")
        # Show auth options
        options = self._get_auth_options(cred)
        choice = self._prompt_choice(options)
        if choice == "skip":
            return False
        elif choice == "aden":
            return self._setup_via_aden(cred)
        elif choice == "direct":
            return self._setup_direct_api_key(cred)
        return False

    def _get_auth_options(self, cred: MissingCredential) -> list[tuple[str, str, str]]:
        """Get available auth options as (key, label, description) tuples."""
        options = []
        if cred.direct_api_key_supported:
            options.append(
                (
                    "direct",
                    "Enter API key directly",
                    "Paste your API key from the provider's dashboard",
                )
            )
        if cred.aden_supported:
            options.append(
                (
                    "aden",
                    "Use Aden Platform (OAuth)",
                    "Secure OAuth2 flow via hive.adenhq.com",
                )
            )
        # "Skip" is always the last option.
        options.append(
            (
                "skip",
                "Skip for now",
                "Configure this credential later",
            )
        )
        return options

    def _prompt_choice(self, options: list[tuple[str, str, str]]) -> str:
        """Prompt user to choose from options; loops until a valid choice."""
        self._print("")
        for i, (key, label, desc) in enumerate(options, 1):
            if key == "skip":
                self._print(f"  {Colors.DIM}{i}) {label}{Colors.NC}")
            else:
                self._print(f"  {Colors.CYAN}{i}){Colors.NC} {label}")
                self._print(f"     {Colors.DIM}{desc}{Colors.NC}")
        self._print("")
        while True:
            try:
                choice_str = self._input(f"Select option (1-{len(options)}): ").strip()
                if not choice_str:
                    continue
                choice_num = int(choice_str)
                if 1 <= choice_num <= len(options):
                    return options[choice_num - 1][0]
            except ValueError:
                pass
            self._print(f"{Colors.RED}Invalid choice. Enter 1-{len(options)}{Colors.NC}")

    def _setup_direct_api_key(self, cred: MissingCredential) -> bool:
        """Guide user through direct API key setup."""
        # Show instructions
        if cred.api_key_instructions:
            self._print(f"\n{Colors.BOLD}Setup Instructions:{Colors.NC}")
            self._print(cred.api_key_instructions)
        if cred.help_url:
            self._print(f"\n{Colors.CYAN}Get your API key at:{Colors.NC} {cred.help_url}")
        # Collect key (use password input to hide the value)
        self._print("")
        try:
            api_key = self.password_fn(f"Paste your {cred.env_var}: ").strip()
        except Exception:
            # Fallback to regular input if password input fails
            api_key = self._input(f"Paste your {cred.env_var}: ").strip()
        if not api_key:
            self._print(f"{Colors.YELLOW}No value entered. Skipping.{Colors.NC}")
            return False
        # Health check
        health_result = self._run_health_check(cred, api_key)
        if health_result is not None:
            if health_result["valid"]:
                self._print(f"{Colors.GREEN}{health_result['message']}{Colors.NC}")
            else:
                self._print(f"{Colors.YELLOW}{health_result['message']}{Colors.NC}")
                confirm = self._input("Continue anyway? [y/N]: ").strip().lower()
                if confirm != "y":
                    return False
        # Store credential
        self._store_credential(cred, api_key)
        return True

    def _setup_via_aden(self, cred: MissingCredential) -> bool:
        """Guide user through Aden OAuth flow."""
        self._print(f"\n{Colors.BOLD}Aden Platform Setup{Colors.NC}")
        self._print("This will sync credentials from your Aden account.")
        self._print("")
        # Check for ADEN_API_KEY
        aden_key = os.environ.get("ADEN_API_KEY")
        if not aden_key:
            self._print("You need an Aden API key to use this method.")
            self._print(f"{Colors.CYAN}Get one at:{Colors.NC} https://hive.adenhq.com")
            self._print("")
            try:
                aden_key = self.password_fn("Paste your ADEN_API_KEY: ").strip()
            except Exception:
                aden_key = self._input("Paste your ADEN_API_KEY: ").strip()
            if not aden_key:
                self._print(f"{Colors.YELLOW}No key entered. Skipping.{Colors.NC}")
                return False
            os.environ["ADEN_API_KEY"] = aden_key
            # Save to shell config (best effort)
            try:
                from aden_tools.credentials.shell_config import add_env_var_to_shell_config

                add_env_var_to_shell_config(
                    "ADEN_API_KEY",
                    aden_key,
                    comment="Aden Platform API key",
                )
            except Exception:
                pass
        # Sync from Aden
        try:
            from framework.credentials import CredentialStore

            store = CredentialStore.with_aden_sync(
                base_url="https://api.adenhq.com",
                auto_sync=True,
            )
            # Check if the credential was synced
            cred_id = cred.credential_id or cred.credential_name
            if store.is_available(cred_id):
                self._print(f"{Colors.GREEN}{cred.credential_name} synced from Aden{Colors.NC}")
                # Export to current session
                try:
                    value = store.get_key(cred_id, cred.credential_key)
                    if value:
                        os.environ[cred.env_var] = value
                except Exception:
                    pass
                return True
            else:
                self._print(
                    f"{Colors.YELLOW}{cred.credential_name} not found in Aden account.{Colors.NC}"
                )
                self._print("Please connect this integration on https://hive.adenhq.com first.")
                return False
        except Exception as e:
            self._print(f"{Colors.RED}Failed to sync from Aden: {e}{Colors.NC}")
            return False

    def _run_health_check(self, cred: MissingCredential, value: str) -> dict[str, Any] | None:
        """Run health check on credential value.

        Returns None if no health checker is available for this credential.
        """
        try:
            from aden_tools.credentials import check_credential_health

            result = check_credential_health(cred.credential_name, value)
            return {
                "valid": result.valid,
                "message": result.message,
                "details": result.details,
            }
        except Exception:
            # No health checker available
            return None

    def _store_credential(self, cred: MissingCredential, value: str) -> None:
        """Store credential in encrypted store and export to env."""
        from pydantic import SecretStr

        from framework.credentials import CredentialKey, CredentialObject, CredentialStore

        try:
            store = CredentialStore.with_encrypted_storage()
            cred_id = cred.credential_id or cred.credential_name
            key_name = cred.credential_key or "api_key"
            cred_obj = CredentialObject(
                id=cred_id,
                name=cred.description or cred.credential_name,
                keys={key_name: CredentialKey(name=key_name, value=SecretStr(value))},
            )
            store.save_credential(cred_obj)
            self._print(f"{Colors.GREEN}✓ Stored in ~/.hive/credentials/{Colors.NC}")
        except Exception as e:
            self._print(f"{Colors.YELLOW}⚠ Could not store in credential store: {e}{Colors.NC}")
        # Export to current session even if persistent storage failed.
        os.environ[cred.env_var] = value
        self._print(f"{Colors.GREEN}✓ Exported to current session{Colors.NC}")

    def _print_summary(self, configured: list[str], skipped: list[str], errors: list[str]) -> None:
        """Print final summary."""
        self._print("")
        self._print(f"{Colors.YELLOW}{'=' * 60}{Colors.NC}")
        self._print(f"{Colors.BOLD} SETUP COMPLETE{Colors.NC}")
        self._print(f"{Colors.YELLOW}{'=' * 60}{Colors.NC}")
        if configured:
            self._print(f"\n{Colors.GREEN}✓ Configured:{Colors.NC}")
            for name in configured:
                self._print(f"  - {name}")
        if skipped:
            self._print(f"\n{Colors.YELLOW}⏭ Skipped:{Colors.NC}")
            for name in skipped:
                self._print(f"  - {name}")
        if errors:
            self._print(f"\n{Colors.RED}✗ Errors:{Colors.NC}")
            for err in errors:
                self._print(f"  - {err}")
        if not skipped and not errors:
            self._print(f"\n{Colors.GREEN}All credentials configured successfully!{Colors.NC}")
        elif skipped:
            # FIX: was split mid-sentence across two print calls with a
            # trailing space; emit the note as a single line.
            self._print(
                f"\n{Colors.YELLOW}Note: Skipped credentials must be configured "
                f"before running the agent.{Colors.NC}"
            )
        self._print("")
def detect_missing_credentials_from_nodes(nodes: list) -> list[MissingCredential]:
    """
    Detect missing credentials for a list of nodes.

    Args:
        nodes: List of NodeSpec objects

    Returns:
        List of MissingCredential objects for credentials that need setup.
        Returns an empty list when the optional aden_tools/framework
        packages are not installed (nothing can be checked).
    """
    try:
        from aden_tools.credentials import CREDENTIAL_SPECS
        from framework.credentials import CredentialStore
        from framework.credentials.storage import (
            CompositeStorage,
            EncryptedFileStorage,
            EnvVarStorage,
        )
    except ImportError:
        return []

    # Collect required tools and node types from the node specs.
    required_tools: set[str] = set()
    node_types: set[str] = set()
    for node in nodes:
        if hasattr(node, "tools") and node.tools:
            required_tools.update(node.tools)
        if hasattr(node, "node_type"):
            node_types.add(node.node_type)

    # Build credential store to check availability: env vars always,
    # encrypted file storage as primary when the unlock key is present.
    env_mapping = {
        (spec.credential_id or name): spec.env_var for name, spec in CREDENTIAL_SPECS.items()
    }
    storages: list = [EnvVarStorage(env_mapping=env_mapping)]
    if os.environ.get("HIVE_CREDENTIAL_KEY"):
        storages.insert(0, EncryptedFileStorage())
    if len(storages) == 1:
        storage = storages[0]
    else:
        storage = CompositeStorage(primary=storages[0], fallbacks=storages[1:])
    store = CredentialStore(storage=storage)

    # Build reverse mappings: tool name / node type -> credential name.
    tool_to_cred: dict[str, str] = {}
    node_type_to_cred: dict[str, str] = {}
    for cred_name, spec in CREDENTIAL_SPECS.items():
        for tool_name in spec.tools:
            tool_to_cred[tool_name] = cred_name
        for nt in spec.node_types:
            node_type_to_cred[nt] = cred_name

    missing: list[MissingCredential] = []
    checked: set[str] = set()

    def _check(cred_name: str | None, pool: set[str], use_tools: bool) -> None:
        """Append a MissingCredential for cred_name if required & unavailable.

        pool is the set of required tool names (use_tools=True) or node
        types (use_tools=False) used to compute the affected list.
        """
        if cred_name is None or cred_name in checked:
            return
        checked.add(cred_name)
        spec = CREDENTIAL_SPECS[cred_name]
        cred_id = spec.credential_id or cred_name
        if not spec.required or store.is_available(cred_id):
            return
        spec_pool = spec.tools if use_tools else spec.node_types
        affected = sorted(item for item in pool if item in spec_pool)
        missing.append(
            MissingCredential(
                credential_name=cred_name,
                env_var=spec.env_var,
                description=spec.description,
                help_url=spec.help_url,
                api_key_instructions=spec.api_key_instructions,
                tools=affected if use_tools else [],
                node_types=[] if use_tools else affected,
                aden_supported=spec.aden_supported,
                direct_api_key_supported=spec.direct_api_key_supported,
                credential_id=spec.credential_id,
                credential_key=spec.credential_key,
            )
        )

    # Check tool credentials, then node-type credentials (sorted for
    # deterministic ordering; `checked` de-duplicates across both passes).
    for tool_name in sorted(required_tools):
        _check(tool_to_cred.get(tool_name), required_tools, use_tools=True)
    for nt in sorted(node_types):
        _check(node_type_to_cred.get(nt), node_types, use_tools=False)
    return missing
def _load_nodes_from_python_agent(agent_path: Path) -> list:
"""Load nodes from a Python-based agent."""
import importlib.util
agent_py = agent_path / "agent.py"
if not agent_py.exists():
return []
try:
# Add agent path and its parent to sys.path so imports work
paths_to_add = [str(agent_path), str(agent_path.parent)]
for p in paths_to_add:
if p not in sys.path:
sys.path.insert(0, p)
spec = importlib.util.spec_from_file_location(
f"{agent_path.name}.agent",
agent_py,
submodule_search_locations=[str(agent_path)],
)
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return getattr(module, "nodes", [])
except Exception:
return []
def _load_nodes_from_json_agent(agent_json: Path) -> list:
"""Load nodes from a JSON-based agent."""
try:
with open(agent_json) as f:
data = json.load(f)
from framework.graph import NodeSpec
nodes_data = data.get("graph", {}).get("nodes", [])
nodes = []
for node_data in nodes_data:
nodes.append(
NodeSpec(
id=node_data.get("id", ""),
name=node_data.get("name", ""),
description=node_data.get("description", ""),
node_type=node_data.get("node_type", ""),
tools=node_data.get("tools", []),
input_keys=node_data.get("input_keys", []),
output_keys=node_data.get("output_keys", []),
)
)
return nodes
except Exception:
return []
def run_credential_setup_cli(agent_path: str | Path | None = None) -> int:
    """
    Standalone CLI entry point for credential setup.

    Can be called from:
    - `hive setup-credentials <agent>`
    - After CredentialError in runner CLI
    - From coding agent CLI

    Args:
        agent_path: Optional path to agent directory

    Returns:
        Exit code (0 = success, 1 = failure/skipped)
    """
    # Guard clause: without an agent there is nothing to scan.
    if not agent_path:
        print("Usage: hive setup-credentials <agent_path>")
        return 1

    session = CredentialSetupSession.from_agent_path(agent_path)
    outcome = session.run_interactive()
    return 0 if outcome.success else 1
+38 -32
View File
@@ -8,64 +8,70 @@ from __future__ import annotations
import logging
import os
from dataclasses import dataclass
logger = logging.getLogger(__name__)
def ensure_credential_key_env() -> None:
"""Load HIVE_CREDENTIAL_KEY from shell config if not already in environment.
"""Load HIVE_CREDENTIAL_KEY and ADEN_API_KEY from shell config if not in environment.
The setup-credentials skill writes the encryption key to ~/.zshrc or ~/.bashrc.
If the user hasn't sourced their config in the current shell, this reads it
directly so the runner (and any MCP subprocesses it spawns) can unlock the
encrypted credential store.
Only HIVE_CREDENTIAL_KEY is loaded this way — all other secrets (API keys, etc.)
come from the credential store itself.
The setup-credentials skill writes these to ~/.zshrc or ~/.bashrc.
If the user hasn't sourced their config in the current shell, this reads
them directly so the runner (and any MCP subprocesses it spawns) can:
- Unlock the encrypted credential store (HIVE_CREDENTIAL_KEY)
- Enable Aden OAuth sync for Google/HubSpot/etc. (ADEN_API_KEY)
"""
if os.environ.get("HIVE_CREDENTIAL_KEY"):
return
try:
from aden_tools.credentials.shell_config import check_env_var_in_shell_config
found, value = check_env_var_in_shell_config("HIVE_CREDENTIAL_KEY")
if found and value:
os.environ["HIVE_CREDENTIAL_KEY"] = value
logger.debug("Loaded HIVE_CREDENTIAL_KEY from shell config")
except ImportError:
pass
return
for var_name in ("HIVE_CREDENTIAL_KEY", "ADEN_API_KEY"):
if os.environ.get(var_name):
continue
found, value = check_env_var_in_shell_config(var_name)
if found and value:
os.environ[var_name] = value
logger.debug("Loaded %s from shell config", var_name)
def validate_agent_credentials(nodes: list) -> None:
@dataclass
class _CredentialCheck:
"""Result of checking a single credential."""
env_var: str
source: str
used_by: str
available: bool
help_url: str = ""
def validate_agent_credentials(nodes: list, quiet: bool = False) -> None:
"""Check that required credentials are available before running an agent.
Scans node specs for required tools and node types, then checks whether
the corresponding credentials exist in the credential store.
Uses CredentialStoreAdapter.default() which includes Aden sync support,
correctly resolving OAuth credentials stored under hashed IDs.
Prints a summary of all credentials and their sources (encrypted store, env var).
Raises CredentialError with actionable guidance if any are missing.
Args:
nodes: List of NodeSpec objects from the agent graph.
quiet: If True, suppress the credential summary output.
"""
required_tools: set[str] = set()
for node in nodes:
if node.tools:
required_tools.update(node.tools)
node_types: set[str] = {node.node_type for node in nodes}
# Collect required tools and node types
required_tools = {tool for node in nodes if node.tools for tool in node.tools}
node_types = {node.node_type for node in nodes}
try:
from aden_tools.credentials import CREDENTIAL_SPECS
from framework.credentials import CredentialStore
from framework.credentials.storage import (
CompositeStorage,
EncryptedFileStorage,
EnvVarStorage,
)
except ImportError:
return # aden_tools not installed, skip check
from framework.credentials.storage import CompositeStorage, EncryptedFileStorage, EnvVarStorage
from framework.credentials.store import CredentialStore
# Build credential store
env_mapping = {
(spec.credential_id or name): spec.env_var for name, spec in CREDENTIAL_SPECS.items()
+14 -5
View File
@@ -205,6 +205,7 @@ class EventLoopNode(NodeProtocol):
self._injection_queue: asyncio.Queue[str] = asyncio.Queue()
# Client-facing input blocking state
self._input_ready = asyncio.Event()
self._awaiting_input = False
self._shutdown = False
def validate_input(self, ctx: NodeContext) -> list[str]:
@@ -303,7 +304,7 @@ class EventLoopNode(NodeProtocol):
set_output_tool = self._build_set_output_tool(ctx.node_spec.output_keys)
if set_output_tool:
tools.append(set_output_tool)
if ctx.node_spec.client_facing:
if ctx.node_spec.client_facing and not ctx.event_triggered:
tools.append(self._build_ask_user_tool())
logger.info(
@@ -599,7 +600,7 @@ class EventLoopNode(NodeProtocol):
"same tool calls with identical arguments. "
"Try a different approach or different arguments."
)
if ctx.node_spec.client_facing:
if ctx.node_spec.client_facing and not ctx.event_triggered:
await conversation.add_user_message(warning_msg)
self._input_ready.clear()
if self._event_bus:
@@ -608,7 +609,11 @@ class EventLoopNode(NodeProtocol):
node_id=node_id,
prompt=doom_desc,
)
await self._input_ready.wait()
self._awaiting_input = True
try:
await self._input_ready.wait()
finally:
self._awaiting_input = False
recent_tool_fingerprints.clear()
recent_responses.clear()
else:
@@ -636,7 +641,7 @@ class EventLoopNode(NodeProtocol):
# conversation — they flow through without blocking.
_cf_block = False
_cf_auto = False
if ctx.node_spec.client_facing:
if ctx.node_spec.client_facing and not ctx.event_triggered:
if user_input_requested:
_cf_block = True
elif assistant_text and not real_tool_results and not outputs_set:
@@ -1035,7 +1040,11 @@ class EventLoopNode(NodeProtocol):
prompt="",
)
await self._input_ready.wait()
self._awaiting_input = True
try:
await self._input_ready.wait()
finally:
self._awaiting_input = False
return not self._shutdown
# -------------------------------------------------------------------
+6
View File
@@ -342,6 +342,9 @@ class GraphExecutor:
for key, value in input_data.items():
memory.write(key, value)
# Detect event-triggered execution (timer/webhook) — no interactive user.
_event_triggered = bool(input_data and isinstance(input_data.get("event"), dict))
path: list[str] = []
total_tokens = 0
total_latency = 0
@@ -686,6 +689,7 @@ class GraphExecutor:
inherited_conversation=continuous_conversation if is_continuous else None,
override_tools=cumulative_tools if is_continuous else None,
cumulative_output_keys=cumulative_output_keys if is_continuous else None,
event_triggered=_event_triggered,
)
# Log actual input data being read
@@ -1430,6 +1434,7 @@ class GraphExecutor:
inherited_conversation: Any = None,
override_tools: list | None = None,
cumulative_output_keys: list[str] | None = None,
event_triggered: bool = False,
) -> NodeContext:
"""Build execution context for a node."""
# Filter tools to those available to this node
@@ -1463,6 +1468,7 @@ class GraphExecutor:
continuous_mode=continuous_mode,
inherited_conversation=inherited_conversation,
cumulative_output_keys=cumulative_output_keys or [],
event_triggered=event_triggered,
)
VALID_NODE_TYPES = {
+3
View File
@@ -487,6 +487,9 @@ class NodeContext:
inherited_conversation: Any = None # NodeConversation | None (from prior node)
cumulative_output_keys: list[str] = field(default_factory=list) # All output keys from path
# Event-triggered execution (no interactive user attached)
event_triggered: bool = False
@dataclass
class NodeResult:
+19 -18
View File
@@ -3207,30 +3207,31 @@ def list_tests(
def _get_credential_store():
"""Get a CredentialStore that checks encrypted files and env vars.
"""Get a CredentialStore that checks encrypted files, env vars, and Aden sync.
Uses CompositeStorage: encrypted file storage (primary) with env var fallback.
This ensures credentials stored via `store_credential` AND env vars are both found.
Uses CredentialStoreAdapter.default() which handles:
- Aden sync + provider index (resolving hashed IDs for OAuth)
- CompositeStorage (encrypted primary + env fallback)
- Auto-refresh of OAuth tokens
- Graceful fallback if Aden is unavailable
"""
from framework.credentials import CredentialStore
from framework.credentials.storage import CompositeStorage, EncryptedFileStorage, EnvVarStorage
# Build env var mapping from CREDENTIAL_SPECS for the fallback
env_mapping: dict[str, str] = {}
try:
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.store_adapter import CredentialStoreAdapter
for name, spec in CREDENTIAL_SPECS.items():
cred_id = spec.credential_id or name
env_mapping[cred_id] = spec.env_var
return CredentialStoreAdapter.default().store
except ImportError:
pass
from framework.credentials import CredentialStore
from framework.credentials.storage import (
CompositeStorage,
EncryptedFileStorage,
EnvVarStorage,
)
storage = CompositeStorage(
primary=EncryptedFileStorage(),
fallbacks=[EnvVarStorage(env_mapping=env_mapping)],
)
return CredentialStore(storage=storage)
storage = CompositeStorage(
primary=EncryptedFileStorage(),
fallbacks=[EnvVarStorage(env_mapping={})],
)
return CredentialStore(storage=storage)
@mcp.tool()
+170 -3
View File
@@ -331,6 +331,20 @@ def register_commands(subparsers: argparse._SubParsersAction) -> None:
)
resume_parser.set_defaults(func=cmd_resume)
# setup-credentials command
setup_creds_parser = subparsers.add_parser(
"setup-credentials",
help="Interactive credential setup",
description="Guide through setting up required credentials for an agent.",
)
setup_creds_parser.add_argument(
"agent_path",
type=str,
nargs="?",
help="Path to agent folder (optional - runs general setup if not specified)",
)
setup_creds_parser.set_defaults(func=cmd_setup_credentials)
def _load_resume_state(
agent_path: str, session_id: str, checkpoint_id: str | None = None
@@ -388,6 +402,40 @@ def _load_resume_state(
}
def _prompt_before_start(agent_path: str, runner, model: str | None = None):
"""Prompt user to start agent or update credentials.
Returns:
Updated runner if user proceeds, None if user aborts.
"""
from framework.credentials.setup import CredentialSetupSession
from framework.runner import AgentRunner
while True:
print()
try:
choice = input("Press Enter to start agent, or 'u' to update credentials: ").strip()
except (EOFError, KeyboardInterrupt):
print()
return None
if choice == "":
return runner
elif choice.lower() == "u":
session = CredentialSetupSession.from_agent_path(agent_path)
result = session.run_interactive()
if result.success:
# Reload runner with updated credentials
try:
runner = AgentRunner.load(agent_path, model=model)
except Exception as e:
print(f"Error reloading agent: {e}")
return None
# Loop back to prompt again
elif choice.lower() == "q":
return None
def cmd_run(args: argparse.Namespace) -> int:
"""Run an exported agent."""
import logging
@@ -434,11 +482,46 @@ def cmd_run(args: argparse.Namespace) -> int:
)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
# Offer interactive credential setup if running in a terminal
if sys.stdin.isatty():
print()
try:
choice = input("Would you like to set up credentials now? [Y/n]: ")
choice = choice.strip()
except (EOFError, KeyboardInterrupt):
print()
return
if choice.lower() != "n":
from framework.credentials.setup import CredentialSetupSession
session = CredentialSetupSession.from_agent_path(args.agent_path)
result = session.run_interactive()
if result.success:
# Retry loading with credentials now configured
try:
runner = AgentRunner.load(args.agent_path, model=args.model)
except CredentialError as retry_e:
print(f"\n{retry_e}", file=sys.stderr)
return
except Exception as retry_e:
print(f"Error loading agent: {retry_e}")
return
else:
return
else:
return
else:
return
except Exception as e:
print(f"Error loading agent: {e}")
return
# Prompt before starting (allows credential updates)
if sys.stdin.isatty():
runner = _prompt_before_start(args.agent_path, runner, args.model)
if runner is None:
return
# Force setup inside the loop
if runner._agent_runtime is None:
runner._setup()
@@ -477,11 +560,45 @@ def cmd_run(args: argparse.Namespace) -> int:
)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return 1
# Offer interactive credential setup if running in a terminal
if sys.stdin.isatty():
print()
try:
choice = input("Would you like to set up credentials now? [Y/n]: ").strip()
except (EOFError, KeyboardInterrupt):
print()
return 1
if choice.lower() != "n":
from framework.credentials.setup import CredentialSetupSession
session = CredentialSetupSession.from_agent_path(args.agent_path)
result = session.run_interactive()
if result.success:
# Retry loading with credentials now configured
try:
runner = AgentRunner.load(args.agent_path, model=args.model)
except CredentialError as retry_e:
print(f"\n{retry_e}", file=sys.stderr)
return 1
except Exception as retry_e:
print(f"Error loading agent: {retry_e}")
return 1
else:
return 1
else:
return 1
else:
return 1
except FileNotFoundError as e:
print(f"Error: {e}", file=sys.stderr)
return 1
# Prompt before starting (allows credential updates)
if sys.stdin.isatty() and not args.quiet:
runner = _prompt_before_start(args.agent_path, runner, args.model)
if runner is None:
return 1
# Load session/checkpoint state for resume (headless mode)
session_state = None
resume_session = getattr(args, "resume_session", None)
@@ -1283,7 +1400,35 @@ def cmd_tui(args: argparse.Namespace) -> int:
)
except CredentialError as e:
print(f"\n{e}", file=sys.stderr)
return
# Offer interactive credential setup if running in a terminal
if sys.stdin.isatty():
print()
try:
choice = input("Would you like to set up credentials now? [Y/n]: ").strip()
except (EOFError, KeyboardInterrupt):
print()
return
if choice.lower() != "n":
from framework.credentials.setup import CredentialSetupSession
session = CredentialSetupSession.from_agent_path(agent_path)
result = session.run_interactive()
if result.success:
# Retry loading with credentials now configured
try:
runner = AgentRunner.load(agent_path, model=args.model)
except CredentialError as retry_e:
print(f"\n{retry_e}", file=sys.stderr)
return
except Exception as retry_e:
print(f"Error loading agent: {retry_e}")
return
else:
return
else:
return
else:
return
except Exception as e:
print(f"Error loading agent: {e}")
return
@@ -1719,3 +1864,25 @@ def cmd_resume(args: argparse.Namespace) -> int:
if args.tui:
print("Mode: TUI")
return 1
def cmd_setup_credentials(args: argparse.Namespace) -> int:
    """Interactive credential setup for an agent.

    Returns 0 on successful setup, 1 on failure or missing agent path.
    """
    from framework.credentials.setup import CredentialSetupSession

    agent_path = getattr(args, "agent_path", None)
    if not agent_path:
        # No agent specified - show usage
        print("Usage: hive setup-credentials <agent_path>")
        print()
        print("Examples:")
        print(" hive setup-credentials exports/my-agent")
        print(" hive setup-credentials examples/templates/deep_research_agent")
        return 1
    # Setup credentials for a specific agent
    session = CredentialSetupSession.from_agent_path(agent_path)
    result = session.run_interactive()
    return 0 if result.success else 1
+19 -69
View File
@@ -1064,83 +1064,33 @@ class AgentRunner:
warnings.append(f"Missing tool implementations: {', '.join(missing_tools)}")
# Check credentials for required tools and node types
# Uses CredentialStore (encrypted files + env var fallback)
# Uses CredentialStoreAdapter.default() which includes Aden sync support
missing_credentials = []
try:
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.store_adapter import CredentialStoreAdapter
from framework.credentials import CredentialStore
from framework.credentials.storage import (
CompositeStorage,
EncryptedFileStorage,
EnvVarStorage,
)
# Build env mapping for credential lookup
env_mapping = {
(spec.credential_id or name): spec.env_var
for name, spec in CREDENTIAL_SPECS.items()
}
# Only use EncryptedFileStorage if the encryption key is configured;
# otherwise just check env vars (avoids generating a throwaway key)
storages: list = [EnvVarStorage(env_mapping=env_mapping)]
if os.environ.get("HIVE_CREDENTIAL_KEY"):
storages.insert(0, EncryptedFileStorage())
if len(storages) == 1:
storage = storages[0]
else:
storage = CompositeStorage(
primary=storages[0],
fallbacks=storages[1:],
)
store = CredentialStore(storage=storage)
# Build reverse mappings
tool_to_cred: dict[str, str] = {}
node_type_to_cred: dict[str, str] = {}
for cred_name, spec in CREDENTIAL_SPECS.items():
for tool_name in spec.tools:
tool_to_cred[tool_name] = cred_name
for nt in spec.node_types:
node_type_to_cred[nt] = cred_name
adapter = CredentialStoreAdapter.default()
# Check tool credentials
checked: set[str] = set()
for tool_name in info.required_tools:
cred_name = tool_to_cred.get(tool_name)
if cred_name is None or cred_name in checked:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
missing_credentials.append(spec.env_var)
affected_tools = [t for t in info.required_tools if t in spec.tools]
tools_str = ", ".join(affected_tools)
warning_msg = f"Missing {spec.env_var} for {tools_str}"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
for _cred_name, spec in adapter.get_missing_for_tools(list(info.required_tools)):
missing_credentials.append(spec.env_var)
affected_tools = [t for t in info.required_tools if t in spec.tools]
tools_str = ", ".join(affected_tools)
warning_msg = f"Missing {spec.env_var} for {tools_str}"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
# Check node type credentials (e.g., ANTHROPIC_API_KEY for LLM nodes)
node_types = list({node.node_type for node in self.graph.nodes})
for nt in node_types:
cred_name = node_type_to_cred.get(nt)
if cred_name is None or cred_name in checked:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
missing_credentials.append(spec.env_var)
affected_types = [t for t in node_types if t in spec.node_types]
types_str = ", ".join(affected_types)
warning_msg = f"Missing {spec.env_var} for {types_str} nodes"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
for _cred_name, spec in adapter.get_missing_for_node_types(node_types):
missing_credentials.append(spec.env_var)
affected_types = [t for t in node_types if t in spec.node_types]
types_str = ", ".join(affected_types)
warning_msg = f"Missing {spec.env_var} for {types_str} nodes"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
except ImportError:
# aden_tools not installed - fall back to direct check
has_llm_nodes = any(node.node_type == "event_loop" for node in self.graph.nodes)
+36 -13
View File
@@ -341,19 +341,25 @@ class AgentRuntime:
while self._running:
self._timer_next_fire.pop(entry_point_id, None)
try:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{"event": {"source": "timer", "reason": "scheduled"}},
session_state=session_state,
)
logger.info(
"Timer fired for entry point '%s' (next in %s min)",
entry_point_id,
mins,
)
if self._should_skip_timer(entry_point_id):
logger.info(
"Timer '%s' skipped — primary stream busy",
entry_point_id,
)
else:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{"event": {"source": "timer", "reason": "scheduled"}},
session_state=session_state,
)
logger.info(
"Timer fired for entry point '%s' (next in %s min)",
entry_point_id,
mins,
)
except Exception:
logger.error(
"Timer trigger failed for '%s'",
@@ -469,6 +475,23 @@ class AgentRuntime:
raise ValueError(f"Entry point '{entry_point_id}' not found")
return await stream.wait_for_completion(exec_id, timeout)
def _should_skip_timer(self, timer_ep_id: str) -> bool:
"""Return True if a non-timer stream is actively running (not waiting for input).
Timers should only fire when the primary stream is idle (blocked
waiting for client input) or has no active execution. This prevents
concurrent pipeline runs that would race on shared memory.
"""
for ep_id, stream in self._streams.items():
if ep_id == timer_ep_id:
continue
spec = self._entry_points.get(ep_id)
if spec and spec.trigger_type == "timer":
continue
if stream.active_execution_ids and not stream.is_awaiting_input:
return True
return False
def _get_primary_session_state(self, exclude_entry_point: str) -> dict[str, Any] | None:
"""Build session_state so an async entry point runs in the primary session.
@@ -201,6 +201,17 @@ class ExecutionStream:
"""Return IDs of all currently active executions."""
return list(self._active_executions.keys())
@property
def is_awaiting_input(self) -> bool:
    """True when an active execution is blocked waiting for client input."""
    if not self._active_executors:
        return False
    # A node flags itself with _awaiting_input while blocked on the client;
    # nodes without the attribute are treated as not waiting.
    return any(
        getattr(node, "_awaiting_input", False)
        for executor in self._active_executors.values()
        for node in executor.node_registry.values()
    )
def _record_execution_result(self, execution_id: str, result: ExecutionResult) -> None:
"""Record a completed execution result with retention pruning."""
self._execution_results[execution_id] = result
+19
View File
@@ -198,6 +198,7 @@ class AdenTUI(App):
Binding("ctrl+l", "toggle_logs", "Toggle Logs", show=True, priority=True),
Binding("ctrl+z", "pause_execution", "Pause", show=True, priority=True),
Binding("ctrl+r", "show_sessions", "Sessions", show=True, priority=True),
Binding("ctrl+p", "attach_pdf", "Attach PDF", show=True, priority=True),
Binding("tab", "focus_next", "Next Panel", show=True),
Binding("shift+tab", "focus_previous", "Previous Panel", show=False),
]
@@ -666,6 +667,24 @@ class AdenTUI(App):
timeout=3,
)
async def action_attach_pdf(self) -> None:
    """Open native OS file dialog for PDF selection (bound to Ctrl+P)."""
    from framework.tui.widgets.file_browser import _has_gui, pick_pdf_file

    # Headless session (SSH, no display): the native dialog cannot open.
    if not _has_gui():
        self.notify(
            "No GUI available. Use /attach <path> instead.",
            severity="warning",
            timeout=5,
        )
        return
    self.notify("Opening file dialog...", severity="information", timeout=2)
    selected = await pick_pdf_file()
    if selected is None:
        # User cancelled or dialog failed; nothing to stage.
        return
    self.chat_repl.attach_pdf(selected)
async def on_unmount(self) -> None:
"""Cleanup on app shutdown - cancel execution which will save state."""
self.is_ready = False
+83 -5
View File
@@ -17,6 +17,7 @@ Client-facing input:
import asyncio
import logging
import re
import shutil
import threading
from pathlib import Path
from typing import Any
@@ -119,6 +120,7 @@ class ChatRepl(Vertical):
self._session_index: list[str] = [] # IDs from last listing
self._show_logs: bool = False # Clean mode by default
self._log_buffer: list[str] = [] # Buffered log lines for backfill on toggle ON
self._attached_pdf: dict | None = None # Pending PDF attachment for next message
# Dedicated event loop for agent execution.
# Keeps blocking runtime code (LLM calls, MCP tools) off
@@ -196,6 +198,9 @@ class ChatRepl(Vertical):
if cmd == "/help":
self._write_history("""[bold cyan]Available Commands:[/bold cyan]
[bold]/attach[/bold] - Open file dialog to attach a PDF
[bold]/attach[/bold] <file_path> - Attach a PDF from a specific path
[bold]/detach[/bold] - Remove the currently attached PDF
[bold]/sessions[/bold] - List all sessions for this agent
[bold]/sessions[/bold] <session_id> - Show session details and checkpoints
[bold]/resume[/bold] - List sessions and pick one to resume
@@ -206,12 +211,11 @@ class ChatRepl(Vertical):
[bold]/help[/bold] - Show this help message
[dim]Examples:[/dim]
/attach [dim]# Open file picker dialog[/dim]
/attach ~/Documents/report.pdf [dim]# Attach a specific PDF[/dim]
/detach [dim]# Remove attached PDF[/dim]
/sessions [dim]# List all sessions[/dim]
/sessions session_20260208_143022 [dim]# Show session details[/dim]
/resume [dim]# Show numbered session list[/dim]
/resume 1 [dim]# Resume first listed session[/dim]
/resume session_20260208_143022 [dim]# Resume by full session ID[/dim]
/recover session_20260208_143022 cp_xxx [dim]# Recover from specific checkpoint[/dim]
/pause [dim]# Pause (or Ctrl+Z)[/dim]
""")
elif cmd == "/sessions":
@@ -252,6 +256,16 @@ class ChatRepl(Vertical):
session_id = parts[1].strip()
checkpoint_id = parts[2].strip()
await self._cmd_recover(session_id, checkpoint_id)
elif cmd == "/attach":
file_path = parts[1].strip() if len(parts) > 1 else None
await self._cmd_attach(file_path)
elif cmd == "/detach":
if self._attached_pdf:
name = self._attached_pdf["filename"]
self._attached_pdf = None
self._write_history(f"[dim]Detached: {name}[/dim]")
else:
self._write_history("[dim]No PDF attached.[/dim]")
elif cmd == "/pause":
await self._cmd_pause()
else:
@@ -260,6 +274,63 @@ class ChatRepl(Vertical):
"Type [bold]/help[/bold] for available commands"
)
def attach_pdf(self, path: Path) -> None:
    """Validate and stage a PDF file for the next message.

    Copies the PDF to ~/.hive/assets/ and stores the path. The agent's
    pdf_read tool handles text extraction at runtime.

    Called by /attach <path> or by the native file dialog.
    """
    source = Path(path).expanduser().resolve()
    if not source.exists():
        self._write_history(f"[bold red]Error:[/bold red] File not found: {source}")
        return
    if source.suffix.lower() != ".pdf":
        self._write_history("[bold red]Error:[/bold red] Only PDF files are supported")
        return
    # Copy to ~/.hive/assets/, deduplicating like a normal filesystem:
    # resume.pdf → resume(1).pdf → resume(2).pdf
    assets_dir = Path.home() / ".hive" / "assets"
    assets_dir.mkdir(parents=True, exist_ok=True)
    target = assets_dir / source.name
    copy_index = 0
    while target.exists():
        copy_index += 1
        target = assets_dir / f"{source.stem}({copy_index}){source.suffix}"
    shutil.copy2(source, target)
    self._attached_pdf = {
        "filename": source.name,
        "path": str(target),
    }
    self._write_history(f"[green]Attached:[/green] {source.name}")
    self._write_history("[dim]PDF will be read by the agent on your next message.[/dim]")
async def _cmd_attach(self, file_path: str | None = None) -> None:
    """Attach a PDF file for context injection into the next message."""
    # Explicit path given: skip the dialog entirely.
    if file_path is not None:
        self.attach_pdf(Path(file_path))
        return
    from framework.tui.widgets.file_browser import _has_gui, pick_pdf_file

    if not _has_gui():
        self._write_history(
            "[bold yellow]No GUI available.[/bold yellow] "
            "Provide a path: [bold]/attach /path/to/file.pdf[/bold]"
        )
        return
    self._write_history("[dim]Opening file dialog...[/dim]")
    selected = await pick_pdf_file()
    if selected is not None:
        self.attach_pdf(selected)
async def _cmd_sessions(self, session_id: str | None) -> None:
"""List sessions or show details of a specific session."""
try:
@@ -895,6 +966,13 @@ class ChatRepl(Vertical):
chat_input = self.query_one("#chat-input", ChatTextArea)
chat_input.placeholder = "Commands available: /pause, /sessions, /help"
# Build input data, injecting attached PDF file path if present
input_data = {input_key: user_input}
if self._attached_pdf:
input_data["pdf_file_path"] = self._attached_pdf["path"]
self._write_history(f"[dim]Including PDF: {self._attached_pdf['filename']}[/dim]")
self._attached_pdf = None
# Submit execution to the dedicated agent loop so blocking
# runtime code (LLM, MCP tools) never touches Textual's loop.
# trigger() returns immediately with an exec_id; the heavy
@@ -902,7 +980,7 @@ class ChatRepl(Vertical):
future = asyncio.run_coroutine_threadsafe(
self.runtime.trigger(
entry_point_id=entry_point.id,
input_data={input_key: user_input},
input_data=input_data,
),
self._agent_loop,
)
+135
View File
@@ -0,0 +1,135 @@
"""
Native OS file dialog for PDF selection.
Launches the platform's native file picker (macOS: NSOpenPanel via osascript,
Linux: zenity/kdialog, Windows: PowerShell OpenFileDialog) in a background
thread so Textual's event loop stays responsive.
Falls back to None when no GUI is available (SSH, headless).
"""
import asyncio
import os
import subprocess
import sys
from pathlib import Path
def _has_gui() -> bool:
"""Detect whether a GUI display is available."""
if sys.platform == "darwin":
# macOS: GUI is available unless running over SSH without display forwarding.
return "SSH_CONNECTION" not in os.environ or "DISPLAY" in os.environ
elif sys.platform == "win32":
return True
else:
# Linux/BSD: Need X11 or Wayland.
return bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"))
def _linux_file_dialog() -> subprocess.CompletedProcess | None:
    """Try zenity, then kdialog, on Linux. Returns CompletedProcess or None."""
    # Candidate dialog commands, in preference order:
    # zenity (GTK) first, then kdialog (KDE).
    candidates = [
        [
            "zenity",
            "--file-selection",
            "--title=Select a PDF file",
            "--file-filter=PDF files (*.pdf)|*.pdf",
        ],
        [
            "kdialog",
            "--getopenfilename",
            ".",
            "PDF files (*.pdf)",
        ],
    ]
    for argv in candidates:
        try:
            return subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=300,
            )
        except FileNotFoundError:
            # Tool not installed; fall through to the next candidate.
            continue
    return None
def _pick_pdf_subprocess() -> Path | None:
    """Run the native file dialog. BLOCKS until user picks or cancels.

    Returns a Path on success, None on cancel or error.
    Must be called from a non-main thread (via asyncio.to_thread).
    """
    try:
        if sys.platform == "darwin":
            applescript = (
                'POSIX path of (choose file of type {"com.adobe.pdf"} '
                'with prompt "Select a PDF file")'
            )
            result = subprocess.run(
                ["osascript", "-e", applescript],
                capture_output=True,
                text=True,
                timeout=300,
            )
        elif sys.platform == "win32":
            ps_script = (
                "Add-Type -AssemblyName System.Windows.Forms; "
                "$f = New-Object System.Windows.Forms.OpenFileDialog; "
                "$f.Filter = 'PDF files (*.pdf)|*.pdf'; "
                "$f.Title = 'Select a PDF file'; "
                "if ($f.ShowDialog() -eq 'OK') { $f.FileName }"
            )
            result = subprocess.run(
                ["powershell", "-NoProfile", "-Command", ps_script],
                capture_output=True,
                text=True,
                timeout=300,
            )
        else:
            result = _linux_file_dialog()
            if result is None:
                return None
        if result.returncode != 0:
            # Dialog cancelled or errored.
            return None
        selection = result.stdout.strip()
        if not selection:
            return None
        candidate = Path(selection)
        # Only accept an existing regular file with a .pdf extension.
        if candidate.is_file() and candidate.suffix.lower() == ".pdf":
            return candidate
        return None
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return None
async def pick_pdf_file() -> Path | None:
    """Open a native OS file dialog to pick a PDF file.

    Non-blocking: runs the dialog subprocess in a background thread via
    asyncio.to_thread(), so the calling event loop stays responsive.

    Returns:
        Path to the selected PDF, or None if the user cancelled,
        no GUI is available, or the dialog command was not found.
    """
    if _has_gui():
        return await asyncio.to_thread(_pick_pdf_subprocess)
    # Headless environment: no dialog can be shown.
    return None
@@ -140,43 +140,99 @@ report_node = NodeSpec(
system_prompt="""\
Write a research report as an HTML file and present it to the user.
IMPORTANT: save_data requires TWO separate arguments: filename and data.
Call it like: save_data(filename="report.html", data="<html>...</html>")
**CRITICAL: You MUST build the file in multiple append_data calls. NEVER try to write the \
entire HTML in a single save_data call it will exceed the output token limit and fail.**
IMPORTANT: save_data and append_data require TWO separate arguments: filename and data.
Call like: save_data(filename="report.html", data="<html>...")
Do NOT use _raw, do NOT nest arguments inside a JSON string.
Do NOT include data_dir in tool calls it is auto-injected.
**STEP 1 Write and save the HTML report (tool calls, NO text to user yet):**
**PROCESS (follow exactly):**
Build a clean HTML document. Keep the HTML concise aim for clarity over length.
Use minimal embedded CSS (a few lines of style, not a full framework).
**Step 1 Write HTML head + executive summary (save_data):**
Call save_data to create the file with the HTML head, CSS, title, and executive summary.
```
save_data(filename="report.html", data="<!DOCTYPE html>\\n<html>...")
```
Report structure:
- Title & date
- Executive Summary (2-3 paragraphs)
- Key Findings (organized by theme, with [n] citation links)
- Analysis (synthesis, implications)
- Conclusion (key takeaways)
- References (numbered list with clickable URLs)
Include: DOCTYPE, head with ALL styles below, opening body, h1 title, date, and the \
executive summary (2-3 paragraphs). End after the executive summary section.
Requirements:
- Every factual claim must cite its source with [n] notation
- Be objective present multiple viewpoints where sources disagree
- Answer the original research questions from the brief
**CSS to use (copy exactly):**
```
body{font-family:Georgia,'Times New Roman',serif;max-width:800px;margin:0 auto;\
padding:40px;line-height:1.8;color:#333}
h1{font-size:1.8em;color:#1a1a1a;border-bottom:2px solid #333;padding-bottom:10px}
h2{font-size:1.4em;color:#1a1a1a;margin-top:40px;padding-top:20px;\
border-top:1px solid #ddd}
h3{font-size:1.1em;color:#444;margin-top:25px}
p{margin:12px 0}
.date{color:#666;font-size:0.95em;margin-bottom:30px}
.executive-summary{background:#f8f9fa;padding:25px;border-radius:8px;\
margin:25px 0;border-left:4px solid #333}
.finding-section{margin:20px 0}
.citation{color:#1a73e8;text-decoration:none;font-size:0.85em}
.citation:hover{text-decoration:underline}
.analysis{background:#fff;padding:20px 0}
.references{margin-top:40px;padding-top:20px;border-top:2px solid #333}
.references ol{padding-left:20px}
.references li{margin:8px 0;font-size:0.95em}
.references a{color:#1a73e8;text-decoration:none}
.references a:hover{text-decoration:underline}
.footer{text-align:center;color:#999;border-top:1px solid #ddd;\
padding-top:20px;margin-top:50px;font-size:0.85em;font-family:sans-serif}
```
Save the HTML:
save_data(filename="report.html", data="<html>...</html>")
**Step 2 Append key findings (append_data):**
```
append_data(filename="report.html", data="<h2>Key Findings</h2>...")
```
Then get the clickable link:
serve_file_to_user(filename="report.html", label="Research Report")
Organize findings by theme. Use [n] citation notation for every factual claim. \
Pattern per theme:
```
<div class="finding-section">
<h3>{Theme Name}</h3>
<p>{Finding text with <a class="citation" href="#ref-n">[n]</a> citations}</p>
</div>
```
If save_data fails, simplify and shorten the HTML, then retry.
**Step 3 Append analysis + conclusion (append_data):**
```
append_data(filename="report.html", data="<h2>Analysis</h2>...")
```
**STEP 2 Present the link to the user (text only, NO tool calls):**
Include: synthesis of findings, implications, and a Conclusion section with key \
takeaways. Be objective present multiple viewpoints where sources disagree.
Tell the user the report is ready and include the file:// URI from
serve_file_to_user so they can click it to open. Give a brief summary
of what the report covers. Ask if they have questions or want to continue.
**Step 4 Append references + footer (append_data):**
```
append_data(filename="report.html", data="<div class='references'>...")
```
**STEP 3 After the user responds:**
Include: numbered reference list with clickable URLs, then footer, then \
`</body></html>`. Pattern:
```
<div class="references">
<h2>References</h2>
<ol>
<li id="ref-1"><a href="{url}" target="_blank">{title}</a> {source}</li>
</ol>
</div>
```
**Step 5 Serve the file:**
```
serve_file_to_user(filename="report.html", label="Research Report", open_in_browser=true)
```
**Step 6 Present to user (text only, NO tool calls):**
**CRITICAL: Print the file_path from the serve_file_to_user result in your response** \
so the user can click it to reopen the report later. Give a brief summary of what the \
report covers. Ask if they have questions.
**Step 7 After the user responds:**
- Answer any follow-up questions from the research material
- When the user is ready to move on, ask what they'd like to do next:
- Research a new topic?
@@ -185,15 +241,13 @@ of what the report covers. Ask if they have questions or want to continue.
- set_output("delivery_status", "completed")
- set_output("next_action", "new_topic") if they want a new topic
- set_output("next_action", "more_research") if they want deeper research
**IMPORTANT:**
- Every factual claim MUST cite its source with [n] notation
- Answer the original research questions from the brief
- If an append_data call fails with a truncation error, break it into smaller chunks
""",
tools=[
"save_data",
"append_data",
"edit_data",
"serve_file_to_user",
"load_data",
"list_data_files",
],
tools=["save_data", "append_data", "serve_file_to_user"],
)
__all__ = [
@@ -28,7 +28,8 @@ goal = Goal(
"Manage Gmail inbox emails autonomously using user-defined free-text rules. "
"For every five minutes, fetch inbox emails (configurable batch size, default 100), "
"apply the user's rules to each email, and execute the appropriate Gmail actions — trash, "
"mark as spam, mark important, mark read/unread, star, and more."
"mark as spam, mark important, mark read/unread, star, draft replies, "
"create/apply custom labels, and more."
),
success_criteria=[
SuccessCriterion(
@@ -39,7 +40,7 @@ goal = Goal(
),
metric="action_correctness",
target=">=95%",
weight=0.35,
weight=0.30,
),
SuccessCriterion(
id="action-report",
@@ -49,7 +50,7 @@ goal = Goal(
),
metric="report_completeness",
target="100%",
weight=0.3,
weight=0.25,
),
SuccessCriterion(
id="batch-completeness",
@@ -59,7 +60,14 @@ goal = Goal(
),
metric="emails_processed_ratio",
target="100%",
weight=0.35,
weight=0.30,
),
SuccessCriterion(
id="label-management",
description="Custom labels are created and applied correctly when rules require them",
metric="label_coverage",
target="100%",
weight=0.15,
),
],
constraints=[
@@ -78,6 +86,12 @@ goal = Goal(
constraint_type="hard",
category="safety",
),
Constraint(
id="draft-not-send",
description="Agent creates draft replies but NEVER sends them automatically",
constraint_type="hard",
category="safety",
),
],
)
@@ -139,14 +153,16 @@ pause_nodes = []
terminal_nodes = []
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 50,
"max_tool_calls_per_turn": 30,
"max_tool_result_chars": 8000,
"max_history_tokens": 32000,
}
conversation_mode = "continuous"
identity_prompt = (
"You are an email inbox management assistant. You help users manage "
"their Gmail inbox by applying free-text rules to emails — trash, "
"mark as spam, mark important, mark read/unread, star, and more."
"mark as spam, mark important, mark read/unread, star, draft replies, "
"create/apply custom labels, and more."
)
@@ -14,13 +14,15 @@ class AgentMetadata:
description: str = (
"Automatically manage Gmail inbox emails using free-text rules. "
"Trash junk, mark spam, mark important, mark read/unread, star, "
"and more — using only native Gmail actions."
"draft replies, create/apply custom labels, and more — using only "
"native Gmail actions."
)
intro_message: str = (
"Hi! I'm your email inbox management assistant. Tell me your rules "
"(what to trash, mark as spam, mark important, etc.) and I'll run an "
"initial triage of your inbox. After that, I'll automatically check "
"and process new emails every 5 minutes — so you can set it and forget it. "
"(what to trash, mark as spam, mark important, draft replies to, "
"label with custom labels, etc.) and I'll run an initial triage of "
"your inbox. After that, I'll automatically check and process new "
"emails every 5 minutes — so you can set it and forget it. "
"What rules would you like me to apply?"
)
@@ -30,6 +30,8 @@ The following Gmail actions are available — map the user's rules to whichever
- **Mark as read** / mark as unread
- **Star** / unstar emails
- **Add/remove Gmail labels** (INBOX, UNREAD, IMPORTANT, STARRED, SPAM, CATEGORY_PERSONAL, CATEGORY_SOCIAL, CATEGORY_PROMOTIONS, CATEGORY_UPDATES, CATEGORY_FORUMS)
- **Draft replies** create draft reply emails (never sent automatically)
- **Create/apply custom labels** create new Gmail labels and apply them to emails
Present the rules back to the user in plain language. Do NOT refuse rules if the user asks for any of the above actions, confirm you will do it.
@@ -37,12 +39,16 @@ Also confirm the batch size (max_emails). If max_emails is not provided, default
Ask the user to confirm: "Does this look right? I'll proceed once you confirm."
**STEP 2 After the user confirms, call set_output:**
**STEP 2 Show existing labels (tool call):**
Call gmail_list_labels() to show the user their current Gmail labels. This helps them reference existing labels or decide whether new custom labels are needed for their rules.
**STEP 3 After the user confirms, call set_output:**
- set_output("rules", <the confirmed rules as a clear text description>)
- set_output("max_emails", <the confirmed max_emails as a string number, e.g. "100">)
""",
tools=[],
tools=["gmail_list_labels"],
)
# Node 2: Fetch Emails (event_loop — fetches emails with pagination support)
@@ -117,27 +123,36 @@ You are an inbox management assistant. Apply the user's rules to their emails an
- gmail_batch_modify_messages(message_ids, add_labels, remove_labels) Modify Gmail labels in batch. ALWAYS prefer this.
- gmail_modify_message(message_id, add_labels, remove_labels) Modify a single message's labels.
- gmail_trash_message(message_id) Move a message to trash. No batch version; call per email.
- gmail_create_draft(to, subject, body) Create a draft reply. NEVER sends automatically.
- gmail_create_label(name) Create a new Gmail label. Returns the label ID.
- gmail_list_labels() List all existing Gmail labels with their IDs.
- set_output(key, value) Set an output value. Call ONLY after all actions are executed.
**CONTEXT:**
- "rules" = the user's rule to apply (e.g. "mark all as unread")
- "emails" = a filename (e.g. "emails.jsonl") containing the fetched emails as JSONL. Each line has: id, subject, from, to, date, snippet, labels.
**STEP 1 LOAD EMAILS (your first tool call MUST be load_data):**
Call load_data(filename=<the "emails" value from context>) to read the email data.
- If the result is empty, call set_output("actions_taken", "no emails to process") and stop.
- If has_more=true, load more pages with load_data(filename=..., offset=...) until all emails are loaded.
**PROCESS EMAILS ONE CHUNK AT A TIME (you will get multiple turns):**
**STEP 2 DETERMINE STRATEGY:**
- **Blanket rule** (same action for ALL emails, e.g. "mark all as unread"): Collect all message IDs, then execute ONE gmail_batch_modify_messages call.
- **Classification rule** (different actions for different emails): Classify each email, group by action, execute batch operations per group.
Each turn, process exactly ONE chunk: load classify act record. Then STOP and wait for your next turn to load the next chunk.
**STEP 3 EXECUTE ACTIONS:**
Call the appropriate Gmail tool(s) with the real message IDs from the loaded emails. Then record each action:
- append_data(filename="actions.jsonl", data=<JSON of {email_id, subject, from, action}>)
1. Call load_data(filename=<emails value>, limit_bytes=7500).
- Parse the visible JSONL lines: split by \n, JSON.parse each complete line.
- Ignore the last line if it appears cut off (incomplete JSON).
- Note the next_offset_bytes value from the result.
**STEP 4 FINISH:**
After ALL actions are executed, call set_output("actions_taken", "actions.jsonl").
2. Classify the emails in THIS chunk against the rules. For each email, decide the action: trash, draft reply, label change, or no action.
3. Execute Gmail actions for this chunk immediately:
- **Label changes:** gmail_batch_modify_messages for all IDs in this chunk that need the same label change.
- **Trash:** gmail_trash_message per email.
- **Drafts:** gmail_create_draft per email.
- Record each action: append_data(filename="actions.jsonl", data=<JSON of {email_id, subject, from, action}>)
4. If has_more=true, STOP HERE. On your next turn, call load_data with offset_bytes=<next_offset_bytes> and repeat from step 2.
If has_more=false, you are done processing call set_output("actions_taken", "actions.jsonl").
**CRITICAL:** Only call load_data ONCE per turn. Do NOT pre-load multiple chunks. You must see the emails before you can act on them.
**GMAIL LABEL REFERENCE:**
- MARK AS UNREAD add_labels=["UNREAD"]
@@ -149,17 +164,24 @@ After ALL actions are executed, call set_output("actions_taken", "actions.jsonl"
- ARCHIVE remove_labels=["INBOX"]
- MARK AS SPAM add_labels=["SPAM"], remove_labels=["INBOX"]
- TRASH use gmail_trash_message(message_id) per email
- DRAFT REPLY use gmail_create_draft(to=<sender>, subject="Re: <subject>", body=<contextual reply based on email content>). Creates a draft only, never sends.
- CREATE CUSTOM LABEL use gmail_create_label(name=<label_name>) to create, then apply via gmail_modify_message with add_labels=[<label_id>]
- APPLY CUSTOM LABEL add_labels=[<label_id>] using the ID from gmail_create_label or gmail_list_labels
**CRITICAL RULES:**
- Your FIRST tool call MUST be load_data. Do NOT skip this.
- You MUST call Gmail tools to execute real actions. Do NOT just report what should be done.
- Do NOT call set_output until all Gmail actions are executed.
- Pass ONLY the filename "actions.jsonl" to set_output, NOT raw data.
- NEVER send emails. Only create drafts via gmail_create_draft.
""",
tools=[
"gmail_trash_message",
"gmail_modify_message",
"gmail_batch_modify_messages",
"gmail_create_draft",
"gmail_create_label",
"gmail_list_labels",
"load_data",
"append_data",
],
+7 -3
View File
@@ -87,10 +87,12 @@
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a career coach creating personalized application materials.\n\n**INPUT:** You have the user's resume and their selected jobs.\n\n**For EACH selected job, generate:**\n\n1. **Resume Customization List** \u2014 Specific, actionable changes:\n - Which bullet points to move to the top\n - Keywords from the job posting to incorporate\n - Which experiences to emphasize vs. de-emphasize\n - Suggested rewording for specific bullet points\n - Skills to highlight in the summary\n IMPORTANT: Only suggest truthful changes \u2014 enhance presentation, never fabricate.\n\n2. **Cold Outreach Email** \u2014 Professional email to HR/hiring manager:\n - Subject line\n - Personalized opening referencing the company/role\n - 2-3 sentences on why you're a strong fit (specific to this role)\n - Clear call to action\n - Professional sign-off\n Keep it concise (under 150 words) and not spammy.\n\n**PRESENT each job's materials to the user as you complete them.**\n\n**OUTPUT FORMAT: HTML**\nGenerate a polished HTML report with:\n- Clean, professional styling (use inline CSS)\n- Clear sections for each job with headers\n- Resume customizations as bulleted lists\n- Cold emails in styled blockquotes or cards\n- A table of contents at the top linking to each job section\n\nUse this structure:\n```html\n<!DOCTYPE html>\n<html>\n<head>\n <title>Job Application Materials</title>\n <style>\n body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; max-width: 900px; margin: 0 auto; padding: 40px; }\n h1 { color: #1a1a1a; border-bottom: 2px solid #0066cc; padding-bottom: 10px; }\n h2 { color: #0066cc; margin-top: 40px; }\n .email-card { background: #f8f9fa; border-left: 4px solid #0066cc; padding: 20px; margin: 20px 0; }\n .customization-list { background: #fff; border: 1px solid #e0e0e0; padding: 20px; border-radius: 8px; }\n ul { line-height: 1.8; }\n </style>\n</head>\n<body>...</body>\n</html>\n```\n\n**After generating all materials:**\n1. 
Use save_data to save the HTML to 'application_materials.html'\n2. Use serve_file_to_user to open it in the browser (set open_in_browser=true)\n3. **CRITICAL: Print the file_path from the serve_file_to_user result in your response** so the user can click it later:\n \"Your materials are ready! File saved to: [file_path]\"\n4. Call set_output(\"application_materials\", \"Created application_materials.html for N jobs\")",
"system_prompt": "You are a career coach creating personalized application materials.\n\n**INPUT:** You have the user's resume and their selected jobs.\n\n**OUTPUT FORMAT: Single HTML Report \u2014 Built Incrementally**\nBuild ONE polished HTML report, but write it in CHUNKS using append_data to avoid token limits.\n\n**CRITICAL: You MUST build the file in multiple append_data calls. NEVER try to write the entire HTML in a single save_data call \u2014 it will exceed the output token limit and fail.**\n\n**PROCESS (follow exactly):**\n\n**Step 1 \u2014 Write HTML header + table of contents:**\nCall save_data to create the file with the HTML head, styles, and TOC:\nInclude: DOCTYPE, head with styles, opening body tag, h1, and the table of contents linking to each selected job. End with the TOC closing div.\n\nCSS to use:\n body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; max-width: 900px; margin: 0 auto; padding: 40px; line-height: 1.6; }\n h1 { color: #1a1a1a; border-bottom: 2px solid #0066cc; padding-bottom: 10px; }\n h2 { color: #0066cc; margin-top: 40px; padding-top: 20px; border-top: 1px solid #e0e0e0; }\n h3 { color: #333; margin-top: 20px; }\n .job-section { margin-bottom: 60px; }\n .email-card { background: #f8f9fa; border-left: 4px solid #0066cc; padding: 20px; margin: 20px 0; white-space: pre-wrap; }\n .customization-list { background: #fff; border: 1px solid #e0e0e0; padding: 20px; border-radius: 8px; }\n ul { line-height: 1.8; }\n .toc { background: #f0f4f8; padding: 20px; border-radius: 8px; margin-bottom: 40px; }\n .toc a { color: #0066cc; text-decoration: none; }\n .toc a:hover { text-decoration: underline; }\n .job-url { color: #666; font-size: 0.9em; }\n\n**Step 2 \u2014 Append each job section ONE AT A TIME:**\nFor EACH selected job, call append_data with that job's section.\nEach section should contain:\n- Job title + company as h2\n- Job URL link\n- Resume Customization List (Priority Changes, Keywords, Experiences to 
Emphasize, Suggested Rewrites)\n- Cold Outreach Email in an email-card div (subject line + body, under 150 words)\n\n**Step 3 \u2014 Append HTML footer:**\nappend_data(filename=\"application_materials.html\", data=\"</body>\\n</html>\")\n\n**Step 4 \u2014 Serve the file:**\nCall serve_file_to_user(filename=\"application_materials.html\", open_in_browser=true)\nPrint the file_path from the result so the user can click it later.\n\n**Step 5 \u2014 Create Gmail Drafts (in batches of 5):**\nIMPORTANT: Do NOT create all drafts in one turn. Create at most 5 gmail_create_draft calls per turn to stay within tool call limits. If there are more than 5 jobs, create the first 5 drafts, then create the remaining drafts in the next turn.\nFor each selected job, call gmail_create_draft. If it errors, skip ALL remaining drafts and tell the user.\n\n**Step 6 \u2014 Finish:**\nCall set_output(\"application_materials\", \"Created application_materials.html with materials for {N} jobs\")\n\n**IMPORTANT:**\n- Only suggest truthful resume changes \u2014 enhance presentation, never fabricate\n- Cold emails must be professional, personalized, and under 150 words\n- ALWAYS print the full file path so users can easily access the file later\n- If a save_data or append_data call fails with a truncation error, you are writing too much in one call. Break it into smaller chunks.",
"tools": [
"save_data",
"serve_file_to_user"
"append_data",
"serve_file_to_user",
"gmail_create_draft"
],
"model": null,
"function": null,
@@ -256,8 +258,10 @@
},
"required_tools": [
"save_data",
"append_data",
"serve_file_to_user",
"web_scrape"
"web_scrape",
"gmail_create_draft"
],
"metadata": {
"created_at": "2026-02-13T18:41:10.324531",
+6 -5
View File
@@ -14,13 +14,14 @@ class AgentMetadata:
description: str = (
"Analyze your resume to identify your strongest role fits, find matching "
"job opportunities, and generate customized application materials including "
"resume customization lists and cold outreach emails."
"resume customization lists, cold outreach emails, and Gmail drafts."
)
intro_message: str = (
"Hi! I'm your job hunting assistant. Paste your resume and I'll analyze it to "
"identify roles where you have the highest chance of success, find matching "
"job openings, and help you create personalized application materials for "
"the positions you choose. Ready to get started?"
"Hi! I'm your job hunting assistant. Please upload your resume and I'll "
"analyze it to identify roles where you have the highest chance of success, "
"find matching job openings, and create personalized application materials "
"for the positions you choose — including Gmail drafts ready for you to "
"review and send. Ready to get started?"
)
+69 -75
View File
@@ -20,12 +20,18 @@ intake_node = NodeSpec(
system_prompt="""\
You are a career analyst helping a job seeker find their best opportunities.
**STEP 1 Greet and collect resume (text only, NO tool calls):**
**STEP 1 Collect the resume:**
Ask the user to paste their resume. Be friendly and concise:
"Please paste your resume below. I'll analyze your experience and identify the roles where you have the strongest chance of success."
Check your input context for a `pdf_file_path` key.
**STEP 2 After the user provides their resume:**
- **If `pdf_file_path` is present:** A PDF resume has been attached. Use the `pdf_read` tool \
to extract its text: `pdf_read(file_path=<the path>)`. Then greet the user and proceed \
directly to STEP 2 with the extracted text.
- **If no `pdf_file_path`:** Ask the user to paste their resume. Be friendly and concise:
"Please paste your resume below (or attach a PDF with /attach). I'll analyze your \
experience and identify the roles where you have the strongest chance of success."
**STEP 2 After you have the resume text:**
Analyze the resume thoroughly:
1. Identify key skills (technical and soft skills)
@@ -48,7 +54,7 @@ Use set_output to store:
NEVER ask the user to pick between roles. Your job is to identify the right roles, not make them choose.
""",
tools=[],
tools=["pdf_read"],
)
# Node 2: Job Search
@@ -170,91 +176,79 @@ You are a career coach creating personalized application materials.
**INPUT:** You have the user's resume and their selected jobs.
**OUTPUT FORMAT: Single HTML Report**
Generate ONE polished HTML report containing materials for ALL selected jobs.
**OUTPUT FORMAT: Single HTML Report Built Incrementally**
Build ONE polished HTML report, but write it in CHUNKS using append_data to avoid token limits.
**HTML Structure:**
```html
<!DOCTYPE html>
<html>
<head>
<title>Job Application Materials</title>
<style>
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; max-width: 900px; margin: 0 auto; padding: 40px; line-height: 1.6; }
h1 { color: #1a1a1a; border-bottom: 2px solid #0066cc; padding-bottom: 10px; }
h2 { color: #0066cc; margin-top: 40px; padding-top: 20px; border-top: 1px solid #e0e0e0; }
h3 { color: #333; margin-top: 20px; }
.job-section { margin-bottom: 60px; }
.email-card { background: #f8f9fa; border-left: 4px solid #0066cc; padding: 20px; margin: 20px 0; white-space: pre-wrap; }
.customization-list { background: #fff; border: 1px solid #e0e0e0; padding: 20px; border-radius: 8px; }
ul { line-height: 1.8; }
.toc { background: #f0f4f8; padding: 20px; border-radius: 8px; margin-bottom: 40px; }
.toc a { color: #0066cc; text-decoration: none; }
.toc a:hover { text-decoration: underline; }
.job-url { color: #666; font-size: 0.9em; }
</style>
</head>
<body>
<h1>Job Application Materials</h1>
<div class="toc">
<strong>Table of Contents:</strong>
<ol>
<li><a href="#job-1">Job Title at Company</a></li>
<!-- ... more jobs ... -->
</ol>
</div>
**CRITICAL: You MUST build the file in multiple append_data calls. NEVER try to write the \
entire HTML in a single save_data call it will exceed the output token limit and fail.**
<!-- For each job: -->
<div class="job-section" id="job-1">
<h2>Job Title at Company</h2>
<p class="job-url">URL: <a href="...">link</a></p>
**PROCESS (follow exactly):**
<h3>Resume Customization List</h3>
<div class="customization-list">
<h4>Priority Changes</h4>
<ul>
<li>...</li>
</ul>
<h4>Keywords to Incorporate</h4>
<ul>...</ul>
<h4>Experiences to Emphasize</h4>
<ul>...</ul>
<h4>Suggested Rewrites</h4>
<ul>...</ul>
</div>
**Step 1 Write HTML header + table of contents:**
Call save_data to create the file with the HTML head, styles, and TOC:
```
save_data(filename="application_materials.html", data="<!DOCTYPE html>\\n<html>\\n<head>...")
```
Include: DOCTYPE, head with styles, opening body tag, h1, and the table of contents \
linking to each selected job. End with the TOC closing div.
<h3>Cold Outreach Email</h3>
<div class="email-card">
Subject: ...
CSS to use:
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; max-width: 900px; margin: 0 auto; padding: 40px; line-height: 1.6; }
h1 { color: #1a1a1a; border-bottom: 2px solid #0066cc; padding-bottom: 10px; }
h2 { color: #0066cc; margin-top: 40px; padding-top: 20px; border-top: 1px solid #e0e0e0; }
h3 { color: #333; margin-top: 20px; }
.job-section { margin-bottom: 60px; }
.email-card { background: #f8f9fa; border-left: 4px solid #0066cc; padding: 20px; margin: 20px 0; white-space: pre-wrap; }
.customization-list { background: #fff; border: 1px solid #e0e0e0; padding: 20px; border-radius: 8px; }
ul { line-height: 1.8; }
.toc { background: #f0f4f8; padding: 20px; border-radius: 8px; margin-bottom: 40px; }
.toc a { color: #0066cc; text-decoration: none; }
.toc a:hover { text-decoration: underline; }
.job-url { color: #666; font-size: 0.9em; }
Dear Hiring Manager,
**Step 2 Append each job section ONE AT A TIME:**
For EACH selected job, call append_data with that job's section:
```
append_data(filename="application_materials.html", data="<div class='job-section' id='job-N'>...")
```
Each section should contain:
- Job title + company as h2
- Job URL link
- Resume Customization List (Priority Changes, Keywords, Experiences to Emphasize, Suggested Rewrites)
- Cold Outreach Email in an email-card div (subject line + body, under 150 words)
...
Best regards,
[Your Name]
</div>
</div>
</body>
</html>
**Step 3 Append HTML footer:**
```
append_data(filename="application_materials.html", data="</body>\\n</html>")
```
**PROCESS:**
1. Generate the complete HTML report for ALL selected jobs
2. Save it using: save_data(filename="application_materials.html", data="<the HTML content>")
3. Get the clickable file path using: serve_file_to_user(filename="application_materials.html", open_in_browser=true)
4. **CRITICAL: Print the file path in your response so the user can click it:**
"Your application materials have been saved and opened in your browser.
**Step 4 Serve the file:**
Call serve_file_to_user(filename="application_materials.html", open_in_browser=true)
Print the file_path from the result so the user can click it later.
**File location:** [print the file_path from serve_file_to_user result]"
5. Call set_output("application_materials", "Created application_materials.html with materials for {N} jobs")
**Step 5 Create Gmail Drafts (in batches of 5):**
IMPORTANT: Do NOT create all drafts in one turn. Create at most 5 gmail_create_draft calls \
per turn to stay within tool call limits. If there are more than 5 jobs, create the first 5 \
drafts, then create the remaining drafts in the next turn.
For each selected job, call gmail_create_draft with:
- to: hiring manager email if available, otherwise "hiring@company-domain.com"
- subject: the cold email subject line
- html: the cold email body as HTML
If gmail_create_draft errors (e.g. credentials not configured), skip ALL remaining drafts and tell the user:
"Gmail drafts could not be created (Gmail not connected). You can copy the emails from the HTML report instead."
**Step 6 Finish:**
Call set_output("application_materials", "Created application_materials.html with materials for {N} jobs")
**IMPORTANT:**
- Only suggest truthful resume changes enhance presentation, never fabricate
- Cold emails must be professional, personalized, and under 150 words
- ALWAYS print the full file path so users can easily access the file later
- If a save_data or append_data call fails with a truncation error, you are writing too much \
in one call. Break it into smaller chunks.
""",
tools=["save_data", "serve_file_to_user"],
tools=["save_data", "append_data", "serve_file_to_user", "gmail_create_draft"],
)
__all__ = [
@@ -131,30 +131,100 @@ compile_report_node = NodeSpec(
system_prompt="""\
You are the report compiler for a Tech & AI News Reporter agent.
Your task: Turn the articles_data into a polished, readable HTML report and deliver it to the user.
Your task: Turn the articles_data into a polished, readable HTML report and deliver it.
**Instructions:**
1. Parse the articles_data JSON to get the list of articles.
2. Generate a well-structured HTML report with:
- A header with the report title and date
- A table of contents / summary section listing topics covered
- Articles grouped by topic category
- For each article: title (linked to source URL), source name, date, and summary
- Clean, readable styling (inline CSS)
3. Use save_data to save the HTML report as "tech_news_report.html".
4. Use serve_file_to_user to get a clickable link for the user.
**CRITICAL: You MUST build the file in multiple append_data calls. NEVER try to write the \
entire HTML in a single save_data call it will exceed the output token limit and fail.**
**STEP 1 Respond to the user (text only, NO tool calls):**
Present a brief text summary of the report highlights — how many articles, what topics are covered, and a few headline highlights. Tell the user you're generating their full report now.
**PROCESS (follow exactly):**
**STEP 2 After presenting the summary, save and serve the report:**
- save_data(filename="tech_news_report.html", data=<html_content>, data_dir=<data_dir>)
- serve_file_to_user(filename="tech_news_report.html", data_dir=<data_dir>, label="Tech & AI News Report", open_in_browser=True)
- set_output("report_file", "tech_news_report.html")
**Step 1 Write HTML head + header + TOC (save_data):**
Call save_data to create the file with the HTML head, CSS, header, and table of contents.
```
save_data(filename="tech_news_report.html", data="<!DOCTYPE html>\\n<html>...")
```
The report will auto-open in the user's default browser. Let them know the report has been opened.
Include: DOCTYPE, head with ALL styles below, opening body, header with report title \
and date, and a TOC listing all topic categories covered.
**CSS to use (copy exactly):**
```
body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;\
max-width:900px;margin:0 auto;padding:40px;line-height:1.6;color:#333}
header{border-bottom:3px solid #1a73e8;padding-bottom:20px;margin-bottom:30px}
header h1{color:#1a1a1a;font-size:2em}
header p{color:#666;margin-top:5px}
.toc{background:#f0f4f8;padding:20px;border-radius:8px;margin-bottom:40px}
.toc a{color:#1a73e8;text-decoration:none}
.toc a:hover{text-decoration:underline}
.topic-section{margin-bottom:50px}
.topic-section h2{color:#1a73e8;border-bottom:1px solid #e0e0e0;padding-bottom:8px}
.article-card{background:#fff;border:1px solid #e0e0e0;border-radius:8px;\
padding:20px;margin:15px 0}
.article-card h3{margin:0 0 8px 0}
.article-card h3 a{color:#1a1a1a;text-decoration:none}
.article-card h3 a:hover{color:#1a73e8;text-decoration:underline}
.article-meta{color:#666;font-size:0.9em;margin-bottom:10px}
.article-summary{line-height:1.7}
.footer{text-align:center;color:#999;border-top:1px solid #e0e0e0;\
padding-top:20px;margin-top:40px;font-size:0.85em}
```
**Header HTML pattern:**
```
<header>
<h1>Tech & AI News Report</h1>
<p>{date} | {article_count} articles across {topic_count} topics</p>
</header>
```
**TOC pattern:**
```
<div class="toc">
<strong>Topics Covered:</strong>
<ul>
<li><a href="#topic-{slug}">{Topic Name}</a> ({count} articles)</li>
</ul>
</div>
```
End Step 1 after the TOC closing div. Do NOT close body/html yet.
**Step 2 Append each topic section (one append_data per topic):**
For EACH topic group, call append_data with that topic's section:
```
append_data(filename="tech_news_report.html", data="<div class='topic-section' id='topic-{slug}'>...")
```
Use this pattern for each article within a topic:
```
<div class="article-card">
<h3><a href="{url}" target="_blank">{title}</a></h3>
<p class="article-meta">{source} | {date}</p>
<p class="article-summary">{summary}</p>
</div>
```
Close the topic-section div after all articles in that topic.
**Step 3 Append footer (append_data):**
```
append_data(filename="tech_news_report.html", data="<div class='footer'>...</div>\\n</body>\\n</html>")
```
**Step 4 Serve the file:**
```
serve_file_to_user(filename="tech_news_report.html", label="Tech & AI News Report", open_in_browser=true)
```
**CRITICAL: Print the file_path from the serve_file_to_user result in your response** \
so the user can click it to reopen the report later.
Then: set_output("report_file", "tech_news_report.html")
**IMPORTANT:**
- If an append_data call fails with a truncation error, break it into smaller chunks
- Do NOT include data_dir in tool calls it is auto-injected
""",
tools=["save_data", "serve_file_to_user"],
tools=["save_data", "append_data", "serve_file_to_user"],
)
__all__ = [
@@ -223,59 +223,164 @@ final_report_node = NodeSpec(
system_prompt="""\
Generate an HTML risk dashboard report and deliver it to the user.
**STEP 1 Generate the HTML report (tool calls first):**
**CRITICAL: You MUST build the file in multiple append_data calls. NEVER try to write the \
entire HTML in a single save_data call it will exceed the output token limit and fail.**
Create a self-contained HTML document with embedded CSS. Use a clean, professional \
security dashboard design.
**PROCESS (follow exactly):**
Report structure:
- **Header**: Target domain, scan date, "Security Risk Assessment" title
- **Overall Grade**: Large, color-coded letter grade (A=green, B=blue, C=yellow, \
D=orange, F=red) with numeric score
- **Grade Scale Legend**: Show what each grade means (A through F)
- **Category Breakdown**: 6 cards/panels, each showing:
- Category name
- Letter grade (color-coded)
- Numeric score
- Number of findings
- **Detailed Findings by Category**: For each of the 6 categories:
- Category header with grade
- List of findings organized by severity (high -> medium -> low -> info)
- For each finding:
- Title and severity badge (color-coded)
- Description of the issue
- Why it matters (impact)
- **Remediation**: Clear, step-by-step fix instructions for developers
- Code examples where relevant (e.g., header configurations, DNS records to add)
- **Top Risks Summary**: Prioritized action items (fix these first)
- **Methodology**: "This assessment used passive, OSINT-based scanning techniques..."
- **Disclaimer**: "This is an automated passive assessment, not a comprehensive \
penetration test"
**Step 1 Write HTML head + header + overall grade (save_data):**
Call save_data to create the file with the HTML head, full CSS, header, overall grade \
circle, and grade scale legend.
```
save_data(filename="risk_assessment_report.html", data="<!DOCTYPE html>\\n<html>...")
```
Design requirements:
- Every finding MUST have remediation steps
- Write for developers, not security experts
- Use severity color coding (red=critical/high, orange=medium, blue=low, gray=info)
- Responsive layout, works on mobile
- Self-contained no external CSS/JS dependencies
Include: DOCTYPE, head with ALL styles below, opening body, header with target domain \
and scan date, overall grade circle with score, and the grade scale legend table.
Save and serve:
- save_data(filename="risk_assessment_report.html", data=<html_content>)
- serve_file_to_user(filename="risk_assessment_report.html", \
label="Security Risk Assessment Report")
**CSS to use (copy exactly):**
```
*{margin:0;padding:0;box-sizing:border-box}
body{font-family:'Segoe UI',Tahoma,Geneva,Verdana,sans-serif;background:#f5f7fa;\
color:#333;line-height:1.6}
header{background:linear-gradient(135deg,#1e3c72 0%,#2a5298 100%);color:white;\
padding:40px 20px;text-align:center}
header h1{font-size:2.5em;margin-bottom:10px}
header p{font-size:1.1em;opacity:0.9}
.container{max-width:1200px;margin:40px auto;padding:0 20px}
h2{color:#1e3c72;border-bottom:2px solid #2a5298;padding-bottom:10px;margin-top:30px}
h3{color:#2a5298;margin-top:20px}
.grade-display{text-align:center;margin:40px 0;background:white;padding:40px;\
border-radius:10px;box-shadow:0 2px 10px rgba(0,0,0,0.1)}
.grade-circle{width:120px;height:120px;border-radius:50%;display:flex;\
align-items:center;justify-content:center;margin:0 auto 20px;font-size:3em;\
font-weight:bold;color:white}
.grade-a{background:#27ae60} .grade-b{background:#3498db}
.grade-c{background:#f39c12} .grade-d{background:#e74c3c}
.grade-f{background:#c0392b}
.category-grid{display:grid;grid-template-columns:repeat(auto-fit,minmax(300px,1fr));\
gap:20px;margin:40px 0}
.category-card{background:white;padding:25px;border-radius:10px;\
box-shadow:0 2px 10px rgba(0,0,0,0.1);border-left:5px solid #ccc}
.category-card.a{border-left-color:#27ae60} .category-card.b{border-left-color:#3498db}
.category-card.c{border-left-color:#f39c12} .category-card.d{border-left-color:#e74c3c}
.category-card.f{border-left-color:#c0392b}
.badge{display:inline-block;padding:4px 10px;border-radius:12px;color:white;\
font-weight:bold;font-size:0.85em}
.badge.high{background:#c0392b} .badge.medium{background:#f39c12}
.badge.low{background:#3498db} .badge.info{background:#95a5a6}
.finding{margin:20px 0;padding:20px;background:#f9f9f9;border-left:4px solid #ccc;\
border-radius:5px}
.finding.high{border-left-color:#c0392b} .finding.medium{border-left-color:#f39c12}
.finding.low{border-left-color:#3498db} .finding.info{border-left-color:#95a5a6}
.remediation{margin-top:15px;padding:15px;background:white;border-radius:5px;\
border-left:3px solid #27ae60}
.remediation h5{color:#27ae60;margin-bottom:10px}
pre{background:#2c3e50;color:#ecf0f1;padding:15px;border-radius:5px;overflow-x:auto;\
margin:10px 0;font-family:'Courier New',monospace;font-size:0.9em}
.card{background:white;border-radius:10px;padding:25px;margin:20px 0;\
box-shadow:0 2px 10px rgba(0,0,0,0.1)}
.footer{text-align:center;padding:30px 20px;color:#666;border-top:1px solid #ddd;\
margin-top:50px}
.grade-scale{background:white;padding:25px;border-radius:10px;margin:30px 0}
.grade-scale-item{padding:10px 0;border-bottom:1px solid #eee}
@media(max-width:768px){.category-grid{grid-template-columns:1fr}\
header h1{font-size:1.8em}.grade-circle{width:80px;height:80px;font-size:2em}}
```
**STEP 2 Present to user (text only, NO tool calls):**
Tell the user the report is ready. Summarize: overall grade, weakest category, \
top 3 action items.
**Grade circle HTML pattern:**
```
<div class="grade-display">
<div class="grade-circle grade-{letter}">{LETTER}</div>
<p style="font-size:1.8em;margin:20px 0">Overall Score: {score}/100</p>
<p style="color:#666">{one-line assessment}</p>
</div>
```
After presenting, call ask_user() to wait for follow-up questions.
**Grade scale legend pattern:**
```
<div class="grade-scale">
<h3>Grade Scale</h3>
<div class="grade-scale-item"><strong>A (90-100):</strong> Excellent</div>
<div class="grade-scale-item"><strong>B (75-89):</strong> Good</div>
<div class="grade-scale-item"><strong>C (60-74):</strong> Fair</div>
<div class="grade-scale-item"><strong>D (40-59):</strong> Poor</div>
<div class="grade-scale-item"><strong>F (0-39):</strong> Critical</div>
</div>
```
**STEP 3 After the user responds:**
End Step 1 after the grade scale closing div. Do NOT close body/html yet.
**Step 2 Append category breakdown grid (append_data):**
```
append_data(filename="risk_assessment_report.html", data="<h2>Category Breakdown</h2>...")
```
Use this pattern for each of the 6 category cards:
```
<div class="category-card {letter}">
<h3>{Category Name}</h3>
<p><span class="badge {letter_class}">Grade: {LETTER} ({score})</span></p>
<p>{findings_count} findings</p>
<p style="color:#666;font-size:0.95em">{one-line summary}</p>
</div>
```
Wrap all 6 cards in `<div class="category-grid">...</div>`. Close the grid div.
**Step 3 Append detailed findings PER CATEGORY (one append_data per category):**
For EACH of the 6 categories that has findings, call append_data separately:
```
append_data(filename="risk_assessment_report.html", data="<h3>{Category Name} (Grade: {LETTER})</h3>...")
```
Skip categories with 0 findings. For each finding, use this exact pattern:
```
<div class="finding {severity}">
<h4>{Title} <span class="badge {severity}">{SEVERITY}</span></h4>
<p><strong>Impact:</strong> {why it matters}</p>
<div class="remediation">
<h5>How to Fix</h5>
<p>{step-by-step instructions}</p>
<pre>{code example if relevant}</pre>
</div>
</div>
```
Where {severity} is one of: high, medium, low, info.
**Step 4 Append footer section (append_data):**
```
append_data(filename="risk_assessment_report.html", data="<h2>Top Risks</h2>...")
```
Include:
- Top Risks: prioritized action items as a numbered list
- Methodology: "This assessment used passive, OSINT-based scanning..."
- Disclaimer in a card: "This is an automated passive assessment, not a comprehensive \
penetration test..."
- Close with `</div></body></html>`
**Step 5 Serve the file:**
Call serve_file_to_user(filename="risk_assessment_report.html", open_in_browser=true)
Print the file_path from the result so the user can click it later.
**Step 6 Present to user (text only, NO tool calls):**
Summarize: overall grade, weakest category, top 3 action items. \
After presenting, call ask_user() for follow-ups.
**Step 7 After the user responds:**
- Answer any questions about findings or remediation
- Call ask_user() again if they have more questions
- When the user is satisfied: set_output("report_status", "completed")
**IMPORTANT:**
- Every finding MUST have remediation steps
- Write for developers, not security experts
- ALWAYS print the full file path so users can easily access the file later
- If an append_data call fails with a truncation error, break that chunk into smaller pieces
""",
tools=["save_data", "serve_file_to_user"],
tools=["save_data", "append_data", "serve_file_to_user"],
)
__all__ = [
@@ -45,6 +45,9 @@ EMAIL_CREDENTIALS = {
"gmail_modify_message",
"gmail_batch_modify_messages",
"gmail_batch_get_messages",
"gmail_create_draft",
"gmail_list_labels",
"gmail_create_label",
],
node_types=[],
required=True,
@@ -457,3 +457,143 @@ def register_tools(
messages.append(result)
return {"messages": messages, "count": len(messages), "errors": errors}
@mcp.tool()
def gmail_create_draft(
    to: str,
    subject: str,
    html: str,
) -> dict:
    """
    Create a draft email in the user's Gmail Drafts folder.

    The draft is never sent automatically; it can be reviewed and sent
    manually from Gmail.

    Args:
        to: Recipient email address.
        subject: Email subject line.
        html: Email body as HTML string.

    Returns:
        Dict with "success", "draft_id", and "message_id",
        or error dict with "error" and optional "help" keys.
    """
    # Validate required fields up front so we never hit the network with
    # an obviously malformed request.
    if not (to and to.strip()):
        return {"error": "Recipient email (to) is required"}
    if not (subject and subject.strip()):
        return {"error": "Subject is required"}
    if not html:
        return {"error": "Email body (html) is required"}

    token = _require_token()
    if isinstance(token, dict):
        # _require_token signals failure by returning an error dict.
        return token

    from email.mime.text import MIMEText

    mime = MIMEText(html, "html")
    mime["To"] = to
    mime["Subject"] = subject
    # The Gmail API expects the raw RFC 2822 message base64url-encoded.
    raw = base64.urlsafe_b64encode(mime.as_bytes()).decode("ascii")

    try:
        resp = _gmail_request(
            "POST",
            "drafts",
            token,
            json={"message": {"raw": raw}},
        )
    except httpx.HTTPError as e:
        return {"error": f"Request failed: {e}"}

    err = _handle_error(resp)
    if err:
        return err

    payload = resp.json()
    return {
        "success": True,
        "draft_id": payload.get("id", ""),
        "message_id": payload.get("message", {}).get("id", ""),
    }
@mcp.tool()
def gmail_list_labels() -> dict:
    """
    List all Gmail labels for the user's account.

    Returns both system labels (INBOX, SENT, SPAM, TRASH, etc.) and
    user-created custom labels.

    Returns:
        Dict with "labels" list (each has "id", "name", "type"),
        or error dict.
    """
    token = _require_token()
    if isinstance(token, dict):
        # Error dict from _require_token — propagate as-is.
        return token

    try:
        resp = _gmail_request("GET", "labels", token)
    except httpx.HTTPError as e:
        return {"error": f"Request failed: {e}"}

    failure = _handle_error(resp)
    if failure:
        return failure

    return {"labels": resp.json().get("labels", [])}
@mcp.tool()
def gmail_create_label(
    name: str,
    label_list_visibility: Literal["labelShow", "labelShowIfUnread", "labelHide"] = "labelShow",
    message_list_visibility: Literal["show", "hide"] = "show",
) -> dict:
    """
    Create a new Gmail label.
    Args:
        name: The display name for the new label. Must be unique.
            Supports nesting with "/" separator (e.g. "Agent/Important").
        label_list_visibility: Whether label appears in the label list.
            "labelShow" (default) - always visible.
            "labelShowIfUnread" - only visible when unread mail exists.
            "labelHide" - hidden from label list.
        message_list_visibility: Whether label appears in message list.
            "show" (default) or "hide".
    Returns:
        Dict with "success", "id", "name", and "type", or error dict.
    """
    # An empty or whitespace-only name is rejected before any network work.
    if not (name and name.strip()):
        return {"error": "Label name is required"}

    token = _require_token()
    if isinstance(token, dict):
        return token

    request_body = {
        "name": name,
        "labelListVisibility": label_list_visibility,
        "messageListVisibility": message_list_visibility,
    }
    try:
        response = _gmail_request("POST", "labels", token, json=request_body)
    except httpx.HTTPError as e:
        return {"error": f"Request failed: {e}"}

    error = _handle_error(response)
    if error:
        return error

    created = response.json()
    return {
        "success": True,
        "id": created.get("id", ""),
        "name": created.get("name", ""),
        # The API marks user-created labels as type "user"; default accordingly.
        "type": created.get("type", "user"),
    }
+151
View File
@@ -44,6 +44,16 @@ def batch_fn(gmail_tools):
return gmail_tools["gmail_batch_modify_messages"]
@pytest.fixture
def list_labels_fn(gmail_tools):
    """Resolve the registered gmail_list_labels tool callable."""
    tool = gmail_tools["gmail_list_labels"]
    return tool
@pytest.fixture
def create_label_fn(gmail_tools):
    """Resolve the registered gmail_create_label tool callable."""
    tool = gmail_tools["gmail_create_label"]
    return tool
def _mock_response(
status_code: int = 200, json_data: dict | None = None, text: str = ""
) -> MagicMock:
@@ -91,6 +101,18 @@ class TestCredentials:
result = batch_fn(message_ids=["abc"], add_labels=["STARRED"])
assert "error" in result
def test_list_labels_no_credentials(self, list_labels_fn, monkeypatch):
    """Without a token in the environment, the tool returns a credential error."""
    monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
    outcome = list_labels_fn()
    assert "error" in outcome
    assert "Gmail credentials not configured" in outcome["error"]
def test_create_label_no_credentials(self, create_label_fn, monkeypatch):
    """Without a token in the environment, label creation returns a credential error."""
    monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
    outcome = create_label_fn(name="Test")
    assert "error" in outcome
    assert "Gmail credentials not configured" in outcome["error"]
# ---------------------------------------------------------------------------
# gmail_list_messages
@@ -391,3 +413,132 @@ class TestBatchModifyMessages:
result = batch_fn(message_ids=["msg1"], add_labels=["FAKE_LABEL"])
assert "error" in result
# ---------------------------------------------------------------------------
# gmail_list_labels
# ---------------------------------------------------------------------------
class TestListLabels:
    """Behavior of the gmail_list_labels tool against a mocked HTTP layer."""

    def test_list_labels_success(self, list_labels_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        # One system label and one user-created label in the API response.
        fake_response = _mock_response(
            200,
            {
                "labels": [
                    {"id": "INBOX", "name": "INBOX", "type": "system"},
                    {"id": "Label_1", "name": "MyLabel", "type": "user"},
                ],
            },
        )
        with patch(HTTPX_MODULE, return_value=fake_response) as request_mock:
            outcome = list_labels_fn()
        labels = outcome["labels"]
        assert len(labels) == 2
        assert labels[0]["id"] == "INBOX"
        assert labels[1]["name"] == "MyLabel"
        # The tool must issue a GET against the labels endpoint.
        positional = request_mock.call_args[0]
        assert positional[0] == "GET"
        assert "labels" in positional[1]

    def test_list_labels_empty(self, list_labels_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        # A response with no "labels" key maps to an empty list.
        with patch(HTTPX_MODULE, return_value=_mock_response(200, {})):
            outcome = list_labels_fn()
        assert outcome["labels"] == []

    def test_list_labels_token_expired(self, list_labels_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "expired")
        with patch(HTTPX_MODULE, return_value=_mock_response(401)):
            outcome = list_labels_fn()
        assert "error" in outcome
        message = outcome["error"].lower()
        assert "expired" in message or "invalid" in message

    def test_list_labels_network_error(self, list_labels_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        failure = httpx.HTTPError("connection refused")
        with patch(HTTPX_MODULE, side_effect=failure):
            outcome = list_labels_fn()
        assert "error" in outcome
        assert "Request failed" in outcome["error"]
# ---------------------------------------------------------------------------
# gmail_create_label
# ---------------------------------------------------------------------------
class TestCreateLabel:
    """Behavior of the gmail_create_label tool against a mocked HTTP layer."""

    def test_create_label_success(self, create_label_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        fake_response = _mock_response(
            200,
            {
                "id": "Label_42",
                "name": "Agent/Important",
                "type": "user",
            },
        )
        with patch(HTTPX_MODULE, return_value=fake_response) as request_mock:
            outcome = create_label_fn(name="Agent/Important")
        assert outcome["success"] is True
        assert outcome["id"] == "Label_42"
        assert outcome["name"] == "Agent/Important"
        assert outcome["type"] == "user"
        # Defaults for both visibility options must be sent in the request body.
        sent = request_mock.call_args[1]["json"]
        assert sent["name"] == "Agent/Important"
        assert sent["labelListVisibility"] == "labelShow"
        assert sent["messageListVisibility"] == "show"

    def test_create_label_custom_visibility(self, create_label_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        fake_response = _mock_response(
            200,
            {"id": "Label_43", "name": "Hidden", "type": "user"},
        )
        with patch(HTTPX_MODULE, return_value=fake_response) as request_mock:
            outcome = create_label_fn(
                name="Hidden",
                label_list_visibility="labelHide",
                message_list_visibility="hide",
            )
        assert outcome["success"] is True
        # Non-default visibility choices pass through to the request body.
        sent = request_mock.call_args[1]["json"]
        assert sent["labelListVisibility"] == "labelHide"
        assert sent["messageListVisibility"] == "hide"

    def test_create_label_empty_name(self, create_label_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        outcome = create_label_fn(name="")
        assert "error" in outcome
        assert "Label name is required" in outcome["error"]

    def test_create_label_whitespace_name(self, create_label_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        outcome = create_label_fn(name="   ")
        assert "error" in outcome
        assert "Label name is required" in outcome["error"]

    def test_create_label_api_error(self, create_label_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        # 409 Conflict: a label with the same name already exists.
        conflict = _mock_response(409, text="Label name exists")
        with patch(HTTPX_MODULE, return_value=conflict):
            outcome = create_label_fn(name="Duplicate")
        assert "error" in outcome
        assert "409" in outcome["error"]

    def test_create_label_network_error(self, create_label_fn, monkeypatch):
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        with patch(HTTPX_MODULE, side_effect=httpx.HTTPError("timeout")):
            outcome = create_label_fn(name="Test")
        assert "error" in outcome
        assert "Request failed" in outcome["error"]