# Part of a Cloudflare infrastructure-automation suite:
# - Complete Cloudflare Terraform configuration (DNS, WAF, tunnels, access)
# - WAF Intelligence MCP server with threat analysis and ML classification
# - GitOps automation with PR workflows and drift detection
# - Observatory monitoring stack with Prometheus/Grafana
# - IDE operator rules for governed development
# - Security playbooks and compliance frameworks
# - Autonomous remediation and state reconciliation
"""
|
|
Phase 7: Multi-Source Threat Intelligence Collector
|
|
|
|
Aggregates threat data from:
|
|
- Cloudflare Analytics API (WAF events, firewall logs)
|
|
- External threat feeds (AbuseIPDB, Emerging Threats, etc.)
|
|
- Local honeypot signals (if configured)
|
|
- Historical attack patterns from receipts/logs
|
|
|
|
Produces scored ThreatIndicators for ML classification and rule generation.
|
|
"""
|
|
from __future__ import annotations

import hashlib
import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from urllib.parse import urlparse

# Optional: requests for external API calls
try:
    import requests

    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False


@dataclass
class ThreatIndicator:
    """Single threat indicator with scoring metadata."""

    indicator_type: str  # "ip", "ua", "path", "pattern", "country"
    value: str
    confidence: float  # 0.0-1.0
    severity: str  # "low", "medium", "high", "critical"
    sources: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    hit_count: int = 1
    context: Dict[str, Any] = field(default_factory=dict)

    @property
    def fingerprint(self) -> str:
        """Stable 16-hex-char id over (type, value), used for deduplication."""
        digest = hashlib.sha256(f"{self.indicator_type}:{self.value}".encode())
        return digest.hexdigest()[:16]

    def merge(self, other: "ThreatIndicator") -> None:
        """Fold *other* into this indicator (deduplication helper).

        Sums hit counts, keeps the higher confidence, unions sources and
        tags, and widens the first_seen/last_seen observation window.
        """
        self.hit_count += other.hit_count
        if other.confidence > self.confidence:
            self.confidence = other.confidence
        self.sources = list({*self.sources, *other.sources})
        self.tags = list({*self.tags, *other.tags})

        first_candidates = [t for t in (self.first_seen, other.first_seen) if t]
        if first_candidates:
            self.first_seen = min(first_candidates)
        last_candidates = [t for t in (self.last_seen, other.last_seen) if t]
        if last_candidates:
            self.last_seen = max(last_candidates)


@dataclass
class ThreatIntelReport:
    """Aggregated threat intelligence from all sources."""

    # Priority rank shared by sorting logic (plain class attribute, not a field).
    _SEVERITY_RANK = {"critical": 4, "high": 3, "medium": 2, "low": 1}

    indicators: List[ThreatIndicator] = field(default_factory=list)
    sources_queried: List[str] = field(default_factory=list)
    # Timezone-aware UTC timestamp; datetime.utcnow() is naive and deprecated
    # since Python 3.12.
    collection_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def critical_count(self) -> int:
        """Number of indicators with severity == 'critical'."""
        return sum(1 for i in self.indicators if i.severity == "critical")

    @property
    def high_count(self) -> int:
        """Number of indicators with severity == 'high'."""
        return sum(1 for i in self.indicators if i.severity == "high")

    def top_indicators(self, limit: int = 10) -> List[ThreatIndicator]:
        """Return the *limit* highest-priority indicators.

        Priority order is (severity rank, confidence, hit_count), descending;
        unknown severities rank below 'low'.
        """
        return sorted(
            self.indicators,
            key=lambda x: (self._SEVERITY_RANK.get(x.severity, 0), x.confidence, x.hit_count),
            reverse=True,
        )[:limit]


class CloudflareLogParser:
    """Parse Cloudflare WAF/firewall logs for threat indicators."""

    # Common attack patterns in URIs: (regex source, tag, severity).
    ATTACK_PATTERNS = [
        (r"(?i)(?:union\s+select|select\s+.*\s+from)", "sqli", "high"),
        (r"(?i)<script[^>]*>", "xss", "high"),
        (r"(?i)(?:\.\./|\.\.\\)", "path_traversal", "medium"),
        (r"(?i)(?:cmd=|exec=|system\()", "rce", "critical"),
        (r"(?i)(?:wp-admin|wp-login|xmlrpc\.php)", "wordpress_probe", "low"),
        (r"(?i)(?:\.env|\.git|\.htaccess)", "sensitive_file", "medium"),
        (r"(?i)(?:phpmyadmin|adminer|mysql)", "db_probe", "medium"),
        (r"(?i)(?:eval\(|base64_decode)", "code_injection", "high"),
    ]

    # Compiled once at class creation; avoids per-call pattern-cache lookups
    # in the hot per-line scanning path.
    _COMPILED_PATTERNS = [
        (re.compile(pat), tag, sev) for pat, tag, sev in ATTACK_PATTERNS
    ]

    # Known bad user agents: (lowercase substring, tag, severity).
    BAD_USER_AGENTS = [
        ("sqlmap", "sqli_tool", "high"),
        ("nikto", "scanner", "medium"),
        ("nmap", "scanner", "medium"),
        ("masscan", "scanner", "medium"),
        ("zgrab", "scanner", "low"),
        ("python-requests", "bot", "low"),  # contextual
        ("curl", "bot", "low"),  # contextual
    ]

    def parse_log_file(self, path: Path) -> List[ThreatIndicator]:
        """Parse a log file and extract threat indicators.

        Missing or unreadable files yield an empty/partial list (best-effort
        collection; only I/O errors are swallowed).
        """
        indicators: List[ThreatIndicator] = []

        if not path.exists():
            return indicators

        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                for line in f:
                    indicators.extend(self._parse_log_line(line))
        except OSError:
            # Best-effort: skip files we cannot read; keep what we parsed.
            pass

        return indicators

    def _parse_log_line(self, line: str) -> List[ThreatIndicator]:
        """Extract indicators from a single log line (JSON object or raw text)."""
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            data = None

        # Only JSON *objects* go through structured parsing. Previously a
        # JSON scalar line (e.g. "123") raised AttributeError on .get(),
        # silently aborting the rest of the file.
        if isinstance(data, dict):
            return self._parse_json_log(data)

        # Fall back to pattern matching on the raw line.
        return self._scan_for_patterns(line)

    def _parse_json_log(self, data: Dict[str, Any]) -> List[ThreatIndicator]:
        """Parse a structured JSON log entry."""
        indicators: List[ThreatIndicator] = []

        # Blocked/challenged client IPs are strong indicators.
        action = data.get("action", "").lower()
        if action in ("block", "challenge", "managed_challenge"):
            ip = data.get("clientIP") or data.get("client_ip") or data.get("ip")
            if ip:
                indicators.append(ThreatIndicator(
                    indicator_type="ip",
                    value=ip,
                    confidence=0.8 if action == "block" else 0.6,
                    severity="high" if action == "block" else "medium",
                    sources=["cloudflare_log"],
                    tags=[action, data.get("ruleId", "unknown_rule")],
                    context={"rule": data.get("ruleName", ""), "action": action}
                ))

        # Scan the request URI for attack patterns.
        uri = data.get("clientRequestURI") or data.get("uri") or data.get("path", "")
        if uri:
            indicators.extend(self._scan_for_patterns(uri))

        # Match the user agent against known bad tools.
        # BUG FIX: previously read clientRequestHTTPHost (the Host header)
        # instead of the user agent field.
        ua = data.get("clientRequestUserAgent") or data.get("user_agent", "")
        if ua:
            ua_lower = ua.lower()
            for needle, tag, severity in self.BAD_USER_AGENTS:
                if needle in ua_lower:
                    indicators.append(ThreatIndicator(
                        indicator_type="ua",
                        value=ua[:200],  # truncate
                        confidence=0.7,
                        severity=severity,
                        sources=["cloudflare_log"],
                        tags=[tag, "bad_ua"]
                    ))
                    break

        return indicators

    def _scan_for_patterns(self, text: str) -> List[ThreatIndicator]:
        """Scan text for known attack patterns."""
        return [
            ThreatIndicator(
                indicator_type="pattern",
                value=text[:500],  # truncate
                confidence=0.75,
                severity=severity,
                sources=["pattern_match"],
                tags=[tag, "attack_pattern"]
            )
            for regex, tag, severity in self._COMPILED_PATTERNS
            if regex.search(text)
        ]


class ExternalThreatFeed:
    """Fetch threat intelligence from external APIs."""

    def __init__(self, api_keys: Optional[Dict[str, str]] = None):
        self.api_keys = api_keys or {}
        # Positive lookups are memoized per "source:ip" key for this instance.
        self._cache: Dict[str, ThreatIndicator] = {}

    def query_abuseipdb(self, ip: str) -> Optional[ThreatIndicator]:
        """Query AbuseIPDB for IP reputation.

        Returns None when requests is unavailable, no API key is configured,
        the lookup fails, or the IP has a zero abuse score.
        """
        if not HAS_REQUESTS:
            return None

        api_key = self.api_keys.get("abuseipdb") or os.getenv("ABUSEIPDB_API_KEY")
        if not api_key:
            return None

        cache_key = f"abuseipdb:{ip}"
        cached = self._cache.get(cache_key)
        if cached is not None:
            return cached

        try:
            resp = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                headers={"Key": api_key, "Accept": "application/json"},
                params={"ipAddress": ip, "maxAgeInDays": 90},
                timeout=5
            )
            if resp.status_code != 200:
                return None
            data = resp.json().get("data", {})
        except Exception:
            # Best-effort: network/parse errors degrade to "no intel".
            return None

        score = data.get("abuseConfidenceScore", 0)
        if score <= 0:
            return None

        if score > 80:
            severity = "critical"
        elif score > 50:
            severity = "high"
        else:
            severity = "medium"

        indicator = ThreatIndicator(
            indicator_type="ip",
            value=ip,
            confidence=score / 100,
            severity=severity,
            sources=["abuseipdb"],
            tags=["external_intel", "ip_reputation"],
            hit_count=data.get("totalReports", 1),
            context={
                "abuse_score": score,
                "country": data.get("countryCode"),
                "isp": data.get("isp"),
                "domain": data.get("domain"),
                "usage_type": data.get("usageType"),
            }
        )
        self._cache[cache_key] = indicator
        return indicator

    def query_emerging_threats(self, ip: str) -> Optional[ThreatIndicator]:
        """Check IP against Emerging Threats blocklist (free, no API key).

        Placeholder: a real implementation would download and cache the
        blocklist; this currently always returns None.
        """
        if not HAS_REQUESTS:
            return None
        return None

    def enrich_indicator(self, indicator: ThreatIndicator) -> ThreatIndicator:
        """Merge external reputation data into *indicator* (IPs only)."""
        if indicator.indicator_type != "ip":
            return indicator
        external = self.query_abuseipdb(indicator.value)
        if external is not None:
            indicator.merge(external)
        return indicator


class ThreatIntelCollector:
    """
    Main collector that aggregates from all sources.

    Usage:
        collector = ThreatIntelCollector(workspace_path="/path/to/cloudflare")
        report = collector.collect()
        for indicator in report.top_indicators(10):
            print(f"{indicator.severity}: {indicator.indicator_type}={indicator.value}")
    """

    # Directories scanned for logs when collect() gets no explicit list.
    _DEFAULT_LOG_DIRS = ["observatory", "anomalies", "archive_runtime/receipts"]

    # Priority rank used to order the final report.
    _SEVERITY_RANK = {"critical": 4, "high": 3, "medium": 2, "low": 1}

    def __init__(
        self,
        workspace_path: Optional[str] = None,
        api_keys: Optional[Dict[str, str]] = None,
        enable_external: bool = True
    ):
        self.workspace = Path(workspace_path) if workspace_path else Path.cwd()
        self.log_parser = CloudflareLogParser()
        self.external_feed = ExternalThreatFeed(api_keys) if enable_external else None
        # NOTE(review): indicators accumulate across collect() calls — looks
        # intentional (incremental collection), but verify before reusing one
        # instance for independent reports.
        self._indicators: Dict[str, ThreatIndicator] = {}

    def collect(
        self,
        log_dirs: Optional[List[str]] = None,
        enrich_external: bool = True,
        max_indicators: int = 1000
    ) -> ThreatIntelReport:
        """
        Collect threat intelligence from all configured sources.

        Args:
            log_dirs: Directories to scan for logs (default: observatory/, anomalies/)
            enrich_external: Whether to query external APIs for enrichment
            max_indicators: Maximum indicators to return

        Returns:
            ThreatIntelReport with deduplicated, scored indicators
        """
        queried: List[str] = []

        # Local log directories.
        for log_dir in (log_dirs if log_dirs is not None else self._DEFAULT_LOG_DIRS):
            dir_path = self.workspace / log_dir
            if dir_path.exists():
                queried.append(f"local:{log_dir}")
                self._collect_from_directory(dir_path)

        # Terraform files (IPs already referenced by block/deny rules).
        tf_path = self.workspace / "terraform"
        if tf_path.exists():
            queried.append("terraform_state")
            self._collect_from_terraform(tf_path)

        # Optional enrichment via external reputation APIs.
        use_external = enrich_external and self.external_feed is not None
        if use_external:
            queried.append("external_apis")
            self._enrich_all_indicators()

        # Rank by (severity, confidence, hit_count) descending and truncate.
        ranked = sorted(
            self._indicators.values(),
            key=lambda ind: (
                self._SEVERITY_RANK.get(ind.severity, 0),
                ind.confidence,
                ind.hit_count,
            ),
            reverse=True,
        )

        return ThreatIntelReport(
            indicators=ranked[:max_indicators],
            sources_queried=queried,
            metadata={
                "workspace": str(self.workspace),
                "total_raw": len(self._indicators),
                "external_enabled": use_external
            }
        )

    def _collect_from_directory(self, dir_path: Path) -> None:
        """Recursively scan *dir_path* for log files and ingest indicators."""
        for glob_pat in ("*.log", "*.json", "*.jsonl"):
            for log_file in dir_path.rglob(glob_pat):
                for found in self.log_parser.parse_log_file(log_file):
                    self._add_indicator(found)

    def _collect_from_terraform(self, tf_path: Path) -> None:
        """Extract already-blocked IPs referenced in Terraform files."""
        ip_re = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}(?:/\d{1,2})?\b')
        for tf_file in tf_path.glob("*.tf"):
            try:
                content = tf_file.read_text(encoding="utf-8")
                for match in ip_re.finditer(content):
                    # Only flag IPs that appear near block/deny wording.
                    window = content[max(0, match.start() - 100):match.start()].lower()
                    if "block" not in window and "deny" not in window:
                        continue
                    self._add_indicator(ThreatIndicator(
                        indicator_type="ip",
                        value=match.group(),
                        confidence=0.9,
                        severity="medium",
                        sources=["terraform_blocklist"],
                        tags=["existing_rule", "blocked_ip"],
                        context={"file": str(tf_file.name)}
                    ))
            except Exception:
                # Best-effort: unreadable files are skipped.
                pass

    def _add_indicator(self, indicator: ThreatIndicator) -> None:
        """Insert *indicator*, merging with an existing duplicate if present."""
        existing = self._indicators.get(indicator.fingerprint)
        if existing is None:
            self._indicators[indicator.fingerprint] = indicator
        else:
            existing.merge(indicator)

    def _enrich_all_indicators(self) -> None:
        """Run external enrichment over every IP indicator collected so far."""
        if not self.external_feed:
            return
        for ind in list(self._indicators.values()):
            if ind.indicator_type == "ip":
                self.external_feed.enrich_indicator(ind)


# CLI interface for testing
if __name__ == "__main__":
    import sys

    # Optional first argument selects the workspace root (default: cwd).
    target = sys.argv[1] if len(sys.argv) > 1 else "."

    cli_collector = ThreatIntelCollector(
        workspace_path=target,
        enable_external=False  # Don't hit APIs in CLI test
    )
    cli_report = cli_collector.collect()

    print("\n🔍 Threat Intelligence Report")
    print("=" * 50)
    print(f"Sources: {', '.join(cli_report.sources_queried)}")
    print(f"Total indicators: {len(cli_report.indicators)}")
    print(f"Critical: {cli_report.critical_count} | High: {cli_report.high_count}")
    print("\nTop 10 Indicators:")
    print("-" * 50)

    for ind in cli_report.top_indicators(10):
        print(f"  [{ind.severity.upper():8}] {ind.indicator_type}={ind.value[:50]}")
        print(f"    confidence={ind.confidence:.2f} hits={ind.hit_count} sources={ind.sources}")