""" Test: Tool Permission Matrix Ensures no permission drift from baseline. New tools must be explicitly registered with proper receipts. """ import pytest from vaultmesh_mcp.tools.auth import PROFILE_TOOLS, Profile, SCOPE_TOOLS, Scope from vaultmesh_mcp.server import TOOLS as REGISTERED_TOOLS class TestToolRegistration: """All tools must be properly registered.""" def test_all_server_tools_have_permissions(self): """Every tool in server must appear in permission matrix.""" registered_names = {t["name"] for t in REGISTERED_TOOLS} # Collect all tools from all profiles all_permitted_tools = set() for profile_tools in PROFILE_TOOLS.values(): all_permitted_tools.update(profile_tools) # Check each registered tool has a permission entry somewhere # Note: Some tools might be implicitly denied (not in any profile) # That's valid - we just want to ensure awareness unmatched = [] for tool in registered_names: # Check if tool is in any profile's allowed set found = any( tool in profile_tools for profile_tools in PROFILE_TOOLS.values() ) if not found: unmatched.append(tool) # Auth tools and some special tools may not be in profile matrix # but should still be tracked assert len(unmatched) < 5, ( f"Too many unregistered tools: {unmatched}. " "Add to PROFILE_TOOLS or document as intentionally denied." ) def test_no_orphan_permissions(self): """Permissions should not reference non-existent tools.""" registered_names = {t["name"] for t in REGISTERED_TOOLS} # Get all tools mentioned in permissions all_permitted_tools = set() for profile_tools in PROFILE_TOOLS.values(): all_permitted_tools.update(profile_tools) # External tools (from other MCP servers) are allowed # But internal vaultmesh tools should be registered vaultmesh_tools = { t for t in all_permitted_tools if t.startswith(("cognitive_", "guardian_", "treasury_", "auth_")) } orphans = vaultmesh_tools - registered_names assert len(orphans) == 0, f"Orphan permissions found: {orphans}" class TestPermissionMatrix: """Verify the permission matrix matches constitution.""" def test_observer_read_only(self): """OBSERVER can only read, not mutate.""" observer_tools = PROFILE_TOOLS.get(Profile.OBSERVER, set()) mutation_keywords = ["write", "create", "debit", "credit", "invoke", "decide"] for tool in observer_tools: for keyword in mutation_keywords: if keyword in tool: pytest.fail( f"OBSERVER has mutation tool: {tool}. " "OBSERVER must be read-only." ) def test_profile_inheritance(self): """Higher profiles inherit lower profile permissions.""" profile_order = [ Profile.OBSERVER, Profile.OPERATOR, Profile.GUARDIAN, Profile.PHOENIX, Profile.SOVEREIGN, ] for i in range(1, len(profile_order)): lower = profile_order[i - 1] higher = profile_order[i] lower_tools = PROFILE_TOOLS.get(lower, set()) higher_tools = PROFILE_TOOLS.get(higher, set()) # Higher should contain all of lower missing = lower_tools - higher_tools # Allow some exceptions for explicitly removed tools assert len(missing) < 3, ( f"{higher.value} missing inherited tools from {lower.value}: {missing}" ) def test_sovereign_has_all_tools(self): """SOVEREIGN must have access to all registered tools.""" sovereign_tools = PROFILE_TOOLS.get(Profile.SOVEREIGN, set()) # SOVEREIGN should have the most tools for profile in Profile: if profile != Profile.SOVEREIGN: other_tools = PROFILE_TOOLS.get(profile, set()) assert len(sovereign_tools) >= len(other_tools), ( f"SOVEREIGN has fewer tools than {profile.value}" ) class TestMutationReceiptRequirement: """Mutation tools must emit receipts.""" def test_cognitive_decide_emits_receipt(self): """cognitive_decide must emit receipt.""" from vaultmesh_mcp.tools import cognitive_decide result = cognitive_decide( reasoning_chain=["test"], decision="test", confidence=0.5, ) assert "receipt" in result, "cognitive_decide must emit receipt" assert "root_hash" in result["receipt"], "Receipt must have hash" def test_cognitive_invoke_tem_emits_receipt(self): """cognitive_invoke_tem must emit receipt.""" from vaultmesh_mcp.tools import cognitive_invoke_tem result = cognitive_invoke_tem( threat_type="test", threat_id="test_001", target="test", evidence=["test"], ) assert "receipt" in result, "cognitive_invoke_tem must emit receipt" def test_treasury_debit_emits_receipt(self): """treasury_debit must emit receipt (or error with receipt).""" from vaultmesh_mcp.tools import treasury_debit # This may fail due to missing budget, but should still # handle gracefully result = treasury_debit( budget_id="nonexistent", amount=1, description="test", ) # Either success with receipt or error # The key is it shouldn't crash assert "error" in result or "receipt" in result class TestCallBoundaryEnforcement: """Server call boundary must enforce session/profile permissions.""" def test_missing_session_token_denied(self): from vaultmesh_mcp.server import handle_tool_call result = handle_tool_call("guardian_status", {}) assert "error" in result assert result.get("allowed") is False def test_invalid_session_token_denied(self): from vaultmesh_mcp.server import handle_tool_call result = handle_tool_call("guardian_status", {"session_token": "invalid"}) assert "error" in result assert result.get("allowed") is False def test_observer_session_can_read(self): from vaultmesh_mcp.server import handle_tool_call from vaultmesh_mcp.tools.auth import auth_create_dev_session session = auth_create_dev_session(scope="read") result = handle_tool_call( "guardian_status", {"session_token": session["token"]}, ) assert "error" not in result def test_observer_session_cannot_mutate(self): from vaultmesh_mcp.server import handle_tool_call from vaultmesh_mcp.tools.auth import auth_create_dev_session session = auth_create_dev_session(scope="read") result = handle_tool_call( "treasury_debit", { "session_token": session["token"], "budget_id": "nonexistent", "amount": 1, "description": "test", }, ) assert "error" in result assert result.get("allowed") is False def test_wrong_profile_denied(self): from vaultmesh_mcp.server import handle_tool_call from vaultmesh_mcp.tools.auth import auth_create_dev_session # admin scope maps to operator profile; should not invoke TEM session = auth_create_dev_session(scope="admin") result = handle_tool_call( "cognitive_invoke_tem", { "session_token": session["token"], "threat_type": "test", "threat_id": "t1", "target": "x", "evidence": ["e1"], }, ) assert result.get("allowed") is False assert "Permission" in result.get("error", "") or "denied" in result.get("reason", "") def test_valid_guardian_session_allowed(self): from vaultmesh_mcp.server import handle_tool_call, MCP_RECEIPTS from vaultmesh_mcp.tools.auth import auth_create_dev_session import os # Ensure clean receipt log try: os.remove(MCP_RECEIPTS) except OSError: pass session = auth_create_dev_session(scope="anchor") # maps to guardian profile result = handle_tool_call("guardian_status", {"session_token": session["token"]}) assert "error" not in result # Receipt should be written without session_token arguments with open(MCP_RECEIPTS, "r") as f: last = f.readlines()[-1] import json rec = json.loads(last) assert "session_token" not in rec["body"].get("arguments", {})