"""
Escalation Engine - Profile transitions as first-class proofs

Every escalation is:
- A receipt (immutable record)
- A Tem-context (threat awareness)
- A reversibility flag (can it be undone?)
- A time-bound (when does it expire?)

Escalation is not runtime magic — it is auditable history.
"""

import json
import secrets
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional, List
import os

import blake3

# VaultMesh paths
# The file-relative fallback is computed only when VAULTMESH_ROOT is not set:
# Path(__file__).parents[3] raises IndexError when this module lives fewer
# than four directories deep, and an explicit override must not trip on that.
_vaultmesh_root_env = os.environ.get("VAULTMESH_ROOT")
VAULTMESH_ROOT = (
    Path(_vaultmesh_root_env)
    if _vaultmesh_root_env is not None
    else Path(__file__).parents[3]
).resolve()
RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts"
ESCALATION_LOG = RECEIPTS_ROOT / "identity" / "escalation_events.jsonl"


def _vmhash_blake3(data: bytes) -> str:
    """Return the BLAKE3 hex digest of *data*, prefixed with ``blake3:``."""
    return f"blake3:{blake3.blake3(data).hexdigest()}"


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (with offset)."""
    return datetime.now(timezone.utc).isoformat()


def _now_ts() -> float:
    """Return the current UTC time as a POSIX timestamp."""
    return datetime.now(timezone.utc).timestamp()


class EscalationType(Enum):
    """Types of profile escalation."""

    THREAT_DETECTED = "threat_detected"        # Automatic: threat confidence > threshold
    OPERATOR_REQUEST = "operator_request"      # Manual: operator requests higher authority
    CRISIS_DECLARED = "crisis_declared"        # Emergency: system failure or attack
    QUORUM_APPROVED = "quorum_approved"        # Governance: multi-sig approval
    SOVEREIGN_OVERRIDE = "sovereign_override"  # Human: direct intervention


class DeescalationType(Enum):
    """Types of profile de-escalation."""

    TIMEOUT_EXPIRED = "timeout_expired"    # Automatic: time limit reached
    THREAT_RESOLVED = "threat_resolved"    # Automatic: no active threats
    OPERATOR_RELEASE = "operator_release"  # Manual: operator releases authority
    CRISIS_CONCLUDED = "crisis_concluded"  # Phoenix: crisis resolved
    SOVEREIGN_REVOKE = "sovereign_revoke"  # Human: explicit revocation


@dataclass
class EscalationContext:
    """Context captured at escalation time."""

    threat_id: Optional[str] = None
    threat_type: Optional[str] = None
    threat_confidence: Optional[float] = None
    active_alerts: int = 0
    mesh_health: str = "unknown"
    triggering_tool: Optional[str] = None
    triggering_decision: Optional[str] = None


@dataclass
class Escalation:
    """A profile escalation event."""

    escalation_id: str
    from_profile: str
    to_profile: str
    escalation_type: str
    context: EscalationContext

    # Reversibility
    reversible: bool
    auto_deescalate: bool
    deescalate_after_seconds: Optional[int]
    deescalate_on_condition: Optional[str]

    # Time tracking
    created_at: str
    expires_at: Optional[str]

    # Proof
    receipt_hash: Optional[str] = None
    tem_context_hash: Optional[str] = None

    # State
    active: bool = True
    deescalated_at: Optional[str] = None
    deescalation_type: Optional[str] = None


# In-memory active escalations (would be persisted in production)
_active_escalations: Dict[str, Escalation] = {}


def _emit_escalation_receipt(escalation: Escalation, event_type: str) -> dict:
    """Emit a receipt for escalation events.

    Builds a receipt for *escalation*, appends it as one JSON line to
    ``ESCALATION_LOG`` (creating the parent directory if needed) and returns
    it.

    Args:
        escalation: The escalation whose current state is recorded.
        event_type: Event label; it becomes the ``profile_<event_type>``
            receipt type and one of the tags. When it is ``"deescalation"``
            the de-escalation timestamp and type are added to the body.

    Returns:
        The receipt dict that was written. Its ``root_hash`` is the BLAKE3
        hash of the key-sorted JSON encoding of ``body`` only (the receipt
        timestamp and tags are not covered by the hash).
    """
    ESCALATION_LOG.parent.mkdir(parents=True, exist_ok=True)

    body = {
        "escalation_id": escalation.escalation_id,
        "event_type": event_type,
        "from_profile": escalation.from_profile,
        "to_profile": escalation.to_profile,
        "escalation_type": escalation.escalation_type,
        "reversible": escalation.reversible,
        "context": asdict(escalation.context),
        "expires_at": escalation.expires_at,
        "active": escalation.active,
    }

    if event_type == "deescalation":
        body["deescalated_at"] = escalation.deescalated_at
        body["deescalation_type"] = escalation.deescalation_type

    receipt = {
        "schema_version": "2.0.0",
        "type": f"profile_{event_type}",
        "timestamp": _now_iso(),
        "scroll": "identity",
        "tags": ["escalation", event_type, escalation.from_profile, escalation.to_profile],
        "root_hash": _vmhash_blake3(json.dumps(body, sort_keys=True).encode()),
        "body": body,
    }

    # Explicit encoding keeps the log independent of the process locale.
    with open(ESCALATION_LOG, "a", encoding="utf-8") as f:
        f.write(json.dumps(receipt) + "\n")

    return receipt


# =============================================================================
# ESCALATION POLICIES
# =============================================================================

ESCALATION_POLICIES = {
    # OBSERVER → OPERATOR
    ("observer", "operator"): {
        "reversible": True,
        "auto_deescalate": True,
        "default_ttl_seconds": 3600,  # 1 hour
        "requires_reason": True,
        "requires_approval": False,
    },
    # OPERATOR → GUARDIAN
    ("operator", "guardian"): {
        "reversible": True,
        "auto_deescalate": True,
        "default_ttl_seconds": 7200,  # 2 hours
        "requires_reason": True,
        "requires_approval": False,
        "auto_on_threat_confidence": 0.8,
    },
    # GUARDIAN → PHOENIX
    ("guardian", "phoenix"): {
        "reversible": True,
        "auto_deescalate": True,
        "default_ttl_seconds": 1800,  # 30 minutes
        "requires_reason": True,
        "requires_approval": True,  # Requires quorum or sovereign
        "auto_on_crisis": True,
    },
    # PHOENIX → SOVEREIGN
    ("phoenix", "sovereign"): {
        "reversible": False,  # Cannot auto-deescalate from sovereign
        "auto_deescalate": False,
        "default_ttl_seconds": None,
        "requires_reason": True,
        "requires_approval": True,
        "requires_human": True,
    },
}

DEESCALATION_CONDITIONS = {
    "no_active_threats_1h": "No active threats for 1 hour",
    "no_active_alerts_24h": "No active alerts for 24 hours",
    "crisis_resolved": "Crisis formally concluded",
    "manual_release": "Operator explicitly released authority",
    "timeout": "Escalation TTL expired",
}


# =============================================================================
# ESCALATION OPERATIONS
# =============================================================================

def escalate(
    from_profile: str,
    to_profile: str,
    escalation_type: EscalationType,
    context: Optional[EscalationContext] = None,
    ttl_seconds: Optional[int] = None,
    deescalate_condition: Optional[str] = None,
    approved_by: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Escalate from one profile to another with full proof chain.

    Args:
        from_profile: Profile being left; with ``to_profile`` it forms the
            key looked up in ``ESCALATION_POLICIES``.
        to_profile: Profile being entered.
        escalation_type: Why the escalation happens. Policies with
            ``requires_human`` only accept ``SOVEREIGN_OVERRIDE``.
        context: Threat context to record; an empty ``EscalationContext``
            is used when omitted.
        ttl_seconds: Lifetime override. ``None`` or ``0`` falls back to the
            policy's ``default_ttl_seconds``; if that is also empty the
            escalation has no expiry.
        deescalate_condition: Condition label stored on the escalation.
        approved_by: Approver identity. For policies with
            ``requires_approval`` it must be set unless the escalation type
            is ``QUORUM_APPROVED`` or ``SOVEREIGN_OVERRIDE``.

    Returns:
        On success, a dict with ``success=True``, the escalation id, the
        receipt hash and the Tem context hash. On rejection, a dict with
        ``success=False`` and an ``error`` message (plus
        ``approval_required=True`` when approval is missing).
    """
    # Get policy
    policy = ESCALATION_POLICIES.get((from_profile, to_profile))
    if not policy:
        return {
            "success": False,
            "error": f"No escalation path from {from_profile} to {to_profile}",
        }

    # Check approval requirements
    if policy.get("requires_human") and escalation_type != EscalationType.SOVEREIGN_OVERRIDE:
        return {
            "success": False,
            "error": f"Escalation to {to_profile} requires human (sovereign) approval",
        }

    if policy.get("requires_approval") and not approved_by:
        self_approving = (
            EscalationType.QUORUM_APPROVED,
            EscalationType.SOVEREIGN_OVERRIDE,
        )
        if escalation_type not in self_approving:
            return {
                "success": False,
                "error": f"Escalation to {to_profile} requires approval",
                "approval_required": True,
            }

    # Build context
    if context is None:
        context = EscalationContext()

    # Calculate expiry
    ttl = ttl_seconds or policy.get("default_ttl_seconds")
    expires_at = None
    if ttl:
        expires_at = (datetime.now(timezone.utc) + timedelta(seconds=ttl)).isoformat()

    # Create escalation
    escalation_id = f"esc_{secrets.token_hex(12)}"
    escalation = Escalation(
        escalation_id=escalation_id,
        from_profile=from_profile,
        to_profile=to_profile,
        escalation_type=escalation_type.value,
        context=context,
        reversible=policy["reversible"],
        auto_deescalate=policy["auto_deescalate"],
        deescalate_after_seconds=ttl,
        deescalate_on_condition=deescalate_condition,
        created_at=_now_iso(),
        expires_at=expires_at,
        active=True,
    )

    # Emit receipt
    receipt = _emit_escalation_receipt(escalation, "escalation")
    escalation.receipt_hash = receipt["root_hash"]

    # Create Tem context hash (for threat awareness)
    tem_context = {
        "escalation_id": escalation_id,
        "profile_transition": f"{from_profile} → {to_profile}",
        "threat_context": asdict(context),
        "timestamp": _now_iso(),
    }
    escalation.tem_context_hash = _vmhash_blake3(
        json.dumps(tem_context, sort_keys=True).encode()
    )

    # Store active escalation (only after the receipt was written)
    _active_escalations[escalation_id] = escalation

    return {
        "success": True,
        "escalation_id": escalation_id,
        "from_profile": from_profile,
        "to_profile": to_profile,
        "escalation_type": escalation_type.value,
        "reversible": escalation.reversible,
        "expires_at": expires_at,
        "receipt_hash": escalation.receipt_hash,
        "tem_context_hash": escalation.tem_context_hash,
        "deescalate_condition": deescalate_condition or "timeout",
    }


def deescalate(
    escalation_id: str,
    deescalation_type: DeescalationType,
    reason: Optional[str] = None,
) -> Dict[str, Any]:
    """
    De-escalate an active escalation.

    Args:
        escalation_id: Id of an entry in the in-memory active set.
        deescalation_type: Why the escalation ends. Non-reversible
            escalations only accept ``SOVEREIGN_REVOKE``.
        reason: Free-text reason. It is echoed in the returned dict only;
            the receipt body does not include it.

    Returns:
        On success, a dict with ``success=True``, the reversed profile
        transition, the receipt hash and ``duration_seconds``. Otherwise a
        dict with ``success=False`` and an ``error`` message.

    Raises:
        Exception: Whatever the receipt emission raises (e.g. ``OSError``
            when the log cannot be written). The escalation is left active
            and unchanged in that case.
    """
    escalation = _active_escalations.get(escalation_id)
    if not escalation:
        return {
            "success": False,
            "error": f"Escalation {escalation_id} not found or already inactive",
        }

    if not escalation.reversible and deescalation_type != DeescalationType.SOVEREIGN_REVOKE:
        return {
            "success": False,
            "error": f"Escalation {escalation_id} is not reversible without sovereign override",
        }

    # Update escalation
    escalation.active = False
    escalation.deescalated_at = _now_iso()
    escalation.deescalation_type = deescalation_type.value

    # Emit receipt
    try:
        receipt = _emit_escalation_receipt(escalation, "deescalation")
    except Exception:
        # No receipt means no proof of the transition: restore the previous
        # state so the entry is not flagged inactive while still registered
        # as active, then let the caller see the failure.
        escalation.active = True
        escalation.deescalated_at = None
        escalation.deescalation_type = None
        raise

    # Remove from active
    del _active_escalations[escalation_id]

    ended_at = datetime.fromisoformat(escalation.deescalated_at.replace('Z', '+00:00'))
    started_at = datetime.fromisoformat(escalation.created_at.replace('Z', '+00:00'))

    return {
        "success": True,
        "escalation_id": escalation_id,
        "from_profile": escalation.to_profile,  # Note: going back
        "to_profile": escalation.from_profile,
        "deescalation_type": deescalation_type.value,
        "reason": reason,
        "receipt_hash": receipt["root_hash"],
        "duration_seconds": (ended_at - started_at).total_seconds(),
    }


def check_expired_escalations() -> List[Dict[str, Any]]:
    """
    Check for and auto-deescalate expired escalations.

    Called periodically by the system.

    Returns:
        One ``deescalate`` result dict per escalation whose expiry has
        passed and whose ``auto_deescalate`` flag is set. Escalations
        without an expiry, or without ``auto_deescalate``, are left alone.
    """
    now = datetime.now(timezone.utc)
    expired = []

    # Iterate over a snapshot: deescalate() removes entries from the dict.
    for esc_id, escalation in list(_active_escalations.items()):
        if not escalation.expires_at:
            continue

        expires = datetime.fromisoformat(escalation.expires_at.replace('Z', '+00:00'))
        if now > expires and escalation.auto_deescalate:
            result = deescalate(
                esc_id,
                DeescalationType.TIMEOUT_EXPIRED,
                reason=f"TTL of {escalation.deescalate_after_seconds}s expired"
            )
            expired.append(result)

    return expired


def get_active_escalations() -> Dict[str, Any]:
    """Get all active escalations.

    Returns:
        A dict with ``active_count`` and an ``escalations`` list holding a
        summary dict for every entry in the in-memory active set.
    """
    return {
        "active_count": len(_active_escalations),
        "escalations": [
            {
                "escalation_id": e.escalation_id,
                "from_profile": e.from_profile,
                "to_profile": e.to_profile,
                "escalation_type": e.escalation_type,
                "created_at": e.created_at,
                "expires_at": e.expires_at,
                "reversible": e.reversible,
            }
            for e in _active_escalations.values()
        ],
    }


def get_escalation_history(
    profile: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Query escalation history from receipts.

    Args:
        profile: When given, keep only events whose ``from_profile`` or
            ``to_profile`` equals it.
        limit: Maximum number of entries returned; negative values are
            treated as 0.

    Returns:
        A dict with ``history`` (most recent first, at most ``limit``
        entries) and ``count`` (number of matching events before the limit
        is applied). Lines that are blank, not valid JSON, or not a JSON
        object are skipped.
    """
    if not ESCALATION_LOG.exists():
        return {"history": [], "count": 0}

    history = []
    with open(ESCALATION_LOG, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            try:
                receipt = json.loads(line)
            except json.JSONDecodeError:
                continue

            # Valid JSON is not necessarily an object; a stray list, string
            # or number must not abort the whole query.
            if not isinstance(receipt, dict):
                continue
            body = receipt.get("body", {})
            if not isinstance(body, dict):
                body = {}

            # Filter by profile if specified
            if profile:
                if body.get("from_profile") != profile and body.get("to_profile") != profile:
                    continue

            history.append({
                "escalation_id": body.get("escalation_id"),
                "event_type": body.get("event_type"),
                "from_profile": body.get("from_profile"),
                "to_profile": body.get("to_profile"),
                "timestamp": receipt.get("timestamp"),
                "receipt_hash": receipt.get("root_hash"),
            })

    # Return most recent first
    history.reverse()

    # A negative slice bound would silently drop the oldest entries instead
    # of limiting the result.
    if limit is not None and limit < 0:
        limit = 0

    return {
        "history": history[:limit],
        "count": len(history),
    }


# =============================================================================
# CONVENIENCE FUNCTIONS FOR COMMON ESCALATIONS
# =============================================================================

def escalate_on_threat(
    current_profile: str,
    threat_id: str,
    threat_type: str,
    confidence: float,
) -> Dict[str, Any]:
    """
    Escalate based on detected threat.

    Auto-determines target profile based on confidence:
    observer → operator at any confidence, operator → guardian at
    ``confidence >= 0.8``, guardian → phoenix at ``confidence >= 0.95``.

    NOTE(review): the guardian → phoenix request is sent as
    ``THREAT_DETECTED`` without an approver, and that policy has
    ``requires_approval`` set, so ``escalate`` answers it with
    ``approval_required`` rather than escalating — confirm this is the
    intended hand-off to an approval flow.

    Returns:
        The ``escalate`` result, or a dict with ``success=False`` and
        ``escalated=False`` when no target profile applies.
    """
    context = EscalationContext(
        threat_id=threat_id,
        threat_type=threat_type,
        threat_confidence=confidence,
    )

    # Determine target profile
    if current_profile == "observer":
        to_profile = "operator"
    elif current_profile == "operator" and confidence >= 0.8:
        to_profile = "guardian"
    elif current_profile == "guardian" and confidence >= 0.95:
        to_profile = "phoenix"
    else:
        return {
            "success": False,
            "escalated": False,
            "reason": f"Confidence {confidence} insufficient for escalation from {current_profile}",
        }

    return escalate(
        from_profile=current_profile,
        to_profile=to_profile,
        escalation_type=EscalationType.THREAT_DETECTED,
        context=context,
        deescalate_condition="no_active_threats_1h",
    )


def escalate_to_phoenix(
    reason: str,
    approved_by: str,
) -> Dict[str, Any]:
    """
    Emergency escalation to Phoenix profile.

    Requires approval.

    Args:
        reason: Accepted by the signature but not used by this function.
        approved_by: Approver identity forwarded to ``escalate``; an empty
            value makes the guardian → phoenix policy reject the request.

    Returns:
        The ``escalate`` result for a guardian → phoenix transition of type
        ``CRISIS_DECLARED``.
    """
    context = EscalationContext(
        mesh_health="crisis",
    )

    return escalate(
        from_profile="guardian",
        to_profile="phoenix",
        escalation_type=EscalationType.CRISIS_DECLARED,
        context=context,
        approved_by=approved_by,
        deescalate_condition="crisis_resolved",
    )