Compare commits

...

3 Commits

Author SHA1 Message Date
bryan 9870c77c7c wip fitness coach 2026-02-18 14:44:41 -08:00
bryan 5fdcd3c2f5 added hour based timer to built in scheduler 2026-02-18 11:58:32 -08:00
bryan 4161827431 updated calendar tool and added sheets tool 2026-02-18 09:47:18 -08:00
28 changed files with 3642 additions and 268 deletions
+3 -2
View File
@@ -507,8 +507,9 @@ All credential specs are defined in `tools/src/aden_tools/credentials/`:
| ----------------- | ------------- | --------------------------------------------- | -------------- |
| `llm.py` | LLM Providers | `anthropic` | No |
| `search.py` | Search Tools | `brave_search`, `google_search`, `google_cse` | No |
| `email.py` | Email | `resend` | No |
| `integrations.py` | Integrations | `github`, `hubspot`, `google_calendar_oauth` | No / Yes |
| `email.py` | Email/Google | `resend`, `google` | No / Yes |
| `github.py` | Integrations | `github` | No |
| `hubspot.py` | Integrations | `hubspot` | No |
**Note:** Additional LLM providers (Cerebras, Groq, OpenAI) are handled by LiteLLM via environment
variables (`CEREBRAS_API_KEY`, `GROQ_API_KEY`, `OPENAI_API_KEY`) but are not yet in CREDENTIAL_SPECS.
+5 -23
View File
@@ -56,28 +56,12 @@ def validate_agent_credentials(nodes: list) -> None:
try:
from aden_tools.credentials import CREDENTIAL_SPECS
from framework.credentials import CredentialStore
from framework.credentials.storage import (
CompositeStorage,
EncryptedFileStorage,
EnvVarStorage,
)
from aden_tools.credentials.store_adapter import CredentialStoreAdapter
except ImportError:
return # aden_tools not installed, skip check
# Build credential store
env_mapping = {
(spec.credential_id or name): spec.env_var for name, spec in CREDENTIAL_SPECS.items()
}
storages: list = [EnvVarStorage(env_mapping=env_mapping)]
if os.environ.get("HIVE_CREDENTIAL_KEY"):
storages.insert(0, EncryptedFileStorage())
if len(storages) == 1:
storage = storages[0]
else:
storage = CompositeStorage(primary=storages[0], fallbacks=storages[1:])
store = CredentialStore(storage=storage)
# Use the canonical factory which handles Aden sync when ADEN_API_KEY is set
adapter = CredentialStoreAdapter.default()
# Build reverse mappings
tool_to_cred: dict[str, str] = {}
@@ -98,8 +82,7 @@ def validate_agent_credentials(nodes: list) -> None:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
if spec.required and not adapter.is_available(cred_name):
affected = sorted(t for t in required_tools if t in spec.tools)
entry = f" {spec.env_var} for {', '.join(affected)}"
if spec.help_url:
@@ -113,8 +96,7 @@ def validate_agent_credentials(nodes: list) -> None:
continue
checked.add(cred_name)
spec = CREDENTIAL_SPECS[cred_name]
cred_id = spec.credential_id or cred_name
if spec.required and not store.is_available(cred_id):
if spec.required and not adapter.is_available(cred_name):
affected_types = sorted(t for t in node_types if t in spec.node_types)
entry = f" {spec.env_var} for {', '.join(affected_types)} nodes"
if spec.help_url:
+16
View File
@@ -604,6 +604,22 @@ class GraphSpec(BaseModel):
f"'{entry_point.trigger_type}'. Valid: {valid_triggers}"
)
# Validate trigger_config for timer entry points
if entry_point.trigger_type == "timer":
tc = entry_point.trigger_config
has_interval = bool(tc.get("interval_minutes"))
has_schedule = bool(tc.get("schedule"))
if has_interval and has_schedule:
errors.append(
f"Async entry point '{entry_point.id}' has both "
"interval_minutes and schedule; use one or the other"
)
if not has_interval and not has_schedule:
errors.append(
f"Async entry point '{entry_point.id}' has trigger_type='timer' "
"but neither interval_minutes nor schedule in trigger_config"
)
# Check terminal nodes exist
for term in self.terminal_nodes:
if not self.get_node(term):
+180 -43
View File
@@ -7,11 +7,14 @@ while preserving the goal-driven approach.
import asyncio
import logging
import re
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any
from zoneinfo import ZoneInfo
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.executor import ExecutionResult
@@ -29,6 +32,36 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
_HH_MM_RE = re.compile(r"^([01]\d|2[0-3]):([0-5]\d)$")
def _seconds_until_next_schedule(
schedule: list[str],
tz: ZoneInfo | None = None,
) -> tuple[float, str]:
"""Compute seconds from now until the next scheduled HH:MM time.
Args:
schedule: List of validated "HH:MM" time strings.
tz: Optional timezone. None means system local time.
Returns:
(seconds_until_next_fire, matched_time_str)
"""
now = datetime.now(tz)
best_delta = float("inf")
best_time = schedule[0]
for time_str in schedule:
h, m = int(time_str[:2]), int(time_str[3:5])
target = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
delta = (target - now).total_seconds()
if delta < best_delta:
best_delta = delta
best_time = time_str
return (best_delta, best_time)
@dataclass
class AgentRuntimeConfig:
@@ -323,56 +356,160 @@ class AgentRuntime:
tc = spec.trigger_config
interval = tc.get("interval_minutes")
if not interval or interval <= 0:
schedule = tc.get("schedule")
# Mutual exclusion check
if interval and schedule:
logger.warning(
f"Entry point '{ep_id}' has trigger_type='timer' "
"but no valid interval_minutes in trigger_config"
"Entry point '%s' has both interval_minutes and schedule; "
"use one or the other — skipping",
ep_id,
)
continue
run_immediately = tc.get("run_immediately", False)
if schedule:
# --- Clock-based scheduling ---
# Validate HH:MM format
valid_times: list[str] = []
for t in schedule:
if isinstance(t, str) and _HH_MM_RE.match(t):
valid_times.append(t)
else:
logger.warning(
"Entry point '%s': ignoring invalid schedule time '%s' "
"(expected HH:MM, 00:00-23:59)",
ep_id,
t,
)
if not valid_times:
logger.warning(
"Entry point '%s' has trigger_type='timer' "
"but no valid times in schedule — skipping",
ep_id,
)
continue
def _make_timer(entry_point_id: str, mins: float, immediate: bool):
async def _timer_loop():
interval_secs = mins * 60
if not immediate:
self._timer_next_fire[entry_point_id] = time.monotonic() + interval_secs
await asyncio.sleep(interval_secs)
while self._running:
self._timer_next_fire.pop(entry_point_id, None)
try:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{"event": {"source": "timer", "reason": "scheduled"}},
session_state=session_state,
)
logger.info(
"Timer fired for entry point '%s' (next in %s min)",
entry_point_id,
mins,
)
except Exception:
logger.error(
"Timer trigger failed for '%s'",
entry_point_id,
exc_info=True,
)
self._timer_next_fire[entry_point_id] = time.monotonic() + interval_secs
await asyncio.sleep(interval_secs)
# Resolve timezone
tz: ZoneInfo | None = None
tz_name = tc.get("timezone")
if tz_name:
try:
tz = ZoneInfo(tz_name)
except Exception:
logger.warning(
"Entry point '%s': invalid timezone '%s'; "
"falling back to system local time",
ep_id,
tz_name,
)
return _timer_loop
def _make_schedule_timer(
entry_point_id: str,
times: list[str],
timezone: ZoneInfo | None,
):
async def _schedule_loop():
while self._running:
secs, matched_time = _seconds_until_next_schedule(times, timezone)
self._timer_next_fire[entry_point_id] = time.monotonic() + secs
await asyncio.sleep(secs)
if not self._running:
break
self._timer_next_fire.pop(entry_point_id, None)
try:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{
"event": {
"source": "timer",
"reason": "scheduled",
"scheduled_time": matched_time,
}
},
session_state=session_state,
)
logger.info(
"Schedule timer fired for '%s' at %s",
entry_point_id,
matched_time,
)
except Exception:
logger.error(
"Schedule timer trigger failed for '%s'",
entry_point_id,
exc_info=True,
)
task = asyncio.create_task(_make_timer(ep_id, interval, run_immediately)())
self._timer_tasks.append(task)
logger.info(
"Started timer for entry point '%s' every %s min%s",
ep_id,
interval,
" (immediate first run)" if run_immediately else "",
)
return _schedule_loop
task = asyncio.create_task(_make_schedule_timer(ep_id, valid_times, tz)())
self._timer_tasks.append(task)
logger.info(
"Started schedule timer for entry point '%s' at %s%s",
ep_id,
", ".join(valid_times),
f" ({tz_name})" if tz_name else " (local)",
)
elif interval and interval > 0:
# --- Interval-based scheduling (existing) ---
run_immediately = tc.get("run_immediately", False)
def _make_timer(entry_point_id: str, mins: float, immediate: bool):
async def _timer_loop():
interval_secs = mins * 60
if not immediate:
self._timer_next_fire[entry_point_id] = (
time.monotonic() + interval_secs
)
await asyncio.sleep(interval_secs)
while self._running:
self._timer_next_fire.pop(entry_point_id, None)
try:
session_state = self._get_primary_session_state(
exclude_entry_point=entry_point_id
)
await self.trigger(
entry_point_id,
{"event": {"source": "timer", "reason": "scheduled"}},
session_state=session_state,
)
logger.info(
"Timer fired for entry point '%s' (next in %s min)",
entry_point_id,
mins,
)
except Exception:
logger.error(
"Timer trigger failed for '%s'",
entry_point_id,
exc_info=True,
)
self._timer_next_fire[entry_point_id] = (
time.monotonic() + interval_secs
)
await asyncio.sleep(interval_secs)
return _timer_loop
task = asyncio.create_task(_make_timer(ep_id, interval, run_immediately)())
self._timer_tasks.append(task)
logger.info(
"Started timer for entry point '%s' every %s min%s",
ep_id,
interval,
" (immediate first run)" if run_immediately else "",
)
else:
logger.warning(
"Entry point '%s' has trigger_type='timer' "
"but neither interval_minutes nor schedule in trigger_config",
ep_id,
)
self._running = True
logger.info(f"AgentRuntime started with {len(self._streams)} streams")
@@ -641,5 +641,293 @@ class TestCreateAgentRuntime:
assert "api" in runtime._entry_points
# === Clock-Based Schedule Timer Tests ===
class TestSecondsUntilNextSchedule:
"""Tests for the _seconds_until_next_schedule helper function."""
def test_future_time_today(self):
"""Time later today returns seconds until that time."""
from datetime import datetime, timedelta
from framework.runtime.agent_runtime import _seconds_until_next_schedule
# Zero seconds/microseconds to match what the function targets (HH:MM:00.000)
now = datetime.now()
future = (now + timedelta(hours=2)).replace(second=0, microsecond=0)
time_str = f"{future.hour:02d}:{future.minute:02d}"
expected_secs = (future - now).total_seconds()
secs, matched = _seconds_until_next_schedule([time_str])
assert matched == time_str
assert abs(secs - expected_secs) < 5
def test_past_time_wraps_to_tomorrow(self):
"""Time already passed today returns seconds until tomorrow's occurrence."""
from datetime import datetime, timedelta
from framework.runtime.agent_runtime import _seconds_until_next_schedule
# Zero seconds/microseconds to match what the function targets (HH:MM:00.000)
now = datetime.now()
past = (now - timedelta(hours=1)).replace(second=0, microsecond=0)
time_str = f"{past.hour:02d}:{past.minute:02d}"
expected_target = past + timedelta(days=1)
expected_secs = (expected_target - now).total_seconds()
secs, matched = _seconds_until_next_schedule([time_str])
assert matched == time_str
assert abs(secs - expected_secs) < 5
def test_multiple_times_picks_nearest(self):
"""With multiple times, returns the nearest future one."""
from datetime import datetime, timedelta
from framework.runtime.agent_runtime import _seconds_until_next_schedule
# Zero seconds/microseconds to match what the function targets (HH:MM:00.000)
now = datetime.now()
near = (now + timedelta(hours=1)).replace(second=0, microsecond=0)
far = (now + timedelta(hours=5)).replace(second=0, microsecond=0)
near_str = f"{near.hour:02d}:{near.minute:02d}"
far_str = f"{far.hour:02d}:{far.minute:02d}"
expected_secs = (near - now).total_seconds()
secs, matched = _seconds_until_next_schedule([far_str, near_str])
assert matched == near_str
assert abs(secs - expected_secs) < 5
def test_with_timezone(self):
"""Timezone parameter is respected."""
from zoneinfo import ZoneInfo
from framework.runtime.agent_runtime import _seconds_until_next_schedule
# Just verify it doesn't crash with a valid timezone
secs, matched = _seconds_until_next_schedule(["12:00", "18:00"], tz=ZoneInfo("UTC"))
assert secs > 0
assert matched in ("12:00", "18:00")
class TestGraphSpecTimerValidation:
"""Tests for GraphSpec.validate() timer trigger_config checks."""
def _make_graph_with_timer(self, trigger_config):
"""Helper to create a graph with a timer entry point."""
nodes = [
NodeSpec(
id="process",
name="Process",
description="Process node",
node_type="llm_generate",
input_keys=[],
output_keys=[],
),
]
return GraphSpec(
id="test",
goal_id="goal",
entry_node="process",
async_entry_points=[
AsyncEntryPointSpec(
id="timer-ep",
name="Timer",
entry_node="process",
trigger_type="timer",
trigger_config=trigger_config,
),
],
nodes=nodes,
edges=[],
)
def test_interval_only_is_valid(self):
"""interval_minutes alone passes validation."""
graph = self._make_graph_with_timer({"interval_minutes": 5})
errors = graph.validate()
timer_errors = [e for e in errors if "timer-ep" in e]
assert len(timer_errors) == 0
def test_schedule_only_is_valid(self):
"""schedule alone passes validation."""
graph = self._make_graph_with_timer({"schedule": ["08:00", "12:00"]})
errors = graph.validate()
timer_errors = [e for e in errors if "timer-ep" in e]
assert len(timer_errors) == 0
def test_both_interval_and_schedule_is_invalid(self):
"""Having both interval_minutes and schedule is an error."""
graph = self._make_graph_with_timer({"interval_minutes": 5, "schedule": ["08:00"]})
errors = graph.validate()
assert any("both" in e.lower() for e in errors)
def test_neither_interval_nor_schedule_is_invalid(self):
"""Having neither interval_minutes nor schedule is an error."""
graph = self._make_graph_with_timer({})
errors = graph.validate()
assert any("neither" in e.lower() for e in errors)
class TestScheduleTimerIntegration:
"""Integration tests for schedule-based timer in AgentRuntime."""
@pytest.mark.asyncio
async def test_schedule_timer_sets_next_fire(self, sample_goal, temp_storage):
"""Schedule timer should set _timer_next_fire after starting."""
import time as time_mod
nodes = [
NodeSpec(
id="check-in",
name="Check In",
description="Check in node",
node_type="event_loop",
input_keys=[],
output_keys=[],
),
]
graph = GraphSpec(
id="test-schedule",
goal_id="test-goal",
entry_node="check-in",
nodes=nodes,
edges=[],
)
runtime = AgentRuntime(
graph=graph,
goal=sample_goal,
storage_path=temp_storage,
)
# Register a schedule-based timer
runtime.register_entry_point(
EntryPointSpec(
id="meal-timer",
name="Meal Timer",
entry_node="check-in",
trigger_type="timer",
trigger_config={"schedule": ["08:00", "12:00", "19:00"]},
max_concurrent=1,
)
)
await runtime.start()
# Give the timer loop a moment to set _timer_next_fire
await asyncio.sleep(0.1)
try:
assert "meal-timer" in runtime._timer_next_fire
next_fire = runtime._timer_next_fire["meal-timer"]
# Should be in the future
assert next_fire > time_mod.monotonic()
finally:
await runtime.stop()
@pytest.mark.asyncio
async def test_interval_timer_still_works(self, sample_goal, temp_storage):
"""Existing interval-based timer should still work after the refactor."""
import time as time_mod
nodes = [
NodeSpec(
id="check-in",
name="Check In",
description="Check in node",
node_type="event_loop",
input_keys=[],
output_keys=[],
),
]
graph = GraphSpec(
id="test-interval",
goal_id="test-goal",
entry_node="check-in",
nodes=nodes,
edges=[],
)
runtime = AgentRuntime(
graph=graph,
goal=sample_goal,
storage_path=temp_storage,
)
runtime.register_entry_point(
EntryPointSpec(
id="interval-timer",
name="Interval Timer",
entry_node="check-in",
trigger_type="timer",
trigger_config={"interval_minutes": 60},
max_concurrent=1,
)
)
await runtime.start()
await asyncio.sleep(0.1)
try:
assert "interval-timer" in runtime._timer_next_fire
next_fire = runtime._timer_next_fire["interval-timer"]
assert next_fire > time_mod.monotonic()
finally:
await runtime.stop()
@pytest.mark.asyncio
async def test_mutually_exclusive_config_skipped(self, sample_goal, temp_storage):
"""Timer with both interval_minutes and schedule should be skipped."""
nodes = [
NodeSpec(
id="check-in",
name="Check In",
description="Check in node",
node_type="event_loop",
input_keys=[],
output_keys=[],
),
]
graph = GraphSpec(
id="test-both",
goal_id="test-goal",
entry_node="check-in",
nodes=nodes,
edges=[],
)
runtime = AgentRuntime(
graph=graph,
goal=sample_goal,
storage_path=temp_storage,
)
runtime.register_entry_point(
EntryPointSpec(
id="bad-timer",
name="Bad Timer",
entry_node="check-in",
trigger_type="timer",
trigger_config={"interval_minutes": 5, "schedule": ["08:00"]},
max_concurrent=1,
)
)
await runtime.start()
await asyncio.sleep(0.1)
try:
# Should not have been started
assert "bad-timer" not in runtime._timer_next_fire
assert len(runtime._timer_tasks) == 0
finally:
await runtime.stop()
if __name__ == "__main__":
pytest.main([__file__, "-v"])
+20 -7
View File
@@ -457,18 +457,31 @@ class GraphOverview(Vertical):
for ep in event_sources:
if ep.trigger_type == "timer":
interval = ep.trigger_config.get("interval_minutes", "?")
interval = ep.trigger_config.get("interval_minutes")
schedule = ep.trigger_config.get("schedule")
display.write(f" [green]⏱[/green] {ep.name} [dim]→ {ep.entry_node}[/dim]")
# Show interval + next fire countdown
# Build description string
if schedule:
times_str = ", ".join(schedule)
tz_label = ep.trigger_config.get("timezone", "local")
desc = f"schedule {times_str} ({tz_label})"
else:
desc = f"every {interval or '?'} min"
# Show countdown if available
next_fire = self.runtime._timer_next_fire.get(ep.id)
if next_fire is not None:
remaining = max(0, next_fire - time.monotonic())
mins, secs = divmod(int(remaining), 60)
display.write(
f" [dim]every {interval} min — next in {mins}m {secs:02d}s[/dim]"
)
hrs, rem = divmod(int(remaining), 3600)
mins, secs = divmod(rem, 60)
if hrs > 0:
countdown = f"{hrs}h {mins:02d}m {secs:02d}s"
else:
countdown = f"{mins}m {secs:02d}s"
display.write(f" [dim]{desc} — next in {countdown}[/dim]")
else:
display.write(f" [dim]every {interval} min[/dim]")
display.write(f" [dim]{desc}[/dim]")
elif ep.trigger_type in ("event", "webhook"):
display.write(f" [yellow]⚡[/yellow] {ep.name} [dim]→ {ep.entry_node}[/dim]")
@@ -0,0 +1,46 @@
"""
Fitness Coach Agent Track calories, generate workouts, stay on schedule.
Conversational fitness coach that tracks daily calorie intake and burn via
Google Sheets, generates personalized workout plans, and sends scheduled
check-ins for meals and exercise reminders.
"""
from .agent import (
FitnessCoachAgent,
default_agent,
goal,
nodes,
edges,
loop_config,
async_entry_points,
entry_node,
entry_points,
pause_nodes,
terminal_nodes,
conversation_mode,
identity_prompt,
)
from .config import RuntimeConfig, AgentMetadata, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"FitnessCoachAgent",
"default_agent",
"goal",
"nodes",
"edges",
"loop_config",
"async_entry_points",
"entry_node",
"entry_points",
"pause_nodes",
"terminal_nodes",
"conversation_mode",
"identity_prompt",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -0,0 +1,166 @@
"""
CLI entry point for Fitness Coach Agent.
Uses AgentRuntime for multi-entrypoint support with timer-driven check-ins.
"""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, FitnessCoachAgent
def setup_logging(verbose=False, debug=False):
"""Configure logging for execution visibility."""
if debug:
level, fmt = logging.DEBUG, "%(asctime)s %(name)s: %(message)s"
elif verbose:
level, fmt = logging.INFO, "%(message)s"
else:
level, fmt = logging.WARNING, "%(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
logging.getLogger("framework").setLevel(level)
@click.group()
@click.version_option(version="1.0.0")
def cli():
"""Fitness Coach Agent - Track calories, generate workouts, stay on schedule."""
pass
@cli.command()
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(mock, verbose, debug):
"""Launch the TUI dashboard for interactive fitness coaching."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo(
"TUI requires the 'textual' package. Install with: pip install textual"
)
sys.exit(1)
from pathlib import Path
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
async def run_with_tui():
agent = FitnessCoachAgent()
agent._tool_registry = ToolRegistry()
storage_path = Path.home() / ".hive" / "agents" / "fitness_coach"
storage_path.mkdir(parents=True, exist_ok=True)
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
agent._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock:
llm = LiteLLMProvider(
model=agent.config.model,
api_key=agent.config.api_key,
api_base=agent.config.api_base,
)
tools = list(agent._tool_registry.get_tools().values())
tool_executor = agent._tool_registry.get_executor()
graph = agent._build_graph()
runtime = create_agent_runtime(
graph=graph,
goal=agent.goal,
storage_path=storage_path,
entry_points=[
EntryPointSpec(
id="start",
name="Start Fitness Coach",
entry_node="intake",
trigger_type="manual",
isolation_level="shared",
),
EntryPointSpec(
id="meal-timer",
name="Meal Check-in",
entry_node="meal-checkin",
trigger_type="timer",
trigger_config={"schedule": ["08:00", "12:00", "19:00"]},
isolation_level="shared",
max_concurrent=1,
),
EntryPointSpec(
id="exercise-timer",
name="Exercise Reminder",
entry_node="exercise-reminder",
trigger_type="timer",
trigger_config={"interval_minutes": 240},
isolation_level="shared",
max_concurrent=1,
),
],
llm=llm,
tools=tools,
tool_executor=tool_executor,
)
await runtime.start()
try:
app = AdenTUI(runtime)
await app.run_async()
finally:
await runtime.stop()
asyncio.run(run_with_tui())
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
"""Show agent information."""
info_data = default_agent.info()
if output_json:
click.echo(json.dumps(info_data, indent=2))
else:
click.echo(f"Agent: {info_data['name']}")
click.echo(f"Version: {info_data['version']}")
click.echo(f"Description: {info_data['description']}")
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}")
click.echo("\nAsync entry points:")
for ep in info_data["async_entry_points"]:
click.echo(f" - {ep['name']}{ep['entry_node']}")
@cli.command()
def validate():
"""Validate agent structure."""
validation = default_agent.validate()
if validation["valid"]:
click.echo("Agent is valid")
if validation["warnings"]:
for warning in validation["warnings"]:
click.echo(f" WARNING: {warning}")
else:
click.echo("Agent has errors:")
for error in validation["errors"]:
click.echo(f" ERROR: {error}")
sys.exit(0 if validation["valid"] else 1)
if __name__ == "__main__":
cli()
+428
View File
@@ -0,0 +1,428 @@
"""Agent graph construction for Fitness Coach Agent."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
from framework.graph.executor import ExecutionResult
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
intake_node,
coach_node,
meal_checkin_node,
exercise_reminder_node,
)
# Goal definition
goal = Goal(
id="fitness-coach",
name="Personal Fitness Coach",
description=(
"Conversational fitness coach that tracks daily calorie intake and burn "
"via Google Sheets (separate tabs for Meals, Exercises, and Daily Summary), "
"generates personalized workout plans, and sends scheduled check-ins for "
"meals and exercise reminders."
),
success_criteria=[
SuccessCriterion(
id="calorie-tracking",
description=(
"Accurately logs all reported meals and exercises to Google Sheets "
"with approximate calorie estimates"
),
metric="logging_completeness",
target=">=95%",
weight=0.35,
),
SuccessCriterion(
id="workout-generation",
description=(
"Generates personalized workout plans based on user fitness level, "
"goals, and available equipment"
),
metric="plan_relevance",
target=">=90%",
weight=0.30,
),
SuccessCriterion(
id="proactive-checkins",
description=(
"Timer-driven check-ins fire at configured meal times and exercise "
"intervals, prompting the user naturally"
),
metric="checkin_reliability",
target=">=95%",
weight=0.35,
),
],
constraints=[
Constraint(
id="no-medical-advice",
description=(
"Never provide medical diagnoses, prescribe supplements or "
"medications, or override doctor recommendations"
),
constraint_type="hard",
category="safety",
),
Constraint(
id="transparent-estimates",
description=(
"Always disclose that calorie estimates are approximate and suggest "
"verifying with nutrition labels when possible"
),
constraint_type="hard",
category="safety",
),
Constraint(
id="non-destructive-sheets",
description=(
"Never bulk-delete rows or clear ranges in Google Sheets. "
"Corrections to individual cells are allowed via update_values."
),
constraint_type="hard",
category="operational",
),
],
)
# Node list
nodes = [
intake_node,
coach_node,
meal_checkin_node,
exercise_reminder_node,
]
# Edge definitions
edges = [
EdgeSpec(
id="intake-to-coach",
source="intake",
target="coach",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="coach-loop",
source="coach",
target="coach",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="meal-checkin-to-coach",
source="meal-checkin",
target="coach",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="exercise-reminder-to-coach",
source="exercise-reminder",
target="coach",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
]
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
async_entry_points = [
AsyncEntryPointSpec(
id="meal-timer",
name="Meal Check-in",
entry_node="meal-checkin",
trigger_type="timer",
trigger_config={"schedule": ["08:00", "12:00", "19:00"]},
isolation_level="shared",
max_concurrent=1,
),
AsyncEntryPointSpec(
id="exercise-timer",
name="Exercise Reminder",
entry_node="exercise-reminder",
trigger_type="timer",
trigger_config={"interval_minutes": 240},
isolation_level="shared",
max_concurrent=1,
),
]
pause_nodes = []
terminal_nodes = []
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 50,
"max_history_tokens": 32000,
}
conversation_mode = "continuous"
identity_prompt = (
"You are a friendly, knowledgeable personal fitness coach. You help users "
"track their daily calorie intake and burn via Google Sheets, generate "
"personalized workout plans, and stay on track with scheduled meal check-ins "
"and exercise reminders. You are encouraging, concise, and transparent about "
"the approximate nature of calorie estimates."
)
class FitnessCoachAgent:
"""
Fitness Coach Agent conversational fitness tracking with scheduled check-ins.
Flow:
[manual] intake coach (self-loop)
[timer: 08:00, 12:00, 19:00] meal-checkin coach
[timer: every 4h] exercise-reminder coach
Uses AgentRuntime for:
- Multi-entry-point execution (primary + 2 timer-driven)
- Session-scoped storage
- Shared state so timers read user profile set during intake
- Checkpointing for resume capability
"""
def __init__(self, config=None):
self.config = config or default_config
self.goal = goal
self.nodes = nodes
self.edges = edges
self.entry_node = entry_node
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._graph: GraphSpec | None = None
self._agent_runtime: AgentRuntime | None = None
self._tool_registry: ToolRegistry | None = None
self._storage_path: Path | None = None
self._saved_profile: dict | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="fitness-coach-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
entry_points=self.entry_points,
terminal_nodes=self.terminal_nodes,
pause_nodes=self.pause_nodes,
nodes=self.nodes,
edges=self.edges,
default_model=self.config.model,
max_tokens=self.config.max_tokens,
loop_config=loop_config,
conversation_mode=conversation_mode,
identity_prompt=identity_prompt,
async_entry_points=async_entry_points,
)
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime with sessions, checkpoints, and logging."""
from .tools import load_profile
self._storage_path = Path.home() / ".hive" / "agents" / "fitness_coach"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
# Discover custom tools (save_profile)
tools_path = Path(__file__).parent / "tools.py"
if tools_path.exists():
self._tool_registry.discover_from_module(tools_path)
llm = None
if not mock_mode:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
# Check for saved profile — skip intake if we already have one
saved_profile = load_profile()
default_entry_node = "coach" if saved_profile else self.entry_node
self._saved_profile = saved_profile
self._graph = self._build_graph()
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
# Build entry point specs for AgentRuntime
entry_point_specs = [
# Primary entry point — starts at coach if profile exists, intake otherwise
EntryPointSpec(
id="default",
name="Default",
entry_node=default_entry_node,
trigger_type="manual",
isolation_level="shared",
),
# Meal check-in timer (clock-based schedule)
EntryPointSpec(
id="meal-timer",
name="Meal Check-in",
entry_node="meal-checkin",
trigger_type="timer",
trigger_config={"schedule": ["08:00", "12:00", "19:00"]},
isolation_level="shared",
max_concurrent=1,
),
# Exercise reminder timer (interval-based)
EntryPointSpec(
id="exercise-timer",
name="Exercise Reminder",
entry_node="exercise-reminder",
trigger_type="timer",
trigger_config={"interval_minutes": 240},
isolation_level="shared",
max_concurrent=1,
),
]
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
checkpoint_config=checkpoint_config,
)
async def start(self, mock_mode=False) -> None:
"""Set up and start the agent runtime.
If a saved profile exists, the default entry point skips intake and
goes straight to coach. The saved user_profile and sheet_id are
injected as input_data so they appear in shared memory.
"""
if self._agent_runtime is None:
self._setup(mock_mode=mock_mode)
if not self._agent_runtime.is_running:
await self._agent_runtime.start()
async def stop(self) -> None:
"""Stop and clean up the agent runtime."""
if self._agent_runtime is not None and self._agent_runtime.is_running:
await self._agent_runtime.stop()
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._agent_runtime is None:
raise RuntimeError("Agent not started. Call start() first.")
# Inject saved profile into input_data so coach/timers have it
if self._saved_profile and entry_point == "default":
input_data = {**self._saved_profile, **input_data}
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data,
timeout=timeout,
session_state=session_state,
)
async def run(
self, context: dict, mock_mode=False, session_state=None
) -> ExecutionResult:
"""Run the agent (convenience method for single execution)."""
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"default", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self):
"""Get agent information."""
return {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": self.goal.name,
"description": self.goal.description,
},
"nodes": [n.id for n in self.nodes],
"edges": [e.id for e in self.edges],
"entry_node": self.entry_node,
"entry_points": self.entry_points,
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
"async_entry_points": [
{"id": ep.id, "name": ep.name, "entry_node": ep.entry_node}
for ep in async_entry_points
],
}
def validate(self):
"""Validate agent structure."""
errors = []
warnings = []
node_ids = {node.id for node in self.nodes}
for edge in self.edges:
if edge.source not in node_ids:
errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
if edge.target not in node_ids:
errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{self.entry_node}' not found")
for terminal in self.terminal_nodes:
if terminal not in node_ids:
errors.append(f"Terminal node '{terminal}' not found")
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(
f"Entry point '{ep_id}' references unknown node '{node_id}'"
)
# Validate async entry points
for ep in async_entry_points:
if ep.entry_node not in node_ids:
errors.append(
f"Async entry point '{ep.id}' references unknown node '{ep.entry_node}'"
)
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
}
# Create default instance
default_agent = FitnessCoachAgent()
@@ -0,0 +1,27 @@
"""Runtime configuration."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
default_config = RuntimeConfig()
@dataclass
class AgentMetadata:
name: str = "Fitness Coach Agent"
version: str = "1.0.0"
description: str = (
"Conversational fitness coach that tracks daily calorie intake and burn "
"via Google Sheets, generates personalized workout plans, and sends "
"scheduled check-ins for meals and exercise reminders."
)
intro_message: str = (
"Hey! I'm your personal fitness coach. Let's start by getting to know you — "
"your goals, fitness level, diet preferences, and what equipment you have access to. "
"Once we're set up, I'll track your meals and workouts in a Google Sheet and "
"check in with you at meal times and every few hours for exercise. Ready?"
)
metadata = AgentMetadata()
@@ -0,0 +1,9 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../../tools",
"description": "Hive tools MCP server"
}
}
@@ -0,0 +1,237 @@
"""Node definitions for Fitness Coach Agent."""
from framework.graph import NodeSpec
# Node 1: Intake (client-facing)
# Collects user profile and creates Google Sheet with Meals, Exercises, Daily Summary tabs.
intake_node = NodeSpec(
id="intake",
name="Intake",
description=(
"Collect user profile: name, fitness goals, dietary preferences/restrictions, "
"available equipment, and schedule preferences. Create the Google Sheet with "
"Meals, Exercises, and Daily Summary tabs. Store profile and sheet ID to shared memory."
),
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=[],
output_keys=["user_profile", "sheet_id"],
system_prompt="""\
You are a friendly fitness coach assistant. Your job is to onboard a new user by collecting their fitness profile and setting up their tracking spreadsheet.
**STEP 1 Introduce yourself and collect profile (text only, NO tool calls):**
Greet the user warmly. Collect the following information through natural conversation:
- Name
- Fitness goals (e.g., lose weight, build muscle, maintain, improve endurance)
- Current fitness level (beginner, intermediate, advanced)
- Dietary preferences or restrictions (e.g., vegetarian, keto, no restrictions)
- Available equipment (e.g., full gym, home dumbbells, bodyweight only)
- Any injuries or limitations to be aware of
Be conversational don't dump a form. Ask 2-3 questions at a time. It's fine to take multiple turns.
**STEP 2 Confirm profile and create spreadsheet (after user confirms):**
Summarize the profile back to the user. Ask them to confirm.
Once confirmed:
1. Call google_sheets_create_spreadsheet(title="Fitness Tracker - [Name]", sheet_titles=["Meals", "Exercises", "Daily Summary"]) to create the spreadsheet with all three tabs.
2. Get the spreadsheet ID from the response.
3. Add header rows to each tab using google_sheets_update_values:
- Meals tab: google_sheets_update_values(spreadsheet_id=<id>, range_name="Meals!A1:F1", values=[["Date", "Time", "Meal Type", "Food Description", "Estimated Calories", "Notes"]])
- Exercises tab: google_sheets_update_values(spreadsheet_id=<id>, range_name="Exercises!A1:F1", values=[["Date", "Time", "Exercise", "Duration (min)", "Estimated Calories Burned", "Notes"]])
- Daily Summary tab: google_sheets_update_values(spreadsheet_id=<id>, range_name="Daily Summary!A1:F1", values=[["Date", "Calories In", "Calories Out", "Net Calories", "Goal Progress", "Notes"]])
**STEP 3 Set outputs and save profile:**
After the spreadsheet is set up:
- set_output("user_profile", <JSON string of the profile: {name, goals, fitness_level, diet, equipment, limitations}>)
- set_output("sheet_id", <the spreadsheet ID string>)
- Call save_profile(user_profile=<same JSON string>, sheet_id=<same spreadsheet ID>) to persist to disk so we remember on next startup.
Tell the user their tracker is ready and that you'll check in at meal times and remind them about exercise.\
""",
tools=[
"google_sheets_create_spreadsheet",
"google_sheets_update_values",
"save_profile",
],
)
# Node 2: Coach (client-facing)
# Main conversational hub — log meals, log exercises, generate workouts, daily summary.
coach_node = NodeSpec(
id="coach",
name="Coach",
description=(
"Main conversational hub. Handles: logging meals, logging exercises, "
"generating workout plans, answering fitness questions, and viewing "
"daily summaries. Reads and writes to Google Sheets."
),
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["user_profile", "sheet_id"],
output_keys=["last_action"],
nullable_output_keys=["last_action"],
system_prompt="""\
You are a friendly, knowledgeable fitness coach. You have the user's profile and a Google Sheet for tracking.
Read "user_profile" from context for the user's goals, fitness level, diet, and equipment.
Read "sheet_id" from context for the spreadsheet ID to log data to.
**CAPABILITIES handle whatever the user asks:**
1. **Log a meal**: When the user tells you what they ate, estimate calories using your nutritional knowledge (USDA averages). If unsure about a specific item, give a reasonable range. Log to the "Meals" tab using google_sheets_append_values with: [date, time, meal_type, food_description, estimated_calories, notes].
2. **Log exercise**: When the user reports exercise, estimate calories burned based on their profile (weight affects burn rate). Log to "Exercises" tab: [date, time, exercise, duration_min, estimated_calories_burned, notes].
3. **Generate workout plan**: Based on their goals, fitness level, equipment, and limitations, create a workout plan. Be specific: exercises, sets, reps, rest times. Tailor to their level.
4. **Daily summary**: Read today's entries from both Meals and Exercises tabs using google_sheets_get_values. Calculate totals and show: calories in, calories out, net. Compare against a reasonable daily target based on their goals.
5. **Answer fitness questions**: General fitness, nutrition, and exercise advice. Always remind them you're an AI coach, not a doctor.
**RULES:**
- Always be encouraging and positive
- Calorie estimates are APPROXIMATE say so explicitly every time you give one
- Never give medical advice if they ask about injuries, pain, or medical conditions, recommend seeing a healthcare professional
- To correct a wrong entry, use google_sheets_update_values to fix the specific cell never delete entire rows or clear ranges
- Use today's date and current time when logging
- When logging to sheets, use google_sheets_append_values(spreadsheet_id=<sheet_id>, range="<TabName>!A:F", values=[[...]])
**CONVERSATION STYLE:**
- Keep responses concise but warm
- Use the user's name from their profile
- Celebrate small wins ("Nice! That's 30 min of cardio logged!")
- If idle, don't force conversation — wait for the user\
""",
tools=[
"google_sheets_append_values",
"google_sheets_get_values",
"google_sheets_update_values",
],
)
# Node 3: Meal Check-in (client-facing, timer-triggered)
# Fires at breakfast (08:00), lunch (12:00), dinner (19:00).
meal_checkin_node = NodeSpec(
id="meal-checkin",
name="Meal Check-in",
description=(
"Timer-triggered node for meal check-ins. Fires at breakfast (08:00), "
"lunch (12:00), and dinner (19:00). Asks the user what they ate, "
"estimates calories, and logs to Google Sheets."
),
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["user_profile", "sheet_id"],
output_keys=["meal_logged"],
nullable_output_keys=["meal_logged"],
system_prompt="""\
You are a friendly fitness coach checking in about a meal. This node is triggered by a timer at meal times (08:00 = breakfast, 12:00 = lunch, 19:00 = dinner).
Read "user_profile" from context for the user's name and dietary preferences.
Read "sheet_id" from context for the spreadsheet ID.
**STEP 1 Greet and ask about the meal (text only, NO tool calls):**
Determine the meal type based on the approximate time:
- Morning (before noon) Breakfast
- Midday (noon-ish) Lunch
- Evening Dinner
Send a friendly, short check-in message like:
"Hey [Name]! It's lunchtime — what are you having?"
Keep it casual and brief. Don't lecture.
**STEP 2 After the user responds with what they ate:**
1. Estimate the calories for each item using your nutritional knowledge (USDA averages). Be transparent: "That's roughly X calories (approximate estimate)." If unsure, give a reasonable range.
2. Log to Google Sheets: google_sheets_append_values(spreadsheet_id=<sheet_id>, range="Meals!A:F", values=[[date, time, meal_type, food_description, estimated_calories, notes]])
4. Give brief positive feedback.
**STEP 3 Set output:**
- set_output("meal_logged", "true")
If the user says they haven't eaten yet or want to skip, that's fine don't pressure them. Just set_output("meal_logged", "skipped") and move on.
**RULES:**
- Keep it SHORT this is a quick check-in, not a lecture
- Calorie estimates are APPROXIMATE always say so
- No medical or dietary prescriptions
- Only append to sheets, never delete\
""",
tools=[
"google_sheets_append_values",
],
)
# Node 4: Exercise Reminder (client-facing, timer-triggered)
# Fires every 4 hours.
exercise_reminder_node = NodeSpec(
id="exercise-reminder",
name="Exercise Reminder",
description=(
"Timer-triggered node for exercise reminders. Fires every 4 hours. "
"Nudges the user about their workout plan and logs any completed "
"exercises to Google Sheets."
),
node_type="event_loop",
client_facing=True,
max_node_visits=0,
input_keys=["user_profile", "sheet_id"],
output_keys=["exercise_logged"],
nullable_output_keys=["exercise_logged"],
system_prompt="""\
You are a friendly fitness coach sending an exercise reminder. This node is triggered by a timer every 4 hours.
Read "user_profile" from context for the user's name, fitness goals, fitness level, and available equipment.
Read "sheet_id" from context for the spreadsheet ID.
**STEP 1 Send a motivating nudge (text only, NO tool calls):**
Send a brief, encouraging reminder. Vary the tone don't be robotic. Examples:
- "Hey [Name]! Have you moved today? Even a 10-minute walk counts!"
- "Quick check-in — done any exercise since we last talked?"
- "Reminder: your goals are waiting! Got time for a quick workout?"
If you know their goals/equipment from the profile, tailor the suggestion:
- Weight loss suggest cardio or HIIT
- Muscle building suggest a strength set
- Bodyweight only suggest pushups, squats, etc.
**STEP 2 If the user reports exercise:**
1. Estimate calories burned based on exercise type, duration, and their fitness level.
2. Log to Google Sheets: google_sheets_append_values(spreadsheet_id=<sheet_id>, range="Exercises!A:F", values=[[date, time, exercise, duration_min, estimated_calories_burned, notes]])
3. Celebrate the effort!
4. set_output("exercise_logged", "true")
**STEP 3 If the user says no or not yet:**
That's perfectly fine. Be supportive, not pushy. Maybe suggest something light.
- set_output("exercise_logged", "skipped")
**RULES:**
- Keep it SHORT and motivating max 2-3 sentences for the nudge
- Never guilt-trip or be negative
- Calorie burn estimates are APPROXIMATE say so
- No medical advice about injuries or pain
- Only append to sheets, never delete\
""",
tools=[
"google_sheets_append_values",
],
)
__all__ = [
"intake_node",
"coach_node",
"meal_checkin_node",
"exercise_reminder_node",
]
+76
View File
@@ -0,0 +1,76 @@
"""Custom tools for Fitness Coach Agent.
Provides save_profile persists user profile and sheet ID to disk so the
agent can skip intake on subsequent starts.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from framework.llm.provider import Tool, ToolResult, ToolUse
logger = logging.getLogger(__name__)
PROFILE_PATH = Path.home() / ".hive" / "agents" / "fitness_coach" / "profile.json"
TOOLS = {
"save_profile": Tool(
name="save_profile",
description=(
"Save the user's profile and Google Sheet ID to disk so the agent "
"remembers them on next startup. Call this after set_output in intake."
),
parameters={
"type": "object",
"properties": {
"user_profile": {
"type": "string",
"description": "JSON string of the user profile",
},
"sheet_id": {
"type": "string",
"description": "The Google Sheet spreadsheet ID",
},
},
"required": ["user_profile", "sheet_id"],
},
),
}
def _save_profile(user_profile: str, sheet_id: str) -> str:
"""Write profile and sheet_id to disk."""
PROFILE_PATH.parent.mkdir(parents=True, exist_ok=True)
data = {"user_profile": user_profile, "sheet_id": sheet_id}
PROFILE_PATH.write_text(json.dumps(data, indent=2))
logger.info("Saved profile to %s", PROFILE_PATH)
return json.dumps({"saved": True, "path": str(PROFILE_PATH)})
def load_profile() -> dict | None:
"""Load saved profile from disk. Returns None if not found."""
if PROFILE_PATH.exists():
try:
return json.loads(PROFILE_PATH.read_text())
except (json.JSONDecodeError, OSError):
return None
return None
def tool_executor(tool_use: ToolUse) -> ToolResult:
"""Unified tool executor for custom tools."""
if tool_use.name == "save_profile":
result = _save_profile(
user_profile=tool_use.input.get("user_profile", ""),
sheet_id=tool_use.input.get("sheet_id", ""),
)
return ToolResult(tool_use_id=tool_use.id, content=result)
return ToolResult(
tool_use_id=tool_use.id,
content=json.dumps({"error": f"Unknown tool: {tool_use.name}"}),
is_error=True,
)
@@ -62,7 +62,6 @@ from .discord import DISCORD_CREDENTIALS
from .email import EMAIL_CREDENTIALS
from .gcp_vision import GCP_VISION_CREDENTIALS
from .github import GITHUB_CREDENTIALS
from .google_calendar import GOOGLE_CALENDAR_CREDENTIALS
from .google_maps import GOOGLE_MAPS_CREDENTIALS
from .health_check import HealthCheckResult, check_credential_health
from .hubspot import HUBSPOT_CREDENTIALS
@@ -93,7 +92,6 @@ CREDENTIAL_SPECS = {
**GITHUB_CREDENTIALS,
**GOOGLE_MAPS_CREDENTIALS,
**HUBSPOT_CREDENTIALS,
**GOOGLE_CALENDAR_CREDENTIALS,
**SLACK_CREDENTIALS,
**SERPAPI_CREDENTIALS,
**RAZORPAY_CREDENTIALS,
@@ -107,8 +105,6 @@ __all__ = [
"CredentialSpec",
"CredentialStoreAdapter",
"CredentialError",
# Credential store adapter (replaces deprecated CredentialManager)
"CredentialStoreAdapter",
# Health check utilities
"HealthCheckResult",
"check_credential_health",
@@ -132,7 +128,6 @@ __all__ = [
"GITHUB_CREDENTIALS",
"GOOGLE_MAPS_CREDENTIALS",
"HUBSPOT_CREDENTIALS",
"GOOGLE_CALENDAR_CREDENTIALS",
"SLACK_CREDENTIALS",
"APOLLO_CREDENTIALS",
"SERPAPI_CREDENTIALS",
+22 -1
View File
@@ -38,6 +38,7 @@ EMAIL_CREDENTIALS = {
tools=[
# send_email is excluded: it's a multi-provider tool that checks
# credentials at runtime based on the provider parameter.
# Gmail tools
"gmail_reply_email",
"gmail_list_messages",
"gmail_get_message",
@@ -45,12 +46,32 @@ EMAIL_CREDENTIALS = {
"gmail_modify_message",
"gmail_batch_modify_messages",
"gmail_batch_get_messages",
# Google Calendar tools
"calendar_list_events",
"calendar_get_event",
"calendar_create_event",
"calendar_update_event",
"calendar_delete_event",
"calendar_list_calendars",
"calendar_get_calendar",
"calendar_check_availability",
# Google Sheets tools
"google_sheets_get_spreadsheet",
"google_sheets_create_spreadsheet",
"google_sheets_get_values",
"google_sheets_update_values",
"google_sheets_append_values",
"google_sheets_clear_values",
"google_sheets_batch_update_values",
"google_sheets_batch_clear_values",
"google_sheets_add_sheet",
"google_sheets_delete_sheet",
],
node_types=[],
required=True,
startup_required=False,
help_url="https://hive.adenhq.com",
description="Google OAuth2 access token (via Aden) - used for Gmail",
description="Google OAuth2 access token (via Aden) - used for Gmail, Calendar, and Sheets",
aden_supported=True,
aden_provider_name="google",
direct_api_key_supported=False,
@@ -1,39 +0,0 @@
"""
Google Calendar tool credentials.
Contains credentials for Google Calendar integration.
"""
from .base import CredentialSpec
GOOGLE_CALENDAR_CREDENTIALS = {
"google_calendar_oauth": CredentialSpec(
env_var="GOOGLE_CALENDAR_ACCESS_TOKEN",
tools=[
"calendar_list_events",
"calendar_get_event",
"calendar_create_event",
"calendar_update_event",
"calendar_delete_event",
"calendar_list_calendars",
"calendar_get_calendar",
"calendar_check_availability",
],
node_types=[],
required=False,
startup_required=False,
help_url="https://hive.adenhq.com",
description="Google Calendar OAuth2 access token (via Aden) - used for Google Calendar",
# Auth method support
aden_supported=True,
aden_provider_name="google-calendar",
direct_api_key_supported=False,
api_key_instructions="Google Calendar OAuth requires OAuth2. Connect via hive.adenhq.com",
# Health check configuration
health_check_endpoint="https://www.googleapis.com/calendar/v3/users/me/calendarList",
health_check_method="GET",
# Credential store mapping
credential_id="google_calendar_oauth",
credential_key="access_token",
),
}
@@ -162,56 +162,72 @@ class BraveSearchHealthChecker:
)
class GoogleCalendarHealthChecker:
"""Health checker for Google Calendar OAuth tokens."""
class GoogleHealthChecker:
"""Health checker for Google OAuth tokens (Gmail, Calendar, Sheets)."""
ENDPOINT = "https://www.googleapis.com/calendar/v3/users/me/calendarList"
ENDPOINTS: dict[str, str] = {
"gmail": "https://gmail.googleapis.com/gmail/v1/users/me/profile",
"calendar": "https://www.googleapis.com/calendar/v3/users/me/calendarList",
"sheets": "https://sheets.googleapis.com/v4/spreadsheets/healthcheck_nonexistent",
}
TIMEOUT = 10.0
def check(self, access_token: str) -> HealthCheckResult:
"""
Validate Google Calendar token by making lightweight API call.
Validate Google OAuth token against Gmail, Calendar, and Sheets APIs.
Makes a GET request for 1 calendar to verify the token works.
Hits a lightweight endpoint for each service. A 401 on any endpoint
means the token is invalid (fail fast). A 403 means the token lacks
that service's scope. For Sheets, a 404 counts as success (scope is
valid, the spreadsheet just doesn't exist).
"""
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
}
missing_scopes: list[str] = []
try:
with httpx.Client(timeout=self.TIMEOUT) as client:
response = client.get(
self.ENDPOINT,
headers={
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
},
params={"maxResults": "1"},
for scope, url in self.ENDPOINTS.items():
params = {"maxResults": "1"} if scope == "calendar" else {}
response = client.get(url, headers=headers, params=params)
if response.status_code == 401:
return HealthCheckResult(
valid=False,
message="Google token is invalid or expired",
details={"status_code": 401},
)
if response.status_code == 403:
missing_scopes.append(scope)
continue
# Sheets returns 404 for a non-existent spreadsheet — that's fine,
# it means the token + scope are valid.
if response.status_code in (200, 404):
continue
# Unexpected status — not a scope issue, but not healthy either
return HealthCheckResult(
valid=False,
message=f"Google {scope} API returned status {response.status_code}",
details={"status_code": response.status_code, "scope": scope},
)
if missing_scopes:
return HealthCheckResult(
valid=False,
message=f"Google token lacks scopes for: {', '.join(missing_scopes)}",
details={"status_code": 403, "missing_scopes": missing_scopes},
)
if response.status_code == 200:
return HealthCheckResult(
valid=True,
message="Google Calendar credentials valid",
)
elif response.status_code == 401:
return HealthCheckResult(
valid=False,
message="Google Calendar token is invalid or expired",
details={"status_code": 401},
)
elif response.status_code == 403:
return HealthCheckResult(
valid=False,
message="Google Calendar token lacks required scopes",
details={"status_code": 403, "required": "calendar"},
)
else:
return HealthCheckResult(
valid=False,
message=f"Google Calendar API returned status {response.status_code}",
details={"status_code": response.status_code},
)
return HealthCheckResult(
valid=True,
message="Google credentials valid (Gmail, Calendar, Sheets)",
)
except httpx.TimeoutException:
return HealthCheckResult(
valid=False,
message="Google Calendar API request timed out",
message="Google API request timed out",
details={"error": "timeout"},
)
except httpx.RequestError as e:
@@ -220,7 +236,7 @@ class GoogleCalendarHealthChecker:
error_msg = "Request failed (details redacted for security)"
return HealthCheckResult(
valid=False,
message=f"Failed to connect to Google Calendar: {error_msg}",
message=f"Failed to connect to Google: {error_msg}",
details={"error": error_msg},
)
@@ -684,7 +700,7 @@ HEALTH_CHECKERS: dict[str, CredentialHealthChecker] = {
"discord": DiscordHealthChecker(),
"hubspot": HubSpotHealthChecker(),
"brave_search": BraveSearchHealthChecker(),
"google_calendar_oauth": GoogleCalendarHealthChecker(),
"google": GoogleHealthChecker(),
"slack": SlackHealthChecker(),
"google_search": GoogleSearchHealthChecker(),
"google_maps": GoogleMapsHealthChecker(),
+12
View File
@@ -52,6 +52,7 @@ from .file_system_toolkits.write_to_file import register_tools as register_write
from .github_tool import register_tools as register_github
from .gmail_tool import register_tools as register_gmail
from .google_maps_tool import register_tools as register_google_maps
from .google_sheets_tool import register_tools as register_google_sheets
from .http_headers_scanner import register_tools as register_http_headers_scanner
from .hubspot_tool import register_tools as register_hubspot
from .news_tool import register_tools as register_news
@@ -102,6 +103,7 @@ def register_all_tools(
register_email(mcp, credentials=credentials)
# Gmail inbox management (read, trash, modify labels)
register_gmail(mcp, credentials=credentials)
register_google_sheets(mcp, credentials=credentials)
register_hubspot(mcp, credentials=credentials)
register_news(mcp, credentials=credentials)
register_apollo(mcp, credentials=credentials)
@@ -204,6 +206,16 @@ def register_all_tools(
"github_list_stargazers",
"github_get_user_profile",
"github_get_user_emails",
"google_sheets_get_spreadsheet",
"google_sheets_create_spreadsheet",
"google_sheets_get_values",
"google_sheets_update_values",
"google_sheets_append_values",
"google_sheets_clear_values",
"google_sheets_batch_update_values",
"google_sheets_batch_clear_values",
"google_sheets_add_sheet",
"google_sheets_delete_sheet",
"send_email",
"gmail_reply_email",
"gmail_list_messages",
@@ -28,7 +28,7 @@ For quick testing, get a token from the [Google OAuth Playground](https://develo
4. Set the environment variable:
```bash
export GOOGLE_CALENDAR_ACCESS_TOKEN="your-access-token"
export GOOGLE_ACCESS_TOKEN="your-access-token"
```
**Note:** Access tokens from OAuth Playground expire after ~1 hour. For production, use Aden OAuth.
@@ -224,7 +224,7 @@ All tools return a dict with either success data or an error:
```json
{
"error": "Calendar credentials not configured",
"help": "Set GOOGLE_CALENDAR_ACCESS_TOKEN environment variable"
"help": "Set GOOGLE_ACCESS_TOKEN environment variable"
}
```
@@ -8,7 +8,7 @@ Supports:
Requires OAuth 2.0 credentials:
- Aden: Use aden_provider_name="google-calendar" for managed OAuth (recommended)
- Direct: Set GOOGLE_CALENDAR_ACCESS_TOKEN with token from OAuth Playground
- Direct: Set GOOGLE_ACCESS_TOKEN with token from OAuth Playground
"""
from __future__ import annotations
@@ -17,6 +17,7 @@ import logging
import os
import re
import uuid
import warnings
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from urllib.parse import quote
@@ -82,10 +83,22 @@ def register_tools(
# Fall back to credential store adapter
if credentials is not None:
return credentials.get("google_calendar_oauth")
return credentials.get("google")
# Fall back to environment variable
return os.getenv("GOOGLE_CALENDAR_ACCESS_TOKEN")
token = os.getenv("GOOGLE_ACCESS_TOKEN")
if token:
return token
# Deprecated fallback — remove in a future release
token = os.getenv("GOOGLE_CALENDAR_ACCESS_TOKEN")
if token:
warnings.warn(
"GOOGLE_CALENDAR_ACCESS_TOKEN is deprecated, use GOOGLE_ACCESS_TOKEN instead",
DeprecationWarning,
stacklevel=2,
)
return token
return None
def _get_headers() -> dict[str, str]:
"""Get authorization headers for API requests.
@@ -106,7 +119,7 @@ def register_tools(
if not token:
return {
"error": "Calendar credentials not configured",
"help": "Set GOOGLE_CALENDAR_ACCESS_TOKEN environment variable",
"help": "Set GOOGLE_ACCESS_TOKEN environment variable",
}
return None
@@ -0,0 +1,181 @@
# Google Sheets Tool
Integration tool for reading, writing, and managing Google Sheets via the Google Sheets API v4.
## Features
- **Spreadsheet Management**: Create spreadsheets, get metadata
- **Read Data**: Get values from ranges with different rendering options
- **Write Data**: Update cells, append rows, batch updates
- **Clear Data**: Clear ranges, batch clear operations
- **Sheet Management**: Add and delete sheets/tabs within spreadsheets
## Authentication
This tool supports two authentication methods:
1. **Credential Store** (recommended):
- Configure `google` credential via the Aden credential store
- Requires `https://www.googleapis.com/auth/spreadsheets` scope
2. **Environment Variable**:
- Set `GOOGLE_ACCESS_TOKEN` with a valid OAuth2 access token
- Useful for local development and testing
## Available Tools
### Spreadsheet Management
- `google_sheets_get_spreadsheet` - Get spreadsheet metadata and properties
- `google_sheets_create_spreadsheet` - Create a new spreadsheet with optional sheets
### Reading Data
- `google_sheets_get_values` - Get values from a range (A1 notation)
### Writing Data
- `google_sheets_update_values` - Update values in a specific range
- `google_sheets_append_values` - Append rows to a sheet
- `google_sheets_clear_values` - Clear values in a range
### Batch Operations
- `google_sheets_batch_update_values` - Update multiple ranges in one request
- `google_sheets_batch_clear_values` - Clear multiple ranges in one request
### Sheet Management
- `google_sheets_add_sheet` - Add a new sheet/tab to a spreadsheet
- `google_sheets_delete_sheet` - Delete a sheet/tab from a spreadsheet
## Usage Examples
### Read data from a spreadsheet
```python
# Get values from a range
result = google_sheets_get_values(
spreadsheet_id="1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
range_name="Sheet1!A1:D10"
)
# Returns: {"range": "Sheet1!A1:D10", "values": [["A1", "B1", ...], ...]}
```
### Write data to a spreadsheet
```python
# Update a range
result = google_sheets_update_values(
spreadsheet_id="1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
range_name="Sheet1!A1:B2",
values=[
["Name", "Email"],
["John Doe", "john@example.com"]
]
)
```
### Append rows
```python
# Append new rows
result = google_sheets_append_values(
spreadsheet_id="1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
range_name="Sheet1!A1",
values=[
["Jane Smith", "jane@example.com"],
["Bob Johnson", "bob@example.com"]
]
)
```
### Create a new spreadsheet
```python
# Create spreadsheet with multiple sheets
result = google_sheets_create_spreadsheet(
title="My New Spreadsheet",
sheet_titles=["Data", "Analysis", "Summary"]
)
# Returns: {"spreadsheetId": "...", "spreadsheetUrl": "..."}
```
### Batch operations
```python
# Update multiple ranges at once
result = google_sheets_batch_update_values(
spreadsheet_id="1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
data=[
{"range": "Sheet1!A1:B1", "values": [["Header 1", "Header 2"]]},
{"range": "Sheet1!A2:B3", "values": [["Data 1", "Data 2"], ["Data 3", "Data 4"]]}
]
)
```
### Manage sheets
```python
# Add a new sheet
result = google_sheets_add_sheet(
spreadsheet_id="1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
title="New Sheet",
row_count=1000,
column_count=26
)
# Delete a sheet (need sheet_id from metadata)
result = google_sheets_delete_sheet(
spreadsheet_id="1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
sheet_id=123456
)
```
## A1 Notation
Google Sheets uses A1 notation to reference cells and ranges:
- Single cell: `Sheet1!A1`
- Range: `Sheet1!A1:D10`
- Entire column: `Sheet1!A:A`
- Entire row: `Sheet1!1:1`
- Multiple sheets: Use sheet name prefix
## Value Input Options
When writing data, you can specify how values should be interpreted:
- `USER_ENTERED` (default): Parse values as if typed by a user (formulas, numbers, dates)
- `RAW`: Store values as-is without parsing
## Value Render Options
When reading data, you can specify how values should be rendered:
- `FORMATTED_VALUE` (default): Values as they appear in the UI
- `UNFORMATTED_VALUE`: Unformatted values (numbers as numbers)
- `FORMULA`: Cell formulas
## Error Handling
All tools return error information in the response:
```python
{
"error": "Error message",
"help": "Suggestion for fixing the error" # When applicable
}
```
Common errors:
- `401`: Invalid or expired access token
- `403`: Insufficient permissions (check scopes)
- `404`: Spreadsheet or range not found
- `429`: Rate limit exceeded
## API Reference
- [Google Sheets API v4 Documentation](https://developers.google.com/sheets/api/reference/rest)
- [A1 Notation Guide](https://developers.google.com/sheets/api/guides/concepts#cell)
- [OAuth2 Scopes](https://developers.google.com/sheets/api/guides/authorizing)
@@ -0,0 +1,5 @@
"""Google Sheets integration tool."""
from .google_sheets_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,615 @@
"""
Google Sheets Tool - Read, write, and manage Google Sheets via Google Sheets API v4.
Supports:
- OAuth2 access tokens via the credential store (key: "google")
- Environment variable: GOOGLE_ACCESS_TOKEN
API Reference: https://developers.google.com/sheets/api/reference/rest
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING, Any
from urllib.parse import quote
import httpx
from fastmcp import FastMCP
if TYPE_CHECKING:
from aden_tools.credentials import CredentialStoreAdapter
GOOGLE_SHEETS_API_BASE = "https://sheets.googleapis.com/v4/spreadsheets"
def _encode_range(range_name: str) -> str:
"""URL-encode an A1 notation range for safe use in URL paths."""
return quote(range_name, safe="")
class _GoogleSheetsClient:
"""Internal client wrapping Google Sheets API v4 calls."""
def __init__(self, access_token: str):
self._token = access_token
@property
def _headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self._token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
"""Handle common HTTP error codes."""
if response.status_code == 401:
return {"error": "Invalid or expired Google Sheets access token"}
if response.status_code == 403:
return {"error": "Insufficient permissions. Check your Google API scopes."}
if response.status_code == 404:
return {"error": "Spreadsheet or range not found"}
if response.status_code == 429:
return {"error": "Google API rate limit exceeded. Try again later."}
if response.status_code >= 400:
try:
detail = response.json().get("error", {}).get("message", response.text)
except Exception:
detail = response.text
return {"error": f"Google Sheets API error (HTTP {response.status_code}): {detail}"}
return response.json()
def get_spreadsheet(
self,
spreadsheet_id: str,
include_grid_data: bool = False,
) -> dict[str, Any]:
"""Get spreadsheet metadata."""
params = {}
if include_grid_data:
params["includeGridData"] = "true"
response = httpx.get(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}",
headers=self._headers,
params=params,
timeout=30.0,
)
return self._handle_response(response)
def create_spreadsheet(
self,
title: str,
sheet_titles: list[str] | None = None,
) -> dict[str, Any]:
"""Create a new spreadsheet."""
body: dict[str, Any] = {"properties": {"title": title}}
if sheet_titles:
body["sheets"] = [
{"properties": {"title": sheet_title}} for sheet_title in sheet_titles
]
response = httpx.post(
GOOGLE_SHEETS_API_BASE,
headers=self._headers,
json=body,
timeout=30.0,
)
return self._handle_response(response)
def get_values(
self,
spreadsheet_id: str,
range_name: str,
value_render_option: str = "FORMATTED_VALUE",
) -> dict[str, Any]:
"""Get values from a range."""
params = {"valueRenderOption": value_render_option}
response = httpx.get(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}/values/{_encode_range(range_name)}",
headers=self._headers,
params=params,
timeout=30.0,
)
return self._handle_response(response)
def update_values(
self,
spreadsheet_id: str,
range_name: str,
values: list[list[Any]],
value_input_option: str = "USER_ENTERED",
) -> dict[str, Any]:
"""Update values in a range."""
params = {"valueInputOption": value_input_option}
body = {"values": values}
response = httpx.put(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}/values/{_encode_range(range_name)}",
headers=self._headers,
params=params,
json=body,
timeout=30.0,
)
return self._handle_response(response)
def append_values(
self,
spreadsheet_id: str,
range_name: str,
values: list[list[Any]],
value_input_option: str = "USER_ENTERED",
) -> dict[str, Any]:
"""Append values to a sheet."""
params = {"valueInputOption": value_input_option}
body = {"values": values}
response = httpx.post(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}/values/{_encode_range(range_name)}:append",
headers=self._headers,
params=params,
json=body,
timeout=30.0,
)
return self._handle_response(response)
def clear_values(
self,
spreadsheet_id: str,
range_name: str,
) -> dict[str, Any]:
"""Clear values in a range."""
response = httpx.post(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}/values/{_encode_range(range_name)}:clear",
headers=self._headers,
timeout=30.0,
)
return self._handle_response(response)
def batch_update_values(
self,
spreadsheet_id: str,
data: list[dict[str, Any]],
value_input_option: str = "USER_ENTERED",
) -> dict[str, Any]:
"""Batch update multiple ranges."""
body = {
"valueInputOption": value_input_option,
"data": data,
}
response = httpx.post(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}/values:batchUpdate",
headers=self._headers,
json=body,
timeout=30.0,
)
return self._handle_response(response)
def batch_clear_values(
self,
spreadsheet_id: str,
ranges: list[str],
) -> dict[str, Any]:
"""Batch clear multiple ranges."""
body = {"ranges": ranges}
response = httpx.post(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}/values:batchClear",
headers=self._headers,
json=body,
timeout=30.0,
)
return self._handle_response(response)
def add_sheet(
self,
spreadsheet_id: str,
title: str,
row_count: int = 1000,
column_count: int = 26,
) -> dict[str, Any]:
"""Add a new sheet to a spreadsheet."""
body = {
"requests": [
{
"addSheet": {
"properties": {
"title": title,
"gridProperties": {
"rowCount": row_count,
"columnCount": column_count,
},
}
}
}
]
}
response = httpx.post(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}:batchUpdate",
headers=self._headers,
json=body,
timeout=30.0,
)
return self._handle_response(response)
def delete_sheet(
self,
spreadsheet_id: str,
sheet_id: int,
) -> dict[str, Any]:
"""Delete a sheet from a spreadsheet."""
body = {"requests": [{"deleteSheet": {"sheetId": sheet_id}}]}
response = httpx.post(
f"{GOOGLE_SHEETS_API_BASE}/{spreadsheet_id}:batchUpdate",
headers=self._headers,
json=body,
timeout=30.0,
)
return self._handle_response(response)
def register_tools(
mcp: FastMCP,
credentials: CredentialStoreAdapter | None = None,
) -> None:
"""Register Google Sheets tools with the MCP server."""
def _get_token() -> str | None:
"""Get Google access token from credential manager or environment."""
if credentials is not None:
token = credentials.get("google")
# Defensive check: ensure we get a string, not a complex object
if token is not None and not isinstance(token, str):
raise TypeError(
f"Expected string from credentials.get('google'), got {type(token).__name__}"
)
return token
return os.getenv("GOOGLE_ACCESS_TOKEN")
def _get_client() -> _GoogleSheetsClient | dict[str, str]:
"""Get a Google Sheets client, or return an error dict if no credentials."""
token = _get_token()
if not token:
return {
"error": "Google Sheets credentials not configured",
"help": (
"Set GOOGLE_ACCESS_TOKEN environment variable "
"or configure 'google' via credential store"
),
}
return _GoogleSheetsClient(token)
def _sanitize_error(e: Exception) -> str:
"""Sanitize exception message to avoid leaking sensitive data like tokens."""
msg = str(e)
if "Bearer" in msg or "Authorization" in msg:
return f"{type(e).__name__}: Request failed (details redacted for security)"
if len(msg) > 200:
return f"{type(e).__name__}: {msg[:200]}..."
return msg
# --- Spreadsheet Management ---
@mcp.tool()
def google_sheets_get_spreadsheet(
spreadsheet_id: str,
include_grid_data: bool = False,
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Get Google Sheets spreadsheet metadata.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
include_grid_data: Whether to include cell data (default False)
Returns:
Dict with spreadsheet metadata or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.get_spreadsheet(spreadsheet_id, include_grid_data)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
@mcp.tool()
def google_sheets_create_spreadsheet(
title: str,
sheet_titles: list[str] | None = None,
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Create a new Google Sheets spreadsheet.
Args:
title: The spreadsheet title
sheet_titles: Optional list of sheet/tab names to create
Returns:
Dict with created spreadsheet data or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.create_spreadsheet(title, sheet_titles)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
# --- Reading Data ---
@mcp.tool()
def google_sheets_get_values(
spreadsheet_id: str,
range_name: str,
value_render_option: str = "FORMATTED_VALUE",
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Get values from a Google Sheets range.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
range_name: The A1 notation range (e.g., "Sheet1!A1:B10")
value_render_option: How to render values
(FORMATTED_VALUE, UNFORMATTED_VALUE, FORMULA)
Returns:
Dict with values or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.get_values(spreadsheet_id, range_name, value_render_option)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
# --- Writing Data ---
@mcp.tool()
def google_sheets_update_values(
spreadsheet_id: str,
range_name: str,
values: list[list[Any]],
value_input_option: str = "USER_ENTERED",
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Update values in a Google Sheets range.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
range_name: The A1 notation range (e.g., "Sheet1!A1:B10")
values: 2D array of values to write
value_input_option: How to interpret input
(USER_ENTERED parses, RAW stores as-is)
Returns:
Dict with update result or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.update_values(spreadsheet_id, range_name, values, value_input_option)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
@mcp.tool()
def google_sheets_append_values(
spreadsheet_id: str,
range_name: str,
values: list[list[Any]],
value_input_option: str = "USER_ENTERED",
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Append values to a Google Sheets range.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
range_name: The A1 notation range (e.g., "Sheet1!A1")
values: 2D array of values to append
value_input_option: How to interpret input
(USER_ENTERED parses, RAW stores as-is)
Returns:
Dict with append result or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.append_values(spreadsheet_id, range_name, values, value_input_option)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
@mcp.tool()
def google_sheets_clear_values(
spreadsheet_id: str,
range_name: str,
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Clear values in a Google Sheets range.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
range_name: The A1 notation range (e.g., "Sheet1!A1:B10")
Returns:
Dict with clear result or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.clear_values(spreadsheet_id, range_name)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
# --- Batch Operations ---
@mcp.tool()
def google_sheets_batch_update_values(
spreadsheet_id: str,
data: list[dict[str, Any]],
value_input_option: str = "USER_ENTERED",
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Batch update multiple ranges in a Google Sheets spreadsheet.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
data: List of update objects with "range" and "values" keys
value_input_option: How to interpret input
(USER_ENTERED parses, RAW stores as-is)
Returns:
Dict with batch update result or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.batch_update_values(spreadsheet_id, data, value_input_option)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
@mcp.tool()
def google_sheets_batch_clear_values(
spreadsheet_id: str,
ranges: list[str],
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Batch clear multiple ranges in a Google Sheets spreadsheet.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
ranges: List of A1 notation ranges to clear
Returns:
Dict with batch clear result or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.batch_clear_values(spreadsheet_id, ranges)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
# --- Sheet Management ---
@mcp.tool()
def google_sheets_add_sheet(
spreadsheet_id: str,
title: str,
row_count: int = 1000,
column_count: int = 26,
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Add a new sheet/tab to a Google Sheets spreadsheet.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
title: The sheet title
row_count: Number of rows (default 1000)
column_count: Number of columns (default 26)
Returns:
Dict with add sheet result or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.add_sheet(spreadsheet_id, title, row_count, column_count)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
@mcp.tool()
def google_sheets_delete_sheet(
spreadsheet_id: str,
sheet_id: int,
# Tracking parameters (injected by framework, ignored by tool)
workspace_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
) -> dict:
"""
Delete a sheet/tab from a Google Sheets spreadsheet.
Args:
spreadsheet_id: The spreadsheet ID (from the URL)
sheet_id: The numeric sheet ID (not the title)
Returns:
Dict with delete result or error
"""
client = _get_client()
if isinstance(client, dict):
return client
try:
return client.delete_sheet(spreadsheet_id, sheet_id)
except httpx.TimeoutException:
return {"error": "Request timed out"}
except httpx.RequestError as e:
return {"error": f"Network error: {_sanitize_error(e)}"}
@@ -0,0 +1,255 @@
"""
Integration tests for Google Sheets tool against the real Google Sheets API.
These tests create a real spreadsheet, perform CRUD operations, and clean up.
They require a valid Google OAuth2 token with Sheets + Drive scopes.
Run with:
PYTHONPATH=core:tools/src python -m pytest \
tools/src/aden_tools/tools/google_sheets_tool/tests/test_google_sheets_integration.py -v
Skipped automatically if no Google credential is available.
"""
from __future__ import annotations
import uuid
import httpx
import pytest
from aden_tools.tools.google_sheets_tool.google_sheets_tool import (
_GoogleSheetsClient,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _get_google_token() -> str | None:
"""Try to get a Google OAuth token from the credential store.
Uses CredentialStoreAdapter.default() which wires up AdenCachedStorage
with the provider index, so ``get("google")`` resolves to the Aden-managed
OAuth token (compound ID) rather than requiring a plain ``google.enc`` file.
"""
try:
from aden_tools.credentials import CredentialStoreAdapter
adapter = CredentialStoreAdapter.default()
return adapter.get("google")
except Exception:
return None
_TOKEN = _get_google_token()
pytestmark = pytest.mark.skipif(
_TOKEN is None,
reason="No Google credential available (need credential store with 'google' token)",
)
def _delete_spreadsheet(token: str, spreadsheet_id: str) -> None:
"""Delete a spreadsheet via Google Drive API (cleanup helper)."""
httpx.delete(
f"https://www.googleapis.com/drive/v3/files/{spreadsheet_id}",
headers={"Authorization": f"Bearer {token}"},
timeout=15.0,
)
@pytest.fixture()
def client() -> _GoogleSheetsClient:
"""Create a real client with the stored Google token."""
assert _TOKEN is not None
return _GoogleSheetsClient(_TOKEN)
@pytest.fixture()
def spreadsheet(client: _GoogleSheetsClient):
"""Create a temporary spreadsheet and delete it after the test."""
unique = uuid.uuid4().hex[:8]
title = f"hive-integration-test-{unique}"
result = client.create_spreadsheet(title, sheet_titles=["Data", "Extra"])
assert "error" not in result, f"Failed to create spreadsheet: {result}"
spreadsheet_id = result["spreadsheetId"]
yield spreadsheet_id, result
# Cleanup: delete via Drive API
assert _TOKEN is not None
_delete_spreadsheet(_TOKEN, spreadsheet_id)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestCreateAndGetSpreadsheet:
def test_create_spreadsheet(self, spreadsheet):
"""Creating a spreadsheet returns a valid ID and the requested sheets."""
spreadsheet_id, result = spreadsheet
assert spreadsheet_id
sheets = result.get("sheets", [])
titles = [s["properties"]["title"] for s in sheets]
assert "Data" in titles
assert "Extra" in titles
def test_get_spreadsheet_metadata(self, client, spreadsheet):
"""Getting a spreadsheet returns its metadata."""
spreadsheet_id, _ = spreadsheet
result = client.get_spreadsheet(spreadsheet_id)
assert "error" not in result, f"Failed to get spreadsheet: {result}"
assert result["spreadsheetId"] == spreadsheet_id
assert "properties" in result
class TestReadWriteValues:
def test_write_and_read_values(self, client, spreadsheet):
"""Write values to a range and read them back."""
spreadsheet_id, _ = spreadsheet
values = [["Name", "Score"], ["Alice", "95"], ["Bob", "87"]]
# Write
update_result = client.update_values(spreadsheet_id, "Data!A1:B3", values)
assert "error" not in update_result, f"Failed to update: {update_result}"
# Read back
get_result = client.get_values(spreadsheet_id, "Data!A1:B3")
assert "error" not in get_result, f"Failed to get values: {get_result}"
assert get_result["values"] == values
def test_append_values(self, client, spreadsheet):
"""Append rows to an existing range."""
spreadsheet_id, _ = spreadsheet
# Seed initial data
client.update_values(spreadsheet_id, "Data!A1:B1", [["Name", "Score"]])
# Append
append_result = client.append_values(spreadsheet_id, "Data!A1", [["Charlie", "72"]])
assert "error" not in append_result, f"Failed to append: {append_result}"
# Verify row 2 has the appended data
get_result = client.get_values(spreadsheet_id, "Data!A2:B2")
assert "error" not in get_result, f"Failed to read: {get_result}"
assert get_result["values"] == [["Charlie", "72"]]
def test_clear_values(self, client, spreadsheet):
"""Clear a range and verify it's empty."""
spreadsheet_id, _ = spreadsheet
# Write data
client.update_values(spreadsheet_id, "Data!A1:B1", [["hello", "world"]])
# Clear
clear_result = client.clear_values(spreadsheet_id, "Data!A1:B1")
assert "error" not in clear_result, f"Failed to clear: {clear_result}"
# Verify empty
get_result = client.get_values(spreadsheet_id, "Data!A1:B1")
assert "error" not in get_result
# Google returns no "values" key for empty ranges
assert "values" not in get_result
class TestBatchOperations:
def test_batch_update_values(self, client, spreadsheet):
"""Batch update multiple ranges at once."""
spreadsheet_id, _ = spreadsheet
data = [
{"range": "Data!A1:A2", "values": [["X"], ["Y"]]},
{"range": "Data!C1:C2", "values": [["P"], ["Q"]]},
]
result = client.batch_update_values(spreadsheet_id, data)
assert "error" not in result, f"Batch update failed: {result}"
# Verify both ranges
a_vals = client.get_values(spreadsheet_id, "Data!A1:A2")
c_vals = client.get_values(spreadsheet_id, "Data!C1:C2")
assert a_vals["values"] == [["X"], ["Y"]]
assert c_vals["values"] == [["P"], ["Q"]]
def test_batch_clear_values(self, client, spreadsheet):
"""Batch clear multiple ranges."""
spreadsheet_id, _ = spreadsheet
# Write to two ranges
client.batch_update_values(
spreadsheet_id,
[
{"range": "Data!A1", "values": [["keep"]]},
{"range": "Data!B1", "values": [["remove"]]},
{"range": "Data!C1", "values": [["remove"]]},
],
)
# Batch clear B1 and C1
result = client.batch_clear_values(spreadsheet_id, ["Data!B1", "Data!C1"])
assert "error" not in result, f"Batch clear failed: {result}"
# A1 should still have data
a_vals = client.get_values(spreadsheet_id, "Data!A1")
assert a_vals["values"] == [["keep"]]
class TestSheetManagement:
def test_add_and_delete_sheet(self, client, spreadsheet):
"""Add a new sheet tab and then delete it."""
spreadsheet_id, _ = spreadsheet
# Add sheet
add_result = client.add_sheet(spreadsheet_id, "Temp Sheet")
assert "error" not in add_result, f"Add sheet failed: {add_result}"
# Extract the new sheet ID
new_sheet_id = add_result["replies"][0]["addSheet"]["properties"]["sheetId"]
assert isinstance(new_sheet_id, int)
# Delete it
del_result = client.delete_sheet(spreadsheet_id, new_sheet_id)
assert "error" not in del_result, f"Delete sheet failed: {del_result}"
# Verify the sheet is gone
meta = client.get_spreadsheet(spreadsheet_id)
sheet_titles = [s["properties"]["title"] for s in meta.get("sheets", [])]
assert "Temp Sheet" not in sheet_titles
class TestMCPToolRegistration:
"""Test that the MCP tools work end-to-end with real credentials."""
def test_tools_via_register(self):
"""Register tools via the public API and call one."""
from unittest.mock import MagicMock
from aden_tools.credentials import CredentialStoreAdapter
from aden_tools.tools.google_sheets_tool.google_sheets_tool import (
register_tools,
)
creds = CredentialStoreAdapter.default()
mcp = MagicMock()
registered_fns = []
mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn
register_tools(mcp, credentials=creds)
# Find the create tool
create_fn = next(
f for f in registered_fns if f.__name__ == "google_sheets_create_spreadsheet"
)
unique = uuid.uuid4().hex[:8]
result = create_fn(title=f"hive-mcp-test-{unique}")
assert "error" not in result, f"MCP create failed: {result}"
spreadsheet_id = result["spreadsheetId"]
assert spreadsheet_id
# Cleanup
assert _TOKEN is not None
_delete_spreadsheet(_TOKEN, spreadsheet_id)
@@ -0,0 +1,775 @@
"""
Tests for Google Sheets tool.
Covers:
- _GoogleSheetsClient methods (all CRUD operations)
- Error handling (401, 403, 404, 429, 500, timeout)
- Credential retrieval (CredentialStoreAdapter vs env var)
- All 11 MCP tool functions
- Batch operations
- Sheet management
"""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import httpx
import pytest
from aden_tools.tools.google_sheets_tool.google_sheets_tool import (
GOOGLE_SHEETS_API_BASE,
_encode_range,
_GoogleSheetsClient,
register_tools,
)
# --- _GoogleSheetsClient tests ---
class TestGoogleSheetsClient:
def setup_method(self):
self.client = _GoogleSheetsClient("test-token")
def test_headers(self):
headers = self.client._headers
assert headers["Authorization"] == "Bearer test-token"
assert headers["Content-Type"] == "application/json"
def test_handle_response_success(self):
response = MagicMock()
response.status_code = 200
response.json.return_value = {"spreadsheetId": "123"}
assert self.client._handle_response(response) == {"spreadsheetId": "123"}
@pytest.mark.parametrize(
"status_code,expected_substring",
[
(401, "Invalid or expired"),
(403, "Insufficient permissions"),
(404, "not found"),
(429, "rate limit"),
],
)
def test_handle_response_errors(self, status_code, expected_substring):
response = MagicMock()
response.status_code = status_code
result = self.client._handle_response(response)
assert "error" in result
assert expected_substring in result["error"]
def test_handle_response_generic_error(self):
response = MagicMock()
response.status_code = 500
response.json.return_value = {"error": {"message": "Internal Server Error"}}
result = self.client._handle_response(response)
assert "error" in result
assert "500" in result["error"]
def test_handle_response_generic_error_fallback(self):
response = MagicMock()
response.status_code = 500
response.json.side_effect = Exception("parse error")
response.text = "Internal Server Error"
result = self.client._handle_response(response)
assert "error" in result
assert "500" in result["error"]
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_spreadsheet(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"spreadsheetId": "123",
"properties": {"title": "Test Sheet"},
}
mock_get.return_value = mock_response
result = self.client.get_spreadsheet("123")
mock_get.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123",
headers=self.client._headers,
params={},
timeout=30.0,
)
assert result["spreadsheetId"] == "123"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_spreadsheet_with_grid_data(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"spreadsheetId": "123"}
mock_get.return_value = mock_response
self.client.get_spreadsheet("123", include_grid_data=True)
assert mock_get.call_args.kwargs["params"]["includeGridData"] == "true"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_create_spreadsheet(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"spreadsheetId": "456",
"properties": {"title": "New Sheet"},
}
mock_post.return_value = mock_response
result = self.client.create_spreadsheet("New Sheet")
mock_post.assert_called_once_with(
GOOGLE_SHEETS_API_BASE,
headers=self.client._headers,
json={"properties": {"title": "New Sheet"}},
timeout=30.0,
)
assert result["spreadsheetId"] == "456"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_create_spreadsheet_with_sheets(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"spreadsheetId": "456"}
mock_post.return_value = mock_response
self.client.create_spreadsheet("New Sheet", sheet_titles=["Sheet1", "Sheet2"])
call_json = mock_post.call_args.kwargs["json"]
assert "sheets" in call_json
assert len(call_json["sheets"]) == 2
assert call_json["sheets"][0]["properties"]["title"] == "Sheet1"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_values(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"range": "Sheet1!A1:B2",
"values": [["A1", "B1"], ["A2", "B2"]],
}
mock_get.return_value = mock_response
result = self.client.get_values("123", "Sheet1!A1:B2")
mock_get.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123/values/{_encode_range('Sheet1!A1:B2')}",
headers=self.client._headers,
params={"valueRenderOption": "FORMATTED_VALUE"},
timeout=30.0,
)
assert result["values"] == [["A1", "B1"], ["A2", "B2"]]
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_values_unformatted(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"values": [["1", "2"]]}
mock_get.return_value = mock_response
self.client.get_values("123", "Sheet1!A1:B1", value_render_option="UNFORMATTED_VALUE")
assert mock_get.call_args.kwargs["params"]["valueRenderOption"] == "UNFORMATTED_VALUE"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_values_encodes_range_with_spaces(self, mock_get):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"values": [["A"]]}
mock_get.return_value = mock_response
self.client.get_values("123", "My Sheet!A1:B2")
url = mock_get.call_args.args[0]
assert "My Sheet" not in url
assert "My%20Sheet" in url
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.put")
def test_update_values(self, mock_put):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"updatedCells": 4,
"updatedRows": 2,
}
mock_put.return_value = mock_response
values = [["A1", "B1"], ["A2", "B2"]]
result = self.client.update_values("123", "Sheet1!A1:B2", values)
mock_put.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123/values/{_encode_range('Sheet1!A1:B2')}",
headers=self.client._headers,
params={"valueInputOption": "USER_ENTERED"},
json={"values": values},
timeout=30.0,
)
assert result["updatedCells"] == 4
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.put")
def test_update_values_raw(self, mock_put):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"updatedCells": 1}
mock_put.return_value = mock_response
self.client.update_values("123", "Sheet1!A1", [["value"]], value_input_option="RAW")
assert mock_put.call_args.kwargs["params"]["valueInputOption"] == "RAW"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_append_values(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"updates": {"updatedCells": 2},
}
mock_post.return_value = mock_response
values = [["new", "row"]]
result = self.client.append_values("123", "Sheet1!A1", values)
mock_post.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123/values/{_encode_range('Sheet1!A1')}:append",
headers=self.client._headers,
params={"valueInputOption": "USER_ENTERED"},
json={"values": values},
timeout=30.0,
)
assert "updates" in result
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_clear_values(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"clearedRange": "Sheet1!A1:B2"}
mock_post.return_value = mock_response
result = self.client.clear_values("123", "Sheet1!A1:B2")
mock_post.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123/values/{_encode_range('Sheet1!A1:B2')}:clear",
headers=self.client._headers,
timeout=30.0,
)
assert result["clearedRange"] == "Sheet1!A1:B2"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_batch_update_values(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"totalUpdatedCells": 6,
}
mock_post.return_value = mock_response
data = [
{"range": "Sheet1!A1:B1", "values": [["A", "B"]]},
{"range": "Sheet1!A2:B2", "values": [["C", "D"]]},
]
result = self.client.batch_update_values("123", data)
mock_post.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123/values:batchUpdate",
headers=self.client._headers,
json={"valueInputOption": "USER_ENTERED", "data": data},
timeout=30.0,
)
assert result["totalUpdatedCells"] == 6
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_batch_clear_values(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"clearedRanges": ["Sheet1!A1:B1", "Sheet1!C1:D1"],
}
mock_post.return_value = mock_response
ranges = ["Sheet1!A1:B1", "Sheet1!C1:D1"]
result = self.client.batch_clear_values("123", ranges)
mock_post.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123/values:batchClear",
headers=self.client._headers,
json={"ranges": ranges},
timeout=30.0,
)
assert len(result["clearedRanges"]) == 2
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_add_sheet(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"replies": [{"addSheet": {"properties": {"sheetId": 1, "title": "New Sheet"}}}]
}
mock_post.return_value = mock_response
result = self.client.add_sheet("123", "New Sheet")
mock_post.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123:batchUpdate",
headers=self.client._headers,
json={
"requests": [
{
"addSheet": {
"properties": {
"title": "New Sheet",
"gridProperties": {
"rowCount": 1000,
"columnCount": 26,
},
}
}
}
]
},
timeout=30.0,
)
assert "replies" in result
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_add_sheet_custom_dimensions(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"replies": []}
mock_post.return_value = mock_response
self.client.add_sheet("123", "Custom Sheet", row_count=500, column_count=10)
call_json = mock_post.call_args.kwargs["json"]
grid_props = call_json["requests"][0]["addSheet"]["properties"]["gridProperties"]
assert grid_props["rowCount"] == 500
assert grid_props["columnCount"] == 10
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_delete_sheet(self, mock_post):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"replies": [{}]}
mock_post.return_value = mock_response
result = self.client.delete_sheet("123", 456)
mock_post.assert_called_once_with(
f"{GOOGLE_SHEETS_API_BASE}/123:batchUpdate",
headers=self.client._headers,
json={"requests": [{"deleteSheet": {"sheetId": 456}}]},
timeout=30.0,
)
assert "replies" in result
# --- MCP tool registration and credential tests ---
class TestToolRegistration:
def test_register_tools_registers_all_tools(self):
mcp = MagicMock()
mcp.tool.return_value = lambda fn: fn
register_tools(mcp)
assert mcp.tool.call_count == 10
def test_no_credentials_returns_error(self):
mcp = MagicMock()
registered_fns = []
mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn
with patch.dict("os.environ", {}, clear=True):
register_tools(mcp, credentials=None)
# Pick the first tool and call it
get_fn = next(fn for fn in registered_fns if fn.__name__ == "google_sheets_get_values")
result = get_fn(spreadsheet_id="123", range_name="Sheet1!A1")
assert "error" in result
assert "not configured" in result["error"]
def test_credentials_from_credential_manager(self):
mcp = MagicMock()
registered_fns = []
mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn
cred_manager = MagicMock()
cred_manager.get.return_value = "test-token"
register_tools(mcp, credentials=cred_manager)
get_fn = next(fn for fn in registered_fns if fn.__name__ == "google_sheets_get_values")
with patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get") as mock_get:
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"values": [["test"]]}
mock_get.return_value = mock_response
result = get_fn(spreadsheet_id="123", range_name="Sheet1!A1")
cred_manager.get.assert_called_with("google")
assert result["values"] == [["test"]]
def test_credentials_from_env_var(self):
mcp = MagicMock()
registered_fns = []
mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn
register_tools(mcp, credentials=None)
get_fn = next(fn for fn in registered_fns if fn.__name__ == "google_sheets_get_values")
with (
patch.dict("os.environ", {"GOOGLE_ACCESS_TOKEN": "env-token"}),
patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get") as mock_get,
):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"values": [["test"]]}
mock_get.return_value = mock_response
result = get_fn(spreadsheet_id="123", range_name="Sheet1!A1")
assert result["values"] == [["test"]]
# Verify the token was used in headers
call_headers = mock_get.call_args.kwargs["headers"]
assert call_headers["Authorization"] == "Bearer env-token"
def test_credentials_wrong_type_raises_error(self):
mcp = MagicMock()
registered_fns = []
mcp.tool.return_value = lambda fn: registered_fns.append(fn) or fn
cred_manager = MagicMock()
cred_manager.get.return_value = {"not": "a string"}
register_tools(mcp, credentials=cred_manager)
get_fn = next(fn for fn in registered_fns if fn.__name__ == "google_sheets_get_values")
with pytest.raises(TypeError, match="Expected string"):
get_fn(spreadsheet_id="123", range_name="Sheet1!A1")
# --- Individual tool function tests ---
class TestSpreadsheetTools:
def setup_method(self):
self.mcp = MagicMock()
self.fns = []
self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn
cred = MagicMock()
cred.get.return_value = "tok"
register_tools(self.mcp, credentials=cred)
def _fn(self, name):
return next(f for f in self.fns if f.__name__ == name)
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_spreadsheet(self, mock_get):
mock_get.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"spreadsheetId": "123"})
)
result = self._fn("google_sheets_get_spreadsheet")(spreadsheet_id="123")
assert result["spreadsheetId"] == "123"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_create_spreadsheet(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"spreadsheetId": "456"})
)
result = self._fn("google_sheets_create_spreadsheet")(title="New Sheet")
assert result["spreadsheetId"] == "456"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_spreadsheet_timeout(self, mock_get):
mock_get.side_effect = httpx.TimeoutException("timed out")
result = self._fn("google_sheets_get_spreadsheet")(spreadsheet_id="123")
assert "error" in result
assert "timed out" in result["error"]
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_create_spreadsheet_network_error(self, mock_post):
mock_post.side_effect = httpx.RequestError("connection failed")
result = self._fn("google_sheets_create_spreadsheet")(title="New")
assert "error" in result
assert "Network error" in result["error"]
class TestReadDataTools:
def setup_method(self):
self.mcp = MagicMock()
self.fns = []
self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn
cred = MagicMock()
cred.get.return_value = "tok"
register_tools(self.mcp, credentials=cred)
def _fn(self, name):
return next(f for f in self.fns if f.__name__ == name)
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_values(self, mock_get):
mock_get.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"values": [["A", "B"]]})
)
result = self._fn("google_sheets_get_values")(
spreadsheet_id="123", range_name="Sheet1!A1:B1"
)
assert result["values"] == [["A", "B"]]
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_get_values_timeout(self, mock_get):
mock_get.side_effect = httpx.TimeoutException("timed out")
result = self._fn("google_sheets_get_values")(spreadsheet_id="123", range_name="Sheet1!A1")
assert "error" in result
assert "timed out" in result["error"]
class TestWriteDataTools:
def setup_method(self):
self.mcp = MagicMock()
self.fns = []
self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn
cred = MagicMock()
cred.get.return_value = "tok"
register_tools(self.mcp, credentials=cred)
def _fn(self, name):
return next(f for f in self.fns if f.__name__ == name)
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.put")
def test_update_values(self, mock_put):
mock_put.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"updatedCells": 2})
)
result = self._fn("google_sheets_update_values")(
spreadsheet_id="123", range_name="Sheet1!A1:B1", values=[["A", "B"]]
)
assert result["updatedCells"] == 2
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_append_values(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"updates": {"updatedCells": 2}})
)
result = self._fn("google_sheets_append_values")(
spreadsheet_id="123", range_name="Sheet1!A1", values=[["new", "row"]]
)
assert "updates" in result
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_clear_values(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"clearedRange": "Sheet1!A1:B2"})
)
result = self._fn("google_sheets_clear_values")(
spreadsheet_id="123", range_name="Sheet1!A1:B2"
)
assert result["clearedRange"] == "Sheet1!A1:B2"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.put")
def test_update_values_network_error(self, mock_put):
mock_put.side_effect = httpx.RequestError("connection failed")
result = self._fn("google_sheets_update_values")(
spreadsheet_id="123", range_name="Sheet1!A1", values=[["test"]]
)
assert "error" in result
assert "Network error" in result["error"]
class TestBatchOperationsTools:
def setup_method(self):
self.mcp = MagicMock()
self.fns = []
self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn
cred = MagicMock()
cred.get.return_value = "tok"
register_tools(self.mcp, credentials=cred)
def _fn(self, name):
return next(f for f in self.fns if f.__name__ == name)
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_batch_update_values(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"totalUpdatedCells": 4})
)
data = [
{"range": "Sheet1!A1", "values": [["A"]]},
{"range": "Sheet1!B1", "values": [["B"]]},
]
result = self._fn("google_sheets_batch_update_values")(spreadsheet_id="123", data=data)
assert result["totalUpdatedCells"] == 4
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_batch_clear_values(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"clearedRanges": ["Sheet1!A1"]})
)
result = self._fn("google_sheets_batch_clear_values")(
spreadsheet_id="123", ranges=["Sheet1!A1"]
)
assert "clearedRanges" in result
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_batch_update_values_timeout(self, mock_post):
mock_post.side_effect = httpx.TimeoutException("timed out")
result = self._fn("google_sheets_batch_update_values")(
spreadsheet_id="123", data=[{"range": "A1", "values": [["test"]]}]
)
assert "error" in result
assert "timed out" in result["error"]
class TestSheetManagementTools:
def setup_method(self):
self.mcp = MagicMock()
self.fns = []
self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn
cred = MagicMock()
cred.get.return_value = "tok"
register_tools(self.mcp, credentials=cred)
def _fn(self, name):
return next(f for f in self.fns if f.__name__ == name)
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_add_sheet(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200,
json=MagicMock(
return_value={"replies": [{"addSheet": {"properties": {"sheetId": 1}}}]}
),
)
result = self._fn("google_sheets_add_sheet")(spreadsheet_id="123", title="New Sheet")
assert "replies" in result
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_delete_sheet(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"replies": [{}]})
)
result = self._fn("google_sheets_delete_sheet")(spreadsheet_id="123", sheet_id=456)
assert "replies" in result
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_add_sheet_network_error(self, mock_post):
mock_post.side_effect = httpx.RequestError("connection failed")
result = self._fn("google_sheets_add_sheet")(spreadsheet_id="123", title="New")
assert "error" in result
assert "Network error" in result["error"]
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_delete_sheet_timeout(self, mock_post):
mock_post.side_effect = httpx.TimeoutException("timed out")
result = self._fn("google_sheets_delete_sheet")(spreadsheet_id="123", sheet_id=1)
assert "error" in result
assert "timed out" in result["error"]
# --- Error sanitization tests ---
class TestErrorSanitization:
def setup_method(self):
self.mcp = MagicMock()
self.fns = []
self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn
cred = MagicMock()
cred.get.return_value = "tok"
register_tools(self.mcp, credentials=cred)
def _fn(self, name):
return next(f for f in self.fns if f.__name__ == name)
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_bearer_token_redacted_from_error(self, mock_get):
mock_get.side_effect = httpx.RequestError(
"Connection failed, Authorization: Bearer ya29.secret_token_here"
)
result = self._fn("google_sheets_get_spreadsheet")(spreadsheet_id="123")
assert "error" in result
assert "Network error" in result["error"]
assert "Bearer" not in result["error"]
assert "secret_token" not in result["error"]
assert "redacted" in result["error"]
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_authorization_header_redacted_from_error(self, mock_get):
mock_get.side_effect = httpx.RequestError("Failed with Authorization header present")
result = self._fn("google_sheets_get_spreadsheet")(spreadsheet_id="123")
assert "error" in result
assert "Authorization" not in result["error"]
assert "redacted" in result["error"]
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_long_error_message_truncated(self, mock_get):
long_msg = "x" * 300
mock_get.side_effect = httpx.RequestError(long_msg)
result = self._fn("google_sheets_get_spreadsheet")(spreadsheet_id="123")
assert "error" in result
assert len(result["error"]) < 300
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_safe_error_message_passes_through(self, mock_get):
mock_get.side_effect = httpx.RequestError("connection refused")
result = self._fn("google_sheets_get_spreadsheet")(spreadsheet_id="123")
assert "error" in result
assert "connection refused" in result["error"]
# --- Tracking parameter tests ---
class TestTrackingParameters:
def setup_method(self):
self.mcp = MagicMock()
self.fns = []
self.mcp.tool.return_value = lambda fn: self.fns.append(fn) or fn
cred = MagicMock()
cred.get.return_value = "tok"
register_tools(self.mcp, credentials=cred)
def _fn(self, name):
return next(f for f in self.fns if f.__name__ == name)
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.get")
def test_tracking_params_accepted_by_get_spreadsheet(self, mock_get):
mock_get.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"spreadsheetId": "123"})
)
result = self._fn("google_sheets_get_spreadsheet")(
spreadsheet_id="123",
workspace_id="ws-1",
agent_id="agent-1",
session_id="sess-1",
)
assert result["spreadsheetId"] == "123"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_tracking_params_accepted_by_create_spreadsheet(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"spreadsheetId": "456"})
)
result = self._fn("google_sheets_create_spreadsheet")(
title="Test",
workspace_id="ws-1",
agent_id="agent-1",
session_id="sess-1",
)
assert result["spreadsheetId"] == "456"
@patch("aden_tools.tools.google_sheets_tool.google_sheets_tool.httpx.post")
def test_tracking_params_accepted_by_clear_values(self, mock_post):
mock_post.return_value = MagicMock(
status_code=200, json=MagicMock(return_value={"clearedRange": "A1:B2"})
)
result = self._fn("google_sheets_clear_values")(
spreadsheet_id="123",
range_name="Sheet1!A1:B2",
workspace_id="ws-1",
agent_id="agent-1",
session_id="sess-1",
)
assert result["clearedRange"] == "A1:B2"
+134 -35
View File
@@ -9,7 +9,7 @@ from aden_tools.credentials.health_check import (
AnthropicHealthChecker,
DiscordHealthChecker,
GitHubHealthChecker,
GoogleCalendarHealthChecker,
GoogleHealthChecker,
GoogleMapsHealthChecker,
GoogleSearchHealthChecker,
ResendHealthChecker,
@@ -45,10 +45,10 @@ class TestHealthCheckerRegistry:
assert "google_maps" in HEALTH_CHECKERS
assert isinstance(HEALTH_CHECKERS["google_maps"], GoogleMapsHealthChecker)
def test_google_calendar_oauth_registered(self):
"""GoogleCalendarHealthChecker is registered in HEALTH_CHECKERS."""
assert "google_calendar_oauth" in HEALTH_CHECKERS
assert isinstance(HEALTH_CHECKERS["google_calendar_oauth"], GoogleCalendarHealthChecker)
def test_google_registered(self):
"""GoogleHealthChecker is registered in HEALTH_CHECKERS under 'google'."""
assert "google" in HEALTH_CHECKERS
assert isinstance(HEALTH_CHECKERS["google"], GoogleHealthChecker)
def test_discord_registered(self):
"""DiscordHealthChecker is registered in HEALTH_CHECKERS."""
@@ -65,7 +65,7 @@ class TestHealthCheckerRegistry:
"anthropic",
"github",
"resend",
"google_calendar_oauth",
"google",
"slack",
"discord",
}
@@ -430,17 +430,136 @@ class TestCheckCredentialHealthDispatcher:
assert result.details.get("partial_check") is True
class TestGoogleCalendarHealthCheckerTokenSanitization:
"""Tests for token sanitization in GoogleCalendarHealthChecker error handling."""
class TestGoogleHealthChecker:
"""Tests for GoogleHealthChecker (Gmail, Calendar, Sheets)."""
def _setup_mock_client(self, mock_client_cls):
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
return mock_client
def _mock_response(self, status_code):
response = MagicMock(spec=httpx.Response)
response.status_code = status_code
return response
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_all_scopes_valid(self, mock_client_cls):
"""All three endpoints return 200/404 → valid."""
mock_client = self._setup_mock_client(mock_client_cls)
# Gmail 200, Calendar 200, Sheets 404 (no spreadsheet, but scope works)
mock_client.get.side_effect = [
self._mock_response(200),
self._mock_response(200),
self._mock_response(404),
]
checker = GoogleHealthChecker()
result = checker.check("test-token")
assert result.valid is True
assert "Gmail" in result.message
assert "Calendar" in result.message
assert "Sheets" in result.message
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_invalid_token_401_fails_fast(self, mock_client_cls):
"""401 on the first endpoint → token invalid, no further calls."""
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.return_value = self._mock_response(401)
checker = GoogleHealthChecker()
result = checker.check("expired-token")
assert result.valid is False
assert result.details["status_code"] == 401
# Should fail fast — only one call made
assert mock_client.get.call_count == 1
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_missing_calendar_scope(self, mock_client_cls):
"""Gmail OK, Calendar 403, Sheets OK → reports missing calendar scope."""
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.side_effect = [
self._mock_response(200), # gmail
self._mock_response(403), # calendar
self._mock_response(404), # sheets (404 = scope OK)
]
checker = GoogleHealthChecker()
result = checker.check("test-token")
assert result.valid is False
assert "calendar" in result.details["missing_scopes"]
assert "gmail" not in result.details["missing_scopes"]
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_missing_gmail_and_sheets_scopes(self, mock_client_cls):
"""Gmail 403, Calendar OK, Sheets 403 → reports both missing."""
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.side_effect = [
self._mock_response(403), # gmail
self._mock_response(200), # calendar
self._mock_response(403), # sheets
]
checker = GoogleHealthChecker()
result = checker.check("test-token")
assert result.valid is False
assert "gmail" in result.details["missing_scopes"]
assert "sheets" in result.details["missing_scopes"]
assert len(result.details["missing_scopes"]) == 2
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_sheets_404_is_success(self, mock_client_cls):
"""Sheets returns 404 for non-existent spreadsheet — that's valid."""
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.side_effect = [
self._mock_response(200),
self._mock_response(200),
self._mock_response(404),
]
checker = GoogleHealthChecker()
result = checker.check("test-token")
assert result.valid is True
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_unexpected_status_code(self, mock_client_cls):
"""500 on any endpoint → reports failure with scope name."""
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.side_effect = [
self._mock_response(200), # gmail
self._mock_response(500), # calendar
]
checker = GoogleHealthChecker()
result = checker.check("test-token")
assert result.valid is False
assert result.details["status_code"] == 500
assert result.details["scope"] == "calendar"
@patch("aden_tools.credentials.health_check.httpx.Client")
def test_timeout(self, mock_client_cls):
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.side_effect = httpx.TimeoutException("timed out")
checker = GoogleHealthChecker()
result = checker.check("test-token")
assert result.valid is False
assert result.details["error"] == "timeout"
def test_request_error_with_bearer_token_sanitized(self):
"""GoogleCalendarHealthChecker sanitizes Bearer tokens in error messages."""
checker = GoogleCalendarHealthChecker()
"""Sanitizes Bearer tokens in error messages."""
checker = GoogleHealthChecker()
with patch("aden_tools.credentials.health_check.httpx.Client") as mock_client_cls:
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.side_effect = httpx.RequestError(
"Connection failed with Bearer ya29.secret-token-here"
)
@@ -452,32 +571,12 @@ class TestGoogleCalendarHealthCheckerTokenSanitization:
assert "ya29" not in result.message
assert "redacted" in result.message
def test_request_error_with_authorization_header_sanitized(self):
"""GoogleCalendarHealthChecker sanitizes Authorization headers in errors."""
checker = GoogleCalendarHealthChecker()
with patch("aden_tools.credentials.health_check.httpx.Client") as mock_client_cls:
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client.get.side_effect = httpx.RequestError(
"Failed sending Authorization: Bearer token123"
)
result = checker.check("token123")
assert not result.valid
assert "token123" not in result.message
assert "redacted" in result.message
def test_request_error_without_sensitive_data_passes_through(self):
"""Non-sensitive error messages pass through unchanged."""
checker = GoogleCalendarHealthChecker()
checker = GoogleHealthChecker()
with patch("aden_tools.credentials.health_check.httpx.Client") as mock_client_cls:
mock_client = MagicMock()
mock_client_cls.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_cls.return_value.__exit__ = MagicMock(return_value=False)
mock_client = self._setup_mock_client(mock_client_cls)
mock_client.get.side_effect = httpx.RequestError("Connection refused")
result = checker.check("token123")
+69 -69
View File
@@ -39,18 +39,18 @@ class TestCredentialErrors:
def test_list_events_no_credentials(self, calendar_tools, monkeypatch):
"""list_events without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["list_events"]()
assert "error" in result
assert "Calendar credentials not configured" in result["error"]
assert "help" in result
assert "GOOGLE_CALENDAR_ACCESS_TOKEN" in result["help"]
assert "GOOGLE_ACCESS_TOKEN" in result["help"]
def test_get_event_no_credentials(self, calendar_tools, monkeypatch):
"""get_event without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["get_event"](event_id="test-event-id")
@@ -59,7 +59,7 @@ class TestCredentialErrors:
def test_create_event_no_credentials(self, calendar_tools, monkeypatch):
"""create_event without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["create_event"](
summary="Test Event",
@@ -72,7 +72,7 @@ class TestCredentialErrors:
def test_update_event_no_credentials(self, calendar_tools, monkeypatch):
"""update_event without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["update_event"](event_id="test-event-id")
@@ -81,7 +81,7 @@ class TestCredentialErrors:
def test_delete_event_no_credentials(self, calendar_tools, monkeypatch):
"""delete_event without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["delete_event"](event_id="test-event-id")
@@ -90,7 +90,7 @@ class TestCredentialErrors:
def test_list_calendars_no_credentials(self, calendar_tools, monkeypatch):
"""list_calendars without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["list_calendars"]()
@@ -99,7 +99,7 @@ class TestCredentialErrors:
def test_get_calendar_no_credentials(self, calendar_tools, monkeypatch):
"""get_calendar without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["get_calendar"](calendar_id="primary")
@@ -108,7 +108,7 @@ class TestCredentialErrors:
def test_check_availability_no_credentials(self, calendar_tools, monkeypatch):
"""check_availability without credentials returns helpful error."""
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
result = calendar_tools["check_availability"](
time_min="2024-01-15T00:00:00Z",
@@ -124,7 +124,7 @@ class TestParameterValidation:
def test_list_events_max_results_too_low(self, calendar_tools, monkeypatch):
"""max_results below 1 returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["list_events"](max_results=0)
@@ -133,7 +133,7 @@ class TestParameterValidation:
def test_list_events_max_results_too_high(self, calendar_tools, monkeypatch):
"""max_results above 2500 returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["list_events"](max_results=2501)
@@ -142,7 +142,7 @@ class TestParameterValidation:
def test_get_event_missing_event_id(self, calendar_tools, monkeypatch):
"""get_event without event_id returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["get_event"](event_id="")
@@ -151,7 +151,7 @@ class TestParameterValidation:
def test_create_event_missing_summary(self, calendar_tools, monkeypatch):
"""create_event without summary returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["create_event"](
summary="",
@@ -164,7 +164,7 @@ class TestParameterValidation:
def test_create_event_missing_start_time(self, calendar_tools, monkeypatch):
"""create_event without start_time returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["create_event"](
summary="Test Event",
@@ -177,7 +177,7 @@ class TestParameterValidation:
def test_create_event_missing_end_time(self, calendar_tools, monkeypatch):
"""create_event without end_time returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["create_event"](
summary="Test Event",
@@ -190,7 +190,7 @@ class TestParameterValidation:
def test_update_event_missing_event_id(self, calendar_tools, monkeypatch):
"""update_event without event_id returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["update_event"](event_id="")
@@ -199,7 +199,7 @@ class TestParameterValidation:
def test_delete_event_missing_event_id(self, calendar_tools, monkeypatch):
"""delete_event without event_id returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["delete_event"](event_id="")
@@ -208,7 +208,7 @@ class TestParameterValidation:
def test_list_calendars_max_results_too_high(self, calendar_tools, monkeypatch):
"""list_calendars max_results above 250 returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["list_calendars"](max_results=251)
@@ -217,7 +217,7 @@ class TestParameterValidation:
def test_get_calendar_missing_calendar_id(self, calendar_tools, monkeypatch):
"""get_calendar without calendar_id returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["get_calendar"](calendar_id="")
@@ -226,7 +226,7 @@ class TestParameterValidation:
def test_check_availability_missing_time_min(self, calendar_tools, monkeypatch):
"""check_availability without time_min returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["check_availability"](
time_min="",
@@ -238,7 +238,7 @@ class TestParameterValidation:
def test_check_availability_missing_time_max(self, calendar_tools, monkeypatch):
"""check_availability without time_max returns error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["check_availability"](
time_min="2024-01-15T00:00:00Z",
@@ -255,7 +255,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_list_events_success(self, mock_get, calendar_tools, monkeypatch):
"""list_events returns formatted events on success."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -286,7 +286,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_list_events_empty(self, mock_get, calendar_tools, monkeypatch):
"""list_events handles empty calendar."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(200, {"items": []})
@@ -299,7 +299,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_create_event_success(self, mock_post, calendar_tools, monkeypatch):
"""create_event returns created event details."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(
200,
@@ -324,7 +324,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.delete")
def test_delete_event_success(self, mock_delete, calendar_tools, monkeypatch):
"""delete_event returns success message."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_delete.return_value = _mock_response(204)
@@ -336,7 +336,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_list_calendars_success(self, mock_get, calendar_tools, monkeypatch):
"""list_calendars returns formatted calendar list."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -368,7 +368,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_check_availability_success(self, mock_post, calendar_tools, monkeypatch):
"""check_availability returns busy periods."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(
200,
@@ -398,7 +398,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_unauthorized_returns_error(self, mock_get, calendar_tools, monkeypatch):
"""401 response returns appropriate error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "invalid-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "invalid-token")
mock_get.return_value = _mock_response(401, {"error": {"message": "Invalid credentials"}})
@@ -410,7 +410,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_rate_limit_returns_error(self, mock_get, calendar_tools, monkeypatch):
"""429 response returns rate limit error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(429)
@@ -422,7 +422,7 @@ class TestMockedAPIResponses:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_not_found_returns_error(self, mock_get, calendar_tools, monkeypatch):
"""404 response returns not found error."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(404)
@@ -441,10 +441,10 @@ class TestCredentialManager:
from aden_tools.credentials import CredentialStoreAdapter
# Don't set env var - only use credential store adapter
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
# Create credential store adapter with test token
creds = CredentialStoreAdapter.for_testing({"google_calendar_oauth": "test-oauth-token"})
creds = CredentialStoreAdapter.for_testing({"google": "test-oauth-token"})
register_tools(mcp, credentials=creds)
list_events_fn = mcp._tool_manager._tools["calendar_list_events"].fn
@@ -465,7 +465,7 @@ class TestTokenRefresh:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_expired_token_returns_helpful_error(self, mock_get, calendar_tools, monkeypatch):
"""401 response with simple token suggests re-authorization."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "expired-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "expired-token")
mock_get.return_value = _mock_response(401, {"error": {"message": "Token expired"}})
@@ -489,7 +489,7 @@ class TestTokenRefresh:
from aden_tools.credentials import CredentialStoreAdapter
# Clear env var
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_OAUTH_CLIENT_ID", raising=False)
monkeypatch.delenv("GOOGLE_OAUTH_CLIENT_SECRET", raising=False)
@@ -503,7 +503,7 @@ class TestTokenRefresh:
# Create credential store with OAuth tokens
store = CredentialStore.for_testing(
{
"google_calendar_oauth": {
"google": {
"access_token": "old-token",
"refresh_token": "test-refresh-token",
}
@@ -532,12 +532,12 @@ class TestTokenRefresh:
from aden_tools.credentials import CredentialStoreAdapter
monkeypatch.delenv("GOOGLE_CALENDAR_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("GOOGLE_ACCESS_TOKEN", raising=False)
# Create store with only access_token (no refresh_token)
store = CredentialStore.for_testing(
{
"google_calendar_oauth": {
"google": {
"access_token": "simple-token",
}
}
@@ -558,7 +558,7 @@ class TestTokenRefresh:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_graceful_degradation_on_refresh_failure(self, mock_get, calendar_tools, monkeypatch):
"""If token refresh fails, returns helpful error message."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "invalid-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "invalid-token")
# Simulate 401 (expired token that couldn't be refreshed)
mock_get.return_value = _mock_response(401, {"error": {"message": "Invalid credentials"}})
@@ -578,7 +578,7 @@ class TestUpdateEventPatch:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.patch")
def test_update_event_patch_success(self, mock_patch, calendar_tools, monkeypatch):
"""update_event uses PATCH and returns updated event."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_patch.return_value = _mock_response(
200,
@@ -605,7 +605,7 @@ class TestUpdateEventPatch:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.patch")
def test_update_event_partial_fields(self, mock_patch, calendar_tools, monkeypatch):
"""update_event sends only provided fields in PATCH body."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_patch.return_value = _mock_response(
200,
@@ -632,7 +632,7 @@ class TestUpdateEventPatch:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.patch")
def test_update_event_with_timezone(self, mock_patch, calendar_tools, monkeypatch):
"""update_event includes timezone in start/end when provided."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_patch.return_value = _mock_response(200, {"id": "event123"})
@@ -655,7 +655,7 @@ class TestAllDayEvents:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_create_all_day_event(self, mock_post, calendar_tools, monkeypatch):
"""create_event with all_day=True uses date field."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(
200,
@@ -684,7 +684,7 @@ class TestAllDayEvents:
def test_create_all_day_event_invalid_start_format(self, calendar_tools, monkeypatch):
"""create_event with all_day=True rejects non-date start_time."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["create_event"](
summary="Bad Event",
@@ -699,7 +699,7 @@ class TestAllDayEvents:
def test_create_all_day_event_invalid_end_format(self, calendar_tools, monkeypatch):
"""create_event with all_day=True rejects non-date end_time."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["create_event"](
summary="Bad Event",
@@ -715,7 +715,7 @@ class TestAllDayEvents:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.patch")
def test_update_to_all_day_event(self, mock_patch, calendar_tools, monkeypatch):
"""update_event can convert timed event to all-day."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_patch.return_value = _mock_response(
200,
@@ -744,7 +744,7 @@ class TestTimezoneValidation:
def test_invalid_timezone_create_event(self, calendar_tools, monkeypatch):
"""create_event rejects invalid timezone."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["create_event"](
summary="Test",
@@ -761,7 +761,7 @@ class TestTimezoneValidation:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_valid_timezone_passes(self, mock_post, calendar_tools, monkeypatch):
"""create_event accepts valid timezone."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "event123"})
@@ -778,7 +778,7 @@ class TestTimezoneValidation:
def test_invalid_timezone_update_event(self, calendar_tools, monkeypatch):
"""update_event rejects invalid timezone."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["update_event"](
event_id="event123",
@@ -792,7 +792,7 @@ class TestTimezoneValidation:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_all_day_event_ignores_timezone(self, mock_post, calendar_tools, monkeypatch):
"""create_event with all_day=True skips timezone validation."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "allday1"})
@@ -814,7 +814,7 @@ class TestCreateEventWithAttendees:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_create_event_with_attendees(self, mock_post, calendar_tools, monkeypatch):
"""create_event includes attendees in request body."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(
200,
@@ -850,7 +850,7 @@ class TestCreateEventWithAttendees:
self, mock_post, calendar_tools, monkeypatch
):
"""create_event with attendees auto-generates conferenceData with unique requestId."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "event123"})
@@ -876,7 +876,7 @@ class TestCreateEventWithAttendees:
self, mock_post, calendar_tools, monkeypatch
):
"""create_event with attendees includes conferenceDataVersion=1 in query params."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "event123"})
@@ -895,7 +895,7 @@ class TestCreateEventWithAttendees:
self, mock_post, calendar_tools, monkeypatch
):
"""create_event without attendees does not add conferenceData."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "event123"})
@@ -919,7 +919,7 @@ class TestListEventsOutputFields:
self, mock_get, calendar_tools, monkeypatch
):
"""list_events output includes description and hangoutLink fields."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -947,7 +947,7 @@ class TestListEventsOutputFields:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_list_events_includes_attendees(self, mock_get, calendar_tools, monkeypatch):
"""list_events output includes attendee emails when present."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -976,7 +976,7 @@ class TestListEventsOutputFields:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_list_events_no_attendees_omits_field(self, mock_get, calendar_tools, monkeypatch):
"""list_events without attendees omits the attendees field."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -1000,7 +1000,7 @@ class TestListEventsOutputFields:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_list_events_max_results_2500_accepted(self, mock_get, calendar_tools, monkeypatch):
"""list_events accepts max_results=2500 (the API maximum)."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(200, {"items": []})
@@ -1016,7 +1016,7 @@ class TestIsNotNoneBehavior:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_create_event_empty_description_included(self, mock_post, calendar_tools, monkeypatch):
"""create_event with description='' includes it in body (not None check)."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "event123"})
@@ -1034,7 +1034,7 @@ class TestIsNotNoneBehavior:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_create_event_empty_location_included(self, mock_post, calendar_tools, monkeypatch):
"""create_event with location='' includes it in body (not None check)."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "event123"})
@@ -1052,7 +1052,7 @@ class TestIsNotNoneBehavior:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.post")
def test_create_event_none_description_excluded(self, mock_post, calendar_tools, monkeypatch):
"""create_event with description=None does not include it in body."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_post.return_value = _mock_response(200, {"id": "event123"})
@@ -1072,7 +1072,7 @@ class TestEmptyPatchGuard:
def test_update_event_no_fields_returns_error(self, calendar_tools, monkeypatch):
"""update_event with no fields to change returns error instead of empty PATCH."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
result = calendar_tools["update_event"](event_id="event123")
@@ -1087,7 +1087,7 @@ class TestRemoveAttendees:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_remove_single_attendee(self, mock_get, mock_patch, calendar_tools, monkeypatch):
"""remove_attendees removes specified email and keeps the rest."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
# GET returns current event with 3 attendees
mock_get.return_value = _mock_response(
@@ -1135,7 +1135,7 @@ class TestRemoveAttendees:
self, mock_get, mock_patch, calendar_tools, monkeypatch
):
"""remove_attendees matching is case-insensitive."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -1163,7 +1163,7 @@ class TestRemoveAttendees:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_remove_multiple_attendees(self, mock_get, mock_patch, calendar_tools, monkeypatch):
"""remove_attendees can remove multiple emails at once."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -1193,7 +1193,7 @@ class TestRemoveAttendees:
self, mock_get, mock_patch, calendar_tools, monkeypatch
):
"""remove_attendees on event with no attendees sends empty list."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -1215,7 +1215,7 @@ class TestRemoveAttendees:
self, mock_get, mock_patch, calendar_tools, monkeypatch
):
"""remove_attendees triggers conferenceDataVersion=1 in query params."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(
200,
@@ -1237,7 +1237,7 @@ class TestRemoveAttendees:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.get")
def test_remove_attendees_get_fails_returns_error(self, mock_get, calendar_tools, monkeypatch):
"""remove_attendees returns error if GET to fetch event fails."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_get.return_value = _mock_response(404)
@@ -1256,7 +1256,7 @@ class TestUpdateMeetLink:
@patch("aden_tools.tools.calendar_tool.calendar_tool.httpx.patch")
def test_update_event_add_meet_link(self, mock_patch, calendar_tools, monkeypatch):
"""update_event with add_meet_link=True includes conferenceData."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_patch.return_value = _mock_response(
200,
@@ -1286,7 +1286,7 @@ class TestUpdateMeetLink:
self, mock_patch, calendar_tools, monkeypatch
):
"""update_event without add_meet_link does not add conferenceData."""
monkeypatch.setenv("GOOGLE_CALENDAR_ACCESS_TOKEN", "test-token")
monkeypatch.setenv("GOOGLE_ACCESS_TOKEN", "test-token")
mock_patch.return_value = _mock_response(200, {"id": "event123", "summary": "Updated"})