Files
hive/examples/templates/vulnerability_assessment/agent.py
T
2026-04-03 20:31:14 -07:00

360 lines
12 KiB
Python

"""Agent graph construction for Passive Website Vulnerability Assessment."""
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from .config import default_config, metadata
from .nodes import (
intake_node,
passive_recon_node,
risk_scoring_node,
findings_review_node,
final_report_node,
)
# Goal: what the agent is trying to achieve, how success is measured, and
# which rules it must never break.
goal = Goal(
    id="passive-vulnerability-assessment",
    name="Passive Website Vulnerability Assessment",
    description=(
        "A passive, OSINT-based website vulnerability assessment agent that accepts a "
        "website domain, performs non-intrusive security scanning using purpose-built "
        "Python tools, produces letter-grade risk scores (A-F) per category, and delivers "
        "a structured vulnerability report with remediation guidance. The user is consulted "
        "after scanning to decide whether to investigate further or generate the final report."
    ),
    # The five weights below add up to 1.0.
    success_criteria=[
        SuccessCriterion(
            id="risk-score-produced",
            description="Overall risk grade (A-F) generated from combined scan results",
            metric="overall_grade_generated",
            target="true",
            weight=0.25,
        ),
        SuccessCriterion(
            id="category-coverage",
            description=(
                "At least 5 of 6 security categories scored (SSL/TLS, HTTP Headers, "
                "DNS, Network, Technology, Attack Surface)"
            ),
            metric="categories_scored",
            target=">=5",
            weight=0.20,
        ),
        SuccessCriterion(
            id="vulnerability-discovery",
            description="At least 3 security findings identified across different categories",
            metric="vulnerabilities_found",
            target=">=3",
            weight=0.20,
        ),
        SuccessCriterion(
            id="remediation-guidance",
            description=(
                "Every finding includes clear, actionable remediation steps a developer can follow"
            ),
            metric="findings_with_remediation",
            target="100%",
            weight=0.20,
        ),
        SuccessCriterion(
            id="user-control",
            description=(
                "User is presented findings with risk grades and given checkpoint "
                "to continue deeper scanning or generate report"
            ),
            metric="user_checkpoints",
            target=">=1",
            weight=0.15,
        ),
    ],
    # Both constraints are declared "hard".
    constraints=[
        Constraint(
            id="non-intrusive-only",
            description=(
                "Never execute active attacks, send exploit payloads, or perform actions "
                "that could trigger WAF/IDS systems. Passive and OSINT-based scanning only "
                "— no nmap, sqlmap, or attack payloads."
            ),
            constraint_type="hard",
            category="safety",
        ),
        Constraint(
            id="developer-audience",
            description=(
                "All findings and remediation steps must be written for developers "
                "using clear language, not security jargon"
            ),
            constraint_type="hard",
            category="quality",
        ),
    ],
)
# Node specs that make up the graph, listed in the order of the main flow.
nodes = [
    intake_node,
    passive_recon_node,
    risk_scoring_node,
    findings_review_node,
    final_report_node,
]
# Edges wiring the nodes together: a linear scan pipeline, a user-driven
# branch after the findings review, and a loop back to intake.
edges = [
    # --- Linear pipeline: intake -> passive-recon -> risk-scoring -> findings-review
    EdgeSpec(
        id="intake-to-passive-recon",
        source="intake",
        target="passive-recon",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="passive-recon-to-risk-scoring",
        source="passive-recon",
        target="risk-scoring",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    EdgeSpec(
        id="risk-scoring-to-findings-review",
        source="risk-scoring",
        target="findings-review",
        condition=EdgeCondition.ON_SUCCESS,
        priority=1,
    ),
    # --- Branch after findings-review. The two expressions are exact
    # complements, keyed on `continue_scanning`.
    # Feedback loop: the user asked for deeper scanning.
    EdgeSpec(
        id="findings-review-to-passive-recon",
        source="findings-review",
        target="passive-recon",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="str(continue_scanning).lower() == 'true'",
        priority=-1,
    ),
    # The user is satisfied: produce the final report.
    EdgeSpec(
        id="findings-review-to-final-report",
        source="findings-review",
        target="final-report",
        condition=EdgeCondition.CONDITIONAL,
        condition_expr="str(continue_scanning).lower() != 'true'",
        priority=1,
    ),
    # --- Forever-alive loop: after a report, go back to intake for a new target.
    EdgeSpec(
        id="final-report-to-intake",
        source="final-report",
        target="intake",
        condition=EdgeCondition.ON_SUCCESS,
        priority=-1,
    ),
]
# Graph configuration — forever-alive pattern.
# No pause nodes and no terminal nodes are declared; the final-report node
# has an edge back to the entry node instead.
entry_node: str = "intake"
entry_points: dict[str, str] = {"start": entry_node}
pause_nodes: list[str] = []
terminal_nodes: list[str] = []
class VulnerabilityResearcherAgent:
"""
Passive Website Vulnerability Assessment — forever-alive agent.
Flow: intake -> passive-recon -> risk-scoring -> findings-review -> final-report
^ | |
+---- feedback loop (deeper scan) -+ |
|
intake <----- forever-alive loop (new target) -------------------+
"""
def __init__(self, config=None):
self.config = config or default_config
self.goal = goal
self.nodes = nodes
self.edges = edges
self.entry_node = entry_node
self.entry_points = entry_points
self.pause_nodes = pause_nodes
self.terminal_nodes = terminal_nodes
self._executor: GraphExecutor | None = None
self._graph: GraphSpec | None = None
self._event_bus: EventBus | None = None
self._tool_registry: ToolRegistry | None = None
def _build_graph(self) -> GraphSpec:
"""Build the GraphSpec."""
return GraphSpec(
id="vulnerability-researcher-graph",
goal_id=self.goal.id,
version="2.0.0",
entry_node=self.entry_node,
entry_points=self.entry_points,
terminal_nodes=self.terminal_nodes,
pause_nodes=self.pause_nodes,
nodes=self.nodes,
edges=self.edges,
default_model=self.config.model,
max_tokens=self.config.max_tokens,
loop_config={
"max_iterations": 100,
"max_tool_calls_per_turn": 30,
"max_history_tokens": 32000,
},
conversation_mode="continuous",
identity_prompt=(
"You are a passive website vulnerability assessment agent. You use "
"purpose-built Python scanning tools to evaluate the security posture "
"of websites. You produce letter-grade risk scores (A-F) per category "
"and deliver actionable remediation guidance written for developers."
),
)
def _setup(self, mock_mode=False) -> GraphExecutor:
"""Set up the executor with all components."""
from pathlib import Path
storage_path = Path.home() / ".hive" / "agents" / "vulnerability_researcher"
storage_path.mkdir(parents=True, exist_ok=True)
self._event_bus = EventBus()
self._tool_registry = ToolRegistry()
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
self._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock_mode:
llm = LiteLLMProvider(
model=self.config.model,
api_key=self.config.api_key,
api_base=self.config.api_base,
)
tool_executor = self._tool_registry.get_executor()
tools = list(self._tool_registry.get_tools().values())
self._graph = self._build_graph()
runtime = Runtime(storage_path)
self._executor = GraphExecutor(
runtime=runtime,
llm=llm,
tools=tools,
tool_executor=tool_executor,
event_bus=self._event_bus,
storage_path=storage_path,
loop_config=self._graph.loop_config,
)
return self._executor
async def start(self, mock_mode=False) -> None:
"""Set up the agent (initialize executor and tools)."""
if self._executor is None:
self._setup(mock_mode=mock_mode)
async def stop(self) -> None:
"""Clean up resources."""
self._executor = None
self._event_bus = None
async def trigger_and_wait(
self,
entry_point: str,
input_data: dict,
timeout: float | None = None,
session_state: dict | None = None,
) -> ExecutionResult | None:
"""Execute the graph and wait for completion."""
if self._executor is None:
raise RuntimeError("Agent not started. Call start() first.")
if self._graph is None:
raise RuntimeError("Graph not built. Call start() first.")
return await self._executor.execute(
graph=self._graph,
goal=self.goal,
input_data=input_data,
session_state=session_state,
)
async def run(self, context: dict, mock_mode=False, session_state=None) -> ExecutionResult:
"""Run the agent (convenience method for single execution)."""
await self.start(mock_mode=mock_mode)
try:
result = await self.trigger_and_wait("start", context, session_state=session_state)
return result or ExecutionResult(success=False, error="Execution timeout")
finally:
await self.stop()
def info(self):
"""Get agent information."""
return {
"name": metadata.name,
"version": metadata.version,
"description": metadata.description,
"goal": {
"name": self.goal.name,
"description": self.goal.description,
},
"nodes": [n.id for n in self.nodes],
"edges": [e.id for e in self.edges],
"entry_node": self.entry_node,
"entry_points": self.entry_points,
"pause_nodes": self.pause_nodes,
"terminal_nodes": self.terminal_nodes,
"client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
}
def validate(self):
"""Validate agent structure."""
errors = []
warnings = []
node_ids = {node.id for node in self.nodes}
for edge in self.edges:
if edge.source not in node_ids:
errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
if edge.target not in node_ids:
errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
if self.entry_node not in node_ids:
errors.append(f"Entry node '{self.entry_node}' not found")
for terminal in self.terminal_nodes:
if terminal not in node_ids:
errors.append(f"Terminal node '{terminal}' not found")
for ep_id, node_id in self.entry_points.items():
if node_id not in node_ids:
errors.append(f"Entry point '{ep_id}' references unknown node '{node_id}'")
# Verify all nodes have at least one outgoing edge (forever-alive)
for node_id in node_ids:
outgoing = [e for e in self.edges if e.source == node_id]
if not outgoing and node_id not in self.terminal_nodes:
warnings.append(
f"Node '{node_id}' has no outgoing edges (dead end in forever-alive graph)"
)
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
}
# Default instance, created at import time. No config is passed, so the
# constructor falls back to `default_config`.
default_agent = VulnerabilityResearcherAgent()