#!/usr/bin/env python3
"""
VaultMesh Claude Skill Validator + Receipt Emitter

Checks that the VaultMesh skill is correctly installed under:
    ~/.claude/skills/vaultmesh/

Validates:
- Directory exists
- All expected files present and non-empty
- SKILL.md has valid YAML frontmatter
- Supporting docs are properly linked

Emits:
- VAULTMESH_SKILL_ROOT.txt (BLAKE3 integrity hash; SHA-256 is used as a
  fallback when the ``blake3`` package cannot be imported)
- VaultMesh Automation scroll receipts for each run:
    automation_vm_skill_validate_success
    automation_vm_skill_validate_warning
    automation_vm_skill_validate_failure
"""

import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Dict, Tuple

# ---------------------------------------------------------------------------
# Path configuration (self-rooting)
# ---------------------------------------------------------------------------

THIS_FILE = Path(__file__).resolve()
CLI_DIR = THIS_FILE.parent  # e.g. /root/work/vaultmesh/cli
# `.parent` is used instead of `parents[1]` because it never raises
# IndexError, even when the script sits directly in the filesystem root.
REPO_ROOT = CLI_DIR.parent  # e.g. /root/work/vaultmesh

# Allow override via env var, but default to auto-detected repo root
VM_ROOT = Path(os.environ.get("VAULTMESH_ROOT", REPO_ROOT)).resolve()
RECEIPTS_ROOT = Path(os.environ.get("VAULTMESH_RECEIPTS_ROOT", VM_ROOT / "receipts"))

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Files that must exist inside the skill directory.
EXPECTED_FILES = [
    "SKILL.md",
    "QUICK_REFERENCE.md",
    "OPERATIONS.md",
    "MCP_INTEGRATION.md",
    "PROTOCOLS.md",
    "ALCHEMICAL_PATTERNS.md",
    "INFRASTRUCTURE.md",
    "CODE_TEMPLATES.md",
    "ENGINE_SPECS.md",
]

# Supporting document -> label text expected in its markdown link in SKILL.md.
SUPPORTING_DOC_LINKS = {
    "QUICK_REFERENCE.md": "Quick Reference",
    "OPERATIONS.md": "Operations Guide",
    "MCP_INTEGRATION.md": "MCP Integration",
    "PROTOCOLS.md": "Protocols",
    "ALCHEMICAL_PATTERNS.md": "Alchemical Patterns",
    "INFRASTRUCTURE.md": "Infrastructure",
    "CODE_TEMPLATES.md": "Code Templates",
    "ENGINE_SPECS.md": "Engine Specs",
}


@dataclass
class CheckResult:
    """Outcome of a single validation check."""

    name: str
    status: str  # "ok", "warn", "fail"
    details: str


@dataclass
class ValidationReport:
    """Aggregated result of one validator run."""

    skill_dir: str
    checks: List[CheckResult]
    overall_status: str  # "ok", "warn", "fail"
    hash_algorithm: Optional[str] = None
    root_hash: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return a JSON-serialisable dict; each check is expanded via ``asdict``."""
        return {
            "skill_dir": self.skill_dir,
            "overall_status": self.overall_status,
            "hash_algorithm": self.hash_algorithm,
            "root_hash": self.root_hash,
            "checks": [asdict(c) for c in self.checks],
        }


# ---------------------------------------------------------------------------
# Hashing helpers
# ---------------------------------------------------------------------------


def load_hasher():
    """Return (name, constructor) for hash function (blake3 preferred)."""
    try:
        import blake3  # type: ignore

        return "blake3", blake3.blake3
    except Exception:
        # Deliberately broad: any failure to load the optional blake3 package
        # falls back to the standard library instead of aborting validation.
        import hashlib

        return "sha256", hashlib.sha256


# ---------------------------------------------------------------------------
# Basic checks
# ---------------------------------------------------------------------------


def check_dir_exists(skill_dir: Path) -> CheckResult:
    """Report ``ok`` when *skill_dir* is an existing directory, else ``fail``."""
    if skill_dir.is_dir():
        return CheckResult(
            name="skill_dir_exists",
            status="ok",
            details=f"Found skill directory at {skill_dir}",
        )
    return CheckResult(
        name="skill_dir_exists",
        status="fail",
        details=f"Skill directory not found: {skill_dir}",
    )


def check_expected_files(skill_dir: Path) -> List[CheckResult]:
    """Check every entry of ``EXPECTED_FILES`` inside *skill_dir*.

    Returns one ``CheckResult`` per expected file, in ``EXPECTED_FILES`` order:

    - ``file_missing:<name>`` (fail) when the path does not exist,
    - ``file_not_regular:<name>`` (fail) when it exists but is not a regular
      file (for example a directory with the expected name),
    - ``file_empty:<name>`` (warn) when the file has zero bytes,
    - ``file_ok:<name>`` (ok) otherwise.
    """
    results: List[CheckResult] = []
    for fname in EXPECTED_FILES:
        path = skill_dir / fname
        if not path.exists():
            results.append(
                CheckResult(
                    name=f"file_missing:{fname}",
                    status="fail",
                    details=f"Expected file missing: {fname}",
                )
            )
        elif not path.is_file():
            # A directory reports a non-zero st_size on most filesystems, so
            # without this branch it would be accepted as a valid document.
            results.append(
                CheckResult(
                    name=f"file_not_regular:{fname}",
                    status="fail",
                    details=f"Expected a regular file but found something else: {fname}",
                )
            )
        elif path.stat().st_size == 0:
            results.append(
                CheckResult(
                    name=f"file_empty:{fname}",
                    status="warn",
                    details=f"File present but empty: {fname}",
                )
            )
        else:
            results.append(
                CheckResult(
                    name=f"file_ok:{fname}",
                    status="ok",
                    details=f"File present: {fname}",
                )
            )
    return results


# Matches a leading "---" fenced block at the very start of the text and
# captures everything between the two fences (DOTALL lets it span lines).
FRONTMATTER_RE = re.compile(
    r"^---\s*\n(.*?)\n---\s*\n",
    re.DOTALL,
)


def parse_frontmatter(text: str) -> Optional[Dict[str, str]]:
    """Extract flat ``key: value`` pairs from the frontmatter block of *text*.

    This is a minimal line-based reader, not a YAML parser: blank lines,
    ``#`` comment lines and lines without a colon are skipped, each remaining
    line is split on its first colon, and surrounding quotes are stripped
    from the value.

    Returns ``None`` when *text* does not start with a ``---`` fenced block.
    """
    m = FRONTMATTER_RE.match(text)
    if not m:
        return None

    data: Dict[str, str] = {}
    for raw_line in m.group(1).splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        data[key.strip()] = value.strip().strip('"').strip("'")
    return data


def check_skill_md(skill_dir: Path) -> List[CheckResult]:
    """Validate SKILL.md: presence, readability, frontmatter and doc links.

    Returns early with a single failing check when SKILL.md is missing,
    cannot be read as UTF-8, or has no frontmatter block; in those cases the
    name/description/link checks are not run.
    """
    results: List[CheckResult] = []
    path = skill_dir / "SKILL.md"

    if not path.exists():
        return [
            CheckResult(
                name="skill_md_exists",
                status="fail",
                details="SKILL.md is missing",
            )
        ]

    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # Report unreadable or non-UTF-8 content as a failed check instead of
        # aborting the whole run with a traceback before the report prints.
        return [
            CheckResult(
                name="skill_md_readable",
                status="fail",
                details=f"SKILL.md could not be read as UTF-8 text: {exc}",
            )
        ]

    fm = parse_frontmatter(text)
    if fm is None:
        results.append(
            CheckResult(
                name="skill_md_frontmatter",
                status="fail",
                details="YAML frontmatter block (--- ... ---) not found at top of SKILL.md",
            )
        )
        return results

    # name
    if fm.get("name") == "vaultmesh":
        results.append(
            CheckResult(
                name="skill_md_name",
                status="ok",
                details='Frontmatter name is "vaultmesh".',
            )
        )
    else:
        results.append(
            CheckResult(
                name="skill_md_name",
                status="fail",
                details=f'Frontmatter "name" should be "vaultmesh", got {fm.get("name")!r}',
            )
        )

    # description
    desc = fm.get("description", "").strip()
    if desc:
        results.append(
            CheckResult(
                name="skill_md_description",
                status="ok",
                details=f"Description present ({len(desc)} chars).",
            )
        )
    else:
        results.append(
            CheckResult(
                name="skill_md_description",
                status="fail",
                details="Frontmatter 'description' is missing or empty.",
            )
        )

    # Supporting doc links
    results.extend(check_supporting_links(text))

    return results


def _normalize_link_target(target: str) -> str:
    """Reduce a markdown link target to its bare relative file name.

    Drops an optional link title (``FILE.md "Title"``), a ``#fragment`` and
    any leading ``./`` so that equivalent spellings of the same relative link
    compare equal to the plain file name.
    """
    parts = target.strip().split(None, 1)
    bare = parts[0] if parts else ""
    bare = bare.split("#", 1)[0]
    while bare.startswith("./"):
        bare = bare[2:]
    return bare


def check_supporting_links(skill_md_text: str) -> List[CheckResult]:
    """Check that SKILL.md links to every document in ``SUPPORTING_DOC_LINKS``.

    For each supporting document the result is:

    - ``link_missing:<name>`` (fail) when no markdown link targets the file,
    - ``link_ok:<name>`` (ok) when at least one link's label contains the
      expected label (case-insensitive),
    - ``link_label_warn:<name>`` (warn) when links exist but none of their
      labels contains the expected label.
    """
    results: List[CheckResult] = []

    # Simple markdown link regex: [Label](FILE.md)
    link_re = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")

    # Every label is kept per target so that a second, differently labelled
    # link to the same file cannot hide an earlier, correctly labelled one.
    labels_by_target: Dict[str, List[str]] = {}
    for label, target in link_re.findall(skill_md_text):
        labels_by_target.setdefault(_normalize_link_target(target), []).append(label)

    for fname, expected_label in SUPPORTING_DOC_LINKS.items():
        labels = labels_by_target.get(fname)
        if not labels:
            results.append(
                CheckResult(
                    name=f"link_missing:{fname}",
                    status="fail",
                    details=f"Missing markdown link to {fname} in SKILL.md",
                )
            )
            continue

        wanted = expected_label.lower()
        matching = [lbl for lbl in labels if wanted in lbl.lower()]
        if matching:
            label = matching[-1]
            results.append(
                CheckResult(
                    name=f"link_ok:{fname}",
                    status="ok",
                    details=f"Link to {fname} present with label '{label}'.",
                )
            )
        else:
            # Only warn if label is very different
            label = labels[-1]
            results.append(
                CheckResult(
                    name=f"link_label_warn:{fname}",
                    status="warn",
                    details=(
                        f"Link to {fname} present but label is '{label}', "
                        f"expected something like '{expected_label}'."
                    ),
                )
            )

    return results


# ---------------------------------------------------------------------------
# Integrity root for the skill (VAULTMESH_SKILL_ROOT.txt)
# ---------------------------------------------------------------------------


def _format_digest(algo_name: str, digest: str) -> str:
    """Return ``"<algo>:<digest>"``; any casing of "blake3" becomes ``blake3``."""
    prefix = "blake3" if algo_name.lower() == "blake3" else algo_name
    return f"{prefix}:{digest}"


def compute_skill_root_hash(skill_dir: Path) -> Tuple[str, str]:
    """Hash the expected skill files into a single integrity digest.

    Files are visited in sorted name order. For each one the UTF-8 file name,
    a NUL byte and then the raw file content are fed to the hasher. Names
    that are missing or are not regular files are skipped.

    Returns ``(algorithm_name, hex_digest)``.
    """
    algo_name, hasher_ctor = load_hasher()
    h = hasher_ctor()

    # Sort to keep deterministic ordering
    for fname in sorted(EXPECTED_FILES):
        path = skill_dir / fname
        # is_file() also skips directories, which open("rb") cannot read.
        if not path.is_file():
            continue
        h.update(fname.encode("utf-8"))
        h.update(b"\0")
        with path.open("rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)

    return algo_name, h.hexdigest()


def write_skill_root_file(skill_dir: Path, algo_name: str, digest: str) -> CheckResult:
    """Write ``<algo>:<digest>`` plus newline to VAULTMESH_SKILL_ROOT.txt.

    Returns an ``ok`` check on success and a ``fail`` check (rather than
    raising) when the file cannot be written.
    """
    out_path = skill_dir / "VAULTMESH_SKILL_ROOT.txt"
    value = _format_digest(algo_name, digest) + "\n"
    try:
        out_path.write_text(value, encoding="utf-8")
    except OSError as exc:
        # e.g. a read-only skill directory: keep the run alive so the JSON
        # report is still produced.
        return CheckResult(
            name="root_file_written",
            status="fail",
            details=f"Could not write integrity root to {out_path}: {exc}",
        )
    return CheckResult(
        name="root_file_written",
        status="ok",
        details=f"Wrote integrity root to {out_path} using {algo_name}.",
    )


# ---------------------------------------------------------------------------
# Automation scroll receipt emission
# ---------------------------------------------------------------------------


def _now_iso() -> str:
    """Return the current UTC time in ISO-8601 form with a ``Z`` suffix."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _read_last_receipt(receipts_path: Path) -> Optional[Dict]:
    """Read the last receipt from the JSONL scroll file.

    Returns ``None`` when the file does not exist, cannot be read, has no
    non-blank line, or its last non-blank line is not a JSON object.
    """
    if not receipts_path.exists():
        return None
    last_line = None
    try:
        with receipts_path.open("r", encoding="utf-8") as f:
            for line in f:
                stripped = line.strip()
                if stripped:
                    last_line = stripped
        if not last_line:
            return None
        parsed = json.loads(last_line)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    # The caller uses .get() on the result, so anything but a dict is unusable.
    return parsed if isinstance(parsed, dict) else None


def emit_automation_receipt(
    event_type: str,
    report: ValidationReport,
    skill_root_algo: str,
    skill_root_hash: str,
) -> None:
    """
    Emit a VaultMesh Automation scroll receipt for this validator run.

    Scroll: Automation
    Types:
      - automation_vm_skill_validate_success
      - automation_vm_skill_validate_warning
      - automation_vm_skill_validate_failure

    The receipt is appended as one JSON line to
    ``RECEIPTS_ROOT/automation/automation_events.jsonl``. Its ``sequence`` and
    ``previous_hash`` are derived from the last receipt already in that file
    (0 and ``None`` when there is none). The receipt hash is computed over the
    canonical JSON (sorted keys, compact separators) of the receipt while
    ``header.root_hash`` is still ``None``, and is then stored in that field.
    """
    scroll_name = "Automation"
    scroll_dir = RECEIPTS_ROOT / "automation"
    scroll_dir.mkdir(parents=True, exist_ok=True)
    receipts_path = scroll_dir / "automation_events.jsonl"

    # Determine sequence and previous_hash
    last = _read_last_receipt(receipts_path)
    if last is None:
        sequence = 0
        previous_hash = None
    else:
        sequence = int(last.get("meta", {}).get("sequence", -1)) + 1
        previous_hash = last.get("header", {}).get("root_hash")

    # Body snapshot (we keep it compact)
    body = {
        "skill_dir": report.skill_dir,
        "hash_algorithm": skill_root_algo,
        "root_hash": _format_digest(skill_root_algo, skill_root_hash),
        "overall_status": report.overall_status,
        "checks": [
            {
                "name": c.name,
                "status": c.status,
            }
            for c in report.checks
        ],
    }

    # Build receipt (schema v2-style)
    timestamp = _now_iso()
    receipt = {
        "schema_version": "2.0.0",
        "type": event_type,
        "timestamp": timestamp,
        "header": {
            "root_hash": None,  # filled after hash
            "tags": [
                "vaultmesh_skill",
                "validator",
                f"status:{report.overall_status}",
            ],
            "previous_hash": previous_hash,
        },
        "meta": {
            "scroll": scroll_name,
            "sequence": sequence,
            "anchor_epoch": None,
            "proof_path": None,
        },
        "body": body,
    }

    # Compute receipt hash over canonical JSON
    algo_name, hasher_ctor = load_hasher()
    h = hasher_ctor()
    encoded = json.dumps(receipt, sort_keys=True, separators=(",", ":")).encode("utf-8")
    h.update(encoded)
    receipt["header"]["root_hash"] = _format_digest(algo_name, h.hexdigest())

    # Append to scroll file
    with receipts_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(receipt, separators=(",", ":")) + "\n")


# ---------------------------------------------------------------------------
# Aggregation + main
# ---------------------------------------------------------------------------


def aggregate_status(checks: List[CheckResult]) -> str:
    """Return the worst status among *checks*: "fail" > "warn" > "ok"."""
    worst = "ok"
    for c in checks:
        if c.status == "fail":
            return "fail"
        if c.status == "warn" and worst == "ok":
            worst = "warn"
    return worst


def main(argv: List[str]) -> int:
    """Run the validator and return the process exit code.

    *argv* is the full argument vector (program name first); an optional
    second element overrides the default ``~/.claude/skills/vaultmesh``.

    Exit codes: 0 when every check is ok, 1 when the worst status is a
    warning, 2 on failure or bad usage. The JSON report is printed to stdout.
    When the skill directory itself is missing no receipt is emitted.
    """
    if len(argv) > 2:
        print(f"Usage: {argv[0]} [skill_dir]", file=sys.stderr)
        return 2

    if len(argv) == 2:
        skill_dir = Path(argv[1]).expanduser()
    else:
        skill_dir = Path("~/.claude/skills/vaultmesh").expanduser()

    checks: List[CheckResult] = []

    # 1. Directory
    dir_check = check_dir_exists(skill_dir)
    checks.append(dir_check)

    if dir_check.status == "fail":
        report = ValidationReport(
            skill_dir=str(skill_dir),
            checks=checks,
            overall_status="fail",
        )
        print(json.dumps(report.to_dict(), indent=2))
        # No receipt emitted if the skill dir doesn't exist
        return 2

    # 2. Files
    checks.extend(check_expected_files(skill_dir))

    # 3. SKILL.md + links
    checks.extend(check_skill_md(skill_dir))

    # 4. Skill integrity root
    skill_algo, skill_digest = compute_skill_root_hash(skill_dir)

    # Falling back from BLAKE3 is reported as a warning, so a run without the
    # blake3 package can never end with an overall "ok".
    checks.append(
        CheckResult(
            name="hash_algorithm",
            status="ok" if skill_algo == "blake3" else "warn",
            details=(
                f"Using {skill_algo} for integrity hash "
                + ("(preferred)." if skill_algo == "blake3" else "(BLAKE3 not available, using fallback.)")
            ),
        )
    )

    checks.append(write_skill_root_file(skill_dir, skill_algo, skill_digest))

    overall = aggregate_status(checks)

    report = ValidationReport(
        skill_dir=str(skill_dir),
        checks=checks,
        overall_status=overall,
        hash_algorithm=skill_algo,
        root_hash=skill_digest,
    )

    # Print JSON report to stdout
    print(json.dumps(report.to_dict(), indent=2))

    # Emit Automation scroll receipt
    if overall == "ok":
        event_type = "automation_vm_skill_validate_success"
    elif overall == "warn":
        event_type = "automation_vm_skill_validate_warning"
    else:
        event_type = "automation_vm_skill_validate_failure"

    try:
        emit_automation_receipt(
            event_type=event_type,
            report=report,
            skill_root_algo=skill_algo,
            skill_root_hash=skill_digest,
        )
    except Exception as e:
        # We don't want receipt emission failures to hide validation output,
        # so just log to stderr and keep the original exit code.
        print(f"WARNING: failed to emit automation receipt: {e}", file=sys.stderr)

    if overall == "ok":
        return 0
    if overall == "warn":
        return 1
    return 2


if __name__ == "__main__":
    raise SystemExit(main(sys.argv))