"""Guardian MCP tools - Merkle root anchoring operations.""" import json import os from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional import blake3 # VaultMesh root from env or default VAULTMESH_ROOT = Path(os.environ.get("VAULTMESH_ROOT", Path(__file__).parents[3])).resolve() RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts" # Scroll definitions SCROLLS = { "drills": {"jsonl": "receipts/drills/drill_runs.jsonl"}, "compliance": {"jsonl": "receipts/compliance/oracle_answers.jsonl"}, "guardian": {"jsonl": "receipts/guardian/anchor_events.jsonl"}, "treasury": {"jsonl": "receipts/treasury/treasury_events.jsonl"}, "mesh": {"jsonl": "receipts/mesh/mesh_events.jsonl"}, "offsec": {"jsonl": "receipts/offsec/offsec_events.jsonl"}, "identity": {"jsonl": "receipts/identity/identity_events.jsonl"}, "observability": {"jsonl": "receipts/observability/observability_events.jsonl"}, "automation": {"jsonl": "receipts/automation/automation_events.jsonl"}, "psi": {"jsonl": "receipts/psi/psi_events.jsonl"}, } def _vmhash_blake3(data: bytes) -> str: """VaultMesh hash: blake3:.""" return f"blake3:{blake3.blake3(data).hexdigest()}" def _merkle_root(hashes: list[str]) -> str: """Compute Merkle root from list of VaultMesh hashes.""" if not hashes: return _vmhash_blake3(b"empty") if len(hashes) == 1: return hashes[0] # Iteratively combine pairs current = hashes while len(current) > 1: next_level = [] for i in range(0, len(current), 2): if i + 1 < len(current): combined = current[i] + current[i + 1] else: combined = current[i] + current[i] # Duplicate odd leaf next_level.append(_vmhash_blake3(combined.encode())) current = next_level return current[0] def _compute_scroll_root(scroll_name: str) -> dict[str, Any]: """Compute Merkle root for a single scroll.""" if scroll_name not in SCROLLS: return {"error": f"Unknown scroll: {scroll_name}"} jsonl_path = VAULTMESH_ROOT / SCROLLS[scroll_name]["jsonl"] if not jsonl_path.exists(): return { "scroll": scroll_name, "root": _vmhash_blake3(b"empty"), "leaf_count": 0, "exists": False, } hashes = [] with open(jsonl_path, "r") as f: for line in f: line = line.strip() if line: hashes.append(_vmhash_blake3(line.encode())) root = _merkle_root(hashes) return { "scroll": scroll_name, "root": root, "leaf_count": len(hashes), "exists": True, } def guardian_anchor_now( scrolls: Optional[list[str]] = None, guardian_did: str = "did:vm:guardian:mcp", backend: str = "local", ) -> dict[str, Any]: """ Anchor specified scrolls and emit a guardian receipt. Args: scrolls: List of scroll names to anchor (default: all) guardian_did: DID of the guardian performing the anchor backend: Backend identifier (local, ethereum, stellar) Returns: Anchor receipt with roots for each scroll """ if scrolls is None: scrolls = list(SCROLLS.keys()) # Validate scrolls invalid = [s for s in scrolls if s not in SCROLLS] if invalid: return {"error": f"Invalid scrolls: {invalid}"} # Compute roots for each scroll roots = {} for scroll_name in scrolls: result = _compute_scroll_root(scroll_name) if "error" in result: return result roots[scroll_name] = result["root"] # Compute anchor hash over all roots roots_json = json.dumps(roots, sort_keys=True).encode() anchor_hash = _vmhash_blake3(roots_json) now = datetime.now(timezone.utc) anchor_id = f"anchor-{now.strftime('%Y%m%d%H%M%S')}" receipt = { "schema_version": "2.0.0", "type": "guardian_anchor", "timestamp": now.isoformat(), "anchor_id": anchor_id, "backend": backend, "anchor_by": guardian_did, "anchor_epoch": int(now.timestamp()), "roots": roots, "scrolls": scrolls, "anchor_hash": anchor_hash, } # Write receipt to guardian JSONL guardian_path = VAULTMESH_ROOT / "receipts/guardian/anchor_events.jsonl" guardian_path.parent.mkdir(parents=True, exist_ok=True) with open(guardian_path, "a") as f: f.write(json.dumps(receipt) + "\n") # Update ROOT.guardian.txt root_result = _compute_scroll_root("guardian") root_file = VAULTMESH_ROOT / "ROOT.guardian.txt" root_file.write_text(root_result["root"]) return { "success": True, "receipt": receipt, "message": f"Anchored {len(scrolls)} scrolls with ID {anchor_id}", } def guardian_verify_receipt(receipt_hash: str, scroll: str = "guardian") -> dict[str, Any]: """ Verify a receipt exists in a scroll's JSONL. Args: receipt_hash: The root_hash of the receipt to verify scroll: The scroll to search in Returns: Verification result with proof if found """ if scroll not in SCROLLS: return {"error": f"Unknown scroll: {scroll}"} jsonl_path = VAULTMESH_ROOT / SCROLLS[scroll]["jsonl"] if not jsonl_path.exists(): return {"verified": False, "reason": "Scroll JSONL does not exist"} # Search for receipt with matching hash with open(jsonl_path, "r") as f: line_num = 0 for line in f: line = line.strip() if not line: continue line_num += 1 line_hash = _vmhash_blake3(line.encode()) # Check if the line hash matches or if the JSON contains the hash try: data = json.loads(line) if data.get("anchor_hash") == receipt_hash or data.get("root_hash") == receipt_hash: return { "verified": True, "line_number": line_num, "line_hash": line_hash, "receipt": data, } except json.JSONDecodeError: continue return {"verified": False, "reason": "Receipt not found in scroll"} def guardian_status() -> dict[str, Any]: """ Get current status of all scrolls. Returns: Status of each scroll including root hash and leaf count """ status = {} for scroll_name in SCROLLS: result = _compute_scroll_root(scroll_name) status[scroll_name] = { "root": result["root"], "leaf_count": result["leaf_count"], "exists": result.get("exists", False), } # Get last anchor info guardian_path = VAULTMESH_ROOT / "receipts/guardian/anchor_events.jsonl" last_anchor = None if guardian_path.exists(): with open(guardian_path, "r") as f: for line in f: line = line.strip() if line: try: last_anchor = json.loads(line) except json.JSONDecodeError: pass return { "scrolls": status, "last_anchor": last_anchor, "vaultmesh_root": str(VAULTMESH_ROOT), }