from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

# Rank of each known severity label; labels missing from this table are
# treated as "info" (rank 0) wherever violations are filtered or sorted.
_SEVERITY_ORDER: Dict[str, int] = {"info": 0, "warning": 1, "error": 2}


@dataclass
class RuleViolation:
    """Represents a potential issue in a WAF rule or configuration."""

    rule_id: Optional[str]
    message: str
    severity: str  # "info" | "warning" | "error"
    framework_refs: List[str] = field(default_factory=list)
    location: Optional[str] = None
    confidence: float = 0.5  # 0.0-1.0: how sure we are
    hint: Optional[str] = None  # short suggestion on how to fix


@dataclass
class AnalysisResult:
    """High-level result of analyzing one or more WAF configs."""

    source: str
    violations: List[RuleViolation] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def has_issues(self) -> bool:
        """Return True when at least one violation is a warning or an error."""
        return any(v.severity in ("warning", "error") for v in self.violations)

    def top_violations(
        self,
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.7,
        limit: int = 5,
    ) -> List[RuleViolation]:
        """Return a small, high-quality subset of violations.

        Args:
            min_severity: Lowest severity label to keep. An unrecognised label
                falls back to the "warning" threshold.
            min_confidence: Lowest confidence (inclusive) to keep.
            limit: Maximum number of violations returned; values below zero
                are treated as zero.

        Returns:
            A new list ordered by severity (most severe first) and, within the
            same severity, by descending confidence. ``self.violations`` is
            left untouched.
        """
        min_level = _SEVERITY_ORDER.get(min_severity, 1)
        ranked = [
            v
            for v in self.violations
            if _SEVERITY_ORDER.get(v.severity, 0) >= min_level
            and v.confidence >= min_confidence
        ]
        # Rank on the full severity scale so that a warning always precedes an
        # info finding, not only "error first, everything else by confidence".
        ranked.sort(
            key=lambda v: (-_SEVERITY_ORDER.get(v.severity, 0), -v.confidence)
        )
        # A negative limit would otherwise slice from the end of the list.
        return ranked[: max(limit, 0)]


class WAFRuleAnalyzer:
    """Analyze Cloudflare WAF rules from Terraform with a quality-first posture."""

    # Reported in every result's metadata under "heuristics_version".
    _HEURISTICS_VERSION = "0.2.0"
    # Upper bound on the violations returned by the baseline analyses.
    _MAX_REPORTED = 5
    # Indicator severities considered worth a dedicated finding.
    _HIGH_RISK_SEVERITIES = ("critical", "high")
    # Indicator tags that are mapped to an attack-type coverage check.
    _TRACKED_ATTACK_TAGS = ("sqli", "xss", "rce", "path_traversal")

    def _baseline_violations(
        self, text: str, location: str, subject: str
    ) -> List[RuleViolation]:
        """Run the text-only heuristics shared by file and snippet analysis.

        Args:
            text: Configuration text to inspect.
            location: Value stored in each violation's ``location`` field.
            subject: Noun used in the message ("file" or "snippet").
        """
        found: List[RuleViolation] = []

        # Heuristic: no managed rules present.
        if "managed_rules" not in text:
            found.append(
                RuleViolation(
                    rule_id=None,
                    message=f"No managed WAF rules detected in this {subject}.",
                    severity="warning",
                    confidence=0.9,
                    framework_refs=["PCI-DSS 6.6", "OWASP-ASVS 13"],
                    location=location,
                    hint="Enable Cloudflare managed WAF rulesets (SQLi, XSS, RCE, bots) for this zone.",
                )
            )

        # Heuristic: overly broad allow (a quoted "*" anywhere plus "allow").
        if '"*"' in text and "allow" in text:
            found.append(
                RuleViolation(
                    rule_id=None,
                    message="Potentially overly broad allow rule detected ('*').",
                    severity="error",
                    confidence=0.85,
                    framework_refs=["Zero-Trust Principle"],
                    location=location,
                    hint="Narrow the rule expression to specific paths, methods, or IP ranges.",
                )
            )

        return found

    def analyze_file(
        self,
        path: str | Path,
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.6,
    ) -> AnalysisResult:
        """Analyze a WAF configuration file on disk.

        Args:
            path: File to read; decoded as UTF-8.
            min_severity: Minimum severity to include.
            min_confidence: Minimum confidence threshold.

        Returns:
            AnalysisResult whose ``violations`` hold at most five ranked
            findings and whose ``metadata`` carries ``file_size`` and
            ``heuristics_version``.

        Raises:
            OSError: If the file cannot be read or stat'ed.
            UnicodeDecodeError: If the file is not valid UTF-8.
        """
        path = Path(path)
        text = path.read_text(encoding="utf-8")

        result = AnalysisResult(
            source=str(path),
            violations=self._baseline_violations(text, str(path), "file"),
            metadata={
                "file_size": path.stat().st_size,
                "heuristics_version": self._HEURISTICS_VERSION,
            },
        )
        result.violations = result.top_violations(
            min_severity=min_severity,
            min_confidence=min_confidence,
            limit=self._MAX_REPORTED,
        )
        return result

    def analyze_terraform_text(
        self,
        source_name: str,
        text: str,
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.6,
    ) -> AnalysisResult:
        """Same as analyze_file but for already-loaded text.

        Runs the same heuristics as ``analyze_file`` (including the broad-allow
        check); no file is touched, so ``metadata`` has no ``file_size``.

        Args:
            source_name: Label for the snippet; used verbatim as each
                violation's ``location``.
            text: Configuration text to inspect.
            min_severity: Minimum severity to include.
            min_confidence: Minimum confidence threshold.

        Returns:
            AnalysisResult with at most five ranked findings.
        """
        result = AnalysisResult(
            # Kept as the Path-normalised form of the name for backward
            # compatibility with existing callers.
            source=str(Path(source_name)),
            violations=self._baseline_violations(text, source_name, "snippet"),
            metadata={"heuristics_version": self._HEURISTICS_VERSION},
        )
        result.violations = result.top_violations(
            min_severity=min_severity,
            min_confidence=min_confidence,
            limit=self._MAX_REPORTED,
        )
        return result

    def analyze_with_threat_intel(
        self,
        path: str | Path,
        threat_indicators: List[Any],
        *,
        min_severity: str = "warning",
        min_confidence: float = 0.6,
    ) -> AnalysisResult:
        """
        Enhanced analysis using threat intelligence data.

        Args:
            path: WAF config file path
            threat_indicators: List of ThreatIndicator objects from threat_intel
                module. Only the attributes ``indicator_type``, ``severity``,
                ``value`` and ``tags`` are read here.
            min_severity: Minimum severity to include
            min_confidence: Minimum confidence threshold

        Returns:
            AnalysisResult with violations informed by threat intel. Baseline
            and threat-intel findings are filtered by the same thresholds and
            ranked together; ``metadata["threat_intel"]`` summarises the
            indicators that were considered.

        Raises:
            OSError: If the file cannot be read or stat'ed.
            UnicodeDecodeError: If the file is not valid UTF-8.
        """
        # Start with base analysis
        base_result = self.analyze_file(
            path, min_severity=min_severity, min_confidence=min_confidence
        )

        path = Path(path)
        text_lower = path.read_text(encoding="utf-8").lower()

        critical_ips = [
            i
            for i in threat_indicators
            if i.indicator_type == "ip" and i.severity in self._HIGH_RISK_SEVERITIES
        ]
        critical_patterns = [
            i
            for i in threat_indicators
            if i.indicator_type == "pattern"
            and i.severity in self._HIGH_RISK_SEVERITIES
        ]

        intel_violations: List[RuleViolation] = []

        # Check for IP blocking coverage: any reference to a client-IP field
        # counts as "addressed".
        ip_block_present = "ip.src" in text_lower or "cf.client.ip" in text_lower
        if critical_ips and not ip_block_present:
            # str() so that non-string indicator values cannot break the join.
            sample_ips = ", ".join(str(i.value) for i in critical_ips[:3])
            intel_violations.append(
                RuleViolation(
                    rule_id=None,
                    message=f"Threat intel identified {len(critical_ips)} high-risk IPs not addressed by WAF rules.",
                    severity="error",
                    confidence=0.85,
                    framework_refs=["Zero-Trust", "Threat Intelligence"],
                    location=str(path),
                    hint=f"Add IP blocking rules for identified threat actors. Sample IPs: {sample_ips}",
                )
            )

        # Check for pattern-based attack coverage. Sorted so that both the
        # emitted violations and the metadata are deterministic across runs.
        attack_types_seen = sorted(
            {
                tag
                for ind in critical_patterns
                for tag in (getattr(ind, "tags", None) or ())
                if tag in self._TRACKED_ATTACK_TAGS
            }
        )

        # Check managed ruleset coverage
        for attack_type in attack_types_seen:
            if attack_type in text_lower:
                continue
            intel_violations.append(
                RuleViolation(
                    rule_id=None,
                    message=f"Threat intel detected {attack_type.upper()} attacks but no explicit protection found.",
                    severity="warning",
                    confidence=0.8,
                    framework_refs=["OWASP Top 10", "Threat Intelligence"],
                    location=str(path),
                    hint=f"Enable Cloudflare managed rules for {attack_type.upper()} protection.",
                )
            )

        # Apply the caller's thresholds to the threat-intel findings as well
        # and rank everything together. The limit equals the combined length,
        # so nothing that passes the thresholds is dropped.
        base_result.violations = base_result.violations + intel_violations
        base_result.violations = base_result.top_violations(
            min_severity=min_severity,
            min_confidence=min_confidence,
            limit=len(base_result.violations),
        )

        # Update metadata with threat intel stats
        base_result.metadata["threat_intel"] = {
            "critical_ips": len(critical_ips),
            "critical_patterns": len(critical_patterns),
            "attack_types_seen": attack_types_seen,
        }

        return base_result