Files
vm-mcp/tests/governance/test_tool_permissions.py
Vault Sovereign e4871c2a29
Some checks are pending
Governance CI / Constitution Hash Gate (push) Waiting to run
Governance CI / Governance Tests (push) Blocked by required conditions
Governance CI / Golden Drill Mini (push) Blocked by required conditions
init: vaultmesh mcp server
2025-12-26 23:23:08 +00:00

251 lines
9.0 KiB
Python

"""
Test: Tool Permission Matrix
Ensures no permission drift from baseline.
New tools must be explicitly registered with proper receipts.
"""
import pytest
from vaultmesh_mcp.tools.auth import PROFILE_TOOLS, Profile, SCOPE_TOOLS, Scope
from vaultmesh_mcp.server import TOOLS as REGISTERED_TOOLS
class TestToolRegistration:
    """Cross-check the server's tool list against the profile permission matrix."""

    def test_all_server_tools_have_permissions(self):
        """Fail when five or more server tools appear in no profile at all.

        A tool that is in no profile is implicitly denied, which can be a
        deliberate choice (e.g. auth or other special tools), so a small
        number of unmatched tools is tolerated rather than treated as an error.
        """
        registered_names = {t["name"] for t in REGISTERED_TOOLS}

        # Union of every tool named by any profile; an empty matrix yields set().
        all_permitted_tools = set().union(*PROFILE_TOOLS.values())

        # One set difference replaces a per-tool scan over every profile.
        # Sorted so the failure message is stable from run to run.
        unmatched = sorted(registered_names - all_permitted_tools)

        assert len(unmatched) < 5, (
            f"Too many unregistered tools: {unmatched}. "
            "Add to PROFILE_TOOLS or document as intentionally denied."
        )

    def test_no_orphan_permissions(self):
        """Fail when a profile grants an internal tool the server does not register.

        Only names with one of the internal prefixes are checked; any other
        name is treated as belonging to an external MCP server and is skipped.
        """
        registered_names = {t["name"] for t in REGISTERED_TOOLS}
        all_permitted_tools = set().union(*PROFILE_TOOLS.values())

        # str.startswith accepts a tuple, so one call covers all prefixes.
        internal_prefixes = ("cognitive_", "guardian_", "treasury_", "auth_")
        vaultmesh_tools = {
            t for t in all_permitted_tools
            if t.startswith(internal_prefixes)
        }

        orphans = vaultmesh_tools - registered_names
        assert not orphans, f"Orphan permissions found: {orphans}"
class TestPermissionMatrix:
    """Structural checks on the per-profile permission matrix."""

    def test_observer_read_only(self):
        """OBSERVER must hold no tool whose name contains a mutation keyword."""
        forbidden_fragments = ["write", "create", "debit", "credit", "invoke", "decide"]
        granted = PROFILE_TOOLS.get(Profile.OBSERVER, set())

        # First granted tool whose name contains any forbidden fragment, if any.
        offender = next(
            (
                name for name in granted
                if any(fragment in name for fragment in forbidden_fragments)
            ),
            None,
        )
        if offender is not None:
            pytest.fail(
                f"OBSERVER has mutation tool: {offender}. "
                "OBSERVER must be read-only."
            )

    def test_profile_inheritance(self):
        """Each profile may lack at most two tools held by the profile below it."""
        ladder = [
            Profile.OBSERVER,
            Profile.OPERATOR,
            Profile.GUARDIAN,
            Profile.PHOENIX,
            Profile.SOVEREIGN,
        ]
        # Walk adjacent (lower, higher) pairs up the ladder.
        for lower, higher in zip(ladder, ladder[1:]):
            inherited = PROFILE_TOOLS.get(lower, set())
            held = PROFILE_TOOLS.get(higher, set())
            missing = inherited - held
            # A couple of explicitly removed tools are tolerated.
            assert len(missing) < 3, (
                f"{higher.value} missing inherited tools from {lower.value}: {missing}"
            )

    def test_sovereign_has_all_tools(self):
        """SOVEREIGN's tool set must be at least as large as every other profile's.

        NOTE: this compares set sizes only; it does not check membership.
        """
        sovereign_tools = PROFILE_TOOLS.get(Profile.SOVEREIGN, set())
        others = [profile for profile in Profile if profile != Profile.SOVEREIGN]
        for profile in others:
            other_tools = PROFILE_TOOLS.get(profile, set())
            assert len(sovereign_tools) >= len(other_tools), (
                f"SOVEREIGN has fewer tools than {profile.value}"
            )
class TestMutationReceiptRequirement:
    """State-changing tools are expected to return a receipt in their result."""

    def test_cognitive_decide_emits_receipt(self):
        """cognitive_decide's result carries a receipt that includes a root_hash."""
        from vaultmesh_mcp.tools import cognitive_decide

        call_args = {
            "reasoning_chain": ["test"],
            "decision": "test",
            "confidence": 0.5,
        }
        outcome = cognitive_decide(**call_args)

        assert "receipt" in outcome, "cognitive_decide must emit receipt"
        receipt = outcome["receipt"]
        assert "root_hash" in receipt, "Receipt must have hash"

    def test_cognitive_invoke_tem_emits_receipt(self):
        """cognitive_invoke_tem's result carries a receipt."""
        from vaultmesh_mcp.tools import cognitive_invoke_tem

        call_args = {
            "threat_type": "test",
            "threat_id": "test_001",
            "target": "test",
            "evidence": ["test"],
        }
        outcome = cognitive_invoke_tem(**call_args)

        assert "receipt" in outcome, "cognitive_invoke_tem must emit receipt"

    def test_treasury_debit_emits_receipt(self):
        """treasury_debit returns either an error or a receipt instead of raising."""
        from vaultmesh_mcp.tools import treasury_debit

        # The budget id is not expected to exist, so an error result is
        # acceptable; the point is that the call returns a result at all.
        call_args = {
            "budget_id": "nonexistent",
            "amount": 1,
            "description": "test",
        }
        outcome = treasury_debit(**call_args)

        assert any(key in outcome for key in ("error", "receipt"))
class TestCallBoundaryEnforcement:
    """Server call boundary must enforce session/profile permissions."""

    def test_missing_session_token_denied(self):
        """A call with no session_token yields an error and allowed=False."""
        from vaultmesh_mcp.server import handle_tool_call

        result = handle_tool_call("guardian_status", {})
        assert "error" in result
        assert result.get("allowed") is False

    def test_invalid_session_token_denied(self):
        """A call with an unrecognised session_token yields an error and allowed=False."""
        from vaultmesh_mcp.server import handle_tool_call

        result = handle_tool_call("guardian_status", {"session_token": "invalid"})
        assert "error" in result
        assert result.get("allowed") is False

    def test_observer_session_can_read(self):
        """A dev session created with scope="read" may call guardian_status."""
        from vaultmesh_mcp.server import handle_tool_call
        from vaultmesh_mcp.tools.auth import auth_create_dev_session

        session = auth_create_dev_session(scope="read")
        result = handle_tool_call(
            "guardian_status",
            {"session_token": session["token"]},
        )
        assert "error" not in result

    def test_observer_session_cannot_mutate(self):
        """A dev session created with scope="read" is refused treasury_debit."""
        from vaultmesh_mcp.server import handle_tool_call
        from vaultmesh_mcp.tools.auth import auth_create_dev_session

        session = auth_create_dev_session(scope="read")
        result = handle_tool_call(
            "treasury_debit",
            {
                "session_token": session["token"],
                "budget_id": "nonexistent",
                "amount": 1,
                "description": "test",
            },
        )
        assert "error" in result
        assert result.get("allowed") is False

    def test_wrong_profile_denied(self):
        """A dev session created with scope="admin" is refused cognitive_invoke_tem."""
        from vaultmesh_mcp.server import handle_tool_call
        from vaultmesh_mcp.tools.auth import auth_create_dev_session

        # admin scope maps to operator profile; should not invoke TEM
        session = auth_create_dev_session(scope="admin")
        result = handle_tool_call(
            "cognitive_invoke_tem",
            {
                "session_token": session["token"],
                "threat_type": "test",
                "threat_id": "t1",
                "target": "x",
                "evidence": ["e1"],
            },
        )
        assert result.get("allowed") is False
        # The denial may be reported through either the error or the reason field.
        assert "Permission" in result.get("error", "") or "denied" in result.get("reason", "")

    def test_valid_guardian_session_allowed(self):
        """An allowed call succeeds and its receipt omits the session token.

        The receipt log is removed first so that its last line belongs to the
        call made here.
        """
        import json
        import os

        from vaultmesh_mcp.server import handle_tool_call, MCP_RECEIPTS
        from vaultmesh_mcp.tools.auth import auth_create_dev_session

        # Ensure clean receipt log. Best-effort: a log that does not exist
        # yet is already the state we want.
        try:
            os.remove(MCP_RECEIPTS)
        except OSError:
            pass

        session = auth_create_dev_session(scope="anchor")  # maps to guardian profile
        result = handle_tool_call("guardian_status", {"session_token": session["token"]})
        assert "error" not in result

        # Receipt should be written without session_token arguments.
        # NOTE(review): assumes the log is UTF-8 JSON, one record per line;
        # confirm against the code that writes MCP_RECEIPTS.
        with open(MCP_RECEIPTS, "r", encoding="utf-8") as f:
            # Skip blank lines so a trailing newline cannot reach json.loads.
            lines = [line for line in f if line.strip()]

        # Fail with a readable message instead of an IndexError on an empty log.
        assert lines, "No receipt was written for the allowed call"

        rec = json.loads(lines[-1])
        assert "session_token" not in rec["body"].get("arguments", {})