Merge pull request #4922 from adenhq/feat/vulnerability_agent

Feat/vulnerability agent
This commit is contained in:
RichardTang-Aden
2026-02-15 19:21:00 -08:00
committed by GitHub
28 changed files with 3563 additions and 1 deletion
+1
View File
@@ -1449,6 +1449,7 @@ def _select_agent(agents_dir: Path) -> str | None:
for path in agents_dir.iterdir():
if _is_valid_agent_dir(path):
agents.append(path)
agents.sort(key=lambda p: p.name)
if not agents:
print(f"No agents found in {agents_dir}", file=sys.stderr)
@@ -200,7 +200,7 @@ class DeepResearchAgent:
},
)
def _setup(self) -> GraphExecutor:
def _setup(self, mock_mode: bool = False) -> None:
"""Set up the executor with all components."""
from pathlib import Path
@@ -0,0 +1,47 @@
# Passive Vulnerability Assessment
A template agent that performs passive, OSINT-based security scanning on a target domain and produces letter-grade risk scores (A-F) per category with a developer-focused vulnerability report.
## Architecture
```
intake → passive-recon → risk-scoring → findings-review → final-report
↑ | |
└──────── feedback loop ─────────┘ |
intake ←────────── forever-alive loop ────────────────────────┘
```
### Nodes
1. **intake** — Collect target domain from the user (client-facing)
2. **passive-recon** — Run 6 scanning tools: SSL/TLS, HTTP headers, DNS, ports, tech stack, subdomains
3. **risk-scoring** — Calculate weighted letter grades (A-F) per category via `risk_score` tool
4. **findings-review** — Present grades and findings, ask user to continue or generate report (client-facing)
5. **final-report** — Generate an HTML risk dashboard with remediation steps (client-facing)
### Required Tools
- `ssl_tls_scan`, `http_headers_scan`, `dns_security_scan`
- `port_scan`, `tech_stack_detect`, `subdomain_enumerate`
- `risk_score`, `save_data`, `serve_file_to_user`
## Usage
### Linux / Mac
```bash
PYTHONPATH=core:examples/templates python -m vulnerability_assessment run --target "example.com"
```
### Windows
```powershell
$env:PYTHONPATH="core;examples\templates"
python -m vulnerability_assessment run --target "example.com"
```
## Options
- `-t, --target`: Target domain to scan (required).
- `--mock`: Run without calling real LLM APIs (simulated execution).
- `-v, --verbose`: Show execution details.
- `--debug`: Show debug logging.
- `--help`: Show all available options.
@@ -0,0 +1,25 @@
"""
Passive Vulnerability Assessment - OSINT-based security scanning with risk grades.
Performs non-intrusive security scanning (SSL/TLS, HTTP headers, DNS, ports, tech stack,
subdomains) on a target domain and produces letter-grade risk scores (A-F) per category
with a developer-focused vulnerability report. Features human-in-the-loop checkpoints
and a forever-alive loop for continuous assessments.
"""
from .agent import VulnerabilityResearcherAgent, default_agent, goal, nodes, edges
from .config import RuntimeConfig, AgentMetadata, default_config, metadata
__version__ = "2.0.0"
__all__ = [
"VulnerabilityResearcherAgent",
"default_agent",
"goal",
"nodes",
"edges",
"RuntimeConfig",
"AgentMetadata",
"default_config",
"metadata",
]
@@ -0,0 +1,238 @@
"""
CLI entry point for Passive Vulnerability Assessment.
Uses AgentRuntime for multi-entrypoint support with HITL pause/resume.
"""
import asyncio
import json
import logging
import sys
import click
from .agent import default_agent, VulnerabilityResearcherAgent
def setup_logging(verbose=False, debug=False):
"""Configure logging for execution visibility."""
if debug:
level, fmt = logging.DEBUG, "%(asctime)s %(name)s: %(message)s"
elif verbose:
level, fmt = logging.INFO, "%(message)s"
else:
level, fmt = logging.WARNING, "%(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, stream=sys.stderr)
logging.getLogger("framework").setLevel(level)
@click.group()
@click.version_option(version="2.0.0")
def cli():
"""Passive Vulnerability Assessment - OSINT-based security scanning with risk grades."""
pass
@cli.command()
@click.option("--target", "-t", type=str, required=True, help="Target domain to scan")
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--quiet", "-q", is_flag=True, help="Only output result JSON")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def run(target, mock, quiet, verbose, debug):
"""Execute passive vulnerability assessment on a target domain."""
if not quiet:
setup_logging(verbose=verbose, debug=debug)
context = {"target_domain": target}
result = asyncio.run(default_agent.run(context, mock_mode=mock))
output_data = {
"success": result.success,
"steps_executed": result.steps_executed,
"output": result.output,
}
if result.error:
output_data["error"] = result.error
click.echo(json.dumps(output_data, indent=2, default=str))
sys.exit(0 if result.success else 1)
@cli.command()
@click.option("--mock", is_flag=True, help="Run in mock mode")
@click.option("--verbose", "-v", is_flag=True, help="Show execution details")
@click.option("--debug", is_flag=True, help="Show debug logging")
def tui(mock, verbose, debug):
"""Launch the TUI dashboard for interactive vulnerability assessment."""
setup_logging(verbose=verbose, debug=debug)
try:
from framework.tui.app import AdenTUI
except ImportError:
click.echo(
"TUI requires the 'textual' package. Install with: pip install textual"
)
sys.exit(1)
from pathlib import Path
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import create_agent_runtime
from framework.runtime.event_bus import EventBus
from framework.runtime.execution_stream import EntryPointSpec
async def run_with_tui():
agent = VulnerabilityResearcherAgent()
# Build graph and tools
agent._event_bus = EventBus()
agent._tool_registry = ToolRegistry()
storage_path = Path.home() / ".hive" / "agents" / "vulnerability_researcher"
storage_path.mkdir(parents=True, exist_ok=True)
mcp_config_path = Path(__file__).parent / "mcp_servers.json"
if mcp_config_path.exists():
agent._tool_registry.load_mcp_config(mcp_config_path)
llm = None
if not mock:
llm = LiteLLMProvider(
model=agent.config.model,
api_key=agent.config.api_key,
api_base=agent.config.api_base,
)
tools = list(agent._tool_registry.get_tools().values())
tool_executor = agent._tool_registry.get_executor()
graph = agent._build_graph()
runtime = create_agent_runtime(
graph=graph,
goal=agent.goal,
storage_path=storage_path,
entry_points=[
EntryPointSpec(
id="start",
name="Start Vulnerability Assessment",
entry_node="intake",
trigger_type="manual",
isolation_level="isolated",
),
],
llm=llm,
tools=tools,
tool_executor=tool_executor,
)
await runtime.start()
try:
app = AdenTUI(runtime)
await app.run_async()
finally:
await runtime.stop()
asyncio.run(run_with_tui())
@cli.command()
@click.option("--json", "output_json", is_flag=True)
def info(output_json):
"""Show agent information."""
info_data = default_agent.info()
if output_json:
click.echo(json.dumps(info_data, indent=2))
else:
click.echo(f"Agent: {info_data['name']}")
click.echo(f"Version: {info_data['version']}")
click.echo(f"Description: {info_data['description']}")
click.echo(f"\nNodes: {', '.join(info_data['nodes'])}")
click.echo(f"Client-facing: {', '.join(info_data['client_facing_nodes'])}")
click.echo(f"Entry: {info_data['entry_node']}")
click.echo(
f"Terminal: {', '.join(info_data['terminal_nodes']) or '(forever-alive)'}"
)
@cli.command()
def validate():
"""Validate agent structure."""
validation = default_agent.validate()
if validation["valid"]:
click.echo("Agent is valid")
if validation["warnings"]:
for warning in validation["warnings"]:
click.echo(f" WARNING: {warning}")
else:
click.echo("Agent has errors:")
for error in validation["errors"]:
click.echo(f" ERROR: {error}")
sys.exit(0 if validation["valid"] else 1)
@cli.command()
@click.option("--verbose", "-v", is_flag=True)
def shell(verbose):
"""Interactive vulnerability assessment session (CLI, no TUI)."""
asyncio.run(_interactive_shell(verbose))
async def _interactive_shell(verbose=False):
"""Async interactive shell."""
setup_logging(verbose=verbose)
click.echo("=== Passive Vulnerability Assessment ===")
click.echo("Enter a target domain to assess (or 'quit' to exit):\n")
agent = VulnerabilityResearcherAgent()
await agent.start()
try:
while True:
try:
target = await asyncio.get_event_loop().run_in_executor(
None, input, "Target> "
)
if target.lower() in ["quit", "exit", "q"]:
click.echo("Goodbye!")
break
if not target.strip():
continue
click.echo("\nAssessing...\n")
result = await agent.trigger_and_wait(
"start", {"target_domain": target}
)
if result is None:
click.echo("\n[Execution timed out]\n")
continue
if result.success:
output = result.output
if "report_status" in output:
click.echo(
f"\nAssessment complete: {output['report_status']}\n"
)
else:
click.echo(f"\nAssessment failed: {result.error}\n")
except KeyboardInterrupt:
click.echo("\nGoodbye!")
break
except Exception as e:
click.echo(f"Error: {e}", err=True)
import traceback
traceback.print_exc()
finally:
await agent.stop()
if __name__ == "__main__":
cli()
@@ -0,0 +1,320 @@
{
"agent": {
"id": "vulnerability_assessment",
"name": "Passive Vulnerability Assessment",
"version": "2.0.0",
"description": "A passive, OSINT-based website vulnerability assessment agent that accepts a website domain, performs non-intrusive security scanning using purpose-built Python tools, produces letter-grade risk scores (A-F) per category, and delivers a structured vulnerability report with remediation guidance. The user is consulted after scanning to decide whether to investigate further or generate the final report."
},
"graph": {
"id": "vulnerability-researcher-graph",
"goal_id": "passive-vulnerability-assessment",
"version": "2.0.0",
"entry_node": "intake",
"entry_points": {
"start": "intake"
},
"pause_nodes": [],
"terminal_nodes": [],
"conversation_mode": "continuous",
"identity_prompt": "You are a passive website vulnerability assessment agent. You use purpose-built Python scanning tools to evaluate the security posture of websites. You produce letter-grade risk scores (A-F) per category and deliver actionable remediation guidance written for developers.",
"nodes": [
{
"id": "intake",
"name": "Intake",
"description": "Collect the target website domain from the user and confirm the scanning scope",
"node_type": "event_loop",
"input_keys": [],
"output_keys": [
"target_domain"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are the intake specialist for a passive website vulnerability assessment agent.\n\n**STEP 1 \u2014 Greet and collect target (text only, NO tool calls):**\nAsk the user for the website domain they want to assess. If they already provided one, confirm it.\n\nClarify:\n- The exact domain or URL (e.g., example.com, https://app.example.com)\n- Any specific areas of concern (e.g., email security, SSL, exposed services)\n\nExplain briefly that this is a **passive, non-intrusive assessment** \u2014 we only examine publicly available information (SSL certificates, HTTP headers, DNS records, open ports, tech fingerprints, and public subdomain data). No attack payloads or exploit attempts.\n\nKeep it brief. One message, 2-3 questions max.\n\nAfter your message, call ask_user() to wait for the user's response.\n\n**STEP 2 \u2014 After the user responds, call set_output:**\n- set_output(\"target_domain\", \"the confirmed domain/URL to test, e.g. https://example.com\")",
"tools": [],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": true,
"success_criteria": null
},
{
"id": "passive-recon",
"name": "Passive Reconnaissance",
"description": "Run all 6 passive scanning tools against the target domain: SSL/TLS, HTTP headers, DNS security, port scanning, tech stack detection, and subdomain enumeration",
"node_type": "event_loop",
"input_keys": [
"target_domain",
"feedback"
],
"output_keys": [
"scan_results"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You are a passive reconnaissance specialist. Given a target domain, run all 6 scanning tools to assess the security posture. These tools are non-intrusive and OSINT-based.\n\nIf feedback is provided (not None/empty), this is a follow-up round \u2014 focus on the areas the user requested. You may skip tools that aren't relevant to the feedback. If feedback is None or empty, this is the first scan \u2014 run ALL 6 tools.\n\n**Run these tools against the target domain:**\n\n1. **ssl_tls_scan(hostname)** \u2014 Checks TLS version, certificate validity, cipher strength\n2. **http_headers_scan(url)** \u2014 Checks OWASP-recommended security headers (HSTS, CSP, X-Frame-Options, etc.)\n3. **dns_security_scan(domain)** \u2014 Checks SPF, DMARC, DKIM, DNSSEC, zone transfer\n4. **port_scan(hostname)** \u2014 TCP connect scan on top 20 common ports, flags exposed database/admin ports\n5. **tech_stack_detect(url)** \u2014 Detects web server, framework, CMS, JS libraries, cookies\n6. **subdomain_enumerate(domain)** \u2014 Queries Certificate Transparency logs for subdomains\n\n**IMPORTANT:**\n- Extract just the hostname/domain from the URL for tools that need it (e.g., \"example.com\" not \"https://example.com\")\n- Use the full URL (with https://) for http_headers_scan and tech_stack_detect\n- Run tools in batches of 2-3 to avoid overwhelming the system\n- If a tool fails, note the error and continue with the remaining tools\n\n**After all tools complete, compile results:**\n\nCombine ALL tool outputs into a single JSON object and store it:\n\nset_output(\"scan_results\", \"<JSON string containing all 6 tool results: {ssl: {...}, headers: {...}, dns: {...}, ports: {...}, tech: {...}, subdomains: {...}}>\")\n\nEach tool returns a grade_input dict \u2014 preserve these as-is, the risk scorer needs them.",
"tools": [
"ssl_tls_scan",
"http_headers_scan",
"dns_security_scan",
"port_scan",
"tech_stack_detect",
"subdomain_enumerate"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": false,
"success_criteria": null
},
{
"id": "risk-scoring",
"name": "Risk Scoring",
"description": "Calculate weighted letter grades (A-F) per security category and overall risk score from scan results",
"node_type": "event_loop",
"input_keys": [
"scan_results"
],
"output_keys": [
"risk_report"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You calculate risk scores from scan results.\n\nGiven scan_results (a JSON string with ssl, headers, dns, ports, tech, subdomains sections), call the risk_score tool to produce letter grades.\n\n**Step 1 \u2014 Extract scan results and call risk_score:**\n\nThe risk_score tool accepts JSON strings for each category. Extract the relevant sections from scan_results and pass them:\n\nrisk_score(\n ssl_results=\"<JSON string of the ssl section from scan_results>\",\n headers_results=\"<JSON string of the headers section from scan_results>\",\n dns_results=\"<JSON string of the dns section from scan_results>\",\n ports_results=\"<JSON string of the ports section from scan_results>\",\n tech_results=\"<JSON string of the tech section from scan_results>\",\n subdomain_results=\"<JSON string of the subdomains section from scan_results>\"\n)\n\nIf a category has no results (tool failed), pass an empty string for that parameter.\n\n**Step 2 \u2014 Store the risk report:**\n\nset_output(\"risk_report\", \"<the complete JSON output from risk_score, including overall_score, overall_grade, categories, top_risks, and grade_scale>\")",
"tools": [
"risk_score"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": false,
"success_criteria": null
},
{
"id": "findings-review",
"name": "Findings Review",
"description": "Present risk grades and security findings to the user, ask whether to continue deeper scanning or generate the final report",
"node_type": "event_loop",
"input_keys": [
"scan_results",
"risk_report",
"target_domain"
],
"output_keys": [
"continue_scanning",
"feedback",
"all_findings"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "You present security scan findings and risk grades to the user and ask for their decision.\n\n**STEP 1 \u2014 Present findings (text only, NO tool calls):**\n\nDisplay the results in this format:\n\n1. **Overall Risk Grade** \u2014 Show the letter grade prominently (e.g., \"Overall Grade: C (68/100)\")\n\n2. **Category Breakdown** \u2014 Table showing each category's grade:\n | Category | Grade | Score | Findings |\n |----------|-------|-------|----------|\n | SSL/TLS | B | 85 | 1 issue |\n | HTTP Headers | D | 45 | 4 issues |\n | DNS Security | C | 60 | 3 issues |\n | Network Exposure | C | 70 | 1 issue |\n | Technology | B | 75 | 2 issues |\n | Attack Surface | B | 80 | 1 issue |\n\n3. **Top Risks** \u2014 List the most critical findings from the risk report's top_risks field\n\n4. **Grade Scale** \u2014 Show the grade scale so the user understands the scoring:\n - A (90-100): Excellent security posture\n - B (75-89): Good, minor improvements needed\n - C (60-74): Fair, notable security gaps\n - D (40-59): Poor, significant vulnerabilities\n - F (0-39): Critical, immediate action required\n\n5. 
**Options** \u2014 Ask: \"Would you like me to:\n - **Continue scanning** \u2014 I can focus on specific weak areas for a deeper look\n - **Generate the report** \u2014 I'll compile a full HTML risk dashboard with all findings and remediation steps\"\n\nAfter your message, call ask_user() to wait for the user's response.\n\n**STEP 2 \u2014 After the user responds, call set_output:**\n\nIf the user wants to continue:\n- set_output(\"continue_scanning\", \"true\")\n- set_output(\"feedback\", \"What the user wants investigated further, or 'focus on weakest categories'\")\n- set_output(\"all_findings\", \"Accumulated findings from all rounds so far as JSON string\")\n\nIf the user wants to stop and get the report:\n- set_output(\"continue_scanning\", \"false\")\n- set_output(\"feedback\", \"\")\n- set_output(\"all_findings\", \"All scan results and risk report combined as JSON string\")",
"tools": [],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": true,
"success_criteria": null
},
{
"id": "final-report",
"name": "Risk Dashboard Report",
"description": "Generate an HTML risk dashboard with color-coded grades, category breakdown, detailed findings, and remediation steps",
"node_type": "event_loop",
"input_keys": [
"all_findings",
"risk_report",
"target_domain"
],
"output_keys": [
"report_status"
],
"nullable_output_keys": [],
"input_schema": {},
"output_schema": {},
"system_prompt": "Generate an HTML risk dashboard report and deliver it to the user.\n\n**STEP 1 \u2014 Generate the HTML report (tool calls first):**\n\nCreate a self-contained HTML document with embedded CSS. Use a clean, professional security dashboard design.\n\nReport structure:\n- **Header**: Target domain, scan date, \"Security Risk Assessment\" title\n- **Overall Grade**: Large, color-coded letter grade (A=green, B=blue, C=yellow, D=orange, F=red) with numeric score\n- **Grade Scale Legend**: Show what each grade means (A through F)\n- **Category Breakdown**: 6 cards/panels, each showing:\n - Category name\n - Letter grade (color-coded)\n - Numeric score\n - Number of findings\n- **Detailed Findings by Category**: For each of the 6 categories:\n - Category header with grade\n - List of findings organized by severity (high -> medium -> low -> info)\n - For each finding:\n - Title and severity badge (color-coded)\n - Description of the issue\n - Why it matters (impact)\n - **Remediation**: Clear, step-by-step fix instructions for developers\n - Code examples where relevant (e.g., header configurations, DNS records to add)\n- **Top Risks Summary**: Prioritized action items (fix these first)\n- **Methodology**: \"This assessment used passive, OSINT-based scanning techniques...\"\n- **Disclaimer**: \"This is an automated passive assessment, not a comprehensive penetration test\"\n\nDesign requirements:\n- Every finding MUST have remediation steps\n- Write for developers, not security experts\n- Use severity color coding (red=critical/high, orange=medium, blue=low, gray=info)\n- Responsive layout, works on mobile\n- Self-contained \u2014 no external CSS/JS dependencies\n\nSave and serve:\n- save_data(filename=\"risk_assessment_report.html\", data=<html_content>)\n- serve_file_to_user(filename=\"risk_assessment_report.html\", label=\"Security Risk Assessment Report\")\n\n**STEP 2 \u2014 Present to user (text only, NO tool calls):**\nTell the user the report is 
ready. Summarize: overall grade, weakest category, top 3 action items.\n\nAfter presenting, call ask_user() to wait for follow-up questions.\n\n**STEP 3 \u2014 After the user responds:**\n- Answer any questions about findings or remediation\n- Call ask_user() again if they have more questions\n- When the user is satisfied: set_output(\"report_status\", \"completed\")",
"tools": [
"save_data",
"serve_file_to_user"
],
"model": null,
"function": null,
"routes": {},
"max_retries": 3,
"retry_on": [],
"max_node_visits": 0,
"output_model": null,
"max_validation_retries": 2,
"client_facing": true,
"success_criteria": null
}
],
"edges": [
{
"id": "intake-to-passive-recon",
"source": "intake",
"target": "passive-recon",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "passive-recon-to-risk-scoring",
"source": "passive-recon",
"target": "risk-scoring",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "risk-scoring-to-findings-review",
"source": "risk-scoring",
"target": "findings-review",
"condition": "on_success",
"condition_expr": null,
"priority": 1,
"input_mapping": {}
},
{
"id": "findings-review-to-passive-recon",
"source": "findings-review",
"target": "passive-recon",
"condition": "conditional",
"condition_expr": "str(continue_scanning).lower() == 'true'",
"priority": -1,
"input_mapping": {}
},
{
"id": "findings-review-to-final-report",
"source": "findings-review",
"target": "final-report",
"condition": "conditional",
"condition_expr": "str(continue_scanning).lower() != 'true'",
"priority": 1,
"input_mapping": {}
},
{
"id": "final-report-to-intake",
"source": "final-report",
"target": "intake",
"condition": "on_success",
"condition_expr": null,
"priority": -1,
"input_mapping": {}
}
],
"max_steps": 100,
"max_retries_per_node": 3,
"description": "A passive, OSINT-based website vulnerability assessment agent that accepts a website domain, performs non-intrusive security scanning using purpose-built Python tools, produces letter-grade risk scores (A-F) per category, and delivers a structured vulnerability report with remediation guidance. The user is consulted after scanning to decide whether to investigate further or generate the final report."
},
"goal": {
"id": "passive-vulnerability-assessment",
"name": "Passive Website Vulnerability Assessment",
"description": "A passive, OSINT-based website vulnerability assessment agent that accepts a website domain, performs non-intrusive security scanning using purpose-built Python tools, produces letter-grade risk scores (A-F) per category, and delivers a structured vulnerability report with remediation guidance. The user is consulted after scanning to decide whether to investigate further or generate the final report.",
"status": "draft",
"success_criteria": [
{
"id": "risk-score-produced",
"description": "Overall risk grade (A-F) generated from combined scan results",
"metric": "overall_grade_generated",
"target": "true",
"weight": 0.25,
"met": false
},
{
"id": "category-coverage",
"description": "At least 5 of 6 security categories scored (SSL/TLS, HTTP Headers, DNS, Network, Technology, Attack Surface)",
"metric": "categories_scored",
"target": ">=5",
"weight": 0.2,
"met": false
},
{
"id": "vulnerability-discovery",
"description": "At least 3 security findings identified across different categories",
"metric": "vulnerabilities_found",
"target": ">=3",
"weight": 0.2,
"met": false
},
{
"id": "remediation-guidance",
"description": "Every finding includes clear, actionable remediation steps a developer can follow",
"metric": "findings_with_remediation",
"target": "100%",
"weight": 0.2,
"met": false
},
{
"id": "user-control",
"description": "User is presented findings with risk grades and given checkpoint to continue deeper scanning or generate report",
"metric": "user_checkpoints",
"target": ">=1",
"weight": 0.15,
"met": false
}
],
"constraints": [
{
"id": "non-intrusive-only",
"description": "Never execute active attacks, send exploit payloads, or perform actions that could trigger WAF/IDS systems. Passive and OSINT-based scanning only \u2014 no nmap, sqlmap, or attack payloads.",
"constraint_type": "hard",
"category": "safety",
"check": ""
},
{
"id": "developer-audience",
"description": "All findings and remediation steps must be written for developers using clear language, not security jargon",
"constraint_type": "hard",
"category": "quality",
"check": ""
}
],
"context": {},
"required_capabilities": [],
"input_schema": {},
"output_schema": {},
"version": "2.0.0",
"parent_version": null,
"evolution_reason": null
},
"required_tools": [
"ssl_tls_scan",
"http_headers_scan",
"dns_security_scan",
"port_scan",
"tech_stack_detect",
"subdomain_enumerate",
"risk_score",
"save_data",
"serve_file_to_user"
],
"metadata": {
"node_count": 5,
"edge_count": 6
}
}
@@ -0,0 +1,368 @@
"""Agent graph construction for Passive Website Vulnerability Assessment."""
from framework.graph import EdgeSpec, EdgeCondition, Goal, SuccessCriterion, Constraint
from framework.graph.edge import GraphSpec
from framework.graph.executor import ExecutionResult, GraphExecutor
from framework.runtime.event_bus import EventBus
from framework.runtime.core import Runtime
from framework.llm import LiteLLMProvider
from framework.runner.tool_registry import ToolRegistry
from .config import default_config, metadata
from .nodes import (
intake_node,
passive_recon_node,
risk_scoring_node,
findings_review_node,
final_report_node,
)
# Goal definition
goal = Goal(
id="passive-vulnerability-assessment",
name="Passive Website Vulnerability Assessment",
description=(
"A passive, OSINT-based website vulnerability assessment agent that accepts a "
"website domain, performs non-intrusive security scanning using purpose-built "
"Python tools, produces letter-grade risk scores (A-F) per category, and delivers "
"a structured vulnerability report with remediation guidance. The user is consulted "
"after scanning to decide whether to investigate further or generate the final report."
),
success_criteria=[
SuccessCriterion(
id="risk-score-produced",
description="Overall risk grade (A-F) generated from combined scan results",
metric="overall_grade_generated",
target="true",
weight=0.25,
),
SuccessCriterion(
id="category-coverage",
description=(
"At least 5 of 6 security categories scored (SSL/TLS, HTTP Headers, "
"DNS, Network, Technology, Attack Surface)"
),
metric="categories_scored",
target=">=5",
weight=0.20,
),
SuccessCriterion(
id="vulnerability-discovery",
description=(
"At least 3 security findings identified across different categories"
),
metric="vulnerabilities_found",
target=">=3",
weight=0.20,
),
SuccessCriterion(
id="remediation-guidance",
description=(
"Every finding includes clear, actionable remediation steps "
"a developer can follow"
),
metric="findings_with_remediation",
target="100%",
weight=0.20,
),
SuccessCriterion(
id="user-control",
description=(
"User is presented findings with risk grades and given checkpoint "
"to continue deeper scanning or generate report"
),
metric="user_checkpoints",
target=">=1",
weight=0.15,
),
],
constraints=[
Constraint(
id="non-intrusive-only",
description=(
"Never execute active attacks, send exploit payloads, or perform actions "
"that could trigger WAF/IDS systems. Passive and OSINT-based scanning only "
"— no nmap, sqlmap, or attack payloads."
),
constraint_type="hard",
category="safety",
),
Constraint(
id="developer-audience",
description=(
"All findings and remediation steps must be written for developers "
"using clear language, not security jargon"
),
constraint_type="hard",
category="quality",
),
],
)
# Node list
nodes = [
intake_node,
passive_recon_node,
risk_scoring_node,
findings_review_node,
final_report_node,
]
# Edge definitions
edges = [
# intake -> passive-recon
EdgeSpec(
id="intake-to-passive-recon",
source="intake",
target="passive-recon",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# passive-recon -> risk-scoring
EdgeSpec(
id="passive-recon-to-risk-scoring",
source="passive-recon",
target="risk-scoring",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# risk-scoring -> findings-review
EdgeSpec(
id="risk-scoring-to-findings-review",
source="risk-scoring",
target="findings-review",
condition=EdgeCondition.ON_SUCCESS,
priority=1,
),
# findings-review -> passive-recon (feedback loop: user wants deeper scanning)
EdgeSpec(
id="findings-review-to-passive-recon",
source="findings-review",
target="passive-recon",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(continue_scanning).lower() == 'true'",
priority=-1,
),
# findings-review -> final-report (user is satisfied, generate report)
EdgeSpec(
id="findings-review-to-final-report",
source="findings-review",
target="final-report",
condition=EdgeCondition.CONDITIONAL,
condition_expr="str(continue_scanning).lower() != 'true'",
priority=1,
),
# final-report -> intake (forever-alive: scan another target)
EdgeSpec(
id="final-report-to-intake",
source="final-report",
target="intake",
condition=EdgeCondition.ON_SUCCESS,
priority=-1,
),
]
# Graph configuration — forever-alive pattern
entry_node = "intake"
entry_points = {"start": "intake"}
pause_nodes = []
terminal_nodes = []
class VulnerabilityResearcherAgent:
    """
    Passive Website Vulnerability Assessment forever-alive agent.

    Flow: intake -> passive-recon -> risk-scoring -> findings-review -> final-report
    with a feedback loop (findings-review -> passive-recon for deeper scans) and a
    forever-alive loop (final-report -> intake to scan a new target).
    """

    def __init__(self, config=None):
        """Capture runtime config and the module-level graph topology.

        Args:
            config: Optional runtime configuration; falls back to the module's
                default_config (model, api_key, api_base, max_tokens).
        """
        self.config = config or default_config
        self.goal = goal
        self.nodes = nodes
        self.edges = edges
        self.entry_node = entry_node
        self.entry_points = entry_points
        self.pause_nodes = pause_nodes
        self.terminal_nodes = terminal_nodes
        # Lazily initialised by _setup(); cleared again by stop().
        self._executor: GraphExecutor | None = None
        self._graph: GraphSpec | None = None
        self._event_bus: EventBus | None = None
        self._tool_registry: ToolRegistry | None = None

    def _build_graph(self) -> GraphSpec:
        """Build the GraphSpec for the forever-alive assessment graph."""
        return GraphSpec(
            id="vulnerability-researcher-graph",
            goal_id=self.goal.id,
            version="2.0.0",
            entry_node=self.entry_node,
            entry_points=self.entry_points,
            terminal_nodes=self.terminal_nodes,
            pause_nodes=self.pause_nodes,
            nodes=self.nodes,
            edges=self.edges,
            default_model=self.config.model,
            max_tokens=self.config.max_tokens,
            loop_config={
                "max_iterations": 100,
                "max_tool_calls_per_turn": 20,
                "max_history_tokens": 32000,
            },
            conversation_mode="continuous",
            identity_prompt=(
                "You are a passive website vulnerability assessment agent. You use "
                "purpose-built Python scanning tools to evaluate the security posture "
                "of websites. You produce letter-grade risk scores (A-F) per category "
                "and deliver actionable remediation guidance written for developers."
            ),
        )

    def _setup(self, mock_mode=False) -> GraphExecutor:
        """Set up the executor with all components.

        Args:
            mock_mode: When True, skip LLM provider construction (no API key needed).

        Returns:
            The initialised GraphExecutor (also stored on self._executor).
        """
        from pathlib import Path

        storage_path = Path.home() / ".hive" / "agents" / "vulnerability_researcher"
        storage_path.mkdir(parents=True, exist_ok=True)
        self._event_bus = EventBus()
        self._tool_registry = ToolRegistry()
        # Optional MCP tool servers declared next to this module.
        mcp_config_path = Path(__file__).parent / "mcp_servers.json"
        if mcp_config_path.exists():
            self._tool_registry.load_mcp_config(mcp_config_path)
        llm = None
        if not mock_mode:
            llm = LiteLLMProvider(
                model=self.config.model,
                api_key=self.config.api_key,
                api_base=self.config.api_base,
            )
        tool_executor = self._tool_registry.get_executor()
        tools = list(self._tool_registry.get_tools().values())
        self._graph = self._build_graph()
        runtime = Runtime(storage_path)
        self._executor = GraphExecutor(
            runtime=runtime,
            llm=llm,
            tools=tools,
            tool_executor=tool_executor,
            event_bus=self._event_bus,
            storage_path=storage_path,
            loop_config=self._graph.loop_config,
        )
        return self._executor

    async def start(self, mock_mode=False) -> None:
        """Set up the agent (initialize executor and tools). Idempotent."""
        if self._executor is None:
            self._setup(mock_mode=mock_mode)

    async def stop(self) -> None:
        """Clean up resources."""
        self._executor = None
        self._event_bus = None

    async def trigger_and_wait(
        self,
        entry_point: str,
        input_data: dict,
        timeout: float | None = None,
        session_state: dict | None = None,
    ) -> ExecutionResult | None:
        """Execute the graph and wait for completion.

        Args:
            entry_point: Entry-point id (informational here; the graph's own
                entry_points mapping determines where execution starts —
                NOTE(review): confirm whether the executor should receive it).
            input_data: Initial context for the execution.
            timeout: Optional wall-clock limit in seconds; None means no limit.
            session_state: Optional session state carried across executions.

        Returns:
            The ExecutionResult, or None if the timeout expired (run() maps
            None to an "Execution timeout" error result).

        Raises:
            RuntimeError: If start() has not been called.
        """
        if self._executor is None:
            raise RuntimeError("Agent not started. Call start() first.")
        if self._graph is None:
            raise RuntimeError("Graph not built. Call start() first.")
        coro = self._executor.execute(
            graph=self._graph,
            goal=self.goal,
            input_data=input_data,
            session_state=session_state,
        )
        if timeout is None:
            return await coro
        # Bug fix: `timeout` was previously accepted but silently ignored, even
        # though run() already maps a None result to "Execution timeout".
        import asyncio

        try:
            return await asyncio.wait_for(coro, timeout)
        except asyncio.TimeoutError:
            return None

    async def run(
        self, context: dict, mock_mode=False, session_state=None
    ) -> ExecutionResult:
        """Run the agent (convenience method for single execution).

        Starts the agent, executes once from the "start" entry point, and
        always stops it afterwards.
        """
        await self.start(mock_mode=mock_mode)
        try:
            result = await self.trigger_and_wait(
                "start", context, session_state=session_state
            )
            return result or ExecutionResult(success=False, error="Execution timeout")
        finally:
            await self.stop()

    def info(self):
        """Get agent information as a plain dict (metadata, topology, entry points)."""
        return {
            "name": metadata.name,
            "version": metadata.version,
            "description": metadata.description,
            "goal": {
                "name": self.goal.name,
                "description": self.goal.description,
            },
            "nodes": [n.id for n in self.nodes],
            "edges": [e.id for e in self.edges],
            "entry_node": self.entry_node,
            "entry_points": self.entry_points,
            "pause_nodes": self.pause_nodes,
            "terminal_nodes": self.terminal_nodes,
            "client_facing_nodes": [n.id for n in self.nodes if n.client_facing],
        }

    def validate(self):
        """Validate agent structure.

        Returns:
            Dict with "valid" (bool), "errors" (broken references), and
            "warnings" (dead-end nodes in the forever-alive graph).
        """
        errors = []
        warnings = []
        node_ids = {node.id for node in self.nodes}
        for edge in self.edges:
            if edge.source not in node_ids:
                errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
            if edge.target not in node_ids:
                errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
        if self.entry_node not in node_ids:
            errors.append(f"Entry node '{self.entry_node}' not found")
        for terminal in self.terminal_nodes:
            if terminal not in node_ids:
                errors.append(f"Terminal node '{terminal}' not found")
        for ep_id, node_id in self.entry_points.items():
            if node_id not in node_ids:
                errors.append(
                    f"Entry point '{ep_id}' references unknown node '{node_id}'"
                )
        # Verify all nodes have at least one outgoing edge (forever-alive)
        for node_id in node_ids:
            outgoing = [e for e in self.edges if e.source == node_id]
            if not outgoing and node_id not in self.terminal_nodes:
                warnings.append(
                    f"Node '{node_id}' has no outgoing edges (dead end in forever-alive graph)"
                )
        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "warnings": warnings,
        }
# Create default instance
# Module-level singleton; presumably imported by the template runner/CLI — confirm usage.
default_agent = VulnerabilityResearcherAgent()
@@ -0,0 +1,29 @@
"""Runtime configuration."""
from dataclasses import dataclass
from framework.config import RuntimeConfig
# Default runtime configuration; values come from RuntimeConfig's own defaults
# (the agent reads .model, .api_key, .api_base, .max_tokens from it).
default_config = RuntimeConfig()


@dataclass
class AgentMetadata:
    """Static metadata describing the agent (surfaced via the agent's info())."""

    # Human-readable agent name.
    name: str = "Passive Vulnerability Assessment"
    # Matches the GraphSpec version ("2.0.0").
    version: str = "2.0.0"
    # Long-form description of what the agent does.
    description: str = (
        "Passive, OSINT-based website vulnerability assessment agent that performs "
        "non-intrusive security scanning using purpose-built Python tools, produces "
        "letter-grade risk scores (A-F) per category, and delivers a structured "
        "vulnerability report with remediation guidance."
    )
    # Greeting shown to the user; usage not visible here — presumably displayed
    # when a client session opens (TODO confirm).
    intro_message: str = (
        "Hi! I'm your security assessment assistant. Give me a website domain and "
        "I'll perform a passive, non-intrusive security assessment — checking SSL/TLS, "
        "HTTP headers, DNS security, open ports, tech stack, and subdomains — then "
        "produce a risk score card (A-F grades) with remediation steps. What domain "
        "would you like me to assess?"
    )


# Module-level singleton consumed by the agent module.
metadata = AgentMetadata()
@@ -0,0 +1,9 @@
{
"hive-tools": {
"transport": "stdio",
"command": "uv",
"args": ["run", "python", "mcp_server.py", "--stdio"],
"cwd": "../../../tools",
"description": "Hive tools MCP server"
}
}
@@ -0,0 +1,287 @@
"""Node definitions for Passive Website Vulnerability Assessment."""
from framework.graph import NodeSpec
# Node 1: Intake (client-facing)
# Collect the target domain and confirm scanning scope.
intake_node = NodeSpec(
    id="intake",
    name="Intake",
    description="Collect the target website domain from the user and confirm the scanning scope",
    node_type="event_loop",
    client_facing=True,  # talks to the user directly via ask_user()
    max_node_visits=0,  # NOTE(review): 0 presumably means unlimited revisits — confirm NodeSpec semantics
    input_keys=[],
    output_keys=["target_domain"],  # consumed by the passive-recon node
    system_prompt="""\
You are the intake specialist for a passive website vulnerability assessment agent.
**STEP 1 Greet and collect target (text only, NO tool calls):**
Ask the user for the website domain they want to assess. If they already provided one, \
confirm it.
Clarify:
- The exact domain or URL (e.g., example.com, https://app.example.com)
- Any specific areas of concern (e.g., email security, SSL, exposed services)
Explain briefly that this is a **passive, non-intrusive assessment** we only examine \
publicly available information (SSL certificates, HTTP headers, DNS records, open ports, \
tech fingerprints, and public subdomain data). No attack payloads or exploit attempts.
Keep it brief. One message, 2-3 questions max.
After your message, call ask_user() to wait for the user's response.
**STEP 2 After the user responds, call set_output:**
- set_output("target_domain", "the confirmed domain/URL to test, e.g. https://example.com")
""",
    tools=[],  # conversation-only node; no scanning tools
)
# Node 2: Passive Reconnaissance
# Runs all 6 scanning tools — no CLI dependencies, no attack payloads.
passive_recon_node = NodeSpec(
    id="passive-recon",
    name="Passive Reconnaissance",
    description=(
        "Run all 6 passive scanning tools against the target domain: SSL/TLS, "
        "HTTP headers, DNS security, port scanning, tech stack detection, and "
        "subdomain enumeration"
    ),
    node_type="event_loop",
    max_node_visits=0,  # NOTE(review): 0 presumably means unlimited — confirm NodeSpec semantics
    input_keys=["target_domain", "feedback"],  # feedback arrives on re-scan rounds from findings-review
    output_keys=["scan_results"],  # JSON string aggregating all 6 tool outputs
    system_prompt="""\
You are a passive reconnaissance specialist. Given a target domain, run all 6 scanning \
tools to assess the security posture. These tools are non-intrusive and OSINT-based.
If feedback is provided (not None/empty), this is a follow-up round focus on the areas \
the user requested. You may skip tools that aren't relevant to the feedback. If feedback \
is None or empty, this is the first scan run ALL 6 tools.
**Run these tools against the target domain:**
1. **ssl_tls_scan(hostname)** Checks TLS version, certificate validity, cipher strength
2. **http_headers_scan(url)** Checks OWASP-recommended security headers (HSTS, CSP, \
X-Frame-Options, etc.)
3. **dns_security_scan(domain)** Checks SPF, DMARC, DKIM, DNSSEC, zone transfer
4. **port_scan(hostname)** TCP connect scan on top 20 common ports, flags exposed \
database/admin ports
5. **tech_stack_detect(url)** Detects web server, framework, CMS, JS libraries, cookies
6. **subdomain_enumerate(domain)** Queries Certificate Transparency logs for subdomains
**IMPORTANT:**
- Extract just the hostname/domain from the URL for tools that need it \
(e.g., "example.com" not "https://example.com")
- Use the full URL (with https://) for http_headers_scan and tech_stack_detect
- Run tools in batches of 2-3 to avoid overwhelming the system
- If a tool fails, note the error and continue with the remaining tools
**After all tools complete, compile results:**
Combine ALL tool outputs into a single JSON object and store it:
set_output("scan_results", "<JSON string containing all 6 tool results: \
{ssl: {...}, headers: {...}, dns: {...}, ports: {...}, tech: {...}, subdomains: {...}}>")
Each tool returns a grade_input dict preserve these as-is, the risk scorer needs them.
""",
    tools=[
        "ssl_tls_scan",
        "http_headers_scan",
        "dns_security_scan",
        "port_scan",
        "tech_stack_detect",
        "subdomain_enumerate",
    ],
)
# Node 3: Risk Scoring
# Calculates weighted letter grades from scan results.
risk_scoring_node = NodeSpec(
    id="risk-scoring",
    name="Risk Scoring",
    description=(
        "Calculate weighted letter grades (A-F) per security category and overall "
        "risk score from scan results"
    ),
    node_type="event_loop",
    max_node_visits=0,  # NOTE(review): 0 presumably means unlimited — confirm NodeSpec semantics
    input_keys=["scan_results"],  # produced by passive-recon
    output_keys=["risk_report"],  # JSON output of the risk_score tool
    system_prompt="""\
You calculate risk scores from scan results.
Given scan_results (a JSON string with ssl, headers, dns, ports, tech, subdomains \
sections), call the risk_score tool to produce letter grades.
**Step 1 Extract scan results and call risk_score:**
The risk_score tool accepts JSON strings for each category. Extract the relevant \
sections from scan_results and pass them:
risk_score(
ssl_results="<JSON string of the ssl section from scan_results>",
headers_results="<JSON string of the headers section from scan_results>",
dns_results="<JSON string of the dns section from scan_results>",
ports_results="<JSON string of the ports section from scan_results>",
tech_results="<JSON string of the tech section from scan_results>",
subdomain_results="<JSON string of the subdomains section from scan_results>"
)
If a category has no results (tool failed), pass an empty string for that parameter.
**Step 2 Store the risk report:**
set_output("risk_report", "<the complete JSON output from risk_score, including \
overall_score, overall_grade, categories, top_risks, and grade_scale>")
""",
    tools=["risk_score"],  # single-purpose node: scoring only
)
# Node 4: Findings Review (client-facing)
# Present risk grades and ask the user to continue or generate report.
findings_review_node = NodeSpec(
    id="findings-review",
    name="Findings Review",
    description=(
        "Present risk grades and security findings to the user, ask whether to "
        "continue deeper scanning or generate the final report"
    ),
    node_type="event_loop",
    client_facing=True,  # interacts with the user to branch the graph
    max_node_visits=0,  # NOTE(review): 0 presumably means unlimited — confirm NodeSpec semantics
    input_keys=["scan_results", "risk_report", "target_domain"],
    # continue_scanning drives the feedback-loop edge back to passive-recon;
    # all_findings feeds the final report.
    output_keys=["continue_scanning", "feedback", "all_findings"],
    system_prompt="""\
You present security scan findings and risk grades to the user and ask for their decision.
**STEP 1 Present findings (text only, NO tool calls):**
Display the results in this format:
1. **Overall Risk Grade** Show the letter grade prominently \
(e.g., "Overall Grade: C (68/100)")
2. **Category Breakdown** Table showing each category's grade:
| Category | Grade | Score | Findings |
|----------|-------|-------|----------|
| SSL/TLS | B | 85 | 1 issue |
| HTTP Headers | D | 45 | 4 issues |
| DNS Security | C | 60 | 3 issues |
| Network Exposure | C | 70 | 1 issue |
| Technology | B | 75 | 2 issues |
| Attack Surface | B | 80 | 1 issue |
3. **Top Risks** List the most critical findings from the risk report's top_risks field
4. **Grade Scale** Show the grade scale so the user understands the scoring:
- A (90-100): Excellent security posture
- B (75-89): Good, minor improvements needed
- C (60-74): Fair, notable security gaps
- D (40-59): Poor, significant vulnerabilities
- F (0-39): Critical, immediate action required
5. **Options** Ask: "Would you like me to:
- **Continue scanning** I can focus on specific weak areas for a deeper look
- **Generate the report** I'll compile a full HTML risk dashboard with all \
findings and remediation steps"
After your message, call ask_user() to wait for the user's response.
**STEP 2 After the user responds, call set_output:**
If the user wants to continue:
- set_output("continue_scanning", "true")
- set_output("feedback", "What the user wants investigated further, or \
'focus on weakest categories'")
- set_output("all_findings", "Accumulated findings from all rounds so far as JSON string")
If the user wants to stop and get the report:
- set_output("continue_scanning", "false")
- set_output("feedback", "")
- set_output("all_findings", "All scan results and risk report combined as JSON string")
""",
    tools=[],  # presentation/decision node; no tools beyond the built-ins
)
# Node 5: Final Report (client-facing)
# Generates an HTML risk dashboard with color-coded grades.
final_report_node = NodeSpec(
    id="final-report",
    name="Risk Dashboard Report",
    description=(
        "Generate an HTML risk dashboard with color-coded grades, category breakdown, "
        "detailed findings, and remediation steps"
    ),
    node_type="event_loop",
    client_facing=True,  # delivers the report and answers follow-up questions
    max_node_visits=0,  # NOTE(review): 0 presumably means unlimited — confirm NodeSpec semantics
    input_keys=["all_findings", "risk_report", "target_domain"],
    output_keys=["report_status"],  # "completed" triggers the forever-alive edge back to intake
    system_prompt="""\
Generate an HTML risk dashboard report and deliver it to the user.
**STEP 1 Generate the HTML report (tool calls first):**
Create a self-contained HTML document with embedded CSS. Use a clean, professional \
security dashboard design.
Report structure:
- **Header**: Target domain, scan date, "Security Risk Assessment" title
- **Overall Grade**: Large, color-coded letter grade (A=green, B=blue, C=yellow, \
D=orange, F=red) with numeric score
- **Grade Scale Legend**: Show what each grade means (A through F)
- **Category Breakdown**: 6 cards/panels, each showing:
- Category name
- Letter grade (color-coded)
- Numeric score
- Number of findings
- **Detailed Findings by Category**: For each of the 6 categories:
- Category header with grade
- List of findings organized by severity (high -> medium -> low -> info)
- For each finding:
- Title and severity badge (color-coded)
- Description of the issue
- Why it matters (impact)
- **Remediation**: Clear, step-by-step fix instructions for developers
- Code examples where relevant (e.g., header configurations, DNS records to add)
- **Top Risks Summary**: Prioritized action items (fix these first)
- **Methodology**: "This assessment used passive, OSINT-based scanning techniques..."
- **Disclaimer**: "This is an automated passive assessment, not a comprehensive \
penetration test"
Design requirements:
- Every finding MUST have remediation steps
- Write for developers, not security experts
- Use severity color coding (red=critical/high, orange=medium, blue=low, gray=info)
- Responsive layout, works on mobile
- Self-contained no external CSS/JS dependencies
Save and serve:
- save_data(filename="risk_assessment_report.html", data=<html_content>)
- serve_file_to_user(filename="risk_assessment_report.html", \
label="Security Risk Assessment Report")
**STEP 2 Present to user (text only, NO tool calls):**
Tell the user the report is ready. Summarize: overall grade, weakest category, \
top 3 action items.
After presenting, call ask_user() to wait for follow-up questions.
**STEP 3 After the user responds:**
- Answer any questions about findings or remediation
- Call ask_user() again if they have more questions
- When the user is satisfied: set_output("report_status", "completed")
""",
    tools=["save_data", "serve_file_to_user"],  # persistence + delivery of the HTML report
)
# Public API of this module: the five node specs, listed in graph execution order.
__all__ = [
    "intake_node",
    "passive_recon_node",
    "risk_scoring_node",
    "findings_review_node",
    "final_report_node",
]
+1
View File
@@ -29,6 +29,7 @@ dependencies = [
"playwright>=1.40.0",
"playwright-stealth>=1.0.5",
"litellm>=1.81.0",
"dnspython>=2.4.0",
"resend>=2.0.0",
"framework",
+26
View File
@@ -26,6 +26,9 @@ from .bigquery_tool import register_tools as register_bigquery
from .calcom_tool import register_tools as register_calcom
from .calendar_tool import register_tools as register_calendar
from .csv_tool import register_tools as register_csv
# Security scanning tools
from .dns_security_scanner import register_tools as register_dns_security_scanner
from .email_tool import register_tools as register_email
from .example_tool import register_tools as register_example
from .excel_tool import register_tools as register_excel
@@ -47,12 +50,18 @@ from .file_system_toolkits.write_to_file import register_tools as register_write
from .github_tool import register_tools as register_github
from .gmail_tool import register_tools as register_gmail
from .google_maps_tool import register_tools as register_google_maps
from .http_headers_scanner import register_tools as register_http_headers_scanner
from .hubspot_tool import register_tools as register_hubspot
from .news_tool import register_tools as register_news
from .pdf_read_tool import register_tools as register_pdf_read
from .port_scanner import register_tools as register_port_scanner
from .risk_scorer import register_tools as register_risk_scorer
from .runtime_logs_tool import register_tools as register_runtime_logs
from .serpapi_tool import register_tools as register_serpapi
from .slack_tool import register_tools as register_slack
from .ssl_tls_scanner import register_tools as register_ssl_tls_scanner
from .subdomain_enumerator import register_tools as register_subdomain_enumerator
from .tech_stack_detector import register_tools as register_tech_stack_detector
from .telegram_tool import register_tools as register_telegram
from .time_tool import register_tools as register_time
from .vision_tool import register_tools as register_vision
@@ -115,6 +124,15 @@ def register_all_tools(
register_csv(mcp)
register_excel(mcp)
# Security scanning tools (no credentials needed)
register_ssl_tls_scanner(mcp)
register_http_headers_scanner(mcp)
register_dns_security_scanner(mcp)
register_port_scanner(mcp)
register_tech_stack_detector(mcp)
register_subdomain_enumerator(mcp)
register_risk_scorer(mcp)
return [
"example_tool",
"web_search",
@@ -283,6 +301,14 @@ def register_all_tools(
"maps_place_search",
"run_bigquery_query",
"describe_dataset",
# Security scanning tools
"ssl_tls_scan",
"http_headers_scan",
"dns_security_scan",
"port_scan",
"tech_stack_detect",
"subdomain_enumerate",
"risk_score",
]
@@ -0,0 +1,5 @@
"""DNS Security Scanner - Check SPF, DMARC, DKIM, DNSSEC, and zone transfer."""
from .dns_security_scanner import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,263 @@
"""
DNS Security Scanner - Check SPF, DMARC, DKIM, DNSSEC, and zone transfer.
Performs non-intrusive DNS queries to evaluate email security configuration
and DNS infrastructure hardening. Uses dnspython for all lookups.
"""
from __future__ import annotations
from fastmcp import FastMCP
try:
import dns.exception
import dns.name
import dns.query
import dns.rdatatype
import dns.resolver
import dns.xfr
import dns.zone
_DNS_AVAILABLE = True
except ImportError:
_DNS_AVAILABLE = False
# Common DKIM selectors to probe
# (generic defaults plus Google and Microsoft 365 "selector1/2" conventions;
#  finding any one selector counts as DKIM configured — absence of all eight
#  is only a heuristic, since custom selectors are possible).
DKIM_SELECTORS = ["default", "google", "selector1", "selector2", "k1", "mail", "dkim", "s1"]
def register_tools(mcp: FastMCP) -> None:
    """Register DNS security scanning tools with the MCP server."""

    @mcp.tool()
    def dns_security_scan(domain: str) -> dict:
        """
        Scan a domain's DNS records for email security and infrastructure hardening.

        Checks SPF, DMARC, DKIM (common selectors), DNSSEC, MX, CAA records,
        and tests for zone transfer vulnerability. Non-intrusive — uses standard
        DNS queries only.

        Args:
            domain: Domain name to scan (e.g., "example.com"). Do not include protocol.

        Returns:
            Dict with SPF, DMARC, DKIM, DNSSEC, MX, CAA results, zone transfer
            status, and grade_input for the risk_scorer tool.
        """
        # Graceful degradation: dnspython is an optional dependency.
        if not _DNS_AVAILABLE:
            return {
                "error": ("dnspython is not installed. Install it with: pip install dnspython"),
            }
        # Clean domain: strip scheme, path, and port so only the bare hostname remains.
        domain = domain.replace("https://", "").replace("http://", "").strip("/")
        domain = domain.split("/")[0]
        if ":" in domain:
            domain = domain.split(":")[0]
        resolver = dns.resolver.Resolver()
        # 10s per query and overall lifetime — generous for slow authoritative servers.
        resolver.timeout = 10
        resolver.lifetime = 10
        # Each helper handles its own DNS failures and degrades to a "not found" /
        # empty result rather than raising.
        spf = _check_spf(resolver, domain)
        dmarc = _check_dmarc(resolver, domain)
        dkim = _check_dkim(resolver, domain)
        dnssec = _check_dnssec(resolver, domain)
        mx = _check_mx(resolver, domain)
        caa = _check_caa(resolver, domain)
        zone_transfer = _check_zone_transfer(resolver, domain)
        # Boolean summary consumed by the risk_score tool (see docstring).
        grade_input = {
            "spf_present": spf["present"],
            "spf_strict": spf.get("policy") == "hardfail",
            "dmarc_present": dmarc["present"],
            "dmarc_enforcing": dmarc.get("policy") in ("quarantine", "reject"),
            "dkim_found": len(dkim.get("selectors_found", [])) > 0,
            "dnssec_enabled": dnssec["enabled"],
            "zone_transfer_blocked": not zone_transfer["vulnerable"],
        }
        return {
            "domain": domain,
            "spf": spf,
            "dmarc": dmarc,
            "dkim": dkim,
            "dnssec": dnssec,
            "mx_records": mx,
            "caa_records": caa,
            "zone_transfer": zone_transfer,
            "grade_input": grade_input,
        }
def _check_spf(resolver: dns.resolver.Resolver, domain: str) -> dict:
"""Check SPF record."""
try:
answers = resolver.resolve(domain, "TXT")
for rdata in answers:
txt = rdata.to_text().strip('"')
if txt.startswith("v=spf1"):
issues = []
if "~all" in txt:
policy = "softfail"
issues.append(
"Uses ~all (softfail) instead of -all (hardfail). "
"Spoofed emails may still be delivered."
)
elif "-all" in txt:
policy = "hardfail"
elif "+all" in txt:
policy = "pass_all"
issues.append(
"Uses +all which allows ANY server to send email for this domain. "
"This effectively disables SPF protection."
)
elif "?all" in txt:
policy = "neutral"
issues.append("Uses ?all (neutral). SPF results are not used for filtering.")
else:
policy = "unknown"
issues.append("No 'all' mechanism found in SPF record.")
return {
"present": True,
"record": txt,
"policy": policy,
"issues": issues,
}
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.DNSException):
pass
return {
"present": False,
"record": None,
"policy": None,
"issues": ["No SPF record found. Any server can send email as this domain."],
}
def _check_dmarc(resolver: dns.resolver.Resolver, domain: str) -> dict:
"""Check DMARC record."""
try:
answers = resolver.resolve(f"_dmarc.{domain}", "TXT")
for rdata in answers:
txt = rdata.to_text().strip('"')
if txt.startswith("v=DMARC1"):
issues = []
policy = "none"
for part in txt.split(";"):
part = part.strip()
if part.startswith("p="):
policy = part[2:].strip()
if policy == "none":
issues.append(
"DMARC policy is 'none' — spoofed emails are not blocked. "
"Upgrade to p=quarantine or p=reject."
)
elif policy == "quarantine":
pass # Acceptable
elif policy == "reject":
pass # Best
return {
"present": True,
"record": txt,
"policy": policy,
"issues": issues,
}
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.DNSException):
pass
return {
"present": False,
"record": None,
"policy": None,
"issues": ["No DMARC record found. Email spoofing is not actively monitored or blocked."],
}
def _check_dkim(resolver: dns.resolver.Resolver, domain: str) -> dict:
    """Probe the common DKIM selectors and report which published TXT keys."""
    hits: list[str] = []
    misses: list[str] = []
    for selector in DKIM_SELECTORS:
        probe = f"{selector}._domainkey.{domain}"
        try:
            # Only a truthy answer counts as found; a lookup failure marks it missing.
            if resolver.resolve(probe, "TXT"):
                hits.append(selector)
        except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.DNSException):
            misses.append(selector)
    return {
        "selectors_found": hits,
        "selectors_missing": misses,
    }
def _check_dnssec(resolver: dns.resolver.Resolver, domain: str) -> dict:
"""Check if DNSSEC is enabled."""
try:
answers = resolver.resolve(domain, "DNSKEY")
if answers:
return {"enabled": True, "issues": []}
except dns.resolver.NoAnswer:
pass
except (dns.resolver.NXDOMAIN, dns.exception.DNSException):
pass
return {
"enabled": False,
"issues": [
"DNSSEC not enabled. The domain is vulnerable to DNS spoofing and cache poisoning."
],
}
def _check_mx(resolver: dns.resolver.Resolver, domain: str) -> list[str]:
"""Get MX records."""
try:
answers = resolver.resolve(domain, "MX")
return [f"{r.preference} {r.exchange}" for r in answers]
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.DNSException):
return []
def _check_caa(resolver: dns.resolver.Resolver, domain: str) -> list[str]:
"""Get CAA records."""
try:
answers = resolver.resolve(domain, "CAA")
return [rdata.to_text() for rdata in answers]
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.DNSException):
return []
def _check_zone_transfer(resolver: dns.resolver.Resolver, domain: str) -> dict:
"""Test if zone transfer (AXFR) is allowed — a common misconfiguration."""
try:
ns_answers = resolver.resolve(domain, "NS")
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.DNSException):
return {"vulnerable": False, "error": "Could not resolve NS records"}
for ns_rdata in ns_answers:
ns_host = str(ns_rdata.target)
try:
zone = dns.zone.from_xfr(dns.query.xfr(ns_host, domain, timeout=5))
if zone:
return {
"vulnerable": True,
"nameserver": ns_host,
"record_count": len(zone.nodes),
"severity": "critical",
"finding": f"Zone transfer allowed on {ns_host}",
"remediation": (
"Disable AXFR for public-facing nameservers. "
"Restrict zone transfers to authorized secondary DNS servers only."
),
}
except Exception:
continue
return {"vulnerable": False}
@@ -0,0 +1,5 @@
"""HTTP Headers Scanner - Check OWASP-recommended security headers."""
from .http_headers_scanner import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,186 @@
"""
HTTP Headers Scanner - Check OWASP-recommended security headers.
Performs a non-intrusive HTTP request and evaluates the presence and
configuration of security headers per OWASP Secure Headers Project guidelines.
"""
from __future__ import annotations
import httpx
from fastmcp import FastMCP
# Security headers to check — each with severity and remediation guidance
# Security headers to check — each with severity and remediation guidance
# (per the OWASP Secure Headers Project). A header's ABSENCE produces a finding
# with the description/remediation below; presence is reported without comment.
SECURITY_HEADERS = {
    "Strict-Transport-Security": {
        "severity": "high",
        "description": (
            "No HSTS header. Browsers may connect over plain HTTP, "
            "enabling man-in-the-middle attacks."
        ),
        "remediation": (
            "Add the header: Strict-Transport-Security: max-age=31536000; includeSubDomains"
        ),
    },
    "Content-Security-Policy": {
        "severity": "high",
        "description": (
            "No CSP header. The site is more vulnerable to XSS attacks "
            "from inline scripts and untrusted sources."
        ),
        "remediation": (
            "Add a Content-Security-Policy header. "
            "Start restrictive: default-src 'self'; script-src 'self'"
        ),
    },
    "X-Frame-Options": {
        "severity": "medium",
        "description": ("No X-Frame-Options header. The site may be vulnerable to clickjacking."),
        "remediation": "Add the header: X-Frame-Options: DENY (or SAMEORIGIN)",
    },
    "X-Content-Type-Options": {
        "severity": "medium",
        "description": (
            "No X-Content-Type-Options header. Browsers may MIME-sniff responses, "
            "potentially executing malicious content."
        ),
        "remediation": "Add the header: X-Content-Type-Options: nosniff",
    },
    "Referrer-Policy": {
        "severity": "low",
        "description": (
            "No Referrer-Policy header. Full URLs (including query params) "
            "may leak to third-party sites via the Referer header."
        ),
        "remediation": ("Add the header: Referrer-Policy: strict-origin-when-cross-origin"),
    },
    "Permissions-Policy": {
        "severity": "low",
        "description": (
            "No Permissions-Policy header. Browser features like camera, microphone, "
            "and geolocation are not explicitly restricted."
        ),
        "remediation": (
            "Add the header: Permissions-Policy: camera=(), microphone=(), geolocation=()"
        ),
    },
}
# Headers that leak server information
# The PRESENCE of any of these produces a (low-severity) information-disclosure finding.
LEAKY_HEADERS = {
    "Server": {
        "severity": "low",
        "remediation": "Remove or genericize the Server header to avoid version disclosure.",
    },
    "X-Powered-By": {
        "severity": "low",
        "remediation": "Remove the X-Powered-By header to hide the backend framework.",
    },
    "X-AspNet-Version": {
        "severity": "low",
        "remediation": "Remove the X-AspNet-Version header from IIS/ASP.NET configuration.",
    },
    "X-AspNetMvc-Version": {
        "severity": "low",
        "remediation": "Remove the X-AspNetMvc-Version header.",
    },
    "X-Generator": {
        "severity": "low",
        "remediation": "Remove the X-Generator header to hide the CMS/platform in use.",
    },
}
def register_tools(mcp: FastMCP) -> None:
    """Register HTTP headers scanning tools with the MCP server."""

    @mcp.tool()
    async def http_headers_scan(url: str, follow_redirects: bool = True) -> dict:
        """
        Scan a URL for OWASP-recommended security headers and information leaks.

        Sends a single GET request and evaluates response headers against
        OWASP Secure Headers Project guidelines. Non-intrusive — just one request.

        Args:
            url: Full URL to scan (e.g., "https://example.com"). Auto-prefixes https://.
            follow_redirects: Whether to follow HTTP redirects (default True).

        Returns:
            Dict with present headers, missing headers with remediation,
            leaky headers, and grade_input for the risk_scorer tool.
            On request failure, a dict with a single "error" key.
        """
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        try:
            async with httpx.AsyncClient(
                follow_redirects=follow_redirects,
                timeout=15,
                verify=True,
            ) as client:
                response = await client.get(url)
        except httpx.ConnectError as e:
            return {"error": f"Connection failed: {e}"}
        except httpx.TimeoutException:
            return {"error": f"Request to {url} timed out"}
        except Exception as e:
            return {"error": f"Request failed: {e}"}
        headers = response.headers
        # Fix: build the lowercase header-name set ONCE. The original rebuilt
        # this set inside the loop for every security header checked, and then
        # built it a second time further down for grade_input.
        header_lower = {k.lower() for k in headers}
        headers_present = []
        headers_missing = []
        # Check each security header (case-insensitive presence test).
        for header_name, info in SECURITY_HEADERS.items():
            if header_name.lower() in header_lower:
                headers_present.append(header_name)
            else:
                headers_missing.append(
                    {
                        "header": header_name,
                        "severity": info["severity"],
                        "description": info["description"],
                        "remediation": info["remediation"],
                    }
                )
        # Check for leaky headers (their presence is the finding).
        leaky_found = []
        for header_name, info in LEAKY_HEADERS.items():
            value = headers.get(header_name)
            if value:
                leaky_found.append(
                    {
                        "header": header_name,
                        "value": value,
                        "severity": info["severity"],
                        "remediation": info["remediation"],
                    }
                )
        # Check for deprecated X-XSS-Protection (reported as present, flagged deprecated).
        xss_protection = headers.get("X-XSS-Protection")
        if xss_protection:
            headers_present.append("X-XSS-Protection (deprecated)")
        # Boolean summary consumed by the risk_scorer tool's headers category.
        grade_input = {
            "hsts": "strict-transport-security" in header_lower,
            "csp": "content-security-policy" in header_lower,
            "x_frame_options": "x-frame-options" in header_lower,
            "x_content_type_options": "x-content-type-options" in header_lower,
            "referrer_policy": "referrer-policy" in header_lower,
            "permissions_policy": "permissions-policy" in header_lower,
            "no_leaky_headers": len(leaky_found) == 0,
        }
        return {
            "url": str(response.url),
            "status_code": response.status_code,
            "headers_present": headers_present,
            "headers_missing": headers_missing,
            "leaky_headers": leaky_found,
            "grade_input": grade_input,
        }
@@ -0,0 +1,5 @@
"""Port Scanner - Scan common ports and detect exposed services."""
from .port_scanner import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,290 @@
"""
Port Scanner - Scan common ports and detect exposed services.
Performs non-intrusive TCP connect scans on common ports using Python stdlib.
Identifies open ports, grabs service banners, and flags risky exposures
(database ports, admin interfaces, legacy protocols).
"""
from __future__ import annotations
import asyncio
import socket
from fastmcp import FastMCP
# Well-known ports and their services
# Well-known TCP ports mapped to their conventional service names.
PORT_SERVICE_MAP = {
    21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS",
    80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS", 445: "SMB",
    993: "IMAPS", 995: "POP3S", 1433: "MSSQL", 3306: "MySQL", 3389: "RDP",
    5432: "PostgreSQL", 5900: "VNC", 6379: "Redis", 8080: "HTTP-Alt", 8443: "HTTPS-Alt",
}

# The 20 ports above, ascending — the default scan set.
TOP20_PORTS = sorted(PORT_SERVICE_MAP)

# Additional commonly exposed ports for the extended scan (mail, infra,
# admin panels, databases, message brokers, dev servers, IRC).
_EXTRA_PORTS = {
    8, 20, 69, 111, 119, 123, 135, 137, 138, 139,
    161, 162, 179, 389, 443, 465, 514, 515, 520, 587,
    631, 636, 873, 902, 989, 990, 1080, 1194, 1443, 1521,
    1723, 2049, 2082, 2083, 2086, 2087, 2096, 2181, 2222, 3000,
    3128, 4443, 5000, 5001, 5060, 5222, 5601, 5984, 6443, 6660,
    6661, 6662, 6663, 6664, 6665, 6666, 6667, 7001, 7002, 7443,
    8000, 8008, 8081, 8082, 8083, 8088, 8443, 8888, 9000, 9090,
    9200, 9300, 9443, 10000, 11211, 27017, 27018,
}

# Extended scan set: the top-20 plus the extras above, deduplicated and sorted.
TOP100_PORTS = sorted(set(TOP20_PORTS) | _EXTRA_PORTS)
# Ports that are risky when exposed to the internet
# Databases, caches, and search engines that should never face the internet
# (MSSQL, MySQL, PostgreSQL, Redis, MongoDB, Elasticsearch, CouchDB, memcached).
DATABASE_PORTS = {1433, 3306, 5432, 6379, 27017, 27018, 9200, 9300, 5984, 11211}
# Remote administration interfaces (RDP, VNC, and common hosting control panels).
ADMIN_PORTS = {3389, 5900, 2082, 2083, 2086, 2087, 10000}
# Older protocols flagged for replacement with secure alternatives
# (FTP, Telnet, POP3, IMAP, SMB).
LEGACY_PORTS = {21, 23, 110, 143, 445}
# Security findings per port category
# Severity and remediation text attached to open ports in the risky sets above.
PORT_FINDINGS = {
    "database": {
        "severity": "high",
        "remediation": (
            "Restrict database ports to localhost or VPN only. "
            "Use firewall rules to block public access."
        ),
    },
    "admin": {
        "severity": "high",
        "remediation": (
            "Restrict remote admin ports to VPN or trusted IP ranges. "
            "Never expose RDP/VNC directly to the internet."
        ),
    },
    "legacy": {
        "severity": "medium",
        "remediation": (
            "Replace legacy protocols with secure alternatives. "
            "Use SFTP instead of FTP, SSH instead of Telnet, "
            "IMAPS/POP3S instead of IMAP/POP3."
        ),
    },
}
def register_tools(mcp: FastMCP) -> None:
    """Register port scanning tools with the MCP server."""

    @mcp.tool()
    async def port_scan(
        hostname: str,
        ports: str = "top20",
        timeout: float = 3.0,
    ) -> dict:
        """
        Scan a host for open ports using TCP connect probes.
        Non-intrusive scan that checks if ports accept connections, grabs service
        banners where possible, and flags risky exposures (databases, admin interfaces).
        Args:
            hostname: Domain or IP to scan (e.g., "example.com").
            ports: Which ports to scan. Options: "top20" (default), "top100",
                or comma-separated list like "80,443,8080".
            timeout: Connection timeout per port in seconds (default 3.0, max 10.0).
        Returns:
            Dict with open/closed ports, service details, security findings,
            and grade_input for the risk_scorer tool.
        """
        # Clean hostname: strip scheme, path, and any embedded ":port".
        hostname = hostname.replace("https://", "").replace("http://", "").strip("/")
        hostname = hostname.split("/")[0]
        if ":" in hostname:
            hostname = hostname.split(":")[0]
        # Clamp timeout into a sane range. The original only capped the upper
        # bound; a zero/negative timeout would report every port as closed.
        timeout = min(max(timeout, 0.1), 10.0)
        # Parse port list
        if ports == "top20":
            port_list = TOP20_PORTS
        elif ports == "top100":
            port_list = TOP100_PORTS
        else:
            try:
                port_list = sorted({int(p.strip()) for p in ports.split(",") if p.strip()})
            except ValueError:
                return {"error": f"Invalid port list: {ports}. Use 'top20', 'top100', or '80,443'"}
            # Reject empty lists and out-of-range numbers instead of silently
            # "scanning" nothing or probing impossible ports.
            if not port_list:
                return {"error": f"Invalid port list: {ports}. Use 'top20', 'top100', or '80,443'"}
            if port_list[0] < 1 or port_list[-1] > 65535:
                return {"error": f"Port numbers must be between 1 and 65535: {ports}"}
        # Resolve hostname once; every probe then targets the same IP.
        try:
            ip = socket.gethostbyname(hostname)
        except socket.gaierror:
            return {"error": f"Could not resolve hostname: {hostname}"}
        # Scan ports concurrently; tasks append into these shared lists
        # (safe: asyncio runs them on a single thread).
        open_ports = []
        closed_ports = []
        # Limit concurrency to avoid overwhelming the target
        semaphore = asyncio.Semaphore(20)

        async def scan_port(port: int) -> None:
            """Probe one port and file it under open (with finding) or closed."""
            async with semaphore:
                result = await _check_port(ip, port, timeout)
                if result["open"]:
                    entry = {
                        "port": port,
                        "service": PORT_SERVICE_MAP.get(port, "unknown"),
                        "banner": result.get("banner", ""),
                    }
                    # Attach a security finding when the port class is risky.
                    if port in DATABASE_PORTS:
                        entry["severity"] = PORT_FINDINGS["database"]["severity"]
                        entry["finding"] = f"{entry['service']} port ({port}) exposed to internet"
                        entry["remediation"] = PORT_FINDINGS["database"]["remediation"]
                    elif port in ADMIN_PORTS:
                        entry["severity"] = PORT_FINDINGS["admin"]["severity"]
                        entry["finding"] = (
                            f"{entry['service']} admin port ({port}) exposed to internet"
                        )
                        entry["remediation"] = PORT_FINDINGS["admin"]["remediation"]
                    elif port in LEGACY_PORTS:
                        entry["severity"] = PORT_FINDINGS["legacy"]["severity"]
                        entry["finding"] = (
                            f"Legacy protocol {entry['service']} ({port}) still active"
                        )
                        entry["remediation"] = PORT_FINDINGS["legacy"]["remediation"]
                    open_ports.append(entry)
                else:
                    closed_ports.append(port)

        await asyncio.gather(*[scan_port(p) for p in port_list])
        # Sort open ports by port number
        open_ports.sort(key=lambda x: x["port"])
        # Boolean flags consumed by the risk_score tool.
        open_port_numbers = {p["port"] for p in open_ports}
        grade_input = {
            "no_database_ports_exposed": not bool(open_port_numbers & DATABASE_PORTS),
            "no_admin_ports_exposed": not bool(open_port_numbers & ADMIN_PORTS),
            "no_legacy_ports_exposed": not bool(open_port_numbers & LEGACY_PORTS),
            "only_web_ports": open_port_numbers <= {80, 443, 8080, 8443},
        }
        return {
            "hostname": hostname,
            "ip": ip,
            "ports_scanned": len(port_list),
            "open_ports": open_ports,
            "closed_ports": sorted(closed_ports),
            "grade_input": grade_input,
        }
async def _check_port(ip: str, port: int, timeout: float) -> dict:
"""Check if a single port is open and try to grab a banner."""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ip, port),
timeout=timeout,
)
# Try banner grab from the same connection
banner = ""
try:
data = await asyncio.wait_for(reader.read(256), timeout=2.0)
banner = data.decode("utf-8", errors="ignore").strip()
except Exception:
pass
writer.close()
await writer.wait_closed()
return {"open": True, "banner": banner}
except (TimeoutError, ConnectionRefusedError, OSError):
return {"open": False}
@@ -0,0 +1,5 @@
"""Risk Scorer - Produce weighted letter-grade risk scores from scan results."""
from .risk_scorer import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,281 @@
"""
Risk Scorer - Produce weighted letter-grade risk scores from scan results.
Consumes grade_input dicts from the 6 scanning tools and produces a weighted
overall score (0-100) with letter grades (A-F) per category and overall.
Pure Python — no external dependencies.
"""
from __future__ import annotations
import json
from fastmcp import FastMCP
# Grade scale definition
GRADE_SCALE = {
    "A": "90-100: Excellent security posture",
    "B": "75-89: Good, minor improvements needed",
    "C": "60-74: Fair, notable security gaps",
    "D": "40-59: Poor, significant vulnerabilities",
    "F": "0-39: Critical, immediate action required",
}
# Category weights (must sum to 1.0; currently 0.20+0.20+0.15*4 = 1.00)
CATEGORY_WEIGHTS = {
    "ssl_tls": 0.20,
    "http_headers": 0.20,
    "dns_security": 0.15,
    "network_exposure": 0.15,
    "technology": 0.15,
    "attack_surface": 0.15,
}
# Scoring rules per category — the point weights within each category sum to
# 100, so category scores read directly as percentages. ("finding" is the
# text reported when the check fails.)
SSL_CHECKS = {
    "tls_version_ok": {"points": 25, "finding": "Insecure TLS version in use"},
    "cert_valid": {"points": 30, "finding": "SSL certificate is invalid or untrusted"},
    "cert_expiring_soon": {
        "points": 10,
        "finding": "SSL certificate expiring soon",
        "invert": True,  # True = bad
    },
    "strong_cipher": {"points": 20, "finding": "Weak cipher suite in use"},
    "self_signed": {
        "points": 15,
        "finding": "Self-signed certificate detected",
        "invert": True,
    },
}
HEADERS_CHECKS = {
    "hsts": {"points": 20, "finding": "Missing Strict-Transport-Security header"},
    "csp": {"points": 20, "finding": "Missing Content-Security-Policy header"},
    "x_frame_options": {"points": 15, "finding": "Missing X-Frame-Options header"},
    "x_content_type_options": {"points": 15, "finding": "Missing X-Content-Type-Options header"},
    "referrer_policy": {"points": 10, "finding": "Missing Referrer-Policy header"},
    "permissions_policy": {"points": 10, "finding": "Missing Permissions-Policy header"},
    "no_leaky_headers": {"points": 10, "finding": "Server information leaked via headers"},
}
DNS_CHECKS = {
    "spf_present": {"points": 15, "finding": "No SPF record found"},
    "spf_strict": {"points": 10, "finding": "SPF policy is not strict (hardfail)"},
    "dmarc_present": {"points": 20, "finding": "No DMARC record found"},
    "dmarc_enforcing": {"points": 15, "finding": "DMARC policy is not enforcing"},
    "dkim_found": {"points": 15, "finding": "No DKIM selector found"},
    "dnssec_enabled": {"points": 15, "finding": "DNSSEC not enabled"},
    "zone_transfer_blocked": {"points": 10, "finding": "DNS zone transfer allowed"},
}
NETWORK_CHECKS = {
    "no_database_ports_exposed": {
        "points": 35,
        "finding": "Database port(s) exposed to internet",
    },
    "no_admin_ports_exposed": {
        "points": 30,
        "finding": "Admin/remote access port(s) exposed to internet",
    },
    "no_legacy_ports_exposed": {
        "points": 20,
        "finding": "Legacy protocol port(s) still active",
    },
    "only_web_ports": {"points": 15, "finding": "Non-web ports open"},
}
TECH_CHECKS = {
    "server_version_hidden": {"points": 25, "finding": "Server version disclosed in headers"},
    "framework_version_hidden": {
        "points": 20,
        "finding": "Framework/runtime version disclosed",
    },
    "security_txt_present": {"points": 20, "finding": "No security.txt file found"},
    "cookies_secure": {"points": 20, "finding": "Cookies missing Secure flag"},
    "cookies_httponly": {"points": 15, "finding": "Cookies missing HttpOnly flag"},
}
SURFACE_CHECKS = {
    "no_dev_staging_exposed": {
        "points": 40,
        "finding": "Dev/staging environment subdomains exposed",
    },
    "no_admin_exposed": {
        "points": 35,
        "finding": "Admin/backup subdomains exposed",
    },
    "reasonable_surface_area": {
        "points": 25,
        "finding": "Large attack surface (many subdomains)",
    },
}
# Category name -> its rubric; keys must stay in sync with CATEGORY_WEIGHTS
# and with the input mapping built inside risk_score.
ALL_CHECKS = {
    "ssl_tls": SSL_CHECKS,
    "http_headers": HEADERS_CHECKS,
    "dns_security": DNS_CHECKS,
    "network_exposure": NETWORK_CHECKS,
    "technology": TECH_CHECKS,
    "attack_surface": SURFACE_CHECKS,
}
def _score_to_grade(score: int) -> str:
"""Convert a numeric score (0-100) to a letter grade."""
if score >= 90:
return "A"
if score >= 75:
return "B"
if score >= 60:
return "C"
if score >= 40:
return "D"
return "F"
def _parse_json(data: str) -> dict | None:
"""Safely parse a JSON string, returning None on failure."""
if not data or not data.strip():
return None
try:
parsed = json.loads(data)
return parsed if isinstance(parsed, dict) else None
except (json.JSONDecodeError, TypeError):
return None
def _score_category(grade_input: dict, checks: dict) -> tuple[int, list[str]]:
"""Score a category based on its grade_input and check definitions.
Returns (score 0-100, list of finding strings).
"""
total_possible = sum(c["points"] for c in checks.values())
earned = 0
findings = []
for check_key, check_def in checks.items():
value = grade_input.get(check_key)
invert = check_def.get("invert", False)
if value is None:
# Missing data — give half credit (don't penalize for missing scans)
earned += check_def["points"] // 2
continue
# For "invert" checks, True = bad (e.g., self_signed=True is bad)
passed = (not value) if invert else bool(value)
if passed:
earned += check_def["points"]
else:
findings.append(check_def["finding"])
score = round((earned / total_possible) * 100) if total_possible > 0 else 50
return score, findings
def register_tools(mcp: FastMCP) -> None:
    """Register risk scoring tools with the MCP server."""

    @mcp.tool()
    def risk_score(
        ssl_results: str = "",
        headers_results: str = "",
        dns_results: str = "",
        ports_results: str = "",
        tech_results: str = "",
        subdomain_results: str = "",
    ) -> dict:
        """
        Calculate a weighted risk score from scan results.
        Consumes the JSON output from the 6 scanning tools (ssl_tls_scan,
        http_headers_scan, dns_security_scan, port_scan, tech_stack_detect,
        subdomain_enumerate) and produces letter grades (A-F) per category
        plus an overall weighted score.
        Args:
            ssl_results: JSON string from ssl_tls_scan output. Empty string to skip.
            headers_results: JSON string from http_headers_scan output. Empty string to skip.
            dns_results: JSON string from dns_security_scan output. Empty string to skip.
            ports_results: JSON string from port_scan output. Empty string to skip.
            tech_results: JSON string from tech_stack_detect output. Empty string to skip.
            subdomain_results: JSON string from subdomain_enumerate output. Empty string to skip.
        Returns:
            Dict with overall_score, overall_grade, per-category scores/grades,
            top_risks list, and grade_scale reference.
        """
        # Pair each scoring category with its parsed tool output (None = not scanned).
        payloads = {
            "ssl_tls": _parse_json(ssl_results),
            "http_headers": _parse_json(headers_results),
            "dns_security": _parse_json(dns_results),
            "network_exposure": _parse_json(ports_results),
            "technology": _parse_json(tech_results),
            "attack_surface": _parse_json(subdomain_results),
        }
        categories: dict = {}
        finding_rows: list[tuple[str, str, int]] = []  # (category, finding, category score)
        weighted_sum = 0.0
        weight_used = 0.0
        for category, checks in ALL_CHECKS.items():
            weight = CATEGORY_WEIGHTS[category]
            payload = payloads[category]
            if payload is None:
                # Category not scanned — record it as skipped; its weight is
                # effectively redistributed by the normalization below.
                categories[category] = {
                    "score": None,
                    "grade": "N/A",
                    "weight": weight,
                    "findings_count": 0,
                    "skipped": True,
                }
                continue
            # Tools nest their flags under "grade_input"; accept a bare flags
            # dict as well.
            flags = payload.get("grade_input", payload)
            score, findings = _score_category(flags, checks)
            categories[category] = {
                "score": score,
                "grade": _score_to_grade(score),
                "weight": weight,
                "findings_count": len(findings),
                "skipped": False,
            }
            weighted_sum += score * weight
            weight_used += weight
            finding_rows.extend((category, text, score) for text in findings)
        # Overall score, normalized over the categories that actually ran.
        overall_score = round(weighted_sum / weight_used) if weight_used > 0 else 0
        overall_grade = _score_to_grade(overall_score)
        # Worst-scoring categories first; ties broken by category name.
        finding_rows.sort(key=lambda row: (row[2], row[0]))
        top_risks = [
            f"{text} ({category.replace('_', ' ').title()}: {categories[category]['grade']})"
            for category, text, _score in finding_rows[:10]
        ]
        return {
            "overall_score": overall_score,
            "overall_grade": overall_grade,
            "categories": categories,
            "top_risks": top_risks,
            "grade_scale": GRADE_SCALE,
        }
@@ -0,0 +1,5 @@
"""SSL/TLS Scanner - Analyze SSL/TLS configuration and certificate security."""
from .ssl_tls_scanner import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,256 @@
"""
SSL/TLS Scanner - Analyze SSL/TLS configuration and certificate security.
Performs non-intrusive analysis of a host's TLS setup including protocol version,
cipher suite, certificate validity, and common misconfigurations.
Uses only the Python stdlib (ssl + socket) — no external dependencies.
"""
from __future__ import annotations
import hashlib
import socket
import ssl
from datetime import UTC, datetime
from fastmcp import FastMCP
# Weak ciphers that should be flagged.
# NOTE: ssl_tls_scan matches these as substrings against cipher_name.upper(),
# so every entry must be uppercase to be matchable.
WEAK_CIPHERS = {
    "RC4",
    "DES",
    "3DES",
    "MD5",
    "NULL",
    "EXPORT",
    "ANON",  # FIX: was lowercase "anon", which can never match an uppercased name
}
# TLS versions considered insecure
INSECURE_TLS_VERSIONS = {"TLSv1", "TLSv1.0", "TLSv1.1", "SSLv2", "SSLv3"}
def register_tools(mcp: FastMCP) -> None:
    """Register SSL/TLS scanning tools with the MCP server."""

    @mcp.tool()
    def ssl_tls_scan(hostname: str, port: int = 443) -> dict:
        """
        Scan a host's SSL/TLS configuration and certificate.
        Performs a non-intrusive check of TLS version, cipher suite, certificate
        validity, expiry, chain details, and common misconfigurations.
        Uses only the Python stdlib — no external tools required.
        Args:
            hostname: Domain name to scan (e.g., "example.com"). Do not include protocol.
            port: Port to connect to (default 443).
        Returns:
            Dict with TLS version, cipher, certificate details, issues found,
            and grade_input for the risk_scorer tool.
        """
        # Strip protocol prefix if provided
        hostname = hostname.replace("https://", "").replace("http://", "").strip("/")
        # Strip path
        hostname = hostname.split("/")[0]
        # Strip port from hostname if embedded
        if ":" in hostname:
            hostname = hostname.split(":")[0]
        issues: list[dict] = []
        try:
            # First attempt with full verification so trust failures surface
            # as findings instead of being silently ignored.
            ctx = ssl.create_default_context()
            conn = ctx.wrap_socket(socket.socket(), server_hostname=hostname)
            conn.settimeout(10)
            try:
                conn.connect((hostname, port))
            except ssl.SSLCertVerificationError as e:
                # FIX: close the failed socket before retrying (the original
                # leaked it).
                conn.close()
                # Retry with verification disabled so we can still inspect
                # the connection, and record the failure as a finding.
                ctx_noverify = ssl.create_default_context()
                ctx_noverify.check_hostname = False
                ctx_noverify.verify_mode = ssl.CERT_NONE
                conn = ctx_noverify.wrap_socket(socket.socket(), server_hostname=hostname)
                conn.settimeout(10)
                conn.connect((hostname, port))
                issues.append(
                    {
                        "severity": "critical",
                        "finding": f"SSL certificate verification failed: {e}",
                        "remediation": (
                            "Obtain a valid certificate from a trusted CA. "
                            "Let's Encrypt provides free certificates."
                        ),
                    }
                )
            # Gather negotiated TLS parameters.
            tls_version = conn.version() or "unknown"
            cipher_info = conn.cipher()
            cipher_name = cipher_info[0] if cipher_info else "unknown"
            cipher_bits = cipher_info[2] if cipher_info else 0
            # Get certificate. FIX: with CERT_NONE (unverified fallback),
            # getpeercert() returns no parsed fields — fall back to an empty
            # dict so the parsing below degrades gracefully.
            cert_der = conn.getpeercert(binary_form=True)
            cert_dict = conn.getpeercert() or {}
            conn.close()
        except TimeoutError:
            return {"error": f"Connection to {hostname}:{port} timed out"}
        except ConnectionRefusedError:
            return {"error": f"Connection to {hostname}:{port} refused. Port may be closed."}
        except OSError as e:
            return {"error": f"Connection failed: {e}"}
        # Parse certificate details (fields may be empty if verification failed)
        subject = _format_dn(cert_dict.get("subject", ()))
        issuer = _format_dn(cert_dict.get("issuer", ()))
        not_before_str = cert_dict.get("notBefore", "")
        not_after_str = cert_dict.get("notAfter", "")
        not_before = _parse_cert_date(not_before_str)
        not_after = _parse_cert_date(not_after_str)
        now = datetime.now(UTC)
        days_until_expiry = (not_after - now).days if not_after else None
        # SAN (Subject Alternative Names)
        san_list = []
        for san_type, san_value in cert_dict.get("subjectAltName", ()):
            if san_type == "DNS":
                san_list.append(san_value)
        # Self-signed heuristic: subject equals issuer. FIX: require a
        # non-empty subject — on the unverified fallback both fields are
        # empty and the original reported "" == "" as self-signed.
        self_signed = bool(subject) and subject == issuer
        # Certificate fingerprint
        cert_sha256 = hashlib.sha256(cert_der).hexdigest() if cert_der else ""
        # --- Check for issues ---
        # TLS version
        tls_version_ok = tls_version not in INSECURE_TLS_VERSIONS
        if not tls_version_ok:
            issues.append(
                {
                    "severity": "high",
                    "finding": f"Insecure TLS version: {tls_version}",
                    "remediation": (
                        "Disable TLS 1.0 and 1.1 in your server configuration. "
                        "Use TLS 1.2 or 1.3 only."
                    ),
                }
            )
        # Cipher strength (substring match against the uppercased suite name)
        strong_cipher = True
        if any(weak in cipher_name.upper() for weak in WEAK_CIPHERS):
            strong_cipher = False
            issues.append(
                {
                    "severity": "high",
                    "finding": f"Weak cipher suite: {cipher_name}",
                    "remediation": (
                        "Configure your server to use strong cipher suites only. "
                        "Prefer AES-GCM and ChaCha20-Poly1305."
                    ),
                }
            )
        if cipher_bits and cipher_bits < 128:
            strong_cipher = False
            issues.append(
                {
                    "severity": "high",
                    "finding": f"Cipher key length too short: {cipher_bits} bits",
                    "remediation": "Use cipher suites with at least 128-bit keys.",
                }
            )
        # Certificate validity
        cert_valid = True
        cert_expiring_soon = False
        if not_after and now > not_after:
            cert_valid = False
            issues.append(
                {
                    "severity": "critical",
                    "finding": "SSL certificate has expired",
                    "remediation": "Renew the SSL certificate immediately.",
                }
            )
        elif days_until_expiry is not None and days_until_expiry <= 30:
            cert_expiring_soon = True
            issues.append(
                {
                    "severity": "medium",
                    "finding": f"SSL certificate expires in {days_until_expiry} days",
                    "remediation": "Renew the SSL certificate before it expires.",
                }
            )
        if self_signed:
            cert_valid = False
            issues.append(
                {
                    "severity": "high",
                    "finding": "Self-signed certificate detected",
                    "remediation": (
                        "Replace with a certificate from a trusted CA. "
                        "Let's Encrypt provides free certificates."
                    ),
                }
            )
        return {
            "hostname": hostname,
            "port": port,
            "tls_version": tls_version,
            "cipher": cipher_name,
            "cipher_bits": cipher_bits,
            "certificate": {
                "subject": subject,
                "issuer": issuer,
                "not_before": not_before.isoformat() if not_before else not_before_str,
                "not_after": not_after.isoformat() if not_after else not_after_str,
                "days_until_expiry": days_until_expiry,
                "san": san_list,
                "self_signed": self_signed,
                "sha256_fingerprint": cert_sha256,
            },
            "issues": issues,
            "grade_input": {
                "tls_version_ok": tls_version_ok,
                "cert_valid": cert_valid,
                "cert_expiring_soon": cert_expiring_soon,
                "strong_cipher": strong_cipher,
                "self_signed": self_signed,
            },
        }
def _format_dn(dn_tuple: tuple) -> str:
"""Format a certificate distinguished name tuple into a readable string."""
parts = []
for rdn in dn_tuple:
for attr_type, attr_value in rdn:
parts.append(f"{attr_type}={attr_value}")
return ", ".join(parts)
def _parse_cert_date(date_str: str) -> datetime | None:
"""Parse a certificate date string into a datetime object."""
if not date_str:
return None
# OpenSSL format: "Jan 1 00:00:00 2025 GMT"
for fmt in ("%b %d %H:%M:%S %Y %Z", "%b %d %H:%M:%S %Y %Z"):
try:
return datetime.strptime(date_str, fmt).replace(tzinfo=UTC)
except ValueError:
continue
return None
@@ -0,0 +1,5 @@
"""Subdomain Enumerator - Discover subdomains via Certificate Transparency logs."""
from .subdomain_enumerator import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,183 @@
"""
Subdomain Enumerator - Discover subdomains via Certificate Transparency logs.
Performs passive subdomain discovery by querying crt.sh (Certificate Transparency
log aggregator). No active brute-forcing or DNS enumeration — fully OSINT-based.
"""
from __future__ import annotations
import re
import httpx
from fastmcp import FastMCP
# Subdomain keywords that indicate potentially sensitive environments.
# subdomain_enumerate matches each keyword with a word-boundary regex against
# the subdomain prefix (the part before the base domain); the first keyword
# that matches wins, and its severity feeds the grade_input flags.
INTERESTING_KEYWORDS = {
    "staging": {
        "reason": "Staging environment exposed publicly",
        "severity": "medium",
        "remediation": "Restrict staging to VPN or internal network access.",
    },
    "dev": {
        "reason": "Development environment exposed publicly",
        "severity": "medium",
        "remediation": "Restrict development servers to internal access only.",
    },
    "test": {
        "reason": "Test environment exposed publicly",
        "severity": "medium",
        "remediation": "Restrict test servers to internal access only.",
    },
    "admin": {
        "reason": "Admin panel subdomain exposed publicly",
        "severity": "high",
        "remediation": "Restrict admin panels to VPN or trusted IP ranges.",
    },
    "internal": {
        "reason": "Internal subdomain exposed in CT logs",
        "severity": "medium",
        "remediation": "Review if internal subdomains should have public certificates.",
    },
    "vpn": {
        "reason": "VPN endpoint discoverable via CT logs",
        "severity": "low",
        "remediation": "Consider if VPN endpoint exposure is acceptable for your threat model.",
    },
    "api": {
        "reason": "API subdomain discovered — potential attack surface",
        "severity": "low",
        "remediation": "Ensure API is properly authenticated and rate-limited.",
    },
    "mail": {
        "reason": "Mail server subdomain discovered",
        "severity": "info",
        "remediation": "Ensure mail server has proper SPF, DKIM, and DMARC configuration.",
    },
    "ftp": {
        "reason": "FTP subdomain discovered — legacy protocol",
        "severity": "medium",
        "remediation": "Replace FTP with SFTP. Restrict access to trusted networks.",
    },
    "debug": {
        "reason": "Debug subdomain exposed publicly",
        "severity": "high",
        "remediation": "Remove debug endpoints from production. Restrict to internal access.",
    },
    "backup": {
        "reason": "Backup subdomain exposed publicly",
        "severity": "high",
        "remediation": "Restrict backup infrastructure to internal access only.",
    },
}
def register_tools(mcp: FastMCP) -> None:
    """Register subdomain enumeration tools with the MCP server."""

    @mcp.tool()
    async def subdomain_enumerate(domain: str, max_results: int = 50) -> dict:
        """
        Discover subdomains using Certificate Transparency (CT) logs.
        Queries crt.sh to find all certificates issued for a domain, extracting
        subdomain names. Fully passive — uses only public CT log data.
        Flags potentially interesting subdomains (staging, dev, admin, etc.).
        Args:
            domain: Base domain to enumerate (e.g., "example.com"). No protocol prefix.
            max_results: Maximum number of subdomains to return (default 50, max 200).
        Returns:
            Dict with discovered subdomains, interesting findings,
            and grade_input for the risk_scorer tool.
        """
        # Clean domain: strip scheme, path, and any embedded ":port".
        domain = domain.replace("https://", "").replace("http://", "").strip("/")
        domain = domain.split("/")[0]
        if ":" in domain:
            domain = domain.split(":")[0]
        # Clamp to 1..200. FIX: the original allowed zero/negative values,
        # which would silently slice the result list to nothing.
        max_results = max(1, min(max_results, 200))
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.get(
                    "https://crt.sh/",
                    params={"q": f"%.{domain}", "output": "json"},
                )
                if response.status_code != 200:
                    return {
                        "error": f"crt.sh returned HTTP {response.status_code}",
                        "domain": domain,
                    }
                data = response.json()
        except httpx.TimeoutException:
            return {"error": "crt.sh request timed out (try again later)", "domain": domain}
        except Exception as e:
            return {"error": f"CT log query failed: {e}", "domain": domain}
        # Extract unique subdomains
        raw_names: set[str] = set()
        for entry in data:
            # Defensive: skip malformed rows rather than crash on a non-dict.
            if not isinstance(entry, dict):
                continue
            name_value = entry.get("name_value", "")
            # A single certificate can list multiple names, newline-separated
            for name in name_value.split("\n"):
                name = name.strip().lower()
                # FIX: parenthesized — the original relied on and/or precedence
                # and evaluated as "(name and endswith) or name == domain".
                if name and (name.endswith(f".{domain}") or name == domain):
                    raw_names.add(name)
        # Filter out wildcards and deduplicate
        subdomains = sorted(
            {name for name in raw_names if not name.startswith("*.")},
        )
        # Limit results
        subdomains = subdomains[:max_results]
        # Identify interesting subdomains; first matching keyword wins.
        interesting = []
        for sub in subdomains:
            # FIX: removesuffix strips only the trailing base domain; the
            # original str.replace removed *every* occurrence of ".{domain}".
            prefix = sub.removesuffix(f".{domain}").lower()
            for keyword, info in INTERESTING_KEYWORDS.items():
                if re.search(rf"\b{keyword}\b", prefix) or prefix == keyword:
                    interesting.append(
                        {
                            "subdomain": sub,
                            "reason": info["reason"],
                            "severity": info["severity"],
                            "remediation": info["remediation"],
                        }
                    )
                    break
        # Boolean flags consumed by the risk_score tool.
        has_dev_staging = any(
            i["severity"] in ("medium", "high")
            and any(kw in i["subdomain"] for kw in ("staging", "dev", "test", "debug"))
            for i in interesting
        )
        has_admin = any(
            any(kw in i["subdomain"] for kw in ("admin", "backup")) for i in interesting
        )
        # "reasonable" = fewer than 50 subdomains
        reasonable_surface = len(subdomains) < 50
        grade_input = {
            "no_dev_staging_exposed": not has_dev_staging,
            "no_admin_exposed": not has_admin,
            "reasonable_surface_area": reasonable_surface,
        }
        return {
            "domain": domain,
            "source": "crt.sh (Certificate Transparency)",
            "total_found": len(subdomains),
            "subdomains": subdomains,
            "interesting": interesting,
            "grade_input": grade_input,
        }
@@ -0,0 +1,5 @@
"""Tech Stack Detector - Fingerprint web technologies via passive analysis."""
from .tech_stack_detector import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,424 @@
"""
Tech Stack Detector - Fingerprint web technologies via passive analysis.
Performs non-intrusive HTTP requests to identify web server, framework, CMS,
JavaScript libraries, CDN, and security configuration through response headers,
HTML analysis, cookies, and common path probing.
"""
from __future__ import annotations
import re
import httpx
from fastmcp import FastMCP
# Patterns to detect JS frameworks/libraries in HTML source.
# Compiled once at import; any single pattern matching counts as detection.
# NOTE(review): "React" includes the __NEXT_DATA__ marker, so Next.js sites
# will also be reported as React — confirm that overlap is intended.
JS_PATTERNS = {
    "React": [
        re.compile(r"react(?:\.min)?\.js", re.I),
        re.compile(r"data-reactroot", re.I),
        re.compile(r"__NEXT_DATA__", re.I),
    ],
    "Angular": [
        re.compile(r"angular(?:\.min)?\.js", re.I),
        re.compile(r"ng-app", re.I),
        re.compile(r"ng-version", re.I),
    ],
    "Vue.js": [
        re.compile(r"vue(?:\.min)?\.js", re.I),
        re.compile(r"data-v-[a-f0-9]", re.I),
        re.compile(r"__vue__", re.I),
    ],
    "jQuery": [
        re.compile(r"jquery[.-](\d+\.\d+(?:\.\d+)?)", re.I),
        re.compile(r"jquery(?:\.min)?\.js", re.I),
    ],
    "Bootstrap": [
        re.compile(r"bootstrap[.-](\d+\.\d+(?:\.\d+)?)", re.I),
        re.compile(r"bootstrap(?:\.min)?\.(?:js|css)", re.I),
    ],
    "Tailwind CSS": [
        re.compile(r"tailwind", re.I),
    ],
    "Svelte": [
        re.compile(r"svelte", re.I),
        re.compile(r"__svelte", re.I),
    ],
    "Next.js": [
        re.compile(r"_next/static", re.I),
        re.compile(r"__NEXT_DATA__", re.I),
    ],
    "Nuxt.js": [
        re.compile(r"__nuxt", re.I),
        re.compile(r"_nuxt/", re.I),
    ],
}
# Cookie names that reveal backend technology (session-cookie fingerprints).
COOKIE_TECH_MAP = {
    "PHPSESSID": "PHP",
    "JSESSIONID": "Java",
    "ASP.NET_SessionId": "ASP.NET",
    "csrftoken": "Django",
    "laravel_session": "Laravel",
    "rack.session": "Ruby/Rails",
    "connect.sid": "Node.js/Express",
    "_rails_session": "Ruby on Rails",
}
# Analytics and tracking patterns, matched against the raw HTML source.
ANALYTICS_PATTERNS = {
    "Google Analytics": [
        re.compile(r"google-analytics\.com/analytics\.js", re.I),
        re.compile(r"googletagmanager\.com", re.I),
        re.compile(r"gtag\(", re.I),
    ],
    "Facebook Pixel": [re.compile(r"connect\.facebook\.net", re.I)],
    "Hotjar": [re.compile(r"static\.hotjar\.com", re.I)],
    "Mixpanel": [re.compile(r"cdn\.mxpnl\.com", re.I)],
    "Segment": [re.compile(r"cdn\.segment\.com", re.I)],
}
# CDN detection via response headers. A None value means the vendor cannot be
# named from the header's presence alone — _detect_cdn inspects its value.
CDN_HEADERS = {
    "cf-ray": "Cloudflare",
    "x-cdn": None,  # Value is the CDN name
    "x-served-by": "Fastly",
    "x-amz-cf-id": "AWS CloudFront",
    "x-cache": None,  # Generic, check value
    "via": None,  # Often contains CDN info
    "x-vercel-id": "Vercel",
    "x-netlify-request-id": "Netlify",
    "fly-request-id": "Fly.io",
}
# Paths to probe for CMS / framework detection. A None value means the path is
# merely "interesting" (reported) rather than tied to a specific technology.
PROBE_PATHS = {
    "/wp-admin/": "WordPress",
    "/wp-json/wp/v2/": "WordPress",
    "/wp-login.php": "WordPress",
    "/administrator/": "Joomla",
    "/user/login": "Drupal",
    "/admin/": None,  # Generic admin panel
    "/api/": None,  # API endpoint
    "/.well-known/security.txt": None,
    "/robots.txt": None,
    "/sitemap.xml": None,
}
def register_tools(mcp: FastMCP) -> None:
    """Register tech stack detection tools with the MCP server."""
    @mcp.tool()
    async def tech_stack_detect(url: str) -> dict:
        """
        Detect the technology stack of a website through passive analysis.
        Identifies web server, framework, CMS, JavaScript libraries, CDN,
        analytics, and security configuration by analyzing HTTP responses,
        HTML content, cookies, and common paths. Non-intrusive.
        Args:
            url: URL to analyze (e.g., "https://example.com"). Auto-prefixes https://.
        Returns:
            Dict with detected technologies, security configuration,
            and grade_input for the risk_scorer tool.
        """
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        # Normalize: drop any trailing slash so path probes append cleanly
        base_url = url.rstrip("/")
        try:
            async with httpx.AsyncClient(
                follow_redirects=True,
                timeout=15,
                verify=True,
            ) as client:
                # Main page request
                response = await client.get(base_url)
                html = response.text
                headers = response.headers
                # Detect server
                server = _detect_server(headers)
                # Detect CDN
                cdn = _detect_cdn(headers)
                # Detect framework from headers
                framework = _detect_framework_from_headers(headers)
                # Detect language from headers/cookies
                language = _detect_language(headers, response.cookies)
                # Detect JS libraries from HTML
                js_libs = _detect_js_libraries(html)
                # Detect analytics
                analytics = _detect_analytics(html)
                # Detect CMS from HTML meta tags
                cms = _detect_cms_from_html(html)
                # Analyze cookies from raw Set-Cookie headers.
                # NOTE(review): _analyze_cookies is defined later in this
                # module (not visible here); presumably it extracts per-cookie
                # Secure/HttpOnly flags as dicts — confirm against its body.
                cookies = _analyze_cookies(response.headers)
                # If we detected language from cookies, update
                for cookie_name in response.cookies:
                    if cookie_name in COOKIE_TECH_MAP and not language:
                        language = COOKIE_TECH_MAP[cookie_name]
                # Probe common paths
                security_txt = False
                robots_txt = False
                interesting_paths = []
                cms_from_paths = None
                for path, tech in PROBE_PATHS.items():
                    try:
                        probe_resp = await client.get(
                            f"{base_url}{path}",
                            follow_redirects=False,
                        )
                        # NOTE(review): 301/302 count as "present", which can
                        # over-report on sites that redirect every unknown
                        # path — consider tightening to 200/403.
                        if probe_resp.status_code in (200, 301, 302, 403):
                            if path == "/.well-known/security.txt":
                                security_txt = probe_resp.status_code == 200
                            elif path == "/robots.txt":
                                robots_txt = probe_resp.status_code == 200
                            elif tech and probe_resp.status_code in (200, 301, 302):
                                cms_from_paths = tech
                            elif probe_resp.status_code in (200, 301, 302):
                                interesting_paths.append(path)
                    except httpx.HTTPError:
                        # Best-effort probing: one failed path never aborts the scan
                        continue
                # Use CMS from paths if not detected from HTML
                if not cms and cms_from_paths:
                    cms = cms_from_paths
                # Detect framework from HTML if not from headers
                if not framework:
                    framework = _detect_framework_from_html(html)
        except httpx.ConnectError as e:
            return {"error": f"Connection failed: {e}"}
        except httpx.TimeoutException:
            return {"error": f"Request to {url} timed out"}
        except Exception as e:
            return {"error": f"Detection failed: {e}"}
        # Boolean flags consumed by the risk_score tool. Empty cookie lists
        # count as compliant (nothing to secure).
        server_version_hidden = True
        if server and server.get("version"):
            server_version_hidden = False
        grade_input = {
            "server_version_hidden": server_version_hidden,
            # _has_version is defined later in this module (not visible here).
            "framework_version_hidden": framework is None or not _has_version(framework),
            "security_txt_present": security_txt,
            "cookies_secure": all(c.get("secure", False) for c in cookies) if cookies else True,
            "cookies_httponly": (
                all(c.get("httponly", False) for c in cookies) if cookies else True
            ),
        }
        return {
            "url": str(response.url),
            "server": server,
            "framework": framework,
            "language": language,
            "cms": cms,
            "javascript_libraries": js_libs,
            "cdn": cdn,
            "analytics": analytics,
            "security_txt": security_txt,
            "robots_txt": robots_txt,
            "interesting_paths": interesting_paths,
            "cookies": cookies,
            "grade_input": grade_input,
        }
def _detect_server(headers: httpx.Headers) -> dict | None:
"""Detect web server from headers."""
server_header = headers.get("server")
if not server_header:
return None
# Try to parse name and version
match = re.match(r"^([\w.-]+)(?:/(\S+))?", server_header)
if match:
return {"name": match.group(1), "version": match.group(2), "raw": server_header}
return {"name": server_header, "version": None, "raw": server_header}
def _detect_cdn(headers: httpx.Headers) -> str | None:
    """Infer the CDN in front of the site from characteristic response headers."""
    # Substring -> vendor fallbacks for generic headers (Via, X-Cache, X-CDN).
    value_hints = (
        ("cloudflare", "Cloudflare"),
        ("cloudfront", "AWS CloudFront"),
        ("fastly", "Fastly"),
        ("akamai", "Akamai"),
        ("varnish", "Varnish"),
    )
    for header_name, cdn_name in CDN_HEADERS.items():
        value = headers.get(header_name)
        if not value:
            continue
        # Header presence alone identifies the vendor for most entries.
        if cdn_name:
            return cdn_name
        lowered = value.lower()
        for needle, vendor in value_hints:
            if needle in lowered:
                return vendor
    return None
def _detect_framework_from_headers(headers: httpx.Headers) -> str | None:
"""Detect framework from HTTP headers."""
powered_by = headers.get("x-powered-by")
if powered_by:
return powered_by
return None
def _detect_framework_from_html(html: str) -> str | None:
"""Detect framework from HTML content."""
# Django
if "csrfmiddlewaretoken" in html:
return "Django"
# Rails
if "csrf-token" in html and "data-turbo" in html:
return "Ruby on Rails"
# Laravel
if "laravel" in html.lower():
return "Laravel"
return None
def _detect_language(headers: httpx.Headers, cookies: httpx.Cookies) -> str | None:
    """Guess the server-side programming language from headers and cookies."""
    advertised = headers.get("x-powered-by", "").lower()
    # X-Powered-By frequently names the runtime outright.
    header_hints = (
        ("php", "PHP"),
        ("asp.net", "ASP.NET"),
        ("express", "Node.js"),
    )
    for needle, language in header_hints:
        if needle in advertised:
            return language
    # Fall back to well-known session-cookie names; only return entries
    # from COOKIE_TECH_MAP that actually denote a language/runtime.
    recognised = ("PHP", "Java", "ASP.NET", "Node.js/Express")
    for cookie_name in cookies:
        tech = COOKIE_TECH_MAP.get(cookie_name)
        if tech in recognised:
            return tech
    return None
def _detect_js_libraries(html: str) -> list[str]:
    """Detect JavaScript libraries (and versions, when visible) in HTML source.

    For each library whose signature pattern matches, attempt to extract a
    version from asset paths such as ``jquery-3.6.0`` or ``react/17.0.2``.
    Returns entries like ``"jQuery 3.6.0"``, or the bare library name when
    no version string is found.
    """
    found = []
    for lib_name, patterns in JS_PATTERNS.items():
        for pattern in patterns:
            if pattern.search(html):
                # Try to extract a version for the matched library.
                # re.escape fixes the previous `.replace('.', r'.')`, which
                # was a no-op and left dots in names like "Vue.js" acting as
                # regex wildcards (matching e.g. "vuexjs").
                version_match = re.search(
                    rf"{re.escape(lib_name.lower())}[/-](\d+\.\d+(?:\.\d+)?)",
                    html,
                    re.I,
                )
                if version_match:
                    found.append(f"{lib_name} {version_match.group(1)}")
                else:
                    found.append(lib_name)
                # One entry per library, even if several patterns match.
                break
    return found
def _detect_analytics(html: str) -> list[str]:
    """List analytics/tracking products whose signatures appear in the HTML."""
    # One entry per product: any single matching pattern is enough.
    return [
        name
        for name, patterns in ANALYTICS_PATTERNS.items()
        if any(pattern.search(html) for pattern in patterns)
    ]
def _detect_cms_from_html(html: str) -> str | None:
"""Detect CMS from HTML meta tags and content."""
# WordPress
if "wp-content" in html or "wp-includes" in html:
return "WordPress"
# Drupal
if "Drupal" in html or "drupal.js" in html:
return "Drupal"
# Joomla
if "/media/jui/" in html or "Joomla" in html:
return "Joomla"
# Shopify
if "cdn.shopify.com" in html:
return "Shopify"
# Squarespace
if "squarespace" in html.lower():
return "Squarespace"
# Wix
if "wix.com" in html:
return "Wix"
# Ghost
if "ghost-" in html or "ghost/" in html:
return "Ghost"
# Check meta generator tag
gen_match = re.search(
r'<meta[^>]+name=["\']generator["\'][^>]+content=["\'](.*?)["\']',
html,
re.I,
)
if not gen_match:
gen_match = re.search(
r'<meta[^>]+content=["\'](.*?)["\'][^>]+name=["\']generator["\']',
html,
re.I,
)
if gen_match:
return gen_match.group(1)
return None
def _analyze_cookies(headers: httpx.Headers) -> list[dict]:
    """Inspect raw Set-Cookie headers for Secure/HttpOnly/SameSite flags.

    The raw header strings are parsed directly (rather than through a
    cookie jar) so per-cookie attribute flags are preserved.
    """
    analyzed = []
    for raw in headers.get_list("set-cookie"):
        # Attribute matching is case-insensitive per RFC 6265.
        attributes = {segment.strip().lower() for segment in raw.split(";")}
        analyzed.append(
            {
                # Everything before the first '=' is the cookie name.
                "name": raw.split("=", 1)[0].strip(),
                "secure": "secure" in attributes,
                "httponly": "httponly" in attributes,
                "samesite": _extract_samesite(raw.lower()),
            }
        )
    return analyzed
def _extract_samesite(raw_lower: str) -> str | None:
"""Extract SameSite value from a lowercased Set-Cookie string."""
for part in raw_lower.split(";"):
part = part.strip()
if part.startswith("samesite="):
return part.split("=", 1)[1].strip().capitalize()
return None
def _has_version(value: str) -> bool:
"""Check if a string contains a version number."""
return bool(re.search(r"\d+\.\d+", value))
+291
View File
@@ -0,0 +1,291 @@
"""Tests for security scanning tools — cookie analysis and port scanner fixes."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, Mock, patch
import pytest
from aden_tools.tools.tech_stack_detector.tech_stack_detector import (
_analyze_cookies,
_extract_samesite,
)
# ---------------------------------------------------------------------------
# Cookie Analysis (_analyze_cookies)
# ---------------------------------------------------------------------------
class FakeHeaders:
    """Minimal stand-in for httpx.Headers.get_list()."""

    def __init__(self, set_cookie_values: list[str]):
        # Raw Set-Cookie strings handed back verbatim by get_list().
        self._raw_values = set_cookie_values

    def get_list(self, name: str) -> list[str]:
        # Only the "set-cookie" header is simulated; everything else is empty.
        return self._raw_values if name == "set-cookie" else []
class TestAnalyzeCookies:
    """Tests for _analyze_cookies parsing raw Set-Cookie headers."""

    @staticmethod
    def _analyze(*raw_headers: str) -> list[dict]:
        # Shared shortcut: wrap raw Set-Cookie strings and run the analyzer.
        return _analyze_cookies(FakeHeaders(list(raw_headers)))

    def test_secure_and_httponly_detected(self):
        parsed = self._analyze("session_id=abc123; Path=/; Secure; HttpOnly")
        assert len(parsed) == 1
        assert parsed[0]["name"] == "session_id"
        assert parsed[0]["secure"] is True
        assert parsed[0]["httponly"] is True

    def test_missing_flags_detected(self):
        parsed = self._analyze("tracking=xyz; Path=/")
        assert len(parsed) == 1
        assert parsed[0]["name"] == "tracking"
        assert parsed[0]["secure"] is False
        assert parsed[0]["httponly"] is False

    def test_case_insensitive(self):
        parsed = self._analyze("tok=val; SECURE; HTTPONLY")
        assert parsed[0]["secure"] is True
        assert parsed[0]["httponly"] is True

    def test_samesite_lax(self):
        parsed = self._analyze("pref=dark; SameSite=Lax; Secure")
        assert parsed[0]["samesite"] == "Lax"
        assert parsed[0]["secure"] is True

    def test_samesite_strict(self):
        parsed = self._analyze("csrf=token; SameSite=Strict; Secure; HttpOnly")
        assert parsed[0]["samesite"] == "Strict"

    def test_samesite_none(self):
        # SameSite=None is a real value, distinct from the attribute missing.
        parsed = self._analyze("cross=val; SameSite=None; Secure")
        assert parsed[0]["samesite"] == "None"
        assert parsed[0]["secure"] is True

    def test_no_samesite(self):
        parsed = self._analyze("id=123; Path=/; Secure")
        assert parsed[0]["samesite"] is None

    def test_multiple_cookies(self):
        parsed = self._analyze(
            "a=1; Secure; HttpOnly",
            "b=2; Path=/",
            "c=3; Secure; SameSite=Strict",
        )
        assert parsed == [
            {"name": "a", "secure": True, "httponly": True, "samesite": None},
            {"name": "b", "secure": False, "httponly": False, "samesite": None},
            {"name": "c", "secure": True, "httponly": False, "samesite": "Strict"},
        ]

    def test_no_cookies(self):
        assert self._analyze() == []

    def test_cookie_value_with_equals(self):
        """Cookie values containing '=' should not break name parsing."""
        parsed = self._analyze("token=abc=def==; Secure; HttpOnly")
        assert parsed[0]["name"] == "token"
        assert parsed[0]["secure"] is True

    def test_grade_input_reflects_real_flags(self):
        """Verify the grade_input logic works with our parsed cookies."""
        all_secure = [
            {"name": "a", "secure": True, "httponly": True, "samesite": None},
            {"name": "b", "secure": True, "httponly": True, "samesite": None},
        ]
        one_insecure = [
            {"name": "a", "secure": True, "httponly": True, "samesite": None},
            {"name": "b", "secure": False, "httponly": True, "samesite": None},
        ]
        # Replicate the grade_input logic from tech_stack_detector.
        assert all(c.get("secure", False) for c in all_secure) is True
        assert all(c.get("httponly", False) for c in all_secure) is True
        assert all(c.get("secure", False) for c in one_insecure) is False

    def test_secure_at_end_of_header(self):
        """Secure flag at the very end without trailing semicolon."""
        parsed = self._analyze("id=val; Path=/; Secure")
        assert parsed[0]["secure"] is True

    def test_no_space_after_semicolons(self):
        """Servers may omit space after semicolons (RFC 6265 Section 5.2)."""
        parsed = self._analyze("id=val;Secure;HttpOnly;Path=/")
        assert parsed[0]["name"] == "id"
        assert parsed[0]["secure"] is True
        assert parsed[0]["httponly"] is True
class TestExtractSamesite:
    """Tests for _extract_samesite helper."""

    def test_lax(self):
        raw = "id=val; path=/; samesite=lax"
        assert _extract_samesite(raw) == "Lax"

    def test_strict(self):
        raw = "id=val; samesite=strict; secure"
        assert _extract_samesite(raw) == "Strict"

    def test_none(self):
        # "None" here is the capitalised SameSite value, not Python's None.
        raw = "id=val; samesite=none; secure"
        assert _extract_samesite(raw) == "None"

    def test_missing(self):
        # No samesite attribute at all → Python None.
        assert _extract_samesite("id=val; secure; httponly") is None

    def test_with_spaces(self):
        # Whitespace around the value must be tolerated.
        assert _extract_samesite("id=val; samesite=lax ; secure") == "Lax"
# ---------------------------------------------------------------------------
# Port Scanner (_check_port)
# ---------------------------------------------------------------------------
class TestCheckPort:
    """Tests for _check_port using a single connection.

    asyncio.open_connection is patched with mocks so no real sockets are
    opened. The key contract exercised here: one probe opens exactly one
    connection, reuses it for the banner read, and always closes the writer.
    """

    @pytest.mark.asyncio
    async def test_open_port_with_banner(self):
        """Open port reads banner from the same connection (no second connect)."""
        from aden_tools.tools.port_scanner.port_scanner import _check_port

        mock_reader = AsyncMock()
        # Typical SSH banner; _check_port is expected to strip the CRLF.
        mock_reader.read = AsyncMock(return_value=b"SSH-2.0-OpenSSH_8.9\r\n")
        mock_writer = AsyncMock()
        # StreamWriter.close() is synchronous, so a plain callable stands in
        # (AsyncMock would make it awaitable, which the real API is not).
        mock_writer.close = lambda: None
        mock_writer.wait_closed = AsyncMock()
        with patch("asyncio.open_connection", new_callable=AsyncMock) as mock_conn:
            mock_conn.return_value = (mock_reader, mock_writer)
            result = await _check_port("127.0.0.1", 22, timeout=2.0)
            assert result["open"] is True
            assert result["banner"] == "SSH-2.0-OpenSSH_8.9"
            # The critical assertion: open_connection called exactly ONCE
            mock_conn.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_open_port_no_banner(self):
        """Open port where banner read times out still reports open."""
        from aden_tools.tools.port_scanner.port_scanner import _check_port

        mock_reader = AsyncMock()
        # Simulate a service that accepts the connection but never sends data.
        mock_reader.read = AsyncMock(side_effect=asyncio.TimeoutError)
        mock_writer = AsyncMock()
        mock_writer.close = lambda: None
        mock_writer.wait_closed = AsyncMock()
        with patch("asyncio.open_connection", new_callable=AsyncMock) as mock_conn:
            mock_conn.return_value = (mock_reader, mock_writer)
            result = await _check_port("127.0.0.1", 80, timeout=2.0)
            assert result["open"] is True
            assert result["banner"] == ""
            mock_conn.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_closed_port(self):
        """Closed port (ConnectionRefusedError) returns open=False."""
        from aden_tools.tools.port_scanner.port_scanner import _check_port

        with patch("asyncio.open_connection", new_callable=AsyncMock) as mock_conn:
            mock_conn.side_effect = ConnectionRefusedError
            result = await _check_port("127.0.0.1", 12345, timeout=2.0)
            assert result["open"] is False

    @pytest.mark.asyncio
    async def test_timeout_port(self):
        """Timed-out port returns open=False."""
        from aden_tools.tools.port_scanner.port_scanner import _check_port

        with patch("asyncio.open_connection", new_callable=AsyncMock) as mock_conn:
            mock_conn.side_effect = TimeoutError
            result = await _check_port("127.0.0.1", 12345, timeout=0.5)
            assert result["open"] is False

    @pytest.mark.asyncio
    async def test_writer_closed_even_on_banner_failure(self):
        """Writer from the connection is always closed, even if banner read fails."""
        from aden_tools.tools.port_scanner.port_scanner import _check_port

        mock_reader = AsyncMock()
        # Banner read blows up; the probe must still clean up the writer.
        mock_reader.read = AsyncMock(side_effect=Exception("unexpected"))
        mock_writer = AsyncMock()
        # Mock() (not a bare lambda) so the close call can be asserted below.
        mock_writer.close = Mock()
        mock_writer.wait_closed = AsyncMock()
        with patch("asyncio.open_connection", new_callable=AsyncMock) as mock_conn:
            mock_conn.return_value = (mock_reader, mock_writer)
            result = await _check_port("127.0.0.1", 80, timeout=2.0)
            assert result["open"] is True
            mock_writer.close.assert_called_once()
            mock_writer.wait_closed.assert_awaited_once()
Generated
+2
View File
@@ -3373,6 +3373,7 @@ source = { editable = "tools" }
dependencies = [
{ name = "beautifulsoup4" },
{ name = "diff-match-patch" },
{ name = "dnspython" },
{ name = "fastmcp" },
{ name = "framework" },
{ name = "httpx" },
@@ -3428,6 +3429,7 @@ dev = [
requires-dist = [
{ name = "beautifulsoup4", specifier = ">=4.12.0" },
{ name = "diff-match-patch", specifier = ">=20230430" },
{ name = "dnspython", specifier = ">=2.4.0" },
{ name = "duckdb", marker = "extra == 'all'", specifier = ">=1.0.0" },
{ name = "duckdb", marker = "extra == 'sql'", specifier = ">=1.0.0" },
{ name = "fastmcp", specifier = ">=2.0.0" },