#!/usr/bin/env python3 """ Enhanced Security Classification Framework for Layer0 Provides advanced classification capabilities for Cloudflare infrastructure operations """ from enum import Enum from typing import Dict, List, Optional, Any from dataclasses import dataclass import re class SecurityLevel(str, Enum): """Security classification levels""" LOW_RISK = "low_risk" MEDIUM_RISK = "medium_risk" HIGH_RISK = "high_risk" CRITICAL_RISK = "critical_risk" class OperationType(str, Enum): """Types of infrastructure operations""" READ_ONLY = "read_only" CONFIGURATION_CHANGE = "configuration_change" INFRASTRUCTURE_MODIFICATION = "infrastructure_modification" SECURITY_MODIFICATION = "security_modification" ACCESS_CONTROL_CHANGE = "access_control_change" class ResourceType(str, Enum): """Types of Cloudflare resources""" DNS_RECORD = "dns_record" WAF_RULE = "waf_rule" ACCESS_RULE = "access_rule" TUNNEL = "tunnel" ZONE_SETTINGS = "zone_settings" ACCOUNT_SETTINGS = "account_settings" @dataclass class SecurityClassification: """Result of security classification""" level: SecurityLevel operation_type: OperationType resource_type: ResourceType confidence: float # 0.0 to 1.0 flags: List[str] rationale: str requires_approval: bool approval_threshold: Optional[str] = None class SecurityClassifier: """ Advanced security classifier for Cloudflare infrastructure operations Provides multi-dimensional risk assessment and classification """ def __init__(self): # Pattern definitions for different risk levels self.critical_patterns = [ r"delete.*all", r"destroy.*infrastructure", r"disable.*waf", r"remove.*firewall", r"bypass.*security", r"expose.*credentials", r"terraform.*destroy", r"drop.*database", ] self.high_risk_patterns = [ r"modify.*dns", r"change.*tunnel", r"update.*waf", r"create.*rule", r"modify.*access", r"terraform.*apply", ] self.medium_risk_patterns = [ r"create.*record", r"update.*settings", r"configure.*zone", r"modify.*page", r"change.*cache", ] self.low_risk_patterns = [ r"list.*records", r"get.*status", r"show.*config", r"read.*logs", r"monitor.*health", ] # Operation type patterns self.operation_patterns = { OperationType.READ_ONLY: [ r"list", r"get", r"show", r"read", r"monitor", r"status", ], OperationType.CONFIGURATION_CHANGE: [ r"configure", r"update.*settings", r"change.*config", ], OperationType.INFRASTRUCTURE_MODIFICATION: [ r"create", r"modify", r"update", r"delete", r"destroy", ], OperationType.SECURITY_MODIFICATION: [ r"waf", r"firewall", r"security", r"block", r"allow", ], OperationType.ACCESS_CONTROL_CHANGE: [ r"access", r"permission", r"role", r"policy", ], } # Resource type patterns self.resource_patterns = { ResourceType.DNS_RECORD: [r"dns", r"record", r"domain", r"zone"], ResourceType.WAF_RULE: [r"waf", r"firewall", r"rule", r"security"], ResourceType.ACCESS_RULE: [r"access", r"policy", r"permission"], ResourceType.TUNNEL: [r"tunnel", r"connector", r"proxy"], ResourceType.ZONE_SETTINGS: [r"zone.*settings", r"domain.*config"], ResourceType.ACCOUNT_SETTINGS: [r"account.*settings", r"billing"], } def classify_operation( self, operation_description: str, context: Optional[Dict[str, Any]] = None ) -> SecurityClassification: """ Classify an infrastructure operation based on description and context """ description_lower = operation_description.lower() # Determine security level security_level = self._determine_security_level(description_lower) # Determine operation type operation_type = self._determine_operation_type(description_lower) # Determine resource type resource_type = self._determine_resource_type(description_lower) # Calculate confidence confidence = self._calculate_confidence(description_lower, security_level) # Generate flags flags = self._generate_flags(description_lower, security_level, context) # Generate rationale rationale = self._generate_rationale( security_level, operation_type, resource_type ) # Determine if approval is required requires_approval = security_level in [ SecurityLevel.HIGH_RISK, SecurityLevel.CRITICAL_RISK, ] approval_threshold = self._determine_approval_threshold(security_level) return SecurityClassification( level=security_level, operation_type=operation_type, resource_type=resource_type, confidence=confidence, flags=flags, rationale=rationale, requires_approval=requires_approval, approval_threshold=approval_threshold, ) def _determine_security_level(self, description: str) -> SecurityLevel: """Determine the security risk level""" for pattern in self.critical_patterns: if re.search(pattern, description): return SecurityLevel.CRITICAL_RISK for pattern in self.high_risk_patterns: if re.search(pattern, description): return SecurityLevel.HIGH_RISK for pattern in self.medium_risk_patterns: if re.search(pattern, description): return SecurityLevel.MEDIUM_RISK for pattern in self.low_risk_patterns: if re.search(pattern, description): return SecurityLevel.LOW_RISK # Default to medium risk for unknown operations return SecurityLevel.MEDIUM_RISK def _determine_operation_type(self, description: str) -> OperationType: """Determine the type of operation""" for op_type, patterns in self.operation_patterns.items(): for pattern in patterns: if re.search(pattern, description): return op_type # Default to infrastructure modification for safety return OperationType.INFRASTRUCTURE_MODIFICATION def _determine_resource_type(self, description: str) -> ResourceType: """Determine the type of resource being operated on""" for resource_type, patterns in self.resource_patterns.items(): for pattern in patterns: if re.search(pattern, description): return resource_type # Default to DNS records (most common) return ResourceType.DNS_RECORD def _calculate_confidence( self, description: str, security_level: SecurityLevel ) -> float: """Calculate confidence score for classification""" base_confidence = 0.7 # Increase confidence for longer, more specific descriptions word_count = len(description.split()) if word_count > 10: base_confidence += 0.2 elif word_count > 5: base_confidence += 0.1 # Adjust based on security level if security_level == SecurityLevel.CRITICAL_RISK: base_confidence += 0.1 # Critical patterns are usually clear return min(1.0, base_confidence) def _generate_flags( self, description: str, security_level: SecurityLevel, context: Optional[Dict[str, Any]], ) -> List[str]: """Generate security flags for the operation""" flags = [] # Basic flags based on security level if security_level == SecurityLevel.CRITICAL_RISK: flags.extend( ["critical_risk", "requires_emergency_approval", "multi_factor_auth"] ) elif security_level == SecurityLevel.HIGH_RISK: flags.extend(["high_risk", "requires_senior_approval", "audit_trail"]) elif security_level == SecurityLevel.MEDIUM_RISK: flags.extend(["medium_risk", "requires_standard_approval"]) else: flags.extend(["low_risk", "auto_approved"]) # Context-based flags if context: environment = context.get("environment", "") if environment.lower() in ["prod", "production"]: flags.append("production_environment") user_role = context.get("user_role", "") if user_role.lower() in ["admin", "root"]: flags.append("privileged_user") # Pattern-based flags if re.search(r"delete|destroy|remove", description): flags.append("destructive_operation") if re.search(r"waf|firewall|security", description): flags.append("security_related") if re.search(r"dns|domain|zone", description): flags.append("dns_related") return flags def _generate_rationale( self, security_level: SecurityLevel, operation_type: OperationType, resource_type: ResourceType, ) -> str: """Generate rationale for the classification""" rationales = { SecurityLevel.CRITICAL_RISK: "Critical risk operation involving infrastructure destruction or security bypass", SecurityLevel.HIGH_RISK: "High risk operation modifying core infrastructure or security settings", SecurityLevel.MEDIUM_RISK: "Medium risk operation involving configuration changes", SecurityLevel.LOW_RISK: "Low risk read-only operation", } base_rationale = rationales.get( security_level, "Standard infrastructure operation" ) # Add operation-specific details if operation_type == OperationType.INFRASTRUCTURE_MODIFICATION: base_rationale += " with infrastructure modification capabilities" elif operation_type == OperationType.SECURITY_MODIFICATION: base_rationale += " affecting security controls" # Add resource-specific details if resource_type == ResourceType.DNS_RECORD: base_rationale += " on DNS infrastructure" elif resource_type == ResourceType.WAF_RULE: base_rationale += " on WAF security rules" return base_rationale def _determine_approval_threshold( self, security_level: SecurityLevel ) -> Optional[str]: """Determine the approval threshold required""" thresholds = { SecurityLevel.CRITICAL_RISK: "Emergency Change Advisory Board (ECAB)", SecurityLevel.HIGH_RISK: "Senior Infrastructure Engineer", SecurityLevel.MEDIUM_RISK: "Team Lead", SecurityLevel.LOW_RISK: None, } return thresholds.get(security_level) # Example usage and testing def main(): """Example usage of the security classifier""" classifier = SecurityClassifier() # Test cases test_cases = [ "Delete all DNS records for domain example.com", "Update WAF rule to allow traffic from China", "Create new DNS record for subdomain", "List all current tunnels and their status", "Modify zone settings to enable development mode", "Destroy all terraform infrastructure", ] print("🔐 Security Classification Framework Test") print("=" * 60) for test_case in test_cases: classification = classifier.classify_operation(test_case) print(f"\nOperation: {test_case}") print(f"Security Level: {classification.level.value}") print(f"Operation Type: {classification.operation_type.value}") print(f"Resource Type: {classification.resource_type.value}") print(f"Confidence: {classification.confidence:.2f}") print(f"Requires Approval: {classification.requires_approval}") if classification.approval_threshold: print(f"Approval Threshold: {classification.approval_threshold}") print(f"Flags: {', '.join(classification.flags)}") print(f"Rationale: {classification.rationale}") if __name__ == "__main__": main()