"""
Phase 7: Multi-Source Threat Intelligence Collector

Aggregates threat data from:
- Cloudflare Analytics API (WAF events, firewall logs)
- External threat feeds (AbuseIPDB, Emerging Threats, etc.)
- Local honeypot signals (if configured)
- Historical attack patterns from receipts/logs

Currently implemented in this module: local log files (JSON or plain text),
Terraform block/deny rules, and AbuseIPDB IP enrichment.

Produces scored ThreatIndicators for ML classification and rule generation.
"""

from __future__ import annotations

import hashlib
import ipaddress
import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from urllib.parse import urlparse

# Optional: requests for external API calls
try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False


# Relative ranking of severity labels; unknown labels rank below "low".
_SEVERITY_ORDER: Dict[str, int] = {"critical": 4, "high": 3, "medium": 2, "low": 1}


def _priority_key(indicator: "ThreatIndicator") -> tuple:
    """Sort key used for prioritisation: severity, then confidence, then hits."""
    return (
        _SEVERITY_ORDER.get(indicator.severity, 0),
        indicator.confidence,
        indicator.hit_count,
    )


@dataclass
class ThreatIndicator:
    """Single threat indicator with scoring metadata."""

    indicator_type: str  # "ip", "ua", "path", "pattern", "country"
    value: str
    confidence: float  # 0.0-1.0
    severity: str  # "low", "medium", "high", "critical"
    sources: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    hit_count: int = 1
    context: Dict[str, Any] = field(default_factory=dict)

    @property
    def fingerprint(self) -> str:
        """Deduplication key: first 16 hex chars of SHA-256("<type>:<value>")."""
        raw = f"{self.indicator_type}:{self.value}"
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def merge(self, other: "ThreatIndicator") -> None:
        """Merge another indicator (normally with the same fingerprint) into this one.

        - ``hit_count`` values are summed.
        - ``confidence`` and ``severity`` take the maximum of the two, so
          enrichment can escalate an indicator's priority.
        - ``sources`` and ``tags`` are unioned, keeping first-seen order.
        - ``context`` keys from ``other`` are added only where this indicator
          has no entry yet (existing keys win).
        - ``first_seen`` / ``last_seen`` widen to cover both indicators.
        """
        self.hit_count += other.hit_count
        self.confidence = max(self.confidence, other.confidence)
        if _SEVERITY_ORDER.get(other.severity, 0) > _SEVERITY_ORDER.get(self.severity, 0):
            self.severity = other.severity
        # dict.fromkeys de-duplicates while preserving order (set() does not).
        self.sources = list(dict.fromkeys(self.sources + other.sources))
        self.tags = list(dict.fromkeys(self.tags + other.tags))
        for key, val in other.context.items():
            self.context.setdefault(key, val)
        if other.first_seen and (not self.first_seen or other.first_seen < self.first_seen):
            self.first_seen = other.first_seen
        if other.last_seen and (not self.last_seen or other.last_seen > self.last_seen):
            self.last_seen = other.last_seen


@dataclass
class ThreatIntelReport:
    """Aggregated threat intelligence from all sources."""

    indicators: List[ThreatIndicator] = field(default_factory=list)
    sources_queried: List[str] = field(default_factory=list)
    # Naive UTC timestamp (datetime.utcnow).
    collection_time: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def critical_count(self) -> int:
        """Number of indicators whose severity is "critical"."""
        return sum(1 for i in self.indicators if i.severity == "critical")

    @property
    def high_count(self) -> int:
        """Number of indicators whose severity is "high"."""
        return sum(1 for i in self.indicators if i.severity == "high")

    def top_indicators(self, limit: int = 10) -> List[ThreatIndicator]:
        """Return up to ``limit`` indicators ordered by severity, confidence, hits."""
        return sorted(self.indicators, key=_priority_key, reverse=True)[:limit]


class CloudflareLogParser:
    """Parse Cloudflare WAF/firewall logs for threat indicators."""

    # Common attack patterns in URIs: (regex, tag, severity)
    ATTACK_PATTERNS = [
        (r"(?i)(?:union\s+select|select\s+.*\s+from)", "sqli", "high"),
        (r"(?i)<script[^>]*>", "xss", "high"),
        (r"(?i)(?:\.\./|\.\.\\)", "path_traversal", "medium"),
        (r"(?i)(?:cmd=|exec=|system\()", "rce", "critical"),
        (r"(?i)(?:wp-admin|wp-login|xmlrpc\.php)", "wordpress_probe", "low"),
        (r"(?i)(?:\.env|\.git|\.htaccess)", "sensitive_file", "medium"),
        (r"(?i)(?:phpmyadmin|adminer|mysql)", "db_probe", "medium"),
        (r"(?i)(?:eval\(|base64_decode)", "code_injection", "high"),
    ]

    # Known bad user agents: (case-insensitive substring, tag, severity)
    BAD_USER_AGENTS = [
        ("sqlmap", "sqli_tool", "high"),
        ("nikto", "scanner", "medium"),
        ("nmap", "scanner", "medium"),
        ("masscan", "scanner", "medium"),
        ("zgrab", "scanner", "low"),
        ("python-requests", "bot", "low"),  # contextual
        ("curl", "bot", "low"),  # contextual
    ]

    def parse_log_file(self, path: Path) -> List[ThreatIndicator]:
        """Parse a log file line by line and extract threat indicators.

        Returns an empty list if ``path`` is not a regular file. If reading
        fails part-way (OSError), the indicators parsed so far are returned.
        Malformed lines never abort the scan of the remaining lines.
        """
        indicators: List[ThreatIndicator] = []
        if not path.is_file():
            return indicators
        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                for line in f:
                    indicators.extend(self._parse_log_line(line))
        except OSError:
            # Best-effort: an unreadable file must not abort the whole collection.
            pass
        return indicators

    def _parse_log_line(self, line: str) -> List[ThreatIndicator]:
        """Extract indicators from a single log line.

        JSON objects are parsed structurally; anything else (plain text,
        malformed JSON, or JSON that is not an object) is scanned for raw
        attack patterns. Surrounding whitespace is stripped first so identical
        lines produce identical fingerprints regardless of trailing newlines.
        """
        line = line.strip()
        if not line:
            return []
        try:
            data = json.loads(line)
        except (ValueError, RecursionError):  # JSONDecodeError is a ValueError
            data = None
        if isinstance(data, dict):
            return self._parse_json_log(data)
        return self._scan_for_patterns(line)

    def _parse_json_log(self, data: Dict[str, Any]) -> List[ThreatIndicator]:
        """Parse one structured JSON log entry.

        Emits an "ip" indicator for block/challenge actions, "pattern"
        indicators for attack patterns in the request URI, and a "ua"
        indicator for the first matching known-bad user agent. Missing or
        null fields are treated as absent.
        """
        indicators: List[ThreatIndicator] = []

        # Extract IP if blocked or challenged
        action = str(data.get("action") or "").lower()
        if action in ("block", "challenge", "managed_challenge"):
            ip = data.get("clientIP") or data.get("client_ip") or data.get("ip")
            if ip:
                is_block = action == "block"
                indicators.append(ThreatIndicator(
                    indicator_type="ip",
                    value=str(ip),
                    confidence=0.8 if is_block else 0.6,
                    severity="high" if is_block else "medium",
                    sources=["cloudflare_log"],
                    tags=[action, str(data.get("ruleId") or "unknown_rule")],
                    context={"rule": data.get("ruleName") or "", "action": action},
                ))

        # Extract URI patterns
        uri = data.get("clientRequestURI") or data.get("uri") or data.get("path") or ""
        if isinstance(uri, str) and uri:
            indicators.extend(self._scan_for_patterns(uri))

        # Extract user agent. Cloudflare's field is clientRequestUserAgent;
        # clientRequestHTTPHost is the Host header and must not be used here.
        ua = data.get("clientRequestUserAgent") or data.get("user_agent") or ""
        if isinstance(ua, str) and ua:
            ua_lower = ua.lower()
            for pattern, tag, severity in self.BAD_USER_AGENTS:
                if pattern.lower() in ua_lower:
                    indicators.append(ThreatIndicator(
                        indicator_type="ua",
                        value=ua[:200],  # truncate
                        confidence=0.7,
                        severity=severity,
                        sources=["cloudflare_log"],
                        tags=[tag, "bad_ua"],
                    ))
                    break

        return indicators

    def _scan_for_patterns(self, text: str) -> List[ThreatIndicator]:
        """Return one "pattern" indicator per ATTACK_PATTERNS entry matching ``text``."""
        indicators: List[ThreatIndicator] = []
        for pattern, tag, severity in self.ATTACK_PATTERNS:
            if re.search(pattern, text):
                indicators.append(ThreatIndicator(
                    indicator_type="pattern",
                    value=text[:500],  # truncate
                    confidence=0.75,
                    severity=severity,
                    sources=["pattern_match"],
                    tags=[tag, "attack_pattern"],
                ))
        return indicators


class ExternalThreatFeed:
    """Fetch threat intelligence from external APIs."""

    def __init__(self, api_keys: Optional[Dict[str, str]] = None):
        self.api_keys = api_keys or {}
        # Keyed "<feed>:<value>". A cached None records a completed lookup
        # that found nothing, so it is not repeated.
        self._cache: Dict[str, Optional[ThreatIndicator]] = {}

    def query_abuseipdb(self, ip: str) -> Optional[ThreatIndicator]:
        """Query AbuseIPDB for the reputation of a single IP address.

        The API key comes from ``api_keys["abuseipdb"]`` or the
        ``ABUSEIPDB_API_KEY`` environment variable.

        Returns None when:
        - ``requests`` is unavailable,
        - no key is configured,
        - ``ip`` is not a single address (e.g. a CIDR range),
        - the request fails or returns a non-200 status,
        - the response is malformed, or
        - the abuse score is 0.

        Successful (HTTP 200) lookups are cached, including empty results.
        """
        if not HAS_REQUESTS:
            return None

        api_key = self.api_keys.get("abuseipdb") or os.getenv("ABUSEIPDB_API_KEY")
        if not api_key:
            return None

        # The /check endpoint accepts single addresses only; don't send CIDRs
        # or malformed values to the API.
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            return None

        cache_key = f"abuseipdb:{ip}"
        if cache_key in self._cache:
            return self._cache[cache_key]

        try:
            resp = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                headers={"Key": api_key, "Accept": "application/json"},
                params={"ipAddress": ip, "maxAgeInDays": 90},
                timeout=5,
            )
            if resp.status_code != 200:
                # Not cached: rate-limit/transient errors may succeed later.
                return None
            payload = resp.json()
        except (requests.RequestException, ValueError):
            return None

        indicator = self._abuseipdb_indicator(ip, payload)
        self._cache[cache_key] = indicator
        return indicator

    @staticmethod
    def _abuseipdb_indicator(ip: str, payload: Any) -> Optional[ThreatIndicator]:
        """Build an indicator from an AbuseIPDB /check JSON payload, or return None."""
        data = payload.get("data") if isinstance(payload, dict) else None
        if not isinstance(data, dict):
            return None
        abuse_score = data.get("abuseConfidenceScore") or 0
        if not isinstance(abuse_score, (int, float)) or abuse_score <= 0:
            return None

        severity = "critical" if abuse_score > 80 else "high" if abuse_score > 50 else "medium"
        reports = data.get("totalReports")
        hit_count = reports if isinstance(reports, int) and reports > 0 else 1
        return ThreatIndicator(
            indicator_type="ip",
            value=ip,
            confidence=min(abuse_score / 100, 1.0),
            severity=severity,
            sources=["abuseipdb"],
            tags=["external_intel", "ip_reputation"],
            hit_count=hit_count,
            context={
                "abuse_score": abuse_score,
                "country": data.get("countryCode"),
                "isp": data.get("isp"),
                "domain": data.get("domain"),
                "usage_type": data.get("usageType"),
            },
        )

    def query_emerging_threats(self, ip: str) -> Optional[ThreatIndicator]:
        """Check IP against Emerging Threats blocklist (free, no API key).

        Placeholder: always returns None in this implementation.
        """
        if not HAS_REQUESTS:
            return None
        # This is a simplified check - real implementation would cache the blocklist
        # For demo purposes, we return None and rely on other sources
        return None

    def enrich_indicator(self, indicator: ThreatIndicator) -> ThreatIndicator:
        """Merge AbuseIPDB data into an "ip" indicator in place and return it."""
        if indicator.indicator_type == "ip":
            external = self.query_abuseipdb(indicator.value)
            if external:
                indicator.merge(external)
        return indicator


class ThreatIntelCollector:
    """
    Main collector that aggregates from all sources.

    Usage:
        collector = ThreatIntelCollector(workspace_path="/path/to/cloudflare")
        report = collector.collect()
        for indicator in report.top_indicators(10):
            print(f"{indicator.severity}: {indicator.indicator_type}={indicator.value}")
    """

    # IPv4 address with optional CIDR suffix. Candidates are validated with
    # ipaddress before use because the regex also matches e.g. 999.1.1.1.
    _TF_IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}(?:/\d{1,2})?\b")

    def __init__(
        self,
        workspace_path: Optional[str] = None,
        api_keys: Optional[Dict[str, str]] = None,
        enable_external: bool = True,
    ):
        self.workspace = Path(workspace_path) if workspace_path else Path.cwd()
        self.log_parser = CloudflareLogParser()
        self.external_feed = ExternalThreatFeed(api_keys) if enable_external else None
        self._indicators: Dict[str, ThreatIndicator] = {}

    def collect(
        self,
        log_dirs: Optional[List[str]] = None,
        enrich_external: bool = True,
        max_indicators: int = 1000,
    ) -> ThreatIntelReport:
        """
        Collect threat intelligence from all configured sources.

        Each call starts from an empty indicator set, so repeated calls do not
        double-count hits.

        Args:
            log_dirs: Directories (relative to the workspace) to scan for logs
                (default: observatory/, anomalies/, archive_runtime/receipts/)
            enrich_external: Whether to query external APIs for enrichment
            max_indicators: Maximum indicators to return

        Returns:
            ThreatIntelReport with deduplicated, scored indicators
        """
        self._indicators = {}
        sources_queried: List[str] = []

        # Default log directories
        if log_dirs is None:
            log_dirs = ["observatory", "anomalies", "archive_runtime/receipts"]

        # Collect from local logs
        for log_dir in log_dirs:
            dir_path = self.workspace / log_dir
            if dir_path.exists():
                sources_queried.append(f"local:{log_dir}")
                self._collect_from_directory(dir_path)

        # Collect from Terraform state (extract referenced IPs/patterns)
        tf_path = self.workspace / "terraform"
        if tf_path.exists():
            sources_queried.append("terraform_state")
            self._collect_from_terraform(tf_path)

        # Enrich with external intel if enabled
        if enrich_external and self.external_feed:
            sources_queried.append("external_apis")
            self._enrich_all_indicators()

        # Sort by priority and truncate
        all_indicators = sorted(self._indicators.values(), key=_priority_key, reverse=True)

        return ThreatIntelReport(
            indicators=all_indicators[:max_indicators],
            sources_queried=sources_queried,
            metadata={
                "workspace": str(self.workspace),
                "total_raw": len(self._indicators),
                "external_enabled": enrich_external and self.external_feed is not None,
            },
        )

    def _collect_from_directory(self, dir_path: Path) -> None:
        """Recursively scan ``dir_path`` for *.log/*.json/*.jsonl files (sorted order)."""
        log_patterns = ["*.log", "*.json", "*.jsonl"]
        for pattern in log_patterns:
            for log_file in sorted(dir_path.rglob(pattern)):
                for indicator in self.log_parser.parse_log_file(log_file):
                    self._add_indicator(indicator)

    def _collect_from_terraform(self, tf_path: Path) -> None:
        """Add valid IPs/CIDRs from top-level *.tf files that appear in block/deny context.

        An address counts as blocked when "block" or "deny" occurs in the
        100 characters preceding it. Unreadable files are skipped.
        """
        for tf_file in sorted(tf_path.glob("*.tf")):
            try:
                content = tf_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                continue

            for match in self._TF_IP_RE.finditer(content):
                ip = match.group()
                try:
                    ipaddress.ip_network(ip, strict=False)
                except ValueError:
                    continue  # e.g. out-of-range octets or a /33 prefix

                # Only flag if in a block context
                context_start = max(0, match.start() - 100)
                preceding = content[context_start:match.start()].lower()
                if "block" in preceding or "deny" in preceding:
                    self._add_indicator(ThreatIndicator(
                        indicator_type="ip",
                        value=ip,
                        confidence=0.9,
                        severity="medium",
                        sources=["terraform_blocklist"],
                        tags=["existing_rule", "blocked_ip"],
                        context={"file": str(tf_file.name)},
                    ))

    def _add_indicator(self, indicator: ThreatIndicator) -> None:
        """Add indicator, merging into any existing one with the same fingerprint."""
        key = indicator.fingerprint
        if key in self._indicators:
            self._indicators[key].merge(indicator)
        else:
            self._indicators[key] = indicator

    def _enrich_all_indicators(self) -> None:
        """Enrich all IP indicators in place with external intelligence."""
        if not self.external_feed:
            return
        # Enrichment mutates indicators but not their fingerprints, so the
        # dict can be iterated directly.
        for indicator in self._indicators.values():
            if indicator.indicator_type == "ip":
                self.external_feed.enrich_indicator(indicator)


# CLI interface for testing
if __name__ == "__main__":
    import sys

    workspace = sys.argv[1] if len(sys.argv) > 1 else "."
    collector = ThreatIntelCollector(
        workspace_path=workspace,
        enable_external=False,  # Don't hit APIs in CLI test
    )
    report = collector.collect()

    print("\n🔍 Threat Intelligence Report")
    print("=" * 50)
    print(f"Sources: {', '.join(report.sources_queried)}")
    print(f"Total indicators: {len(report.indicators)}")
    print(f"Critical: {report.critical_count} | High: {report.high_count}")
    print("\nTop 10 Indicators:")
    print("-" * 50)
    for ind in report.top_indicators(10):
        print(f"  [{ind.severity.upper():8}] {ind.indicator_type}={ind.value[:50]}")
        print(f"             confidence={ind.confidence:.2f} hits={ind.hit_count} sources={ind.sources}")