#!/usr/bin/env python3
"""
vm_verify_sentinel_bundle.py

Offline verifier for VaultMesh Sentinel v1 seal bundles.

Usage:
    python3 vm_verify_sentinel_bundle.py --bundle /path/to/bundle_dir [--strict] [--report out.json]

Exit codes:
    0 - verification OK
    1 - verification failed
    2 - usage / unexpected error
"""

from __future__ import annotations

import argparse
import hashlib
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Optional

from sentinel_failure_codes import FailureCode, WarningCode

try:
    import blake3  # type: ignore
except ImportError:  # pragma: no cover
    blake3 = None

SUPPORTED_SEAL_FORMATS = {"vm-sentinel-seal-v1"}
SUPPORTED_INTEGRITY_FORMATS = {"vm-sentinel-integrity-v1"}
SUPPORTED_VERIFIER_MANIFEST_FORMATS = {"vm-sentinel-verifier-manifest-v1"}
SUPPORTED_CANONICALIZATION_VERSIONS = {"sentinel-event-jcs-v1"}

ERROR_SCHEMA_INVALID = FailureCode.SCHEMA_INVALID.value
ERROR_MANIFEST_HASH_MISMATCH = FailureCode.MANIFEST_HASH_MISMATCH.value
ERROR_MISSING_REQUIRED_FILE = FailureCode.MISSING_REQUIRED_FILE.value
ERROR_EVENT_HASH_MISMATCH = FailureCode.EVENT_HASH_MISMATCH.value
ERROR_CHAIN_DISCONTINUITY = FailureCode.CHAIN_DISCONTINUITY.value
ERROR_SEQ_NON_MONOTONIC = FailureCode.SEQ_NON_MONOTONIC.value
ERROR_ROOT_MISMATCH = FailureCode.ROOT_MISMATCH.value
ERROR_RANGE_MISMATCH = FailureCode.RANGE_MISMATCH.value
ERROR_CANON_VERSION_UNSUPPORTED = FailureCode.CANON_VERSION_UNSUPPORTED.value
ERROR_OVERSIZE_INPUT = FailureCode.OVERSIZE_INPUT.value
ERROR_REVOKED_CAPABILITY_USED = FailureCode.REVOKED_CAPABILITY_USED.value

WARNING_UNLISTED_FILE = WarningCode.FILE_NOT_IN_MANIFEST.value
WARNING_RANGE_ROOT_PARTIAL = WarningCode.RANGE_ROOT_PARTIAL.value

REPO_ROOT = Path(__file__).resolve().parents[1]
SCHEMA_DIR = REPO_ROOT / "spec" / "sentinel"

_EMBEDDED_SCHEMAS: dict[str, dict[str, Any]] = {
    "event.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Event",
        "type": "object",
        "additionalProperties": False,
        "required": [
            "event_id",
            "seq",
            "ts",
            "event_type",
            "actor",
            "cap_hash",
            "op",
            "op_digest",
            "result",
            "trace_id",
            "prev_event_hash",
            "event_hash",
            "payload",
        ],
        "properties": {
            "event_id": {"type": "string"},
            "seq": {"type": "integer", "minimum": 0},
            "ts": {
                "description": "Monotonic + wallclock if available. Accepts ISO-8601 Z or a structured object.",
                "anyOf": [
                    {"type": "string"},
                    {
                        "type": "object",
                        "additionalProperties": False,
                        "required": ["wall"],
                        "properties": {
                            "wall": {"type": "string"},
                            "mono_ns": {"type": "integer", "minimum": 0},
                        },
                    },
                ],
            },
            "event_type": {
                "type": "string",
                "enum": [
                    "action_intent",
                    "policy_decision",
                    "action_executed",
                    "shadow_receipt",
                    "cap_grant",
                    "cap_revoke",
                    "seal_created",
                    "root_published",
                    "corruption_detected",
                    "tamper_signal",
                    "boot_event",
                    "health_event",
                ],
            },
            "actor": {"type": "string", "minLength": 1},
            "cap_hash": {"type": "string", "minLength": 1},
            "op": {"type": "string", "minLength": 1},
            "op_digest": {"type": "string", "minLength": 1},
            "result": {"type": "string", "enum": ["ok", "deny", "error"]},
            "root_before": {"type": "string"},
            "root_after": {"type": "string"},
            "trace_id": {"type": "string"},
            "prev_event_hash": {"type": "string", "minLength": 1},
            "event_hash": {"type": "string"},
            "payload": {"type": "object"},
        },
    },
    "seal.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Seal Bundle (seal.json)",
        "type": "object",
        "additionalProperties": False,
        "required": [
            "format",
            "sentinel_version",
            "schema_version",
            "hash_algo",
            "canonicalization_version",
            "seal_id",
            "created_at",
            "range",
            "root",
            "files",
        ],
        "properties": {
            "format": {"type": "string", "const": "vm-sentinel-seal-v1"},
            "sentinel_version": {"type": "string"},
            "schema_version": {"type": "string"},
            "hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
            "canonicalization_version": {"type": "string"},
            "seal_id": {"type": "string"},
            "created_at": {"type": "string"},
            "instance_id": {"type": "string"},
            "ledger_type": {"type": "string", "enum": ["sqlite", "jsonl"]},
            "range": {
                "type": "object",
                "additionalProperties": False,
                "required": ["since_seq", "until_seq"],
                "properties": {
                    "since_seq": {"type": "integer", "minimum": 0},
                    "until_seq": {"type": "integer", "minimum": 0},
                    "since_ts": {"type": "string"},
                    "until_ts": {"type": "string"},
                },
            },
            "root": {
                "type": "object",
                "additionalProperties": False,
                "required": ["start", "end"],
                "properties": {
                    "start": {"type": "string"},
                    "end": {"type": "string"},
                    "seq": {"type": "integer", "minimum": 0},
                },
            },
            "files": {
                "type": "object",
                "additionalProperties": False,
                "required": ["receipts", "roots", "integrity", "verifier_manifest"],
                "properties": {
                    "receipts": {"type": "string"},
                    "roots": {"type": "string"},
                    "integrity": {"type": "string"},
                    "verifier_manifest": {"type": "string"},
                },
            },
            "notes": {"type": "string"},
        },
    },
    "integrity.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Integrity Manifest (integrity.json)",
        "type": "object",
        "additionalProperties": False,
        "required": ["format", "hash_algo", "files"],
        "properties": {
            "format": {"type": "string", "const": "vm-sentinel-integrity-v1"},
            "hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
            "files": {
                "type": "array",
                "items": {
                    "type": "object",
                    "additionalProperties": False,
                    "required": ["path", "digest"],
                    "properties": {
                        "path": {"type": "string"},
                        "digest": {"type": "string"},
                        "size_bytes": {"type": "integer", "minimum": 0},
                    },
                },
            },
        },
    },
    "verifier_manifest.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Verifier Manifest (verifier_manifest.json)",
        "type": "object",
        "additionalProperties": False,
        "required": [
            "format",
            "sentinel_version",
            "schema_version",
            "canonicalization_version",
        ],
        "properties": {
"format": { "type": "string", "const": "vm-sentinel-verifier-manifest-v1", }, "sentinel_version": {"type": "string"}, "schema_version": {"type": "string"}, "hash_algo": {"type": "string", "enum": ["blake3", "sha256"]}, "canonicalization_version": {"type": "string"}, "verifier": { "type": "object", "additionalProperties": True, "properties": { "name": {"type": "string"}, "version": {"type": "string"}, "sha256": {"type": "string"}, }, }, }, }, } @dataclass(frozen=True) class Finding: code: str message: str path: str | None = None def as_dict(self) -> dict: d = {"code": self.code, "message": self.message} if self.path: d["path"] = self.path return d def _contract_ids_for_finding(finding: Finding) -> list[str]: """ Best-effort mapping from verifier findings -> Contract Matrix IDs. This is meant to make verification_report.json auditor-friendly without requiring readers to inspect verifier source code. """ contract_ids: list[str] = [] code = finding.code path = finding.path or "" message = finding.message def add(contract_id: str) -> None: if contract_id not in contract_ids: contract_ids.append(contract_id) # Bundle-level if code == ERROR_MISSING_REQUIRED_FILE: add("B-1") if code == ERROR_OVERSIZE_INPUT: add("B-3") # Seal bundle / seal.json if path == "seal.json": add("B-1") if path.startswith("seal.json.format"): add("S-1") if path.startswith("seal.json.hash_algo"): add("S-2") if path.startswith("seal.json.range"): add("S-3") if path.startswith("seal.json.root"): add("S-4") if path.startswith("seal.json.files"): add("S-5") add("B-1") if "missing file referenced by seal.files." in message: add("S-5") add("B-1") if path.startswith("seal.json.canonicalization_version"): add("S-6") if code == ERROR_CANON_VERSION_UNSUPPORTED: add("S-6") # integrity.json if path.startswith("integrity.json.format"): add("I-1") if path.startswith("integrity.json.hash_algo"): add("I-2") if path.startswith("integrity.json.files"): add("I-3") if code == ERROR_MANIFEST_HASH_MISMATCH: add("I-3") if "size_bytes mismatch" in message: add("I-4") if message.startswith("file present but not listed in integrity.json:"): add("I-5") if message.startswith("integrity.json does not cover required seal file:"): add("I-6") # verifier_manifest.json if path.startswith("verifier_manifest.json.format"): add("V-1") if path.startswith("verifier_manifest.json.hash_algo"): add("V-3") if path.startswith("verifier_manifest.json") and not ( path.startswith("verifier_manifest.json.format") or path.startswith("verifier_manifest.json.hash_algo") ): add("V-2") # Event ledger if code == ERROR_SCHEMA_INVALID and path.endswith(".jsonl"): add("E-1") if code == ERROR_EVENT_HASH_MISMATCH: add("E-2") if code == ERROR_CHAIN_DISCONTINUITY: add("E-3") if code == ERROR_SEQ_NON_MONOTONIC: add("E-4") if code == ERROR_ROOT_MISMATCH: add("E-5") if code == ERROR_RANGE_MISMATCH: add("E-6") if code == ERROR_REVOKED_CAPABILITY_USED: add("E-7") return contract_ids def _finding_to_report_dict(finding: Finding) -> dict: d = finding.as_dict() contract_ids = _contract_ids_for_finding(finding) if contract_ids: d["contract_ids"] = contract_ids return d def _finalize_report_findings( report: dict[str, Any], *, errors: list[Finding], warnings: list[Finding] ) -> None: report["errors"] = [_finding_to_report_dict(e) for e in errors] report["warnings"] = [_finding_to_report_dict(w) for w in warnings] report["ok"] = not errors report["failure_code"] = errors[0].code if errors else None report["violated_contract_ids"] = sorted( {cid for e in errors for cid in 
        {cid for e in errors for cid in _contract_ids_for_finding(e)}
    )
    report["warned_contract_ids"] = sorted(
        {cid for w in warnings for cid in _contract_ids_for_finding(w)}
    )


def _load_json(path: Path) -> dict:
    return json.loads(path.read_text(encoding="utf-8"))


def _hex_part(value: str) -> str:
    return value.split(":", 1)[-1]


def _require_no_floats(value: Any, *, path: str = "$") -> None:
    if isinstance(value, float):
        raise ValueError(f"float not allowed in canonical JSON at {path}")
    if isinstance(value, dict):
        for k, v in value.items():
            _require_no_floats(v, path=f"{path}.{k}")
    elif isinstance(value, list):
        for i, v in enumerate(value):
            _require_no_floats(v, path=f"{path}[{i}]")


def _canonical_json_bytes(obj: Any) -> bytes:
    """
    Deterministic canonical JSON bytes for Sentinel v1 hashing.

    This verifier enforces a strict subset compatible with sentinel-event-jcs-v1
    for Sentinel artifacts:
    - UTF-8
    - object keys sorted
    - separators (",", ":")
    - no NaN/Infinity
    - no floats (represent decimals as strings instead)
    """
    _require_no_floats(obj)
    encoded = json.dumps(
        obj,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
        allow_nan=False,
    ).encode("utf-8")
    return encoded


def _vmhash(data: bytes, *, hash_algo: str) -> str:
    if hash_algo == "blake3":
        if blake3 is None:
            raise RuntimeError(
                "Missing dependency: blake3 (required for blake3 bundles)"
            )
        return f"blake3:{blake3.blake3(data).hexdigest()}"
    if hash_algo == "sha256":
        return f"sha256:{hashlib.sha256(data).hexdigest()}"
    raise ValueError(f"unsupported hash_algo: {hash_algo!r}")


def _compute_merkle_root(leaves: list[str], *, hash_algo: str) -> str:
    if not leaves:
        return _vmhash(b"empty", hash_algo=hash_algo)
    if len(leaves) == 1:
        return leaves[0]
    level = leaves[:]
    while len(level) > 1:
        next_level: list[str] = []
        for i in range(0, len(level), 2):
            left = level[i]
            right = level[i + 1] if i + 1 < len(level) else left
            combined = (_hex_part(left) + _hex_part(right)).encode("utf-8")
            next_level.append(_vmhash(combined, hash_algo=hash_algo))
        level = next_level
    return level[0]


def _iter_jsonl(path: Path) -> Iterable[dict]:
    with path.open("r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception as exc:
                raise ValueError(
                    f"{path.name}:{line_no}: invalid JSON ({exc})"
                ) from exc
            if not isinstance(obj, dict):
                raise ValueError(f"{path.name}:{line_no}: expected JSON object")
            yield obj


def _load_schema(filename: str) -> dict:
    path = SCHEMA_DIR / filename
    if path.exists():
        return _load_json(path)
    embedded = _EMBEDDED_SCHEMAS.get(filename)
    if embedded is None:
        raise FileNotFoundError(f"schema not found: {filename}")
    return embedded


def _validate_schema(instance: Any, schema: dict, *, path: str = "$") -> list[Finding]:
    """
    Minimal JSON Schema validator (subset) for Sentinel v1 verifier.

    Supports: type, required, properties, additionalProperties, enum, const,
    anyOf, items, minimum.
""" findings: list[Finding] = [] if "const" in schema: if instance != schema["const"]: findings.append( Finding( ERROR_SCHEMA_INVALID, f"expected const {schema['const']!r}, got {instance!r}", path=path, ) ) return findings if "enum" in schema: if instance not in schema["enum"]: findings.append( Finding( ERROR_SCHEMA_INVALID, f"expected one of {schema['enum']!r}, got {instance!r}", path=path, ) ) return findings if "anyOf" in schema: options = schema["anyOf"] for opt in options: if not _validate_schema(instance, opt, path=path): return [] findings.append( Finding(ERROR_SCHEMA_INVALID, "did not match anyOf schema", path=path) ) return findings schema_type = schema.get("type") if schema_type == "object": if not isinstance(instance, dict): findings.append(Finding(ERROR_SCHEMA_INVALID, "expected object", path=path)) return findings required = schema.get("required") or [] for key in required: if key not in instance: findings.append( Finding( ERROR_SCHEMA_INVALID, f"missing required property: {key}", path=path, ) ) properties = schema.get("properties") or {} additional = schema.get("additionalProperties", True) for key, value in instance.items(): key_path = f"{path}.{key}" if key in properties: findings.extend(_validate_schema(value, properties[key], path=key_path)) else: if additional is False: findings.append( Finding( ERROR_SCHEMA_INVALID, "unexpected additional property", path=key_path, ) ) return findings if schema_type == "array": if not isinstance(instance, list): findings.append(Finding(ERROR_SCHEMA_INVALID, "expected array", path=path)) return findings items_schema = schema.get("items") if isinstance(items_schema, dict): for i, item in enumerate(instance): findings.extend( _validate_schema(item, items_schema, path=f"{path}[{i}]") ) return findings if schema_type == "string": if not isinstance(instance, str): findings.append(Finding(ERROR_SCHEMA_INVALID, "expected string", path=path)) return findings min_len = schema.get("minLength") if isinstance(min_len, int) and len(instance) < min_len: findings.append( Finding( ERROR_SCHEMA_INVALID, f"minLength {min_len} violated", path=path, ) ) return findings if schema_type == "integer": if not isinstance(instance, int) or isinstance(instance, bool): findings.append( Finding(ERROR_SCHEMA_INVALID, "expected integer", path=path) ) return findings minimum = schema.get("minimum") if isinstance(minimum, int) and instance < minimum: findings.append( Finding( ERROR_SCHEMA_INVALID, f"minimum {minimum} violated", path=path, ) ) return findings if schema_type == "boolean": if not isinstance(instance, bool): findings.append( Finding(ERROR_SCHEMA_INVALID, "expected boolean", path=path) ) return findings # If schema has no type, treat as permissive. 
    return findings


def _parse_roots_txt(path: Path) -> list[tuple[int, str]]:
    roots: list[tuple[int, str]] = []
    for line_no, line in enumerate(
        path.read_text(encoding="utf-8").splitlines(), start=1
    ):
        s = line.strip()
        if not s or s.startswith("#"):
            continue
        if "seq=" in s and "root=" in s:
            parts = dict(part.split("=", 1) for part in s.split() if "=" in part)
            try:
                seq = int(parts["seq"])
            except Exception as exc:
                raise ValueError(f"{path.name}:{line_no}: invalid seq ({exc})") from exc
            root = parts.get("root")
            if not root:
                raise ValueError(f"{path.name}:{line_no}: missing root")
            roots.append((seq, root))
        else:
            raise ValueError(
                f"{path.name}:{line_no}: expected 'seq= root='"
            )
    return roots


def _write_report(path: Path, report: dict) -> None:
    path.write_text(
        json.dumps(report, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
        + "\n",
        encoding="utf-8",
    )


def verify_bundle(
    bundle_dir: Path,
    *,
    strict: bool,
    report_path: Path,
    max_file_bytes: int,
) -> int:
    errors: list[Finding] = []
    warnings: list[Finding] = []

    report: dict[str, Any] = {
        "format": "vm-sentinel-verification-report-v1",
        "ok": False,
        "strict": strict,
        "failure_code": None,
        "inputs": {"bundle_dir": str(bundle_dir)},
        "covered_seq_range": None,
        "verified_ranges": [],
        "observed_roots": {},
        "computed_roots": {},
        "observed_end_root": None,
        "computed_end_root": None,
        "mismatches": [],
        "corruption_findings": [],
        "versions": {},
        "schema_versions_used": {},
        "errors": [],
        "warnings": [],
        "verifier": {
            "name": "vm_verify_sentinel_bundle.py",
            "python": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
        },
        "declared_verifier": None,
    }

    seal_path = bundle_dir / "seal.json"
    if not seal_path.exists():
        errors.append(
            Finding(
                ERROR_MISSING_REQUIRED_FILE, "seal.json not found", path="seal.json"
            )
        )
        _finalize_report_findings(report, errors=errors, warnings=warnings)
        _write_report(report_path, report)
        return 1

    try:
        seal = _load_json(seal_path)
    except Exception as exc:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"failed to parse seal.json: {exc}",
                path="seal.json",
            )
        )
        _finalize_report_findings(report, errors=errors, warnings=warnings)
        _write_report(report_path, report)
        return 1

    try:
        seal_schema = _load_schema("seal.schema.json")
        errors.extend(_validate_schema(seal, seal_schema, path="seal.json"))
    except Exception as exc:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"failed to load/validate seal.schema.json: {exc}",
                path="seal.schema.json",
            )
        )

    fmt = seal.get("format")
    if fmt not in SUPPORTED_SEAL_FORMATS:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"seal.format unsupported: {fmt!r}",
                path="seal.json.format",
            )
        )

    hash_algo = seal.get("hash_algo")
    if hash_algo not in ("blake3", "sha256"):
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"seal.hash_algo invalid: {hash_algo!r}",
                path="seal.json.hash_algo",
            )
        )
        hash_algo = "sha256"  # keep verifier progressing for report completeness

    canonicalization_version = seal.get("canonicalization_version")
    if not isinstance(canonicalization_version, str) or not canonicalization_version:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                "seal.canonicalization_version missing",
                path="seal.json.canonicalization_version",
            )
        )
        canonicalization_version = ""
    elif canonicalization_version not in SUPPORTED_CANONICALIZATION_VERSIONS:
        errors.append(
            Finding(
                ERROR_CANON_VERSION_UNSUPPORTED,
                f"unsupported canonicalization_version: {canonicalization_version!r}",
                path="seal.json.canonicalization_version",
            )
        )

    files = seal.get("files")
    if not isinstance(files, dict):
        errors.append(
            Finding(ERROR_SCHEMA_INVALID, "seal.files missing", path="seal.json.files")
        )
        files = {}

    def _file_from_seal(key: str) -> Optional[Path]:
        rel = files.get(key)
        if not isinstance(rel, str) or not rel:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"seal.files.{key} missing",
                    path=f"seal.json.files.{key}",
                )
            )
            return None
        p = (bundle_dir / rel).resolve()
        if not p.exists():
            errors.append(
                Finding(
                    ERROR_MISSING_REQUIRED_FILE,
                    f"missing file referenced by seal.files.{key}: {rel}",
                    path=rel,
                )
            )
            return None
        return p

    receipts_path = _file_from_seal("receipts")
    roots_path = _file_from_seal("roots")
    integrity_path = _file_from_seal("integrity")
    verifier_manifest_path = _file_from_seal("verifier_manifest")

    report["versions"] = {
        "sentinel_version": seal.get("sentinel_version"),
        "schema_version": seal.get("schema_version"),
        "hash_algo": hash_algo,
        "canonicalization_version": canonicalization_version,
    }
    report["schema_versions_used"]["seal"] = seal.get("schema_version")

    integrity: dict[str, Any] | None = None
    if integrity_path is not None:
        try:
            integrity = _load_json(integrity_path)
            integrity_schema = _load_schema("integrity.schema.json")
            errors.extend(
                _validate_schema(
                    integrity, integrity_schema, path=str(integrity_path.name)
                )
            )
            if integrity.get("format") not in SUPPORTED_INTEGRITY_FORMATS:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        f"integrity.format unsupported: {integrity.get('format')!r}",
                        path="integrity.json.format",
                    )
                )
            if integrity.get("hash_algo") != hash_algo:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "integrity.hash_algo != seal.hash_algo",
                        path="integrity.json.hash_algo",
                    )
                )

            listed: list[dict] = integrity.get("files") or []
            if not isinstance(listed, list) or not listed:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "integrity.files missing or empty",
                        path="integrity.json.files",
                    )
                )
                listed = []

            # Deterministic ordering for verification/reporting.
            listed_sorted = sorted(
                (e for e in listed if isinstance(e, dict)),
                key=lambda e: str(e.get("path", "")),
            )
            listed_paths = {str(e.get("path")) for e in listed_sorted if "path" in e}

            # Enforce that seal-referenced files are covered by integrity.json.
            #
            # Note: integrity.json MUST NOT be required to include a digest of
            # itself (recursive), so we only require the other seal files here.
            required_files = [
                files.get("receipts"),
                files.get("roots"),
                files.get("verifier_manifest"),
            ]
            for required_rel in required_files:
                if (
                    isinstance(required_rel, str)
                    and required_rel
                    and required_rel not in listed_paths
                ):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"integrity.json does not cover required seal file: {required_rel}",
                            path="integrity.json.files",
                        )
                    )

            # Recommended: cover seal.json too (strict mode enforces).
            if "seal.json" not in listed_paths:
                finding = Finding(
                    WARNING_UNLISTED_FILE,
                    "integrity.json does not cover seal.json (recommended)",
                    path="integrity.json.files",
                )
                if strict:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            finding.message,
                            path="integrity.json.files",
                        )
                    )
                else:
                    warnings.append(finding)

            # Hash verification
            for entry in listed_sorted:
                rel = entry.get("path")
                digest = entry.get("digest")
                if not isinstance(rel, str) or not rel:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "integrity.files entry missing path",
                            path="integrity.json.files",
                        )
                    )
                    continue
                if not isinstance(digest, str) or not digest:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"integrity.files[{rel}] missing digest",
                            path="integrity.json.files",
                        )
                    )
                    continue
                file_path = (bundle_dir / rel).resolve()
                if not file_path.exists():
                    errors.append(
                        Finding(
                            ERROR_MISSING_REQUIRED_FILE,
                            f"integrity missing file: {rel}",
                            path=rel,
                        )
                    )
                    continue
                size = file_path.stat().st_size
                if size > max_file_bytes:
                    errors.append(
                        Finding(
                            ERROR_OVERSIZE_INPUT,
                            f"file exceeds max size ({size} > {max_file_bytes} bytes): {rel}",
                            path=rel,
                        )
                    )
                    continue
                data = file_path.read_bytes()
                computed = _vmhash(data, hash_algo=hash_algo)
                if computed != digest:
                    errors.append(
                        Finding(
                            ERROR_MANIFEST_HASH_MISMATCH,
                            f"digest mismatch for {rel}: expected {digest}, got {computed}",
                            path=rel,
                        )
                    )
                size_bytes = entry.get("size_bytes")
                if isinstance(size_bytes, int) and size_bytes != len(data):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"size_bytes mismatch for {rel}: expected {size_bytes}, got {len(data)}",
                            path=rel,
                        )
                    )

            # Extra files present but not listed in integrity.json
            ignored = {".DS_Store", "verification_report.json", report_path.name}
            integrity_rel = files.get("integrity")
            if isinstance(integrity_rel, str) and integrity_rel:
                ignored.add(integrity_rel)
            for fp in sorted(bundle_dir.rglob("*")):
                if fp.is_dir():
                    continue
                rel = fp.relative_to(bundle_dir).as_posix()
                if rel in ignored:
                    continue
                if rel not in listed_paths:
                    finding = Finding(
                        WARNING_UNLISTED_FILE,
                        f"file present but not listed in integrity.json: {rel}",
                        path=rel,
                    )
                    if strict:
                        errors.append(
                            Finding(ERROR_SCHEMA_INVALID, finding.message, path=rel)
                        )
                    else:
                        warnings.append(finding)
        except Exception as exc:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"failed to verify integrity.json: {exc}",
                    path="integrity.json",
                )
            )

    if verifier_manifest_path is not None:
        try:
            manifest = _load_json(verifier_manifest_path)
            manifest_schema = _load_schema("verifier_manifest.schema.json")
            errors.extend(
                _validate_schema(
                    manifest, manifest_schema, path=str(verifier_manifest_path.name)
                )
            )
            mfmt = manifest.get("format")
            if mfmt not in SUPPORTED_VERIFIER_MANIFEST_FORMATS:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        f"verifier_manifest.format unsupported: {mfmt!r}",
                        path="verifier_manifest.json.format",
                    )
                )
            mv = manifest.get("canonicalization_version")
            if (
                isinstance(mv, str)
                and canonicalization_version
                and mv != canonicalization_version
            ):
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "verifier_manifest.canonicalization_version != seal.canonicalization_version",
                        path="verifier_manifest.json.canonicalization_version",
                    )
                )
            mh = manifest.get("hash_algo")
            if isinstance(mh, str) and mh != hash_algo:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "verifier_manifest.hash_algo != seal.hash_algo",
                        path="verifier_manifest.json.hash_algo",
                    )
                )
            report["schema_versions_used"]["verifier_manifest"] = manifest.get(
                "schema_version"
            )
            dv = manifest.get("verifier")
            if isinstance(dv, dict):
                report["declared_verifier"] = dv
        except Exception as exc:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"failed to parse verifier_manifest.json: {exc}",
                    path="verifier_manifest.json",
                )
            )

    range_obj = seal.get("range") or {}
    since_seq = range_obj.get("since_seq")
    until_seq = range_obj.get("until_seq")
    if not isinstance(since_seq, int) or not isinstance(until_seq, int):
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                "seal.range.since_seq/until_seq missing or invalid",
                path="seal.json.range",
            )
        )
        since_seq = 0
        until_seq = -1
    report["covered_seq_range"] = {"since_seq": since_seq, "until_seq": until_seq}

    events: list[dict] = []
    if receipts_path is not None:
        if receipts_path.stat().st_size > max_file_bytes:
            errors.append(
                Finding(
                    ERROR_OVERSIZE_INPUT,
                    f"receipts file exceeds max size ({receipts_path.stat().st_size} > {max_file_bytes} bytes)",
                    path=receipts_path.name,
                )
            )
        else:
            event_schema = _load_schema("event.schema.json")
            last_good_seq: int | None = None
            last_good_line_no: int | None = None
            byte_offset = 0
            with receipts_path.open("rb") as f:
                for line_no, raw in enumerate(f, start=1):
                    line_start = byte_offset
                    byte_offset += len(raw)
                    if not raw.strip():
                        continue
                    try:
                        text = raw.decode("utf-8").strip()
                    except UnicodeDecodeError as exc:
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"{receipts_path.name}:{line_no}: utf-8 decode error ({exc})",
                                path=receipts_path.name,
                            )
                        )
                        report["corruption_findings"].append(
                            {
                                "file": receipts_path.name,
                                "line_no": line_no,
                                "byte_offset": line_start,
                                "last_good_seq": last_good_seq,
                                "last_good_line_no": last_good_line_no,
                                "error": f"utf-8 decode error ({exc})",
                                "recommended_recovery": [
                                    "Verify an older seal bundle that predates this range.",
                                    "Restore receipts from WORM/immutable storage if available.",
                                    "Compare integrity.json digests to an out-of-band copy.",
                                ],
                            }
                        )
                        break
                    try:
                        ev = json.loads(text)
                    except Exception as exc:
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"{receipts_path.name}:{line_no}: invalid JSON ({exc})",
                                path=receipts_path.name,
                            )
                        )
                        report["corruption_findings"].append(
                            {
                                "file": receipts_path.name,
                                "line_no": line_no,
                                "byte_offset": line_start,
                                "last_good_seq": last_good_seq,
                                "last_good_line_no": last_good_line_no,
                                "error": f"invalid JSON ({exc})",
                                "recommended_recovery": [
                                    "Verify an older seal bundle that predates this range.",
                                    "Restore receipts from WORM/immutable storage if available.",
                                    "Compare integrity.json digests to an out-of-band copy.",
                                ],
                            }
                        )
                        break
                    if not isinstance(ev, dict):
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"{receipts_path.name}:{line_no}: expected JSON object",
                                path=receipts_path.name,
                            )
                        )
                        report["corruption_findings"].append(
                            {
                                "file": receipts_path.name,
                                "line_no": line_no,
                                "byte_offset": line_start,
                                "last_good_seq": last_good_seq,
                                "last_good_line_no": last_good_line_no,
                                "error": "expected JSON object",
                            }
                        )
                        break
                    errors.extend(
                        _validate_schema(
                            ev, event_schema, path=f"{receipts_path.name}:{line_no}"
                        )
                    )
                    seq = ev.get("seq")
                    if isinstance(seq, int):
                        last_good_seq = seq
                        last_good_line_no = line_no
                    events.append(ev)

    if events:
        # Deterministic ordering: sort by seq, not file order.
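        # Illustrative (hypothetical) example of the re-ordering below: receipts
        # written in file order [seq 2, seq 0, seq 1] are grouped by seq and then
        # flattened back to [seq 0, seq 1, seq 2]; any duplicate seq values are
        # reported as SEQ_NON_MONOTONIC before flattening.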
        by_seq: dict[int, list[dict]] = {}
        for ev in events:
            seq = ev.get("seq")
            if isinstance(seq, int):
                by_seq.setdefault(seq, []).append(ev)
        dupes = sorted([seq for seq, lst in by_seq.items() if len(lst) > 1])
        for seq in dupes:
            errors.append(
                Finding(
                    ERROR_SEQ_NON_MONOTONIC,
                    f"duplicate seq value: {seq}",
                    path=f"events.seq:{seq}",
                )
            )
        ordered_seqs = sorted(by_seq.keys())
        if ordered_seqs:
            if ordered_seqs[0] != since_seq or ordered_seqs[-1] != until_seq:
                errors.append(
                    Finding(
                        ERROR_RANGE_MISMATCH,
                        f"event seq range mismatch: got {ordered_seqs[0]}..{ordered_seqs[-1]}, expected {since_seq}..{until_seq}",
                        path="seal.json.range",
                    )
                )
        expected_count = until_seq - since_seq + 1
        if expected_count != len(events):
            errors.append(
                Finding(
                    ERROR_RANGE_MISMATCH,
                    f"receipt count mismatch: expected {expected_count}, got {len(events)}",
                    path=receipts_path.name if receipts_path else "receipts",
                )
            )
        missing = [s for s in range(since_seq, until_seq + 1) if s not in by_seq]
        if missing:
            errors.append(
                Finding(
                    ERROR_RANGE_MISMATCH,
                    f"missing seq values in range: {missing[:20]}{'...' if len(missing) > 20 else ''}",
                    path="events.seq",
                )
            )
        # Flatten events in seq order (deterministic).
        events_ordered = [
            by_seq[s][0] for s in range(since_seq, until_seq + 1) if s in by_seq
        ]
        events = events_ordered

    root_obj = seal.get("root") or {}
    root_start = root_obj.get("start")
    root_end = root_obj.get("end")
    report["observed_roots"] = {"start": root_start, "end": root_end}
    report["observed_end_root"] = root_end
    if not isinstance(root_start, str) or not isinstance(root_end, str):
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                "seal.root.start/end missing or invalid",
                path="seal.json.root",
            )
        )

    # Event hashing, op_digest, and prev_event_hash chain verification.
    computed_event_hashes: list[str] = []
    leaves: list[str] = []
    if events:
        revoked_cap_hashes: set[str] = set()
        if canonicalization_version not in SUPPORTED_CANONICALIZATION_VERSIONS:
            errors.append(
                Finding(
                    ERROR_CANON_VERSION_UNSUPPORTED,
                    f"unsupported canonicalization_version: {canonicalization_version!r}",
                    path="seal.json.canonicalization_version",
                )
            )
        else:
            for idx, ev in enumerate(events):
                seq = ev.get("seq")
                if not isinstance(seq, int):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "event.seq missing or invalid",
                            path=f"events[{idx}].seq",
                        )
                    )
                    continue
                stored_event_hash = ev.get("event_hash")
                if not isinstance(stored_event_hash, str) or not stored_event_hash:
                    errors.append(
                        Finding(
                            ERROR_EVENT_HASH_MISMATCH,
                            "event_hash missing",
                            path=f"events[{idx}].event_hash",
                        )
                    )
                    continue
                ev_no_hash = dict(ev)
                ev_no_hash.pop("event_hash", None)
                try:
                    canon = _canonical_json_bytes(ev_no_hash)
                except Exception as exc:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"canonicalization failed: {exc}",
                            path=f"events[{idx}]",
                        )
                    )
                    continue
                try:
                    computed_hash = _vmhash(canon, hash_algo=hash_algo)
                except Exception as exc:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"hashing failed: {exc}",
                            path=f"events[{idx}]",
                        )
                    )
                    continue
                computed_event_hashes.append(computed_hash)
                leaves.append(computed_hash)
                if computed_hash != stored_event_hash:
                    errors.append(
                        Finding(
                            ERROR_EVENT_HASH_MISMATCH,
                            f"event_hash mismatch: expected {stored_event_hash}, got {computed_hash}",
                            path=f"events[{idx}].event_hash",
                        )
                    )
                prev = ev.get("prev_event_hash")
                if not isinstance(prev, str) or not prev:
                    errors.append(
                        Finding(
                            ERROR_CHAIN_DISCONTINUITY,
                            "prev_event_hash missing",
                            path=f"events[{idx}].prev_event_hash",
                        )
                    )
                else:
                    if idx == 0:
                        if seq == 0:
                            if prev != "0":
                                errors.append(
                                    Finding(
                                        ERROR_CHAIN_DISCONTINUITY,
                                        'prev_event_hash must be "0" for seq=0',
                                        path=f"events[{idx}].prev_event_hash",
                                    )
                                )
                        else:
                            finding = Finding(
                                WARNING_RANGE_ROOT_PARTIAL,
                                "first event is not seq=0; prev_event_hash cannot be verified without prior context",
                                path=f"events[{idx}].prev_event_hash",
                            )
                            if strict:
                                errors.append(
                                    Finding(
                                        ERROR_CHAIN_DISCONTINUITY,
                                        finding.message,
                                        path=f"events[{idx}].prev_event_hash",
                                    )
                                )
                            else:
                                warnings.append(finding)
                    else:
                        if prev != computed_event_hashes[idx - 1]:
                            errors.append(
                                Finding(
                                    ERROR_CHAIN_DISCONTINUITY,
                                    "prev_event_hash does not match previous event_hash",
                                    path=f"events[{idx}].prev_event_hash",
                                )
                            )

                # op_digest verification (params convention: payload.params)
                op = ev.get("op")
                op_digest = ev.get("op_digest")
                payload = (
                    ev.get("payload") if isinstance(ev.get("payload"), dict) else {}
                )
                params = payload.get("params", {})
                if params is None:
                    params = {}
                if not isinstance(op, str) or not op:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID, "op missing", path=f"events[{idx}].op"
                        )
                    )
                elif not isinstance(op_digest, str) or not op_digest:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "op_digest missing",
                            path=f"events[{idx}].op_digest",
                        )
                    )
                elif not isinstance(params, dict):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "payload.params must be an object",
                            path=f"events[{idx}].payload.params",
                        )
                    )
                else:
                    try:
                        op_obj = {"op": op, "params": params}
                        op_bytes = _canonical_json_bytes(op_obj)
                        computed_op_digest = _vmhash(op_bytes, hash_algo=hash_algo)
                        if computed_op_digest != op_digest:
                            errors.append(
                                Finding(
                                    ERROR_EVENT_HASH_MISMATCH,
                                    f"op_digest mismatch: expected {op_digest}, got {computed_op_digest}",
                                    path=f"events[{idx}].op_digest",
                                )
                            )
                    except Exception as exc:
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"op_digest computation failed: {exc}",
                                path=f"events[{idx}].op_digest",
                            )
                        )

                # Capability revocation enforcement (v1 hardening):
                # If a capability is revoked, subsequent action execution MUST NOT use it.
                event_type = ev.get("event_type")
                cap_hash = ev.get("cap_hash")
                if event_type == "cap_revoke" and isinstance(payload, dict):
                    revoked = None
                    if isinstance(payload.get("revoked_cap_hash"), str):
                        revoked = payload.get("revoked_cap_hash")
                    elif isinstance(params, dict) and isinstance(
                        params.get("revoked_cap_hash"), str
                    ):
                        revoked = params.get("revoked_cap_hash")
                    if isinstance(revoked, str) and revoked:
                        revoked_cap_hashes.add(revoked)
                if (
                    event_type == "action_executed"
                    and isinstance(cap_hash, str)
                    and cap_hash in revoked_cap_hashes
                ):
                    errors.append(
                        Finding(
                            ERROR_REVOKED_CAPABILITY_USED,
                            f"action_executed uses revoked cap_hash: {cap_hash}",
                            path=f"events[{idx}].cap_hash",
                        )
                    )

    report["verified_ranges"] = [{"since_seq": since_seq, "until_seq": until_seq}]

    # Merkle verification (only possible from genesis without additional continuation state).
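    # Illustrative sketch of what _compute_merkle_root does with the per-event
    # hashes below (values are made-up placeholders, not from any real bundle):
    #
    #   leaves = [h0, h1, h2]                  # event hashes in seq order
    #   level1 = [H(h0 + h1), H(h2 + h2)]      # an odd trailing leaf is paired with itself
    #   root   = H(level1[0] + level1[1])
    #
    # where H() is _vmhash() over the concatenated hex parts (prefix stripped),
    # and a single leaf is returned unchanged as the root.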
    if (
        isinstance(since_seq, int)
        and since_seq == 0
        and leaves
        and isinstance(root_start, str)
        and isinstance(root_end, str)
    ):
        try:
            expected_start = _vmhash(b"empty", hash_algo=hash_algo)
            report["computed_roots"]["expected_start"] = expected_start
            if root_start != expected_start:
                errors.append(
                    Finding(
                        ERROR_ROOT_MISMATCH,
                        f"seal.root.start mismatch: expected {expected_start}, got {root_start}",
                        path="seal.json.root.start",
                    )
                )
            computed_end = _compute_merkle_root(leaves, hash_algo=hash_algo)
            report["computed_roots"]["computed_end"] = computed_end
            report["computed_end_root"] = computed_end
            if computed_end != root_end:
                errors.append(
                    Finding(
                        ERROR_ROOT_MISMATCH,
                        f"seal.root.end mismatch: expected {root_end}, got {computed_end}",
                        path="seal.json.root.end",
                    )
                )
        except Exception as exc:
            errors.append(
                Finding(
                    ERROR_ROOT_MISMATCH,
                    f"merkle verification failed: {exc}",
                    path="seal.json.root",
                )
            )
    else:
        if isinstance(since_seq, int) and since_seq > 0:
            finding = Finding(
                WARNING_RANGE_ROOT_PARTIAL,
                "cannot recompute Merkle roots for since_seq>0 without a verifiable continuation state (frontier snapshot)",
                path="seal.json.range",
            )
            if strict:
                errors.append(
                    Finding(
                        ERROR_ROOT_MISMATCH, finding.message, path="seal.json.range"
                    )
                )
            else:
                warnings.append(finding)

    # roots.txt parsing (self-consistency with seal.root.end)
    if roots_path is not None and isinstance(root_end, str):
        try:
            roots = _parse_roots_txt(roots_path)
            if roots:
                last_seq, last_root = roots[-1]
                if last_seq != until_seq:
                    errors.append(
                        Finding(
                            ERROR_RANGE_MISMATCH,
                            f"roots.txt last seq mismatch: expected {until_seq}, got {last_seq}",
                            path=roots_path.name,
                        )
                    )
                if last_root != root_end:
                    errors.append(
                        Finding(
                            ERROR_ROOT_MISMATCH,
                            "roots.txt last root does not match seal.root.end",
                            path=roots_path.name,
                        )
                    )
        except Exception as exc:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"failed to parse roots.txt: {exc}",
                    path=roots_path.name,
                )
            )

    # Strict-mode trace linkage integrity checks (intent → executed/denied).
    if strict and events:
        try:
            by_trace: dict[str, list[tuple[int, str]]] = {}
            for ev in events:
                trace_id = ev.get("trace_id")
                event_type = ev.get("event_type")
                seq = ev.get("seq")
                if (
                    isinstance(trace_id, str)
                    and isinstance(event_type, str)
                    and isinstance(seq, int)
                ):
                    if event_type in (
                        "action_intent",
                        "action_executed",
                        "shadow_receipt",
                    ):
                        by_trace.setdefault(trace_id, []).append((seq, event_type))
            for trace_id, seq_types in sorted(by_trace.items()):
                seq_types_sorted = sorted(seq_types, key=lambda t: t[0])
                types = [t for _, t in seq_types_sorted]
                has_intent = "action_intent" in types
                has_exec = "action_executed" in types
                has_shadow = "shadow_receipt" in types
                if has_exec and not has_intent:
                    errors.append(
                        Finding(
                            ERROR_CHAIN_DISCONTINUITY,
                            f"execution without prior intent for trace_id {trace_id}",
                            path=f"trace_id:{trace_id}",
                        )
                    )
                if has_intent and not (has_exec or has_shadow):
                    errors.append(
                        Finding(
                            ERROR_CHAIN_DISCONTINUITY,
                            f"intent without executed/denied outcome for trace_id {trace_id}",
                            path=f"trace_id:{trace_id}",
                        )
                    )
                if has_exec and has_shadow:
                    errors.append(
                        Finding(
                            ERROR_CHAIN_DISCONTINUITY,
                            f"both action_executed and shadow_receipt present for trace_id {trace_id}",
                            path=f"trace_id:{trace_id}",
                        )
                    )
        except Exception as exc:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"trace linkage verification failed: {exc}",
                    path="trace_id",
                )
            )

    _finalize_report_findings(report, errors=errors, warnings=warnings)
    _write_report(report_path, report)

    # Console output: PASS/FAIL + stable codes
    if errors:
        for e in errors:
            loc = f" ({e.path})" if e.path else ""
            print(f"FAIL {e.code}{loc}: {e.message}", file=sys.stderr)
        for w in warnings:
            loc = f" ({w.path})" if w.path else ""
            print(f"WARN {w.code}{loc}: {w.message}", file=sys.stderr)
        return 1

    for w in warnings:
        loc = f" ({w.path})" if w.path else ""
        print(f"WARN {w.code}{loc}: {w.message}", file=sys.stderr)
    print("PASS")
    return 0


def main(argv: list[str]) -> int:
    p = argparse.ArgumentParser(
        description="Verify a VaultMesh Sentinel v1 bundle directory."
    )
    p.add_argument(
        "--bundle", required=True, help="Path to bundle directory (contains seal.json)"
    )
    p.add_argument(
        "--strict",
        action="store_true",
        help="Treat warnings and partial verifications as failures.",
    )
    p.add_argument(
        "--report",
        help="Write machine-readable verification report JSON to this path (default: verification_report.json in bundle).",
    )
    p.add_argument(
        "--max-file-bytes",
        type=int,
        default=50_000_000,
        help="Reject any single input file larger than this many bytes (default: 50,000,000).",
    )
    args = p.parse_args(argv)

    bundle_dir = Path(args.bundle).expanduser().resolve()
    if not bundle_dir.exists() or not bundle_dir.is_dir():
        print(f"[ERROR] --bundle must be a directory: {bundle_dir}", file=sys.stderr)
        return 2

    report_path = (
        Path(args.report).expanduser().resolve()
        if args.report
        else (bundle_dir / "verification_report.json")
    )

    try:
        return verify_bundle(
            bundle_dir,
            strict=bool(args.strict),
            report_path=report_path,
            max_file_bytes=int(args.max_file_bytes),
        )
    except Exception as exc:
        print(f"[ERROR] unexpected error: {exc}", file=sys.stderr)
        return 2


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
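
# Illustrative sketch of calling the verifier programmatically (for example
# from a test); the bundle path below is a placeholder, not a real layout:
#
#     from pathlib import Path
#     from vm_verify_sentinel_bundle import verify_bundle
#
#     bundle = Path("bundles/example-bundle")  # hypothetical path
#     exit_code = verify_bundle(
#         bundle,
#         strict=True,
#         report_path=bundle / "verification_report.json",
#         max_file_bytes=50_000_000,
#     )
#     assert exit_code == 0  # 0 = PASS, 1 = findings, per the module docstring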