Add competitive intelligence agent template

- Adds a new autonomous agent template that monitors competitor websites, news, and GitHub
- Implements a 7-node graph workflow to collect, aggregate, and analyze competitive data
- Generates a weekly structured HTML digest with key highlights and 30-day trends
- Utilizes existing web_scrape, web_search, and github MCP tools
- Addresses issue #4153

Closes #4153
Nafiyad Adane
2026-02-20 19:13:47 -07:00
parent 976ae75fde
commit f568728de1
7 changed files with 1114 additions and 0 deletions
@@ -0,0 +1,80 @@
# Competitive Intelligence Agent
An autonomous agent that monitors competitor websites, news sources, and GitHub repositories to deliver structured digests with key insights and trend analysis.
## Prerequisites
- **Python 3.11+** with `uv`
- **ANTHROPIC_API_KEY** — set in your `.env` or environment
- **GITHUB_TOKEN** *(optional)* — for GitHub activity monitoring
## Quick Start
### Interactive Shell
```bash
cd examples/templates
uv run python -m competitive_intel_agent shell
```
### CLI Run
```bash
# With inline JSON
uv run python -m competitive_intel_agent run \
--competitors '[{"name":"Acme","website":"https://acme.com","github":"acme-org"},{"name":"Beta Inc","website":"https://beta.io","github":null}]' \
--focus-areas "pricing,features,partnerships,hiring" \
--frequency weekly
# From a file
uv run python -m competitive_intel_agent run --competitors competitors.json
```
### TUI Dashboard
```bash
uv run python -m competitive_intel_agent tui
```
### Validate & Info
```bash
uv run python -m competitive_intel_agent validate
uv run python -m competitive_intel_agent info
```
## Agent Graph
```
intake → web-scraper → news-search → github-monitor → aggregator → analysis → report
(skipped if no competitors have GitHub)
```
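The conditional branch in the graph above keys off a single string flag set by the intake node. A minimal sketch of that routing decision (function name hypothetical; the comparison mirrors the `condition_expr` used on the edges in `agent.py`):

```python
def next_node_after_news_search(has_github_competitors: str) -> str:
    """Mirror the two conditional edges leaving news-search."""
    # Node outputs travel between nodes as strings, so the flag is
    # normalized and compared against "true" rather than a bool.
    if str(has_github_competitors).lower() == "true":
        return "github-monitor"
    return "aggregator"
```

For example, `next_node_after_news_search("false")` routes straight to the aggregator, skipping the GitHub node entirely.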
| Node | Purpose | Tools | Client-Facing |
|------|---------|-------|:---:|
| **intake** | Collect competitor list & focus areas | — | ✅ |
| **web-scraper** | Scrape competitor websites | web_search, web_scrape | |
| **news-search** | Search news & press releases | web_search, web_scrape | |
| **github-monitor** | Track public GitHub activity | github_* | |
| **aggregator** | Merge, deduplicate, persist | save_data, load_data | |
| **analysis** | Extract insights & trends | load_data, save_data | |
| **report** | Generate HTML digest | save_data, serve_file | ✅ |
## Input Format
```json
{
"competitors": [
{"name": "CompetitorA", "website": "https://competitor-a.com", "github": "competitor-a"},
{"name": "CompetitorB", "website": "https://competitor-b.com", "github": null}
],
"focus_areas": ["pricing", "new_features", "hiring", "partnerships"],
"report_frequency": "weekly"
}
```
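A minimal sketch of checking a payload of this shape before a run (plain `json`, no framework helpers; this validator is illustrative and not part of the template):

```python
import json

REQUIRED_KEYS = {"name", "website"}

def validate_input(raw: str) -> dict:
    """Parse and sanity-check a competitors payload like the one above."""
    data = json.loads(raw)
    competitors = data.get("competitors", [])
    if not competitors:
        raise ValueError("at least one competitor is required")
    for c in competitors:
        missing = REQUIRED_KEYS - c.keys()
        if missing:
            raise ValueError(f"competitor {c} missing keys: {missing}")
        c.setdefault("github", None)  # github is optional per the schema
    # Fall back to the CLI defaults when optional fields are omitted
    data.setdefault("focus_areas", ["pricing", "features", "partnerships", "hiring"])
    data.setdefault("report_frequency", "weekly")
    return data
```

Run against a payload that only lists a name and website, this fills in `github: null` and the default focus areas and frequency.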
## Output
The agent produces an HTML report saved to `~/.hive/agents/competitive_intel_agent/` with:
- 🔥 **Key Highlights** — most significant competitive moves
- 📊 **Per-Competitor Tables** — category, update, source, date
- 📈 **30-Day Trends** — patterns across competitors over time
Historical snapshots are stored for trend comparison on subsequent runs.
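A rough sketch of how two stored snapshots could be diffed to surface new activity (the file layout and finding shape are assumed from the sections above, not taken from the agent's actual storage code):

```python
import json
from pathlib import Path

def new_findings(prev_path: Path, curr_path: Path) -> list[dict]:
    """Return findings present in the current snapshot but not the
    previous one, keyed on (competitor, category, update)."""
    prev = json.loads(prev_path.read_text())
    curr = json.loads(curr_path.read_text())
    seen = {(f["competitor"], f["category"], f["update"]) for f in prev}
    return [
        f for f in curr
        if (f["competitor"], f["category"], f["update"]) not in seen
    ]
```

Anything returned here is a candidate for the "Key Highlights" section of the next digest.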
@@ -0,0 +1,24 @@
"""
Competitive Intelligence Agent: automated competitor monitoring and reporting.
Monitors competitor websites, news sources, and GitHub repositories to deliver
structured weekly digests with key insights and 30-day trend analysis for
product and marketing teams.
"""
from .agent import CompetitiveIntelAgent, default_agent, goal, nodes, edges
from .config import RuntimeConfig, AgentMetadata, default_config, metadata
__version__ = "1.0.0"
__all__ = [
"CompetitiveIntelAgent",
"default_agent",
"goal",
"nodes",
"edges",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -0,0 +1,278 @@
"""
CLI entry point for Competitive Intelligence Agent.
Uses AgentRuntime for multi-entrypoint support with HITL pause/resume.
"""
import asyncio
import json
import logging
import sys
from typing import Any
from pathlib import Path
import click
from .agent import CompetitiveIntelAgent, default_agent
def setup_logging(verbose: bool = False, debug: bool = False) -> None:
"""Configure logging for execution visibility."""
if debug:
level, fmt = logging.DEBUG, "%(asctime)s %(name)s: %(message)s"
elif verbose:
level, fmt = logging.INFO, "%(message)s"
else:
level, fmt = logging.WARNING, "%(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
logging.getLogger("framework").setLevel(level)
@click.group()
@click.version_option(version="1.0.0")
def cli() -> None:
"""Competitive Intelligence Agent - Monitor competitors and deliver weekly digests."""
pass
@cli.command()
@click.option(
"--competitors",
"-c",
type=str,
required=True,
help='Competitors JSON string or file path (e.g. \'[{"name":"Acme","website":"https://acme.com"}]\')',
)
@click.option(
"--focus-areas",
"-f",
type=str,
default="pricing,features,partnerships,hiring",
help="Comma-separated focus areas (default: pricing,features,partnerships,hiring)",
)
@click.option(
"--frequency",
type=click.Choice(["weekly", "daily", "monthly"]),
default="weekly",
help="Report frequency (default: weekly)",
)
@click.option("--quiet", "-q", is_flag=True, help="Only output result JSON")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(
competitors: str,
focus_areas: str,
frequency: str,
quiet: bool,
verbose: bool,
debug: bool,
) -> None:
"""Execute competitive intelligence gathering and report generation."""
if not quiet:
setup_logging(verbose=verbose, debug=debug)
# Parse competitors — accept JSON string or file path
try:
competitors_data = json.loads(competitors)
except json.JSONDecodeError:
# Try loading from file
try:
with open(competitors) as f:
competitors_data = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
click.echo(f"Error parsing competitors: {e}", err=True)
sys.exit(1)
context: dict[str, Any] = {
"competitors_input": json.dumps({
"competitors": competitors_data,
"focus_areas": [a.strip() for a in focus_areas.split(",")],
"report_frequency": frequency,
})
}
result = asyncio.run(default_agent.run(context))
output_data: dict[str, Any] = {
"success": result.success,
"steps_executed": result.steps_executed,
"output": result.output,
}
if result.error:
output_data["error"] = result.error
click.echo(json.dumps(output_data, indent=2, default=str))
sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(verbose: bool, debug: bool) -> None:
"""Launch the TUI dashboard for interactive competitive intelligence."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo(
"TUI requires the 'textual' package. Install with: pip install textual"
)
sys.exit(1)
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.event_bus import EventBus
from framework.runtime.execution_stream import EntryPointSpec
async def run_with_tui() -> None:
agent = CompetitiveIntelAgent()
# Build graph and tools
agent._event_bus = EventBus()
agent._tool_registry = ToolRegistry()
storage_path = Path.home() / ".hive" / "agents" / "competitive_intel_agent"
storage_path.mkdir(parents=True, exist_ok=True)
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
agent._tool_registry.load_mcp_config(mcp_config_path)
llm = LiteLLMProvider(
model=agent.config.model,
api_key=agent.config.api_key,
api_base=agent.config.api_base,
)
tools = list(agent._tool_registry.get_tools().values())
tool_executor = agent._tool_registry.get_executor()
graph = agent._build_graph()
runtime = create_agent_runtime(
graph=graph,
goal=agent.goal,
storage_path=storage_path,
entry_points=[
EntryPointSpec(
id="start",
name="Start Competitive Analysis",
entry_node="intake",
trigger_type="manual",
isolation_level="isolated",
),
],
llm=llm,
tools=tools,
tool_executor=tool_executor,
)
await runtime.start()
try:
app = AdenTUI(runtime)
await app.run_async()
finally:
await runtime.stop()
asyncio.run(run_with_tui())
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json: bool) -> None:
"""Show agent information."""
info_data = default_agent.info()
if output_json:
click.echo(json.dumps(info_data, indent=2))
else:
click.echo(f"Agent: {info_data['name']}")
click.echo(f"Version: {info_data['version']}")
click.echo(f"Description: {info_data['description']}")
click.echo(f"\nGoal: {info_data['goal']['name']}")
click.echo(f" {info_data['goal']['description']}")
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
click.echo(f"Terminal: {', '.join(info_data['terminal_nodes'])}")
click.echo(f"Edges: {len(info_data['edges'])}")
@cli.command()
def validate() -> None:
"""Validate agent structure."""
validation = default_agent.validate()
if validation["valid"]:
click.echo("✅ Agent is valid")
if validation["warnings"]:
for warning in validation["warnings"]:
click.echo(f" ⚠️ {warning}")
else:
click.echo("❌ Agent has errors:")
for error in validation["errors"]:
click.echo(f" ERROR: {error}")
sys.exit(0 if validation["valid"] else 1)
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose: bool) -> None:
"""Interactive competitive intelligence session (CLI, no TUI)."""
asyncio.run(_interactive_shell(verbose))
async def _interactive_shell(verbose: bool = False) -> None:
"""Async interactive shell."""
setup_logging(verbose=verbose)
click.echo("=== Competitive Intelligence Agent ===")
click.echo("Provide competitor details to begin analysis (or 'quit' to exit):\n")
agent = CompetitiveIntelAgent()
await agent.start()
try:
while True:
try:
                user_input = await asyncio.get_running_loop().run_in_executor(
                    None, input, "Competitors> "
                )
if user_input.lower() in ["quit", "exit", "q"]:
click.echo("Goodbye!")
break
if not user_input.strip():
continue
click.echo("\nGathering competitive intelligence...\n")
result = await agent.trigger_and_wait(
"start", {"competitors_input": user_input}
)
if result is None:
click.echo("\n[Execution timed out]\n")
continue
if result.success:
output = result.output
status = output.get("delivery_status", "unknown")
click.echo(f"\nAnalysis complete (status: {status})\n")
else:
click.echo(f"\nAnalysis failed: {result.error}\n")
except KeyboardInterrupt:
click.echo("\nGoodbye!")
break
except Exception as e:
click.echo(f"Error: {e}", err=True)
import traceback
traceback.print_exc()
finally:
await agent.stop()
if __name__ == "__main__":
cli()
@@ -0,0 +1,375 @@
"""Agent graph construction for Competitive Intelligence Agent."""
from typing import Any, TYPE_CHECKING
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint, NodeSpec
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from .config import default_config, metadata, RuntimeConfig
from .nodes import (
intake_node,
web_scraper_node,
news_search_node,
github_monitor_node,
aggregator_node,
analysis_node,
report_node,
)
# Goal definition
goal: Goal = Goal(
id="competitive-intelligence-report",
name="Competitive Intelligence Report",
description=(
"Monitor competitor websites, news sources, and GitHub repositories "
"to produce a structured weekly digest with key insights, detailed "
"findings per competitor, and 30-day trend analysis."
),
success_criteria=[
SuccessCriterion(
id="sc-source-coverage",
description="Check multiple source types per competitor (website, news, GitHub)",
metric="sources_per_competitor",
target=">=3",
weight=0.25,
),
SuccessCriterion(
id="sc-findings-structured",
description="All findings structured with competitor, category, update, source, and date",
metric="findings_structured",
target="true",
weight=0.25,
),
SuccessCriterion(
id="sc-historical-comparison",
description="Uses stored data to compare with previous reports for trend analysis",
metric="historical_comparison",
target="true",
weight=0.25,
),
SuccessCriterion(
id="sc-report-delivered",
description="User receives a formatted, readable competitive intelligence digest",
metric="report_delivered",
target="true",
weight=0.25,
),
],
constraints=[
Constraint(
id="c-no-fabrication",
description="Never fabricate findings, news, or data — only report what was found",
constraint_type="hard",
category="quality",
),
Constraint(
id="c-source-attribution",
description="Every finding must include a source URL",
constraint_type="hard",
category="quality",
),
Constraint(
id="c-recency",
description="Prioritize findings from the past 7 days; include up to 30 days",
constraint_type="soft",
category="quality",
),
],
)
# Node list
nodes: list[NodeSpec] = [
intake_node,
web_scraper_node,
news_search_node,
github_monitor_node,
aggregator_node,
analysis_node,
report_node,
]
# Edge definitions
edges: list[EdgeSpec] = [
EdgeSpec(
id="intake-to-web-scraper",
source="intake",
target="web-scraper",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="web-scraper-to-news-search",
source="web-scraper",
target="news-search",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="news-search-to-github-monitor",
source="news-search",
target="github-monitor",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(has_github_competitors).lower() == 'true'",
priority=2,
),
EdgeSpec(
id="news-search-to-aggregator-skip-github",
source="news-search",
target="aggregator",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(has_github_competitors).lower() != 'true'",
priority=1,
),
EdgeSpec(
id="github-monitor-to-aggregator",
source="github-monitor",
target="aggregator",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="aggregator-to-analysis",
source="aggregator",
target="analysis",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
EdgeSpec(
id="analysis-to-report",
source="analysis",
target="report",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
]
# Graph configuration
entry_node: str = "intake"
entry_points: dict[str, str] = {"start": "intake"}
pause_nodes: list[str] = []
terminal_nodes: list[str] = ["report"]
class CompetitiveIntelAgent:
    """
    Competitive Intelligence Agent: a 7-node pipeline.

    Flow: intake -> web-scraper -> news-search -> github-monitor -> aggregator
          -> analysis -> report

    The github-monitor node is skipped when no competitor has a GitHub presence.
    """
def __init__(self, config: RuntimeConfig | None = None) -> None:
"""
Initialize the Competitive Intelligence Agent.
Args:
config: Optional runtime configuration. Defaults to default_config.
"""
self.config = config or default_config
self.goal = goal
self.nodes = nodes
self.edges = edges
self.entry_node = entry_node
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._executor: GraphExecutor | None = None
self._graph: GraphSpec | None = None
self._event_bus: EventBus | None = None
self._tool_registry: ToolRegistry | None = None
def _build_graph(self) -> GraphSpec:
"""
Build the GraphSpec for the competitive intelligence workflow.
Returns:
A GraphSpec defining the agent's logic.
"""
return GraphSpec(
id="competitive-intel-agent-graph",
goal_id=self.goal.id,
version="1.0.0",
entry_node=self.entry_node,
entry_points=self.entry_points,
terminal_nodes=self.terminal_nodes,
pause_nodes=self.pause_nodes,
nodes=self.nodes,
edges=self.edges,
default_model=self.config.model,
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 20,
"max_history_tokens": 32000,
},
)
def _setup(self) -> GraphExecutor:
"""
Set up the executor with all components (runtime, LLM, tools).
Returns:
An initialized GraphExecutor instance.
"""
from pathlib import Path
storage_path = Path.home() / ".hive" / "agents" / "competitive_intel_agent"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=tool_executor,
event_bus=self._event_bus,
storage_path=storage_path,
loop_config=self._graph.loop_config,
)
return self._executor
async def start(self) -> None:
"""Set up the agent (initialize executor and tools)."""
if self._executor is None:
self._setup()
async def stop(self) -> None:
"""Clean up resources."""
self._executor = None
self._event_bus = None
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict[str, Any],
timeout: float | None = None,
session_state: dict[str, Any] | None = None,
) -> ExecutionResult | None:
"""
Execute the graph and wait for completion.
Args:
entry_point: The graph entry point to trigger.
input_data: Data to pass to the entry node.
timeout: Optional execution timeout.
session_state: Optional initial session state.
Returns:
The execution result, or None if it timed out.
"""
        if self._executor is None:
            raise RuntimeError("Agent not started. Call start() first.")
        if self._graph is None:
            raise RuntimeError("Graph not built. Call start() first.")
        call = self._executor.execute(
            graph=self._graph,
            goal=self.goal,
            input_data=input_data,
            session_state=session_state,
        )
        if timeout is None:
            return await call
        import asyncio  # local import, mirroring the pathlib import in _setup()

        try:
            return await asyncio.wait_for(call, timeout)
        except asyncio.TimeoutError:
            return None
async def run(self, context: dict[str, Any], session_state: dict[str, Any] | None = None) -> ExecutionResult:
"""
Run the agent (convenience method for single execution).
Args:
context: The input context for the agent.
session_state: Optional initial session state.
Returns:
The final execution result.
"""
await self.start()
try:
result = await self.trigger_and_wait(
"start", context, session_state=session_state
)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self) -> dict[str, Any]:
"""Get agent information for introspection."""
return {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": self.goal.name,
"description": self.goal.description,
},
"nodes": [n.id for n in self.nodes],
"edges": [e.id for e in self.edges],
"entry_node": self.entry_node,
"entry_points": self.entry_points,
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
}
def validate(self) -> dict[str, Any]:
"""
Validate agent structure for cycles, missing nodes, or invalid edges.
Returns:
A dict with 'valid' (bool), 'errors' (list), and 'warnings' (list).
"""
errors = []
warnings = []
node_ids = {node.id for node in self.nodes}
for edge in self.edges:
if edge.source not in node_ids:
errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
if edge.target not in node_ids:
errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{self.entry_node}' not found")
for terminal in self.terminal_nodes:
if terminal not in node_ids:
errors.append(f"Terminal node '{terminal}' not found")
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(
f"Entry point '{ep_id}' references unknown node '{node_id}'"
)
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
}
# Create default instance
default_agent: CompetitiveIntelAgent = CompetitiveIntelAgent()
@@ -0,0 +1,24 @@
"""Runtime configuration for Competitive Intelligence Agent."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
default_config: RuntimeConfig = RuntimeConfig()
@dataclass
class AgentMetadata:
"""Metadata for the Competitive Intelligence Agent."""
name: str = "Competitive Intelligence Agent"
version: str = "1.0.0"
description: str = (
"Monitors competitor websites, news sources, and GitHub repositories "
"to deliver automated weekly digests with key insights and trend analysis "
"for product and marketing teams."
)
intro_message: str = (
"Hi! I'm your competitive intelligence assistant. Tell me which competitors "
"to monitor and what areas to focus on (pricing, features, hiring, partnerships, etc.) "
"and I'll research them across websites, news, and GitHub to produce a detailed digest."
)
metadata: AgentMetadata = AgentMetadata()
@@ -0,0 +1,14 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": [
"run",
"python",
"mcp_server.py",
"--stdio"
],
"cwd": "../../../tools",
"description": "Hive tools MCP server providing web_search, web_scrape, github tools, and file utilities"
}
}
@@ -0,0 +1,319 @@
"""Node definitions for Competitive Intelligence Agent."""
from framework.graph import NodeSpec
# Node 1: Intake (client-facing)
intake_node: NodeSpec = NodeSpec(
id="intake",
name="Competitor Intake",
description="Collect competitor list, focus areas, and report preferences from the user",
node_type="event_loop",
client_facing=True,
input_keys=["competitors_input"],
output_keys=["competitors", "focus_areas", "report_frequency", "has_github_competitors"],
system_prompt="""\
You are a competitive intelligence intake specialist. Your job is to gather the
information needed to run a competitive analysis.
**STEP 1. Read the input and respond (text only, NO tool calls):**
The user may provide input in several forms:
- A JSON object with "competitors", "focus_areas", and "report_frequency"
- A natural-language description of competitors to track
- Just company names
If the input is clear, confirm what you understood and ask the user to confirm.
If it's vague, ask 1-2 clarifying questions:
- Which competitors? (name + website URL at minimum)
- What focus areas? (pricing, features, hiring, partnerships, messaging, etc.)
- Do any competitors have public GitHub organizations/repos?
After your message, call ask_user() to wait for the user's response.
**STEP 2. After the user confirms, call set_output for each key:**
Structure the data and set outputs:
- set_output("competitors", <JSON list of {name, website, github (or null)}>)
- set_output("focus_areas", <JSON list of strings like ["pricing", "features", "hiring"]>)
- set_output("report_frequency", "weekly")
- set_output("has_github_competitors", "true" or "false")
Set has_github_competitors to "true" if at least one competitor has a non-null github field.
""",
tools=[],
)
# Node 2: Web Scraper
web_scraper_node: NodeSpec = NodeSpec(
id="web-scraper",
name="Website Monitor",
description="Scrape competitor websites for pricing, features, and announcements",
node_type="event_loop",
input_keys=["competitors", "focus_areas"],
output_keys=["web_findings"],
system_prompt="""\
You are a web intelligence agent. For each competitor, systematically check their
online presence for updates related to the focus areas.
**Process for each competitor:**
1. Use web_search to find their current pricing page, product page, changelog,
and blog. Try queries like:
- "{competitor_name} pricing"
- "{competitor_name} changelog OR release notes OR what's new"
- "{competitor_name} blog announcements"
- "site:{competitor_website} pricing OR features"
2. Use web_scrape on the most relevant URLs to extract actual content.
Focus on: pricing tiers, feature lists, recent announcements, messaging.
3. For each finding, note:
- competitor: which competitor
- category: pricing / features / announcement / messaging / other
- update: what changed or what you found
- source: the URL
- date: when it was published/updated (if available, otherwise "unknown")
**Important:**
- Work through competitors one at a time
- Skip URLs that fail to load; move on
- Prioritize recent content (last 7-30 days)
- Be factual: only report what you actually see on the page
When done, call:
- set_output("web_findings", <JSON list of finding objects>)
""",
tools=["web_search", "web_scrape"],
)
# Node 3: News Search
news_search_node: NodeSpec = NodeSpec(
id="news-search",
name="News & Press Monitor",
description="Search for competitor mentions in news, press releases, and industry publications",
node_type="event_loop",
input_keys=["competitors", "focus_areas"],
output_keys=["news_findings"],
system_prompt="""\
You are a news intelligence agent. Search for recent news, press releases, and
industry coverage about each competitor.
**Process for each competitor:**
1. Use web_search with news-focused queries:
- "{competitor_name} news"
- "{competitor_name} press release 2026"
- "{competitor_name} partnership OR acquisition OR funding"
- "{competitor_name} {focus_area}" for each focus area
2. Use web_scrape on the most relevant news articles (aim for 2-3 per competitor).
Extract the headline, key details, and publication date.
3. For each finding, note:
- competitor: which competitor
- category: partnership / funding / hiring / press_release / industry_news
- update: summary of the news item
- source: the article URL
- date: publication date
**Important:**
- Prioritize news from the last 7 days, but include last 30 days if sparse
- Include press releases, blog posts, and industry analyst coverage
- Skip paywalled content gracefully
- Do NOT fabricate news; only report what you find
When done, call:
- set_output("news_findings", <JSON list of finding objects>)
""",
tools=["web_search", "web_scrape"],
)
# Node 4: GitHub Monitor
github_monitor_node: NodeSpec = NodeSpec(
id="github-monitor",
name="GitHub Activity Monitor",
description="Track public GitHub repository activity for competitors with GitHub presence",
node_type="event_loop",
input_keys=["competitors"],
output_keys=["github_findings"],
system_prompt="""\
You are a GitHub intelligence agent. For each competitor that has a GitHub
organization or username, check their recent public activity.
**Process for each competitor with a GitHub handle:**
1. Use github_get_repo or github_list_repos to find their main repositories.
2. Note key metrics:
- New repositories created recently
- Star count changes (if you have historical data)
- Recent commit activity (last 7 days)
- Open issues/PRs count
- Any new releases or tags
3. For each notable finding, note:
- competitor: which competitor
- category: github_activity / new_repo / release / open_source
- update: what you found (e.g. "3 new commits to main repo", "Released v2.1")
- source: GitHub URL
- date: date of activity
**Important:**
- Only process competitors that have a non-null "github" field
- Focus on activity that signals product direction or engineering investment
- If a competitor has many repos, focus on the most starred / most active ones
- If no GitHub tool is available or auth fails, set output with an empty list
When done, call:
- set_output("github_findings", <JSON list of finding objects>)
""",
tools=["github_list_repos", "github_get_repo", "github_search_repos"],
)
# Node 5: Aggregator
aggregator_node: NodeSpec = NodeSpec(
id="aggregator",
name="Data Aggregator",
description="Combine findings from all sources, deduplicate, and structure for analysis",
node_type="event_loop",
input_keys=["competitors", "web_findings", "news_findings", "github_findings"],
output_keys=["aggregated_findings"],
nullable_output_keys=["github_findings"],
system_prompt="""\
You are a data aggregation specialist. Combine all the findings from the web
scraper, news search, and GitHub monitor into a single, clean dataset.
**Steps:**
1. Merge all findings into one list, preserving the source attribution.
2. Deduplicate: if the same update appears from multiple searches, keep the
most detailed version and note multiple sources.
3. Categorize each finding consistently using these categories:
   pricing, features, partnership, hiring, funding, press_release,
   github_activity, messaging, product_launch, other
4. Sort findings by competitor, then by date (most recent first).
5. Save the aggregated data for historical tracking:
save_data(filename="findings_latest.json", data=<aggregated JSON>)
When done, call:
- set_output("aggregated_findings", <JSON list of deduplicated finding objects>)
Each finding should have: competitor, category, update, source, date.
""",
tools=["save_data", "load_data", "list_data_files"],
)
# Node 6: Analysis
analysis_node: NodeSpec = NodeSpec(
id="analysis",
name="Insight Analysis",
description="Extract key insights, detect trends, and compare with historical data",
node_type="event_loop",
input_keys=["aggregated_findings", "competitors", "focus_areas"],
output_keys=["key_highlights", "trend_analysis", "detailed_findings"],
system_prompt="""\
You are a competitive intelligence analyst. Analyze the aggregated findings and
produce actionable insights.
**Steps:**
1. **Load historical data** (if available):
- Use list_data_files() to see past snapshots
- Use load_data() to load the most recent previous snapshot
- Compare current findings with previous data to identify CHANGES
2. **Extract Key Highlights** (the most important 3-5 items):
- Significant pricing changes
- Major feature launches or product updates
- Strategic moves (partnerships, acquisitions, funding)
- Anything that requires immediate attention
3. **Trend Analysis** (30-day view):
- Is a competitor investing more in enterprise features?
- Are multiple competitors moving in the same direction?
- Any shifts in pricing strategy across the market?
- Engineering investment signals from GitHub activity
4. **Save current snapshot for future comparison:**
save_data(filename="snapshot_YYYY-MM-DD.json", data=<current findings + analysis>)
When done, call:
- set_output("key_highlights", <JSON list of highlight strings>)
- set_output("trend_analysis", <JSON list of trend observation strings>)
- set_output("detailed_findings", <JSON: per-competitor structured findings>)
""",
tools=["load_data", "save_data", "list_data_files"],
)
# Node 7: Report Generator (client-facing)
report_node: NodeSpec = NodeSpec(
id="report",
name="Report Generator",
description="Generate and deliver the competitive intelligence digest as an HTML report",
node_type="event_loop",
client_facing=True,
input_keys=["key_highlights", "trend_analysis", "detailed_findings", "competitors"],
output_keys=["delivery_status"],
system_prompt="""\
You are a report generation specialist. Create a polished, self-contained HTML
competitive intelligence report and deliver it to the user.
**STEP 1. Build the HTML report (tool calls, NO text to user yet):**
Create a complete, well-styled HTML document. Use this structure:
```html
<h1>Competitive Intelligence Report</h1>
<p>Week of [date range]</p>
<h2>🔥 Key Highlights</h2>
<!-- Bulleted list of the most important findings -->
<h2>📊 Detailed Findings</h2>
<!-- For each competitor: -->
<h3>[Competitor Name]</h3>
<table>
<tr><th>Category</th><th>Update</th><th>Source</th><th>Date</th></tr>
<!-- One row per finding -->
</table>
<h2>📈 30-Day Trends</h2>
<!-- Bulleted list of trend observations -->
<footer>Generated by Competitive Intelligence Agent</footer>
```
Design requirements:
- Modern, readable styling with a dark header and clean tables
- Color-coded categories (pricing=blue, features=green, partnerships=purple, etc.)
- Clickable source links
- Responsive layout
Save the report:
save_data(filename="report_YYYY-MM-DD.html", data=<your_html>)
Serve it to the user:
serve_file_to_user(filename="report_YYYY-MM-DD.html", label="Competitive Intelligence Report")
**STEP 2. Present to the user (text only, NO tool calls):**
Tell the user the report is ready and include the file link. Provide a brief
summary of the most important findings. Ask if they want to:
- Dig deeper into any specific competitor
- Adjust focus areas for next time
- See historical trends
After presenting, call ask_user() to wait for the user's response.
**STEP 3. After the user responds:**
- Answer follow-up questions from the research material
- Call ask_user() again if they might have more questions
- When satisfied: set_output("delivery_status", "completed")
""",
tools=["save_data", "load_data", "serve_file_to_user", "list_data_files"],
)
__all__ = [
"intake_node",
"web_scraper_node",
"news_search_node",
"github_monitor_node",
"aggregator_node",
"analysis_node",
"report_node",
]