#!/usr/bin/env python3 """ ████████╗███████╗███╗ ███╗ ╚══██╔══╝██╔════╝████╗ ████║ ██║ █████╗ ██╔████╔██║ ██║ ██╔══╝ ██║╚██╔╝██║ ██║ ███████╗██║ ╚═╝ ██║ ╚═╝ ╚══════╝╚═╝ ╚═╝ TEM — THREAT TRANSMUTATION ENGINE "Every attack becomes a lesson. Every danger becomes capability." Tem is the alchemical organ of VaultMesh that transmutes threats into defensive knowledge. When Shield detects a vulnerability, Tem doesn't just log it — Tem *learns* from it, generates countermeasures, and strengthens the system's immune response. The Four Transmutation Phases: 1. NIGREDO — Threat ingestion (the raw material) 2. ALBEDO — Analysis & pattern extraction (purification) 3. CITRINITAS — Rule generation (solar consciousness) 4. RUBEDO — Integration & hardening (the Stone complete) Usage: # As standalone daemon python3 tem.py daemon --watch ~/vaultmesh/shield/ # Process single finding python3 tem.py transmute --finding CVE-2024-1234 # Generate defense rules python3 tem.py generate --format sigma # View transmutation history python3 tem.py history # Integrate with offsec_mcp_live.py from tem import TemEngine tem = TemEngine(vaultmesh_root) await tem.transmute(finding) The system learns. The system remembers. The system adapts. """ import asyncio import hashlib import json import os import re import sqlite3 import sys import time import uuid from dataclasses import dataclass, field, asdict from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Optional, List, Dict, Any, Callable from watchfiles import awatch try: import httpx HTTPX_AVAILABLE = True except ImportError: HTTPX_AVAILABLE = False # ═══════════════════════════════════════════════════════════════════════════════ # ALCHEMICAL PHASES # ═══════════════════════════════════════════════════════════════════════════════ class Phase(Enum): """The four phases of alchemical transmutation.""" NIGREDO = "nigredo" # Blackening — raw threat ingestion ALBEDO = "albedo" # Whitening — analysis & extraction CITRINITAS = "citrinitas" # Yellowing — rule generation RUBEDO = "rubedo" # Reddening — integration complete class Severity(Enum): """Threat severity levels.""" CRITICAL = "critical" HIGH = "high" MEDIUM = "medium" LOW = "low" INFO = "info" # ═══════════════════════════════════════════════════════════════════════════════ # DATA STRUCTURES # ═══════════════════════════════════════════════════════════════════════════════ @dataclass class Threat: """A raw threat from Shield (Nigredo phase).""" id: str source: str # CVE, nuclei template, custom severity: Severity description: str target: str # Affected system/component detected_at: str raw_data: Dict[str, Any] = field(default_factory=dict) @classmethod def from_shield_finding(cls, finding: Dict) -> "Threat": return cls( id=finding.get("id", f"threat_{uuid.uuid4().hex[:8]}"), source=finding.get("source", "shield"), severity=Severity(finding.get("severity", "medium")), description=finding.get("description", "Unknown threat"), target=finding.get("target", "unknown"), detected_at=finding.get("detected_at", datetime.now(timezone.utc).isoformat()), raw_data=finding ) @dataclass class Pattern: """An extracted pattern from threat analysis (Albedo phase).""" id: str threat_id: str pattern_type: str # cve, misconfiguration, exposure, behavior indicators: List[str] # IOCs, signatures, patterns attack_vector: str # network, local, physical exploit_complexity: str # low, high privileges_required: str # none, low, high user_interaction: str # none, required scope: str # unchanged, changed impact: Dict[str, str] # confidentiality, integrity, availability extracted_at: str def cvss_estimate(self) -> float: """Estimate CVSS-like score from pattern attributes.""" base = 5.0 # Attack vector if self.attack_vector == "network": base += 1.5 elif self.attack_vector == "local": base += 0.5 # Complexity if self.exploit_complexity == "low": base += 1.0 # Privileges if self.privileges_required == "none": base += 1.0 # User interaction if self.user_interaction == "none": base += 0.5 # Impact for impact_type, level in self.impact.items(): if level == "high": base += 0.5 elif level == "low": base += 0.2 return min(10.0, base) @dataclass class Rule: """A generated defense rule (Citrinitas phase).""" id: str pattern_id: str rule_type: str # sigma, yara, snort, iptables, custom rule_content: str # The actual rule description: str confidence: float # 0.0 - 1.0 false_positive_rate: str # low, medium, high generated_at: str validated: bool = False deployed: bool = False @dataclass class Transmutation: """A complete transmutation record (Rubedo phase).""" id: str threat: Threat patterns: List[Pattern] rules: List[Rule] phase: Phase started_at: str completed_at: Optional[str] = None proof_id: Optional[str] = None # Link to VaultMesh proof notes: str = "" # ═══════════════════════════════════════════════════════════════════════════════ # PATTERN EXTRACTORS # ═══════════════════════════════════════════════════════════════════════════════ class PatternExtractor: """Base class for pattern extraction strategies.""" def extract(self, threat: Threat) -> List[Pattern]: raise NotImplementedError class CVEExtractor(PatternExtractor): """Extract patterns from CVE-based threats.""" # Known CVE patterns and their typical characteristics CVE_PATTERNS = { r"buffer.?overflow": { "pattern_type": "memory_corruption", "attack_vector": "network", "exploit_complexity": "high", "impact": {"confidentiality": "high", "integrity": "high", "availability": "high"} }, r"sql.?injection": { "pattern_type": "injection", "attack_vector": "network", "exploit_complexity": "low", "impact": {"confidentiality": "high", "integrity": "high", "availability": "low"} }, r"xss|cross.?site.?script": { "pattern_type": "injection", "attack_vector": "network", "exploit_complexity": "low", "impact": {"confidentiality": "low", "integrity": "low", "availability": "none"} }, r"remote.?code.?execution|rce": { "pattern_type": "code_execution", "attack_vector": "network", "exploit_complexity": "low", "impact": {"confidentiality": "high", "integrity": "high", "availability": "high"} }, r"privilege.?escalation": { "pattern_type": "privilege_escalation", "attack_vector": "local", "exploit_complexity": "low", "impact": {"confidentiality": "high", "integrity": "high", "availability": "high"} }, r"denial.?of.?service|dos": { "pattern_type": "availability", "attack_vector": "network", "exploit_complexity": "low", "impact": {"confidentiality": "none", "integrity": "none", "availability": "high"} }, r"authentication.?bypass": { "pattern_type": "authentication", "attack_vector": "network", "exploit_complexity": "low", "impact": {"confidentiality": "high", "integrity": "high", "availability": "low"} }, r"information.?disclosure|info.?leak": { "pattern_type": "information_disclosure", "attack_vector": "network", "exploit_complexity": "low", "impact": {"confidentiality": "high", "integrity": "none", "availability": "none"} }, } def extract(self, threat: Threat) -> List[Pattern]: patterns = [] desc_lower = threat.description.lower() for regex, attrs in self.CVE_PATTERNS.items(): if re.search(regex, desc_lower): pattern = Pattern( id=f"pat_{uuid.uuid4().hex[:8]}", threat_id=threat.id, pattern_type=attrs["pattern_type"], indicators=[threat.id, regex], attack_vector=attrs["attack_vector"], exploit_complexity=attrs["exploit_complexity"], privileges_required="none", user_interaction="none", scope="unchanged", impact=attrs["impact"], extracted_at=datetime.now(timezone.utc).isoformat() ) patterns.append(pattern) # Default pattern if no specific match if not patterns: patterns.append(Pattern( id=f"pat_{uuid.uuid4().hex[:8]}", threat_id=threat.id, pattern_type="unknown", indicators=[threat.id], attack_vector="network", exploit_complexity="high", privileges_required="low", user_interaction="none", scope="unchanged", impact={"confidentiality": "low", "integrity": "low", "availability": "low"}, extracted_at=datetime.now(timezone.utc).isoformat() )) return patterns class MisconfigExtractor(PatternExtractor): """Extract patterns from misconfiguration findings.""" MISCONFIG_PATTERNS = { r"exposed|public|open": "exposure", r"default.?password|weak.?password": "credential", r"missing.?header|no.?csp|no.?hsts": "header_missing", r"outdated|deprecated|eol": "version", r"debug|verbose|trace": "information_disclosure", r"permission|chmod|world.?readable": "permission", } def extract(self, threat: Threat) -> List[Pattern]: patterns = [] desc_lower = threat.description.lower() for regex, pattern_type in self.MISCONFIG_PATTERNS.items(): if re.search(regex, desc_lower): patterns.append(Pattern( id=f"pat_{uuid.uuid4().hex[:8]}", threat_id=threat.id, pattern_type=f"misconfiguration_{pattern_type}", indicators=[regex, threat.target], attack_vector="network", exploit_complexity="low", privileges_required="none", user_interaction="none", scope="unchanged", impact={"confidentiality": "medium", "integrity": "low", "availability": "low"}, extracted_at=datetime.now(timezone.utc).isoformat() )) return patterns if patterns else [self._default_pattern(threat)] def _default_pattern(self, threat: Threat) -> Pattern: return Pattern( id=f"pat_{uuid.uuid4().hex[:8]}", threat_id=threat.id, pattern_type="misconfiguration_generic", indicators=[threat.target], attack_vector="network", exploit_complexity="low", privileges_required="none", user_interaction="none", scope="unchanged", impact={"confidentiality": "low", "integrity": "low", "availability": "low"}, extracted_at=datetime.now(timezone.utc).isoformat() ) # ═══════════════════════════════════════════════════════════════════════════════ # RULE GENERATORS # ═══════════════════════════════════════════════════════════════════════════════ class RuleGenerator: """Base class for defense rule generation.""" def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]: raise NotImplementedError class SigmaGenerator(RuleGenerator): """Generate Sigma detection rules.""" def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]: rules = [] # Generate Sigma rule based on pattern type sigma_template = self._get_template(pattern) if sigma_template: rule_content = sigma_template.format( title=f"TEM: {threat.id} Detection", description=threat.description[:100], threat_id=threat.id, pattern_id=pattern.id, target=threat.target, severity=threat.severity.value, date=datetime.now().strftime("%Y/%m/%d"), ) rules.append(Rule( id=f"rule_{uuid.uuid4().hex[:8]}", pattern_id=pattern.id, rule_type="sigma", rule_content=rule_content, description=f"Sigma rule for {threat.id}", confidence=0.7, false_positive_rate="medium", generated_at=datetime.now(timezone.utc).isoformat() )) return rules def _get_template(self, pattern: Pattern) -> Optional[str]: templates = { "injection": """title: {title} id: {pattern_id} status: experimental description: {description} references: - https://tem.vaultmesh.local/{threat_id} author: TEM Engine date: {date} tags: - attack.initial_access - tem.generated logsource: category: webserver detection: selection: cs-uri-query|contains: - "'" - '"' - '--' - ';' - 'UNION' - 'SELECT' condition: selection fields: - cs-uri-query - c-ip - cs-host falsepositives: - Legitimate queries with special characters level: {severity} """, "code_execution": """title: {title} id: {pattern_id} status: experimental description: {description} references: - https://tem.vaultmesh.local/{threat_id} author: TEM Engine date: {date} tags: - attack.execution - tem.generated logsource: category: process_creation product: linux detection: selection: CommandLine|contains: - 'curl|bash' - 'wget|sh' - 'python -c' - 'perl -e' - 'ruby -e' condition: selection falsepositives: - Legitimate administrative scripts level: {severity} """, "authentication": """title: {title} id: {pattern_id} status: experimental description: {description} references: - https://tem.vaultmesh.local/{threat_id} author: TEM Engine date: {date} tags: - attack.credential_access - tem.generated logsource: category: authentication detection: selection: EventType: authentication_failure timeframe: 5m condition: selection | count() > 10 falsepositives: - Password reset procedures level: {severity} """, } for key in templates: if key in pattern.pattern_type: return templates[key] # Generic fallback template return """title: {title} id: {pattern_id} status: experimental description: {description} references: - https://tem.vaultmesh.local/{threat_id} author: TEM Engine date: {date} tags: - tem.generated logsource: category: application detection: keywords: - '{target}' condition: keywords falsepositives: - Unknown level: {severity} """ class FirewallGenerator(RuleGenerator): """Generate firewall/iptables rules.""" def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]: rules = [] # Only generate for network-based threats if pattern.attack_vector != "network": return rules # Extract IP if present in target ip_match = re.search(r'\d+\.\d+\.\d+\.\d+', threat.target) if ip_match or "exposure" in pattern.pattern_type: rule_content = f"""# TEM Generated: {threat.id} # Pattern: {pattern.id} # Generated: {datetime.now(timezone.utc).isoformat()} # Rate limit potential attack source iptables -A INPUT -p tcp --dport 80 -m state --state NEW -m recent --set --name HTTP iptables -A INPUT -p tcp --dport 80 -m state --state NEW -m recent --update --seconds 60 --hitcount 20 --name HTTP -j DROP # Log suspicious activity iptables -A INPUT -p tcp --dport 80 -j LOG --log-prefix "TEM_{threat.id}: " """ rules.append(Rule( id=f"rule_{uuid.uuid4().hex[:8]}", pattern_id=pattern.id, rule_type="iptables", rule_content=rule_content, description=f"Firewall rule for {threat.id}", confidence=0.6, false_positive_rate="low", generated_at=datetime.now(timezone.utc).isoformat() )) return rules class RemediationGenerator(RuleGenerator): """Generate remediation recommendations.""" REMEDIATION_TEMPLATES = { "buffer_overflow": "Update affected software to patched version. If unavailable, consider WAF rules or network segmentation.", "injection": "Implement parameterized queries. Enable input validation. Deploy WAF with injection detection.", "code_execution": "Patch immediately. Restrict network access. Enable process monitoring. Consider application sandboxing.", "privilege_escalation": "Apply security patches. Review user permissions. Enable audit logging.", "authentication": "Enforce MFA. Implement account lockout. Review authentication logs.", "information_disclosure": "Remove debug endpoints. Configure proper error handling. Review exposed data.", "misconfiguration": "Review and harden configuration. Apply CIS benchmarks. Enable configuration drift detection.", "exposure": "Restrict network access. Implement authentication. Enable TLS.", "credential": "Rotate credentials. Enforce password policy. Remove default accounts.", "version": "Update to supported version. Enable auto-updates where safe. Track EOL dates.", } def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]: rules = [] # Find matching remediation for key, template in self.REMEDIATION_TEMPLATES.items(): if key in pattern.pattern_type.lower(): rule_content = f"""# TEM Remediation: {threat.id} # Pattern: {pattern.pattern_type} # Target: {threat.target} # Severity: {threat.severity.value} # Generated: {datetime.now(timezone.utc).isoformat()} ## Recommended Actions {template} ## Verification Steps 1. Apply recommended fix 2. Rescan target: {threat.target} 3. Verify finding no longer present 4. Generate proof: `proof generate remediation.{threat.id}` ## References - Threat ID: {threat.id} - Pattern ID: {pattern.id} - CVSS Estimate: {pattern.cvss_estimate():.1f} """ rules.append(Rule( id=f"rule_{uuid.uuid4().hex[:8]}", pattern_id=pattern.id, rule_type="remediation", rule_content=rule_content, description=f"Remediation guide for {threat.id}", confidence=0.8, false_positive_rate="low", generated_at=datetime.now(timezone.utc).isoformat() )) break return rules # ═══════════════════════════════════════════════════════════════════════════════ # TEM ENGINE # ═══════════════════════════════════════════════════════════════════════════════ class TemEngine: """ The Threat Transmutation Engine. Transforms threats into defensive capabilities through the four alchemical phases: Nigredo → Albedo → Citrinitas → Rubedo. """ def __init__(self, vaultmesh_root: Path): self.root = Path(vaultmesh_root) self.tem_dir = self.root / "tem" self.rules_dir = self.tem_dir / "rules" self.db_path = self.tem_dir / "tem.db" # Ensure directories exist self.tem_dir.mkdir(parents=True, exist_ok=True) self.rules_dir.mkdir(exist_ok=True) # Initialize database self._init_db() # Pattern extractors self.extractors: List[PatternExtractor] = [ CVEExtractor(), MisconfigExtractor(), ] # Rule generators self.generators: List[RuleGenerator] = [ SigmaGenerator(), FirewallGenerator(), RemediationGenerator(), ] # Event callbacks self.on_transmute: Optional[Callable[[Transmutation], None]] = None self.on_phase_change: Optional[Callable[[str, Phase], None]] = None def _init_db(self): """Initialize SQLite database.""" conn = sqlite3.connect(str(self.db_path)) cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS transmutations ( id TEXT PRIMARY KEY, threat_id TEXT NOT NULL, phase TEXT NOT NULL, started_at TEXT NOT NULL, completed_at TEXT, proof_id TEXT, data TEXT NOT NULL ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS patterns ( id TEXT PRIMARY KEY, threat_id TEXT NOT NULL, pattern_type TEXT NOT NULL, data TEXT NOT NULL, extracted_at TEXT NOT NULL ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS rules ( id TEXT PRIMARY KEY, pattern_id TEXT NOT NULL, rule_type TEXT NOT NULL, rule_content TEXT NOT NULL, confidence REAL, validated INTEGER DEFAULT 0, deployed INTEGER DEFAULT 0, generated_at TEXT NOT NULL ) """) cur.execute(""" CREATE INDEX IF NOT EXISTS idx_transmutations_threat ON transmutations(threat_id) """) cur.execute(""" CREATE INDEX IF NOT EXISTS idx_patterns_threat ON patterns(threat_id) """) conn.commit() conn.close() def _get_db(self): conn = sqlite3.connect(str(self.db_path)) conn.row_factory = sqlite3.Row return conn # ═══════════════════════════════════════════════════════════════════════════ # THE FOUR PHASES # ═══════════════════════════════════════════════════════════════════════════ def _nigredo(self, finding: Dict) -> Threat: """ NIGREDO — Blackening Ingest raw threat material from Shield findings. The prima materia enters the athanor. """ threat = Threat.from_shield_finding(finding) if self.on_phase_change: self.on_phase_change(threat.id, Phase.NIGREDO) return threat def _albedo(self, threat: Threat) -> List[Pattern]: """ ALBEDO — Whitening Purify and analyze the threat, extracting patterns. Separation of the essential from the accidental. """ patterns = [] for extractor in self.extractors: extracted = extractor.extract(threat) patterns.extend(extracted) # Store patterns conn = self._get_db() cur = conn.cursor() for pattern in patterns: cur.execute( """INSERT OR REPLACE INTO patterns (id, threat_id, pattern_type, data, extracted_at) VALUES (?, ?, ?, ?, ?)""", (pattern.id, pattern.threat_id, pattern.pattern_type, json.dumps(asdict(pattern)), pattern.extracted_at) ) conn.commit() conn.close() if self.on_phase_change: self.on_phase_change(threat.id, Phase.ALBEDO) return patterns def _citrinitas(self, patterns: List[Pattern], threat: Threat) -> List[Rule]: """ CITRINITAS — Yellowing Solar consciousness dawns. Generate defense rules from the purified patterns. """ rules = [] for pattern in patterns: for generator in self.generators: generated = generator.generate(pattern, threat) rules.extend(generated) # Store rules conn = self._get_db() cur = conn.cursor() for rule in rules: cur.execute( """INSERT OR REPLACE INTO rules (id, pattern_id, rule_type, rule_content, confidence, validated, deployed, generated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (rule.id, rule.pattern_id, rule.rule_type, rule.rule_content, rule.confidence, int(rule.validated), int(rule.deployed), rule.generated_at) ) # Write rule file rule_file = self.rules_dir / f"{rule.id}.{rule.rule_type}" rule_file.write_text(rule.rule_content) conn.commit() conn.close() if self.on_phase_change: self.on_phase_change(threat.id, Phase.CITRINITAS) return rules def _rubedo(self, threat: Threat, patterns: List[Pattern], rules: List[Rule], started_at: str) -> Transmutation: """ RUBEDO — Reddening The Stone is complete. Integration of all elements into the sovereign defense. """ transmutation = Transmutation( id=f"trans_{uuid.uuid4().hex[:8]}", threat=threat, patterns=patterns, rules=rules, phase=Phase.RUBEDO, started_at=started_at, completed_at=datetime.now(timezone.utc).isoformat() ) # Store transmutation conn = self._get_db() cur = conn.cursor() cur.execute( """INSERT INTO transmutations (id, threat_id, phase, started_at, completed_at, proof_id, data) VALUES (?, ?, ?, ?, ?, ?, ?)""", (transmutation.id, threat.id, Phase.RUBEDO.value, transmutation.started_at, transmutation.completed_at, transmutation.proof_id, json.dumps({ "threat": asdict(threat), "pattern_count": len(patterns), "rule_count": len(rules) })) ) conn.commit() conn.close() # Write transmutation record trans_file = self.tem_dir / f"{transmutation.id}.json" trans_file.write_text(json.dumps({ "id": transmutation.id, "threat_id": threat.id, "phase": Phase.RUBEDO.value, "started_at": transmutation.started_at, "completed_at": transmutation.completed_at, "patterns": len(patterns), "rules": len(rules), "rule_types": list(set(r.rule_type for r in rules)) }, indent=2)) if self.on_phase_change: self.on_phase_change(threat.id, Phase.RUBEDO) return transmutation # ═══════════════════════════════════════════════════════════════════════════ # PUBLIC API # ═══════════════════════════════════════════════════════════════════════════ async def transmute(self, finding: Dict) -> Transmutation: """ Execute complete transmutation of a threat. Nigredo → Albedo → Citrinitas → Rubedo Returns the complete Transmutation record. """ started_at = datetime.now(timezone.utc).isoformat() # Phase 1: Nigredo threat = self._nigredo(finding) # Phase 2: Albedo patterns = self._albedo(threat) # Phase 3: Citrinitas rules = self._citrinitas(patterns, threat) # Phase 4: Rubedo transmutation = self._rubedo(threat, patterns, rules, started_at) if self.on_transmute: self.on_transmute(transmutation) return transmutation async def transmute_batch(self, findings: List[Dict]) -> List[Transmutation]: """Transmute multiple findings.""" return [await self.transmute(f) for f in findings] def get_rules(self, rule_type: Optional[str] = None, deployed_only: bool = False) -> List[Dict]: """Get generated rules.""" conn = self._get_db() cur = conn.cursor() query = "SELECT * FROM rules WHERE 1=1" params = [] if rule_type: query += " AND rule_type = ?" params.append(rule_type) if deployed_only: query += " AND deployed = 1" cur.execute(query, params) rows = cur.fetchall() conn.close() return [dict(row) for row in rows] def get_transmutations(self, limit: int = 50) -> List[Dict]: """Get transmutation history.""" conn = self._get_db() cur = conn.cursor() cur.execute( "SELECT * FROM transmutations ORDER BY completed_at DESC LIMIT ?", (limit,) ) rows = cur.fetchall() conn.close() return [dict(row) for row in rows] def get_stats(self) -> Dict: """Get transmutation statistics.""" conn = self._get_db() cur = conn.cursor() cur.execute("SELECT COUNT(*) FROM transmutations") total_transmutations = cur.fetchone()[0] cur.execute("SELECT COUNT(*) FROM patterns") total_patterns = cur.fetchone()[0] cur.execute("SELECT COUNT(*) FROM rules") total_rules = cur.fetchone()[0] cur.execute("SELECT rule_type, COUNT(*) FROM rules GROUP BY rule_type") rules_by_type = dict(cur.fetchall()) cur.execute("SELECT COUNT(*) FROM rules WHERE deployed = 1") deployed_rules = cur.fetchone()[0] conn.close() return { "transmutations": total_transmutations, "patterns": total_patterns, "rules": total_rules, "rules_by_type": rules_by_type, "deployed_rules": deployed_rules } def deploy_rule(self, rule_id: str) -> bool: """Mark a rule as deployed.""" conn = self._get_db() cur = conn.cursor() cur.execute("UPDATE rules SET deployed = 1 WHERE id = ?", (rule_id,)) affected = cur.rowcount conn.commit() conn.close() return affected > 0 def validate_rule(self, rule_id: str) -> bool: """Mark a rule as validated.""" conn = self._get_db() cur = conn.cursor() cur.execute("UPDATE rules SET validated = 1 WHERE id = ?", (rule_id,)) affected = cur.rowcount conn.commit() conn.close() return affected > 0 # ═══════════════════════════════════════════════════════════════════════════════ # DAEMON MODE # ═══════════════════════════════════════════════════════════════════════════════ class TemDaemon: """ Daemon that watches Shield findings and auto-transmutes. """ def __init__(self, vaultmesh_root: Path): self.root = Path(vaultmesh_root) self.engine = TemEngine(vaultmesh_root) self.shield_dir = self.root / "shield" self.running = False async def start(self): """Start watching for Shield findings.""" self.running = True print(f"⚗ TEM Daemon started") print(f" Watching: {self.shield_dir}") print(f" Rules: {self.engine.rules_dir}") print() try: async for changes in awatch(self.shield_dir): if not self.running: break for change_type, path in changes: if Path(path).name == "latest_scan.json": await self._process_scan(Path(path)) except Exception as e: print(f"⚠ Daemon error: {e}") finally: print("⚗ TEM Daemon stopped") async def _process_scan(self, scan_file: Path): """Process new scan results.""" try: data = json.loads(scan_file.read_text()) findings = data.get("findings", []) if not findings: return print(f"🜂 Processing {len(findings)} findings...") for finding in findings: trans = await self.engine.transmute(finding) print(f" ✓ {trans.threat.id} → {len(trans.rules)} rules") stats = self.engine.get_stats() print(f"🜏 Transmutation complete. Total rules: {stats['rules']}") print() except Exception as e: print(f"⚠ Process error: {e}") def stop(self): """Stop the daemon.""" self.running = False # ═══════════════════════════════════════════════════════════════════════════════ # CLI # ═══════════════════════════════════════════════════════════════════════════════ def print_banner(): print(""" ████████╗███████╗███╗ ███╗ ╚══██╔══╝██╔════╝████╗ ████║ ██║ █████╗ ██╔████╔██║ ██║ ██╔══╝ ██║╚██╔╝██║ ██║ ███████╗██║ ╚═╝ ██║ ╚═╝ ╚══════╝╚═╝ ╚═╝ Threat Transmutation Engine "Every attack becomes a lesson" """) def main(): import argparse parser = argparse.ArgumentParser(description="TEM - Threat Transmutation Engine") parser.add_argument("command", choices=["daemon", "transmute", "generate", "history", "stats", "rules"], help="Command to run") parser.add_argument("--watch", type=str, default=None, help="Directory to watch (daemon mode)") parser.add_argument("--finding", type=str, default=None, help="Finding ID or JSON to transmute") parser.add_argument("--format", type=str, default="sigma", help="Rule format for generate command") parser.add_argument("--root", type=str, default=None, help="VaultMesh root directory") args = parser.parse_args() root = Path(args.root) if args.root else Path(os.environ.get("VAULTMESH_ROOT", Path.home() / "vaultmesh")) print_banner() if args.command == "daemon": daemon = TemDaemon(root) try: asyncio.run(daemon.start()) except KeyboardInterrupt: daemon.stop() elif args.command == "transmute": if not args.finding: print("Error: --finding required") sys.exit(1) # Parse finding (could be JSON or ID) try: if args.finding.startswith("{"): finding = json.loads(args.finding) else: finding = {"id": args.finding, "severity": "medium", "description": "Manual finding"} except json.JSONDecodeError: finding = {"id": args.finding, "severity": "medium", "description": "Manual finding"} engine = TemEngine(root) trans = asyncio.run(engine.transmute(finding)) print(f"✓ Transmutation complete: {trans.id}") print(f" Threat: {trans.threat.id}") print(f" Patterns: {len(trans.patterns)}") print(f" Rules: {len(trans.rules)}") print(f" Duration: {trans.started_at} → {trans.completed_at}") elif args.command == "generate": engine = TemEngine(root) rules = engine.get_rules(rule_type=args.format) print(f"Generated {len(rules)} {args.format} rules:\n") for rule in rules: print(f"--- {rule['id']} ---") print(rule['rule_content']) print() elif args.command == "history": engine = TemEngine(root) transmutations = engine.get_transmutations() print(f"Transmutation History ({len(transmutations)} records):\n") print(f"{'ID':<20} {'THREAT':<20} {'PHASE':<12} {'COMPLETED':<20}") print("-" * 75) for t in transmutations: print(f"{t['id']:<20} {t['threat_id']:<20} {t['phase']:<12} {t['completed_at'][:19]}") elif args.command == "stats": engine = TemEngine(root) stats = engine.get_stats() print("TEM Statistics:\n") print(f" Transmutations: {stats['transmutations']}") print(f" Patterns: {stats['patterns']}") print(f" Rules: {stats['rules']}") print(f" Deployed: {stats['deployed_rules']}") print(f"\n Rules by Type:") for rtype, count in stats['rules_by_type'].items(): print(f" {rtype}: {count}") elif args.command == "rules": engine = TemEngine(root) rules = engine.get_rules() print(f"Generated Rules ({len(rules)} total):\n") print(f"{'ID':<20} {'TYPE':<12} {'PATTERN':<20} {'CONF':<6} {'DEPLOYED':<8}") print("-" * 75) for r in rules: deployed = "✓" if r['deployed'] else "" print(f"{r['id']:<20} {r['rule_type']:<12} {r['pattern_id']:<20} {r['confidence']:<6.2f} {deployed:<8}") if __name__ == "__main__": main()