Merge remote-tracking branch 'origin/main' into feat/queen-responsibility

This commit is contained in:
Richard Tang
2026-03-06 11:17:03 -08:00
86 changed files with 9053 additions and 1090 deletions
+1
View File
@@ -43,4 +43,5 @@ uv run python -m exports.my_research_agent --input '{"topic": "..."}'
| Template | Description |
|----------|-------------|
| [deep_research_agent](deep_research_agent/) | Interactive research agent that searches diverse sources, evaluates findings with user checkpoints, and produces a cited HTML report |
| [local_business_extractor](local_business_extractor/) | Finds local businesses on Google Maps, scrapes contact details, and syncs to Google Sheets |
| [tech_news_reporter](tech_news_reporter/) | Researches the latest technology and AI news from the web and produces a well-organized report |
@@ -0,0 +1,31 @@
# Local Business Extractor
Finds local businesses on Google Maps, scrapes their websites for contact details, and syncs everything to a Google Sheets spreadsheet.
## Nodes
| Node | Type | Description |
|------|------|-------------|
| `map-search-worker` | `gcu` (browser) | Searches Google Maps and extracts business names + website URLs |
| `extract-contacts` | `event_loop` | Scrapes business websites for emails, phone, hours, reviews, address |
| `sheets-sync` | `event_loop` | Appends extracted data to a Google Sheets spreadsheet |
## Flow
```
extract-contacts → sheets-sync → (loop back to extract-contacts)
map-search-worker (sub-agent)
```
## Tools used
- **Exa** — `exa_search`, `exa_get_contents` for web scraping
- **Google Sheets** — `google_sheets_create_spreadsheet`, `google_sheets_update_values`, `google_sheets_append_values`, `google_sheets_get_values`
- **Browser (GCU)** — automated Google Maps browsing
## Running
```bash
uv run python -m examples.templates.local_business_extractor run --query "bakeries in San Francisco"
```
@@ -0,0 +1,34 @@
"""Local Business Extractor package."""
from .agent import (
LocalBusinessExtractor,
default_agent,
goal,
nodes,
edges,
entry_node,
entry_points,
pause_nodes,
terminal_nodes,
conversation_mode,
identity_prompt,
loop_config,
)
from .config import default_config, metadata
# Public API re-exported at package level (mirrors the .agent/.config imports above).
__all__ = [
    "LocalBusinessExtractor",
    "default_agent",
    "goal",
    "nodes",
    "edges",
    "entry_node",
    "entry_points",
    "pause_nodes",
    "terminal_nodes",
    "conversation_mode",
    "identity_prompt",
    "loop_config",
    "default_config",
    "metadata",
]
@@ -0,0 +1,146 @@
"""
CLI entry point for Local Business Extractor.
"""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, LocalBusinessExtractor
def setup_logging(verbose=False, debug=False):
    """Configure logging on stderr based on the verbosity flags.

    ``debug`` takes precedence over ``verbose``; with neither flag set only
    warnings and above are emitted.
    """
    if debug:
        chosen = (logging.DEBUG, "%(asctime)s %(name)s: %(message)s")
    elif verbose:
        chosen = (logging.INFO, "%(message)s")
    else:
        chosen = (logging.WARNING, "%(levelname)s: %(message)s")
    level, fmt = chosen
    logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
    # Keep the framework's logger aligned with the selected verbosity.
    logging.getLogger("framework").setLevel(level)
# Root Click command group; subcommands attach below via @cli.command().
# NOTE: the docstring doubles as the CLI help text, so it must not change.
@click.group()
@click.version_option(version="1.0.0")
def cli():
    """Local Business Extractor - Find businesses, extract contacts, sync to Sheets."""
    pass
@cli.command()
@click.option(
    "--query",
    "-q",
    type=str,
    required=True,
    help="Search query (e.g. 'bakeries in San Francisco')",
)
@click.option("--quiet", is_flag=True, help="Only output result JSON")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(query, quiet, verbose, debug):
    """Extract businesses matching a search query."""
    # With --quiet, skip logging setup entirely so only the JSON is printed.
    if not quiet:
        setup_logging(verbose=verbose, debug=debug)
    result = asyncio.run(default_agent.run({"user_request": query}))
    payload = {
        "success": result.success,
        "steps_executed": result.steps_executed,
        "output": result.output,
    }
    if result.error:
        payload["error"] = result.error
    click.echo(json.dumps(payload, indent=2, default=str))
    # Exit code mirrors execution success for scriptability.
    sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
    """Show agent information."""
    data = default_agent.info()
    if output_json:
        click.echo(json.dumps(data, indent=2))
        return
    # Human-readable summary.
    click.echo(f"Agent: {data['name']}")
    click.echo(f"Version: {data['version']}")
    click.echo(f"Description: {data['description']}")
    click.echo(f"\nNodes: {', '.join(data['nodes'])}")
    click.echo(f"Entry: {data['entry_node']}")
    click.echo(f"Terminal: {', '.join(data['terminal_nodes'])}")
@cli.command()
def validate():
    """Validate agent structure."""
    report = default_agent.validate()
    ok = report["valid"]
    if ok:
        click.echo("Agent is valid")
        # Warnings do not affect the exit code.
        for warning in report["warnings"]:
            click.echo(f"  WARNING: {warning}")
    else:
        click.echo("Agent has errors:")
        for error in report["errors"]:
            click.echo(f"  ERROR: {error}")
    sys.exit(0 if ok else 1)
# Thin synchronous wrapper; the actual loop lives in _interactive_shell().
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
    """Interactive session (CLI)."""
    asyncio.run(_interactive_shell(verbose))
async def _interactive_shell(verbose=False):
    """Run a read-eval loop: prompt for a query, run the agent, repeat.

    The blocking ``input()`` call is offloaded to the default thread-pool
    executor so the event loop stays responsive while waiting for the user.
    The agent is always stopped on exit, even after Ctrl-C or an error.
    """
    setup_logging(verbose=verbose)
    click.echo("=== Local Business Extractor ===")
    click.echo("Enter a search query (or 'quit' to exit):\n")
    agent = LocalBusinessExtractor()
    await agent.start()
    try:
        while True:
            try:
                # get_running_loop() is the supported API inside a coroutine;
                # get_event_loop() here has been deprecated since Python 3.10.
                query = await asyncio.get_running_loop().run_in_executor(
                    None, input, "Query> "
                )
                command = query.strip()
                if command.lower() in ("quit", "exit", "q"):
                    click.echo("Goodbye!")
                    break
                if not command:
                    continue
                click.echo("\nExtracting...\n")
                result = await agent.run({"user_request": command})
                if result.success:
                    click.echo("\nExtraction complete\n")
                else:
                    click.echo(f"\nExtraction failed: {result.error}\n")
            except KeyboardInterrupt:
                click.echo("\nGoodbye!")
                break
            except Exception as e:
                # Keep the shell alive on per-query failures.
                click.echo(f"Error: {e}", err=True)
    finally:
        await agent.stop()
# Allow `python -m examples.templates.local_business_extractor` execution.
if __name__ == "__main__":
    cli()
@@ -0,0 +1,205 @@
"""Agent graph construction for Local Business Extractor."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import map_search_gcu, extract_contacts_node, sheets_sync_node
# Declarative goal: two equally weighted success criteria (weights sum to 1.0)
# plus one hard quality constraint.
goal = Goal(
    id="local-business-extraction",
    name="Local Business Extraction",
    description="Find local businesses on Maps, extract contacts, and sync to Google Sheets.",
    success_criteria=[
        SuccessCriterion(
            id="sc-1",
            description="Extract business details from Maps",
            metric="count",
            target="5",
            weight=0.5,
        ),
        SuccessCriterion(
            id="sc-2",
            description="Sync data to Google Sheets",
            metric="success_rate",
            target="1.0",
            weight=0.5,
        ),
    ],
    constraints=[
        Constraint(
            id="c-1",
            description="Must verify website presence before scraping",
            constraint_type="hard",
            category="quality",
        ),
    ],
)
# map-search-worker is listed as a node but is only reached via sub_agents
# delegation from extract-contacts, not via an edge.
nodes = [map_search_gcu, extract_contacts_node, sheets_sync_node]
edges = [
    EdgeSpec(
        id="extract-to-sheets",
        source="extract-contacts",
        target="sheets-sync",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    # Loop back for new tasks
    EdgeSpec(
        id="sheets-to-extract",
        source="sheets-sync",
        target="extract-contacts",
        condition=EdgeCondition.ALWAYS,
        priority=1,
    ),
]
entry_node = "extract-contacts"
entry_points = {"start": "extract-contacts"}
pause_nodes = []
# No terminal nodes: the ALWAYS loop edge keeps the agent alive until stopped.
terminal_nodes = []
conversation_mode = "continuous"
identity_prompt = "You are a lead generation specialist focused on local businesses."
# Safety limits for the event-loop nodes.
loop_config = {
    "max_iterations": 100,
    "max_tool_calls_per_turn": 30,
    "max_history_tokens": 32000,
}
class LocalBusinessExtractor:
    """Drives the local-business-extraction graph through the agent runtime.

    Typical use is one-shot: ``run(context)`` builds the runtime on demand,
    triggers the default entry point, and tears everything down afterwards.
    """

    def __init__(self, config=None):
        """Capture the module-level topology; ``config`` overrides the default runtime config."""
        self.config = config or default_config
        self.goal = goal
        self.nodes = nodes
        self.edges = edges
        self.entry_node = entry_node
        self.entry_points = entry_points
        self.pause_nodes = pause_nodes
        self.terminal_nodes = terminal_nodes
        # Lazily populated by _setup() on first start().
        self._graph = None
        self._agent_runtime = None
        self._tool_registry = None
        self._storage_path = None

    def _build_graph(self):
        """Assemble the GraphSpec from the topology captured in __init__."""
        return GraphSpec(
            id="local-business-extractor-graph",
            goal_id=self.goal.id,
            version="1.0.0",
            entry_node=self.entry_node,
            entry_points=self.entry_points,
            terminal_nodes=self.terminal_nodes,
            pause_nodes=self.pause_nodes,
            nodes=self.nodes,
            edges=self.edges,
            default_model=self.config.model,
            max_tokens=self.config.max_tokens,
            loop_config=loop_config,
            conversation_mode=conversation_mode,
            identity_prompt=identity_prompt,
        )

    def _setup(self):
        """Create storage, tool registry, LLM provider, and the agent runtime."""
        self._storage_path = (
            Path.home() / ".hive" / "agents" / "local_business_extractor"
        )
        self._storage_path.mkdir(parents=True, exist_ok=True)
        self._tool_registry = ToolRegistry()
        # MCP servers are optional; skip silently when no config file ships.
        mcp_config = Path(__file__).parent / "mcp_servers.json"
        if mcp_config.exists():
            self._tool_registry.load_mcp_config(mcp_config)
        llm = LiteLLMProvider(
            model=self.config.model,
            api_key=self.config.api_key,
            api_base=self.config.api_base,
        )
        tools = list(self._tool_registry.get_tools().values())
        tool_executor = self._tool_registry.get_executor()
        self._graph = self._build_graph()
        self._agent_runtime = create_agent_runtime(
            graph=self._graph,
            goal=self.goal,
            storage_path=self._storage_path,
            entry_points=[
                EntryPointSpec(
                    id="default",
                    name="Default",
                    entry_node=self.entry_node,
                    trigger_type="manual",
                    isolation_level="shared",
                )
            ],
            llm=llm,
            tools=tools,
            tool_executor=tool_executor,
            checkpoint_config=CheckpointConfig(
                enabled=True, checkpoint_on_node_complete=True
            ),
        )

    async def start(self):
        """Build the runtime on first call and start it; idempotent."""
        if self._agent_runtime is None:
            self._setup()
        if not self._agent_runtime.is_running:
            await self._agent_runtime.start()

    async def stop(self):
        """Stop the runtime if running and drop it so a later start() rebuilds."""
        if self._agent_runtime and self._agent_runtime.is_running:
            await self._agent_runtime.stop()
        self._agent_runtime = None

    async def run(self, context, session_state=None):
        """Execute one extraction run; always stops the runtime afterwards.

        Returns the runtime's ExecutionResult, or a synthetic failure result
        when the runtime returned nothing (treated as a timeout).
        """
        await self.start()
        try:
            result = await self._agent_runtime.trigger_and_wait(
                "default", context, session_state=session_state
            )
            return result or ExecutionResult(success=False, error="Execution timeout")
        finally:
            await self.stop()

    def info(self):
        """Get agent information as a plain dict (consumed by the CLI)."""
        return {
            "name": metadata.name,
            "version": metadata.version,
            "description": metadata.description,
            "goal": {
                "name": self.goal.name,
                "description": self.goal.description,
            },
            "nodes": [n.id for n in self.nodes],
            "edges": [e.id for e in self.edges],
            "entry_node": self.entry_node,
            "entry_points": self.entry_points,
            "pause_nodes": self.pause_nodes,
            "terminal_nodes": self.terminal_nodes,
        }

    def validate(self):
        """Validate agent structure; returns {"valid", "errors", "warnings"}."""
        errors = []
        warnings = []
        node_ids = {n.id for n in self.nodes}
        for edge in self.edges:
            if edge.source not in node_ids:
                errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
            if edge.target not in node_ids:
                errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
        if self.entry_node not in node_ids:
            errors.append(f"Entry node '{self.entry_node}' not found")
        # Fix: also verify terminal nodes and entry-point targets (matching the
        # sibling MeetingScheduler.validate) — dangling references here
        # previously went undetected.
        for t in self.terminal_nodes:
            if t not in node_ids:
                errors.append(f"Terminal node '{t}' not found")
        for ep_id, nid in self.entry_points.items():
            if nid not in node_ids:
                errors.append(f"Entry point '{ep_id}' references unknown node '{nid}'")
        return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}
# Module-level singleton consumed by the CLI entry points.
default_agent = LocalBusinessExtractor()
@@ -0,0 +1,21 @@
"""Runtime configuration."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
default_config = RuntimeConfig()
# Static descriptive metadata surfaced by the CLI `info` command.
@dataclass
class AgentMetadata:
    # Display name and semantic version of this template.
    name: str = "Local Business Extractor"
    version: str = "1.0.0"
    description: str = (
        "Extracts local businesses from Google Maps, scrapes contact details, "
        "and syncs the results to Google Sheets."
    )
    # First message shown to the user in interactive sessions.
    intro_message: str = "I'm ready to extract business data. What should I search for?"
# Shared instance imported by the agent module.
metadata = AgentMetadata()
@@ -0,0 +1,14 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../../tools"
},
"gcu-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "-m", "gcu.server", "--stdio"],
"cwd": "../../../tools"
}
}
@@ -0,0 +1,86 @@
"""Node definitions for Local Business Extractor."""
from framework.graph import NodeSpec
# GCU Subagent for Google Maps
map_search_gcu = NodeSpec(
id="map-search-worker",
name="Maps Browser Worker",
description="Browser subagent that searches Google Maps and extracts business links.",
node_type="gcu",
client_facing=False,
max_node_visits=1,
input_keys=["query"],
output_keys=["business_list"],
tools=[], # Auto-populated with browser tools
system_prompt="""\
You are a browser agent. Your job: Search Google Maps for the provided query and extract business names and website URLs.
## Workflow
1. browser_start
2. browser_open(url="https://www.google.com/maps")
3. Use browser_type or browser_click to search for the "query" in memory.
4. browser_wait(seconds=3)
5. browser_snapshot to find the list of results.
6. For each relevant result, extract:
- Name of the business
- Website URL (look for the website icon/link)
7. set_output("business_list", [{"name": "...", "website": "..."}, ...])
## Constraints
- Extract at least 5-10 businesses if possible.
- If you see a "Website" button, extract that URL specifically.
""",
)
# Processing Node: Scrape & Prepare
extract_contacts_node = NodeSpec(
id="extract-contacts",
name="Extract Business Details",
description="Scrapes business websites and Maps for comprehensive business details.",
node_type="event_loop",
sub_agents=["map-search-worker"],
input_keys=["user_request"],
output_keys=["business_data"],
success_criteria="Comprehensive business details (reviews, hours, contacts) extracted.",
system_prompt="""\
1. Call delegate_to_sub_agent(agent_id="map-search-worker", task=user_request)
2. Receive "business_list" from memory.
3. For each business in the list:
- Use exa_get_contents or exa_search to find:
- Contact emails and phone numbers.
- Business hours.
- Customer reviews or ratings summary.
- Physical address.
4. Format the data into a comprehensive report for each business.
5. set_output("business_data", enriched_business_list)
""",
tools=["exa_get_contents", "exa_search"],
)
# Google Sheets Sync Node
sheets_sync_node = NodeSpec(
id="sheets-sync",
name="Google Sheets Sync",
description="Appends the extracted business data to a Google Sheets spreadsheet.",
node_type="event_loop",
input_keys=["business_data"],
output_keys=["spreadsheet_id"],
success_criteria="Data successfully synced to Google Sheets.",
system_prompt="""\
1. Check memory for "spreadsheet_id". If not set, create a new spreadsheet:
- Use google_sheets_create_spreadsheet(title="Comprehensive Business Leads")
- Save the spreadsheet ID with set_output("spreadsheet_id", id)
2. If the spreadsheet is new, write header row:
- Use google_sheets_update_values(spreadsheet_id=id, range_name="Sheet1!A1:G1", values=[["Name", "Website", "Email", "Phone", "Address", "Hours", "Reviews"]])
3. For each business in "business_data", append a row:
- Use google_sheets_append_values(spreadsheet_id=id, range_name="Sheet1!A:G", values=[[name, website, email, phone, address, hours, reviews]])
4. set_output("spreadsheet_id", id)
""",
tools=[
"google_sheets_create_spreadsheet",
"google_sheets_update_values",
"google_sheets_append_values",
"google_sheets_get_values",
],
)
@@ -0,0 +1,34 @@
"""Meeting Scheduler — Find available times on your calendar and book meetings."""
from .agent import (
MeetingScheduler,
default_agent,
goal,
nodes,
edges,
entry_node,
entry_points,
pause_nodes,
terminal_nodes,
conversation_mode,
identity_prompt,
loop_config,
)
from .config import default_config, metadata
# Public API re-exported at package level (mirrors the .agent/.config imports above).
__all__ = [
    "MeetingScheduler",
    "default_agent",
    "goal",
    "nodes",
    "edges",
    "entry_node",
    "entry_points",
    "pause_nodes",
    "terminal_nodes",
    "conversation_mode",
    "identity_prompt",
    "loop_config",
    "default_config",
    "metadata",
]
@@ -0,0 +1,131 @@
"""CLI entry point for Meeting Scheduler."""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, MeetingScheduler
def setup_logging(verbose=False, debug=False):
    """Map verbosity flags to a logging level/format and log to stderr.

    ``debug`` wins over ``verbose``; default shows warnings only.
    """
    # First matching flag wins; the final entry is the fallback.
    choices = (
        (debug, logging.DEBUG, "%(asctime)s %(name)s: %(message)s"),
        (verbose, logging.INFO, "%(message)s"),
        (True, logging.WARNING, "%(levelname)s: %(message)s"),
    )
    level, fmt = next((lv, f) for flag, lv, f in choices if flag)
    logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
# Root command group for the scheduler CLI; docstring doubles as click help text.
@click.group()
@click.version_option(version="1.0.0")
def cli():
    """Meeting Scheduler — Find available times on your calendar and book meetings."""
    pass
@cli.command()
@click.option("--attendee", "-a", required=True, help="Attendee email address")
@click.option(
    "--duration", "-d", type=int, required=True, help="Meeting duration in minutes"
)
@click.option("--title", "-t", required=True, help="Meeting title")
@click.option("--verbose", "-v", is_flag=True)
def run(attendee, duration, title, verbose):
    """Execute the scheduler."""
    setup_logging(verbose=verbose)
    # Duration is serialized as a string — node inputs are string-valued.
    request = {
        "attendee_email": attendee,
        "meeting_duration_minutes": str(duration),
        "meeting_title": title,
    }
    result = asyncio.run(default_agent.run(request))
    summary = {"success": result.success, "output": result.output}
    click.echo(json.dumps(summary, indent=2, default=str))
    sys.exit(0 if result.success else 1)
@cli.command()
def tui():
    """Launch TUI dashboard."""
    # Imported lazily so plain CLI commands don't pay the TUI import cost.
    from pathlib import Path
    from framework.tui.app import AdenTUI
    from framework.llm import LiteLLMProvider
    from framework.runner.tool_registry import ToolRegistry
    from framework.runtime.agent_runtime import create_agent_runtime
    from framework.runtime.execution_stream import EntryPointSpec
    async def run_tui():
        # Wires the runtime by hand (rather than agent.start()) so the TUI
        # can own the runtime object directly.
        agent = MeetingScheduler()
        agent._tool_registry = ToolRegistry()
        storage = Path.home() / ".hive" / "agents" / "meeting_scheduler"
        storage.mkdir(parents=True, exist_ok=True)
        mcp_cfg = Path(__file__).parent / "mcp_servers.json"
        if mcp_cfg.exists():
            agent._tool_registry.load_mcp_config(mcp_cfg)
        llm = LiteLLMProvider(
            model=agent.config.model,
            api_key=agent.config.api_key,
            api_base=agent.config.api_base,
        )
        runtime = create_agent_runtime(
            graph=agent._build_graph(),
            goal=agent.goal,
            storage_path=storage,
            entry_points=[
                EntryPointSpec(
                    id="start",
                    name="Start",
                    entry_node="intake",  # NOTE(review): hardcoded; agent.entry_node would track graph changes
                    trigger_type="manual",
                    isolation_level="isolated",
                )
            ],
            llm=llm,
            tools=list(agent._tool_registry.get_tools().values()),
            tool_executor=agent._tool_registry.get_executor(),
        )
        await runtime.start()
        try:
            app = AdenTUI(runtime)
            await app.run_async()
        finally:
            # Always shut the runtime down, even if the TUI crashes.
            await runtime.stop()
    asyncio.run(run_tui())
@cli.command()
def info():
    """Show agent info."""
    meta = default_agent.info()
    header = f"Agent: {meta['name']}\nVersion: {meta['version']}\nDescription: {meta['description']}"
    click.echo(header)
    # Second line groups the graph topology details.
    topology = f"Nodes: {', '.join(meta['nodes'])}\nClient-facing: {', '.join(meta['client_facing_nodes'])}"
    click.echo(topology)
@cli.command()
def validate():
    """Validate agent structure."""
    report = default_agent.validate()
    ok = report["valid"]
    if not ok:
        click.echo("Errors:")
        for problem in report["errors"]:
            click.echo(f"  {problem}")
    else:
        click.echo("Agent is valid")
    sys.exit(0 if ok else 1)
# Module entry point for `python -m` execution.
if __name__ == "__main__":
    cli()
@@ -0,0 +1,257 @@
"""Agent graph construction for Meeting Scheduler."""
import asyncio
from pathlib import Path

from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec

from .config import default_config, metadata
from .nodes import intake_node, schedule_node, confirm_node
# Goal definition: four weighted success criteria (weights sum to 1.0) and
# three hard constraints.
goal = Goal(
    id="meeting-scheduler-goal",
    name="Schedule Meetings",
    description="Check calendar availability, find optimal meeting times, record meetings, and send reminders.",
    success_criteria=[
        SuccessCriterion(
            id="sc-1",
            description="Meeting time found within requested duration",
            metric="calendar_availability",
            target="success",
            weight=0.35,
        ),
        SuccessCriterion(
            id="sc-2",
            description="Meeting recorded in spreadsheet accurately",
            metric="data_persistence",
            target="recorded",
            weight=0.30,
        ),
        SuccessCriterion(
            id="sc-3",
            description="Attendee email reminder sent",
            metric="communication",
            target="sent",
            weight=0.25,
        ),
        SuccessCriterion(
            id="sc-4",
            description="User confirms meeting details",
            metric="user_acknowledgment",
            target="confirmed",
            weight=0.10,
        ),
    ],
    constraints=[
        Constraint(
            id="c-1",
            description="Must use Google Calendar API for availability check",
            constraint_type="hard",
            category="functional",
        ),
        Constraint(
            id="c-2",
            description="Meeting duration must match requested time",
            constraint_type="hard",
            category="accuracy",
        ),
        Constraint(
            id="c-3",
            description="Spreadsheet record must include date, time, attendee, title",
            constraint_type="hard",
            category="quality",
        ),
    ],
)
# Node list: intake -> schedule -> confirm, with an optional loop back.
nodes = [intake_node, schedule_node, confirm_node]
# Edge definitions
edges = [
    EdgeSpec(
        id="intake-to-schedule",
        source="intake",
        target="schedule",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="schedule-to-confirm",
        source="schedule",
        target="confirm",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    # Loop back for another booking, taken only when the confirm node sets
    # next_action == 'another'.
    EdgeSpec(
        id="confirm-to-intake",
        source="confirm",
        target="intake",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="str(next_action).lower() == 'another'",
        priority=1,
    ),
]
# Graph configuration
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = []  # Forever-alive
# Module-level vars read by AgentRunner.load()
conversation_mode = "continuous"
identity_prompt = "You are a helpful meeting scheduler assistant that manages calendar availability and sends confirmations."
# Safety limits for the event-loop nodes.
loop_config = {
    "max_iterations": 100,
    "max_tool_calls_per_turn": 20,
    "max_history_tokens": 32000,
}
class MeetingScheduler:
    """Builds the meeting-scheduling graph and drives it through the runtime.

    ``run(context)`` is the one-shot entry point; ``start``/``stop`` manage
    the runtime lifecycle explicitly for long-lived hosts such as the TUI.
    """

    def __init__(self, config=None):
        """Capture the module-level topology; ``config`` overrides the default runtime config."""
        self.config = config or default_config
        self.goal = goal
        self.nodes = nodes
        self.edges = edges
        self.entry_node = entry_node
        self.entry_points = entry_points
        self.pause_nodes = pause_nodes
        self.terminal_nodes = terminal_nodes
        # Lazily populated by _setup() on first start().
        self._graph = None
        self._agent_runtime = None
        self._tool_registry = None
        self._storage_path = None

    def _build_graph(self):
        """Assemble the GraphSpec from the topology captured in __init__."""
        return GraphSpec(
            id="meeting-scheduler-graph",
            goal_id=self.goal.id,
            version="1.0.0",
            entry_node=self.entry_node,
            entry_points=self.entry_points,
            terminal_nodes=self.terminal_nodes,
            pause_nodes=self.pause_nodes,
            nodes=self.nodes,
            edges=self.edges,
            default_model=self.config.model,
            max_tokens=self.config.max_tokens,
            loop_config=loop_config,
            conversation_mode=conversation_mode,
            identity_prompt=identity_prompt,
        )

    def _setup(self):
        """Create storage, tool registry, LLM provider, and the agent runtime."""
        self._storage_path = Path.home() / ".hive" / "agents" / "meeting_scheduler"
        self._storage_path.mkdir(parents=True, exist_ok=True)
        self._tool_registry = ToolRegistry()
        # MCP servers are optional; skip silently when no config file ships.
        mcp_config = Path(__file__).parent / "mcp_servers.json"
        if mcp_config.exists():
            self._tool_registry.load_mcp_config(mcp_config)
        llm = LiteLLMProvider(
            model=self.config.model,
            api_key=self.config.api_key,
            api_base=self.config.api_base,
        )
        tools = list(self._tool_registry.get_tools().values())
        tool_executor = self._tool_registry.get_executor()
        self._graph = self._build_graph()
        self._agent_runtime = create_agent_runtime(
            graph=self._graph,
            goal=self.goal,
            storage_path=self._storage_path,
            entry_points=[
                EntryPointSpec(
                    id="default",
                    name="Default",
                    entry_node=self.entry_node,
                    trigger_type="manual",
                    isolation_level="shared",
                )
            ],
            llm=llm,
            tools=tools,
            tool_executor=tool_executor,
            checkpoint_config=CheckpointConfig(
                enabled=True,
                checkpoint_on_node_complete=True,
                checkpoint_max_age_days=7,
                async_checkpoint=True,
            ),
        )

    async def start(self):
        """Build the runtime on first call and start it; idempotent."""
        if self._agent_runtime is None:
            self._setup()
        if not self._agent_runtime.is_running:
            await self._agent_runtime.start()

    async def stop(self):
        """Stop the runtime if running and drop it so a later start() rebuilds."""
        if self._agent_runtime and self._agent_runtime.is_running:
            await self._agent_runtime.stop()
        self._agent_runtime = None

    async def trigger_and_wait(
        self, entry_point="default", input_data=None, timeout=None, session_state=None
    ):
        """Trigger an entry point and wait for its result.

        Args:
            entry_point: runtime entry-point id to trigger.
            input_data: context dict passed to the graph (defaults to empty).
            timeout: optional seconds to wait; ``None`` on expiry so callers
                fall through to their "Execution timeout" handling.
            session_state: optional state carried across conversation turns.

        Raises:
            RuntimeError: if the runtime has not been started.
        """
        if self._agent_runtime is None:
            raise RuntimeError("Agent not started. Call start() first.")
        pending = self._agent_runtime.trigger_and_wait(
            entry_point_id=entry_point,
            input_data=input_data or {},
            session_state=session_state,
        )
        if timeout is None:
            return await pending
        # Fix: `timeout` was previously accepted but silently ignored.
        try:
            return await asyncio.wait_for(pending, timeout)
        except asyncio.TimeoutError:
            return None

    async def run(self, context, session_state=None):
        """Execute one scheduling run; always stops the runtime afterwards."""
        await self.start()
        try:
            result = await self.trigger_and_wait(
                "default", context, session_state=session_state
            )
            return result or ExecutionResult(success=False, error="Execution timeout")
        finally:
            await self.stop()

    def info(self):
        """Get agent information as a plain dict (consumed by the CLI)."""
        return {
            "name": metadata.name,
            "version": metadata.version,
            "description": metadata.description,
            "goal": {"name": self.goal.name, "description": self.goal.description},
            "nodes": [n.id for n in self.nodes],
            "edges": [e.id for e in self.edges],
            "entry_node": self.entry_node,
            "entry_points": self.entry_points,
            "terminal_nodes": self.terminal_nodes,
            "client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
        }

    def validate(self):
        """Validate agent structure; returns {"valid", "errors", "warnings"}."""
        errors, warnings = [], []
        node_ids = {n.id for n in self.nodes}
        for e in self.edges:
            if e.source not in node_ids:
                errors.append(f"Edge {e.id}: source '{e.source}' not found")
            if e.target not in node_ids:
                errors.append(f"Edge {e.id}: target '{e.target}' not found")
        if self.entry_node not in node_ids:
            errors.append(f"Entry node '{self.entry_node}' not found")
        for t in self.terminal_nodes:
            if t not in node_ids:
                errors.append(f"Terminal node '{t}' not found")
        for ep_id, nid in self.entry_points.items():
            if nid not in node_ids:
                errors.append(f"Entry point '{ep_id}' references unknown node '{nid}'")
        return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}
# Module-level singleton consumed by the CLI entry points.
default_agent = MeetingScheduler()
@@ -0,0 +1,28 @@
"""Runtime configuration."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
default_config = RuntimeConfig()
# Static descriptive metadata surfaced by the CLI `info` command.
@dataclass
class AgentMetadata:
    # Display name and semantic version of this template.
    name: str = "Meeting Scheduler"
    version: str = "1.0.0"
    description: str = (
        "Schedule meetings by checking Google Calendar availability, booking "
        "optimal time slots, recording details in Google Sheets, and sending "
        "email confirmations with Google Meet links to attendees."
    )
    # First message shown to the user in interactive sessions.
    intro_message: str = (
        "Hi! I'm your meeting scheduler. Tell me who you'd like to meet with, "
        "how long the meeting should be, and what it's about — I'll check "
        "calendar availability, book a time slot, log it to your spreadsheet, "
        "and send a confirmation email with a Google Meet link. "
        "Who would you like to schedule a meeting with?"
    )
# Shared instance imported by the agent module.
metadata = AgentMetadata()
@@ -0,0 +1,9 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../../tools",
"description": "Hive tools MCP server"
}
}
@@ -0,0 +1,140 @@
"""Node definitions for Meeting Scheduler."""
from framework.graph import NodeSpec
# Node 1: Intake (client-facing)
# Entry node: converses with the user to collect the booking parameters.
intake_node = NodeSpec(
    id="intake",
    name="Intake",
    description="Gather meeting details from the user",
    node_type="event_loop",
    client_facing=True,
    max_node_visits=0,  # presumably 0 means unlimited revisits — TODO confirm framework semantics
    input_keys=["attendee_email", "meeting_duration_minutes"],
    output_keys=["attendee_email", "meeting_duration_minutes", "meeting_title"],
    nullable_output_keys=[
        "attendee_email",
        "meeting_duration_minutes",
        "meeting_title",
    ],
    success_criteria="User has provided attendee email, meeting duration, and title.",
    system_prompt="""\
You are a meeting scheduler assistant.
**STEP 1 Use ask_user to collect meeting details:**
1. Call ask_user to ask for: attendee email, meeting duration (minutes), and meeting title
2. Wait for the user's response before proceeding
**STEP 2 After user provides all details, call set_output:**
- set_output("attendee_email", "user's email address")
- set_output("meeting_duration_minutes", meeting duration as string)
- set_output("meeting_title", "title of the meeting")
""",
    tools=[],  # conversation only; no external tools
)
# Node 2: Schedule (autonomous)
schedule_node = NodeSpec(
id="schedule",
name="Schedule",
description="Find available time on calendar, book meeting with Google Meet, and log to Google Sheet",
node_type="event_loop",
max_node_visits=0,
input_keys=["attendee_email", "meeting_duration_minutes", "meeting_title"],
output_keys=[
"meeting_time",
"booking_confirmed",
"spreadsheet_recorded",
"email_sent",
"meet_link",
],
nullable_output_keys=[],
success_criteria="Meeting time found, Google Meet created, Google Sheet 'Meeting Scheduler' updated with date/time/attendee/title/meet_link, and confirmation email sent.",
system_prompt="""\
You are a meeting booking agent that creates Google Calendar events with Google Meet and logs to Google Sheets.
## STEP 1 — Calendar Operations (tool calls in this turn):
1. **Find availability and verify conflicts:**
- Use calendar_check_availability to find potential time slots.
- **CRITICAL:** Always search a broad window (at least 8 hours) for the target day to see the full context of the user's schedule.
- **SECONDARY CHECK:** Before finalizing a slot, use calendar_list_events for that specific day. This ensures you catch "soft" conflicts or events not marked as 'busy' that might still be important.
2. **Create the event WITH GOOGLE MEET (AUTOMATIC):**
- Use calendar_create_event with these parameters:
- summary: the meeting title
- start_time: the start datetime in ISO format (e.g., "2024-01-15T09:00:00")
- end_time: the end datetime in ISO format
- attendees: list with the attendee email address (e.g., ["user@example.com"])
- timezone: user's timezone (e.g., "America/Los_Angeles")
- IMPORTANT: The tool automatically generates a Google Meet link when attendees are provided.
You do NOT need to pass conferenceData - it is handled automatically.
- The response will include conferenceData.entryPoints with the Google Meet link
- Extract the meet_link from conferenceData.entryPoints[0].uri in the response
3. **Log to Google Sheets:**
- First, use google_sheets_get_spreadsheet with spreadsheet_id="Meeting Scheduler" to check if it exists
- If it doesn't exist, use google_sheets_create_spreadsheet with title="Meeting Scheduler"
- Then use google_sheets_append_values to add a row with:
- Date, Time, Attendee Email, Meeting Title, Google Meet Link
- The spreadsheet_id should be "Meeting Scheduler" (by name) or the ID returned from create
4. **Send confirmation email:**
- Use send_email to send the attendee a confirmation with:
- to: attendee email address
- subject: "Meeting Confirmation: {meeting_title}"
- body: Include meeting title, date/time, and Google Meet link
## STEP 2 — set_output (SEPARATE turn, no other tool calls):
After all tools complete successfully, call set_output:
- set_output("meeting_time", "YYYY-MM-DD HH:MM")
- set_output("meet_link", "https://meet.google.com/xxx/yyy")
- set_output("booking_confirmed", "true")
- set_output("spreadsheet_recorded", "true")
- set_output("email_sent", "true")
## CRITICAL: Google Meet creation
Google Meet links are AUTOMATICALLY created by calendar_create_event when attendees are provided.
Simply pass the attendees list and the tool will generate the Meet link.
""",
tools=[
"calendar_check_availability",
"calendar_create_event",
"calendar_list_events",
"google_sheets_create_spreadsheet",
"google_sheets_get_spreadsheet",
"google_sheets_append_values",
"send_email",
],
)
# Node 3: Confirm (client-facing)
# Client-facing exit node: its next_action output drives the conditional
# confirm-to-intake loop edge ('another' re-enters intake).
confirm_node = NodeSpec(
    id="confirm",
    name="Confirm",
    description="Present booking confirmation to user with Google Meet link",
    node_type="event_loop",
    client_facing=True,
    max_node_visits=0,  # presumably 0 means unlimited revisits — TODO confirm framework semantics
    input_keys=["meeting_time", "booking_confirmed", "meet_link"],
    output_keys=["next_action"],
    nullable_output_keys=["next_action"],
    success_criteria="User has acknowledged the booking and received the Google Meet link.",
    system_prompt="""\
You are a confirmation assistant.
**STEP 1 Present confirmation and ask user:**
1. Show the meeting details (date, time, attendee, title)
2. Display the Google Meet link prominently
3. Confirm the booking is complete and logged to Google Sheets
4. Call ask_user to ask if they want to schedule another meeting or finish
**STEP 2 After user responds, call set_output:**
- set_output("next_action", "another") if booking another meeting
- set_output("next_action", "done") if finished
""",
    tools=[],  # conversation only; no external tools
)
__all__ = ["intake_node", "schedule_node", "confirm_node"]
@@ -0,0 +1,34 @@
"""Test fixtures."""
import sys
from pathlib import Path
import pytest
_repo_root = Path(__file__).resolve().parents[4]
for _p in ["examples/templates", "core"]:
_path = str(_repo_root / _p)
if _path not in sys.path:
sys.path.insert(0, _path)
AGENT_PATH = str(Path(__file__).resolve().parents[1])
@pytest.fixture(scope="session")
def agent_module():
    """Import the agent package once per session for structural validation."""
    from importlib import import_module

    package_name = Path(AGENT_PATH).name
    return import_module(package_name)
@pytest.fixture(scope="session")
def runner_loaded():
    """Load the agent via AgentRunner; skip the suite when credentials are absent."""
    from framework.credentials.models import CredentialError
    from framework.runner.runner import AgentRunner

    try:
        return AgentRunner.load(AGENT_PATH)
    except CredentialError:
        pytest.skip("Google OAuth credentials not configured")
@@ -0,0 +1,103 @@
"""Structural tests for Meeting Scheduler."""
from meeting_scheduler import (
default_agent,
goal,
nodes,
edges,
entry_node,
entry_points,
terminal_nodes,
conversation_mode,
loop_config,
)
class TestGoalDefinition:
    """Structural checks on the meeting-scheduler goal definition."""

    def test_goal_exists(self):
        assert goal is not None
        assert goal.id == "meeting-scheduler-goal"
        assert len(goal.success_criteria) == 4
        assert len(goal.constraints) == 3

    def test_success_criteria_weights_sum_to_one(self):
        weight_total = sum(criterion.weight for criterion in goal.success_criteria)
        assert abs(weight_total - 1.0) < 0.01
class TestNodeStructure:
    """Checks on the three-node intake → schedule → confirm pipeline."""

    def test_three_nodes(self):
        assert len(nodes) == 3
        assert [node.id for node in nodes] == ["intake", "schedule", "confirm"]

    def test_intake_is_client_facing(self):
        assert nodes[0].client_facing is True

    def test_schedule_has_required_tools(self):
        must_have = {
            "calendar_check_availability",
            "calendar_create_event",
            "google_sheets_append_values",
            "send_email",
        }
        missing = must_have - set(nodes[1].tools)
        assert not missing

    def test_confirm_is_client_facing(self):
        assert nodes[2].client_facing is True
class TestEdgeStructure:
    """Checks on edge wiring: linear path plus the confirm → intake loop."""

    def test_three_edges(self):
        assert len(edges) == 3

    def test_linear_path(self):
        first, second = edges[0], edges[1]
        assert (first.source, first.target) == ("intake", "schedule")
        assert (second.source, second.target) == ("schedule", "confirm")

    def test_loop_back(self):
        assert (edges[2].source, edges[2].target) == ("confirm", "intake")
class TestGraphConfiguration:
    """Checks on graph-level configuration values."""

    def test_entry_node(self):
        assert entry_node == "intake"

    def test_entry_points(self):
        assert entry_points == {"start": "intake"}

    def test_forever_alive(self):
        # No terminal nodes: the agent loops indefinitely.
        assert terminal_nodes == []

    def test_conversation_mode(self):
        assert conversation_mode == "continuous"

    def test_loop_config_valid(self):
        for key in ("max_iterations", "max_tool_calls_per_turn", "max_history_tokens"):
            assert key in loop_config
class TestAgentClass:
    """Checks on the assembled default agent instance."""

    def test_default_agent_created(self):
        assert default_agent is not None

    def test_validate_passes(self):
        result = default_agent.validate()
        assert result["valid"] is True
        assert len(result["errors"]) == 0

    def test_agent_info(self):
        info = default_agent.info()
        assert info["name"] == "Meeting Scheduler"
        # Membership test directly on the list; the original wrapped it in a
        # no-op identity comprehension ([n for n in ...]).
        assert "schedule" in info["nodes"]
class TestRunnerLoad:
    """Smoke test: the agent loads through AgentRunner (fixture defined in conftest)."""

    def test_agent_runner_load_succeeds(self, runner_loaded):
        assert runner_loaded is not None
@@ -0,0 +1,32 @@
# Twitter News Digest
Monitors tech Twitter profiles, extracts the latest tweets, and compiles a daily tech news digest with user review.
## Nodes
| Node | Type | Description |
|------|------|-------------|
| `fetch-tweets` | `gcu` (browser) | Navigates to Twitter profiles and extracts latest tweets |
| `process-news` | `event_loop` | Analyzes and summarizes tweets into a tech digest |
| `review-digest` | `event_loop` (client-facing) | Presents digest for user review and feedback |
## Flow
```
process-news → review-digest → (loop back to process-news)
↓ ↑
fetch-tweets feedback loop (if revisions needed)
(sub-agent)
```
## Tools used
- **save_data / load_data** — persist daily reports
- **Browser (GCU)** — automated Twitter browsing and tweet extraction
## Running
```bash
uv run python -m examples.templates.twitter_news_agent run
uv run python -m examples.templates.twitter_news_agent run --handles "@TechCrunch,@verge,@WIRED"
```
@@ -0,0 +1,34 @@
"""Twitter News Digest — monitors Twitter for news."""
from .agent import (
TwitterNewsAgent,
default_agent,
goal,
nodes,
edges,
entry_node,
entry_points,
pause_nodes,
terminal_nodes,
conversation_mode,
identity_prompt,
loop_config,
)
from .config import default_config, metadata
__all__ = [
"TwitterNewsAgent",
"default_agent",
"goal",
"nodes",
"edges",
"entry_node",
"entry_points",
"pause_nodes",
"terminal_nodes",
"conversation_mode",
"identity_prompt",
"loop_config",
"default_config",
"metadata",
]
@@ -0,0 +1,148 @@
"""
CLI entry point for Twitter News Digest.
"""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, TwitterNewsAgent
def setup_logging(verbose=False, debug=False):
    """Configure stderr logging; verbosity escalates WARNING → INFO → DEBUG."""
    if debug:
        level = logging.DEBUG
        fmt = "%(asctime)s %(name)s: %(message)s"
    elif verbose:
        level = logging.INFO
        fmt = "%(message)s"
    else:
        level = logging.WARNING
        fmt = "%(levelname)s: %(message)s"
    logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
    # Keep the framework's own loggers at the same verbosity.
    logging.getLogger("framework").setLevel(level)
# Root command group; subcommands: run, info, validate, shell.
# NOTE: the docstring below is user-visible click help text.
@click.group()
@click.version_option(version="1.1.0")
def cli():
    """Twitter News Digest - Monitor Twitter feeds for tech news."""
    pass
@cli.command()
@click.option(
    "--handles",
    "-h",
    type=str,
    default=None,
    help="Comma-separated Twitter handles to monitor",
)
@click.option("--quiet", is_flag=True, help="Only output result JSON")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(handles, quiet, verbose, debug):
    """Fetch and summarize tech news from Twitter."""
    if not quiet:
        setup_logging(verbose=verbose, debug=debug)
    context = {"user_request": "Fetch the latest tech news digest from Twitter"}
    if handles:
        # Drop empty entries produced by stray/trailing commas (e.g.
        # "--handles '@a,,@b,'"); the previous version forwarded "" handles.
        parsed = [h.strip() for h in handles.split(",") if h.strip()]
        if parsed:
            context["twitter_handles"] = parsed
    result = asyncio.run(default_agent.run(context))
    # Result JSON is always printed (the only output when --quiet is set).
    output_data = {
        "success": result.success,
        "steps_executed": result.steps_executed,
        "output": result.output,
    }
    if result.error:
        output_data["error"] = result.error
    click.echo(json.dumps(output_data, indent=2, default=str))
    # Exit status mirrors execution success for shell scripting.
    sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
    """Show agent information."""
    data = default_agent.info()
    if output_json:
        click.echo(json.dumps(data, indent=2))
        return
    # Human-readable summary, one field per line.
    for line in (
        f"Agent: {data['name']}",
        f"Version: {data['version']}",
        f"Description: {data['description']}",
        f"\nNodes: {', '.join(data['nodes'])}",
        f"Entry: {data['entry_node']}",
        f"Terminal: {', '.join(data['terminal_nodes'])}",
    ):
        click.echo(line)
@cli.command()
def validate():
    """Validate agent structure."""
    report = default_agent.validate()
    is_valid = report["valid"]
    if is_valid:
        click.echo("Agent is valid")
        # Warnings are non-fatal; list them after the success line.
        for warning in report["warnings"]:
            click.echo(f" WARNING: {warning}")
    else:
        click.echo("Agent has errors:")
        for error in report["errors"]:
            click.echo(f" ERROR: {error}")
    sys.exit(0 if is_valid else 1)
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
    """Interactive session (CLI)."""
    # Bridge the synchronous click entry point into the async REPL loop.
    asyncio.run(_interactive_shell(verbose))
async def _interactive_shell(verbose=False):
    """Async interactive shell.

    Reads requests from stdin in a loop and runs each one through a
    TwitterNewsAgent until the user quits or interrupts.
    """
    setup_logging(verbose=verbose)
    click.echo("=== Twitter News Digest ===")
    click.echo("Enter a request (or 'quit' to exit):\n")
    agent = TwitterNewsAgent()
    await agent.start()
    # get_running_loop() is the correct call inside a coroutine; the previous
    # get_event_loop() pattern is deprecated. Looked up once, outside the loop.
    loop = asyncio.get_running_loop()
    try:
        while True:
            try:
                # Run blocking input() in a worker thread so the event loop
                # stays responsive.
                query = await loop.run_in_executor(None, input, "News> ")
                if query.lower() in ["quit", "exit", "q"]:
                    click.echo("Goodbye!")
                    break
                if not query.strip():
                    continue
                click.echo("\nFetching news...\n")
                result = await agent.run({"user_request": query})
                if result.success:
                    click.echo("\nDigest complete\n")
                else:
                    click.echo(f"\nDigest failed: {result.error}\n")
            except KeyboardInterrupt:
                click.echo("\nGoodbye!")
                break
            except Exception as e:
                # Best-effort REPL: report and keep prompting.
                click.echo(f"Error: {e}", err=True)
    finally:
        await agent.stop()
if __name__ == "__main__":
cli()
@@ -0,0 +1,241 @@
"""Agent graph construction for Twitter News Digest."""
from pathlib import Path
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult
from framework.graph.checkpoint_config import CheckpointConfig
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.execution_stream import EntryPointSpec
from .config import default_config, metadata
from .nodes import fetch_node, process_node, review_node
# Goal definition
# Scored against the three weighted success criteria below (weights sum to 1.0).
goal = Goal(
    id="twitter-news-goal",
    name="Twitter News Digest",
    description="Achieve an accurate and concise daily news digest based on Twitter feed monitoring.",
    success_criteria=[
        SuccessCriterion(
            id="sc-1",
            description="Navigate and extract tweets from at least 3 handles.",
            metric="handle_count",
            target=">=3",
            weight=0.4,
        ),
        SuccessCriterion(
            id="sc-2",
            description="Provide a summary of the most important stories.",
            metric="summary_quality",
            target="high",
            weight=0.4,
        ),
        SuccessCriterion(
            id="sc-3",
            description="Maintain a persistent log of daily digests.",
            metric="file_exists",
            target="true",
            weight=0.2,
        ),
    ],
    constraints=[
        Constraint(
            id="c-1",
            description="Respect rate limits and ethical web usage.",
            constraint_type="hard",  # hard constraints must never be violated
            category="functional",
        ),
    ],
)
# Node list
# fetch_node is a sub-agent of process_node, so it has no edges of its own.
nodes = [fetch_node, process_node, review_node]
# Edge definitions
# NOTE: both conditional edges below target process-news; "revise" carries
# feedback for a rework pass, "approved" restarts the cycle (forever-alive).
edges = [
    # Process tweets then review
    EdgeSpec(
        id="process-to-review",
        source="process-news",
        target="review-digest",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    # Feedback loop if revisions needed
    EdgeSpec(
        id="review-to-process",
        source="review-digest",
        target="process-news",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="str(status).lower() == 'revise'",
        priority=2,  # presumably higher priority is evaluated first -- confirm EdgeSpec semantics
    ),
    # Loop back for next run (forever-alive)
    EdgeSpec(
        id="review-done",
        source="review-digest",
        target="process-news",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="str(status).lower() == 'approved'",
        priority=1,
    ),
]
# Entry point is the autonomous processing node (queen handles intake)
entry_node = "process-news"
entry_points = {"start": "process-news"}
pause_nodes = []
terminal_nodes = []  # Forever-alive
# Module-level vars read by AgentRunner.load()
conversation_mode = "continuous"
identity_prompt = "You are a professional news analyst and researcher."
# Safety limits for the event-loop executor.
loop_config = {
    "max_iterations": 100,
    "max_tool_calls_per_turn": 20,
    "max_history_tokens": 32000,
}
class TwitterNewsAgent:
    """Wires the Twitter News Digest graph into the framework runtime.

    Owns the graph spec, tool registry, LLM provider, and agent runtime, and
    exposes start/stop/run plus the info/validate helpers used by the CLI.
    """

    def __init__(self, config=None):
        # Falls back to the module-level RuntimeConfig when none is supplied.
        self.config = config or default_config
        self.goal = goal
        self.nodes = nodes
        self.edges = edges
        self.entry_node = entry_node
        self.entry_points = entry_points
        self.pause_nodes = pause_nodes
        self.terminal_nodes = terminal_nodes
        # Built lazily by _setup() on first start().
        self._graph = None
        self._agent_runtime = None
        self._tool_registry = None
        self._storage_path = None

    def _build_graph(self):
        """Assemble the GraphSpec from the module-level node/edge definitions."""
        return GraphSpec(
            id="twitter-news-graph",
            goal_id=self.goal.id,
            version="1.0.0",
            entry_node=self.entry_node,
            entry_points=self.entry_points,
            terminal_nodes=self.terminal_nodes,
            pause_nodes=self.pause_nodes,
            nodes=self.nodes,
            edges=self.edges,
            default_model=self.config.model,
            max_tokens=self.config.max_tokens,
            loop_config=loop_config,
            conversation_mode=conversation_mode,
            identity_prompt=identity_prompt,
        )

    def _setup(self):
        """Create storage, load tools, build the graph, and construct the runtime."""
        self._storage_path = Path.home() / ".hive" / "agents" / "twitter_news_agent"
        self._storage_path.mkdir(parents=True, exist_ok=True)
        self._tool_registry = ToolRegistry()
        # MCP tool servers are optional; silently skipped when the config is absent.
        mcp_config = Path(__file__).parent / "mcp_servers.json"
        if mcp_config.exists():
            self._tool_registry.load_mcp_config(mcp_config)
        llm = LiteLLMProvider(
            model=self.config.model,
            api_key=self.config.api_key,
            api_base=self.config.api_base,
        )
        tools = list(self._tool_registry.get_tools().values())
        tool_executor = self._tool_registry.get_executor()
        self._graph = self._build_graph()
        self._agent_runtime = create_agent_runtime(
            graph=self._graph,
            goal=self.goal,
            storage_path=self._storage_path,
            entry_points=[
                EntryPointSpec(
                    id="default",
                    name="Default",
                    entry_node=self.entry_node,
                    trigger_type="manual",
                    isolation_level="shared",
                )
            ],
            llm=llm,
            tools=tools,
            tool_executor=tool_executor,
            checkpoint_config=CheckpointConfig(
                enabled=True,
                checkpoint_on_node_complete=True,
                checkpoint_max_age_days=7,
                async_checkpoint=True,
            ),
        )

    async def start(self):
        """Idempotently build (if needed) and start the agent runtime."""
        if self._agent_runtime is None:
            self._setup()
        if not self._agent_runtime.is_running:
            await self._agent_runtime.start()

    async def stop(self):
        """Stop the runtime if running; drop the reference so start() rebuilds it."""
        if self._agent_runtime and self._agent_runtime.is_running:
            await self._agent_runtime.stop()
        self._agent_runtime = None

    async def trigger_and_wait(
        self, entry_point="default", input_data=None, timeout=None, session_state=None
    ):
        """Trigger an entry point and wait for its execution result.

        Raises:
            RuntimeError: if called before start().
        """
        # NOTE(review): `timeout` is accepted but not forwarded to the runtime -- confirm intent.
        if self._agent_runtime is None:
            raise RuntimeError("Agent not started. Call start() first.")
        return await self._agent_runtime.trigger_and_wait(
            entry_point_id=entry_point,
            input_data=input_data or {},
            session_state=session_state,
        )

    async def run(self, context, session_state=None):
        """One-shot convenience: start, run `context` through the default entry, stop."""
        await self.start()
        try:
            result = await self.trigger_and_wait(
                "default", context, session_state=session_state
            )
            # A falsy result (e.g. None) is reported as a timeout failure.
            return result or ExecutionResult(success=False, error="Execution timeout")
        finally:
            # Always tear the runtime down, even on error.
            await self.stop()

    def info(self):
        """Return a JSON-serializable structural summary (used by the CLI `info` command)."""
        return {
            "name": metadata.name,
            "version": metadata.version,
            "description": metadata.description,
            "goal": {"name": self.goal.name, "description": self.goal.description},
            "nodes": [n.id for n in self.nodes],
            "edges": [e.id for e in self.edges],
            "entry_node": self.entry_node,
            "entry_points": self.entry_points,
            "terminal_nodes": self.terminal_nodes,
            "client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
        }

    def validate(self):
        """Cross-check edge, entry, and terminal references against defined node ids."""
        errors, warnings = [], []
        node_ids = {n.id for n in self.nodes}
        for e in self.edges:
            if e.source not in node_ids:
                errors.append(f"Edge {e.id}: source '{e.source}' not found")
            if e.target not in node_ids:
                errors.append(f"Edge {e.id}: target '{e.target}' not found")
        if self.entry_node not in node_ids:
            errors.append(f"Entry node '{self.entry_node}' not found")
        for t in self.terminal_nodes:
            if t not in node_ids:
                errors.append(f"Terminal node '{t}' not found")
        for ep_id, nid in self.entry_points.items():
            if nid not in node_ids:
                errors.append(f"Entry point '{ep_id}' references unknown node '{nid}'")
        return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}
# Module-level singleton used by the CLI entry point (__main__.py).
default_agent = TwitterNewsAgent()
@@ -0,0 +1,20 @@
"""Runtime configuration."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
default_config = RuntimeConfig()
@dataclass
class AgentMetadata:
    """Descriptive metadata surfaced by the CLI `info` command."""

    name: str = "Twitter News Digest"
    version: str = "1.1.0"
    description: str = "Monitors Twitter feeds and provides a daily news digest, focused on tech news."
    intro_message: str = "I'm ready to fetch the latest tech news from Twitter. Which tech handles should I check?"


# Shared singleton instance.
metadata = AgentMetadata()
@@ -0,0 +1,16 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../../tools",
"description": "Hive tools MCP server"
},
"gcu-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "-m", "gcu.server", "--stdio"],
"cwd": "../../../tools",
"description": "GCU tools for browser automation"
}
}
@@ -0,0 +1,86 @@
"""Node definitions for Twitter News Digest."""
from framework.graph import NodeSpec
# Node 1: Browser subagent (GCU) to fetch tweets
fetch_node = NodeSpec(
id="fetch-tweets",
name="Fetch Tech Tweets",
description="Browser subagent to navigate to tech news Twitter profiles and extract latest tweets.",
node_type="gcu",
client_facing=False,
max_node_visits=1,
input_keys=["twitter_handles"],
output_keys=["raw_tweets"],
tools=[], # Auto-populated with browser tools
system_prompt="""\
You are a specialized tech news researcher.
Your task is to navigate to the provided tech Twitter profiles and extract the latest 10 tweets from each.
## Target Content
Focus on:
- Major software/AI releases
- Tech company earnings/acquisitions
- Hardware/Silicon breakthroughs
## Instructions
1. browser_start
2. For each handle:
a. browser_open(url=f"https://x.com/{handle}")
b. browser_wait(seconds=5)
c. browser_snapshot
d. Parse relevant tech news text
3. set_output("raw_tweets", consolidated_json)
""",
)
# Node 2: Process and summarize (autonomous)
# Entry node of the graph; delegates tweet collection to fetch-tweets when needed.
process_node = NodeSpec(
    id="process-news",
    name="Process Tech News",
    description="Analyze and summarize the raw tweets into a daily tech digest.",
    node_type="event_loop",
    sub_agents=["fetch-tweets"],  # may delegate to the browser subagent
    input_keys=["user_request", "feedback", "raw_tweets"],
    output_keys=["daily_digest"],
    nullable_output_keys=["feedback", "raw_tweets"],  # absent on the first pass
    success_criteria="A high-quality, tech-focused news summary.",
    system_prompt="""\
You are a senior technology editor.
If "raw_tweets" is missing, call delegate_to_sub_agent(agent_id="fetch-tweets", task="Fetch tech news from @TechCrunch, @verge, @WIRED, @CNET, @engadget, @Gizmodo, @TheRegister, @ArsTechnica, @ZDNet, @venturebeat, @AndrewYNg, @ylecun, @geoffreyhinton, @goodfellow_ian, @drfeifei, @hardmaru, @tegmark, @GaryMarcus, @schmidhuberAI, @fastdotai").
Once tech tweets are available:
1. Synthesize a "Daily Tech Report" highlighting major breakthroughs.
2. Save the report using save_data(filename="daily_tech_report.txt", data=summary).
3. set_output("daily_digest", summary)
""",
    tools=["save_data", "load_data"],  # persistence for the daily report
)
# Node 3: Review (client-facing)
# Presents the digest and routes back to process-news with either
# status="approved" (next cycle) or status="revise" plus feedback.
review_node = NodeSpec(
    id="review-digest",
    name="Review Digest",
    description="Present the news digest for user review and approval.",
    node_type="event_loop",
    client_facing=True,  # interacts directly with the end user
    input_keys=["daily_digest"],
    output_keys=["status", "feedback"],
    nullable_output_keys=["feedback"],  # only set when revisions are requested
    success_criteria="User has reviewed the digest and provided feedback or approval.",
    system_prompt="""\
Present the daily news digest to the user.
**STEP 1 Present (text only, NO tool calls):**
Display the summary and ask:
1. Is this summary helpful?
2. Are there specific handles or topics you'd like to focus on for tomorrow?
**STEP 2 After user responds, call set_output:**
- set_output("status", "approved") if satisfied.
- set_output("status", "revise") and set_output("feedback", "...") if changes are needed.
""",
    tools=[],  # conversation-only node; no external tools
)
# Public node definitions consumed by the graph assembly module (agent.py).
__all__ = ["fetch_node", "process_node", "review_node"]