vm-core/docs/VAULTMESH-MCP-SERVERS.md
2025-12-27 00:10:32 +00:00

VAULTMESH-MCP-SERVERS.md

Exposing the Civilization Ledger to Claude

Every tool is a doorway. Every doorway has a guardian.

MCP (Model Context Protocol) servers expose VaultMesh engines to Claude, enabling AI-assisted operations while maintaining constitutional governance.


1. MCP Architecture

┌─────────────────────────────────────────────────────────────┐
│                      CLAUDE                                 │
│                                                             │
│  "What's our compliance status for Annex IV?"               │
│  "Start a drill for IoT security"                           │
│  "Show me the mesh topology"                                │
└───────────────────────────┬─────────────────────────────────┘
                            │
                            │ MCP Protocol
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                   MCP GATEWAY                               │
│                                                             │
│  • Authentication (capability verification)                 │
│  • Rate limiting                                            │
│  • Audit logging (all tool calls receipted)                 │
│  • Constitutional compliance checking                       │
└───────────────────────────┬─────────────────────────────────┘
                            │
            ┌───────────────┼───────────────┐
            │               │               │
            ▼               ▼               ▼
    ┌───────────┐   ┌───────────┐   ┌───────────┐
    │  Oracle   │   │  Drills   │   │   Mesh    │
    │  Server   │   │  Server   │   │  Server   │
    └───────────┘   └───────────┘   └───────────┘
            │               │               │
            ▼               ▼               ▼
    ┌─────────────────────────────────────────┐
    │           VAULTMESH ENGINES             │
    └─────────────────────────────────────────┘
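The gateway responsibilities in the diagram can be read as an ordered pipeline: every check runs before the engine is touched, and a receipt is written only for calls that complete. The following is a minimal sketch under that assumption. `Gateway`, `authenticate`, `allow_all`, and `mesh_topology_stub` are hypothetical names used for illustration, not part of VaultMesh.

```python
import asyncio
from typing import Any, Awaitable, Callable

# A check receives (caller, tool, params) and raises to reject the call.
Check = Callable[[str, str, dict], Awaitable[None]]


class Gateway:
    """Runs every tool call through an ordered list of checks, then receipts it."""

    def __init__(self, checks: list) -> None:
        self.checks = checks
        self.receipts: list = []  # in-memory stand-in for a receipt scroll

    async def call(
        self,
        caller: str,
        tool: str,
        params: dict,
        handler: Callable[..., Awaitable[Any]],
    ) -> Any:
        # Any check may raise, which stops the call before the handler runs.
        for check in self.checks:
            await check(caller, tool, params)
        result = await handler(**params)
        self.receipts.append({"type": "mcp_tool_call", "caller": caller, "tool": tool})
        return result


async def authenticate(caller: str, tool: str, params: dict) -> None:
    # Hypothetical rule: only DIDs in the did:vm: namespace are accepted.
    if not caller.startswith("did:vm:"):
        raise PermissionError(f"Unknown caller: {caller}")


async def allow_all(caller: str, tool: str, params: dict) -> None:
    """Placeholder for the rate-limit and constitutional checks."""


async def mesh_topology_stub() -> dict:
    """Stand-in for a real tool handler."""
    return {"nodes": []}


gateway = Gateway([authenticate, allow_all, allow_all])
```

Keeping the checks in a plain list makes the order explicit and lets a deployment add or reorder guardians without changing the tool handlers.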

2. Core MCP Server

2.1 Server Definition

# vaultmesh_mcp/server.py

import json

# FastMCP provides the @server.tool() decorator used below;
# the low-level mcp.server.Server class does not.
from mcp.server.fastmcp import FastMCP

from .engines import (
    oracle_engine,
    drills_engine,
    mesh_engine,
    treasury_engine,
    identity_engine,
    guardian_engine,
    psi_engine,
    governance_engine,
)
from .auth import verify_capability, get_caller_identity
from .receipts import emit_tool_call_receipt

server = FastMCP("vaultmesh")

# ============================================================================
# Oracle Tools
# ============================================================================

@server.tool()
async def oracle_answer(
    question: str,
    frameworks: list[str] | None = None,
    context_docs: list[str] | None = None,
) -> str:
    """
    Ask the VaultMesh Compliance Oracle a question about regulatory compliance.

    Args:
        question: The compliance question to answer
        frameworks: Optional list of frameworks to focus on (e.g., ["AI_Act", "GDPR"])
        context_docs: Optional list of document IDs to use as context

    Returns:
        Structured compliance answer with citations and confidence indicators
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "oracle_query")

    result = await oracle_engine.answer(
        question=question,
        frameworks=frameworks or [],
        context_docs=context_docs or [],
    )

    await emit_tool_call_receipt(
        tool="oracle_answer",
        caller=caller,
        params={"question": question, "frameworks": frameworks},
        result_hash=result.answer_hash,
    )

    return json.dumps(result.to_dict(), indent=2)

@server.tool()
async def oracle_corpus_search(
    query: str,
    limit: int = 10,
) -> str:
    """
    Search the compliance corpus for relevant documents.

    Args:
        query: Search query
        limit: Maximum number of results

    Returns:
        List of matching documents with relevance scores
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "oracle_query")

    results = await oracle_engine.search_corpus(query, limit)

    return json.dumps([r.to_dict() for r in results], indent=2)

# ============================================================================
# Drills Tools
# ============================================================================

@server.tool()
async def drills_create(
    prompt: str,
    skills: list[str] | None = None,
    severity: str = "medium",
) -> str:
    """
    Create a new security drill contract from a scenario description.

    Args:
        prompt: Natural language description of the security scenario
        skills: Optional list of specific skills to include
        severity: Drill severity (low, medium, high, critical)

    Returns:
        Generated drill contract with stages and objectives
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "drills_create")

    contract = await drills_engine.create_contract(
        prompt=prompt,
        skills=skills or [],
        severity=severity,
    )

    await emit_tool_call_receipt(
        tool="drills_create",
        caller=caller,
        params={"prompt": prompt, "severity": severity},
        result_hash=contract.contract_hash,
    )

    return json.dumps(contract.to_dict(), indent=2)

@server.tool()
async def drills_status(
    drill_id: str | None = None,
) -> str:
    """
    Get status of active drills or a specific drill.

    Args:
        drill_id: Optional specific drill ID to check

    Returns:
        Drill status information
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "drills_view")

    if drill_id:
        status = await drills_engine.get_drill_status(drill_id)
    else:
        status = await drills_engine.get_active_drills()

    return json.dumps(status, indent=2)

@server.tool()
async def drills_complete_stage(
    drill_id: str,
    stage_id: str,
    outputs: list[str] | None = None,
    findings: str | None = None,
) -> str:
    """
    Mark a drill stage as complete with outputs.

    Args:
        drill_id: The drill ID
        stage_id: The stage to complete
        outputs: List of output artifact paths
        findings: Summary of findings from this stage

    Returns:
        Updated drill state
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "drills_execute")

    state = await drills_engine.complete_stage(
        drill_id=drill_id,
        stage_id=stage_id,
        outputs=outputs or [],
        findings=findings,
    )

    await emit_tool_call_receipt(
        tool="drills_complete_stage",
        caller=caller,
        params={"drill_id": drill_id, "stage_id": stage_id},
        result_hash=state.state_hash,
    )

    return json.dumps(state.to_dict(), indent=2)

# ============================================================================
# Mesh Tools
# ============================================================================

@server.tool()
async def mesh_topology() -> str:
    """
    Get current mesh topology including nodes, routes, and health.

    Returns:
        Current topology snapshot
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "mesh_view")

    topology = await mesh_engine.get_topology()

    return json.dumps(topology.to_dict(), indent=2)

@server.tool()
async def mesh_node_status(
    node_id: str,
) -> str:
    """
    Get detailed status of a specific node.

    Args:
        node_id: Node identifier (e.g., "brick-01")

    Returns:
        Node status including health, capabilities, and recent events
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "mesh_view")

    status = await mesh_engine.get_node_status(f"did:vm:node:{node_id}")

    return json.dumps(status.to_dict(), indent=2)

@server.tool()
async def mesh_capability_check(
    node_id: str,
    capability: str,
) -> str:
    """
    Check if a node has a specific capability.

    Args:
        node_id: Node identifier
        capability: Capability to check (anchor, storage, compute, oracle, admin)

    Returns:
        Capability status and details
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "mesh_view")

    result = await mesh_engine.check_capability(
        f"did:vm:node:{node_id}",
        capability,
    )

    return json.dumps(result, indent=2)

# ============================================================================
# Treasury Tools
# ============================================================================

@server.tool()
async def treasury_balance(
    account_id: str | None = None,
) -> str:
    """
    Get treasury balance for an account or all accounts.

    Args:
        account_id: Optional specific account ID

    Returns:
        Balance information
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "treasury_view")

    if account_id:
        balance = await treasury_engine.get_balance(account_id)
    else:
        balance = await treasury_engine.get_all_balances()

    return json.dumps(balance, indent=2)

@server.tool()
async def treasury_record_entry(
    entry_type: str,
    account: str,
    amount: float,
    currency: str,
    memo: str,
    tags: list[str] | None = None,
) -> str:
    """
    Record a treasury entry (credit or debit).

    Args:
        entry_type: "credit" or "debit"
        account: Account ID
        amount: Amount
        currency: Currency code (EUR, USD, etc.)
        memo: Transaction memo
        tags: Optional tags

    Returns:
        Entry receipt
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "treasury_write")

    receipt = await treasury_engine.record_entry(
        entry_type=entry_type,
        account=account,
        amount=amount,
        currency=currency,
        memo=memo,
        tags=tags or [],
    )

    await emit_tool_call_receipt(
        tool="treasury_record_entry",
        caller=caller,
        params={"entry_type": entry_type, "account": account, "amount": amount},
        result_hash=receipt.root_hash,
    )

    return json.dumps(receipt.to_dict(), indent=2)

# ============================================================================
# Guardian Tools
# ============================================================================

@server.tool()
async def guardian_anchor_status() -> str:
    """
    Get current Guardian anchor status including last anchor time and health.

    Returns:
        Anchor status information
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "guardian_view")

    status = await guardian_engine.get_anchor_status()

    return json.dumps(status.to_dict(), indent=2)

@server.tool()
async def guardian_anchor_now(
    scrolls: list[str] | None = None,
) -> str:
    """
    Trigger an immediate anchor cycle.

    Args:
        scrolls: Optional list of specific scrolls to anchor (default: all)

    Returns:
        Anchor result
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "anchor")

    result = await guardian_engine.anchor_now(scrolls)

    await emit_tool_call_receipt(
        tool="guardian_anchor_now",
        caller=caller,
        params={"scrolls": scrolls},
        result_hash=result.anchor_hash,
    )

    return json.dumps(result.to_dict(), indent=2)

@server.tool()
async def guardian_verify_receipt(
    receipt_hash: str,
    scroll: str,
) -> str:
    """
    Verify a specific receipt against the anchor chain.

    Args:
        receipt_hash: Hash of the receipt to verify
        scroll: Scroll containing the receipt

    Returns:
        Verification result with proof path
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "guardian_view")

    result = await guardian_engine.verify_receipt(receipt_hash, scroll)

    return json.dumps(result.to_dict(), indent=2)

# ============================================================================
# Identity Tools
# ============================================================================

@server.tool()
async def identity_resolve_did(
    did: str,
) -> str:
    """
    Resolve a DID to its document.

    Args:
        did: DID to resolve (e.g., "did:vm:node:brick-01")

    Returns:
        DID document
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "identity_view")

    doc = await identity_engine.resolve_did(did)

    return json.dumps(doc.to_dict(), indent=2)

@server.tool()
async def identity_verify_credential(
    credential_id: str,
) -> str:
    """
    Verify a verifiable credential.

    Args:
        credential_id: Credential ID to verify

    Returns:
        Verification result
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "identity_view")

    result = await identity_engine.verify_credential(credential_id)

    return json.dumps(result, indent=2)

@server.tool()
async def identity_whoami() -> str:
    """
    Get the current caller's identity and capabilities.

    Returns:
        Current identity context
    """
    caller = await get_caller_identity()

    identity = await identity_engine.get_identity_context(caller)

    return json.dumps(identity.to_dict(), indent=2)

# ============================================================================
# Psi-Field Tools
# ============================================================================

@server.tool()
async def psi_phase_status() -> str:
    """
    Get current alchemical phase status and Magnum Opus progress.

    Returns:
        Current phase and recent transitions
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "psi_view")

    status = await psi_engine.get_phase_status()

    return json.dumps(status.to_dict(), indent=2)

@server.tool()
async def psi_transmute(
    input_reference: str,
    input_type: str,
    title: str,
) -> str:
    """
    Initiate a transmutation process to transform a negative event into capability.

    Args:
        input_reference: Reference to the input event (e.g., "INC-2025-12-001")
        input_type: Type of input (incident, vulnerability, drill)
        title: Title for the transmutation

    Returns:
        Initiated transmutation contract
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "psi_transmute")

    contract = await psi_engine.initiate_transmutation(
        input_reference=input_reference,
        input_type=input_type,
        title=title,
    )

    await emit_tool_call_receipt(
        tool="psi_transmute",
        caller=caller,
        params={"input_reference": input_reference, "input_type": input_type},
        result_hash=contract.transmutation_hash,
    )

    return json.dumps(contract.to_dict(), indent=2)

@server.tool()
async def psi_opus_status() -> str:
    """
    Get full Magnum Opus status including phases, transmutations, and resonances.

    Returns:
        Complete opus status
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "psi_view")

    opus = await psi_engine.get_opus_status()

    return json.dumps(opus.to_dict(), indent=2)

# ============================================================================
# Governance Tools
# ============================================================================

@server.tool()
async def governance_constitution_summary() -> str:
    """
    Get a summary of the current constitution including version and key articles.

    Returns:
        Constitution summary
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "governance_view")

    summary = await governance_engine.get_constitution_summary()

    return json.dumps(summary.to_dict(), indent=2)

@server.tool()
async def governance_active_proposals() -> str:
    """
    Get list of active governance proposals.

    Returns:
        Active proposals in deliberation or voting
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "governance_view")

    proposals = await governance_engine.get_active_proposals()

    return json.dumps([p.to_dict() for p in proposals], indent=2)

@server.tool()
async def governance_check_compliance(
    action: str,
    actor: str,
    target: str | None = None,
) -> str:
    """
    Check if an action would comply with the constitution.

    Args:
        action: Action to check (e.g., "modify_receipt", "revoke_capability")
        actor: DID of the actor
        target: Optional target of the action

    Returns:
        Compliance check result with relevant articles
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "governance_view")

    result = await governance_engine.check_compliance(action, actor, target)

    return json.dumps(result.to_dict(), indent=2)

# ============================================================================
# Cross-Engine Query Tools
# ============================================================================

@server.tool()
async def receipts_search(
    scroll: str | None = None,
    receipt_type: str | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    tags: list[str] | None = None,
    limit: int = 50,
) -> str:
    """
    Search receipts across scrolls with filters.

    Args:
        scroll: Specific scroll to search (default: all)
        receipt_type: Filter by receipt type
        from_date: Start date (ISO format)
        to_date: End date (ISO format)
        tags: Filter by tags
        limit: Maximum results

    Returns:
        Matching receipts
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "receipts_view")

    from .receipts import search_receipts

    results = await search_receipts(
        scroll=scroll,
        receipt_type=receipt_type,
        from_date=from_date,
        to_date=to_date,
        tags=tags,
        limit=limit,
    )

    return json.dumps([r.to_dict() for r in results], indent=2)

@server.tool()
async def system_health() -> str:
    """
    Get comprehensive system health across all engines.

    Returns:
        Health status for all engines and nodes
    """
    caller = await get_caller_identity()
    await verify_capability(caller, "system_view")

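    from datetime import datetime, timezone
    from .receipts import get_total_receipt_count

    health = {
        # datetime.utcnow() is deprecated; use an aware UTC timestamp instead.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),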
        "mesh": (await mesh_engine.get_health()).to_dict(),
        "guardian": (await guardian_engine.get_health()).to_dict(),
        "oracle": (await oracle_engine.get_health()).to_dict(),
        "psi_phase": (await psi_engine.get_phase_status()).current_phase,
        "receipts_total": await get_total_receipt_count(),
        "last_anchor_age_seconds": await guardian_engine.get_last_anchor_age(),
    }

    return json.dumps(health, indent=2)

# ============================================================================
# Server Entry Point
# ============================================================================

def main():
    """Run the VaultMesh MCP server over stdio."""
    # Assumes `server` is a FastMCP instance, which @server.tool() requires.
    # FastMCP.run() is synchronous: it starts the event loop and the stdio
    # transport itself, so no asyncio.run() wrapper is needed.
    server.run(transport="stdio")

if __name__ == "__main__":
    main()

2.2 Tool Call Receipt

Every MCP tool call is receipted. The receipt records hashes of the parameters and result, not the raw values:

{
  "type": "mcp_tool_call",
  "call_id": "mcp-call-2025-12-06-001",
  "timestamp": "2025-12-06T14:30:00Z",
  "caller": "did:vm:agent:claude-session-abc123",
  "tool": "oracle_answer",
  "params_hash": "blake3:params...",
  "result_hash": "blake3:result...",
  "duration_ms": 1250,
  "capability_used": "oracle_query",
  "session_id": "session-xyz789",
  "tags": ["mcp", "oracle", "tool-call"],
  "root_hash": "blake3:aaa111..."
}
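The hash fields in this receipt are only reproducible if the hashed value has a canonical encoding. The sketch below shows one way to get that with sorted-key, whitespace-free JSON. It rests on several assumptions: BLAKE3 is not in the Python standard library, so `hashlib.blake2b` is used as a stand-in digest, and the rule that `root_hash` covers all other receipt fields is a guess at the intent, not a documented VaultMesh rule. `canonical_hash` and `build_receipt` are hypothetical helper names.

```python
import hashlib
import json


def canonical_hash(value: object) -> str:
    """Hash a JSON-serializable value using a canonical encoding."""
    # Sorted keys and fixed separators make the encoding independent of
    # dict insertion order and whitespace choices.
    encoded = json.dumps(value, sort_keys=True, separators=(",", ":")).encode("utf-8")
    # Stand-in digest: swap in a BLAKE3 implementation to match the receipts above.
    return "blake2b:" + hashlib.blake2b(encoded, digest_size=32).hexdigest()


def build_receipt(caller: str, tool: str, params: dict, result: dict) -> dict:
    """Assemble a receipt that stores hashes instead of raw values."""
    receipt = {
        "type": "mcp_tool_call",
        "caller": caller,
        "tool": tool,
        "params_hash": canonical_hash(params),
        "result_hash": canonical_hash(result),
    }
    # Assumption: the root hash commits to every other field, so changing
    # any of them changes the root hash.
    receipt["root_hash"] = canonical_hash(receipt)
    return receipt
```

Hashing the parameters rather than storing them keeps sensitive inputs out of the scroll while still letting an auditor confirm that a claimed set of parameters matches the receipt.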

3. MCP Gateway

3.1 Authentication Flow

# vaultmesh_mcp/auth.py

from .identity import identity_engine

async def get_caller_identity() -> str:
    """Get the DID of the current MCP caller."""
    # In production, this comes from the MCP session context.
    # For Claude Desktop, it is derived from the session token.
    # NOTE: get_current_session() is not defined in this document; it is
    # assumed to be supplied by the session layer and imported here.
    session = get_current_session()

    if session.authenticated_did:
        return session.authenticated_did

    # Anonymous callers get a session-scoped agent DID
    return f"did:vm:agent:mcp-session-{session.id}"

async def verify_capability(caller: str, capability: str) -> bool:
    """Verify the caller has the required capability."""
    # Check capability token
    has_cap = await identity_engine.check_capability(caller, capability)

    if not has_cap:
        raise PermissionError(
            f"Caller {caller} lacks capability: {capability}"
        )

    # Log capability exercise
    await identity_engine.log_capability_exercise(
        caller=caller,
        capability=capability,
        action="mcp_tool_call",
    )

    return True
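To exercise this check without a running identity engine, a test double can stand in for `identity_engine`. The sketch below is an assumption-laden illustration: `InMemoryIdentityEngine` is a hypothetical class, and `verify_capability` is rewritten to take the engine as an argument so it can be tested in isolation.

```python
import asyncio


class InMemoryIdentityEngine:
    """Hypothetical test double holding capability grants in a dict."""

    def __init__(self, grants: dict) -> None:
        self.grants = grants  # maps caller DID -> set of capability names
        self.exercised: list = []  # records each successful capability use

    async def check_capability(self, caller: str, capability: str) -> bool:
        return capability in self.grants.get(caller, set())

    async def log_capability_exercise(
        self, caller: str, capability: str, action: str
    ) -> None:
        self.exercised.append((caller, capability))


async def verify_capability(engine, caller: str, capability: str) -> bool:
    """Same logic as above, with the engine injected for testing."""
    if not await engine.check_capability(caller, capability):
        raise PermissionError(f"Caller {caller} lacks capability: {capability}")
    await engine.log_capability_exercise(
        caller=caller, capability=capability, action="mcp_tool_call"
    )
    return True
```

Note that a denied call raises before anything is logged, so the exercise log only ever contains capabilities that were actually held.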

3.2 Rate Limiting

# vaultmesh_mcp/ratelimit.py

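from collections import defaultdict
from datetime import datetime, timedelta, timezone


class RateLimitExceeded(Exception):
    """Raised when a caller exceeds the limit configured for a tool."""


class RateLimiter:
    def __init__(self):
        # Timestamps of recent calls, keyed by "caller:tool"
        self.calls = defaultdict(list)
        self.limits = {
            "oracle_answer": (10, timedelta(minutes=1)),  # 10/min
            "guardian_anchor_now": (5, timedelta(hours=1)),  # 5/hour
            "treasury_record_entry": (100, timedelta(hours=1)),  # 100/hour
            "default": (60, timedelta(minutes=1)),  # 60/min
        }

    async def check(self, caller: str, tool: str) -> bool:
        key = f"{caller}:{tool}"
        # datetime.utcnow() is deprecated; use an aware UTC timestamp instead.
        now = datetime.now(timezone.utc)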

        limit, window = self.limits.get(tool, self.limits["default"])

        # Clean old entries
        self.calls[key] = [
            t for t in self.calls[key]
            if now - t < window
        ]

        if len(self.calls[key]) >= limit:
            raise RateLimitExceeded(
                f"Rate limit exceeded for {tool}: {limit} per {window}"
            )

        self.calls[key].append(now)
        return True
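The limiter above is a sliding-window counter: it keeps the timestamps of recent calls and rejects a call when the window is already full. Its behavior is easier to verify when the clock can be replaced. The following is a simplified synchronous sketch with an injectable clock. `SlidingWindowLimiter` and its `clock` parameter are hypothetical and not part of `ratelimit.py`.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


class RateLimitExceeded(Exception):
    """Raised when a key has used up its calls for the current window."""


class SlidingWindowLimiter:
    def __init__(
        self,
        limit: int,
        window: timedelta,
        clock: Optional[Callable[[], datetime]] = None,
    ) -> None:
        self.limit = limit
        self.window = window
        # The clock is injectable so tests can advance time deterministically.
        self.clock = clock or (lambda: datetime.now(timezone.utc))
        self.calls = defaultdict(list)

    def check(self, key: str) -> None:
        now = self.clock()
        # Keep only the calls that still fall inside the window.
        recent = [t for t in self.calls[key] if now - t < self.window]
        self.calls[key] = recent
        if len(recent) >= self.limit:
            raise RateLimitExceeded(f"{key}: {self.limit} per {self.window}")
        recent.append(now)
```

With a limit of 2 per minute, a third call in the same minute is rejected, and a call made after the window has passed succeeds again because the old timestamps are pruned first.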

3.3 Constitutional Compliance

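# vaultmesh_mcp/compliance.py

# Assumed import path, matching the engine imports in server.py.
from .engines import governance_engine


class ConstitutionalViolation(Exception):
    """Raised when a tool call would violate the constitution."""


class EmergencyRestriction(Exception):
    """Raised when a tool is restricted by an active emergency."""


# Tools that can never be called without violating an axiom. The members are
# not listed in this document, so the set is left empty here.
AXIOM_VIOLATING_TOOLS: frozenset[str] = frozenset()
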
async def check_constitutional_compliance(
    tool: str,
    caller: str,
    params: dict,
) -> bool:
    """Check if a tool call complies with the constitution."""

    # Check against axioms
    if tool in AXIOM_VIOLATING_TOOLS:
        raise ConstitutionalViolation(
            f"Tool {tool} would violate constitutional axioms"
        )

    # Check for emergency restrictions
    emergency = await governance_engine.get_active_emergency()
    if emergency and tool in emergency.restricted_tools:
        raise EmergencyRestriction(
            f"Tool {tool} is restricted during current emergency"
        )

    # Check for specific parameter violations
    violations = await governance_engine.check_params_compliance(tool, params)
    if violations:
        raise ConstitutionalViolation(
            f"Parameters violate constitution: {violations}"
        )

    return True
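The emergency branch of this check can be exercised on its own with a stubbed governance engine. The sketch below assumes that an active emergency exposes a `restricted_tools` collection, as the code above implies. `Emergency`, `StubGovernanceEngine`, and `check_emergency` are hypothetical names for illustration only.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


class EmergencyRestriction(Exception):
    """Raised when a tool is restricted by an active emergency."""


@dataclass
class Emergency:
    reason: str
    restricted_tools: set = field(default_factory=set)


class StubGovernanceEngine:
    """Hypothetical stand-in that returns a fixed emergency (or none)."""

    def __init__(self, emergency: Optional[Emergency] = None) -> None:
        self.emergency = emergency

    async def get_active_emergency(self) -> Optional[Emergency]:
        return self.emergency


async def check_emergency(engine: StubGovernanceEngine, tool: str) -> bool:
    """Reject the tool only if an emergency is active and names it."""
    emergency = await engine.get_active_emergency()
    if emergency and tool in emergency.restricted_tools:
        raise EmergencyRestriction(
            f"Tool {tool} is restricted during current emergency"
        )
    return True
```

Under these assumptions, an emergency that restricts `treasury_record_entry` blocks only that tool, while read-only tools such as `treasury_balance` continue to work.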

4. Claude Desktop Configuration

4.1 claude_desktop_config.json

{
  "mcpServers": {
    "vaultmesh": {
      "command": "python",
      "args": ["-m", "vaultmesh_mcp.server"],
      "env": {
        "VAULTMESH_CONFIG": "/path/to/vaultmesh/config.toml",
        "VAULTMESH_IDENTITY": "did:vm:agent:claude-desktop"
      }
    }
  }
}
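The `env` block above passes two variables to the server process. How the server reads them is not shown in this document, so the following is only a sketch of one plausible approach. `ServerSettings` and `load_settings` are hypothetical names, and falling back to `did:vm:agent:claude-desktop` when `VAULTMESH_IDENTITY` is unset is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ServerSettings:
    config_path: str
    identity: str


def load_settings(env: Mapping[str, str] = os.environ) -> ServerSettings:
    """Read the settings that the Claude Desktop config passes via `env`."""
    try:
        config_path = env["VAULTMESH_CONFIG"]
    except KeyError:
        # Fail fast: without a config file there are no capability grants.
        raise RuntimeError("VAULTMESH_CONFIG must point to config.toml") from None
    # Assumed default; the real server may require this variable instead.
    identity = env.get("VAULTMESH_IDENTITY", "did:vm:agent:claude-desktop")
    return ServerSettings(config_path=config_path, identity=identity)
```

Accepting the environment as a parameter keeps the function testable with a plain dict instead of the real process environment.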

4.2 Capability Configuration

# vaultmesh/config.toml

[mcp.capabilities]
# Default capabilities for Claude Desktop sessions
default_capabilities = [
    "oracle_query",
    "drills_view",
    "mesh_view",
    "treasury_view",
    "guardian_view",
    "identity_view",
    "psi_view",
    "governance_view",
    "receipts_view",
    "system_view",
]

# Elevated capabilities require explicit grant
elevated_capabilities = [
    "drills_create",
    "drills_execute",
    "treasury_write",
    "anchor",
    "psi_transmute",
]

[mcp.rate_limits]
oracle_answer = { limit = 10, window_minutes = 1 }
guardian_anchor_now = { limit = 5, window_hours = 1 }
treasury_record_entry = { limit = 100, window_hours = 1 }
default = { limit = 60, window_minutes = 1 }

[mcp.audit]
receipt_all_calls = true
log_params = true
log_results = false  # Results may contain sensitive data
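The `[mcp.rate_limits]` entries use `window_minutes` or `window_hours`, while the `RateLimiter` class stores `(limit, timedelta)` pairs. A small conversion step bridges the two. The sketch below works on a plain dict that mirrors what a TOML parser would return for that table; `parse_limits` is a hypothetical helper, not an existing VaultMesh function.

```python
from datetime import timedelta

# Mirrors the parsed [mcp.rate_limits] table from config.toml.
RATE_LIMIT_CONFIG = {
    "oracle_answer": {"limit": 10, "window_minutes": 1},
    "guardian_anchor_now": {"limit": 5, "window_hours": 1},
    "treasury_record_entry": {"limit": 100, "window_hours": 1},
    "default": {"limit": 60, "window_minutes": 1},
}


def parse_limits(config: dict) -> dict:
    """Convert config entries into (limit, window) pairs keyed by tool name."""
    limits = {}
    for tool, entry in config.items():
        # Each window_<unit> key becomes a timedelta keyword argument,
        # e.g. window_minutes=1 -> timedelta(minutes=1).
        window_kwargs = {
            key.removeprefix("window_"): value
            for key, value in entry.items()
            if key.startswith("window_")
        }
        limits[tool] = (entry["limit"], timedelta(**window_kwargs))
    return limits
```

Deriving the limits from configuration keeps the values in `ratelimit.py` and `config.toml` from drifting apart.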

5. Tool Categories

5.1 Read-Only Tools (Default Access)

| Tool | Description |
|------|-------------|
| oracle_answer | Ask compliance questions |
| oracle_corpus_search | Search compliance corpus |
| drills_status | View drill status |
| mesh_topology | View mesh topology |
| mesh_node_status | View node status |
| mesh_capability_check | Check node capabilities |
| treasury_balance | View balances |
| guardian_anchor_status | View anchor status |
| guardian_verify_receipt | Verify receipts |
| identity_resolve_did | Resolve DIDs |
| identity_verify_credential | Verify credentials |
| identity_whoami | View own identity |
| psi_phase_status | View phase status |
| psi_opus_status | View opus status |
| governance_constitution_summary | View constitution |
| governance_active_proposals | View proposals |
| governance_check_compliance | Check compliance |
| receipts_search | Search receipts |
| system_health | View system health |

5.2 Write Tools (Elevated Access)

| Tool | Capability Required |
|------|---------------------|
| drills_create | drills_create |
| drills_complete_stage | drills_execute |
| treasury_record_entry | treasury_write |
| guardian_anchor_now | anchor |
| psi_transmute | psi_transmute |
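The split between default and elevated access follows from which capability each tool verifies. As a rough illustration, the mapping below is copied from the `verify_capability` calls in `server.py` for a subset of tools. `TOOL_CAPABILITIES` and `callable_tools` are hypothetical names introduced for this sketch.

```python
# Capability required by each tool (subset, taken from server.py).
TOOL_CAPABILITIES = {
    "oracle_answer": "oracle_query",
    "drills_status": "drills_view",
    "drills_create": "drills_create",
    "drills_complete_stage": "drills_execute",
    "treasury_balance": "treasury_view",
    "treasury_record_entry": "treasury_write",
    "guardian_anchor_now": "anchor",
    "psi_transmute": "psi_transmute",
}


def callable_tools(granted: set) -> list:
    """Return the tools a session can call with the given capabilities."""
    return sorted(
        tool
        for tool, capability in TOOL_CAPABILITIES.items()
        if capability in granted
    )
```

A session holding only view capabilities sees the read-only tools, and granting `treasury_write` adds `treasury_record_entry` without exposing any other write tool.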

6. Example Interactions

6.1 Compliance Query

Human: What's our current compliance status for the AI Act Annex IV?

Claude: [Calls oracle_answer with question="What is our compliance status for AI Act Annex IV?", frameworks=["AI_Act"]]

Based on the Oracle's assessment:

**Compliance Status: Partial (73%)**

Key findings:
- Technical documentation requirements (§4.2): ✅ Compliant
- Risk management system (§4.3): ⚠️ Needs update for latest guidance
- Data governance (§4.5): ✅ Compliant
- Human oversight measures (§4.7): ⚠️ Documentation incomplete

The Oracle flagged that our risk management documentation needs updating to reflect the December 2024 EU guidance. I can help draft those updates if you'd like.

6.2 Security Drill

Human: Start a drill for detecting lateral movement in our Kubernetes cluster

Claude: [Calls drills_create with prompt="Detect and respond to lateral movement within Kubernetes cluster", skills=["kubernetes-security", "detection-defense-ir"]]

I've created a new security drill:

**Drill ID:** drill-1764691432
**Scenario:** Lateral Movement Detection in Kubernetes

**Stages:**
1. **Initial Access Simulation** (kubernetes-security)
   - Simulate compromised pod gaining shell access
   - Establish baseline network patterns

2. **Lateral Movement Attempt** (kubernetes-security)
   - Attempt service account token theft
   - Test pod-to-pod communication abuse
   - Probe for secrets and configmaps

3. **Detection Validation** (detection-defense-ir)
   - Verify Falco alerts triggered
   - Check network policy enforcement
   - Validate audit log capture

4. **Response Execution** (detection-defense-ir)
   - Execute containment playbook
   - Document response timeline

Ready to begin Stage 1?

6.3 System Health Check

Human: How's the system looking?

Claude: [Calls system_health]

**VaultMesh Health Summary**

🟢 **Overall Status: Healthy**

**Mesh:**
- Nodes: 5 active, 0 degraded
- Routes: 12 healthy
- Last topology change: 3 days ago

**Guardian:**
- Last anchor: 47 minutes ago
- Anchor backend: Ethereum (block 12345678)
- Receipts since last anchor: 23

**Oracle:**
- Corpus documents: 147
- Last query: 12 minutes ago
- Average response time: 1.2s

**Psi-Field:**
- Current phase: ALBEDO 🜄
- Time in phase: 4 days
- Recent transmutations: 2

**Receipts:**
- Total: 1,847 across all scrolls
- Today: 34 new receipts

Everything looks good. The Guardian is anchoring on schedule, and no anomalies detected.