13 KiB
13 KiB
VaultMesh MCP Integration Patterns
Architecture
┌─────────────────────────────────────────────────────────────┐
│ CLAUDE │
└───────────────────────────┬─────────────────────────────────┘
│ MCP Protocol
▼
┌─────────────────────────────────────────────────────────────┐
│ MCP GATEWAY │
│ • Authentication (capability verification) │
│ • Rate limiting │
│ • Audit logging (all tool calls receipted) │
│ • Constitutional compliance checking │
└───────────────────────────┬─────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Oracle │ │ Drills │ │ Mesh │
│ Server │ │ Server │ │ Server │
└───────────┘ └───────────┘ └───────────┘
Tool Categories
Read-Only Tools (Default Access)
| Tool | Capability | Description |
|---|---|---|
oracle_answer |
oracle_query |
Ask compliance questions |
oracle_corpus_search |
oracle_query |
Search compliance corpus |
drills_status |
drills_view |
View drill status |
mesh_topology |
mesh_view |
View mesh topology |
mesh_node_status |
mesh_view |
View node status |
treasury_balance |
treasury_view |
View balances |
guardian_anchor_status |
guardian_view |
View anchor status |
guardian_verify_receipt |
guardian_view |
Verify receipts |
identity_resolve_did |
identity_view |
Resolve DIDs |
identity_whoami |
(any) | View own identity |
psi_phase_status |
psi_view |
View phase status |
psi_opus_status |
psi_view |
View opus status |
governance_constitution_summary |
governance_view |
View constitution |
receipts_search |
receipts_view |
Search receipts |
system_health |
system_view |
View system health |
Write Tools (Elevated Access)
| Tool | Capability | Description |
|---|---|---|
drills_create |
drills_create |
Create new drill |
drills_complete_stage |
drills_execute |
Complete drill stage |
treasury_record_entry |
treasury_write |
Record financial entry |
guardian_anchor_now |
anchor |
Trigger anchor cycle |
psi_transmute |
psi_transmute |
Start transmutation |
Tool Implementation Patterns
Basic Read Tool
@server.tool()
async def my_read_tool(
filter_param: str = None,
limit: int = 50,
) -> str:
"""
Description of what this tool does.
Args:
filter_param: Optional filter
limit: Maximum results
Returns:
Query results as JSON
"""
# Verify capability
caller = await get_caller_identity()
await verify_capability(caller, "my_view")
# Perform query
results = await engine.query(filter_param, limit)
return json.dumps([r.to_dict() for r in results], indent=2)
Write Tool with Receipt
@server.tool()
async def my_write_tool(
param1: str,
param2: int,
) -> str:
"""
Description of write operation.
Args:
param1: First parameter
param2: Second parameter
Returns:
Operation result as JSON
"""
# Verify elevated capability
caller = await get_caller_identity()
await verify_capability(caller, "my_write")
# Perform operation
result = await engine.perform_operation(param1, param2)
# Emit receipt for audit
await emit_tool_call_receipt(
tool="my_write_tool",
caller=caller,
params={"param1": param1, "param2": param2},
result_hash=result.hash,
)
return json.dumps(result.to_dict(), indent=2)
Tool with Constitutional Check
@server.tool()
async def sensitive_operation(
target: str,
action: str,
) -> str:
"""
Operation requiring constitutional compliance check.
"""
caller = await get_caller_identity()
await verify_capability(caller, "admin")
# Check constitutional compliance BEFORE executing
compliance = await governance_engine.check_compliance(
action=action,
actor=caller,
target=target,
)
if not compliance.allowed:
return json.dumps({
"error": "constitutional_violation",
"violated_articles": compliance.violated_articles,
"message": compliance.message,
}, indent=2)
# Execute if compliant
result = await engine.execute(target, action)
await emit_tool_call_receipt(
tool="sensitive_operation",
caller=caller,
params={"target": target, "action": action},
result_hash=result.hash,
)
return json.dumps(result.to_dict(), indent=2)
Tool Call Receipt
Every MCP tool call is receipted:
{
"type": "mcp_tool_call",
"call_id": "mcp-call-2025-12-06-001",
"timestamp": "2025-12-06T14:30:00Z",
"caller": "did:vm:agent:claude-session-abc123",
"tool": "oracle_answer",
"params_hash": "blake3:params...",
"result_hash": "blake3:result...",
"duration_ms": 1250,
"capability_used": "oracle_query",
"session_id": "session-xyz789",
"tags": ["mcp", "oracle", "tool-call"],
"root_hash": "blake3:aaa111..."
}
Authentication
Session Identity
async def get_caller_identity() -> str:
"""Get the DID of the current MCP caller."""
session = get_current_session()
if session.authenticated_did:
return session.authenticated_did
# Anonymous callers get session-scoped agent DID
return f"did:vm:agent:mcp-session-{session.id}"
Capability Verification
async def verify_capability(caller: str, capability: str) -> bool:
"""Verify the caller has the required capability."""
has_cap = await identity_engine.check_capability(caller, capability)
if not has_cap:
raise PermissionError(
f"Caller {caller} lacks capability: {capability}"
)
# Log capability exercise
await identity_engine.log_capability_exercise(
caller=caller,
capability=capability,
action="mcp_tool_call",
)
return True
Rate Limiting
class RateLimiter:
def __init__(self):
self.limits = {
"oracle_answer": (10, timedelta(minutes=1)),
"guardian_anchor_now": (5, timedelta(hours=1)),
"treasury_record_entry": (100, timedelta(hours=1)),
"default": (60, timedelta(minutes=1)),
}
async def check(self, caller: str, tool: str) -> bool:
key = f"{caller}:{tool}"
limit, window = self.limits.get(tool, self.limits["default"])
# Check against limit
current_count = await self.get_count(key, window)
if current_count >= limit:
raise RateLimitExceeded(
f"Rate limit exceeded: {limit} per {window}"
)
await self.increment(key)
return True
Claude Desktop Configuration
config.json
{
"mcpServers": {
"vaultmesh": {
"command": "python",
"args": ["-m", "vaultmesh_mcp.server"],
"env": {
"VAULTMESH_CONFIG": "/path/to/config.toml",
"VAULTMESH_IDENTITY": "did:vm:agent:claude-desktop"
}
}
}
}
Capability Configuration
# config.toml
[mcp.capabilities]
default_capabilities = [
"oracle_query",
"drills_view",
"mesh_view",
"treasury_view",
"guardian_view",
"identity_view",
"psi_view",
"governance_view",
"receipts_view",
"system_view",
]
elevated_capabilities = [
"drills_create",
"drills_execute",
"treasury_write",
"anchor",
"psi_transmute",
]
[mcp.rate_limits]
oracle_answer = { limit = 10, window_minutes = 1 }
guardian_anchor_now = { limit = 5, window_hours = 1 }
default = { limit = 60, window_minutes = 1 }
[mcp.audit]
receipt_all_calls = true
log_params = true
log_results = false
Example Interactions
Compliance Query
Human: What's our compliance status for AI Act Annex IV?
Claude: [Calls oracle_answer]
Based on the Oracle's assessment:
**Compliance Status: Partial (73%)**
Key findings:
- Technical documentation: ✅ Compliant
- Risk management: ⚠️ Needs update
- Data governance: ✅ Compliant
- Human oversight: ⚠️ Documentation incomplete
Security Drill
Human: Start a drill for Kubernetes lateral movement
Claude: [Calls drills_create]
Created drill: drill-1764691432
**Stages:**
1. Initial Access Simulation
2. Lateral Movement Attempt
3. Detection Validation
4. Response Execution
Ready to begin?
System Health
Human: How's the system?
Claude: [Calls system_health]
**VaultMesh Health: 🟢 Healthy**
- Nodes: 5 active
- Last anchor: 47 min ago
- Phase: RUBEDO 🜂
- Receipts today: 34
Server Entry Point
# vaultmesh_mcp/server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
server = Server("vaultmesh")
# Register all tools
from .tools import (
oracle_tools,
drills_tools,
mesh_tools,
treasury_tools,
guardian_tools,
identity_tools,
psi_tools,
governance_tools,
)
def main():
import asyncio
async def run():
async with stdio_server() as (read, write):
await server.run(read, write, server.create_initialization_options())
asyncio.run(run())
if __name__ == "__main__":
main()
Custom VaultMesh Nodes for n8n
When integrating with n8n workflows:
// VaultMesh Receipt Emit Node
{
name: 'vaultmesh-receipt-emit',
displayName: 'VaultMesh Receipt',
description: 'Emit a receipt to VaultMesh',
properties: [
{
displayName: 'Scroll',
name: 'scroll',
type: 'options',
options: [
{ name: 'Automation', value: 'automation' },
{ name: 'Compliance', value: 'compliance' },
// ...
],
},
{
displayName: 'Receipt Type',
name: 'receiptType',
type: 'string',
},
{
displayName: 'Body',
name: 'body',
type: 'json',
},
{
displayName: 'Tags',
name: 'tags',
type: 'string',
description: 'Comma-separated tags',
},
],
async execute() {
const scroll = this.getNodeParameter('scroll', 0);
const receiptType = this.getNodeParameter('receiptType', 0);
const body = this.getNodeParameter('body', 0);
const tags = this.getNodeParameter('tags', 0).split(',');
const receipt = await vaultmesh.emitReceipt({
scroll,
receiptType,
body,
tags,
});
return [{ json: receipt }];
},
}
Error Handling
@server.tool()
async def robust_tool(param: str) -> str:
"""Tool with comprehensive error handling."""
try:
caller = await get_caller_identity()
await verify_capability(caller, "required_cap")
result = await engine.operation(param)
return json.dumps(result.to_dict(), indent=2)
except PermissionError as e:
return json.dumps({
"error": "permission_denied",
"message": str(e),
"required_capability": "required_cap",
}, indent=2)
except RateLimitExceeded as e:
return json.dumps({
"error": "rate_limit_exceeded",
"message": str(e),
"retry_after_seconds": e.retry_after,
}, indent=2)
except ConstitutionalViolation as e:
return json.dumps({
"error": "constitutional_violation",
"violated_axioms": e.axioms,
"message": str(e),
}, indent=2)
except Exception as e:
logger.error(f"Tool error: {e}")
return json.dumps({
"error": "internal_error",
"message": "An unexpected error occurred",
}, indent=2)