493 lines
15 KiB
Python
493 lines
15 KiB
Python
"""
Escalation Engine - Profile transitions as first-class proofs

Every escalation is:
- A receipt (immutable record)
- A Tem-context (threat awareness)
- A reversibility flag (can it be undone?)
- A time-bound (when does it expire?)

Escalation is not runtime magic — it is auditable history.
"""
|
|
|
|
import json
|
|
import secrets
|
|
from datetime import datetime, timezone, timedelta
|
|
from dataclasses import dataclass, asdict
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional, List
|
|
import os
|
|
|
|
import blake3
|
|
|
|
# VaultMesh paths
#
# VAULTMESH_ROOT is taken from the environment when the variable is set;
# otherwise it falls back to Path(__file__).parents[3].  The fallback is only
# evaluated when it is actually needed: `parents[3]` raises IndexError for a
# file that sits fewer than four directories deep, and that must not break an
# installation that configures the root explicitly.
VAULTMESH_ROOT = Path(
    os.environ["VAULTMESH_ROOT"]
    if "VAULTMESH_ROOT" in os.environ
    else Path(__file__).parents[3]
).resolve()

RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts"

# JSONL file that escalation / de-escalation receipts are appended to.
ESCALATION_LOG = RECEIPTS_ROOT / "identity" / "escalation_events.jsonl"
|
|
|
|
|
|
def _vmhash_blake3(data: bytes) -> str:
    """Return the BLAKE3 hex digest of *data*, prefixed with ``blake3:``."""
    digest_hex = blake3.blake3(data).hexdigest()
    return "blake3:" + digest_hex
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def _now_ts() -> float:
    """Return the current time as a POSIX timestamp (float seconds)."""
    current = datetime.now(tz=timezone.utc)
    return current.timestamp()
|
|
|
|
|
|
class EscalationType(Enum):
    """Reasons a profile can be raised to a higher level."""

    # Automatic: threat confidence exceeded a threshold.
    THREAT_DETECTED = "threat_detected"

    # Manual: an operator asked for higher authority.
    OPERATOR_REQUEST = "operator_request"

    # Emergency: system failure or attack.
    CRISIS_DECLARED = "crisis_declared"

    # Governance: multi-signature approval.
    QUORUM_APPROVED = "quorum_approved"

    # Human: direct intervention.
    SOVEREIGN_OVERRIDE = "sovereign_override"
|
|
|
|
|
|
class DeescalationType(Enum):
    """Reasons an escalated profile can be lowered again."""

    # Automatic: the time limit was reached.
    TIMEOUT_EXPIRED = "timeout_expired"

    # Automatic: no active threats remain.
    THREAT_RESOLVED = "threat_resolved"

    # Manual: the operator released the authority.
    OPERATOR_RELEASE = "operator_release"

    # Phoenix: the crisis has been resolved.
    CRISIS_CONCLUDED = "crisis_concluded"

    # Human: explicit revocation.
    SOVEREIGN_REVOKE = "sovereign_revoke"
|
|
|
|
|
|
@dataclass
class EscalationContext:
    """Snapshot of the situation recorded together with an escalation.

    Every field has a default, so ``EscalationContext()`` is a valid
    "nothing known" context.
    """

    # Threat that prompted the escalation, if any.
    threat_id: Optional[str] = None
    threat_type: Optional[str] = None
    threat_confidence: Optional[float] = None

    # Overall system state when the escalation happened.
    active_alerts: int = 0
    mesh_health: str = "unknown"

    # What triggered the escalation.
    triggering_tool: Optional[str] = None
    triggering_decision: Optional[str] = None
|
|
|
|
|
|
@dataclass
class Escalation:
    """Record of a single profile escalation and its current state."""

    # Identity of the transition.
    escalation_id: str
    from_profile: str
    to_profile: str
    escalation_type: str
    context: EscalationContext

    # Whether and how the escalation can be undone.
    reversible: bool
    auto_deescalate: bool
    deescalate_after_seconds: Optional[int]
    deescalate_on_condition: Optional[str]

    # Timestamps, stored as ISO-8601 strings.
    created_at: str
    expires_at: Optional[str]

    # Hashes filled in after the receipt / Tem context have been produced.
    receipt_hash: Optional[str] = None
    tem_context_hash: Optional[str] = None

    # Lifecycle state; the last two are set when the escalation is closed.
    active: bool = True
    deescalated_at: Optional[str] = None
    deescalation_type: Optional[str] = None
|
|
|
|
|
|
# Registry of escalations that have not been de-escalated yet, keyed by
# escalation_id. Lives in process memory only (would be persisted in
# production).
_active_escalations: Dict[str, Escalation] = {}
|
|
|
|
|
|
def _emit_escalation_receipt(escalation: Escalation, event_type: str) -> dict:
    """Build a receipt for *escalation*, append it to the log, and return it.

    The receipt's ``root_hash`` is the BLAKE3 hash of the key-sorted JSON
    encoding of its ``body``. ``event_type`` is used in the receipt ``type``
    (``profile_<event_type>``) and tags; for ``"deescalation"`` the body also
    carries the de-escalation timestamp and type.
    """
    # Make sure the log directory exists before anything is written.
    ESCALATION_LOG.parent.mkdir(parents=True, exist_ok=True)

    payload = {
        "escalation_id": escalation.escalation_id,
        "event_type": event_type,
        "from_profile": escalation.from_profile,
        "to_profile": escalation.to_profile,
        "escalation_type": escalation.escalation_type,
        "reversible": escalation.reversible,
        "context": asdict(escalation.context),
        "expires_at": escalation.expires_at,
        "active": escalation.active,
    }
    if event_type == "deescalation":
        payload.update(
            deescalated_at=escalation.deescalated_at,
            deescalation_type=escalation.deescalation_type,
        )

    # Sorted keys give a canonical encoding, so the hash does not depend on
    # dict insertion order.
    canonical_body = json.dumps(payload, sort_keys=True).encode()
    tag_list = ["escalation", event_type, escalation.from_profile, escalation.to_profile]

    record = {
        "schema_version": "2.0.0",
        "type": f"profile_{event_type}",
        "timestamp": _now_iso(),
        "scroll": "identity",
        "tags": tag_list,
        "root_hash": _vmhash_blake3(canonical_body),
        "body": payload,
    }

    # One JSON document per line, appended to the log.
    with open(ESCALATION_LOG, "a") as log_file:
        log_file.write(json.dumps(record) + "\n")

    return record
|
|
|
|
|
|
# =============================================================================
|
|
# ESCALATION POLICIES
|
|
# =============================================================================
|
|
|
|
# Allowed transitions, keyed by (from_profile, to_profile). A pair that is not
# listed here has no escalation path.
ESCALATION_POLICIES = {
    # observer -> operator: reversible, expires after one hour.
    ("observer", "operator"): {
        "reversible": True,
        "auto_deescalate": True,
        "default_ttl_seconds": 3600,
        "requires_reason": True,
        "requires_approval": False,
    },
    # operator -> guardian: reversible, expires after two hours.
    ("operator", "guardian"): {
        "reversible": True,
        "auto_deescalate": True,
        "default_ttl_seconds": 7200,
        "requires_reason": True,
        "requires_approval": False,
        "auto_on_threat_confidence": 0.8,
    },
    # guardian -> phoenix: expires after 30 minutes; needs quorum or
    # sovereign approval.
    ("guardian", "phoenix"): {
        "reversible": True,
        "auto_deescalate": True,
        "default_ttl_seconds": 1800,
        "requires_reason": True,
        "requires_approval": True,
        "auto_on_crisis": True,
    },
    # phoenix -> sovereign: no TTL and no automatic de-escalation; needs a
    # human.
    ("phoenix", "sovereign"): {
        "reversible": False,
        "auto_deescalate": False,
        "default_ttl_seconds": None,
        "requires_reason": True,
        "requires_approval": True,
        "requires_human": True,
    },
}
|
|
|
|
# Human-readable descriptions of the named de-escalation conditions.
DEESCALATION_CONDITIONS = {
    "no_active_threats_1h": "No active threats for 1 hour",
    "no_active_alerts_24h": "No active alerts for 24 hours",
    "crisis_resolved": "Crisis formally concluded",
    "manual_release": "Operator explicitly released authority",
    "timeout": "Escalation TTL expired",
}
|
|
|
|
|
|
# =============================================================================
|
|
# ESCALATION OPERATIONS
|
|
# =============================================================================
|
|
|
|
def escalate(
    from_profile: str,
    to_profile: str,
    escalation_type: EscalationType,
    context: Optional[EscalationContext] = None,
    ttl_seconds: Optional[int] = None,
    deescalate_condition: Optional[str] = None,
    approved_by: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Escalate from one profile to another with full proof chain.

    The ``(from_profile, to_profile)`` pair must have an entry in
    ESCALATION_POLICIES. On success a receipt is appended to the escalation
    log, a Tem context hash is computed, and the escalation is registered in
    the in-memory active set.

    Args:
        from_profile: Profile being left.
        to_profile: Profile being entered.
        escalation_type: Why the escalation is happening; also used for the
            approval checks below.
        context: Situation snapshot; an empty EscalationContext is used when
            omitted.
        ttl_seconds: Lifetime override. A falsy value (None or 0) falls back
            to the policy's ``default_ttl_seconds``.
        deescalate_condition: Condition name stored on the escalation;
            reported as ``"timeout"`` in the result when omitted.
        approved_by: Approver identity. Only its truthiness is checked here;
            it is not written to the receipt.

    Returns:
        On success, a dict with ``success=True`` plus ``escalation_id``,
        ``from_profile``, ``to_profile``, ``escalation_type``, ``reversible``,
        ``expires_at``, ``receipt_hash``, ``tem_context_hash`` and
        ``deescalate_condition``. On refusal, a dict with ``success=False``
        and ``error`` (and ``approval_required=True`` when approval is
        missing).
    """
    # Get policy
    policy = ESCALATION_POLICIES.get((from_profile, to_profile))
    if not policy:
        return {
            "success": False,
            "error": f"No escalation path from {from_profile} to {to_profile}",
        }

    # Check approval requirements
    if policy.get("requires_human") and escalation_type != EscalationType.SOVEREIGN_OVERRIDE:
        return {
            "success": False,
            "error": f"Escalation to {to_profile} requires human (sovereign) approval",
        }

    if policy.get("requires_approval") and not approved_by:
        # Without a named approver, only the escalation types that are
        # themselves an approval are let through.
        if escalation_type not in [EscalationType.QUORUM_APPROVED, EscalationType.SOVEREIGN_OVERRIDE]:
            return {
                "success": False,
                "error": f"Escalation to {to_profile} requires approval",
                "approval_required": True,
            }

    # Build context
    if context is None:
        context = EscalationContext()

    # Take ONE clock reading for both created_at and expires_at so the
    # recorded lifetime (expires_at - created_at) is exactly the TTL stored in
    # deescalate_after_seconds.
    now = datetime.now(timezone.utc)
    created_at = now.isoformat()

    # Calculate expiry
    ttl = ttl_seconds or policy.get("default_ttl_seconds")
    expires_at = (now + timedelta(seconds=ttl)).isoformat() if ttl else None

    # Create escalation
    escalation_id = f"esc_{secrets.token_hex(12)}"

    escalation = Escalation(
        escalation_id=escalation_id,
        from_profile=from_profile,
        to_profile=to_profile,
        escalation_type=escalation_type.value,
        context=context,
        reversible=policy["reversible"],
        auto_deescalate=policy["auto_deescalate"],
        deescalate_after_seconds=ttl,
        deescalate_on_condition=deescalate_condition,
        created_at=created_at,
        expires_at=expires_at,
        active=True,
    )

    # Emit receipt
    receipt = _emit_escalation_receipt(escalation, "escalation")
    escalation.receipt_hash = receipt["root_hash"]

    # Create Tem context hash (for threat awareness)
    tem_context = {
        "escalation_id": escalation_id,
        "profile_transition": f"{from_profile} → {to_profile}",
        "threat_context": asdict(context),
        "timestamp": _now_iso(),
    }
    escalation.tem_context_hash = _vmhash_blake3(
        json.dumps(tem_context, sort_keys=True).encode()
    )

    # Store active escalation
    _active_escalations[escalation_id] = escalation

    return {
        "success": True,
        "escalation_id": escalation_id,
        "from_profile": from_profile,
        "to_profile": to_profile,
        "escalation_type": escalation_type.value,
        "reversible": escalation.reversible,
        "expires_at": expires_at,
        "receipt_hash": escalation.receipt_hash,
        "tem_context_hash": escalation.tem_context_hash,
        "deescalate_condition": deescalate_condition or "timeout",
    }
|
|
|
|
|
|
def deescalate(
    escalation_id: str,
    deescalation_type: DeescalationType,
    reason: Optional[str] = None,
) -> Dict[str, Any]:
    """
    De-escalate an active escalation.

    Marks the escalation inactive, emits a ``deescalation`` receipt and
    removes the escalation from the in-memory active set.

    Args:
        escalation_id: Identifier returned by ``escalate``.
        deescalation_type: Why the escalation is ending. A non-reversible
            escalation can only be closed with ``SOVEREIGN_REVOKE``.
        reason: Free-text explanation. It is echoed in the returned dict
            only; it is not written to the receipt.

    Returns:
        On success, a dict with ``success=True``, the reversed profile
        transition, ``deescalation_type``, ``reason``, ``receipt_hash`` and
        ``duration_seconds``. Otherwise a dict with ``success=False`` and
        ``error``.

    Raises:
        Whatever ``_emit_escalation_receipt`` raises. In that case the
        escalation is left unchanged and still active.
    """
    escalation = _active_escalations.get(escalation_id)

    if not escalation:
        return {
            "success": False,
            "error": f"Escalation {escalation_id} not found or already inactive",
        }

    if not escalation.reversible and deescalation_type != DeescalationType.SOVEREIGN_REVOKE:
        return {
            "success": False,
            "error": f"Escalation {escalation_id} is not reversible without sovereign override",
        }

    # Resolve the new values before touching the record, so a bad argument
    # cannot leave it half-updated.
    closed_type = deescalation_type.value
    closed_at = _now_iso()
    previous_state = (
        escalation.active,
        escalation.deescalated_at,
        escalation.deescalation_type,
    )

    # Update escalation (the receipt is built from these fields)
    escalation.active = False
    escalation.deescalated_at = closed_at
    escalation.deescalation_type = closed_type

    # Emit receipt
    try:
        receipt = _emit_escalation_receipt(escalation, "deescalation")
    except Exception:
        # No receipt means no proof of the transition: undo the in-memory
        # change so the entry is not left registered-but-inactive, then let
        # the caller see the failure.
        (
            escalation.active,
            escalation.deescalated_at,
            escalation.deescalation_type,
        ) = previous_state
        raise

    # Remove from active
    del _active_escalations[escalation_id]

    started = datetime.fromisoformat(escalation.created_at.replace('Z', '+00:00'))
    ended = datetime.fromisoformat(closed_at.replace('Z', '+00:00'))

    return {
        "success": True,
        "escalation_id": escalation_id,
        "from_profile": escalation.to_profile,  # Note: going back
        "to_profile": escalation.from_profile,
        "deescalation_type": closed_type,
        "reason": reason,
        "receipt_hash": receipt["root_hash"],
        "duration_seconds": (ended - started).total_seconds(),
    }
|
|
|
|
|
|
def check_expired_escalations() -> List[Dict[str, Any]]:
    """
    De-escalate every active escalation whose expiry time has passed.

    Escalations without an expiry, or with ``auto_deescalate`` disabled, are
    left alone. Returns the ``deescalate`` result for each escalation that
    was processed.
    """
    current_time = datetime.now(timezone.utc)
    results = []

    # Iterate over a snapshot: deescalate() removes entries from the registry.
    for key, record in list(_active_escalations.items()):
        if not record.expires_at:
            continue

        deadline = datetime.fromisoformat(record.expires_at.replace('Z', '+00:00'))
        if not (current_time > deadline and record.auto_deescalate):
            continue

        outcome = deescalate(
            key,
            DeescalationType.TIMEOUT_EXPIRED,
            reason=f"TTL of {record.deescalate_after_seconds}s expired",
        )
        results.append(outcome)

    return results
|
|
|
|
|
|
def get_active_escalations() -> Dict[str, Any]:
    """Return a count and a per-escalation summary of the active registry."""
    summary_fields = (
        "escalation_id",
        "from_profile",
        "to_profile",
        "escalation_type",
        "created_at",
        "expires_at",
        "reversible",
    )
    total = len(_active_escalations)
    summaries = [
        {field: getattr(record, field) for field in summary_fields}
        for record in _active_escalations.values()
    ]
    return {"active_count": total, "escalations": summaries}
|
|
|
|
|
|
def get_escalation_history(
    profile: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Query escalation history from receipts.

    Reads the escalation log (one JSON receipt per line) and returns a
    summary of each receipt, newest first.

    Args:
        profile: When given, keep only receipts whose ``from_profile`` or
            ``to_profile`` equals this value.
        limit: Maximum number of entries returned in ``history``.

    Returns:
        ``{"history": [...], "count": N}``. ``count`` is the number of
        matching receipts before ``limit`` is applied. A missing log file
        yields an empty history.
    """
    history = []

    # EAFP: open directly rather than exists()-then-open, so a log that
    # disappears between the two calls is handled the same way.
    try:
        log_file = open(ESCALATION_LOG, "r")
    except FileNotFoundError:
        return {"history": [], "count": 0}

    with log_file:
        for line in log_file:
            line = line.strip()
            if not line:
                continue

            try:
                receipt = json.loads(line)
            except json.JSONDecodeError:
                continue

            # A line can be valid JSON without being a receipt object (a
            # list, string, number, ...). Skip it like any other malformed
            # line instead of failing on `.get`.
            if not isinstance(receipt, dict):
                continue
            body = receipt.get("body", {})
            if not isinstance(body, dict):
                continue

            # Filter by profile if specified
            if profile and profile not in (body.get("from_profile"), body.get("to_profile")):
                continue

            history.append({
                "escalation_id": body.get("escalation_id"),
                "event_type": body.get("event_type"),
                "from_profile": body.get("from_profile"),
                "to_profile": body.get("to_profile"),
                "timestamp": receipt.get("timestamp"),
                "receipt_hash": receipt.get("root_hash"),
            })

    # Return most recent first (the log is append-only, so file order is
    # oldest-first).
    history.reverse()

    return {
        "history": history[:limit],
        "count": len(history),
    }
|
|
|
|
|
|
# =============================================================================
|
|
# CONVENIENCE FUNCTIONS FOR COMMON ESCALATIONS
|
|
# =============================================================================
|
|
|
|
def escalate_on_threat(
    current_profile: str,
    threat_id: str,
    threat_type: str,
    confidence: float,
) -> Dict[str, Any]:
    """
    Escalate one level in response to a detected threat.

    The target profile is chosen from the current profile and the threat
    confidence: observer always moves to operator, operator moves to guardian
    at confidence >= 0.8, and guardian moves to phoenix at confidence >= 0.95.
    Any other combination returns a failure dict without escalating.

    The escalation itself is delegated to ``escalate`` with type
    THREAT_DETECTED and the ``no_active_threats_1h`` condition.
    """
    threat_context = EscalationContext(
        threat_id=threat_id,
        threat_type=threat_type,
        threat_confidence=confidence,
    )

    # (current profile, next profile, minimum confidence or None for "any").
    ladder = (
        ("observer", "operator", None),
        ("operator", "guardian", 0.8),
        ("guardian", "phoenix", 0.95),
    )
    target = next(
        (
            upper
            for lower, upper, floor in ladder
            if current_profile == lower and (floor is None or confidence >= floor)
        ),
        None,
    )

    if target is None:
        return {
            "success": False,
            "escalated": False,
            "reason": f"Confidence {confidence} insufficient for escalation from {current_profile}",
        }

    return escalate(
        from_profile=current_profile,
        to_profile=target,
        escalation_type=EscalationType.THREAT_DETECTED,
        context=threat_context,
        deescalate_condition="no_active_threats_1h",
    )
|
|
|
|
|
|
def escalate_to_phoenix(
    reason: str,
    approved_by: str,
) -> Dict[str, Any]:
    """
    Emergency escalation to Phoenix profile.

    Escalates guardian -> phoenix with type CRISIS_DECLARED and the
    ``crisis_resolved`` de-escalation condition.

    Args:
        reason: Why the crisis is being declared. Recorded in the escalation
            context so it appears in the receipt.
        approved_by: Identity of the approver, passed through to
            ``escalate``.

    Returns:
        The result dict from ``escalate``.
    """
    context = EscalationContext(
        mesh_health="crisis",
        # The reason used to be accepted and then dropped, so it never reached
        # the audit trail. Carry it in the context, which is serialized into
        # both the receipt body and the Tem context hash.
        # NOTE(review): triggering_decision is the only free-text slot on
        # EscalationContext; confirm it is the right home or add a dedicated
        # field.
        triggering_decision=reason,
    )

    return escalate(
        from_profile="guardian",
        to_profile="phoenix",
        escalation_type=EscalationType.CRISIS_DECLARED,
        context=context,
        approved_by=approved_by,
        deescalate_condition="crisis_resolved",
    )
|