Files
test/tem/tem.py
Vault Sovereign 1583890199 Initial commit - combined iTerm2 scripts
Contains:
- 1m-brag
- tem
- VaultMesh_Catalog_v1
- VAULTMESH-ETERNAL-PATTERN

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 03:58:39 +00:00

1147 lines
41 KiB
Python

#!/usr/bin/env python3
"""
████████╗███████╗███╗ ███╗
╚══██╔══╝██╔════╝████╗ ████║
██║ █████╗ ██╔████╔██║
██║ ██╔══╝ ██║╚██╔╝██║
██║ ███████╗██║ ╚═╝ ██║
╚═╝ ╚══════╝╚═╝ ╚═╝
TEM — THREAT TRANSMUTATION ENGINE
"Every attack becomes a lesson. Every danger becomes capability."
Tem is the alchemical organ of VaultMesh that transmutes threats into
defensive knowledge. When Shield detects a vulnerability, Tem doesn't
just log it — Tem *learns* from it, generates countermeasures, and
strengthens the system's immune response.
The Four Transmutation Phases:
1. NIGREDO — Threat ingestion (the raw material)
2. ALBEDO — Analysis & pattern extraction (purification)
3. CITRINITAS — Rule generation (solar consciousness)
4. RUBEDO — Integration & hardening (the Stone complete)
Usage:
# As standalone daemon
python3 tem.py daemon --watch ~/vaultmesh/shield/
# Process single finding
python3 tem.py transmute --finding CVE-2024-1234
# Generate defense rules
python3 tem.py generate --format sigma
# View transmutation history
python3 tem.py history
# Integrate with offsec_mcp_live.py
from tem import TemEngine
tem = TemEngine(vaultmesh_root)
await tem.transmute(finding)
The system learns. The system remembers. The system adapts.
"""
import asyncio
import hashlib
import json
import os
import re
import sqlite3
import sys
import time
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Optional, List, Dict, Any, Callable
from watchfiles import awatch
try:
import httpx
HTTPX_AVAILABLE = True
except ImportError:
HTTPX_AVAILABLE = False
# ═══════════════════════════════════════════════════════════════════════════════
# ALCHEMICAL PHASES
# ═══════════════════════════════════════════════════════════════════════════════
class Phase(Enum):
"""The four phases of alchemical transmutation."""
NIGREDO = "nigredo" # Blackening — raw threat ingestion
ALBEDO = "albedo" # Whitening — analysis & extraction
CITRINITAS = "citrinitas" # Yellowing — rule generation
RUBEDO = "rubedo" # Reddening — integration complete
class Severity(Enum):
"""Threat severity levels."""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
# ═══════════════════════════════════════════════════════════════════════════════
# DATA STRUCTURES
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class Threat:
"""A raw threat from Shield (Nigredo phase)."""
id: str
source: str # CVE, nuclei template, custom
severity: Severity
description: str
target: str # Affected system/component
detected_at: str
raw_data: Dict[str, Any] = field(default_factory=dict)
@classmethod
def from_shield_finding(cls, finding: Dict) -> "Threat":
return cls(
id=finding.get("id", f"threat_{uuid.uuid4().hex[:8]}"),
source=finding.get("source", "shield"),
severity=Severity(finding.get("severity", "medium")),
description=finding.get("description", "Unknown threat"),
target=finding.get("target", "unknown"),
detected_at=finding.get("detected_at", datetime.now(timezone.utc).isoformat()),
raw_data=finding
)
@dataclass
class Pattern:
"""An extracted pattern from threat analysis (Albedo phase)."""
id: str
threat_id: str
pattern_type: str # cve, misconfiguration, exposure, behavior
indicators: List[str] # IOCs, signatures, patterns
attack_vector: str # network, local, physical
exploit_complexity: str # low, high
privileges_required: str # none, low, high
user_interaction: str # none, required
scope: str # unchanged, changed
impact: Dict[str, str] # confidentiality, integrity, availability
extracted_at: str
def cvss_estimate(self) -> float:
"""Estimate CVSS-like score from pattern attributes."""
base = 5.0
# Attack vector
if self.attack_vector == "network":
base += 1.5
elif self.attack_vector == "local":
base += 0.5
# Complexity
if self.exploit_complexity == "low":
base += 1.0
# Privileges
if self.privileges_required == "none":
base += 1.0
# User interaction
if self.user_interaction == "none":
base += 0.5
# Impact
for impact_type, level in self.impact.items():
if level == "high":
base += 0.5
elif level == "low":
base += 0.2
return min(10.0, base)
@dataclass
class Rule:
"""A generated defense rule (Citrinitas phase)."""
id: str
pattern_id: str
rule_type: str # sigma, yara, snort, iptables, custom
rule_content: str # The actual rule
description: str
confidence: float # 0.0 - 1.0
false_positive_rate: str # low, medium, high
generated_at: str
validated: bool = False
deployed: bool = False
@dataclass
class Transmutation:
"""A complete transmutation record (Rubedo phase)."""
id: str
threat: Threat
patterns: List[Pattern]
rules: List[Rule]
phase: Phase
started_at: str
completed_at: Optional[str] = None
proof_id: Optional[str] = None # Link to VaultMesh proof
notes: str = ""
# ═══════════════════════════════════════════════════════════════════════════════
# PATTERN EXTRACTORS
# ═══════════════════════════════════════════════════════════════════════════════
class PatternExtractor:
"""Base class for pattern extraction strategies."""
def extract(self, threat: Threat) -> List[Pattern]:
raise NotImplementedError
class CVEExtractor(PatternExtractor):
"""Extract patterns from CVE-based threats."""
# Known CVE patterns and their typical characteristics
CVE_PATTERNS = {
r"buffer.?overflow": {
"pattern_type": "memory_corruption",
"attack_vector": "network",
"exploit_complexity": "high",
"impact": {"confidentiality": "high", "integrity": "high", "availability": "high"}
},
r"sql.?injection": {
"pattern_type": "injection",
"attack_vector": "network",
"exploit_complexity": "low",
"impact": {"confidentiality": "high", "integrity": "high", "availability": "low"}
},
r"xss|cross.?site.?script": {
"pattern_type": "injection",
"attack_vector": "network",
"exploit_complexity": "low",
"impact": {"confidentiality": "low", "integrity": "low", "availability": "none"}
},
r"remote.?code.?execution|rce": {
"pattern_type": "code_execution",
"attack_vector": "network",
"exploit_complexity": "low",
"impact": {"confidentiality": "high", "integrity": "high", "availability": "high"}
},
r"privilege.?escalation": {
"pattern_type": "privilege_escalation",
"attack_vector": "local",
"exploit_complexity": "low",
"impact": {"confidentiality": "high", "integrity": "high", "availability": "high"}
},
r"denial.?of.?service|dos": {
"pattern_type": "availability",
"attack_vector": "network",
"exploit_complexity": "low",
"impact": {"confidentiality": "none", "integrity": "none", "availability": "high"}
},
r"authentication.?bypass": {
"pattern_type": "authentication",
"attack_vector": "network",
"exploit_complexity": "low",
"impact": {"confidentiality": "high", "integrity": "high", "availability": "low"}
},
r"information.?disclosure|info.?leak": {
"pattern_type": "information_disclosure",
"attack_vector": "network",
"exploit_complexity": "low",
"impact": {"confidentiality": "high", "integrity": "none", "availability": "none"}
},
}
def extract(self, threat: Threat) -> List[Pattern]:
patterns = []
desc_lower = threat.description.lower()
for regex, attrs in self.CVE_PATTERNS.items():
if re.search(regex, desc_lower):
pattern = Pattern(
id=f"pat_{uuid.uuid4().hex[:8]}",
threat_id=threat.id,
pattern_type=attrs["pattern_type"],
indicators=[threat.id, regex],
attack_vector=attrs["attack_vector"],
exploit_complexity=attrs["exploit_complexity"],
privileges_required="none",
user_interaction="none",
scope="unchanged",
impact=attrs["impact"],
extracted_at=datetime.now(timezone.utc).isoformat()
)
patterns.append(pattern)
# Default pattern if no specific match
if not patterns:
patterns.append(Pattern(
id=f"pat_{uuid.uuid4().hex[:8]}",
threat_id=threat.id,
pattern_type="unknown",
indicators=[threat.id],
attack_vector="network",
exploit_complexity="high",
privileges_required="low",
user_interaction="none",
scope="unchanged",
impact={"confidentiality": "low", "integrity": "low", "availability": "low"},
extracted_at=datetime.now(timezone.utc).isoformat()
))
return patterns
class MisconfigExtractor(PatternExtractor):
"""Extract patterns from misconfiguration findings."""
MISCONFIG_PATTERNS = {
r"exposed|public|open": "exposure",
r"default.?password|weak.?password": "credential",
r"missing.?header|no.?csp|no.?hsts": "header_missing",
r"outdated|deprecated|eol": "version",
r"debug|verbose|trace": "information_disclosure",
r"permission|chmod|world.?readable": "permission",
}
def extract(self, threat: Threat) -> List[Pattern]:
patterns = []
desc_lower = threat.description.lower()
for regex, pattern_type in self.MISCONFIG_PATTERNS.items():
if re.search(regex, desc_lower):
patterns.append(Pattern(
id=f"pat_{uuid.uuid4().hex[:8]}",
threat_id=threat.id,
pattern_type=f"misconfiguration_{pattern_type}",
indicators=[regex, threat.target],
attack_vector="network",
exploit_complexity="low",
privileges_required="none",
user_interaction="none",
scope="unchanged",
impact={"confidentiality": "medium", "integrity": "low", "availability": "low"},
extracted_at=datetime.now(timezone.utc).isoformat()
))
return patterns if patterns else [self._default_pattern(threat)]
def _default_pattern(self, threat: Threat) -> Pattern:
return Pattern(
id=f"pat_{uuid.uuid4().hex[:8]}",
threat_id=threat.id,
pattern_type="misconfiguration_generic",
indicators=[threat.target],
attack_vector="network",
exploit_complexity="low",
privileges_required="none",
user_interaction="none",
scope="unchanged",
impact={"confidentiality": "low", "integrity": "low", "availability": "low"},
extracted_at=datetime.now(timezone.utc).isoformat()
)
# ═══════════════════════════════════════════════════════════════════════════════
# RULE GENERATORS
# ═══════════════════════════════════════════════════════════════════════════════
class RuleGenerator:
"""Base class for defense rule generation."""
def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]:
raise NotImplementedError
class SigmaGenerator(RuleGenerator):
"""Generate Sigma detection rules."""
def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]:
rules = []
# Generate Sigma rule based on pattern type
sigma_template = self._get_template(pattern)
if sigma_template:
rule_content = sigma_template.format(
title=f"TEM: {threat.id} Detection",
description=threat.description[:100],
threat_id=threat.id,
pattern_id=pattern.id,
target=threat.target,
severity=threat.severity.value,
date=datetime.now().strftime("%Y/%m/%d"),
)
rules.append(Rule(
id=f"rule_{uuid.uuid4().hex[:8]}",
pattern_id=pattern.id,
rule_type="sigma",
rule_content=rule_content,
description=f"Sigma rule for {threat.id}",
confidence=0.7,
false_positive_rate="medium",
generated_at=datetime.now(timezone.utc).isoformat()
))
return rules
def _get_template(self, pattern: Pattern) -> Optional[str]:
templates = {
"injection": """title: {title}
id: {pattern_id}
status: experimental
description: {description}
references:
- https://tem.vaultmesh.local/{threat_id}
author: TEM Engine
date: {date}
tags:
- attack.initial_access
- tem.generated
logsource:
category: webserver
detection:
selection:
cs-uri-query|contains:
- "'"
- '"'
- '--'
- ';'
- 'UNION'
- 'SELECT'
condition: selection
fields:
- cs-uri-query
- c-ip
- cs-host
falsepositives:
- Legitimate queries with special characters
level: {severity}
""",
"code_execution": """title: {title}
id: {pattern_id}
status: experimental
description: {description}
references:
- https://tem.vaultmesh.local/{threat_id}
author: TEM Engine
date: {date}
tags:
- attack.execution
- tem.generated
logsource:
category: process_creation
product: linux
detection:
selection:
CommandLine|contains:
- 'curl|bash'
- 'wget|sh'
- 'python -c'
- 'perl -e'
- 'ruby -e'
condition: selection
falsepositives:
- Legitimate administrative scripts
level: {severity}
""",
"authentication": """title: {title}
id: {pattern_id}
status: experimental
description: {description}
references:
- https://tem.vaultmesh.local/{threat_id}
author: TEM Engine
date: {date}
tags:
- attack.credential_access
- tem.generated
logsource:
category: authentication
detection:
selection:
EventType: authentication_failure
timeframe: 5m
condition: selection | count() > 10
falsepositives:
- Password reset procedures
level: {severity}
""",
}
for key in templates:
if key in pattern.pattern_type:
return templates[key]
# Generic fallback template
return """title: {title}
id: {pattern_id}
status: experimental
description: {description}
references:
- https://tem.vaultmesh.local/{threat_id}
author: TEM Engine
date: {date}
tags:
- tem.generated
logsource:
category: application
detection:
keywords:
- '{target}'
condition: keywords
falsepositives:
- Unknown
level: {severity}
"""
class FirewallGenerator(RuleGenerator):
"""Generate firewall/iptables rules."""
def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]:
rules = []
# Only generate for network-based threats
if pattern.attack_vector != "network":
return rules
# Extract IP if present in target
ip_match = re.search(r'\d+\.\d+\.\d+\.\d+', threat.target)
if ip_match or "exposure" in pattern.pattern_type:
rule_content = f"""# TEM Generated: {threat.id}
# Pattern: {pattern.id}
# Generated: {datetime.now(timezone.utc).isoformat()}
# Rate limit potential attack source
iptables -A INPUT -p tcp --dport 80 -m state --state NEW -m recent --set --name HTTP
iptables -A INPUT -p tcp --dport 80 -m state --state NEW -m recent --update --seconds 60 --hitcount 20 --name HTTP -j DROP
# Log suspicious activity
iptables -A INPUT -p tcp --dport 80 -j LOG --log-prefix "TEM_{threat.id}: "
"""
rules.append(Rule(
id=f"rule_{uuid.uuid4().hex[:8]}",
pattern_id=pattern.id,
rule_type="iptables",
rule_content=rule_content,
description=f"Firewall rule for {threat.id}",
confidence=0.6,
false_positive_rate="low",
generated_at=datetime.now(timezone.utc).isoformat()
))
return rules
class RemediationGenerator(RuleGenerator):
"""Generate remediation recommendations."""
REMEDIATION_TEMPLATES = {
"buffer_overflow": "Update affected software to patched version. If unavailable, consider WAF rules or network segmentation.",
"injection": "Implement parameterized queries. Enable input validation. Deploy WAF with injection detection.",
"code_execution": "Patch immediately. Restrict network access. Enable process monitoring. Consider application sandboxing.",
"privilege_escalation": "Apply security patches. Review user permissions. Enable audit logging.",
"authentication": "Enforce MFA. Implement account lockout. Review authentication logs.",
"information_disclosure": "Remove debug endpoints. Configure proper error handling. Review exposed data.",
"misconfiguration": "Review and harden configuration. Apply CIS benchmarks. Enable configuration drift detection.",
"exposure": "Restrict network access. Implement authentication. Enable TLS.",
"credential": "Rotate credentials. Enforce password policy. Remove default accounts.",
"version": "Update to supported version. Enable auto-updates where safe. Track EOL dates.",
}
def generate(self, pattern: Pattern, threat: Threat) -> List[Rule]:
rules = []
# Find matching remediation
for key, template in self.REMEDIATION_TEMPLATES.items():
if key in pattern.pattern_type.lower():
rule_content = f"""# TEM Remediation: {threat.id}
# Pattern: {pattern.pattern_type}
# Target: {threat.target}
# Severity: {threat.severity.value}
# Generated: {datetime.now(timezone.utc).isoformat()}
## Recommended Actions
{template}
## Verification Steps
1. Apply recommended fix
2. Rescan target: {threat.target}
3. Verify finding no longer present
4. Generate proof: `proof generate remediation.{threat.id}`
## References
- Threat ID: {threat.id}
- Pattern ID: {pattern.id}
- CVSS Estimate: {pattern.cvss_estimate():.1f}
"""
rules.append(Rule(
id=f"rule_{uuid.uuid4().hex[:8]}",
pattern_id=pattern.id,
rule_type="remediation",
rule_content=rule_content,
description=f"Remediation guide for {threat.id}",
confidence=0.8,
false_positive_rate="low",
generated_at=datetime.now(timezone.utc).isoformat()
))
break
return rules
# ═══════════════════════════════════════════════════════════════════════════════
# TEM ENGINE
# ═══════════════════════════════════════════════════════════════════════════════
class TemEngine:
"""
The Threat Transmutation Engine.
Transforms threats into defensive capabilities through the four
alchemical phases: Nigredo → Albedo → Citrinitas → Rubedo.
"""
def __init__(self, vaultmesh_root: Path):
self.root = Path(vaultmesh_root)
self.tem_dir = self.root / "tem"
self.rules_dir = self.tem_dir / "rules"
self.db_path = self.tem_dir / "tem.db"
# Ensure directories exist
self.tem_dir.mkdir(parents=True, exist_ok=True)
self.rules_dir.mkdir(exist_ok=True)
# Initialize database
self._init_db()
# Pattern extractors
self.extractors: List[PatternExtractor] = [
CVEExtractor(),
MisconfigExtractor(),
]
# Rule generators
self.generators: List[RuleGenerator] = [
SigmaGenerator(),
FirewallGenerator(),
RemediationGenerator(),
]
# Event callbacks
self.on_transmute: Optional[Callable[[Transmutation], None]] = None
self.on_phase_change: Optional[Callable[[str, Phase], None]] = None
def _init_db(self):
"""Initialize SQLite database."""
conn = sqlite3.connect(str(self.db_path))
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS transmutations (
id TEXT PRIMARY KEY,
threat_id TEXT NOT NULL,
phase TEXT NOT NULL,
started_at TEXT NOT NULL,
completed_at TEXT,
proof_id TEXT,
data TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS patterns (
id TEXT PRIMARY KEY,
threat_id TEXT NOT NULL,
pattern_type TEXT NOT NULL,
data TEXT NOT NULL,
extracted_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS rules (
id TEXT PRIMARY KEY,
pattern_id TEXT NOT NULL,
rule_type TEXT NOT NULL,
rule_content TEXT NOT NULL,
confidence REAL,
validated INTEGER DEFAULT 0,
deployed INTEGER DEFAULT 0,
generated_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_transmutations_threat
ON transmutations(threat_id)
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_patterns_threat
ON patterns(threat_id)
""")
conn.commit()
conn.close()
def _get_db(self):
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
return conn
# ═══════════════════════════════════════════════════════════════════════════
# THE FOUR PHASES
# ═══════════════════════════════════════════════════════════════════════════
def _nigredo(self, finding: Dict) -> Threat:
"""
NIGREDO — Blackening
Ingest raw threat material from Shield findings.
The prima materia enters the athanor.
"""
threat = Threat.from_shield_finding(finding)
if self.on_phase_change:
self.on_phase_change(threat.id, Phase.NIGREDO)
return threat
def _albedo(self, threat: Threat) -> List[Pattern]:
"""
ALBEDO — Whitening
Purify and analyze the threat, extracting patterns.
Separation of the essential from the accidental.
"""
patterns = []
for extractor in self.extractors:
extracted = extractor.extract(threat)
patterns.extend(extracted)
# Store patterns
conn = self._get_db()
cur = conn.cursor()
for pattern in patterns:
cur.execute(
"""INSERT OR REPLACE INTO patterns
(id, threat_id, pattern_type, data, extracted_at)
VALUES (?, ?, ?, ?, ?)""",
(pattern.id, pattern.threat_id, pattern.pattern_type,
json.dumps(asdict(pattern)), pattern.extracted_at)
)
conn.commit()
conn.close()
if self.on_phase_change:
self.on_phase_change(threat.id, Phase.ALBEDO)
return patterns
def _citrinitas(self, patterns: List[Pattern], threat: Threat) -> List[Rule]:
"""
CITRINITAS — Yellowing
Solar consciousness dawns. Generate defense rules
from the purified patterns.
"""
rules = []
for pattern in patterns:
for generator in self.generators:
generated = generator.generate(pattern, threat)
rules.extend(generated)
# Store rules
conn = self._get_db()
cur = conn.cursor()
for rule in rules:
cur.execute(
"""INSERT OR REPLACE INTO rules
(id, pattern_id, rule_type, rule_content, confidence,
validated, deployed, generated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(rule.id, rule.pattern_id, rule.rule_type, rule.rule_content,
rule.confidence, int(rule.validated), int(rule.deployed),
rule.generated_at)
)
# Write rule file
rule_file = self.rules_dir / f"{rule.id}.{rule.rule_type}"
rule_file.write_text(rule.rule_content)
conn.commit()
conn.close()
if self.on_phase_change:
self.on_phase_change(threat.id, Phase.CITRINITAS)
return rules
def _rubedo(self, threat: Threat, patterns: List[Pattern],
rules: List[Rule], started_at: str) -> Transmutation:
"""
RUBEDO — Reddening
The Stone is complete. Integration of all elements
into the sovereign defense.
"""
transmutation = Transmutation(
id=f"trans_{uuid.uuid4().hex[:8]}",
threat=threat,
patterns=patterns,
rules=rules,
phase=Phase.RUBEDO,
started_at=started_at,
completed_at=datetime.now(timezone.utc).isoformat()
)
# Store transmutation
conn = self._get_db()
cur = conn.cursor()
cur.execute(
"""INSERT INTO transmutations
(id, threat_id, phase, started_at, completed_at, proof_id, data)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(transmutation.id, threat.id, Phase.RUBEDO.value,
transmutation.started_at, transmutation.completed_at,
transmutation.proof_id, json.dumps({
"threat": asdict(threat),
"pattern_count": len(patterns),
"rule_count": len(rules)
}))
)
conn.commit()
conn.close()
# Write transmutation record
trans_file = self.tem_dir / f"{transmutation.id}.json"
trans_file.write_text(json.dumps({
"id": transmutation.id,
"threat_id": threat.id,
"phase": Phase.RUBEDO.value,
"started_at": transmutation.started_at,
"completed_at": transmutation.completed_at,
"patterns": len(patterns),
"rules": len(rules),
"rule_types": list(set(r.rule_type for r in rules))
}, indent=2))
if self.on_phase_change:
self.on_phase_change(threat.id, Phase.RUBEDO)
return transmutation
# ═══════════════════════════════════════════════════════════════════════════
# PUBLIC API
# ═══════════════════════════════════════════════════════════════════════════
async def transmute(self, finding: Dict) -> Transmutation:
"""
Execute complete transmutation of a threat.
Nigredo → Albedo → Citrinitas → Rubedo
Returns the complete Transmutation record.
"""
started_at = datetime.now(timezone.utc).isoformat()
# Phase 1: Nigredo
threat = self._nigredo(finding)
# Phase 2: Albedo
patterns = self._albedo(threat)
# Phase 3: Citrinitas
rules = self._citrinitas(patterns, threat)
# Phase 4: Rubedo
transmutation = self._rubedo(threat, patterns, rules, started_at)
if self.on_transmute:
self.on_transmute(transmutation)
return transmutation
async def transmute_batch(self, findings: List[Dict]) -> List[Transmutation]:
"""Transmute multiple findings."""
return [await self.transmute(f) for f in findings]
def get_rules(self, rule_type: Optional[str] = None,
deployed_only: bool = False) -> List[Dict]:
"""Get generated rules."""
conn = self._get_db()
cur = conn.cursor()
query = "SELECT * FROM rules WHERE 1=1"
params = []
if rule_type:
query += " AND rule_type = ?"
params.append(rule_type)
if deployed_only:
query += " AND deployed = 1"
cur.execute(query, params)
rows = cur.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_transmutations(self, limit: int = 50) -> List[Dict]:
"""Get transmutation history."""
conn = self._get_db()
cur = conn.cursor()
cur.execute(
"SELECT * FROM transmutations ORDER BY completed_at DESC LIMIT ?",
(limit,)
)
rows = cur.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_stats(self) -> Dict:
"""Get transmutation statistics."""
conn = self._get_db()
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM transmutations")
total_transmutations = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM patterns")
total_patterns = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM rules")
total_rules = cur.fetchone()[0]
cur.execute("SELECT rule_type, COUNT(*) FROM rules GROUP BY rule_type")
rules_by_type = dict(cur.fetchall())
cur.execute("SELECT COUNT(*) FROM rules WHERE deployed = 1")
deployed_rules = cur.fetchone()[0]
conn.close()
return {
"transmutations": total_transmutations,
"patterns": total_patterns,
"rules": total_rules,
"rules_by_type": rules_by_type,
"deployed_rules": deployed_rules
}
def deploy_rule(self, rule_id: str) -> bool:
"""Mark a rule as deployed."""
conn = self._get_db()
cur = conn.cursor()
cur.execute("UPDATE rules SET deployed = 1 WHERE id = ?", (rule_id,))
affected = cur.rowcount
conn.commit()
conn.close()
return affected > 0
def validate_rule(self, rule_id: str) -> bool:
"""Mark a rule as validated."""
conn = self._get_db()
cur = conn.cursor()
cur.execute("UPDATE rules SET validated = 1 WHERE id = ?", (rule_id,))
affected = cur.rowcount
conn.commit()
conn.close()
return affected > 0
# ═══════════════════════════════════════════════════════════════════════════════
# DAEMON MODE
# ═══════════════════════════════════════════════════════════════════════════════
class TemDaemon:
"""
Daemon that watches Shield findings and auto-transmutes.
"""
def __init__(self, vaultmesh_root: Path):
self.root = Path(vaultmesh_root)
self.engine = TemEngine(vaultmesh_root)
self.shield_dir = self.root / "shield"
self.running = False
async def start(self):
"""Start watching for Shield findings."""
self.running = True
print(f"⚗ TEM Daemon started")
print(f" Watching: {self.shield_dir}")
print(f" Rules: {self.engine.rules_dir}")
print()
try:
async for changes in awatch(self.shield_dir):
if not self.running:
break
for change_type, path in changes:
if Path(path).name == "latest_scan.json":
await self._process_scan(Path(path))
except Exception as e:
print(f"⚠ Daemon error: {e}")
finally:
print("⚗ TEM Daemon stopped")
async def _process_scan(self, scan_file: Path):
"""Process new scan results."""
try:
data = json.loads(scan_file.read_text())
findings = data.get("findings", [])
if not findings:
return
print(f"🜂 Processing {len(findings)} findings...")
for finding in findings:
trans = await self.engine.transmute(finding)
print(f"{trans.threat.id}{len(trans.rules)} rules")
stats = self.engine.get_stats()
print(f"🜏 Transmutation complete. Total rules: {stats['rules']}")
print()
except Exception as e:
print(f"⚠ Process error: {e}")
def stop(self):
"""Stop the daemon."""
self.running = False
# ═══════════════════════════════════════════════════════════════════════════════
# CLI
# ═══════════════════════════════════════════════════════════════════════════════
def print_banner():
print("""
████████╗███████╗███╗ ███╗
╚══██╔══╝██╔════╝████╗ ████║
██║ █████╗ ██╔████╔██║
██║ ██╔══╝ ██║╚██╔╝██║
██║ ███████╗██║ ╚═╝ ██║
╚═╝ ╚══════╝╚═╝ ╚═╝
Threat Transmutation Engine
"Every attack becomes a lesson"
""")
def main():
import argparse
parser = argparse.ArgumentParser(description="TEM - Threat Transmutation Engine")
parser.add_argument("command", choices=["daemon", "transmute", "generate", "history", "stats", "rules"],
help="Command to run")
parser.add_argument("--watch", type=str, default=None,
help="Directory to watch (daemon mode)")
parser.add_argument("--finding", type=str, default=None,
help="Finding ID or JSON to transmute")
parser.add_argument("--format", type=str, default="sigma",
help="Rule format for generate command")
parser.add_argument("--root", type=str, default=None,
help="VaultMesh root directory")
args = parser.parse_args()
root = Path(args.root) if args.root else Path(os.environ.get("VAULTMESH_ROOT", Path.home() / "vaultmesh"))
print_banner()
if args.command == "daemon":
daemon = TemDaemon(root)
try:
asyncio.run(daemon.start())
except KeyboardInterrupt:
daemon.stop()
elif args.command == "transmute":
if not args.finding:
print("Error: --finding required")
sys.exit(1)
# Parse finding (could be JSON or ID)
try:
if args.finding.startswith("{"):
finding = json.loads(args.finding)
else:
finding = {"id": args.finding, "severity": "medium", "description": "Manual finding"}
except json.JSONDecodeError:
finding = {"id": args.finding, "severity": "medium", "description": "Manual finding"}
engine = TemEngine(root)
trans = asyncio.run(engine.transmute(finding))
print(f"✓ Transmutation complete: {trans.id}")
print(f" Threat: {trans.threat.id}")
print(f" Patterns: {len(trans.patterns)}")
print(f" Rules: {len(trans.rules)}")
print(f" Duration: {trans.started_at}{trans.completed_at}")
elif args.command == "generate":
engine = TemEngine(root)
rules = engine.get_rules(rule_type=args.format)
print(f"Generated {len(rules)} {args.format} rules:\n")
for rule in rules:
print(f"--- {rule['id']} ---")
print(rule['rule_content'])
print()
elif args.command == "history":
engine = TemEngine(root)
transmutations = engine.get_transmutations()
print(f"Transmutation History ({len(transmutations)} records):\n")
print(f"{'ID':<20} {'THREAT':<20} {'PHASE':<12} {'COMPLETED':<20}")
print("-" * 75)
for t in transmutations:
print(f"{t['id']:<20} {t['threat_id']:<20} {t['phase']:<12} {t['completed_at'][:19]}")
elif args.command == "stats":
engine = TemEngine(root)
stats = engine.get_stats()
print("TEM Statistics:\n")
print(f" Transmutations: {stats['transmutations']}")
print(f" Patterns: {stats['patterns']}")
print(f" Rules: {stats['rules']}")
print(f" Deployed: {stats['deployed_rules']}")
print(f"\n Rules by Type:")
for rtype, count in stats['rules_by_type'].items():
print(f" {rtype}: {count}")
elif args.command == "rules":
engine = TemEngine(root)
rules = engine.get_rules()
print(f"Generated Rules ({len(rules)} total):\n")
print(f"{'ID':<20} {'TYPE':<12} {'PATTERN':<20} {'CONF':<6} {'DEPLOYED':<8}")
print("-" * 75)
for r in rules:
deployed = "" if r['deployed'] else ""
print(f"{r['id']:<20} {r['rule_type']:<12} {r['pattern_id']:<20} {r['confidence']:<6.2f} {deployed:<8}")
if __name__ == "__main__":
main()