251 lines
9.0 KiB
Python
251 lines
9.0 KiB
Python
"""
|
|
Test: Tool Permission Matrix
|
|
|
|
Ensures no permission drift from baseline.
|
|
New tools must be explicitly registered with proper receipts.
|
|
"""
|
|
|
|
import pytest
|
|
from vaultmesh_mcp.tools.auth import PROFILE_TOOLS, Profile, SCOPE_TOOLS, Scope
|
|
from vaultmesh_mcp.server import TOOLS as REGISTERED_TOOLS
|
|
|
|
|
|
class TestToolRegistration:
|
|
"""All tools must be properly registered."""
|
|
|
|
def test_all_server_tools_have_permissions(self):
|
|
"""Every tool in server must appear in permission matrix."""
|
|
registered_names = {t["name"] for t in REGISTERED_TOOLS}
|
|
|
|
# Collect all tools from all profiles
|
|
all_permitted_tools = set()
|
|
for profile_tools in PROFILE_TOOLS.values():
|
|
all_permitted_tools.update(profile_tools)
|
|
|
|
# Check each registered tool has a permission entry somewhere
|
|
# Note: Some tools might be implicitly denied (not in any profile)
|
|
# That's valid - we just want to ensure awareness
|
|
|
|
unmatched = []
|
|
for tool in registered_names:
|
|
# Check if tool is in any profile's allowed set
|
|
found = any(
|
|
tool in profile_tools
|
|
for profile_tools in PROFILE_TOOLS.values()
|
|
)
|
|
if not found:
|
|
unmatched.append(tool)
|
|
|
|
# Auth tools and some special tools may not be in profile matrix
|
|
# but should still be tracked
|
|
assert len(unmatched) < 5, (
|
|
f"Too many unregistered tools: {unmatched}. "
|
|
"Add to PROFILE_TOOLS or document as intentionally denied."
|
|
)
|
|
|
|
def test_no_orphan_permissions(self):
|
|
"""Permissions should not reference non-existent tools."""
|
|
registered_names = {t["name"] for t in REGISTERED_TOOLS}
|
|
|
|
# Get all tools mentioned in permissions
|
|
all_permitted_tools = set()
|
|
for profile_tools in PROFILE_TOOLS.values():
|
|
all_permitted_tools.update(profile_tools)
|
|
|
|
# External tools (from other MCP servers) are allowed
|
|
# But internal vaultmesh tools should be registered
|
|
vaultmesh_tools = {
|
|
t for t in all_permitted_tools
|
|
if t.startswith(("cognitive_", "guardian_", "treasury_", "auth_"))
|
|
}
|
|
|
|
orphans = vaultmesh_tools - registered_names
|
|
assert len(orphans) == 0, f"Orphan permissions found: {orphans}"
|
|
|
|
|
|
class TestPermissionMatrix:
|
|
"""Verify the permission matrix matches constitution."""
|
|
|
|
def test_observer_read_only(self):
|
|
"""OBSERVER can only read, not mutate."""
|
|
observer_tools = PROFILE_TOOLS.get(Profile.OBSERVER, set())
|
|
|
|
mutation_keywords = ["write", "create", "debit", "credit", "invoke", "decide"]
|
|
|
|
for tool in observer_tools:
|
|
for keyword in mutation_keywords:
|
|
if keyword in tool:
|
|
pytest.fail(
|
|
f"OBSERVER has mutation tool: {tool}. "
|
|
"OBSERVER must be read-only."
|
|
)
|
|
|
|
def test_profile_inheritance(self):
|
|
"""Higher profiles inherit lower profile permissions."""
|
|
profile_order = [
|
|
Profile.OBSERVER,
|
|
Profile.OPERATOR,
|
|
Profile.GUARDIAN,
|
|
Profile.PHOENIX,
|
|
Profile.SOVEREIGN,
|
|
]
|
|
|
|
for i in range(1, len(profile_order)):
|
|
lower = profile_order[i - 1]
|
|
higher = profile_order[i]
|
|
|
|
lower_tools = PROFILE_TOOLS.get(lower, set())
|
|
higher_tools = PROFILE_TOOLS.get(higher, set())
|
|
|
|
# Higher should contain all of lower
|
|
missing = lower_tools - higher_tools
|
|
|
|
# Allow some exceptions for explicitly removed tools
|
|
assert len(missing) < 3, (
|
|
f"{higher.value} missing inherited tools from {lower.value}: {missing}"
|
|
)
|
|
|
|
def test_sovereign_has_all_tools(self):
|
|
"""SOVEREIGN must have access to all registered tools."""
|
|
sovereign_tools = PROFILE_TOOLS.get(Profile.SOVEREIGN, set())
|
|
|
|
# SOVEREIGN should have the most tools
|
|
for profile in Profile:
|
|
if profile != Profile.SOVEREIGN:
|
|
other_tools = PROFILE_TOOLS.get(profile, set())
|
|
assert len(sovereign_tools) >= len(other_tools), (
|
|
f"SOVEREIGN has fewer tools than {profile.value}"
|
|
)
|
|
|
|
|
|
class TestMutationReceiptRequirement:
|
|
"""Mutation tools must emit receipts."""
|
|
|
|
def test_cognitive_decide_emits_receipt(self):
|
|
"""cognitive_decide must emit receipt."""
|
|
from vaultmesh_mcp.tools import cognitive_decide
|
|
|
|
result = cognitive_decide(
|
|
reasoning_chain=["test"],
|
|
decision="test",
|
|
confidence=0.5,
|
|
)
|
|
|
|
assert "receipt" in result, "cognitive_decide must emit receipt"
|
|
assert "root_hash" in result["receipt"], "Receipt must have hash"
|
|
|
|
def test_cognitive_invoke_tem_emits_receipt(self):
|
|
"""cognitive_invoke_tem must emit receipt."""
|
|
from vaultmesh_mcp.tools import cognitive_invoke_tem
|
|
|
|
result = cognitive_invoke_tem(
|
|
threat_type="test",
|
|
threat_id="test_001",
|
|
target="test",
|
|
evidence=["test"],
|
|
)
|
|
|
|
assert "receipt" in result, "cognitive_invoke_tem must emit receipt"
|
|
|
|
def test_treasury_debit_emits_receipt(self):
|
|
"""treasury_debit must emit receipt (or error with receipt)."""
|
|
from vaultmesh_mcp.tools import treasury_debit
|
|
|
|
# This may fail due to missing budget, but should still
|
|
# handle gracefully
|
|
result = treasury_debit(
|
|
budget_id="nonexistent",
|
|
amount=1,
|
|
description="test",
|
|
)
|
|
|
|
# Either success with receipt or error
|
|
# The key is it shouldn't crash
|
|
assert "error" in result or "receipt" in result
|
|
|
|
|
|
class TestCallBoundaryEnforcement:
|
|
"""Server call boundary must enforce session/profile permissions."""
|
|
|
|
def test_missing_session_token_denied(self):
|
|
from vaultmesh_mcp.server import handle_tool_call
|
|
|
|
result = handle_tool_call("guardian_status", {})
|
|
assert "error" in result
|
|
assert result.get("allowed") is False
|
|
|
|
def test_invalid_session_token_denied(self):
|
|
from vaultmesh_mcp.server import handle_tool_call
|
|
|
|
result = handle_tool_call("guardian_status", {"session_token": "invalid"})
|
|
assert "error" in result
|
|
assert result.get("allowed") is False
|
|
|
|
def test_observer_session_can_read(self):
|
|
from vaultmesh_mcp.server import handle_tool_call
|
|
from vaultmesh_mcp.tools.auth import auth_create_dev_session
|
|
|
|
session = auth_create_dev_session(scope="read")
|
|
result = handle_tool_call(
|
|
"guardian_status",
|
|
{"session_token": session["token"]},
|
|
)
|
|
assert "error" not in result
|
|
|
|
def test_observer_session_cannot_mutate(self):
|
|
from vaultmesh_mcp.server import handle_tool_call
|
|
from vaultmesh_mcp.tools.auth import auth_create_dev_session
|
|
|
|
session = auth_create_dev_session(scope="read")
|
|
result = handle_tool_call(
|
|
"treasury_debit",
|
|
{
|
|
"session_token": session["token"],
|
|
"budget_id": "nonexistent",
|
|
"amount": 1,
|
|
"description": "test",
|
|
},
|
|
)
|
|
assert "error" in result
|
|
assert result.get("allowed") is False
|
|
|
|
def test_wrong_profile_denied(self):
|
|
from vaultmesh_mcp.server import handle_tool_call
|
|
from vaultmesh_mcp.tools.auth import auth_create_dev_session
|
|
|
|
# admin scope maps to operator profile; should not invoke TEM
|
|
session = auth_create_dev_session(scope="admin")
|
|
result = handle_tool_call(
|
|
"cognitive_invoke_tem",
|
|
{
|
|
"session_token": session["token"],
|
|
"threat_type": "test",
|
|
"threat_id": "t1",
|
|
"target": "x",
|
|
"evidence": ["e1"],
|
|
},
|
|
)
|
|
assert result.get("allowed") is False
|
|
assert "Permission" in result.get("error", "") or "denied" in result.get("reason", "")
|
|
|
|
def test_valid_guardian_session_allowed(self):
|
|
from vaultmesh_mcp.server import handle_tool_call, MCP_RECEIPTS
|
|
from vaultmesh_mcp.tools.auth import auth_create_dev_session
|
|
import os
|
|
# Ensure clean receipt log
|
|
try:
|
|
os.remove(MCP_RECEIPTS)
|
|
except OSError:
|
|
pass
|
|
|
|
session = auth_create_dev_session(scope="anchor") # maps to guardian profile
|
|
result = handle_tool_call("guardian_status", {"session_token": session["token"]})
|
|
assert "error" not in result
|
|
|
|
# Receipt should be written without session_token arguments
|
|
with open(MCP_RECEIPTS, "r") as f:
|
|
last = f.readlines()[-1]
|
|
import json
|
|
rec = json.loads(last)
|
|
assert "session_token" not in rec["body"].get("arguments", {})
|