"""
VaultMesh MCP Authentication - Ed25519 Challenge-Response

Implements cryptographic authentication for MCP operators with
capability-based access control and session management.

Scopes:
- read: Query state (mesh_status, proof_verify)
- admin: Execute commands (tactical_execute)
- vault: Access treasury, sensitive data
- anchor: Create blockchain proofs
- cognitive: AI reasoning capabilities (Claude integration)
"""

import json
import os
import secrets
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Set
from enum import Enum

import blake3

# Optional: Ed25519 support
try:
    from nacl.signing import VerifyKey
    from nacl.exceptions import BadSignature
    import nacl.encoding
    NACL_AVAILABLE = True
except ImportError:
    NACL_AVAILABLE = False


def _resolve_vaultmesh_root() -> Path:
    """Return the VaultMesh root directory.

    The ``VAULTMESH_ROOT`` environment variable wins when it is set.
    Otherwise the root is taken to be three levels above this file's
    directory. The ancestor lookup is only performed when needed and is
    bounds-checked, so importing the module from a shallow path does not
    raise ``IndexError``; in that case the file's own directory is used.
    """
    configured = os.environ.get("VAULTMESH_ROOT")
    if configured is not None:
        return Path(configured).resolve()
    ancestors = Path(__file__).parents
    guessed = ancestors[3] if len(ancestors) > 3 else ancestors[0]
    return Path(guessed).resolve()


# VaultMesh paths
VAULTMESH_ROOT = _resolve_vaultmesh_root()
RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts"
AUTH_STORE = VAULTMESH_ROOT / "auth"

# Lifetime of a session issued by auth_verify. Kept in one place so the
# computed expiry and the reported "ttl_seconds" cannot drift apart.
_SESSION_TTL_SECONDS = 1800

# Setting this environment variable to "1" lets auth_verify accept
# signatures WITHOUT checking them when PyNaCl is not installed.
# Intended for local testing only.
_ALLOW_UNVERIFIED_ENV = "VAULTMESH_AUTH_ALLOW_UNVERIFIED"


def _vmhash_blake3(data: bytes) -> str:
    """VaultMesh hash: ``blake3:<hex digest>``."""
    return f"blake3:{blake3.blake3(data).hexdigest()}"


def _now_iso() -> str:
    """Current UTC timestamp in ISO format."""
    return datetime.now(timezone.utc).isoformat()


def _is_past(iso_timestamp: str) -> bool:
    """Return True when the ISO-8601 timestamp lies before the current UTC time.

    A trailing ``Z`` is rewritten to ``+00:00`` because older
    ``datetime.fromisoformat`` versions do not accept the ``Z`` suffix.
    """
    deadline = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    return datetime.now(timezone.utc) > deadline


class Scope(Enum):
    """Capability scopes for MCP access control."""
    READ = "read"
    ADMIN = "admin"
    VAULT = "vault"
    ANCHOR = "anchor"
    COGNITIVE = "cognitive"


# Tool permissions by scope
SCOPE_TOOLS: Dict[Scope, Set[str]] = {
    Scope.READ: {
        "mesh_status",
        "shield_status",
        "proof_verify",
        "guardian_status",
        "treasury_balance",
        "cognitive_context",
        "cognitive_memory_get",
        "cognitive_audit_trail",
    },
    Scope.ADMIN: {
        "tactical_execute",
        "mesh_configure",
        "agent_task",
    },
    Scope.VAULT: {
        "treasury_debit",
        "treasury_credit",
        "treasury_create_budget",
    },
    Scope.ANCHOR: {
        "guardian_anchor_now",
        "proof_anchor",
        "cognitive_attest",
    },
    Scope.COGNITIVE: {
        "cognitive_context",
        "cognitive_decide",
        "cognitive_invoke_tem",
        "cognitive_memory_get",
        "cognitive_memory_set",
        "cognitive_attest",
        "cognitive_audit_trail",
        "cognitive_oracle_chain",
        # Inherits from READ
        "mesh_status",
        "shield_status",
        "proof_verify",
        "guardian_status",
        "treasury_balance",
    },
}


@dataclass
class Challenge:
    """Authentication challenge."""
    challenge_id: str
    nonce: str
    operator_pubkey: str
    scope: str
    created_at: str
    expires_at: str

    def is_expired(self) -> bool:
        """Return True once ``expires_at`` is in the past."""
        return _is_past(self.expires_at)


@dataclass
class Session:
    """Authenticated session."""
    session_id: str
    token: str
    operator_pubkey: str
    operator_did: str
    scope: str
    created_at: str
    expires_at: str
    ip_hint: Optional[str] = None

    def is_expired(self) -> bool:
        """Return True once ``expires_at`` is in the past."""
        return _is_past(self.expires_at)


# In-memory stores (would be persisted in production)
_challenges: Dict[str, Challenge] = {}
_sessions: Dict[str, Session] = {}


def _purge_expired_challenges() -> None:
    """Drop every challenge whose validity period has elapsed.

    Challenges can be requested before authentication, so unanswered ones
    would otherwise accumulate in memory indefinitely.
    """
    stale = [cid for cid, ch in _challenges.items() if ch.is_expired()]
    for cid in stale:
        _challenges.pop(cid, None)


def _emit_auth_receipt(receipt_type: str, body: dict) -> dict:
    """Emit a receipt for authentication events.

    Appends one JSON line to ``receipts/identity/identity_events.jsonl``
    under the VaultMesh root, creating the directory if necessary.

    Args:
        receipt_type: Event type, stored as ``type`` and as a tag
        body: Event payload; its canonical (sorted-keys) JSON is hashed
            into ``root_hash``

    Returns:
        The receipt dict that was written
    """
    scroll_path = RECEIPTS_ROOT / "identity" / "identity_events.jsonl"
    scroll_path.parent.mkdir(parents=True, exist_ok=True)

    receipt = {
        "schema_version": "2.0.0",
        "type": receipt_type,
        "timestamp": _now_iso(),
        "scroll": "identity",
        "tags": ["auth", receipt_type],
        "root_hash": _vmhash_blake3(json.dumps(body, sort_keys=True).encode()),
        "body": body,
    }

    with open(scroll_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(receipt) + "\n")

    return receipt


def auth_challenge(
    operator_pubkey_b64: str,
    scope: str = "read",
    ttl_seconds: int = 300,
) -> Dict[str, Any]:
    """
    Generate an authentication challenge for an operator.

    Expired challenges left over from earlier calls are purged first so
    the in-memory store stays bounded by the number of live challenges.

    Args:
        operator_pubkey_b64: Base64-encoded Ed25519 public key
        scope: Requested scope (read, admin, vault, anchor, cognitive)
        ttl_seconds: Challenge validity period

    Returns:
        Challenge ID and nonce for signing, or ``{"error": ...}`` when the
        scope is not a known Scope value
    """
    # Validate scope
    try:
        Scope(scope)
    except ValueError:
        return {"error": f"Invalid scope: {scope}. Valid: {[s.value for s in Scope]}"}

    _purge_expired_challenges()

    challenge_id = f"ch_{secrets.token_hex(16)}"
    nonce = secrets.token_hex(32)
    now = datetime.now(timezone.utc)
    expires = now + timedelta(seconds=ttl_seconds)

    challenge = Challenge(
        challenge_id=challenge_id,
        nonce=nonce,
        operator_pubkey=operator_pubkey_b64,
        scope=scope,
        created_at=now.isoformat(),
        expires_at=expires.isoformat(),
    )
    _challenges[challenge_id] = challenge

    _emit_auth_receipt("auth_challenge", {
        "challenge_id": challenge_id,
        "operator_pubkey": operator_pubkey_b64,
        "scope": scope,
        "expires_at": expires.isoformat(),
    })

    return {
        "challenge_id": challenge_id,
        "nonce": nonce,
        "scope": scope,
        "expires_at": expires.isoformat(),
        "message": "Sign the nonce with your Ed25519 private key",
    }


def auth_verify(
    challenge_id: str,
    signature_b64: str,
    ip_hint: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Verify a signed challenge and issue session token.

    The signature is checked with PyNaCl over the UTF-8 bytes of the
    challenge nonce. When PyNaCl is not installed the call is denied
    (fail closed) unless the ``VAULTMESH_AUTH_ALLOW_UNVERIFIED``
    environment variable is set to ``"1"``, in which case any signature
    is accepted; that opt-in exists for local testing only.

    Args:
        challenge_id: The challenge ID from auth_challenge
        signature_b64: Base64-encoded Ed25519 signature of the nonce
        ip_hint: Optional IP hint for session binding

    Returns:
        Session token and metadata, or ``{"error": ...}`` on failure
    """
    # Get challenge
    challenge = _challenges.get(challenge_id)
    if not challenge:
        return {"error": "Challenge not found or expired"}

    if challenge.is_expired():
        _challenges.pop(challenge_id, None)
        return {"error": "Challenge expired"}

    # Verify signature
    if NACL_AVAILABLE:
        try:
            pubkey_bytes = nacl.encoding.Base64Encoder.decode(challenge.operator_pubkey.encode())
            verify_key = VerifyKey(pubkey_bytes)
            sig_bytes = nacl.encoding.Base64Encoder.decode(signature_b64.encode())
            verify_key.verify(challenge.nonce.encode(), sig_bytes)
        except Exception as e:
            # Deliberately broad: a bad signature, malformed base64, or a
            # wrong-sized key must all deny access rather than propagate.
            _emit_auth_receipt("auth_failure", {
                "challenge_id": challenge_id,
                "reason": "invalid_signature",
                "error": str(e),
            })
            return {"error": "Invalid signature"}
    elif os.environ.get(_ALLOW_UNVERIFIED_ENV) != "1":
        # Without PyNaCl nothing can be verified. Silently accepting any
        # signature would disable authentication, so deny unless the
        # operator explicitly opted in.
        _emit_auth_receipt("auth_failure", {
            "challenge_id": challenge_id,
            "reason": "signature_verification_unavailable",
            "error": "PyNaCl is not installed",
        })
        return {"error": "Signature verification unavailable"}

    # Remove used challenge (single use); tolerate concurrent removal
    _challenges.pop(challenge_id, None)

    # Create session
    session_id = f"ses_{secrets.token_hex(16)}"
    token = secrets.token_urlsafe(48)
    now = datetime.now(timezone.utc)
    expires = now + timedelta(seconds=_SESSION_TTL_SECONDS)

    # Derive DID from pubkey
    # NOTE(review): _vmhash_blake3 returns "blake3:<hex>", so this slice
    # keeps the 7-character prefix plus only 9 hex digits of the digest.
    # Left unchanged because altering it would change existing DIDs.
    operator_did = f"did:vm:operator:{_vmhash_blake3(challenge.operator_pubkey.encode())[:16]}"

    session = Session(
        session_id=session_id,
        token=token,
        operator_pubkey=challenge.operator_pubkey,
        operator_did=operator_did,
        scope=challenge.scope,
        created_at=now.isoformat(),
        expires_at=expires.isoformat(),
        ip_hint=ip_hint,
    )
    _sessions[token] = session

    _emit_auth_receipt("auth_success", {
        "session_id": session_id,
        "operator_did": operator_did,
        "scope": challenge.scope,
        "expires_at": expires.isoformat(),
    })

    return {
        "success": True,
        "session_id": session_id,
        "token": token,
        "operator_did": operator_did,
        "scope": challenge.scope,
        "expires_at": expires.isoformat(),
        "ttl_seconds": _SESSION_TTL_SECONDS,
    }


def auth_validate_token(token: str) -> Dict[str, Any]:
    """
    Validate a session token.

    An expired session is removed from the store as a side effect.

    Args:
        token: The session token to validate

    Returns:
        Session info if valid, error otherwise
    """
    session = _sessions.get(token)
    if not session:
        return {"valid": False, "error": "Session not found"}

    if session.is_expired():
        _sessions.pop(token, None)
        return {"valid": False, "error": "Session expired"}

    return {
        "valid": True,
        "session_id": session.session_id,
        "operator_did": session.operator_did,
        "scope": session.scope,
        "expires_at": session.expires_at,
    }


def auth_check_permission(token: str, tool_name: str) -> Dict[str, Any]:
    """
    Check if a session has permission to call a tool.

    The decision is based on SCOPE_TOOLS for the session's scope.

    Args:
        token: Session token
        tool_name: Name of the tool to check

    Returns:
        Permission check result
    """
    validation = auth_validate_token(token)
    if not validation.get("valid"):
        return {"allowed": False, "reason": validation.get("error")}

    scope_name = validation["scope"]
    try:
        scope = Scope(scope_name)
    except ValueError:
        return {"allowed": False, "reason": f"Invalid scope: {scope_name}"}

    allowed_tools = SCOPE_TOOLS.get(scope, set())

    if tool_name in allowed_tools:
        return {
            "allowed": True,
            "scope": scope_name,
            "operator_did": validation["operator_did"],
        }

    return {
        "allowed": False,
        "reason": f"Tool '{tool_name}' not allowed for scope '{scope_name}'",
        "allowed_tools": list(allowed_tools),
    }


def auth_revoke(token: str) -> Dict[str, Any]:
    """
    Revoke a session token.

    Args:
        token: Session token to revoke

    Returns:
        Revocation result
    """
    session = _sessions.pop(token, None)
    if not session:
        return {"revoked": False, "error": "Session not found"}

    _emit_auth_receipt("auth_revoke", {
        "session_id": session.session_id,
        "operator_did": session.operator_did,
    })

    return {
        "revoked": True,
        "session_id": session.session_id,
    }


def auth_list_sessions() -> Dict[str, Any]:
    """
    List all active sessions (admin only).

    Expired sessions encountered during the scan are removed.
    NOTE(review): this function performs no permission check itself;
    "admin only" has to be enforced by the caller.

    Returns:
        List of active sessions
    """
    active = []
    expired = []

    # Iterate over a snapshot because expired entries are deleted in-loop.
    for token, session in list(_sessions.items()):
        if session.is_expired():
            _sessions.pop(token, None)
            expired.append(session.session_id)
        else:
            active.append({
                "session_id": session.session_id,
                "operator_did": session.operator_did,
                "scope": session.scope,
                "expires_at": session.expires_at,
            })

    return {
        "active_sessions": active,
        "expired_cleaned": len(expired),
    }


# Convenience function for testing without full auth
def auth_create_dev_session(
    scope: str = "cognitive",
    operator_did: str = "did:vm:cognitive:claude-dev",
) -> Dict[str, Any]:
    """
    Create a development session for testing (DEV ONLY).

    No signature or receipt is involved; the session is valid for 24 hours.
    An unknown scope, or the ``vault`` scope, is downgraded to ``read``.

    Args:
        scope: Scope for the session
        operator_did: DID for the operator

    Returns:
        Session token and metadata
    """
    # Fail-closed: dev sessions may not grant SOVEREIGN-equivalent access.
    # Accept only known, non-vault scopes.
    normalized_scope = scope
    try:
        scope_enum = Scope(scope)
        if scope_enum == Scope.VAULT:
            normalized_scope = Scope.READ.value
    except ValueError:
        normalized_scope = Scope.READ.value

    session_id = f"dev_{secrets.token_hex(8)}"
    token = f"dev_{secrets.token_urlsafe(32)}"
    now = datetime.now(timezone.utc)
    expires = now + timedelta(hours=24)

    session = Session(
        session_id=session_id,
        token=token,
        operator_pubkey="dev_key",
        operator_did=operator_did,
        scope=normalized_scope,
        created_at=now.isoformat(),
        expires_at=expires.isoformat(),
    )
    _sessions[token] = session

    return {
        "dev_mode": True,
        "session_id": session_id,
        "token": token,
        "operator_did": operator_did,
        "scope": normalized_scope,
        "expires_at": expires.isoformat(),
        "warning": "DEV SESSION - Do not use in production",
    }


# =============================================================================
# AGENT CAPABILITY PROFILES
# =============================================================================

class Profile(Enum):
    """Agent capability profiles with hierarchical trust."""
    OBSERVER = "observer"    # 👁 Read-only
    OPERATOR = "operator"    # ⚙ Mutations allowed
    GUARDIAN = "guardian"    # 🛡 Threat response
    PHOENIX = "phoenix"      # 🔥 Crisis mode
    SOVEREIGN = "sovereign"  # 👑 Full authority


# Profile → Tool permissions
PROFILE_TOOLS: Dict[Profile, Set[str]] = {
    Profile.OBSERVER: {
        # L0 Perception (read)
        "get_current_tab",
        "list_tabs",
        "get_page_content",
        # L1 Substrate (read)
        "read_file",
        "read_multiple_files",
        "list_directory",
        "search_files",
        "get_file_info",
        "directory_tree",
        "list_allowed_directories",
        # L2 Cognition (read)
        "cognitive_context",
        "cognitive_memory_get",
        "cognitive_audit_trail",
        # L3 Security (read)
        "offsec_status",
        "offsec_shield_status",
        "offsec_tem_status",
        "offsec_mesh_status",
        "offsec_phoenix_status",
        "offsec_braid_list",
        # L4 Infrastructure (read)
        "worker_list",
        "kv_list",
        "r2_list_buckets",
        "d1_list_databases",
        "zones_list",
        "queue_list",
        "workflow_list",
        # L-1 Proof (read)
        "guardian_status",
        "guardian_verify_receipt",
        "offsec_proof_latest",
        # Treasury (read)
        "treasury_balance",
        # Auth (read)
        "auth_check_permission",
    },
    Profile.OPERATOR: set(),   # Computed below
    Profile.GUARDIAN: set(),   # Computed below
    Profile.PHOENIX: set(),    # Computed below
    Profile.SOVEREIGN: set(),  # All tools
}

# OPERATOR = OBSERVER + mutations
PROFILE_TOOLS[Profile.OPERATOR] = PROFILE_TOOLS[Profile.OBSERVER] | {
    # L0 Perception (act)
    "execute_javascript",
    "puppeteer_click",
    "puppeteer_fill",
    "puppeteer_select",
    "open_url",
    "reload_tab",
    "go_back",
    "go_forward",
    # L1 Substrate (write)
    "write_file",
    "edit_file",
    "create_directory",
    "move_file",
    "start_process",
    "interact_with_process",
    # L2 Cognition (decide, low confidence)
    "cognitive_decide",
    "cognitive_memory_set",
    # L3 Security (shield ops)
    "offsec_shield_arm",
    "offsec_shield_disarm",
    # L4 Infrastructure (deploy)
    "kv_put",
    "kv_delete",
    "worker_put",
    "r2_put_object",
    # L-1 Proof (local anchor)
    "guardian_anchor_now",
}

# GUARDIAN = OPERATOR + TEM + attestation
PROFILE_TOOLS[Profile.GUARDIAN] = PROFILE_TOOLS[Profile.OPERATOR] | {
    # L2 Cognition (full)
    "cognitive_invoke_tem",
    "cognitive_attest",
    "cognitive_oracle_chain",
    # L3 Security (TEM)
    "offsec_tem_transmute",
    "offsec_tem_rules",
    "offsec_tem_history",
    "offsec_braid_import",
    # L4 Infrastructure (more)
    "worker_deploy",
    "d1_query",
    "queue_send_message",
    "workflow_execute",
    # L-1 Proof (eth anchor)
    "offsec_proof_generate",
    # Process control
    "kill_process",
    "force_terminate",
}

# PHOENIX = GUARDIAN + destructive ops + emergency treasury
PROFILE_TOOLS[Profile.PHOENIX] = PROFILE_TOOLS[Profile.GUARDIAN] | {
    # L3 Security (Phoenix)
    "offsec_phoenix_enable",
    "offsec_phoenix_disable",
    "offsec_phoenix_inject_crisis",
    "offsec_phoenix_history",
    # L4 Infrastructure (destructive)
    "worker_delete",
    "r2_delete_bucket",
    "r2_delete_object",
    "d1_delete_database",
    "queue_delete",
    "workflow_delete",
    "kv_delete",
    # Treasury (emergency)
    "treasury_debit",
}

# SOVEREIGN = everything
PROFILE_TOOLS[Profile.SOVEREIGN] = PROFILE_TOOLS[Profile.PHOENIX] | {
    # Auth (full)
    "auth_challenge",
    "auth_verify",
    "auth_create_dev_session",
    "auth_revoke",
    "auth_list_sessions",
    # Treasury (full)
    "treasury_create_budget",
    "treasury_credit",
    # All remaining tools
}


def get_profile_for_scope(scope: str) -> Profile:
    """Map scope to profile.

    Unknown scope strings map to the least-privileged profile, OBSERVER.
    """
    mapping = {
        "read": Profile.OBSERVER,
        "admin": Profile.OPERATOR,
        "cognitive": Profile.GUARDIAN,
        "anchor": Profile.GUARDIAN,
        "vault": Profile.SOVEREIGN,
    }
    return mapping.get(scope, Profile.OBSERVER)


def check_profile_permission(profile: Profile, tool_name: str) -> Dict[str, Any]:
    """Check if a profile has permission for a tool.

    A tool is allowed when it is listed for the profile, or when a listed
    entry ending in ``*`` is a prefix pattern matching the tool name.

    Args:
        profile: Profile whose tool set is consulted
        tool_name: Name of the tool to check

    Returns:
        ``{"allowed": True, "profile": ...}`` or a denial with a reason
    """
    allowed_tools = PROFILE_TOOLS.get(profile, set())

    # Exact match first (O(1)), then wildcard entries such as "offsec_*".
    granted = tool_name in allowed_tools or any(
        pattern.endswith("*") and tool_name.startswith(pattern[:-1])
        for pattern in allowed_tools
    )
    if granted:
        return {"allowed": True, "profile": profile.value}

    return {
        "allowed": False,
        "profile": profile.value,
        "reason": f"Tool '{tool_name}' not allowed for profile '{profile.value}'",
    }


def escalate_profile(current: Profile, reason: str) -> Dict[str, Any]:
    """Request profile escalation.

    Moves exactly one step up the chain OBSERVER → OPERATOR → GUARDIAN →
    PHOENIX → SOVEREIGN and records a ``profile_escalation`` receipt.
    NOTE(review): no authorization is checked here; the function only
    reports and logs the next profile.

    Args:
        current: Profile to escalate from
        reason: Free-text justification stored in the receipt

    Returns:
        Escalation result, with ``escalated`` False at the top profile
    """
    escalation_path = {
        Profile.OBSERVER: Profile.OPERATOR,
        Profile.OPERATOR: Profile.GUARDIAN,
        Profile.GUARDIAN: Profile.PHOENIX,
        Profile.PHOENIX: Profile.SOVEREIGN,
        Profile.SOVEREIGN: None,
    }

    next_profile = escalation_path.get(current)
    if next_profile is None:
        return {"escalated": False, "reason": "Already at maximum profile"}

    _emit_auth_receipt("profile_escalation", {
        "from_profile": current.value,
        "to_profile": next_profile.value,
        "reason": reason,
    })

    return {
        "escalated": True,
        "from_profile": current.value,
        "to_profile": next_profile.value,
        "reason": reason,
    }