#!/usr/bin/env python3
"""
Phase 7: WAF Rule Proposer for GitOps Integration

Generates Terraform WAF rules based on:
- Threat intelligence indicators
- ML classification results
- Compliance requirements
- Existing rule gaps

Integrates with Phase 6 GitOps to create automated MRs.
"""

from __future__ import annotations

import ipaddress
import json
import logging
import os
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set

# Import sibling modules
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# Type imports with fallbacks for standalone testing
_HAS_WAF_INTEL = False
try:
    from mcp.waf_intelligence.threat_intel import ThreatIndicator, ThreatIntelReport
    from mcp.waf_intelligence.classifier import ClassificationResult, ThreatClassifier
    from mcp.waf_intelligence.generator import GeneratedRule, WAFRuleGenerator
    from mcp.waf_intelligence.compliance import ComplianceMapper, FrameworkMapping
    _HAS_WAF_INTEL = True
except ImportError:
    pass

# TYPE_CHECKING block for type hints when modules unavailable
if TYPE_CHECKING:
    from mcp.waf_intelligence.threat_intel import ThreatIndicator, ThreatIntelReport
    from mcp.waf_intelligence.classifier import ClassificationResult, ThreatClassifier

logger = logging.getLogger(__name__)

# Indicator tags that identify an attack class handled by managed rulesets.
_ATTACK_TAGS = ("sqli", "xss", "rce", "path_traversal")

# Scanner signatures used when no user-agent patterns are supplied.
_DEFAULT_SCANNER_UAS = ("sqlmap", "nikto", "nmap", "masscan")

# Ranking used to order proposals (higher sorts first).
_SEVERITY_RANK = {"critical": 4, "high": 3, "medium": 2, "low": 1}

_SEVERITY_EMOJI = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}


def _utc_now() -> datetime:
    """Return the current UTC time as a naive datetime.

    Drop-in replacement for the deprecated ``datetime.utcnow()``; the result
    stays timezone-naive so existing comparisons keep working.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _terraform_identifier(text: str) -> str:
    """Replace every character that is not a letter, digit or underscore."""
    return re.sub(r"[^A-Za-z0-9_]", "_", text)


def _normalize_ip(value: Any) -> Optional[str]:
    """Return the canonical form of an IP address or CIDR, or None if invalid."""
    if not isinstance(value, str):
        return None
    try:
        if "/" in value:
            return str(ipaddress.ip_network(value, strict=False))
        return str(ipaddress.ip_address(value))
    except ValueError:
        return None


def _expression_string(text: str) -> str:
    """Quote *text* as an expression string literal embedded in an HCL string.

    Two layers of escaping are applied: first the quoted-string rules of the
    filter expression (backslash and double quote), then the rules of the
    surrounding HCL quoted string (backslash, double quote, and the ``${`` /
    ``%{`` template introducers).
    """
    literal = '"' + text.replace("\\", "\\\\").replace('"', '\\"') + '"'
    hcl = literal.replace("\\", "\\\\").replace('"', '\\"')
    return hcl.replace("${", "$${").replace("%{", "%%{")


@dataclass
class RuleProposal:
    """A proposed WAF rule with full context for GitOps review."""

    rule_name: str
    rule_type: str  # "ip_block", "pattern_block", "rate_limit", "managed_rule"
    terraform_code: str
    severity: str  # "low", "medium", "high", "critical"
    confidence: float
    justification: str
    threat_indicators: List[str] = field(default_factory=list)
    compliance_refs: List[str] = field(default_factory=list)
    estimated_impact: str = ""
    auto_deploy_eligible: bool = False
    tags: List[str] = field(default_factory=list)

    def to_markdown(self) -> str:
        """Render proposal as Markdown for MR description."""
        emoji = _SEVERITY_EMOJI.get(self.severity, "⚪")
        compliance = ", ".join(self.compliance_refs) or "N/A"
        impact = self.estimated_impact or "Unknown"
        tags = ", ".join(f"`{t}`" for t in self.tags) or "None"
        return f"""### {emoji} {self.rule_name}

**Type:** `{self.rule_type}` | **Severity:** `{self.severity}` | **Confidence:** `{self.confidence:.0%}`

**Justification:** {self.justification}

**Compliance:** {compliance}

**Estimated Impact:** {impact}

Terraform Code

```hcl
{self.terraform_code}
```

**Tags:** {tags}

---
"""


@dataclass
class ProposalBatch:
    """Batch of rule proposals for a single MR."""

    proposals: List[RuleProposal] = field(default_factory=list)
    generated_at: datetime = field(default_factory=_utc_now)  # naive UTC
    source_report: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def critical_count(self) -> int:
        """Number of proposals whose severity is "critical"."""
        return sum(1 for p in self.proposals if p.severity == "critical")

    @property
    def auto_deployable(self) -> List[RuleProposal]:
        """Proposals flagged as eligible for automatic deployment."""
        return [p for p in self.proposals if p.auto_deploy_eligible]

    def to_markdown(self) -> str:
        """Generate full MR description."""
        stamp = self.generated_at.strftime('%Y-%m-%d %H:%M:%S UTC')
        parts = [f"""# WAF Rule Proposals - Phase 7 Intelligence

**Generated:** {stamp}
**Total Proposals:** {len(self.proposals)}
**Critical:** {self.critical_count}
**Auto-Deploy Eligible:** {len(self.auto_deployable)}

---

## Summary

| Rule | Type | Severity | Confidence | Auto-Deploy |
|------|------|----------|------------|-------------|
"""]
        for p in self.proposals:
            auto = "✅" if p.auto_deploy_eligible else "❌"
            parts.append(
                f"| {p.rule_name} | {p.rule_type} | {p.severity} | {p.confidence:.0%} | {auto} |\n"
            )
        parts.append("\n---\n\n## Detailed Proposals\n\n")
        parts.extend(p.to_markdown() + "\n" for p in self.proposals)
        return "".join(parts)

    def to_terraform_file(self) -> str:
        """Generate combined Terraform file."""
        stamp = self.generated_at.strftime('%Y-%m-%d %H:%M:%S UTC')
        header = f"""# Auto-generated WAF rules from Phase 7 Intelligence
# Generated: {stamp}
# Review carefully before applying

"""
        return header + "\n\n".join(p.terraform_code for p in self.proposals)


class WAFRuleProposer:
    """
    Generates WAF rule proposals from threat intelligence and ML analysis.

    Usage:
        proposer = WAFRuleProposer(workspace_path="/path/to/cloudflare")
        batch = proposer.generate_proposals(threat_report)
        print(batch.to_markdown())
    """

    def __init__(
        self,
        workspace_path: Optional[str] = None,
        zone_id_var: str = "var.zone_id",
        account_id_var: str = "var.cloudflare_account_id",
    ):
        """
        Args:
            workspace_path: Directory that contains ``terraform/waf.tf``;
                defaults to the current working directory.
            zone_id_var: Terraform expression emitted for ``zone_id``.
            account_id_var: Terraform expression for the account id (stored
                on the instance; not used by the generators in this module).
        """
        self.workspace = Path(workspace_path) if workspace_path else Path.cwd()
        self.zone_id_var = zone_id_var
        self.account_id_var = account_id_var

        # Initialize components only if available
        self.classifier = None
        self.rule_generator = None
        self.compliance_mapper = None

        if _HAS_WAF_INTEL:
            self.classifier = self._build_component(ThreatClassifier, "classifier")
            self.rule_generator = self._build_component(WAFRuleGenerator, "rule generator")
            self.compliance_mapper = self._build_component(ComplianceMapper, "compliance mapper")

        # Auto-deploy thresholds
        self.auto_deploy_min_confidence = 0.85
        self.auto_deploy_severities = {"critical", "high"}

    @staticmethod
    def _build_component(factory: Any, label: str) -> Any:
        """Instantiate an optional component; log and return None on failure."""
        try:
            return factory()
        except Exception as exc:  # best-effort: the proposer works without it
            logger.warning("WAF intelligence %s unavailable: %s", label, exc)
            return None

    def _is_auto_deployable(self, severity: str, confidence: float) -> bool:
        """Apply the instance's auto-deploy severity and confidence thresholds."""
        return (
            severity in self.auto_deploy_severities
            and confidence >= self.auto_deploy_min_confidence
        )

    def generate_proposals(
        self,
        threat_report: Optional[Any] = None,
        indicators: Optional[List[Any]] = None,
        max_proposals: int = 10,
    ) -> ProposalBatch:
        """
        Generate rule proposals from threat intelligence.

        Args:
            threat_report: Full threat intel report
            indicators: Or just a list of indicators
            max_proposals: Maximum number of proposals to generate

        Returns:
            ProposalBatch ready for GitOps MR
        """
        # Get indicators from report or directly
        if threat_report:
            all_indicators = threat_report.indicators
        elif indicators:
            all_indicators = indicators
        else:
            all_indicators = []

        # Group indicators by type
        ip_indicators = [i for i in all_indicators if i.indicator_type == "ip"]
        pattern_indicators = [i for i in all_indicators if i.indicator_type == "pattern"]
        ua_indicators = [i for i in all_indicators if i.indicator_type == "ua"]

        proposals: List[RuleProposal] = []
        proposals.extend(self._generate_ip_rules(ip_indicators))
        proposals.extend(self._generate_pattern_rules(pattern_indicators))
        proposals.extend(self._generate_ua_rules(ua_indicators))
        proposals.extend(self._generate_managed_rule_proposals(all_indicators))

        # Sort by severity and confidence. The sort is stable and every
        # generator emits in a deterministic order, so ties are reproducible.
        proposals.sort(
            key=lambda p: (_SEVERITY_RANK.get(p.severity, 0), p.confidence),
            reverse=True,
        )

        return ProposalBatch(
            # Clamp so a negative limit cannot turn into "all but the last N".
            proposals=proposals[:max(0, max_proposals)],
            source_report=str(threat_report.collection_time) if threat_report else None,
            metadata={
                "total_indicators": len(all_indicators),
                "ip_indicators": len(ip_indicators),
                "pattern_indicators": len(pattern_indicators),
                "ua_indicators": len(ua_indicators),
            },
        )

    def _generate_ip_rules(self, indicators: List[Any]) -> List[RuleProposal]:
        """Generate IP blocking rules.

        Critical indicators get one rule each (at most five); high-severity
        indicators are batched into a single rule (at most twenty). Values
        that are not valid IP addresses or CIDR ranges are skipped, because
        they are interpolated unquoted into the rule expression.
        """
        critical: List[Any] = []  # (indicator, normalized ip) pairs
        high: List[Any] = []
        seen_critical: Set[str] = set()
        for ind in indicators:
            ip = _normalize_ip(ind.value)
            if ip is None:
                logger.warning("Skipping IP indicator with invalid address: %r", ind.value)
                continue
            if ind.severity == "critical":
                # A repeated IP would yield two resources with the same name.
                if ip not in seen_critical:
                    seen_critical.add(ip)
                    critical.append((ind, ip))
            elif ind.severity == "high":
                high.append((ind, ip))

        proposals: List[RuleProposal] = []

        # Critical IPs - individual block rules
        for ind, ip in critical[:5]:  # Limit to top 5
            # IPv6 and CIDR values contain ':' and '/', which are not legal in
            # Terraform resource names.
            rule_name = f"waf_block_ip_{_terraform_identifier(ip)}"
            proposals.append(RuleProposal(
                rule_name=rule_name,
                rule_type="ip_block",
                terraform_code=self._ip_block_terraform(rule_name, [ip], "block"),
                severity="critical",
                confidence=ind.confidence,
                justification=f"Critical threat actor IP detected. Sources: {', '.join(ind.sources)}. "
                              f"Hit count: {ind.hit_count}. {ind.context.get('abuse_score', 'N/A')} abuse score.",
                threat_indicators=[ip],
                compliance_refs=["Zero-Trust", "Threat Intelligence"],
                estimated_impact="Blocks all traffic from this IP",
                auto_deploy_eligible=self._is_auto_deployable("critical", ind.confidence),
                tags=["auto-generated", "threat-intel", "ip-block"],
            ))

        # Batch high-severity IPs into one rule
        batch = high[:20]  # Limit batch size
        if batch:
            ips = list(dict.fromkeys(ip for _, ip in batch))
            rule_name = "waf_block_high_risk_ips"
            avg_confidence = sum(ind.confidence for ind, _ in batch) / len(batch)
            proposals.append(RuleProposal(
                rule_name=rule_name,
                rule_type="ip_block",
                terraform_code=self._ip_block_terraform(rule_name, ips, "block"),
                severity="high",
                confidence=avg_confidence,
                justification=f"Batch block of {len(ips)} high-risk IPs from threat intelligence.",
                threat_indicators=ips,
                compliance_refs=["Zero-Trust", "Threat Intelligence"],
                estimated_impact=f"Blocks traffic from {len(ips)} IPs",
                auto_deploy_eligible=False,  # Batch rules require manual review
                tags=["auto-generated", "threat-intel", "ip-block", "batch"],
            ))

        return proposals

    def _generate_pattern_rules(self, indicators: List[Any]) -> List[RuleProposal]:
        """Generate pattern-based blocking rules, one per attack type seen."""
        # Group by attack type (insertion order keeps the output deterministic)
        attack_types: Dict[str, List[Any]] = {}
        for ind in indicators:
            for tag in ind.tags:
                if tag in _ATTACK_TAGS:
                    attack_types.setdefault(tag, []).append(ind)

        proposals: List[RuleProposal] = []
        for attack_type, inds in attack_types.items():
            confidence = max(ind.confidence for ind in inds)

            # Use ML classifier to validate if available
            if self.classifier:
                # Classify a sample to confirm
                result = self.classifier.classify(inds[0].value[:500])
                if result.label != attack_type and result.confidence > 0.7:
                    # ML disagrees, adjust confidence
                    confidence = min(ind.confidence for ind in inds) * 0.7

            rule_name = f"waf_protect_{attack_type}"
            severity = "critical" if attack_type in ("sqli", "rce") else "high"

            proposals.append(RuleProposal(
                rule_name=rule_name,
                rule_type="managed_rule",
                terraform_code=self._managed_rule_terraform(rule_name, attack_type),
                severity=severity,
                confidence=confidence,
                justification=f"Detected {len(inds)} {attack_type.upper()} attack patterns in traffic. "
                              f"Enabling managed ruleset protection.",
                threat_indicators=[ind.value[:100] for ind in inds[:3]],
                compliance_refs=self._get_compliance_refs(attack_type),
                estimated_impact=f"Blocks {attack_type.upper()} attacks via managed rules",
                auto_deploy_eligible=self._is_auto_deployable(severity, confidence),
                tags=["auto-generated", "threat-intel", attack_type, "managed-rules"],
            ))

        return proposals

    def _generate_ua_rules(self, indicators: List[Any]) -> List[RuleProposal]:
        """Generate user-agent blocking rules."""
        scanner_uas = [i for i in indicators if "scanner" in i.tags or "bad_ua" in i.tags]
        if not scanner_uas:
            return []

        # Unique patterns in first-seen order; a set would make both the
        # selection of ten and their order vary between runs.
        patterns = list(dict.fromkeys(i.value[:100] for i in scanner_uas))[:10]
        rule_name = "waf_block_scanner_uas"

        return [RuleProposal(
            rule_name=rule_name,
            rule_type="pattern_block",
            terraform_code=self._ua_block_terraform(rule_name, patterns),
            severity="medium",
            confidence=0.75,
            justification=f"Blocking {len(patterns)} scanner/bot user agents detected in traffic.",
            threat_indicators=patterns,
            compliance_refs=["Bot Protection"],
            estimated_impact="Blocks automated scanning tools",
            auto_deploy_eligible=False,
            tags=["auto-generated", "threat-intel", "scanner", "user-agent"],
        )]

    def _existing_coverage(self) -> Set[str]:
        """Return the attack types mentioned in ``terraform/waf.tf``.

        Matching requires a non-alphanumeric boundary on both sides, so that
        "rce" is not found inside the Terraform keyword "resource".
        """
        tf_path = self.workspace / "terraform" / "waf.tf"
        try:
            content = tf_path.read_text(encoding="utf-8", errors="replace").lower()
        except FileNotFoundError:
            return set()
        except OSError as exc:
            # Best-effort: an unreadable file is treated as "no coverage".
            logger.warning("Could not read %s: %s", tf_path, exc)
            return set()

        return {
            attack_type
            for attack_type in _ATTACK_TAGS
            if re.search(rf"(?<![a-z0-9]){re.escape(attack_type)}(?![a-z0-9])", content)
        }

    def _generate_managed_rule_proposals(
        self,
        indicators: List[Any]
    ) -> List[RuleProposal]:
        """Generate recommendations to enable managed rulesets.

        Proposes protection for every attack type seen in the indicators'
        tags that is not already mentioned in the workspace's waf.tf.
        """
        # Check for attack types that should have managed rules
        attack_types_seen = {
            tag for ind in indicators for tag in ind.tags if tag in _ATTACK_TAGS
        }

        # Check existing terraform for gaps
        missing = attack_types_seen - self._existing_coverage()

        proposals: List[RuleProposal] = []
        # Sorted: set iteration order changes between interpreter runs.
        for attack_type in sorted(missing):
            rule_name = f"waf_enable_{attack_type}_protection"
            proposals.append(RuleProposal(
                rule_name=rule_name,
                rule_type="managed_rule",
                terraform_code=self._managed_rule_terraform(rule_name, attack_type),
                severity="high",
                confidence=0.9,
                justification=f"Traffic shows {attack_type.upper()} attack patterns but no protection enabled. "
                              f"Recommend enabling Cloudflare managed {attack_type.upper()} ruleset.",
                threat_indicators=[],
                compliance_refs=self._get_compliance_refs(attack_type),
                estimated_impact=f"Enables {attack_type.upper()} protection",
                auto_deploy_eligible=self._is_auto_deployable("high", 0.9),
                tags=["auto-generated", "gap-analysis", attack_type, "managed-rules"],
            ))

        return proposals

    def _ip_block_terraform(
        self,
        rule_name: str,
        ips: List[str],
        action: str = "block"
    ) -> str:
        """Generate Terraform for IP blocking rule.

        Args:
            rule_name: Terraform resource name; also used to derive the title.
            ips: Validated IP addresses / CIDR ranges. They are emitted
                unquoted, so callers must not pass unvalidated text.
            action: Rule action written into the resource.
        """
        if len(ips) == 1 and "/" not in ips[0]:
            expression = f"(ip.src eq {ips[0]})"
        else:
            # Lists and CIDR ranges both need the set-membership form.
            ip_list = " ".join(ips)
            expression = f"(ip.src in {{{ip_list}}})"

        title = rule_name.replace("_", " ").title()
        return f'''resource "cloudflare_ruleset" "{rule_name}" {{
  zone_id     = {self.zone_id_var}
  name        = "{title}"
  description = "Auto-generated by Phase 7 WAF Intelligence"
  kind        = "zone"
  phase       = "http_request_firewall_custom"

  rules {{
    action      = "{action}"
    expression  = "{expression}"
    description = "Block threat intel IPs"
    enabled     = true
  }}
}}
'''

    def _managed_rule_terraform(self, rule_name: str, attack_type: str) -> str:
        """Generate Terraform that executes a managed ruleset for *attack_type*."""
        # NOTE(review): IDs below are believed to be Cloudflare's published
        # managed ruleset IDs - confirm against the Cloudflare WAF reference.
        cloudflare_managed = "efb7b8c949ac4650a09736fc376e9aee"  # Cloudflare Managed Ruleset
        owasp_core = "4814384a9e5d4991b9815dcfc25d2f1f"  # Cloudflare OWASP Core Ruleset
        ruleset_map = {
            "sqli": cloudflare_managed,
            # Previously c2e184081120413c86c3ab7e14069605, which appears to be
            # the Exposed Credentials Check ruleset and has no XSS rules.
            "xss": cloudflare_managed,
            "rce": owasp_core,
        }
        ruleset_id = ruleset_map.get(attack_type, cloudflare_managed)
        label = attack_type.upper()

        return f'''resource "cloudflare_ruleset" "{rule_name}" {{
  zone_id     = {self.zone_id_var}
  name        = "{label} Protection"
  description = "Managed {label} protection - Phase 7 WAF Intelligence"
  kind        = "zone"
  phase       = "http_request_firewall_managed"

  rules {{
    action = "execute"
    action_parameters {{
      id = "{ruleset_id}"
    }}
    expression  = "true"
    description = "Enable {label} managed ruleset"
    enabled     = true
  }}
}}
'''

    def _ua_block_terraform(self, rule_name: str, patterns: List[str]) -> str:
        """Generate Terraform for user-agent blocking.

        Each pattern becomes a ``http.user_agent contains "..."`` clause.
        Patterns are untrusted request data, so non-printable characters are
        dropped and the rest is escaped for both the expression and HCL. When
        no usable pattern remains, a built-in scanner list is used instead.
        """
        cleaned = (
            "".join(ch for ch in p if ch.isprintable()) for p in patterns
        )
        # Blank patterns would match (almost) every request - discard them.
        usable = list(dict.fromkeys(p for p in cleaned if p.strip()))
        if not usable:
            usable = list(_DEFAULT_SCANNER_UAS)

        clauses = [f"http.user_agent contains {_expression_string(p)}" for p in usable]
        expression = "(" + " or ".join(clauses) + ")"

        return f'''resource "cloudflare_ruleset" "{rule_name}" {{
  zone_id     = {self.zone_id_var}
  name        = "Block Scanner User Agents"
  description = "Auto-generated by Phase 7 WAF Intelligence"
  kind        = "zone"
  phase       = "http_request_firewall_custom"

  rules {{
    action      = "block"
    expression  = "{expression}"
    description = "Block known scanner user agents"
    enabled     = true
  }}
}}
'''

    def _get_compliance_refs(self, attack_type: str) -> List[str]:
        """Get compliance references for an attack type (empty if unknown)."""
        refs = {
            "sqli": ["PCI-DSS 6.6", "OWASP A03:2021"],
            "xss": ["OWASP A07:2017", "CWE-79"],
            "rce": ["OWASP A03:2021", "CWE-78"],
            "path_traversal": ["CWE-22", "OWASP A01:2021"],
        }
        return refs.get(attack_type, [])


def _mock_indicator(**attrs: Any) -> Any:
    """Build a throwaway object exposing *attrs* as attributes (CLI demo only)."""
    return type("ThreatIndicator", (), attrs)()


def _main() -> None:
    """CLI for testing: print proposals generated from mock indicators."""
    workspace = sys.argv[1] if len(sys.argv) > 1 else "."

    # Create mock indicators for testing
    mock_indicators = [
        _mock_indicator(
            indicator_type="ip",
            value="192.0.2.100",
            severity="critical",
            confidence=0.95,
            sources=["abuseipdb", "honeypot"],
            tags=["threat-intel"],
            hit_count=150,
            context={"abuse_score": 95},
        ),
        _mock_indicator(
            indicator_type="pattern",
            value="' OR '1'='1",
            severity="high",
            confidence=0.85,
            sources=["log_analysis"],
            tags=["sqli", "attack_pattern"],
            hit_count=50,
            context={},
        ),
        _mock_indicator(
            indicator_type="ua",
            value="sqlmap/1.0",
            severity="medium",
            confidence=0.9,
            sources=["log_analysis"],
            tags=["scanner", "bad_ua"],
            hit_count=25,
            context={},
        ),
    ]

    proposer = WAFRuleProposer(workspace_path=workspace)
    batch = proposer.generate_proposals(indicators=mock_indicators)
    print(batch.to_markdown())


# CLI for testing
if __name__ == "__main__":
    _main()