Files
vm-mcp/packages/vaultmesh_mcp/tools/key_binding.py
Vault Sovereign e4871c2a29
Some checks are pending
Governance CI / Constitution Hash Gate (push) Waiting to run
Governance CI / Governance Tests (push) Blocked by required conditions
Governance CI / Golden Drill Mini (push) Blocked by required conditions
init: vaultmesh mcp server
2025-12-26 23:23:08 +00:00

579 lines
18 KiB
Python

"""
Key Binding Engine - Authority bound to cryptographic reality
Every profile has a corresponding key reality:
- OBSERVER: Ephemeral (memory only, no signing power)
- OPERATOR: Session key (encrypted disk, revocable)
- GUARDIAN: Device-bound (secure enclave, non-exportable)
- PHOENIX: Time-locked (guardian key + approval artifact)
- SOVEREIGN: Offline root (hardware key, never automated)
Invariant: No profile exists without corresponding key reality.
If key cannot be proven → authority collapses downward, never upward.
"""
import json
import secrets
import hashlib
import os
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, asdict, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional, List
import blake3
# Optional dependency: PyNaCl provides the Ed25519 signing primitives.
# NACL_AVAILABLE records whether the import succeeded so callers can branch.
try:
    from nacl.signing import SigningKey, VerifyKey
    from nacl.encoding import Base64Encoder
    from nacl.exceptions import BadSignature
except ImportError:
    NACL_AVAILABLE = False
else:
    NACL_AVAILABLE = True
# VaultMesh paths
# The file-relative fallback is evaluated only when VAULTMESH_ROOT is absent,
# so an explicit override keeps working even if this file sits too close to
# the filesystem root for parents[3] to exist.
VAULTMESH_ROOT = Path(
    os.environ["VAULTMESH_ROOT"]
    if "VAULTMESH_ROOT" in os.environ
    else Path(__file__).parents[3]
).resolve()
RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts"  # receipt scrolls are appended under here
KEYS_ROOT = VAULTMESH_ROOT / "keys"  # guardian device record is written under here
def _vmhash_blake3(data: bytes) -> str:
    """Return the BLAKE3 hex digest of ``data`` prefixed with ``blake3:``."""
    hex_digest = blake3.blake3(data).hexdigest()
    return "blake3:" + hex_digest
def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
class KeyBindingType(Enum):
    """Kind of key backing a profile; one member per authority profile."""

    # OBSERVER: held in memory only
    EPHEMERAL = "ephemeral"
    # OPERATOR: kept on encrypted disk
    SESSION = "session"
    # GUARDIAN: bound to a secure enclave
    DEVICE = "device"
    # PHOENIX: guardian key plus approval
    TIMELOCKED = "timelocked"
    # SOVEREIGN: offline, air-gapped
    HARDWARE = "hardware"
class KeyStatus(Enum):
    """Lifecycle state recorded on a key binding."""

    ACTIVE = "active"
    EXPIRED = "expired"
    REVOKED = "revoked"
    PENDING = "pending"
@dataclass
class KeyBinding:
    """Record tying one public key to an authority profile.

    Only the public half of the key is kept on this record.
    """

    # --- identity ---
    key_id: str
    profile: str
    binding_type: str
    fingerprint: str

    # --- key material (public key, base64 text) ---
    public_key_b64: str

    # --- constraints: ISO timestamps; expires_at of None means no expiry ---
    created_at: str
    expires_at: Optional[str]
    device_id: Optional[str] = None

    # --- lifecycle ---
    status: str = "active"
    revoked_at: Optional[str] = None
    revocation_reason: Optional[str] = None

    # --- proof: hash of the receipt emitted when the binding was created ---
    binding_receipt_hash: Optional[str] = None
@dataclass
class KeyAssertion:
    """Outcome of checking, at runtime, whether a key may act as a profile."""

    # --- what was asserted ---
    assertion_id: str
    key_id: str
    profile: str
    binding_type: str
    fingerprint: str

    # --- individual verification results ---
    verified_at: str
    signature_valid: bool
    binding_valid: bool
    not_expired: bool
    not_revoked: bool

    # --- verdict ---
    authority_granted: bool
    # Profile to fall back to when authority is denied.
    collapse_to: Optional[str] = None
# Process-local registries held in plain dicts
# (production would use secure storage).
_key_bindings: Dict[str, KeyBinding] = dict()  # key_id -> binding
_device_keys: Dict[str, str] = dict()  # device_id -> key_id
# =============================================================================
# PROFILE → KEY BINDING REQUIREMENTS
# =============================================================================
# Per-profile key policy, keyed by profile name. TTL values are in seconds;
# a max_ttl_seconds of None means bindings for that profile do not auto-expire.
PROFILE_KEY_REQUIREMENTS = {
    "observer": {
        "binding_type": KeyBindingType.EPHEMERAL,
        "requires_signature": False,
        "requires_device": False,
        "max_ttl_seconds": 60 * 60,  # one hour
        "can_sign_receipts": False,
    },
    "operator": {
        "binding_type": KeyBindingType.SESSION,
        "requires_signature": True,
        "requires_device": False,
        "max_ttl_seconds": 24 * 60 * 60,  # one day
        "can_sign_receipts": True,
    },
    "guardian": {
        "binding_type": KeyBindingType.DEVICE,
        "requires_signature": True,
        "requires_device": True,
        "max_ttl_seconds": 7 * 24 * 60 * 60,  # one week
        "can_sign_receipts": True,
        "device_types": ["secure_enclave", "tpm", "operator_phone"],
    },
    "phoenix": {
        "binding_type": KeyBindingType.TIMELOCKED,
        "requires_signature": True,
        "requires_device": True,
        "requires_approval_artifact": True,
        "max_ttl_seconds": 24 * 60 * 60,  # one day, then the binding lapses
        "can_sign_receipts": True,
    },
    "sovereign": {
        "binding_type": KeyBindingType.HARDWARE,
        "requires_signature": True,
        "requires_device": True,
        "requires_human": True,
        "max_ttl_seconds": None,  # never expires automatically
        "can_sign_receipts": True,
        "device_types": ["hardware_key", "air_gapped"],
    },
}
# =============================================================================
# KEY OPERATIONS
# =============================================================================
def generate_key_pair() -> Dict[str, str]:
    """Generate a key pair suitable for a new key binding.

    When PyNaCl is available a real Ed25519 pair is generated. Otherwise
    random ``PLACEHOLDER_``-prefixed strings are returned, intended for
    testing only.

    Returns:
        A dict with ``private_key_b64``, ``public_key_b64`` and
        ``fingerprint``. The fingerprint is derived from the base64
        public-key string, the same derivation ``create_key_binding`` uses,
        so the fingerprint reported here matches the one stored on a binding
        created from this public key.
    """
    if NACL_AVAILABLE:
        signing_key = SigningKey.generate()
        private_key_b64 = signing_key.encode(Base64Encoder).decode()
        public_key_b64 = signing_key.verify_key.encode(Base64Encoder).decode()
    else:
        # Fallback: generate placeholders for testing
        private_key_b64 = "PLACEHOLDER_" + secrets.token_urlsafe(32)
        public_key_b64 = "PLACEHOLDER_" + secrets.token_urlsafe(32)

    return {
        "private_key_b64": private_key_b64,
        "public_key_b64": public_key_b64,
        # Hash the base64 text (not raw bytes or unrelated random data) so
        # this agrees with the fingerprint computed when the key is bound.
        "fingerprint": _vmhash_blake3(public_key_b64.encode())[:24],
    }
def create_key_binding(
    profile: str,
    public_key_b64: str,
    device_id: Optional[str] = None,
    ttl_seconds: Optional[int] = None,
) -> Dict[str, Any]:
    """Create and register a key binding for a profile.

    Validates the request against ``PROFILE_KEY_REQUIREMENTS``, emits a
    ``key_binding_created`` receipt and stores the binding in the in-memory
    registries.

    Args:
        profile: Profile name; must be a key of ``PROFILE_KEY_REQUIREMENTS``.
        public_key_b64: Public key as base64 text; the fingerprint is derived
            from this string.
        device_id: Device the key is bound to. Required for profiles whose
            requirements set ``requires_device``.
        ttl_seconds: Requested lifetime. ``None`` or ``0`` selects the
            profile's maximum; larger values are clamped to that maximum.
            Negative values are rejected.

    Returns:
        On success, a dict with ``success: True`` plus ``key_id``,
        ``profile``, ``binding_type``, ``fingerprint``, ``expires_at``,
        ``device_id`` and ``receipt_hash``. On failure, a dict with
        ``success: False`` and an ``error`` message.
    """
    requirements = PROFILE_KEY_REQUIREMENTS.get(profile)
    if not requirements:
        return {"success": False, "error": f"Unknown profile: {profile}"}
    binding_type = requirements["binding_type"]

    # Validate device requirement
    if requirements.get("requires_device") and not device_id:
        return {
            "success": False,
            "error": f"Profile {profile} requires device binding",
        }

    # A negative TTL would mint a binding that is already expired while still
    # being recorded (and receipted) as active, so refuse it up front.
    if ttl_seconds is not None and ttl_seconds < 0:
        return {
            "success": False,
            "error": f"ttl_seconds must not be negative: {ttl_seconds}",
        }

    # Calculate expiry: clamp a requested TTL to the profile maximum, and
    # fall back to the maximum when no TTL was requested.
    max_ttl = requirements.get("max_ttl_seconds")
    if ttl_seconds and max_ttl:
        effective_ttl = min(ttl_seconds, max_ttl)
    else:
        effective_ttl = ttl_seconds or max_ttl

    # One clock reading for both timestamps keeps them mutually consistent.
    now = datetime.now(timezone.utc)
    expires_at = None
    if effective_ttl:
        expires_at = (now + timedelta(seconds=effective_ttl)).isoformat()

    # Generate fingerprint
    fingerprint = _vmhash_blake3(public_key_b64.encode())[:24]
    key_id = f"key_{secrets.token_hex(12)}"
    binding = KeyBinding(
        key_id=key_id,
        profile=profile,
        binding_type=binding_type.value,
        fingerprint=fingerprint,
        public_key_b64=public_key_b64,
        created_at=now.isoformat(),
        expires_at=expires_at,
        device_id=device_id,
        status=KeyStatus.ACTIVE.value,
    )

    # Emit binding receipt; its hash is recorded on the binding afterwards,
    # so the receipt body itself carries binding_receipt_hash=None.
    receipt = _emit_key_receipt("key_binding_created", asdict(binding))
    binding.binding_receipt_hash = receipt["root_hash"]

    # Store
    _key_bindings[key_id] = binding
    if device_id:
        _device_keys[device_id] = key_id

    return {
        "success": True,
        "key_id": key_id,
        "profile": profile,
        "binding_type": binding_type.value,
        "fingerprint": fingerprint,
        "expires_at": expires_at,
        "device_id": device_id,
        "receipt_hash": binding.binding_receipt_hash,
    }
def assert_key_authority(
    key_id: str,
    required_profile: str,
    signature: Optional[str] = None,
    challenge: Optional[str] = None,
) -> Dict[str, Any]:
    """Assert that a key has authority for a profile.

    Checks, in order: the key exists, is not revoked, has not expired, is
    active, is bound to a profile at or above ``required_profile``, and
    (when the required profile demands it) that ``signature`` is a valid
    signature over ``challenge``.

    Args:
        key_id: Identifier of a binding in the in-memory key store.
        required_profile: Profile the caller wants to act as.
        signature: Base64-encoded signature over ``challenge``.
        challenge: Challenge text that was signed.

    Returns:
        On success, a dict with ``authority_granted: True`` plus
        ``assertion_id``, ``key_id``, ``profile``, ``binding_type``,
        ``fingerprint`` and ``expires_at``. On failure, the denial dict from
        ``_authority_denied``, whose ``collapse_to`` names the profile to
        fall back to.

    Side effects:
        Emits a granted/denied receipt, and marks the binding as expired
        when its expiry time has passed.
    """
    assertion_id = f"assert_{secrets.token_hex(8)}"
    binding = _key_bindings.get(key_id)
    if not binding:
        return _authority_denied(assertion_id, required_profile, "Key not found", "observer")

    # Check status
    if binding.status == KeyStatus.REVOKED.value:
        return _authority_denied(assertion_id, required_profile, "Key revoked", "observer")

    # Check expiry
    not_expired = True
    if binding.expires_at:
        expires = datetime.fromisoformat(binding.expires_at.replace('Z', '+00:00'))
        if datetime.now(timezone.utc) > expires:
            # Record the lapse on the binding itself
            binding.status = KeyStatus.EXPIRED.value
            return _authority_denied(assertion_id, required_profile, "Key expired", "observer")

    # Only an ACTIVE key may carry authority; this rejects any remaining
    # non-active state (e.g. pending) that the checks above did not catch.
    if binding.status != KeyStatus.ACTIVE.value:
        return _authority_denied(
            assertion_id,
            required_profile,
            f"Key not active (status: {binding.status})",
            "observer",
        )

    # Check profile hierarchy (unknown bound profile ranks lowest, unknown
    # required profile ranks above everything, so both are denied).
    profile_order = ["observer", "operator", "guardian", "phoenix", "sovereign"]
    bound_level = profile_order.index(binding.profile) if binding.profile in profile_order else -1
    required_level = profile_order.index(required_profile) if required_profile in profile_order else 99
    if bound_level < required_level:
        # Key is for lower profile - collapse to its actual level
        return _authority_denied(
            assertion_id,
            required_profile,
            f"Key bound to {binding.profile}, not sufficient for {required_profile}",
            binding.profile
        )

    # Check signature if required
    requirements = PROFILE_KEY_REQUIREMENTS.get(required_profile, {})
    signature_valid = True
    if requirements.get("requires_signature"):
        if not signature or not challenge:
            return _authority_denied(
                assertion_id,
                required_profile,
                "Signature required but not provided",
                _collapse_profile(required_profile)
            )
        # Verify signature
        # NOTE(review): when PyNaCl is missing or the stored key is a
        # PLACEHOLDER, the signature is NOT verified and authority is still
        # granted. That looks intended for testing only - confirm this path
        # cannot be reached in production.
        if NACL_AVAILABLE and not binding.public_key_b64.startswith("PLACEHOLDER"):
            try:
                verify_key = VerifyKey(binding.public_key_b64.encode(), Base64Encoder)
                sig_bytes = Base64Encoder.decode(signature.encode())
                verify_key.verify(challenge.encode(), sig_bytes)
            except (BadSignature, ValueError, TypeError):
                # ValueError/TypeError cover malformed base64 and wrong-length
                # signature or key material; such input must deny authority
                # rather than raise out of the check.
                return _authority_denied(
                    assertion_id,
                    required_profile,
                    "Invalid signature",
                    _collapse_profile(required_profile)
                )

    # All checks passed
    assertion = KeyAssertion(
        assertion_id=assertion_id,
        key_id=key_id,
        profile=required_profile,
        binding_type=binding.binding_type,
        fingerprint=binding.fingerprint,
        verified_at=_now_iso(),
        signature_valid=signature_valid,
        binding_valid=True,
        not_expired=not_expired,
        not_revoked=binding.status != KeyStatus.REVOKED.value,
        authority_granted=True,
    )
    _emit_key_receipt("key_assertion_granted", asdict(assertion))
    return {
        "authority_granted": True,
        "assertion_id": assertion_id,
        "key_id": key_id,
        "profile": required_profile,
        "binding_type": binding.binding_type,
        "fingerprint": binding.fingerprint,
        "expires_at": binding.expires_at,
    }
def _authority_denied(
    assertion_id: str,
    required_profile: str,
    reason: str,
    collapse_to: str,
) -> Dict[str, Any]:
    """Emit a ``key_assertion_denied`` receipt and build the denial result.

    The recorded assertion carries placeholder key details (``"unknown"`` /
    ``"none"``) and every check flagged as failed; ``reason`` is added to the
    receipt body alongside the assertion fields.
    """
    denial = KeyAssertion(
        assertion_id=assertion_id,
        key_id="unknown",
        profile=required_profile,
        binding_type="none",
        fingerprint="none",
        verified_at=_now_iso(),
        signature_valid=False,
        binding_valid=False,
        not_expired=False,
        not_revoked=False,
        authority_granted=False,
        collapse_to=collapse_to,
    )

    # Assertion fields first, then the reason, so the key order is stable.
    receipt_body = asdict(denial)
    receipt_body["reason"] = reason
    _emit_key_receipt("key_assertion_denied", receipt_body)

    outcome: Dict[str, Any] = {
        "authority_granted": False,
        "assertion_id": assertion_id,
        "reason": reason,
        "collapse_to": collapse_to,
        "message": f"Authority denied. Collapsing to {collapse_to}.",
    }
    return outcome
def _collapse_profile(profile: str) -> str:
    """Return the profile one step below ``profile``.

    ``observer`` and any unrecognised profile collapse to ``observer``.
    """
    ladder = ("observer", "operator", "guardian", "phoenix", "sovereign")
    # Pair every rung above the lowest with the rung directly beneath it.
    one_step_down = dict(zip(ladder[1:], ladder))
    return one_step_down.get(profile, "observer")
def revoke_key(key_id: str, reason: str) -> Dict[str, Any]:
    """Mark a key binding as revoked and emit a ``key_revoked`` receipt.

    Returns a dict with ``success: False`` and an ``error`` when the key is
    unknown; otherwise ``success: True`` with ``key_id`` and ``revoked_at``.
    """
    target = _key_bindings.get(key_id)
    if not target:
        return {"success": False, "error": "Key not found"}

    # Update the stored binding in place.
    target.status = KeyStatus.REVOKED.value
    revoked_at = _now_iso()
    target.revoked_at = revoked_at
    target.revocation_reason = reason

    receipt_body = {
        "key_id": key_id,
        "profile": target.profile,
        "fingerprint": target.fingerprint,
        "reason": reason,
        "revoked_at": revoked_at,
    }
    _emit_key_receipt("key_revoked", receipt_body)

    return {
        "success": True,
        "key_id": key_id,
        "revoked_at": revoked_at,
    }
def get_device_key(device_id: str) -> Optional[Dict[str, Any]]:
    """Look up the key binding registered for a device.

    Returns a summary dict of the binding, or ``None`` when the device has no
    registered key or the key is no longer in the store.
    """
    key_id = _device_keys.get(device_id)
    # Only consult the binding store when the device maps to a key.
    binding = _key_bindings.get(key_id) if key_id else None
    if not binding:
        return None

    summary: Dict[str, Any] = {
        "key_id": key_id,
        "profile": binding.profile,
        "binding_type": binding.binding_type,
        "fingerprint": binding.fingerprint,
        "device_id": device_id,
        "status": binding.status,
        "expires_at": binding.expires_at,
    }
    return summary
def list_key_bindings(profile: Optional[str] = None) -> Dict[str, Any]:
    """Summarise stored key bindings, optionally restricted to one profile.

    A falsy ``profile`` selects every binding. The result holds the matching
    summaries under ``bindings`` and their number under ``count``.
    """
    selected = [
        {
            "key_id": key_id,
            "profile": entry.profile,
            "binding_type": entry.binding_type,
            "fingerprint": entry.fingerprint,
            "status": entry.status,
            "expires_at": entry.expires_at,
            "device_id": entry.device_id,
        }
        for key_id, entry in _key_bindings.items()
        if not profile or entry.profile == profile
    ]
    return {
        "count": len(selected),
        "bindings": selected,
    }
def _emit_key_receipt(receipt_type: str, body: dict) -> dict:
    """Append a receipt for a key operation to the identity scroll.

    The receipt is written as one JSON line to
    ``receipts/identity/key_events.jsonl`` under the VaultMesh root, and the
    receipt dict is returned. ``root_hash`` is the hash of ``body`` serialised
    with sorted keys.
    """
    scroll_path = RECEIPTS_ROOT / "identity" / "key_events.jsonl"
    scroll_path.parent.mkdir(parents=True, exist_ok=True)

    receipt = {
        "schema_version": "2.0.0",
        "type": receipt_type,
        "timestamp": _now_iso(),
        "scroll": "identity",
        "tags": ["key", receipt_type],
        "root_hash": _vmhash_blake3(json.dumps(body, sort_keys=True).encode()),
        "body": body,
    }

    line = json.dumps(receipt) + "\n"
    with scroll_path.open("a") as scroll:
        scroll.write(line)
    return receipt
# =============================================================================
# GUARDIAN DEVICE BINDING
# =============================================================================
def bind_guardian_device(
    device_id: str,
    device_type: str,
    public_key_b64: str,
    device_attestation: Optional[str] = None,
) -> Dict[str, Any]:
    """Bind a device as Guardian-of-record.

    Creates a guardian key binding for the device, emits a
    ``guardian_device_bound`` receipt and writes the device record to
    ``keys/guardian_device.json`` under the VaultMesh root. The record lists
    these capabilities for the device:
    - escalation signer
    - receipt verifier
    - emergency revocation

    Returns a dict with ``success: False`` and an ``error`` when the device
    type is not allowed for guardians or the key binding fails; otherwise a
    success dict describing the bound device.
    """
    allowed_types = PROFILE_KEY_REQUIREMENTS["guardian"]["device_types"]
    if device_type not in allowed_types:
        return {
            "success": False,
            "error": f"Invalid device type. Must be one of: {allowed_types}",
        }

    # Create guardian key binding
    binding_result = create_key_binding(
        profile="guardian",
        public_key_b64=public_key_b64,
        device_id=device_id,
        ttl_seconds=604800,  # 7 days
    )
    if not binding_result.get("success"):
        return binding_result

    # Only the hash of the attestation is recorded, and only when one is given.
    if device_attestation:
        attestation_hash = _vmhash_blake3(device_attestation.encode())
    else:
        attestation_hash = None

    capabilities = [
        "escalation_signer",
        "receipt_verifier",
        "emergency_revocation",
    ]
    device_binding = {
        "device_id": device_id,
        "device_type": device_type,
        "key_id": binding_result["key_id"],
        "fingerprint": binding_result["fingerprint"],
        "bound_at": _now_iso(),
        "attestation_hash": attestation_hash,
        "capabilities": capabilities,
    }
    _emit_key_receipt("guardian_device_bound", device_binding)

    # Store guardian device info (replaces any previous record)
    guardian_path = KEYS_ROOT / "guardian_device.json"
    guardian_path.parent.mkdir(parents=True, exist_ok=True)
    guardian_path.write_text(json.dumps(device_binding, indent=2))

    return {
        "success": True,
        "device_id": device_id,
        "device_type": device_type,
        "key_id": binding_result["key_id"],
        "fingerprint": binding_result["fingerprint"],
        "capabilities": capabilities,
        "message": f"Device {device_id} bound as Guardian-of-record",
    }
def get_guardian_device() -> Optional[Dict[str, Any]]:
    """Load the stored Guardian device record.

    Reads ``keys/guardian_device.json`` under the VaultMesh root and returns
    its parsed JSON content, or ``None`` when the file does not exist.
    """
    guardian_path = KEYS_ROOT / "guardian_device.json"
    if guardian_path.exists():
        with guardian_path.open("r") as handle:
            return json.load(handle)
    return None