Files
vm-cloudflare/mcp/waf_intelligence/analyzer.py
Vault Sovereign f0b8d962de
Some checks failed
WAF Intelligence Guardrail / waf-intel (push) Waiting to run
Cloudflare Registry Validation / validate-registry (push) Has been cancelled
chore: pre-migration snapshot
Layer0, MCP servers, Terraform consolidation
2025-12-27 01:52:27 +00:00

262 lines
8.8 KiB
Python

from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
# Cloudflare managed WAF ruleset IDs (last updated 2025-12-18).
# The presence of any of these IDs in a config is treated as evidence that
# managed WAF rules are deployed.
MANAGED_WAF_RULESET_IDS = (
    # Cloudflare Managed Ruleset
    "efb7b8c949ac4650a09736fc376e9aee",
    # OWASP Core Ruleset
    "4814384a9e5d4991b9815dcfc25d2f1f",
)
@dataclass
class RuleViolation:
    """A single potential problem found in a WAF rule or configuration.

    Attributes:
        rule_id: Identifier of the offending rule, or ``None`` when the
            finding is not tied to one specific rule.
        message: Human-readable description of the problem.
        severity: One of ``"info"``, ``"warning"`` or ``"error"``.
        framework_refs: Compliance/framework references backing the finding.
        location: Where the problem was found, if known.
        confidence: Certainty of the finding, from 0.0 to 1.0.
        hint: Short suggestion on how to fix the problem.
    """

    rule_id: Optional[str]
    message: str
    # "info" | "warning" | "error"
    severity: str
    # default_factory gives every instance its own list.
    framework_refs: List[str] = field(default_factory=list)
    location: Optional[str] = None
    # 0.0-1.0: how sure we are
    confidence: float = 0.5
    # short suggestion on how to fix
    hint: Optional[str] = None
@dataclass
class AnalysisResult:
    """High-level result of analyzing one or more WAF configs.

    Attributes:
        source: Name of what was analyzed (e.g. a file path).
        violations: Findings produced by the analysis.
        metadata: Free-form extra information about the analysis run.
    """

    source: str
    violations: List[RuleViolation] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def has_issues(self) -> bool:
        """Return True if at least one violation is a warning or an error."""
        return any(v.severity in ("warning", "error") for v in self.violations)

    def top_violations(
        self,
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.7,
        limit: int = 5,
    ) -> List[RuleViolation]:
        """Return a small, high-quality subset of violations.

        Args:
            min_severity: Lowest severity to keep ("info", "warning" or
                "error"). An unrecognized value is treated as "warning".
            min_confidence: Lowest confidence to keep (inclusive).
            limit: Maximum number of violations returned; values below zero
                are treated as zero.

        Returns:
            A new list ordered by severity (most severe first) and then by
            confidence (highest first). ``self.violations`` is not modified.
        """
        severity_order = {"info": 0, "warning": 1, "error": 2}
        min_level = severity_order.get(min_severity, 1)
        ranked = [
            v
            for v in self.violations
            # A violation with an unknown severity ranks as "info".
            if severity_order.get(v.severity, 0) >= min_level
            and v.confidence >= min_confidence
        ]
        # Rank on the full severity scale so warnings outrank info entries,
        # not merely "error" versus everything else.
        ranked.sort(
            key=lambda v: (-severity_order.get(v.severity, 0), -v.confidence)
        )
        # Clamp so a negative limit cannot silently drop only the tail.
        return ranked[: max(limit, 0)]
class WAFRuleAnalyzer:
    """
    Analyze Cloudflare WAF rules from Terraform with a quality-first posture.

    Every check is a plain-text heuristic over the Terraform source; the
    text is never parsed as HCL. Findings are filtered by severity and
    confidence so that only a short list is reported.
    """

    # Version tag written into every result's metadata.
    _HEURISTICS_VERSION = "0.3.0"
    # Upper bound on the number of baseline findings kept per analysis.
    _MAX_BASELINE_VIOLATIONS = 5
    # Threat-intel tags for which rule coverage is checked.
    _TRACKED_ATTACK_TAGS = frozenset({"sqli", "xss", "rce", "path_traversal"})
    # Matched against lowercased text, hence the lowercase phase name.
    _MANAGED_PHASE_RE = re.compile(
        r'phase\s*=\s*"http_request_firewall_managed"'
    )

    def _has_managed_waf_rules(self, text: str) -> bool:
        """Return True if *text* appears to reference managed WAF rules.

        The check is case-insensitive and looks for a ``managed_rules``
        mention, the managed firewall phase, a ``cf.waf`` field, or one of
        the known managed ruleset IDs.
        """
        text_lower = text.lower()
        if "managed_rules" in text_lower:
            return True
        if self._MANAGED_PHASE_RE.search(text_lower):
            return True
        if "cf.waf" in text_lower:
            return True
        return any(
            ruleset_id.lower() in text_lower
            for ruleset_id in MANAGED_WAF_RULESET_IDS
        )

    @staticmethod
    def _mentions_attack_type(text_lower: str, attack_type: str) -> bool:
        """Return True if *attack_type* occurs as a whole token.

        A bare substring test is not enough: "rce" is contained in
        "resource", which appears in virtually every Terraform file, so the
        tag must not be directly surrounded by letters or digits.

        Args:
            text_lower: Already-lowercased configuration text.
            attack_type: Lowercase tag such as "sqli" or "rce".
        """
        pattern = rf"(?<![a-z0-9]){re.escape(attack_type)}(?![a-z0-9])"
        return re.search(pattern, text_lower) is not None

    def _baseline_violations(
        self, text: str, *, location: str, subject: str
    ) -> List[RuleViolation]:
        """Run the text heuristics shared by file and snippet analysis.

        Args:
            text: Terraform source to inspect.
            location: Value stored in each violation's ``location``.
            subject: Noun used in messages ("file" or "snippet").
        """
        violations: List[RuleViolation] = []

        # Heuristic: no managed rules present.
        if not self._has_managed_waf_rules(text):
            violations.append(
                RuleViolation(
                    rule_id=None,
                    message=f"No managed WAF rules detected in this {subject}.",
                    severity="warning",
                    confidence=0.9,
                    framework_refs=["PCI-DSS 6.6", "OWASP-ASVS 13"],
                    location=location,
                    hint="Enable Cloudflare managed WAF rulesets (SQLi, XSS, RCE, bots) for this zone.",
                )
            )

        # Heuristic: overly broad allow (a quoted wildcard plus "allow").
        if '"*"' in text and "allow" in text:
            violations.append(
                RuleViolation(
                    rule_id=None,
                    message="Potentially overly broad allow rule detected ('*').",
                    severity="error",
                    confidence=0.85,
                    framework_refs=["Zero-Trust Principle"],
                    location=location,
                    hint="Narrow the rule expression to specific paths, methods, or IP ranges.",
                )
            )
        return violations

    def _build_result(
        self,
        source: str,
        violations: List[RuleViolation],
        metadata: Dict[str, Any],
        *,
        min_severity: str,
        min_confidence: float,
    ) -> AnalysisResult:
        """Wrap *violations* in a result and keep only the top findings."""
        result = AnalysisResult(
            source=source, violations=violations, metadata=metadata
        )
        result.violations = result.top_violations(
            min_severity=min_severity,
            min_confidence=min_confidence,
            limit=self._MAX_BASELINE_VIOLATIONS,
        )
        return result

    def analyze_file(
        self,
        path: str | Path,
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.6,
    ) -> AnalysisResult:
        """Analyze the Terraform file at *path*.

        Args:
            path: File to read (decoded as UTF-8).
            min_severity: Minimum severity to include.
            min_confidence: Minimum confidence threshold.

        Returns:
            AnalysisResult whose metadata carries the file size and the
            heuristics version.

        Raises:
            OSError: If the file cannot be read.
            UnicodeDecodeError: If the file is not valid UTF-8.
        """
        path = Path(path)
        text = path.read_text(encoding="utf-8")
        violations = self._baseline_violations(
            text, location=str(path), subject="file"
        )
        return self._build_result(
            str(path),
            violations,
            {
                "file_size": path.stat().st_size,
                "heuristics_version": self._HEURISTICS_VERSION,
            },
            min_severity=min_severity,
            min_confidence=min_confidence,
        )

    def analyze_terraform_text(
        self,
        source_name: str,
        text: str,
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.6,
    ) -> AnalysisResult:
        """Same as analyze_file but for already-loaded text.

        Runs the same heuristics as ``analyze_file`` (including the
        broad-allow check); no file is accessed.

        Args:
            source_name: Label for the snippet; used as violation location.
            text: Terraform source to inspect.
            min_severity: Minimum severity to include.
            min_confidence: Minimum confidence threshold.
        """
        violations = self._baseline_violations(
            text, location=source_name, subject="snippet"
        )
        return self._build_result(
            # The result source is the path-normalized form of the label.
            str(Path(source_name)),
            violations,
            {"heuristics_version": self._HEURISTICS_VERSION},
            min_severity=min_severity,
            min_confidence=min_confidence,
        )

    def analyze_with_threat_intel(
        self,
        path: str | Path,
        threat_indicators: List[Any],
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.6,
    ) -> AnalysisResult:
        """
        Enhanced analysis using threat intelligence data.

        Args:
            path: WAF config file path
            threat_indicators: List of ThreatIndicator objects from threat_intel module.
                Each must expose ``indicator_type``, ``severity``, ``value``
                and ``tags``.
            min_severity: Minimum severity to include
            min_confidence: Minimum confidence threshold

        Returns:
            AnalysisResult with violations informed by threat intel. The
            thresholds apply to the threat-intel findings as well, and
            ``metadata["threat_intel"]`` summarizes the indicators seen.
        """
        # Start with base analysis. Calling the public method keeps any
        # subclass override of analyze_file in effect.
        base_result = self.analyze_file(
            path, min_severity=min_severity, min_confidence=min_confidence
        )
        path = Path(path)
        text_lower = path.read_text(encoding="utf-8").lower()

        # Only critical/high severity indicators are considered.
        indicators = threat_indicators or ()
        critical_ips = [
            i
            for i in indicators
            if i.indicator_type == "ip" and i.severity in ("critical", "high")
        ]
        critical_patterns = [
            i
            for i in indicators
            if i.indicator_type == "pattern"
            and i.severity in ("critical", "high")
        ]

        threat_violations: List[RuleViolation] = []

        # Check for IP blocking coverage
        ip_block_present = (
            "ip.src" in text_lower or "cf.client.ip" in text_lower
        )
        if critical_ips and not ip_block_present:
            sample_ips = ", ".join(str(i.value) for i in critical_ips[:3])
            threat_violations.append(
                RuleViolation(
                    rule_id=None,
                    message=f"Threat intel identified {len(critical_ips)} high-risk IPs not addressed by WAF rules.",
                    severity="error",
                    confidence=0.85,
                    framework_refs=["Zero-Trust", "Threat Intelligence"],
                    location=str(path),
                    hint=f"Add IP blocking rules for identified threat actors. Sample IPs: {sample_ips}",
                )
            )

        # Check for pattern-based attack coverage. Sorted so that the
        # reported order and the metadata are deterministic.
        attack_types_seen = sorted(
            {
                tag
                for ind in critical_patterns
                for tag in (ind.tags or ())
                if tag in self._TRACKED_ATTACK_TAGS
            }
        )
        for attack_type in attack_types_seen:
            if self._mentions_attack_type(text_lower, attack_type):
                continue
            threat_violations.append(
                RuleViolation(
                    rule_id=None,
                    message=f"Threat intel detected {attack_type.upper()} attacks but no explicit protection found.",
                    severity="warning",
                    confidence=0.8,
                    framework_refs=["OWASP Top 10", "Threat Intelligence"],
                    location=str(path),
                    hint=f"Enable Cloudflare managed rules for {attack_type.upper()} protection.",
                )
            )

        # Apply the caller's thresholds to the threat-intel findings too;
        # the limit equals their count so none is dropped for length alone.
        base_result.violations.extend(
            AnalysisResult(
                source=str(path), violations=threat_violations
            ).top_violations(
                min_severity=min_severity,
                min_confidence=min_confidence,
                limit=len(threat_violations),
            )
        )

        # Update metadata with threat intel stats
        base_result.metadata["threat_intel"] = {
            "critical_ips": len(critical_ips),
            "critical_patterns": len(critical_patterns),
            "attack_types_seen": attack_types_seen,
        }
        return base_result