fix: fix email agent version

This commit is contained in:
Richard Tang
2026-02-17 19:09:17 -08:00
parent e95f7e7339
commit 9eb3fc6285
@@ -1,12 +1,15 @@
"""Agent graph construction for Inbox Management Agent."""
"""Agent graph construction for Gmail Inbox Guardian."""
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from pathlib import Path
from framework.graph import Constraint, EdgeCondition, EdgeSpec, Goal, SuccessCriterion
from framework.graph.checkpoint_config import CheckpointConfig
from framework.graph.edge import AsyncEntryPointSpec, GraphSpec
from framework.graph.executor import ExecutionResult
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime, AgentRuntimeConfig, create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import (
@@ -16,15 +19,23 @@ from .nodes import (
report_node,
)
# Loop configuration (used by AgentRunner.load() and _build_graph())
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_history_tokens": 32000,
}
# Goal definition
goal = Goal(
id="inbox-management",
name="Inbox Management",
id="gmail-inbox-guardian",
name="Gmail Inbox Guardian",
description=(
"Manage Gmail inbox emails using user-defined free-text rules. "
"Fetch inbox emails (configurable batch size, default 100), apply the user's "
"rules to each email, and execute the appropriate Gmail actions — trash, "
"mark as spam, mark important, mark read/unread, star, and more."
"An event-driven Gmail inbox agent. The user defines free-text rules for "
"email triage (e.g., 'star emails from my boss', 'spam marketing newsletters'). "
"When triggered by external events (webhooks, manual triggers), the agent fetches "
"new emails, classifies each one against the user's rules, executes the appropriate "
"Gmail actions, and reports what was done."
),
success_criteria=[
SuccessCriterion(
@@ -120,20 +131,64 @@ edges = [
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
async_entry_points = [
AsyncEntryPointSpec(
id="email-event",
name="Email Event Handler",
entry_node="fetch-emails",
trigger_type="event",
trigger_config={"event_types": ["webhook_received"]},
isolation_level="shared",
max_concurrent=10,
),
AsyncEntryPointSpec(
id="email-timer",
name="Scheduled Inbox Check",
entry_node="fetch-emails",
trigger_type="timer",
trigger_config={"interval_minutes": 5},
isolation_level="shared",
max_concurrent=1,
),
]
pause_nodes = []
terminal_nodes = []
loop_config = {
"max_iterations": 100,
"max_tool_calls_per_turn": 50,
"max_history_tokens": 32000,
}
conversation_mode = "continuous"
identity_prompt = (
"You are an inbox management assistant. You help users manage their "
"Gmail inbox by applying free-text rules to emails — trash, mark as spam, "
"mark important, mark read/unread, star, and more."
)
runtime_config = AgentRuntimeConfig(
webhook_host="127.0.0.1",
webhook_port=8080,
webhook_routes=[
{
"source_id": "gmail",
"path": "/webhooks/gmail",
"methods": ["POST"],
},
],
)
class InboxManagementAgent:
class EmailInboxManagementAgent:
"""
Inbox Management Agent continuous 4-node pipeline for email triage.
Gmail Inbox Guardian event-driven email triage agent.
Flow: intake -> fetch-emails -> classify-and-act -> report -> intake (loop)
Primary: intake (forever-alive, user tweaks rules)
Async: fetch-emails -> classify-and-act -> report (per event/timer trigger)
Entry Points:
- "start" (primary): User sets up triage rules via intake node
- "email-event" (async): External events trigger fetch-emails directly
- "email-timer" (timer): Scheduled inbox check every 20 minutes
Uses AgentRuntime for:
- Multi-entry-point execution (primary + event-driven + timer)
- Session-scoped storage (sessions/{session_id}/)
- Shared state for rules persistence across entry points
- Checkpointing for resume capability
"""
def __init__(self, config=None):
@@ -153,7 +208,7 @@ class InboxManagementAgent:
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="inbox-management-graph",
id="gmail-inbox-guardian-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
@@ -167,18 +222,29 @@ class InboxManagementAgent:
loop_config=loop_config,
conversation_mode="continuous",
identity_prompt=(
"You are an inbox management assistant. You help users manage "
"their Gmail inbox by applying free-text rules to emails — trash, "
"mark as spam, mark important, mark read/unread, star, and more."
"You are an inbox management assistant. You help users manage their "
"Gmail inbox by applying free-text rules to emails — trash, mark as spam, "
"mark important, mark read/unread, star, and more."
),
async_entry_points=[
AsyncEntryPointSpec(
id="email-event",
name="Email Event Handler",
entry_node="fetch-emails",
trigger_type="event",
trigger_config={
"event_types": ["webhook_received"],
},
isolation_level="shared",
max_concurrent=10,
),
],
)
def _setup(self, mock_mode=False) -> GraphExecutor:
"""Set up the executor with all components."""
from pathlib import Path
storage_path = Path.home() / ".hive" / "agents" / "inbox_management"
storage_path.mkdir(parents=True, exist_ok=True)
def _setup(self, mock_mode=False) -> None:
"""Set up the agent runtime with sessions, checkpoints, and logging."""
self._storage_path = Path.home() / ".hive" / "agents" / "gmail_inbox_guardian"
self._storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
@@ -204,16 +270,70 @@ class InboxManagementAgent:
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
checkpoint_config = CheckpointConfig(
enabled=True,
checkpoint_on_node_start=False,
checkpoint_on_node_complete=True,
checkpoint_max_age_days=7,
async_checkpoint=True,
)
# Primary entry point (user-facing rule setup)
entry_point_specs = [
EntryPointSpec(
id="default",
name="Rule Setup",
entry_node=self.entry_node,
trigger_type="manual",
isolation_level="shared",
),
# Event-driven entry point (triggered by external events)
EntryPointSpec(
id="email-event",
name="Email Event Handler",
entry_node="fetch-emails",
trigger_type="event",
trigger_config={
"event_types": ["webhook_received"],
},
isolation_level="shared",
max_concurrent=10,
),
# Scheduled entry point (fires every 20 minutes)
EntryPointSpec(
id="email-timer",
name="Scheduled Inbox Check",
entry_node="fetch-emails",
trigger_type="timer",
trigger_config={"interval_minutes": 20},
isolation_level="shared",
max_concurrent=1,
),
]
runtime_config = AgentRuntimeConfig(
webhook_host="127.0.0.1",
webhook_port=8080,
webhook_routes=[
{
"source_id": "gmail",
"path": "/webhooks/gmail",
"methods": ["POST"],
},
],
)
self._agent_runtime = create_agent_runtime(
graph=self._graph,
goal=self.goal,
storage_path=self._storage_path,
entry_points=entry_point_specs,
llm=llm,
tools=tools,
tool_executor=tool_executor,
event_bus=self._event_bus,
storage_path=storage_path,
loop_config=self._graph.loop_config,
checkpoint_config=checkpoint_config,
config=runtime_config,
)
return self._executor
@@ -224,9 +344,10 @@ class InboxManagementAgent:
self._setup(mock_mode=mock_mode)
async def stop(self) -> None:
"""Clean up resources."""
self._executor = None
self._event_bus = None
"""Stop the agent runtime and clean up."""
if self._agent_runtime and self._agent_runtime.is_running:
await self._agent_runtime.stop()
self._agent_runtime = None
async def trigger_and_wait(
self,
@@ -241,10 +362,9 @@ class InboxManagementAgent:
if self._graph is None:
raise RuntimeError("Graph not built. Call start() first.")
return await self._executor.execute(
graph=self._graph,
goal=self.goal,
input_data=input_data,
return await self._agent_runtime.trigger_and_wait(
entry_point_id=entry_point,
input_data=input_data or {},
session_state=session_state,
)
@@ -254,9 +374,7 @@ class InboxManagementAgent:
"""Run the agent (convenience method for single execution)."""
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait(
"start", context, session_state=session_state
)
result = await self.trigger_and_wait("default", context, session_state=session_state)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
@@ -278,6 +396,10 @@ class InboxManagementAgent:
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
"async_entry_points": [
{"id": ep.id, "name": ep.name, "entry_node": ep.entry_node}
for ep in async_entry_points
],
}
def validate(self):
@@ -301,9 +423,7 @@ class InboxManagementAgent:
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(
f"Entry point '{ep_id}' references unknown node '{node_id}'"
)
errors.append(f"Entry point '{ep_id}' references unknown node '{node_id}'")
return {
"valid": len(errors) == 0,
@@ -313,4 +433,4 @@ class InboxManagementAgent:
# Create default instance
default_agent = InboxManagementAgent()
default_agent = EmailInboxManagementAgent()