235 lines
7.2 KiB
Python
235 lines
7.2 KiB
Python
"""Guardian MCP tools - Merkle root anchoring operations."""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import blake3
|
|
|
|
# VaultMesh root from env or default
|
|
VAULTMESH_ROOT = Path(os.environ.get("VAULTMESH_ROOT", Path(__file__).parents[3])).resolve()
|
|
RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts"
|
|
|
|
# Scroll definitions
|
|
SCROLLS = {
|
|
"drills": {"jsonl": "receipts/drills/drill_runs.jsonl"},
|
|
"compliance": {"jsonl": "receipts/compliance/oracle_answers.jsonl"},
|
|
"guardian": {"jsonl": "receipts/guardian/anchor_events.jsonl"},
|
|
"treasury": {"jsonl": "receipts/treasury/treasury_events.jsonl"},
|
|
"mesh": {"jsonl": "receipts/mesh/mesh_events.jsonl"},
|
|
"offsec": {"jsonl": "receipts/offsec/offsec_events.jsonl"},
|
|
"identity": {"jsonl": "receipts/identity/identity_events.jsonl"},
|
|
"observability": {"jsonl": "receipts/observability/observability_events.jsonl"},
|
|
"automation": {"jsonl": "receipts/automation/automation_events.jsonl"},
|
|
"psi": {"jsonl": "receipts/psi/psi_events.jsonl"},
|
|
}
|
|
|
|
|
|
def _vmhash_blake3(data: bytes) -> str:
|
|
"""VaultMesh hash: blake3:<hex>."""
|
|
return f"blake3:{blake3.blake3(data).hexdigest()}"
|
|
|
|
|
|
def _merkle_root(hashes: list[str]) -> str:
|
|
"""Compute Merkle root from list of VaultMesh hashes."""
|
|
if not hashes:
|
|
return _vmhash_blake3(b"empty")
|
|
if len(hashes) == 1:
|
|
return hashes[0]
|
|
|
|
# Iteratively combine pairs
|
|
current = hashes
|
|
while len(current) > 1:
|
|
next_level = []
|
|
for i in range(0, len(current), 2):
|
|
if i + 1 < len(current):
|
|
combined = current[i] + current[i + 1]
|
|
else:
|
|
combined = current[i] + current[i] # Duplicate odd leaf
|
|
next_level.append(_vmhash_blake3(combined.encode()))
|
|
current = next_level
|
|
return current[0]
|
|
|
|
|
|
def _compute_scroll_root(scroll_name: str) -> dict[str, Any]:
|
|
"""Compute Merkle root for a single scroll."""
|
|
if scroll_name not in SCROLLS:
|
|
return {"error": f"Unknown scroll: {scroll_name}"}
|
|
|
|
jsonl_path = VAULTMESH_ROOT / SCROLLS[scroll_name]["jsonl"]
|
|
if not jsonl_path.exists():
|
|
return {
|
|
"scroll": scroll_name,
|
|
"root": _vmhash_blake3(b"empty"),
|
|
"leaf_count": 0,
|
|
"exists": False,
|
|
}
|
|
|
|
hashes = []
|
|
with open(jsonl_path, "r") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
hashes.append(_vmhash_blake3(line.encode()))
|
|
|
|
root = _merkle_root(hashes)
|
|
return {
|
|
"scroll": scroll_name,
|
|
"root": root,
|
|
"leaf_count": len(hashes),
|
|
"exists": True,
|
|
}
|
|
|
|
|
|
def guardian_anchor_now(
|
|
scrolls: Optional[list[str]] = None,
|
|
guardian_did: str = "did:vm:guardian:mcp",
|
|
backend: str = "local",
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Anchor specified scrolls and emit a guardian receipt.
|
|
|
|
Args:
|
|
scrolls: List of scroll names to anchor (default: all)
|
|
guardian_did: DID of the guardian performing the anchor
|
|
backend: Backend identifier (local, ethereum, stellar)
|
|
|
|
Returns:
|
|
Anchor receipt with roots for each scroll
|
|
"""
|
|
if scrolls is None:
|
|
scrolls = list(SCROLLS.keys())
|
|
|
|
# Validate scrolls
|
|
invalid = [s for s in scrolls if s not in SCROLLS]
|
|
if invalid:
|
|
return {"error": f"Invalid scrolls: {invalid}"}
|
|
|
|
# Compute roots for each scroll
|
|
roots = {}
|
|
for scroll_name in scrolls:
|
|
result = _compute_scroll_root(scroll_name)
|
|
if "error" in result:
|
|
return result
|
|
roots[scroll_name] = result["root"]
|
|
|
|
# Compute anchor hash over all roots
|
|
roots_json = json.dumps(roots, sort_keys=True).encode()
|
|
anchor_hash = _vmhash_blake3(roots_json)
|
|
|
|
now = datetime.now(timezone.utc)
|
|
anchor_id = f"anchor-{now.strftime('%Y%m%d%H%M%S')}"
|
|
|
|
receipt = {
|
|
"schema_version": "2.0.0",
|
|
"type": "guardian_anchor",
|
|
"timestamp": now.isoformat(),
|
|
"anchor_id": anchor_id,
|
|
"backend": backend,
|
|
"anchor_by": guardian_did,
|
|
"anchor_epoch": int(now.timestamp()),
|
|
"roots": roots,
|
|
"scrolls": scrolls,
|
|
"anchor_hash": anchor_hash,
|
|
}
|
|
|
|
# Write receipt to guardian JSONL
|
|
guardian_path = VAULTMESH_ROOT / "receipts/guardian/anchor_events.jsonl"
|
|
guardian_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(guardian_path, "a") as f:
|
|
f.write(json.dumps(receipt) + "\n")
|
|
|
|
# Update ROOT.guardian.txt
|
|
root_result = _compute_scroll_root("guardian")
|
|
root_file = VAULTMESH_ROOT / "ROOT.guardian.txt"
|
|
root_file.write_text(root_result["root"])
|
|
|
|
return {
|
|
"success": True,
|
|
"receipt": receipt,
|
|
"message": f"Anchored {len(scrolls)} scrolls with ID {anchor_id}",
|
|
}
|
|
|
|
|
|
def guardian_verify_receipt(receipt_hash: str, scroll: str = "guardian") -> dict[str, Any]:
|
|
"""
|
|
Verify a receipt exists in a scroll's JSONL.
|
|
|
|
Args:
|
|
receipt_hash: The root_hash of the receipt to verify
|
|
scroll: The scroll to search in
|
|
|
|
Returns:
|
|
Verification result with proof if found
|
|
"""
|
|
if scroll not in SCROLLS:
|
|
return {"error": f"Unknown scroll: {scroll}"}
|
|
|
|
jsonl_path = VAULTMESH_ROOT / SCROLLS[scroll]["jsonl"]
|
|
if not jsonl_path.exists():
|
|
return {"verified": False, "reason": "Scroll JSONL does not exist"}
|
|
|
|
# Search for receipt with matching hash
|
|
with open(jsonl_path, "r") as f:
|
|
line_num = 0
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
line_num += 1
|
|
line_hash = _vmhash_blake3(line.encode())
|
|
|
|
# Check if the line hash matches or if the JSON contains the hash
|
|
try:
|
|
data = json.loads(line)
|
|
if data.get("anchor_hash") == receipt_hash or data.get("root_hash") == receipt_hash:
|
|
return {
|
|
"verified": True,
|
|
"line_number": line_num,
|
|
"line_hash": line_hash,
|
|
"receipt": data,
|
|
}
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
return {"verified": False, "reason": "Receipt not found in scroll"}
|
|
|
|
|
|
def guardian_status() -> dict[str, Any]:
|
|
"""
|
|
Get current status of all scrolls.
|
|
|
|
Returns:
|
|
Status of each scroll including root hash and leaf count
|
|
"""
|
|
status = {}
|
|
for scroll_name in SCROLLS:
|
|
result = _compute_scroll_root(scroll_name)
|
|
status[scroll_name] = {
|
|
"root": result["root"],
|
|
"leaf_count": result["leaf_count"],
|
|
"exists": result.get("exists", False),
|
|
}
|
|
|
|
# Get last anchor info
|
|
guardian_path = VAULTMESH_ROOT / "receipts/guardian/anchor_events.jsonl"
|
|
last_anchor = None
|
|
if guardian_path.exists():
|
|
with open(guardian_path, "r") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
try:
|
|
last_anchor = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return {
|
|
"scrolls": status,
|
|
"last_anchor": last_anchor,
|
|
"vaultmesh_root": str(VAULTMESH_ROOT),
|
|
}
|