Fixes to inbox agent

This commit is contained in:
bryan
2026-02-18 19:08:55 -08:00
parent 39216a4c12
commit 20c92b197a
17 changed files with 454 additions and 221 deletions
+32 -75
View File
@@ -13,35 +13,33 @@ logger = logging.getLogger(__name__)
def ensure_credential_key_env() -> None:
"""Load HIVE_CREDENTIAL_KEY from shell config if not already in environment.
"""Load HIVE_CREDENTIAL_KEY and ADEN_API_KEY from shell config if not in environment.
The setup-credentials skill writes the encryption key to ~/.zshrc or ~/.bashrc.
If the user hasn't sourced their config in the current shell, this reads it
directly so the runner (and any MCP subprocesses it spawns) can unlock the
encrypted credential store.
Only HIVE_CREDENTIAL_KEY is loaded this way — all other secrets (API keys, etc.)
come from the credential store itself.
The setup-credentials skill writes these to ~/.zshrc or ~/.bashrc.
If the user hasn't sourced their config in the current shell, this reads
them directly so the runner (and any MCP subprocesses it spawns) can:
- Unlock the encrypted credential store (HIVE_CREDENTIAL_KEY)
- Enable Aden OAuth sync for Google/HubSpot/etc. (ADEN_API_KEY)
"""
if os.environ.get("HIVE_CREDENTIAL_KEY"):
return
try:
from aden_tools.credentials.shell_config import check_env_var_in_shell_config
found, value = check_env_var_in_shell_config("HIVE_CREDENTIAL_KEY")
if found and value:
os.environ["HIVE_CREDENTIAL_KEY"] = value
logger.debug("Loaded HIVE_CREDENTIAL_KEY from shell config")
except ImportError:
pass
return
for var_name in ("HIVE_CREDENTIAL_KEY", "ADEN_API_KEY"):
if os.environ.get(var_name):
continue
found, value = check_env_var_in_shell_config(var_name)
if found and value:
os.environ[var_name] = value
logger.debug("Loaded %s from shell config", var_name)
def validate_agent_credentials(nodes: list) -> None:
"""Check that required credentials are available before running an agent.
Scans node specs for required tools and node types, then checks whether
the corresponding credentials exist in the credential store.
Uses CredentialStoreAdapter.default() which includes Aden sync support,
correctly resolving OAuth credentials stored under hashed IDs.
Raises CredentialError with actionable guidance if any are missing.
@@ -55,71 +53,30 @@ def validate_agent_credentials(nodes: list) -> None:
node_types: set[str] = {node.node_type for node in nodes}
try:
from aden_tools.credentials import CREDENTIAL_SPECS
from framework.credentials import CredentialStore
from framework.credentials.storage import (
CompositeStorage,
EncryptedFileStorage,
EnvVarStorage,
)
from aden_tools.credentials.store_adapter import CredentialStoreAdapter
except ImportError:
return # aden_tools not installed, skip check
# Build credential store
env_mapping = {
(spec.credential_id or name): spec.env_var for name, spec in CREDENTIAL_SPECS.items()
}
storages: list = [EnvVarStorage(env_mapping=env_mapping)]
if os.environ.get("HIVE_CREDENTIAL_KEY"):
storages.insert(0, EncryptedFileStorage())
if len(storages) == 1:
storage = storages[0]
else:
storage = CompositeStorage(primary=storages[0], fallbacks=storages[1:])
store = CredentialStore(storage=storage)
# Build reverse mappings
tool_to_cred: dict[str, str] = {}
node_type_to_cred: dict[str, str] = {}
for cred_name, spec in CREDENTIAL_SPECS.items():
for tool_name in spec.tools:
tool_to_cred[tool_name] = cred_name
for nt in spec.node_types:
node_type_to_cred[nt] = cred_name
ensure_credential_key_env()
adapter = CredentialStoreAdapter.default()
missing: list[str] = []
checked: set[str] = set()
# Check tool credentials
for tool_name in sorted(required_tools):
cred_name = tool_to_cred.get(tool_name)
if cred_name is None or cred_name in checked:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
affected = sorted(t for t in required_tools if t in spec.tools)
entry = f" {spec.env_var} for {', '.join(affected)}"
if spec.help_url:
entry += f"\n Get it at: {spec.help_url}"
missing.append(entry)
for _cred_name, spec in adapter.get_missing_for_tools(sorted(required_tools)):
affected = sorted(t for t in required_tools if t in spec.tools)
entry = f" {spec.env_var} for {', '.join(affected)}"
if spec.help_url:
entry += f"\n Get it at: {spec.help_url}"
missing.append(entry)
# Check node type credentials (e.g., ANTHROPIC_API_KEY for LLM nodes)
for nt in sorted(node_types):
cred_name = node_type_to_cred.get(nt)
if cred_name is None or cred_name in checked:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
affected_types = sorted(t for t in node_types if t in spec.node_types)
entry = f" {spec.env_var} for {', '.join(affected_types)} nodes"
if spec.help_url:
entry += f"\n Get it at: {spec.help_url}"
missing.append(entry)
for _cred_name, spec in adapter.get_missing_for_node_types(sorted(node_types)):
affected_types = sorted(t for t in node_types if t in spec.node_types)
entry = f" {spec.env_var} for {', '.join(affected_types)} nodes"
if spec.help_url:
entry += f"\n Get it at: {spec.help_url}"
missing.append(entry)
if missing:
from framework.credentials.models import CredentialError
+14 -5
View File
@@ -205,6 +205,7 @@ class EventLoopNode(NodeProtocol):
self._injection_queue: asyncio.Queue[str] = asyncio.Queue()
# Client-facing input blocking state
self._input_ready = asyncio.Event()
self._awaiting_input = False
self._shutdown = False
def validate_input(self, ctx: NodeContext) -> list[str]:
@@ -303,7 +304,7 @@ class EventLoopNode(NodeProtocol):
set_output_tool = self._build_set_output_tool(ctx.node_spec.output_keys)
if set_output_tool:
tools.append(set_output_tool)
if ctx.node_spec.client_facing:
if ctx.node_spec.client_facing and not ctx.event_triggered:
tools.append(self._build_ask_user_tool())
logger.info(
@@ -599,7 +600,7 @@ class EventLoopNode(NodeProtocol):
"same tool calls with identical arguments. "
"Try a different approach or different arguments."
)
if ctx.node_spec.client_facing:
if ctx.node_spec.client_facing and not ctx.event_triggered:
await conversation.add_user_message(warning_msg)
self._input_ready.clear()
if self._event_bus:
@@ -608,7 +609,11 @@ class EventLoopNode(NodeProtocol):
node_id=node_id,
prompt=doom_desc,
)
await self._input_ready.wait()
self._awaiting_input = True
try:
await self._input_ready.wait()
finally:
self._awaiting_input = False
recent_tool_fingerprints.clear()
recent_responses.clear()
else:
@@ -636,7 +641,7 @@ class EventLoopNode(NodeProtocol):
# conversation — they flow through without blocking.
_cf_block = False
_cf_auto = False
if ctx.node_spec.client_facing:
if ctx.node_spec.client_facing and not ctx.event_triggered:
if user_input_requested:
_cf_block = True
elif assistant_text and not real_tool_results and not outputs_set:
@@ -1035,7 +1040,11 @@ class EventLoopNode(NodeProtocol):
prompt="",
)
await self._input_ready.wait()
self._awaiting_input = True
try:
await self._input_ready.wait()
finally:
self._awaiting_input = False
return not self._shutdown
# -------------------------------------------------------------------
+6
View File
@@ -346,6 +346,9 @@ class GraphExecutor:
for key, value in input_data.items():
memory.write(key, value)
# Detect event-triggered execution (timer/webhook) — no interactive user.
_event_triggered = bool(input_data and isinstance(input_data.get("event"), dict))
path: list[str] = []
total_tokens = 0
total_latency = 0
@@ -690,6 +693,7 @@ class GraphExecutor:
inherited_conversation=continuous_conversation if is_continuous else None,
override_tools=cumulative_tools if is_continuous else None,
cumulative_output_keys=cumulative_output_keys if is_continuous else None,
event_triggered=_event_triggered,
)
# Log actual input data being read
@@ -1430,6 +1434,7 @@ class GraphExecutor:
inherited_conversation: Any = None,
override_tools: list | None = None,
cumulative_output_keys: list[str] | None = None,
event_triggered: bool = False,
) -> NodeContext:
"""Build execution context for a node."""
# Filter tools to those available to this node
@@ -1463,6 +1468,7 @@ class GraphExecutor:
continuous_mode=continuous_mode,
inherited_conversation=inherited_conversation,
cumulative_output_keys=cumulative_output_keys or [],
event_triggered=event_triggered,
)
# Valid node types - no ambiguous "llm" type allowed
+3
View File
@@ -498,6 +498,9 @@ class NodeContext:
inherited_conversation: Any = None # NodeConversation | None (from prior node)
cumulative_output_keys: list[str] = field(default_factory=list) # All output keys from path
# Event-triggered execution (no interactive user attached)
event_triggered: bool = False
@dataclass
class NodeResult:
+19 -18
View File
@@ -3773,30 +3773,31 @@ def load_exported_plan(
def _get_credential_store():
"""Get a CredentialStore that checks encrypted files and env vars.
"""Get a CredentialStore that checks encrypted files, env vars, and Aden sync.
Uses CompositeStorage: encrypted file storage (primary) with env var fallback.
This ensures credentials stored via `store_credential` AND env vars are both found.
Uses CredentialStoreAdapter.default() which handles:
- Aden sync + provider index (resolving hashed IDs for OAuth)
- CompositeStorage (encrypted primary + env fallback)
- Auto-refresh of OAuth tokens
- Graceful fallback if Aden is unavailable
"""
from framework.credentials import CredentialStore
from framework.credentials.storage import CompositeStorage, EncryptedFileStorage, EnvVarStorage
# Build env var mapping from CREDENTIAL_SPECS for the fallback
env_mapping: dict[str, str] = {}
try:
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.store_adapter import CredentialStoreAdapter
for name, spec in CREDENTIAL_SPECS.items():
cred_id = spec.credential_id or name
env_mapping[cred_id] = spec.env_var
return CredentialStoreAdapter.default().store
except ImportError:
pass
from framework.credentials import CredentialStore
from framework.credentials.storage import (
CompositeStorage,
EncryptedFileStorage,
EnvVarStorage,
)
storage = CompositeStorage(
primary=EncryptedFileStorage(),
fallbacks=[EnvVarStorage(env_mapping=env_mapping)],
)
return CredentialStore(storage=storage)
storage = CompositeStorage(
primary=EncryptedFileStorage(),
fallbacks=[EnvVarStorage(env_mapping={})],
)
return CredentialStore(storage=storage)
@mcp.tool()
+19 -69
View File
@@ -1064,83 +1064,33 @@ class AgentRunner:
warnings.append(f"Missing tool implementations: {', '.join(missing_tools)}")
# Check credentials for required tools and node types
# Uses CredentialStore (encrypted files + env var fallback)
# Uses CredentialStoreAdapter.default() which includes Aden sync support
missing_credentials = []
try:
from aden_tools.credentials import CREDENTIAL_SPECS
from aden_tools.credentials.store_adapter import CredentialStoreAdapter
from framework.credentials import CredentialStore
from framework.credentials.storage import (
CompositeStorage,
EncryptedFileStorage,
EnvVarStorage,
)
# Build env mapping for credential lookup
env_mapping = {
(spec.credential_id or name): spec.env_var
for name, spec in CREDENTIAL_SPECS.items()
}
# Only use EncryptedFileStorage if the encryption key is configured;
# otherwise just check env vars (avoids generating a throwaway key)
storages: list = [EnvVarStorage(env_mapping=env_mapping)]
if os.environ.get("HIVE_CREDENTIAL_KEY"):
storages.insert(0, EncryptedFileStorage())
if len(storages) == 1:
storage = storages[0]
else:
storage = CompositeStorage(
primary=storages[0],
fallbacks=storages[1:],
)
store = CredentialStore(storage=storage)
# Build reverse mappings
tool_to_cred: dict[str, str] = {}
node_type_to_cred: dict[str, str] = {}
for cred_name, spec in CREDENTIAL_SPECS.items():
for tool_name in spec.tools:
tool_to_cred[tool_name] = cred_name
for nt in spec.node_types:
node_type_to_cred[nt] = cred_name
adapter = CredentialStoreAdapter.default()
# Check tool credentials
checked: set[str] = set()
for tool_name in info.required_tools:
cred_name = tool_to_cred.get(tool_name)
if cred_name is None or cred_name in checked:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
missing_credentials.append(spec.env_var)
affected_tools = [t for t in info.required_tools if t in spec.tools]
tools_str = ", ".join(affected_tools)
warning_msg = f"Missing {spec.env_var} for {tools_str}"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
for _cred_name, spec in adapter.get_missing_for_tools(list(info.required_tools)):
missing_credentials.append(spec.env_var)
affected_tools = [t for t in info.required_tools if t in spec.tools]
tools_str = ", ".join(affected_tools)
warning_msg = f"Missing {spec.env_var} for {tools_str}"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
# Check node type credentials (e.g., ANTHROPIC_API_KEY for LLM nodes)
node_types = list({node.node_type for node in self.graph.nodes})
for nt in node_types:
cred_name = node_type_to_cred.get(nt)
if cred_name is None or cred_name in checked:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
missing_credentials.append(spec.env_var)
affected_types = [t for t in node_types if t in spec.node_types]
types_str = ", ".join(affected_types)
warning_msg = f"Missing {spec.env_var} for {types_str} nodes"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
for _cred_name, spec in adapter.get_missing_for_node_types(node_types):
missing_credentials.append(spec.env_var)
affected_types = [t for t in node_types if t in spec.node_types]
types_str = ", ".join(affected_types)
warning_msg = f"Missing {spec.env_var} for {types_str} nodes"
if spec.help_url:
warning_msg += f"\n Get it at: {spec.help_url}"
warnings.append(warning_msg)
except ImportError:
# aden_tools not installed - fall back to direct check
has_llm_nodes = any(
+36 -13
View File
@@ -341,19 +341,25 @@ class AgentRuntime:
while self._running:
self._timer_next_fire.pop(entry_point_id, None)
try:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{"event": {"source": "timer", "reason": "scheduled"}},
session_state=session_state,
)
logger.info(
"Timer fired for entry point '%s' (next in %s min)",
entry_point_id,
mins,
)
if self._should_skip_timer(entry_point_id):
logger.info(
"Timer '%s' skipped — primary stream busy",
entry_point_id,
)
else:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{"event": {"source": "timer", "reason": "scheduled"}},
session_state=session_state,
)
logger.info(
"Timer fired for entry point '%s' (next in %s min)",
entry_point_id,
mins,
)
except Exception:
logger.error(
"Timer trigger failed for '%s'",
@@ -469,6 +475,23 @@ class AgentRuntime:
raise ValueError(f"Entry point '{entry_point_id}' not found")
return await stream.wait_for_completion(exec_id, timeout)
def _should_skip_timer(self, timer_ep_id: str) -> bool:
    """Decide whether a scheduled timer fire should be suppressed.

    A timer may only trigger while every other non-timer stream is idle —
    i.e. it has no active execution, or it is parked waiting for client
    input. Firing mid-run would race concurrent pipelines on shared memory.

    Args:
        timer_ep_id: Entry point ID of the timer being evaluated.

    Returns:
        True when some other non-timer stream is actively running.
    """
    def _stream_is_busy(ep_id: str, stream) -> bool:
        # The timer's own stream never blocks itself.
        if ep_id == timer_ep_id:
            return False
        # Other timer-driven streams don't count as interactive activity.
        spec = self._entry_points.get(ep_id)
        if spec is not None and spec.trigger_type == "timer":
            return False
        # Busy = has live executions AND is not blocked on an input prompt.
        return bool(stream.active_execution_ids) and not stream.is_awaiting_input

    return any(
        _stream_is_busy(ep_id, stream) for ep_id, stream in self._streams.items()
    )
def _get_primary_session_state(self, exclude_entry_point: str) -> dict[str, Any] | None:
"""Build session_state so an async entry point runs in the primary session.
@@ -201,6 +201,17 @@ class ExecutionStream:
"""Return IDs of all currently active executions."""
return list(self._active_executions.keys())
@property
def is_awaiting_input(self) -> bool:
    """True when an active execution is blocked waiting for client input.

    Scans every node of every active executor for the private
    ``_awaiting_input`` flag that client-facing nodes set while parked
    on an input prompt. With no active executors this is trivially False.
    """
    return any(
        getattr(node, "_awaiting_input", False)
        for executor in self._active_executors.values()
        for node in executor.node_registry.values()
    )
def _record_execution_result(self, execution_id: str, result: ExecutionResult) -> None:
"""Record a completed execution result with retention pruning."""
self._execution_results[execution_id] = result
@@ -27,7 +27,8 @@ goal = Goal(
"Manage Gmail inbox emails autonomously using user-defined free-text rules. "
"For every five minutes, fetch inbox emails (configurable batch size, default 100), "
"apply the user's rules to each email, and execute the appropriate Gmail actions — trash, "
"mark as spam, mark important, mark read/unread, star, and more."
"mark as spam, mark important, mark read/unread, star, draft replies, "
"create/apply custom labels, and more."
),
success_criteria=[
SuccessCriterion(
@@ -38,7 +39,7 @@ goal = Goal(
),
metric="action_correctness",
target=">=95%",
weight=0.35,
weight=0.30,
),
SuccessCriterion(
id="action-report",
@@ -48,7 +49,7 @@ goal = Goal(
),
metric="report_completeness",
target="100%",
weight=0.3,
weight=0.25,
),
SuccessCriterion(
id="batch-completeness",
@@ -58,7 +59,14 @@ goal = Goal(
),
metric="emails_processed_ratio",
target="100%",
weight=0.35,
weight=0.30,
),
SuccessCriterion(
id="label-management",
description="Custom labels are created and applied correctly when rules require them",
metric="label_coverage",
target="100%",
weight=0.15,
),
],
constraints=[
@@ -77,6 +85,12 @@ goal = Goal(
constraint_type="hard",
category="safety",
),
Constraint(
id="draft-not-send",
description="Agent creates draft replies but NEVER sends them automatically",
constraint_type="hard",
category="safety",
),
],
)
@@ -138,14 +152,16 @@ pause_nodes = []
terminal_nodes = []
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 50,
"max_tool_calls_per_turn": 30,
"max_tool_result_chars": 8000,
"max_history_tokens": 32000,
}
conversation_mode = "continuous"
identity_prompt = (
"You are an email inbox management assistant. You help users manage "
"their Gmail inbox by applying free-text rules to emails — trash, "
"mark as spam, mark important, mark read/unread, star, and more."
"mark as spam, mark important, mark read/unread, star, draft replies, "
"create/apply custom labels, and more."
)
@@ -372,4 +388,4 @@ class EmailInboxManagementAgent:
# Create default instance
default_agent = EmailInboxManagementAgent()
default_agent = EmailInboxManagementAgent()
@@ -14,13 +14,15 @@ class AgentMetadata:
description: str = (
"Automatically manage Gmail inbox emails using free-text rules. "
"Trash junk, mark spam, mark important, mark read/unread, star, "
"and more — using only native Gmail actions."
"draft replies, create/apply custom labels, and more — using only "
"native Gmail actions."
)
intro_message: str = (
"Hi! I'm your email inbox management assistant. Tell me your rules "
"(what to trash, mark as spam, mark important, etc.) and I'll run an "
"initial triage of your inbox. After that, I'll automatically check "
"and process new emails every 5 minutes — so you can set it and forget it. "
"(what to trash, mark as spam, mark important, draft replies to, "
"label with custom labels, etc.) and I'll run an initial triage of "
"your inbox. After that, I'll automatically check and process new "
"emails every 5 minutes — so you can set it and forget it. "
"What rules would you like me to apply?"
)
@@ -30,6 +30,8 @@ The following Gmail actions are available — map the user's rules to whichever
- **Mark as read** / mark as unread
- **Star** / unstar emails
- **Add/remove Gmail labels** (INBOX, UNREAD, IMPORTANT, STARRED, SPAM, CATEGORY_PERSONAL, CATEGORY_SOCIAL, CATEGORY_PROMOTIONS, CATEGORY_UPDATES, CATEGORY_FORUMS)
- **Draft replies** — create draft reply emails (never sent automatically)
- **Create/apply custom labels** — create new Gmail labels and apply them to emails
Present the rules back to the user in plain language. Do NOT refuse rules if the user asks for any of the above actions, confirm you will do it.
@@ -37,12 +39,16 @@ Also confirm the batch size (max_emails). If max_emails is not provided, default
Ask the user to confirm: "Does this look right? I'll proceed once you confirm."
**STEP 2 After the user confirms, call set_output:**
**STEP 2 — Show existing labels (tool call):**
Call gmail_list_labels() to show the user their current Gmail labels. This helps them reference existing labels or decide whether new custom labels are needed for their rules.
**STEP 3 — After the user confirms, call set_output:**
- set_output("rules", <the confirmed rules as a clear text description>)
- set_output("max_emails", <the confirmed max_emails as a string number, e.g. "100">)
""",
tools=[],
tools=["gmail_list_labels"],
)
# Node 2: Fetch Emails (event_loop — fetches emails with pagination support)
@@ -117,27 +123,36 @@ You are an inbox management assistant. Apply the user's rules to their emails an
- gmail_batch_modify_messages(message_ids, add_labels, remove_labels) Modify Gmail labels in batch. ALWAYS prefer this.
- gmail_modify_message(message_id, add_labels, remove_labels) Modify a single message's labels.
- gmail_trash_message(message_id) Move a message to trash. No batch version; call per email.
- gmail_create_draft(to, subject, body) Create a draft reply. NEVER sends automatically.
- gmail_create_label(name) Create a new Gmail label. Returns the label ID.
- gmail_list_labels() List all existing Gmail labels with their IDs.
- set_output(key, value) Set an output value. Call ONLY after all actions are executed.
**CONTEXT:**
- "rules" = the user's rule to apply (e.g. "mark all as unread")
- "emails" = a filename (e.g. "emails.jsonl") containing the fetched emails as JSONL. Each line has: id, subject, from, to, date, snippet, labels.
**STEP 1 LOAD EMAILS (your first tool call MUST be load_data):**
Call load_data(filename=<the "emails" value from context>) to read the email data.
- If the result is empty, call set_output("actions_taken", "no emails to process") and stop.
- If has_more=true, load more pages with load_data(filename=..., offset=...) until all emails are loaded.
**PROCESS EMAILS ONE CHUNK AT A TIME (you will get multiple turns):**
**STEP 2 DETERMINE STRATEGY:**
- **Blanket rule** (same action for ALL emails, e.g. "mark all as unread"): Collect all message IDs, then execute ONE gmail_batch_modify_messages call.
- **Classification rule** (different actions for different emails): Classify each email, group by action, execute batch operations per group.
Each turn, process exactly ONE chunk: load → classify → act → record. Then STOP and wait for your next turn to load the next chunk.
**STEP 3 EXECUTE ACTIONS:**
Call the appropriate Gmail tool(s) with the real message IDs from the loaded emails. Then record each action:
- append_data(filename="actions.jsonl", data=<JSON of {email_id, subject, from, action}>)
1. Call load_data(filename=<emails value>, limit_bytes=7500).
- Parse the visible JSONL lines: split by \n, JSON.parse each complete line.
- Ignore the last line if it appears cut off (incomplete JSON).
- Note the next_offset_bytes value from the result.
**STEP 4 FINISH:**
After ALL actions are executed, call set_output("actions_taken", "actions.jsonl").
2. Classify the emails in THIS chunk against the rules. For each email, decide the action: trash, draft reply, label change, or no action.
3. Execute Gmail actions for this chunk immediately:
- **Label changes:** gmail_batch_modify_messages for all IDs in this chunk that need the same label change.
- **Trash:** gmail_trash_message per email.
- **Drafts:** gmail_create_draft per email.
- Record each action: append_data(filename="actions.jsonl", data=<JSON of {email_id, subject, from, action}>)
4. If has_more=true, STOP HERE. On your next turn, call load_data with offset_bytes=<next_offset_bytes> and repeat from step 2.
If has_more=false, you are done processing call set_output("actions_taken", "actions.jsonl").
**CRITICAL:** Only call load_data ONCE per turn. Do NOT pre-load multiple chunks. You must see the emails before you can act on them.
**GMAIL LABEL REFERENCE:**
- MARK AS UNREAD add_labels=["UNREAD"]
@@ -149,17 +164,24 @@ After ALL actions are executed, call set_output("actions_taken", "actions.jsonl"
- ARCHIVE remove_labels=["INBOX"]
- MARK AS SPAM add_labels=["SPAM"], remove_labels=["INBOX"]
- TRASH use gmail_trash_message(message_id) per email
- DRAFT REPLY use gmail_create_draft(to=<sender>, subject="Re: <subject>", body=<contextual reply based on email content>). Creates a draft only, never sends.
- CREATE CUSTOM LABEL use gmail_create_label(name=<label_name>) to create, then apply via gmail_modify_message with add_labels=[<label_id>]
- APPLY CUSTOM LABEL add_labels=[<label_id>] using the ID from gmail_create_label or gmail_list_labels
**CRITICAL RULES:**
- Your FIRST tool call MUST be load_data. Do NOT skip this.
- You MUST call Gmail tools to execute real actions. Do NOT just report what should be done.
- Do NOT call set_output until all Gmail actions are executed.
- Pass ONLY the filename "actions.jsonl" to set_output, NOT raw data.
- NEVER send emails. Only create drafts via gmail_create_draft.
""",
tools=[
"gmail_trash_message",
"gmail_modify_message",
"gmail_batch_modify_messages",
"gmail_create_draft",
"gmail_create_label",
"gmail_list_labels",
"load_data",
"append_data",
],
@@ -46,6 +46,8 @@ EMAIL_CREDENTIALS = {
"gmail_batch_modify_messages",
"gmail_batch_get_messages",
"gmail_create_draft",
"gmail_list_labels",
"gmail_create_label",
],
node_types=[],
required=True,
+5 -5
View File
@@ -34,11 +34,6 @@ from .email_tool import register_tools as register_email
from .exa_search_tool import register_tools as register_exa_search
from .example_tool import register_tools as register_example
from .excel_tool import register_tools as register_excel
from .github_tool import register_tools as register_github
from .gmail_tool import register_tools as register_gmail
from .google_docs_tool import register_tools as register_google_docs
from .google_maps_tool import register_tools as register_google_maps
from .http_headers_scanner import register_tools as register_http_headers_scanner
from .file_system_toolkits.apply_diff import register_tools as register_apply_diff
from .file_system_toolkits.apply_patch import register_tools as register_apply_patch
from .file_system_toolkits.data_tools import register_tools as register_data_tools
@@ -54,6 +49,11 @@ from .file_system_toolkits.replace_file_content import (
# Import file system toolkits
from .file_system_toolkits.view_file import register_tools as register_view_file
from .file_system_toolkits.write_to_file import register_tools as register_write_to_file
from .github_tool import register_tools as register_github
from .gmail_tool import register_tools as register_gmail
from .google_docs_tool import register_tools as register_google_docs
from .google_maps_tool import register_tools as register_google_maps
from .http_headers_scanner import register_tools as register_http_headers_scanner
from .hubspot_tool import register_tools as register_hubspot
from .news_tool import register_tools as register_news
from .pdf_read_tool import register_tools as register_pdf_read
@@ -517,3 +517,83 @@ def register_tools(
"draft_id": data.get("id", ""),
"message_id": data.get("message", {}).get("id", ""),
}
@mcp.tool()
def gmail_list_labels() -> dict:
    """
    List all Gmail labels for the user's account.

    Includes both system labels (INBOX, SENT, SPAM, TRASH, etc.) and
    user-created custom labels.

    Returns:
        Dict with "labels" list (each entry has "id", "name", "type"),
        or an error dict when credentials are missing or the request fails.
    """
    auth = _require_token()
    if isinstance(auth, dict):
        # _require_token returned an error payload instead of a token string.
        return auth
    try:
        resp = _gmail_request("GET", "labels", auth)
    except httpx.HTTPError as exc:
        return {"error": f"Request failed: {exc}"}
    failure = _handle_error(resp)
    if failure:
        return failure
    return {"labels": resp.json().get("labels", [])}
@mcp.tool()
def gmail_create_label(
    name: str,
    label_list_visibility: Literal["labelShow", "labelShowIfUnread", "labelHide"] = "labelShow",
    message_list_visibility: Literal["show", "hide"] = "show",
) -> dict:
    """
    Create a new Gmail label.

    Args:
        name: The display name for the new label. Must be unique.
            Supports nesting with "/" separator (e.g. "Agent/Important").
            Leading/trailing whitespace is stripped before the request.
        label_list_visibility: Whether label appears in the label list.
            "labelShow" (default) - always visible.
            "labelShowIfUnread" - only visible when unread mail exists.
            "labelHide" - hidden from label list.
        message_list_visibility: Whether label appears in message list.
            "show" (default) or "hide".

    Returns:
        Dict with "success", "id", "name", and "type", or error dict.
    """
    # The emptiness check is on the stripped name, so send the stripped name
    # too — otherwise " Foo " passes validation but creates a padded label.
    label_name = name.strip() if name else ""
    if not label_name:
        return {"error": "Label name is required"}
    token = _require_token()
    if isinstance(token, dict):
        # Credential lookup failed; propagate the error payload.
        return token
    body = {
        "name": label_name,
        "labelListVisibility": label_list_visibility,
        "messageListVisibility": message_list_visibility,
    }
    try:
        response = _gmail_request("POST", "labels", token, json=body)
    except httpx.HTTPError as e:
        return {"error": f"Request failed: {e}"}
    error = _handle_error(response)
    if error:
        return error
    data = response.json()
    return {
        "success": True,
        "id": data.get("id", ""),
        "name": data.get("name", ""),
        "type": data.get("type", "user"),
    }
@@ -253,9 +253,7 @@ class _GoogleDocsClient:
)
return self._handle_response(response)
def batch_update(
self, document_id: str, requests: list[dict[str, Any]]
) -> dict[str, Any]:
def batch_update(self, document_id: str, requests: list[dict[str, Any]]) -> dict[str, Any]:
"""Execute multiple requests in a single atomic operation."""
response = httpx.post(
f"{GOOGLE_DOCS_API_BASE}/documents/{document_id}:batchUpdate",
@@ -51,7 +51,8 @@ class TestGoogleDocsCreateDocument:
def test_service_account_json_without_access_token_is_not_used(self, mcp):
"""Test that service account JSON alone is not treated as an access token."""
with patch.dict("os.environ", {"GOOGLE_SERVICE_ACCOUNT_JSON": '{"type":"service_account"}'}):
env = {"GOOGLE_SERVICE_ACCOUNT_JSON": '{"type":"service_account"}'}
with patch.dict("os.environ", env):
tool_fn = get_tool_fn(mcp, "google_docs_create_document")
result = tool_fn(title="Test Document")
assert "error" in result
@@ -160,9 +161,7 @@ class TestGoogleDocsInsertText:
# Mock get document for finding end index
mock_get_response = MagicMock()
mock_get_response.status_code = 200
mock_get_response.json.return_value = {
"body": {"content": [{"endIndex": 100}]}
}
mock_get_response.json.return_value = {"body": {"content": [{"endIndex": 100}]}}
mock_get.return_value = mock_get_response
# Mock batch update
@@ -236,10 +235,12 @@ class TestGoogleDocsBatchUpdate:
mock_post.return_value = mock_response
tool_fn = get_tool_fn(mcp_with_credentials, "google_docs_batch_update")
requests = json.dumps([
{"insertText": {"location": {"index": 1}, "text": "Hello"}},
{"insertText": {"location": {"index": 6}, "text": " World"}},
])
requests = json.dumps(
[
{"insertText": {"location": {"index": 1}, "text": "Hello"}},
{"insertText": {"location": {"index": 6}, "text": " World"}},
]
)
result = tool_fn(document_id="doc123", requests_json=requests)
assert "error" not in result
@@ -469,6 +470,7 @@ class TestServiceAccountTokenExchange:
# Should use the pre-exchanged token and make the API call
assert "error" not in result or "not configured" not in result.get("error", "")
class TestGoogleDocsListComments:
"""Tests for google_docs_list_comments tool."""
+151
View File
@@ -44,6 +44,16 @@ def batch_fn(gmail_tools):
return gmail_tools["gmail_batch_modify_messages"]
@pytest.fixture
def list_labels_fn(gmail_tools):
    # Resolve the registered gmail_list_labels tool callable from the shared fixture.
    return gmail_tools["gmail_list_labels"]
@pytest.fixture
def create_label_fn(gmail_tools):
    # Resolve the registered gmail_create_label tool callable from the shared fixture.
    return gmail_tools["gmail_create_label"]
def _mock_response(
status_code: int = 200, json_data: dict | None = None, text: str = ""
) -> MagicMock:
@@ -91,6 +101,18 @@ class TestCredentials:
result = batch_fn(message_ids=["abc"], add_labels=["STARRED"])
assert "error" in result
def test_list_labels_no_credentials(self, list_labels_fn, monkeypatch):
    """Listing labels without a Google token yields a credentials error."""
    # Make sure no token leaks in from the surrounding environment.
    monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
    outcome = list_labels_fn()
    assert "error" in outcome
    assert "Gmail credentials not configured" in outcome["error"]
def test_create_label_no_credentials(self, create_label_fn, monkeypatch):
    """Creating a label without a Google token yields a credentials error."""
    # Make sure no token leaks in from the surrounding environment.
    monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
    outcome = create_label_fn(name="Test")
    assert "error" in outcome
    assert "Gmail credentials not configured" in outcome["error"]
# ---------------------------------------------------------------------------
# gmail_list_messages
@@ -391,3 +413,132 @@ class TestBatchModifyMessages:
result = batch_fn(message_ids=["msg1"], add_labels=["FAKE_LABEL"])
assert "error" in result
# ---------------------------------------------------------------------------
# gmail_list_labels
# ---------------------------------------------------------------------------
class TestListLabels:
    """Behavioral tests for the gmail_list_labels tool."""

    def test_list_labels_success(self, list_labels_fn, monkeypatch):
        # Labels returned by the API should pass through to the result unchanged.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        payload = {
            "labels": [
                {"id": "INBOX", "name": "INBOX", "type": "system"},
                {"id": "Label_1", "name": "MyLabel", "type": "user"},
            ],
        }
        fake_resp = _mock_response(200, payload)
        with patch(HTTPX_MODULE, return_value=fake_resp) as mock_req:
            result = list_labels_fn()
            labels = result["labels"]
            assert len(labels) == 2
            assert labels[0]["id"] == "INBOX"
            assert labels[1]["name"] == "MyLabel"
            # The tool must issue a GET against the labels endpoint.
            positional = mock_req.call_args[0]
            assert positional[0] == "GET"
            assert "labels" in positional[1]

    def test_list_labels_empty(self, list_labels_fn, monkeypatch):
        # A response with no "labels" key maps to an empty list.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        with patch(HTTPX_MODULE, return_value=_mock_response(200, {})):
            result = list_labels_fn()
            assert result["labels"] == []

    def test_list_labels_token_expired(self, list_labels_fn, monkeypatch):
        # A 401 from the API surfaces as an expired/invalid-token error.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "expired")
        with patch(HTTPX_MODULE, return_value=_mock_response(401)):
            result = list_labels_fn()
            assert "error" in result
            message = result["error"].lower()
            assert "expired" in message or "invalid" in message

    def test_list_labels_network_error(self, list_labels_fn, monkeypatch):
        # Transport-level failures are wrapped in a "Request failed" error.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        boom = httpx.HTTPError("connection refused")
        with patch(HTTPX_MODULE, side_effect=boom):
            result = list_labels_fn()
            assert "error" in result
            assert "Request failed" in result["error"]
# ---------------------------------------------------------------------------
# gmail_create_label
# ---------------------------------------------------------------------------
class TestCreateLabel:
    """Behavioral tests for the gmail_create_label tool."""

    def test_create_label_success(self, create_label_fn, monkeypatch):
        # Default visibility settings are sent and API fields are echoed back.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        api_payload = {
            "id": "Label_42",
            "name": "Agent/Important",
            "type": "user",
        }
        fake_resp = _mock_response(200, api_payload)
        with patch(HTTPX_MODULE, return_value=fake_resp) as mock_req:
            result = create_label_fn(name="Agent/Important")
            assert result["success"] is True
            assert result["id"] == "Label_42"
            assert result["name"] == "Agent/Important"
            assert result["type"] == "user"
            sent = mock_req.call_args[1]["json"]
            assert sent["name"] == "Agent/Important"
            assert sent["labelListVisibility"] == "labelShow"
            assert sent["messageListVisibility"] == "show"

    def test_create_label_custom_visibility(self, create_label_fn, monkeypatch):
        # Caller-supplied visibility values are forwarded verbatim.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        fake_resp = _mock_response(200, {"id": "Label_43", "name": "Hidden", "type": "user"})
        with patch(HTTPX_MODULE, return_value=fake_resp) as mock_req:
            result = create_label_fn(
                name="Hidden",
                label_list_visibility="labelHide",
                message_list_visibility="hide",
            )
            assert result["success"] is True
            sent = mock_req.call_args[1]["json"]
            assert sent["labelListVisibility"] == "labelHide"
            assert sent["messageListVisibility"] == "hide"

    def test_create_label_empty_name(self, create_label_fn, monkeypatch):
        # An empty name is rejected before any HTTP call is made.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        result = create_label_fn(name="")
        assert "error" in result
        assert "Label name is required" in result["error"]

    def test_create_label_whitespace_name(self, create_label_fn, monkeypatch):
        # A whitespace-only name counts as empty.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        result = create_label_fn(name="   ")
        assert "error" in result
        assert "Label name is required" in result["error"]

    def test_create_label_api_error(self, create_label_fn, monkeypatch):
        # Non-2xx statuses propagate the status code in the error string.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        fake_resp = _mock_response(409, text="Label name exists")
        with patch(HTTPX_MODULE, return_value=fake_resp):
            result = create_label_fn(name="Duplicate")
            assert "error" in result
            assert "409" in result["error"]

    def test_create_label_network_error(self, create_label_fn, monkeypatch):
        # httpx transport errors are reported as request failures.
        monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test_token")
        with patch(HTTPX_MODULE, side_effect=httpx.HTTPError("timeout")):
            result = create_label_fn(name="Test")
            assert "error" in result
            assert "Request failed" in result["error"]