init: vaultmesh mcp server
Some checks are pending
Governance CI / Constitution Hash Gate (push) Waiting to run
Governance CI / Governance Tests (push) Blocked by required conditions
Governance CI / Golden Drill Mini (push) Blocked by required conditions

This commit is contained in:
Vault Sovereign
2025-12-26 23:23:08 +00:00
commit e4871c2a29
35 changed files with 6511 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
"""
Governance Test Configuration
Shared fixtures for all governance tests.
"""
import os
import sys
import pytest
from pathlib import Path
# Add packages to path
REPO_ROOT = Path(__file__).parents[2]
sys.path.insert(0, str(REPO_ROOT / "packages"))
# Set VaultMesh root
os.environ["VAULTMESH_ROOT"] = str(REPO_ROOT)
@pytest.fixture
def repo_root():
"""Return the repository root path."""
return REPO_ROOT
@pytest.fixture
def constitution_path(repo_root):
"""Return path to the constitution."""
return repo_root / "docs" / "MCP-CONSTITUTION.md"
@pytest.fixture
def constitution_lock_path(repo_root):
"""Return path to the constitution lock file."""
return repo_root / "governance" / "constitution.lock"
@pytest.fixture
def parse_lock_file(constitution_lock_path):
"""Parse the constitution lock file into a dict."""
lock = {}
with open(constitution_lock_path, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
lock[key.strip()] = value.strip()
return lock

View File

@@ -0,0 +1,140 @@
"""
Test: Authentication Fail-Closed
Ensures unknown tools, profiles, and scopes are denied.
Authority must never be granted by default.
"""
import pytest
from vaultmesh_mcp.tools.auth import (
auth_check_permission,
auth_create_dev_session,
Profile,
check_profile_permission,
get_profile_for_scope,
SCOPE_TOOLS,
)
class TestFailClosed:
"""Fail-closed semantics - deny by default."""
def test_unknown_tool_denied(self):
"""Unknown tool must be denied regardless of scope."""
session = auth_create_dev_session(scope="sovereign")
token = session["token"]
result = auth_check_permission(token, "unknown_tool_xyz")
assert not result["allowed"], "Unknown tool should be denied"
def test_unknown_scope_maps_to_observer(self):
"""Unknown scope must map to OBSERVER (most restrictive)."""
profile = get_profile_for_scope("unknown_scope_xyz")
assert profile == Profile.OBSERVER, (
f"Unknown scope should map to OBSERVER, got {profile}"
)
def test_invalid_token_denied(self):
"""Invalid token must be denied."""
result = auth_check_permission("invalid_token_xyz", "cognitive_context")
assert not result["allowed"], "Invalid token should be denied"
def test_expired_session_denied(self):
"""Expired session must be denied (simulated via missing session)."""
result = auth_check_permission("expired_session_token", "cognitive_context")
assert not result["allowed"], "Expired session should be denied"
class TestProfileDeny:
"""Profile-based denials."""
def test_observer_denied_mutations(self):
"""OBSERVER cannot perform mutations."""
mutation_tools = [
"write_file",
"cognitive_decide",
"treasury_debit",
"offsec_tem_transmute",
]
for tool in mutation_tools:
result = check_profile_permission(Profile.OBSERVER, tool)
assert not result["allowed"], f"OBSERVER should be denied {tool}"
def test_operator_denied_tem(self):
"""OPERATOR cannot invoke Tem."""
result = check_profile_permission(Profile.OPERATOR, "cognitive_invoke_tem")
assert not result["allowed"], "OPERATOR should be denied Tem invocation"
def test_guardian_denied_phoenix_ops(self):
"""GUARDIAN cannot perform Phoenix operations."""
phoenix_ops = [
"offsec_phoenix_enable",
"offsec_phoenix_inject_crisis",
]
for tool in phoenix_ops:
result = check_profile_permission(Profile.GUARDIAN, tool)
assert not result["allowed"], f"GUARDIAN should be denied {tool}"
def test_phoenix_denied_treasury_create(self):
"""PHOENIX cannot create budgets (SOVEREIGN only)."""
result = check_profile_permission(Profile.PHOENIX, "treasury_create_budget")
assert not result["allowed"], "PHOENIX should be denied treasury creation"
class TestSovereignRequiresHuman:
"""SOVEREIGN profile requires human verification."""
def test_sovereign_cannot_be_auto_granted(self):
"""
SOVEREIGN authority cannot be granted through normal dev session.
This tests the constitutional invariant.
"""
# Dev session creates a session, but SOVEREIGN operations
# should still require additional human verification
session = auth_create_dev_session(scope="cognitive")
token = session["token"]
# Even with dev session, sovereign-only operations need proof
# The dev session scope is "cognitive", not "vault"
result = auth_check_permission(token, "treasury_create_budget")
# This should be denied because cognitive scope doesn't include
# treasury creation - that requires vault/sovereign scope
# The key point: sovereign authority isn't auto-granted
assert session["scope"] != "sovereign" or session.get("dev_mode"), (
"Production sessions should not auto-grant sovereign"
)
class TestCollapseSemantics:
"""Authority collapse tests - always downward, never upward."""
def test_insufficient_profile_collapses(self):
"""When profile is insufficient, result indicates collapse target."""
result = check_profile_permission(Profile.OBSERVER, "cognitive_decide")
assert not result["allowed"]
# The denial should indicate the profile level
assert result["profile"] == "observer"
def test_profile_hierarchy_is_strict(self):
"""Profile hierarchy: OBSERVER < OPERATOR < GUARDIAN < PHOENIX < SOVEREIGN."""
profiles = [
Profile.OBSERVER,
Profile.OPERATOR,
Profile.GUARDIAN,
Profile.PHOENIX,
Profile.SOVEREIGN,
]
# Each profile should have MORE tools than the one before
prev_count = 0
for profile in profiles:
from vaultmesh_mcp.tools.auth import PROFILE_TOOLS
tool_count = len(PROFILE_TOOLS.get(profile, set()))
assert tool_count >= prev_count, (
f"{profile.value} should have >= tools than previous profile"
)
prev_count = tool_count

View File

@@ -0,0 +1,118 @@
"""
Test: Constitution Hash Gate
Ensures the constitution has not been modified without proper amendment.
CI MUST fail if the constitution hash doesn't match the lock file.
"""
import pytest
import blake3
class TestConstitutionHash:
"""Constitution integrity tests - HARD GATE."""
def test_constitution_exists(self, constitution_path):
"""Constitution file must exist."""
assert constitution_path.exists(), "MCP-CONSTITUTION.md not found"
def test_lock_file_exists(self, constitution_lock_path):
"""Constitution lock file must exist."""
assert constitution_lock_path.exists(), "governance/constitution.lock not found"
def test_constitution_hash_matches_lock(self, constitution_path, parse_lock_file):
"""
HARD GATE: Constitution hash must match lock file.
If this fails, either:
1. Constitution was modified without amendment procedure
2. Lock file needs updating via proper amendment
"""
# Read constitution
content = constitution_path.read_text()
lines = content.split('\n')
# Hash excludes signature block (last 12 lines as per original ceremony)
# But after amendment protocol was added, we need to use the locked line count
hash_lines = int(parse_lock_file.get("hash_lines", 288))
hashable_content = '\n'.join(lines[:hash_lines])
computed_hash = f"blake3:{blake3.blake3(hashable_content.encode()).hexdigest()}"
locked_hash = parse_lock_file["hash"]
assert computed_hash == locked_hash, (
f"Constitution hash mismatch!\n"
f" Computed: {computed_hash}\n"
f" Locked: {locked_hash}\n"
f" If intentional, follow amendment procedure."
)
def test_version_not_decreased(self, parse_lock_file):
"""Version must not decrease (no rollbacks without amendment)."""
version = parse_lock_file["version"]
parts = [int(p) for p in version.split(".")]
# Version 1.0.0 is the minimum
assert parts >= [1, 0, 0], "Constitution version cannot be below 1.0.0"
def test_immutable_rules_count(self, parse_lock_file):
"""Immutable rules count must be exactly 5."""
immutable_count = int(parse_lock_file["immutable_rules"])
assert immutable_count == 5, (
f"Immutable rules count changed from 5 to {immutable_count}. "
"This violates immutability clause."
)
def test_cooldown_days_minimum(self, parse_lock_file):
"""Amendment cooldown must be at least 7 days."""
cooldown = int(parse_lock_file["cooldown_days"])
assert cooldown >= 7, (
f"Cooldown period reduced to {cooldown} days. "
"Minimum is 7 days per constitution."
)
def test_btc_anchor_required(self, parse_lock_file):
"""BTC anchor requirement must be true."""
requires_anchor = parse_lock_file["requires_btc_anchor"].lower() == "true"
assert requires_anchor, "BTC anchor requirement cannot be disabled"
def test_sovereign_key_present(self, parse_lock_file):
"""Sovereign key must be specified."""
sovereign_key = parse_lock_file.get("sovereign_key")
assert sovereign_key and sovereign_key.startswith("key_"), (
"Sovereign key must be specified in lock file"
)
class TestConstitutionContent:
"""Tests that verify constitution content invariants."""
def test_profiles_defined(self, constitution_path):
"""All five profiles must be defined."""
content = constitution_path.read_text()
profiles = ["OBSERVER", "OPERATOR", "GUARDIAN", "PHOENIX", "SOVEREIGN"]
for profile in profiles:
assert profile in content, f"Profile {profile} not found in constitution"
def test_immutable_clauses_present(self, constitution_path):
"""All immutable clauses must be present."""
content = constitution_path.read_text()
immutables = [
"SOVEREIGN profile requires human verification",
"No AI may grant itself SOVEREIGN authority",
"Every mutation emits a receipt",
"Authority collapses downward, never upward",
"This immutability clause itself",
]
for clause in immutables:
assert clause in content, f"Immutable clause missing: {clause}"
def test_amendment_protocol_exists(self, constitution_path):
"""Amendment protocol must be defined."""
content = constitution_path.read_text()
assert "Amendment Protocol" in content, "Amendment protocol section missing"
assert "Cooling Period" in content or "cooling" in content.lower(), (
"Cooling period not defined in amendment protocol"
)

View File

@@ -0,0 +1,251 @@
"""
Test: Escalation Proof Requirements
Every escalation must emit proof (receipt, Tem context, TTL, reversibility).
Authority cannot increase without proof chain.
"""
import pytest
from vaultmesh_mcp.tools.escalation import (
escalate,
deescalate,
escalate_on_threat,
get_active_escalations,
get_escalation_history,
EscalationType,
DeescalationType,
ESCALATION_POLICIES,
)
class TestEscalationProof:
"""Every escalation must produce proof."""
def test_escalation_emits_receipt_hash(self):
"""Escalation must return receipt_hash."""
result = escalate(
from_profile="observer",
to_profile="operator",
escalation_type=EscalationType.OPERATOR_REQUEST,
)
assert result.get("success"), f"Escalation failed: {result}"
assert "receipt_hash" in result, "Escalation must emit receipt_hash"
assert result["receipt_hash"].startswith("blake3:"), "Receipt hash must be blake3"
# Cleanup
if result.get("escalation_id"):
deescalate(result["escalation_id"], DeescalationType.OPERATOR_RELEASE)
def test_escalation_captures_tem_context(self):
"""Escalation must capture Tem context hash."""
result = escalate(
from_profile="operator",
to_profile="guardian",
escalation_type=EscalationType.THREAT_DETECTED,
)
assert result.get("success"), f"Escalation failed: {result}"
assert "tem_context_hash" in result, "Escalation must capture Tem context"
assert result["tem_context_hash"].startswith("blake3:"), "Tem context must be blake3"
# Cleanup
if result.get("escalation_id"):
deescalate(result["escalation_id"], DeescalationType.THREAT_RESOLVED)
def test_escalation_specifies_reversibility(self):
"""Escalation must specify reversibility at creation."""
result = escalate(
from_profile="observer",
to_profile="operator",
escalation_type=EscalationType.OPERATOR_REQUEST,
)
assert "reversible" in result, "Escalation must specify reversibility"
assert isinstance(result["reversible"], bool), "Reversibility must be boolean"
# Cleanup
if result.get("escalation_id"):
deescalate(result["escalation_id"], DeescalationType.OPERATOR_RELEASE)
def test_escalation_specifies_expiry(self):
"""Escalation must specify expiry (TTL)."""
result = escalate(
from_profile="observer",
to_profile="operator",
escalation_type=EscalationType.OPERATOR_REQUEST,
)
assert result.get("success")
# expires_at may be None for SOVEREIGN, but should exist for others
assert "expires_at" in result, "Escalation must include expires_at field"
# For non-sovereign escalations, TTL should be set
if result.get("to_profile") != "sovereign":
assert result["expires_at"] is not None, (
f"Non-sovereign escalation to {result['to_profile']} must have TTL"
)
# Cleanup
if result.get("escalation_id"):
deescalate(result["escalation_id"], DeescalationType.OPERATOR_RELEASE)
class TestDeescalationProof:
"""De-escalation must also produce proof."""
def test_deescalation_emits_receipt(self):
"""De-escalation must emit receipt."""
# First escalate
esc = escalate(
from_profile="observer",
to_profile="operator",
escalation_type=EscalationType.OPERATOR_REQUEST,
)
assert esc.get("success")
# Then de-escalate
result = deescalate(
escalation_id=esc["escalation_id"],
deescalation_type=DeescalationType.OPERATOR_RELEASE,
reason="Test cleanup",
)
assert result.get("success"), f"De-escalation failed: {result}"
assert "receipt_hash" in result, "De-escalation must emit receipt"
def test_deescalation_records_duration(self):
"""De-escalation must record duration."""
# Escalate
esc = escalate(
from_profile="observer",
to_profile="operator",
escalation_type=EscalationType.OPERATOR_REQUEST,
)
# De-escalate
result = deescalate(
escalation_id=esc["escalation_id"],
deescalation_type=DeescalationType.OPERATOR_RELEASE,
)
assert "duration_seconds" in result, "De-escalation must record duration"
assert result["duration_seconds"] >= 0, "Duration must be non-negative"
class TestEscalationPathEnforcement:
"""Escalation paths must follow constitution."""
def test_skip_levels_blocked(self):
"""Cannot skip escalation levels."""
invalid_paths = [
("observer", "guardian"),
("observer", "phoenix"),
("observer", "sovereign"),
("operator", "phoenix"),
("operator", "sovereign"),
("guardian", "sovereign"),
]
for from_p, to_p in invalid_paths:
result = escalate(
from_profile=from_p,
to_profile=to_p,
escalation_type=EscalationType.OPERATOR_REQUEST,
)
assert not result.get("success"), (
f"Escalation {from_p} -> {to_p} should be blocked"
)
assert "error" in result, f"Should have error for {from_p} -> {to_p}"
def test_phoenix_requires_approval(self):
"""Phoenix escalation requires approval."""
result = escalate(
from_profile="guardian",
to_profile="phoenix",
escalation_type=EscalationType.CRISIS_DECLARED,
# approved_by intentionally missing
)
assert not result.get("success"), "Phoenix without approval should fail"
assert "approval" in result.get("error", "").lower(), (
"Error should mention approval requirement"
)
def test_sovereign_requires_human(self):
"""Sovereign escalation requires human verification."""
result = escalate(
from_profile="phoenix",
to_profile="sovereign",
escalation_type=EscalationType.CRISIS_DECLARED,
approved_by="did:vm:agent:automated", # Not human
)
assert not result.get("success"), "Sovereign without human should fail"
assert "human" in result.get("error", "").lower(), (
"Error should mention human requirement"
)
class TestEscalationAudit:
"""Escalation history must be auditable."""
def test_escalation_appears_in_history(self):
"""Completed escalation cycle must appear in history."""
# Escalate
esc = escalate(
from_profile="observer",
to_profile="operator",
escalation_type=EscalationType.OPERATOR_REQUEST,
)
esc_id = esc["escalation_id"]
# De-escalate
deescalate(esc_id, DeescalationType.OPERATOR_RELEASE)
# Check history
history = get_escalation_history()
assert history["count"] > 0, "History should not be empty"
# Find our escalation
found_esc = False
found_deesc = False
for event in history["history"]:
if event.get("escalation_id") == esc_id:
if event.get("event_type") == "escalation":
found_esc = True
elif event.get("event_type") == "deescalation":
found_deesc = True
assert found_esc, f"Escalation {esc_id} not found in history"
assert found_deesc, f"De-escalation {esc_id} not found in history"
def test_active_escalations_trackable(self):
"""Active escalations must be queryable."""
# Start clean
initial = get_active_escalations()
initial_count = initial["active_count"]
# Escalate
esc = escalate(
from_profile="observer",
to_profile="operator",
escalation_type=EscalationType.OPERATOR_REQUEST,
)
# Check active
active = get_active_escalations()
assert active["active_count"] == initial_count + 1, (
"Active count should increase by 1"
)
# Cleanup
deescalate(esc["escalation_id"], DeescalationType.OPERATOR_RELEASE)
# Verify cleanup
final = get_active_escalations()
assert final["active_count"] == initial_count, (
"Active count should return to initial"
)

View File

@@ -0,0 +1,208 @@
"""
Test: Golden Drill Mini
Fast, deterministic version of D1 and D3 for CI.
Must complete in under 5 seconds.
"""
import pytest
from vaultmesh_mcp.tools import (
cognitive_context,
cognitive_decide,
cognitive_invoke_tem,
)
from vaultmesh_mcp.tools.escalation import (
escalate,
deescalate,
escalate_on_threat,
get_active_escalations,
EscalationType,
DeescalationType,
)
# Deterministic drill marker
DRILL_MARKER = "CI/GOLDEN-DRILL/MINI"
class TestGoldenDrillD1Mini:
"""
Mini D1: Threat → Escalate → Tem → De-escalate
Validates the complete threat response chain.
"""
def test_d1_threat_escalation_chain(self):
"""
Complete chain:
1. Threat detected → escalation receipt
2. Decision made → decision receipt
3. Tem invoked → invocation receipt
4. De-escalate → return to baseline
"""
results = {}
# Step 1: Threat triggers escalation
esc_result = escalate_on_threat(
current_profile="operator",
threat_id=f"thr_{DRILL_MARKER}",
threat_type="ci_synthetic",
confidence=0.92,
)
assert esc_result.get("success") or esc_result.get("escalation_id"), (
f"Escalation failed: {esc_result}"
)
results["escalation"] = esc_result
# Verify proof captured
assert "receipt_hash" in esc_result, "Missing escalation receipt"
assert "tem_context_hash" in esc_result, "Missing Tem context"
# Step 2: Decision (as Guardian)
decision = cognitive_decide(
reasoning_chain=[
f"DRILL: {DRILL_MARKER}",
"Synthetic threat for CI validation",
"Confidence 92% - auto-escalated to guardian",
],
decision="invoke_tem",
confidence=0.92,
evidence=[esc_result.get("receipt_hash", "none")],
)
assert decision.get("success"), f"Decision failed: {decision}"
assert "receipt" in decision, "Missing decision receipt"
results["decision"] = decision
# Step 3: Tem invocation
tem = cognitive_invoke_tem(
threat_type="ci_synthetic",
threat_id=f"thr_{DRILL_MARKER}",
target="ci-target",
evidence=[decision["receipt"]["root_hash"]],
)
assert tem.get("success"), f"Tem failed: {tem}"
assert "receipt" in tem, "Missing Tem receipt"
assert "capability" in tem, "Missing capability artifact"
results["tem"] = tem
# Step 4: De-escalate
deesc = deescalate(
escalation_id=esc_result["escalation_id"],
deescalation_type=DeescalationType.THREAT_RESOLVED,
reason=f"DRILL: {DRILL_MARKER} complete",
)
assert deesc.get("success"), f"De-escalation failed: {deesc}"
assert "receipt_hash" in deesc, "Missing de-escalation receipt"
results["deescalation"] = deesc
# Step 5: Verify baseline
active = get_active_escalations()
# Note: We cleaned up our escalation, but others may exist
# Just verify our specific escalation is gone
our_esc_active = any(
e["escalation_id"] == esc_result["escalation_id"]
for e in active.get("escalations", [])
)
assert not our_esc_active, "Our escalation should be inactive"
# Collect receipt chain for audit
receipt_chain = [
esc_result["receipt_hash"],
decision["receipt"]["root_hash"],
tem["receipt"]["root_hash"],
deesc["receipt_hash"],
]
assert len(receipt_chain) == 4, "Should have 4 receipts in chain"
assert all(r.startswith("blake3:") for r in receipt_chain), (
"All receipts must be blake3 hashes"
)
class TestGoldenDrillD3Mini:
"""
Mini D3: Escalation abuse attempts
Validates constitutional enforcement.
"""
def test_d3_skip_levels_blocked(self):
"""OPERATOR → PHOENIX direct must be blocked."""
result = escalate(
from_profile="operator",
to_profile="phoenix",
escalation_type=EscalationType.THREAT_DETECTED,
)
assert not result.get("success"), "Skip levels should be blocked"
assert "error" in result, "Should have error message"
def test_d3_missing_approval_blocked(self):
"""GUARDIAN → PHOENIX without approval must be blocked."""
result = escalate(
from_profile="guardian",
to_profile="phoenix",
escalation_type=EscalationType.CRISIS_DECLARED,
)
assert not result.get("success"), "Missing approval should be blocked"
assert "approval" in result.get("error", "").lower()
def test_d3_sovereign_requires_human(self):
"""PHOENIX → SOVEREIGN without human must be blocked."""
result = escalate(
from_profile="phoenix",
to_profile="sovereign",
escalation_type=EscalationType.CRISIS_DECLARED,
)
assert not result.get("success"), "Sovereign without human should be blocked"
assert "human" in result.get("error", "").lower()
def test_d3_observer_to_phoenix_blocked(self):
"""OBSERVER → PHOENIX must be blocked (multiple level skip)."""
result = escalate(
from_profile="observer",
to_profile="phoenix",
escalation_type=EscalationType.CRISIS_DECLARED,
)
assert not result.get("success"), "Observer to Phoenix should be blocked"
class TestGoldenDrillInvariants:
"""Cross-cutting invariants that must hold."""
def test_context_always_available(self):
"""cognitive_context must always be available (read-only)."""
result = cognitive_context(include=["health"])
assert "health" in result, "Health context must be available"
assert result["health"]["status"] == "operational", (
"System should be operational for drills"
)
def test_receipts_accumulate(self):
"""Receipts must accumulate, never decrease."""
from pathlib import Path
import os
receipts_dir = Path(os.environ["VAULTMESH_ROOT"]) / "receipts"
cognitive_log = receipts_dir / "cognitive" / "cognitive_events.jsonl"
if cognitive_log.exists():
initial_count = len(cognitive_log.read_text().strip().split('\n'))
# Do something that emits receipt
cognitive_decide(
reasoning_chain=["CI invariant test"],
decision="test",
confidence=0.1,
)
final_count = len(cognitive_log.read_text().strip().split('\n'))
assert final_count > initial_count, "Receipts must accumulate"

View File

@@ -0,0 +1,250 @@
"""
Test: Tool Permission Matrix
Ensures no permission drift from baseline.
New tools must be explicitly registered with proper receipts.
"""
import pytest
from vaultmesh_mcp.tools.auth import PROFILE_TOOLS, Profile, SCOPE_TOOLS, Scope
from vaultmesh_mcp.server import TOOLS as REGISTERED_TOOLS
class TestToolRegistration:
"""All tools must be properly registered."""
def test_all_server_tools_have_permissions(self):
"""Every tool in server must appear in permission matrix."""
registered_names = {t["name"] for t in REGISTERED_TOOLS}
# Collect all tools from all profiles
all_permitted_tools = set()
for profile_tools in PROFILE_TOOLS.values():
all_permitted_tools.update(profile_tools)
# Check each registered tool has a permission entry somewhere
# Note: Some tools might be implicitly denied (not in any profile)
# That's valid - we just want to ensure awareness
unmatched = []
for tool in registered_names:
# Check if tool is in any profile's allowed set
found = any(
tool in profile_tools
for profile_tools in PROFILE_TOOLS.values()
)
if not found:
unmatched.append(tool)
# Auth tools and some special tools may not be in profile matrix
# but should still be tracked
assert len(unmatched) < 5, (
f"Too many unregistered tools: {unmatched}. "
"Add to PROFILE_TOOLS or document as intentionally denied."
)
def test_no_orphan_permissions(self):
"""Permissions should not reference non-existent tools."""
registered_names = {t["name"] for t in REGISTERED_TOOLS}
# Get all tools mentioned in permissions
all_permitted_tools = set()
for profile_tools in PROFILE_TOOLS.values():
all_permitted_tools.update(profile_tools)
# External tools (from other MCP servers) are allowed
# But internal vaultmesh tools should be registered
vaultmesh_tools = {
t for t in all_permitted_tools
if t.startswith(("cognitive_", "guardian_", "treasury_", "auth_"))
}
orphans = vaultmesh_tools - registered_names
assert len(orphans) == 0, f"Orphan permissions found: {orphans}"
class TestPermissionMatrix:
"""Verify the permission matrix matches constitution."""
def test_observer_read_only(self):
"""OBSERVER can only read, not mutate."""
observer_tools = PROFILE_TOOLS.get(Profile.OBSERVER, set())
mutation_keywords = ["write", "create", "debit", "credit", "invoke", "decide"]
for tool in observer_tools:
for keyword in mutation_keywords:
if keyword in tool:
pytest.fail(
f"OBSERVER has mutation tool: {tool}. "
"OBSERVER must be read-only."
)
def test_profile_inheritance(self):
"""Higher profiles inherit lower profile permissions."""
profile_order = [
Profile.OBSERVER,
Profile.OPERATOR,
Profile.GUARDIAN,
Profile.PHOENIX,
Profile.SOVEREIGN,
]
for i in range(1, len(profile_order)):
lower = profile_order[i - 1]
higher = profile_order[i]
lower_tools = PROFILE_TOOLS.get(lower, set())
higher_tools = PROFILE_TOOLS.get(higher, set())
# Higher should contain all of lower
missing = lower_tools - higher_tools
# Allow some exceptions for explicitly removed tools
assert len(missing) < 3, (
f"{higher.value} missing inherited tools from {lower.value}: {missing}"
)
def test_sovereign_has_all_tools(self):
"""SOVEREIGN must have access to all registered tools."""
sovereign_tools = PROFILE_TOOLS.get(Profile.SOVEREIGN, set())
# SOVEREIGN should have the most tools
for profile in Profile:
if profile != Profile.SOVEREIGN:
other_tools = PROFILE_TOOLS.get(profile, set())
assert len(sovereign_tools) >= len(other_tools), (
f"SOVEREIGN has fewer tools than {profile.value}"
)
class TestMutationReceiptRequirement:
"""Mutation tools must emit receipts."""
def test_cognitive_decide_emits_receipt(self):
"""cognitive_decide must emit receipt."""
from vaultmesh_mcp.tools import cognitive_decide
result = cognitive_decide(
reasoning_chain=["test"],
decision="test",
confidence=0.5,
)
assert "receipt" in result, "cognitive_decide must emit receipt"
assert "root_hash" in result["receipt"], "Receipt must have hash"
def test_cognitive_invoke_tem_emits_receipt(self):
"""cognitive_invoke_tem must emit receipt."""
from vaultmesh_mcp.tools import cognitive_invoke_tem
result = cognitive_invoke_tem(
threat_type="test",
threat_id="test_001",
target="test",
evidence=["test"],
)
assert "receipt" in result, "cognitive_invoke_tem must emit receipt"
def test_treasury_debit_emits_receipt(self):
"""treasury_debit must emit receipt (or error with receipt)."""
from vaultmesh_mcp.tools import treasury_debit
# This may fail due to missing budget, but should still
# handle gracefully
result = treasury_debit(
budget_id="nonexistent",
amount=1,
description="test",
)
# Either success with receipt or error
# The key is it shouldn't crash
assert "error" in result or "receipt" in result
class TestCallBoundaryEnforcement:
"""Server call boundary must enforce session/profile permissions."""
def test_missing_session_token_denied(self):
from vaultmesh_mcp.server import handle_tool_call
result = handle_tool_call("guardian_status", {})
assert "error" in result
assert result.get("allowed") is False
def test_invalid_session_token_denied(self):
from vaultmesh_mcp.server import handle_tool_call
result = handle_tool_call("guardian_status", {"session_token": "invalid"})
assert "error" in result
assert result.get("allowed") is False
def test_observer_session_can_read(self):
from vaultmesh_mcp.server import handle_tool_call
from vaultmesh_mcp.tools.auth import auth_create_dev_session
session = auth_create_dev_session(scope="read")
result = handle_tool_call(
"guardian_status",
{"session_token": session["token"]},
)
assert "error" not in result
def test_observer_session_cannot_mutate(self):
from vaultmesh_mcp.server import handle_tool_call
from vaultmesh_mcp.tools.auth import auth_create_dev_session
session = auth_create_dev_session(scope="read")
result = handle_tool_call(
"treasury_debit",
{
"session_token": session["token"],
"budget_id": "nonexistent",
"amount": 1,
"description": "test",
},
)
assert "error" in result
assert result.get("allowed") is False
def test_wrong_profile_denied(self):
from vaultmesh_mcp.server import handle_tool_call
from vaultmesh_mcp.tools.auth import auth_create_dev_session
# admin scope maps to operator profile; should not invoke TEM
session = auth_create_dev_session(scope="admin")
result = handle_tool_call(
"cognitive_invoke_tem",
{
"session_token": session["token"],
"threat_type": "test",
"threat_id": "t1",
"target": "x",
"evidence": ["e1"],
},
)
assert result.get("allowed") is False
assert "Permission" in result.get("error", "") or "denied" in result.get("reason", "")
def test_valid_guardian_session_allowed(self):
from vaultmesh_mcp.server import handle_tool_call, MCP_RECEIPTS
from vaultmesh_mcp.tools.auth import auth_create_dev_session
import os
# Ensure clean receipt log
try:
os.remove(MCP_RECEIPTS)
except OSError:
pass
session = auth_create_dev_session(scope="anchor") # maps to guardian profile
result = handle_tool_call("guardian_status", {"session_token": session["token"]})
assert "error" not in result
# Receipt should be written without session_token arguments
with open(MCP_RECEIPTS, "r") as f:
last = f.readlines()[-1]
import json
rec = json.loads(last)
assert "session_token" not in rec["body"].get("arguments", {})