Initialize repository snapshot

This commit is contained in:
Vault Sovereign
2025-12-27 00:10:32 +00:00
commit 110d644e10
281 changed files with 40331 additions and 0 deletions

550
cli/skill_validator.py Executable file
View File

@@ -0,0 +1,550 @@
#!/usr/bin/env python3
"""
VaultMesh Claude Skill Validator + Receipt Emitter

Checks that the VaultMesh skill is correctly installed under
~/.claude/skills/vaultmesh/ (or a directory passed as the first CLI argument).

Validates:
- the skill directory exists
- all expected files are present and non-empty
- SKILL.md has valid YAML frontmatter (name, description)
- supporting docs are properly linked from SKILL.md

Emits:
- VAULTMESH_SKILL_ROOT.txt (BLAKE3 integrity hash, SHA-256 fallback)
- one VaultMesh Automation scroll receipt per run, typed as one of:
    automation_vm_skill_validate_success
    automation_vm_skill_validate_warning
    automation_vm_skill_validate_failure

Exit codes: 0 = ok, 1 = warnings, 2 = failure or usage error.
"""
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Dict, Tuple
# ---------------------------------------------------------------------------
# Path configuration (self-rooting)
# ---------------------------------------------------------------------------
THIS_FILE = Path(__file__).resolve()
CLI_DIR = THIS_FILE.parent  # directory containing this script, e.g. <repo>/cli
REPO_ROOT = THIS_FILE.parents[1]  # repository root, one level above cli/
# Allow override via env var, but default to auto-detected repo root
VM_ROOT = Path(os.environ.get("VAULTMESH_ROOT", REPO_ROOT)).resolve()
# Receipts scroll location; defaults to <VM_ROOT>/receipts unless overridden.
RECEIPTS_ROOT = Path(os.environ.get("VAULTMESH_RECEIPTS_ROOT", VM_ROOT / "receipts"))
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Files that must exist (and be non-empty) inside the skill directory.
EXPECTED_FILES = [
    "SKILL.md",
    "QUICK_REFERENCE.md",
    "OPERATIONS.md",
    "MCP_INTEGRATION.md",
    "PROTOCOLS.md",
    "ALCHEMICAL_PATTERNS.md",
    "INFRASTRUCTURE.md",
    "CODE_TEMPLATES.md",
    "ENGINE_SPECS.md",
]
# Maps each supporting doc filename to the link label expected in SKILL.md.
SUPPORTING_DOC_LINKS = {
    "QUICK_REFERENCE.md": "Quick Reference",
    "OPERATIONS.md": "Operations Guide",
    "MCP_INTEGRATION.md": "MCP Integration",
    "PROTOCOLS.md": "Protocols",
    "ALCHEMICAL_PATTERNS.md": "Alchemical Patterns",
    "INFRASTRUCTURE.md": "Infrastructure",
    "CODE_TEMPLATES.md": "Code Templates",
    "ENGINE_SPECS.md": "Engine Specs",
}
@dataclass
class CheckResult:
    """Outcome of a single validation check."""

    name: str
    status: str  # one of "ok", "warn", "fail"
    details: str


@dataclass
class ValidationReport:
    """Full result of one validator run, serializable for the JSON report."""

    skill_dir: str
    checks: List[CheckResult]
    overall_status: str  # worst status across all checks: "ok", "warn", "fail"
    hash_algorithm: Optional[str] = None
    root_hash: Optional[str] = None

    def to_dict(self) -> Dict:
        """Render the report as a plain dict; each check becomes a dict too."""
        payload: Dict = {
            "skill_dir": self.skill_dir,
            "overall_status": self.overall_status,
            "hash_algorithm": self.hash_algorithm,
            "root_hash": self.root_hash,
        }
        payload["checks"] = [asdict(check) for check in self.checks]
        return payload
# ---------------------------------------------------------------------------
# Hashing helpers
# ---------------------------------------------------------------------------
def load_hasher():
    """Return ``(name, constructor)`` for the integrity hash.

    Prefers the optional ``blake3`` package; any import/attribute failure
    falls back to the stdlib's SHA-256.
    """
    try:
        import blake3  # type: ignore
        ctor = blake3.blake3
    except Exception:
        # blake3 is an optional dependency — degrade to the stdlib.
        import hashlib
        return "sha256", hashlib.sha256
    return "blake3", ctor
# ---------------------------------------------------------------------------
# Basic checks
# ---------------------------------------------------------------------------
def check_dir_exists(skill_dir: Path) -> CheckResult:
    """Report whether the skill directory exists (ok) or not (fail)."""
    if skill_dir.is_dir():
        status, details = "ok", f"Found skill directory at {skill_dir}"
    else:
        status, details = "fail", f"Skill directory not found: {skill_dir}"
    return CheckResult(name="skill_dir_exists", status=status, details=details)
def check_expected_files(skill_dir: Path) -> List[CheckResult]:
    """Classify every expected skill file as present, empty, or missing."""
    outcomes: List[CheckResult] = []
    for fname in EXPECTED_FILES:
        candidate = skill_dir / fname
        if not candidate.exists():
            prefix, status = "file_missing", "fail"
            details = f"Expected file missing: {fname}"
        elif candidate.stat().st_size == 0:
            # The file exists but carries no content — warn, don't fail.
            prefix, status = "file_empty", "warn"
            details = f"File present but empty: {fname}"
        else:
            prefix, status = "file_ok", "ok"
            details = f"File present: {fname}"
        outcomes.append(CheckResult(name=f"{prefix}:{fname}", status=status, details=details))
    return outcomes
# Leading "--- ... ---" frontmatter block at the very start of the file.
FRONTMATTER_RE = re.compile(
    r"^---\s*\n(.*?)\n---\s*\n",
    re.DOTALL,
)


def parse_frontmatter(text: str) -> Optional[Dict[str, str]]:
    """Parse the leading YAML-ish frontmatter into a flat str->str dict.

    Returns None when no ``--- ... ---`` block starts *text*. Only simple
    ``key: value`` lines are recognized; blank lines, ``#`` comments, and
    lines without a colon are skipped. Surrounding quotes are stripped
    from values.
    """
    match = FRONTMATTER_RE.match(text)
    if match is None:
        return None
    parsed: Dict[str, str] = {}
    for raw in match.group(1).splitlines():
        entry = raw.strip()
        if not entry or entry.startswith("#") or ":" not in entry:
            continue
        key, _, value = entry.partition(":")
        parsed[key.strip()] = value.strip().strip('"').strip("'")
    return parsed
def check_skill_md(skill_dir: Path) -> List[CheckResult]:
    """Validate SKILL.md: existence, frontmatter name/description, doc links."""
    path = skill_dir / "SKILL.md"
    if not path.exists():
        return [
            CheckResult(
                name="skill_md_exists",
                status="fail",
                details="SKILL.md is missing",
            )
        ]
    text = path.read_text(encoding="utf-8")
    fm = parse_frontmatter(text)
    results: List[CheckResult] = []
    if fm is None:
        # Without frontmatter there is nothing further to validate here.
        results.append(
            CheckResult(
                name="skill_md_frontmatter",
                status="fail",
                details="YAML frontmatter block (--- ... ---) not found at top of SKILL.md",
            )
        )
        return results
    # Frontmatter "name" must identify this skill exactly.
    if fm.get("name") == "vaultmesh":
        name_check = CheckResult(
            name="skill_md_name",
            status="ok",
            details='Frontmatter name is "vaultmesh".',
        )
    else:
        name_check = CheckResult(
            name="skill_md_name",
            status="fail",
            details=f'Frontmatter "name" should be "vaultmesh", got {fm.get("name")!r}',
        )
    results.append(name_check)
    # A non-empty description is mandatory.
    desc = fm.get("description", "").strip()
    if desc:
        desc_check = CheckResult(
            name="skill_md_description",
            status="ok",
            details=f"Description present ({len(desc)} chars).",
        )
    else:
        desc_check = CheckResult(
            name="skill_md_description",
            status="fail",
            details="Frontmatter 'description' is missing or empty.",
        )
    results.append(desc_check)
    # Finally, verify links to every supporting document.
    results.extend(check_supporting_links(text))
    return results
def check_supporting_links(skill_md_text: str) -> List[CheckResult]:
    """Check that SKILL.md links every supporting doc with a plausible label."""
    # Simple markdown link pattern: [Label](target). If a target appears
    # more than once, the last occurrence's label wins.
    link_re = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
    labels_by_target: Dict[str, str] = {
        target: label for label, target in link_re.findall(skill_md_text)
    }
    results: List[CheckResult] = []
    for fname, expected_label in SUPPORTING_DOC_LINKS.items():
        label = labels_by_target.get(fname)
        if label is None:
            results.append(
                CheckResult(
                    name=f"link_missing:{fname}",
                    status="fail",
                    details=f"Missing markdown link to {fname} in SKILL.md",
                )
            )
        elif expected_label.lower() not in label.lower():
            # Link exists but the wording diverges — warn, don't fail.
            results.append(
                CheckResult(
                    name=f"link_label_warn:{fname}",
                    status="warn",
                    details=(
                        f"Link to {fname} present but label is '{label}', "
                        f"expected something like '{expected_label}'."
                    ),
                )
            )
        else:
            results.append(
                CheckResult(
                    name=f"link_ok:{fname}",
                    status="ok",
                    details=f"Link to {fname} present with label '{label}'.",
                )
            )
    return results
# ---------------------------------------------------------------------------
# Integrity root for the skill (VAULTMESH_SKILL_ROOT.txt)
# ---------------------------------------------------------------------------
def compute_skill_root_hash(skill_dir: Path) -> Tuple[str, str]:
    """Digest all expected skill files into one integrity hash.

    Each present file contributes its name, a NUL separator, and its bytes.
    Missing files are silently skipped — check_expected_files reports those.
    Returns ``(algorithm_name, hex_digest)``.
    """
    algo_name, hasher_ctor = load_hasher()
    hasher = hasher_ctor()
    for fname in sorted(EXPECTED_FILES):  # sorted for deterministic ordering
        path = skill_dir / fname
        if not path.exists():
            continue
        hasher.update(fname.encode("utf-8"))
        hasher.update(b"\0")
        with path.open("rb") as fh:
            # Stream in 8 KiB chunks so large docs never load fully into memory.
            for chunk in iter(lambda: fh.read(8192), b""):
                hasher.update(chunk)
    return algo_name, hasher.hexdigest()
def write_skill_root_file(skill_dir: Path, algo_name: str, digest: str) -> CheckResult:
    """Write VAULTMESH_SKILL_ROOT.txt as "<algo>:<digest>" (newline-terminated).

    Args:
        skill_dir: Skill directory the root file is written into.
        algo_name: Algorithm name as returned by load_hasher()
            ("blake3" or "sha256" — already lowercase).
        digest: Hex digest of the skill files.

    Returns:
        An "ok" CheckResult describing the written file.
    """
    out_path = skill_dir / "VAULTMESH_SKILL_ROOT.txt"
    # The former blake3-vs-other branch produced the identical string in both
    # arms (load_hasher() only returns lowercase names), so the dead special
    # case is collapsed into a single format expression.
    out_path.write_text(f"{algo_name}:{digest}\n", encoding="utf-8")
    return CheckResult(
        name="root_file_written",
        status="ok",
        details=f"Wrote integrity root to {out_path} using {algo_name}.",
    )
# ---------------------------------------------------------------------------
# Automation scroll receipt emission
# ---------------------------------------------------------------------------
def _now_iso() -> str:
    """Current UTC time as ISO-8601 with a trailing 'Z' instead of '+00:00'."""
    stamp = datetime.now(timezone.utc).isoformat()
    return stamp.replace("+00:00", "Z")
def _read_last_receipt(receipts_path: Path) -> Optional[Dict]:
    """Return the final non-blank JSON line of the scroll file, or None.

    Best-effort: a missing file, unreadable file, or malformed JSON all
    yield None rather than raising, so the caller restarts the chain.
    """
    if not receipts_path.exists():
        return None
    try:
        last_line: Optional[str] = None
        with receipts_path.open("r", encoding="utf-8") as fh:
            for raw in fh:
                candidate = raw.strip()
                if candidate:
                    last_line = candidate
        return json.loads(last_line) if last_line else None
    except Exception:
        # Treat a corrupt scroll as "no previous receipt".
        return None
def emit_automation_receipt(
    event_type: str,
    report: ValidationReport,
    skill_root_algo: str,
    skill_root_hash: str,
) -> None:
    """
    Emit a VaultMesh Automation scroll receipt for this validator run.

    Appends one compact JSON line to receipts/automation/automation_events.jsonl,
    hash-chained to the previous receipt via header.previous_hash.

    Scroll: Automation
    Types:
    - automation_vm_skill_validate_success
    - automation_vm_skill_validate_warning
    - automation_vm_skill_validate_failure
    """
    scroll_name = "Automation"
    scroll_dir = RECEIPTS_ROOT / "automation"
    scroll_dir.mkdir(parents=True, exist_ok=True)
    receipts_path = scroll_dir / "automation_events.jsonl"
    # Determine sequence and previous_hash by chaining off the last receipt
    last = _read_last_receipt(receipts_path)
    if last is None:
        sequence = 0
        previous_hash = None
    else:
        # A missing sequence defaults to -1 so the next value becomes 0.
        sequence = int(last.get("meta", {}).get("sequence", -1)) + 1
        previous_hash = last.get("header", {}).get("root_hash")
    # Body snapshot (we keep it compact: check names/statuses only, no details)
    body = {
        "skill_dir": report.skill_dir,
        "hash_algorithm": skill_root_algo,
        "root_hash": f"blake3:{skill_root_hash}"
        if skill_root_algo.lower() == "blake3"
        else f"{skill_root_algo}:{skill_root_hash}",
        "overall_status": report.overall_status,
        "checks": [
            {
                "name": c.name,
                "status": c.status,
            }
            for c in report.checks
        ],
    }
    # Build receipt (schema v2-style)
    timestamp = _now_iso()
    receipt = {
        "schema_version": "2.0.0",
        "type": event_type,
        "timestamp": timestamp,
        "header": {
            "root_hash": None,  # filled after hash; hashed while still None
            "tags": [
                "vaultmesh_skill",
                "validator",
                f"status:{report.overall_status}",
            ],
            "previous_hash": previous_hash,
        },
        "meta": {
            "scroll": scroll_name,
            "sequence": sequence,
            "anchor_epoch": None,
            "proof_path": None,
        },
        "body": body,
    }
    # Compute receipt hash over canonical JSON (sorted keys, compact separators).
    # NOTE: the digest covers the receipt with root_hash still None, so any
    # verifier must null out header.root_hash before re-hashing.
    algo_name, hasher_ctor = load_hasher()
    h = hasher_ctor()
    encoded = json.dumps(receipt, sort_keys=True, separators=(",", ":")).encode("utf-8")
    h.update(encoded)
    digest = h.hexdigest()
    if algo_name.lower() == "blake3":
        receipt_hash = f"blake3:{digest}"
    else:
        receipt_hash = f"{algo_name}:{digest}"
    receipt["header"]["root_hash"] = receipt_hash
    # Append to scroll file (JSONL: one receipt per line)
    with receipts_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(receipt, separators=(",", ":")) + "\n")
# ---------------------------------------------------------------------------
# Aggregation + main
# ---------------------------------------------------------------------------
def aggregate_status(checks: List[CheckResult]) -> str:
    """Fold individual check statuses into the worst one: fail > warn > ok."""
    seen = {check.status for check in checks}
    if "fail" in seen:
        return "fail"
    if "warn" in seen:
        return "warn"
    return "ok"
def main(argv: List[str]) -> int:
    """CLI entry point.

    Usage: skill_validator.py [skill_dir] — defaults to ~/.claude/skills/vaultmesh.
    Prints the JSON report to stdout, emits an Automation receipt (best-effort),
    and returns 0 (ok), 1 (warnings), or 2 (failure / usage error).
    """
    if len(argv) > 2:
        print(f"Usage: {argv[0]} [skill_dir]", file=sys.stderr)
        return 2
    default_dir = "~/.claude/skills/vaultmesh"
    skill_dir = Path(argv[1] if len(argv) == 2 else default_dir).expanduser()
    checks: List[CheckResult] = []
    # 1. The directory must exist before anything else can be validated.
    dir_check = check_dir_exists(skill_dir)
    checks.append(dir_check)
    if dir_check.status == "fail":
        report = ValidationReport(
            skill_dir=str(skill_dir),
            checks=checks,
            overall_status="fail",
        )
        print(json.dumps(report.to_dict(), indent=2))
        # No receipt emitted if the skill dir doesn't exist
        return 2
    # 2. Expected files; 3. SKILL.md frontmatter + links.
    checks.extend(check_expected_files(skill_dir))
    checks.extend(check_skill_md(skill_dir))
    # 4. Integrity root over the skill files.
    skill_algo, skill_digest = compute_skill_root_hash(skill_dir)
    algo_note = (
        "(preferred)."
        if skill_algo == "blake3"
        else "(BLAKE3 not available, using fallback.)"
    )
    checks.append(
        CheckResult(
            name="hash_algorithm",
            status="ok" if skill_algo == "blake3" else "warn",
            details=f"Using {skill_algo} for integrity hash {algo_note}",
        )
    )
    checks.append(write_skill_root_file(skill_dir, skill_algo, skill_digest))
    overall = aggregate_status(checks)
    report = ValidationReport(
        skill_dir=str(skill_dir),
        checks=checks,
        overall_status=overall,
        hash_algorithm=skill_algo,
        root_hash=skill_digest,
    )
    # The JSON report always goes to stdout.
    print(json.dumps(report.to_dict(), indent=2))
    # Receipt type mirrors the overall status.
    receipt_types = {
        "ok": "automation_vm_skill_validate_success",
        "warn": "automation_vm_skill_validate_warning",
    }
    event_type = receipt_types.get(overall, "automation_vm_skill_validate_failure")
    try:
        emit_automation_receipt(
            event_type=event_type,
            report=report,
            skill_root_algo=skill_algo,
            skill_root_hash=skill_digest,
        )
    except Exception as e:
        # We don't want receipt emission failures to hide validation output,
        # so just log to stderr and keep the original exit code.
        print(f"WARNING: failed to emit automation receipt: {e}", file=sys.stderr)
    return {"ok": 0, "warn": 1}.get(overall, 2)
if __name__ == "__main__":
    # Script entry point: exit code 0 = ok, 1 = warnings, 2 = failure/usage error.
    raise SystemExit(main(sys.argv))