"""
Test: Escalation Proof Requirements

Every escalation must emit proof (receipt, Tem context, TTL, reversibility).
Authority cannot increase without proof chain.
"""

import pytest

from vaultmesh_mcp.tools.escalation import (
    escalate,
    deescalate,
    escalate_on_threat,
    get_active_escalations,
    get_escalation_history,
    EscalationType,
    DeescalationType,
    ESCALATION_POLICIES,
)


def _release(result, deescalation_type=DeescalationType.OPERATOR_RELEASE):
    """De-escalate the escalation described by ``result``, if it has an id.

    Shared cleanup helper for the tests in this module. It is meant to be
    called from a ``finally`` clause so that an escalation opened by a test
    is released even when one of the test's assertions fails.

    Args:
        result: The dict returned by ``escalate()``.
        deescalation_type: The ``DeescalationType`` to release with.
    """
    escalation_id = result.get("escalation_id")
    if escalation_id:
        deescalate(escalation_id, deescalation_type)


class TestEscalationProof:
    """Every escalation must produce proof."""

    def test_escalation_emits_receipt_hash(self):
        """Escalation must return receipt_hash."""
        result = escalate(
            from_profile="observer",
            to_profile="operator",
            escalation_type=EscalationType.OPERATOR_REQUEST,
        )
        try:
            assert result.get("success"), f"Escalation failed: {result}"
            assert "receipt_hash" in result, "Escalation must emit receipt_hash"
            assert result["receipt_hash"].startswith("blake3:"), (
                "Receipt hash must be blake3"
            )
        finally:
            # Cleanup runs even if an assertion above failed.
            _release(result)

    def test_escalation_captures_tem_context(self):
        """Escalation must capture Tem context hash."""
        result = escalate(
            from_profile="operator",
            to_profile="guardian",
            escalation_type=EscalationType.THREAT_DETECTED,
        )
        try:
            assert result.get("success"), f"Escalation failed: {result}"
            assert "tem_context_hash" in result, (
                "Escalation must capture Tem context"
            )
            assert result["tem_context_hash"].startswith("blake3:"), (
                "Tem context must be blake3"
            )
        finally:
            # Cleanup runs even if an assertion above failed.
            _release(result, DeescalationType.THREAT_RESOLVED)

    def test_escalation_specifies_reversibility(self):
        """Escalation must specify reversibility at creation."""
        result = escalate(
            from_profile="observer",
            to_profile="operator",
            escalation_type=EscalationType.OPERATOR_REQUEST,
        )
        try:
            # Check success first so a failed escalation is reported as such
            # rather than as a missing "reversible" field.
            assert result.get("success"), f"Escalation failed: {result}"
            assert "reversible" in result, "Escalation must specify reversibility"
            assert isinstance(result["reversible"], bool), (
                "Reversibility must be boolean"
            )
        finally:
            # Cleanup runs even if an assertion above failed.
            _release(result)

    def test_escalation_specifies_expiry(self):
        """Escalation must specify expiry (TTL)."""
        result = escalate(
            from_profile="observer",
            to_profile="operator",
            escalation_type=EscalationType.OPERATOR_REQUEST,
        )
        try:
            assert result.get("success"), f"Escalation failed: {result}"

            # expires_at may be None for SOVEREIGN, but should exist for others
            assert "expires_at" in result, "Escalation must include expires_at field"

            # For non-sovereign escalations, TTL should be set
            if result.get("to_profile") != "sovereign":
                assert result["expires_at"] is not None, (
                    f"Non-sovereign escalation to {result['to_profile']} must have TTL"
                )
        finally:
            # Cleanup runs even if an assertion above failed.
            _release(result)


class TestDeescalationProof:
    """De-escalation must also produce proof."""

    def test_deescalation_emits_receipt(self):
        """De-escalation must emit receipt."""
        # First escalate
        esc = escalate(
            from_profile="observer",
            to_profile="operator",
            escalation_type=EscalationType.OPERATOR_REQUEST,
        )
        assert esc.get("success"), f"Escalation failed: {esc}"

        # Then de-escalate
        result = deescalate(
            escalation_id=esc["escalation_id"],
            deescalation_type=DeescalationType.OPERATOR_RELEASE,
            reason="Test cleanup",
        )

        assert result.get("success"), f"De-escalation failed: {result}"
        assert "receipt_hash" in result, "De-escalation must emit receipt"

    def test_deescalation_records_duration(self):
        """De-escalation must record duration."""
        # Escalate
        esc = escalate(
            from_profile="observer",
            to_profile="operator",
            escalation_type=EscalationType.OPERATOR_REQUEST,
        )
        # Fail with a clear message instead of a KeyError on "escalation_id".
        assert esc.get("success"), f"Escalation failed: {esc}"

        # De-escalate
        result = deescalate(
            escalation_id=esc["escalation_id"],
            deescalation_type=DeescalationType.OPERATOR_RELEASE,
        )

        assert "duration_seconds" in result, "De-escalation must record duration"
        assert result["duration_seconds"] >= 0, "Duration must be non-negative"


class TestEscalationPathEnforcement:
    """Escalation paths must follow constitution."""

    def test_skip_levels_blocked(self):
        """Cannot skip escalation levels."""
        invalid_paths = [
            ("observer", "guardian"),
            ("observer", "phoenix"),
            ("observer", "sovereign"),
            ("operator", "phoenix"),
            ("operator", "sovereign"),
            ("guardian", "sovereign"),
        ]

        for from_p, to_p in invalid_paths:
            result = escalate(
                from_profile=from_p,
                to_profile=to_p,
                escalation_type=EscalationType.OPERATOR_REQUEST,
            )

            assert not result.get("success"), (
                f"Escalation {from_p} -> {to_p} should be blocked"
            )
            assert "error" in result, f"Should have error for {from_p} -> {to_p}"

    def test_phoenix_requires_approval(self):
        """Phoenix escalation requires approval."""
        result = escalate(
            from_profile="guardian",
            to_profile="phoenix",
            escalation_type=EscalationType.CRISIS_DECLARED,
            # approved_by intentionally missing
        )

        assert not result.get("success"), "Phoenix without approval should fail"
        assert "approval" in result.get("error", "").lower(), (
            "Error should mention approval requirement"
        )

    def test_sovereign_requires_human(self):
        """Sovereign escalation requires human verification."""
        result = escalate(
            from_profile="phoenix",
            to_profile="sovereign",
            escalation_type=EscalationType.CRISIS_DECLARED,
            approved_by="did:vm:agent:automated",  # Not human
        )

        assert not result.get("success"), "Sovereign without human should fail"
        assert "human" in result.get("error", "").lower(), (
            "Error should mention human requirement"
        )


class TestEscalationAudit:
    """Escalation history must be auditable."""

    def test_escalation_appears_in_history(self):
        """Completed escalation cycle must appear in history."""
        # Escalate
        esc = escalate(
            from_profile="observer",
            to_profile="operator",
            escalation_type=EscalationType.OPERATOR_REQUEST,
        )
        # Fail with a clear message instead of a KeyError on "escalation_id".
        assert esc.get("success"), f"Escalation failed: {esc}"
        esc_id = esc["escalation_id"]

        # De-escalate
        deescalate(esc_id, DeescalationType.OPERATOR_RELEASE)

        # Check history
        history = get_escalation_history()

        assert history["count"] > 0, "History should not be empty"

        # Collect the event types recorded for our escalation id
        event_types = {
            event.get("event_type")
            for event in history["history"]
            if event.get("escalation_id") == esc_id
        }

        assert "escalation" in event_types, (
            f"Escalation {esc_id} not found in history"
        )
        assert "deescalation" in event_types, (
            f"De-escalation {esc_id} not found in history"
        )

    def test_active_escalations_trackable(self):
        """Active escalations must be queryable."""
        # Record the baseline; other escalations may already be active
        initial = get_active_escalations()
        initial_count = initial["active_count"]

        # Escalate
        esc = escalate(
            from_profile="observer",
            to_profile="operator",
            escalation_type=EscalationType.OPERATOR_REQUEST,
        )
        try:
            assert esc.get("success"), f"Escalation failed: {esc}"

            # Check active
            active = get_active_escalations()
            assert active["active_count"] == initial_count + 1, (
                "Active count should increase by 1"
            )
        finally:
            # Cleanup runs even if an assertion above failed.
            _release(esc)

        # Verify cleanup
        final = get_active_escalations()
        assert final["active_count"] == initial_count, (
            "Active count should return to initial"
        )