#!/usr/bin/env python3
"""
vm_verify_sentinel_bundle.py
Offline verifier for VaultMesh Sentinel v1 seal bundles.
Usage:
    python3 vm_verify_sentinel_bundle.py --bundle /path/to/bundle_dir [--strict] [--report out.json]

Exit codes:
    0 - verification OK
    1 - verification failed
    2 - usage / unexpected error
"""
from __future__ import annotations
import argparse
import hashlib
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Optional
from sentinel_failure_codes import FailureCode, WarningCode
try:
import blake3 # type: ignore
except ImportError: # pragma: no cover
blake3 = None
SUPPORTED_SEAL_FORMATS = {"vm-sentinel-seal-v1"}
SUPPORTED_INTEGRITY_FORMATS = {"vm-sentinel-integrity-v1"}
SUPPORTED_VERIFIER_MANIFEST_FORMATS = {"vm-sentinel-verifier-manifest-v1"}
SUPPORTED_CANONICALIZATION_VERSIONS = {"sentinel-event-jcs-v1"}
ERROR_SCHEMA_INVALID = FailureCode.SCHEMA_INVALID.value
ERROR_MANIFEST_HASH_MISMATCH = FailureCode.MANIFEST_HASH_MISMATCH.value
ERROR_MISSING_REQUIRED_FILE = FailureCode.MISSING_REQUIRED_FILE.value
ERROR_EVENT_HASH_MISMATCH = FailureCode.EVENT_HASH_MISMATCH.value
ERROR_CHAIN_DISCONTINUITY = FailureCode.CHAIN_DISCONTINUITY.value
ERROR_SEQ_NON_MONOTONIC = FailureCode.SEQ_NON_MONOTONIC.value
ERROR_ROOT_MISMATCH = FailureCode.ROOT_MISMATCH.value
ERROR_RANGE_MISMATCH = FailureCode.RANGE_MISMATCH.value
ERROR_CANON_VERSION_UNSUPPORTED = FailureCode.CANON_VERSION_UNSUPPORTED.value
ERROR_OVERSIZE_INPUT = FailureCode.OVERSIZE_INPUT.value
ERROR_REVOKED_CAPABILITY_USED = FailureCode.REVOKED_CAPABILITY_USED.value
WARNING_UNLISTED_FILE = WarningCode.FILE_NOT_IN_MANIFEST.value
WARNING_RANGE_ROOT_PARTIAL = WarningCode.RANGE_ROOT_PARTIAL.value
REPO_ROOT = Path(__file__).resolve().parents[1]
SCHEMA_DIR = REPO_ROOT / "spec" / "sentinel"
_EMBEDDED_SCHEMAS: dict[str, dict[str, Any]] = {
"event.schema.json": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "VaultMesh Sentinel v1 Event",
"type": "object",
"additionalProperties": False,
"required": [
"event_id",
"seq",
"ts",
"event_type",
"actor",
"cap_hash",
"op",
"op_digest",
"result",
"trace_id",
"prev_event_hash",
"event_hash",
"payload",
],
"properties": {
"event_id": {"type": "string"},
"seq": {"type": "integer", "minimum": 0},
"ts": {
"description": "Monotonic + wallclock if available. Accepts ISO-8601 Z or a structured object.",
"anyOf": [
{"type": "string"},
{
"type": "object",
"additionalProperties": False,
"required": ["wall"],
"properties": {
"wall": {"type": "string"},
"mono_ns": {"type": "integer", "minimum": 0},
},
},
],
},
"event_type": {
"type": "string",
"enum": [
"action_intent",
"policy_decision",
"action_executed",
"shadow_receipt",
"cap_grant",
"cap_revoke",
"seal_created",
"root_published",
"corruption_detected",
"tamper_signal",
"boot_event",
"health_event",
],
},
"actor": {"type": "string", "minLength": 1},
"cap_hash": {"type": "string", "minLength": 1},
"op": {"type": "string", "minLength": 1},
"op_digest": {"type": "string", "minLength": 1},
"result": {"type": "string", "enum": ["ok", "deny", "error"]},
"root_before": {"type": "string"},
"root_after": {"type": "string"},
"trace_id": {"type": "string"},
"prev_event_hash": {"type": "string", "minLength": 1},
"event_hash": {"type": "string"},
"payload": {"type": "object"},
},
},
"seal.schema.json": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "VaultMesh Sentinel v1 Seal Bundle (seal.json)",
"type": "object",
"additionalProperties": False,
"required": [
"format",
"sentinel_version",
"schema_version",
"hash_algo",
"canonicalization_version",
"seal_id",
"created_at",
"range",
"root",
"files",
],
"properties": {
"format": {"type": "string", "const": "vm-sentinel-seal-v1"},
"sentinel_version": {"type": "string"},
"schema_version": {"type": "string"},
"hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
"canonicalization_version": {"type": "string"},
"seal_id": {"type": "string"},
"created_at": {"type": "string"},
"instance_id": {"type": "string"},
"ledger_type": {"type": "string", "enum": ["sqlite", "jsonl"]},
"range": {
"type": "object",
"additionalProperties": False,
"required": ["since_seq", "until_seq"],
"properties": {
"since_seq": {"type": "integer", "minimum": 0},
"until_seq": {"type": "integer", "minimum": 0},
"since_ts": {"type": "string"},
"until_ts": {"type": "string"},
},
},
"root": {
"type": "object",
"additionalProperties": False,
"required": ["start", "end"],
"properties": {
"start": {"type": "string"},
"end": {"type": "string"},
"seq": {"type": "integer", "minimum": 0},
},
},
"files": {
"type": "object",
"additionalProperties": False,
"required": ["receipts", "roots", "integrity", "verifier_manifest"],
"properties": {
"receipts": {"type": "string"},
"roots": {"type": "string"},
"integrity": {"type": "string"},
"verifier_manifest": {"type": "string"},
},
},
"notes": {"type": "string"},
},
},
"integrity.schema.json": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "VaultMesh Sentinel v1 Integrity Manifest (integrity.json)",
"type": "object",
"additionalProperties": False,
"required": ["format", "hash_algo", "files"],
"properties": {
"format": {"type": "string", "const": "vm-sentinel-integrity-v1"},
"hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
"files": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": False,
"required": ["path", "digest"],
"properties": {
"path": {"type": "string"},
"digest": {"type": "string"},
"size_bytes": {"type": "integer", "minimum": 0},
},
},
},
},
},
"verifier_manifest.schema.json": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "VaultMesh Sentinel v1 Verifier Manifest (verifier_manifest.json)",
"type": "object",
"additionalProperties": False,
"required": [
"format",
"sentinel_version",
"schema_version",
"canonicalization_version",
],
"properties": {
"format": {
"type": "string",
"const": "vm-sentinel-verifier-manifest-v1",
},
"sentinel_version": {"type": "string"},
"schema_version": {"type": "string"},
"hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
"canonicalization_version": {"type": "string"},
"verifier": {
"type": "object",
"additionalProperties": True,
"properties": {
"name": {"type": "string"},
"version": {"type": "string"},
"sha256": {"type": "string"},
},
},
},
},
}
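# One verification finding: a stable machine-readable code, a human-readable
# message, and an optional path (bundle-relative file or JSON-ish locator).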
@dataclass(frozen=True)
class Finding:
code: str
message: str
path: str | None = None
def as_dict(self) -> dict:
d = {"code": self.code, "message": self.message}
if self.path:
d["path"] = self.path
return d
def _contract_ids_for_finding(finding: Finding) -> list[str]:
"""
Best-effort mapping from verifier findings -> Contract Matrix IDs.
This is meant to make verification_report.json auditor-friendly without
requiring readers to inspect verifier source code.
"""
contract_ids: list[str] = []
code = finding.code
path = finding.path or ""
message = finding.message
def add(contract_id: str) -> None:
if contract_id not in contract_ids:
contract_ids.append(contract_id)
# Bundle-level
if code == ERROR_MISSING_REQUIRED_FILE:
add("B-1")
if code == ERROR_OVERSIZE_INPUT:
add("B-3")
# Seal bundle / seal.json
if path == "seal.json":
add("B-1")
if path.startswith("seal.json.format"):
add("S-1")
if path.startswith("seal.json.hash_algo"):
add("S-2")
if path.startswith("seal.json.range"):
add("S-3")
if path.startswith("seal.json.root"):
add("S-4")
if path.startswith("seal.json.files"):
add("S-5")
add("B-1")
if "missing file referenced by seal.files." in message:
add("S-5")
add("B-1")
if path.startswith("seal.json.canonicalization_version"):
add("S-6")
if code == ERROR_CANON_VERSION_UNSUPPORTED:
add("S-6")
# integrity.json
if path.startswith("integrity.json.format"):
add("I-1")
if path.startswith("integrity.json.hash_algo"):
add("I-2")
if path.startswith("integrity.json.files"):
add("I-3")
if code == ERROR_MANIFEST_HASH_MISMATCH:
add("I-3")
if "size_bytes mismatch" in message:
add("I-4")
if message.startswith("file present but not listed in integrity.json:"):
add("I-5")
if message.startswith("integrity.json does not cover required seal file:"):
add("I-6")
# verifier_manifest.json
if path.startswith("verifier_manifest.json.format"):
add("V-1")
if path.startswith("verifier_manifest.json.hash_algo"):
add("V-3")
if path.startswith("verifier_manifest.json") and not (
path.startswith("verifier_manifest.json.format")
or path.startswith("verifier_manifest.json.hash_algo")
):
add("V-2")
# Event ledger
    # Event-level schema findings carry paths like "receipts.jsonl:<line_no>",
    # so match the ledger extension anywhere in the path, not just at the end.
    if code == ERROR_SCHEMA_INVALID and ".jsonl" in path:
add("E-1")
if code == ERROR_EVENT_HASH_MISMATCH:
add("E-2")
if code == ERROR_CHAIN_DISCONTINUITY:
add("E-3")
if code == ERROR_SEQ_NON_MONOTONIC:
add("E-4")
if code == ERROR_ROOT_MISMATCH:
add("E-5")
if code == ERROR_RANGE_MISMATCH:
add("E-6")
if code == ERROR_REVOKED_CAPABILITY_USED:
add("E-7")
return contract_ids
def _finding_to_report_dict(finding: Finding) -> dict:
d = finding.as_dict()
contract_ids = _contract_ids_for_finding(finding)
if contract_ids:
d["contract_ids"] = contract_ids
return d
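# Fold findings into the report: per-finding dicts, the overall "ok" flag, the
# first error's code as failure_code, and sorted sets of contract IDs.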
def _finalize_report_findings(
report: dict[str, Any], *, errors: list[Finding], warnings: list[Finding]
) -> None:
report["errors"] = [_finding_to_report_dict(e) for e in errors]
report["warnings"] = [_finding_to_report_dict(w) for w in warnings]
report["ok"] = not errors
report["failure_code"] = errors[0].code if errors else None
report["violated_contract_ids"] = sorted(
{cid for e in errors for cid in _contract_ids_for_finding(e)}
)
report["warned_contract_ids"] = sorted(
{cid for w in warnings for cid in _contract_ids_for_finding(w)}
)
def _load_json(path: Path) -> dict:
return json.loads(path.read_text(encoding="utf-8"))
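# Digests are "<algo>:<hex>" strings; _hex_part returns the hex portion, which
# is what gets concatenated when combining Merkle tree nodes.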
def _hex_part(value: str) -> str:
return value.split(":", 1)[-1]
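# Canonical JSON for hashing forbids floats (decimals must be carried as
# strings); walk the value and fail loudly if any float sneaks in.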
def _require_no_floats(value: Any, *, path: str = "$") -> None:
if isinstance(value, float):
raise ValueError(f"float not allowed in canonical JSON at {path}")
if isinstance(value, dict):
for k, v in value.items():
_require_no_floats(v, path=f"{path}.{k}")
elif isinstance(value, list):
for i, v in enumerate(value):
_require_no_floats(v, path=f"{path}[{i}]")
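# Illustrative example of the canonical encoding implemented below:
#   _canonical_json_bytes({"b": 1, "a": "x"}) == b'{"a":"x","b":1}'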
def _canonical_json_bytes(obj: Any) -> bytes:
"""
Deterministic canonical JSON bytes for Sentinel v1 hashing.
This verifier enforces a strict subset compatible with sentinel-event-jcs-v1
for Sentinel artifacts:
- UTF-8
- object keys sorted
- separators (",", ":")
- no NaN/Infinity
- no floats (represent decimals as strings instead)
"""
_require_no_floats(obj)
encoded = json.dumps(
obj,
sort_keys=True,
separators=(",", ":"),
ensure_ascii=False,
allow_nan=False,
).encode("utf-8")
return encoded
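# Hash raw bytes with the bundle's declared algorithm and return a prefixed
# digest string such as "sha256:<hex>" or "blake3:<hex>".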
def _vmhash(data: bytes, *, hash_algo: str) -> str:
if hash_algo == "blake3":
if blake3 is None:
raise RuntimeError(
"Missing dependency: blake3 (required for blake3 bundles)"
)
return f"blake3:{blake3.blake3(data).hexdigest()}"
if hash_algo == "sha256":
return f"sha256:{hashlib.sha256(data).hexdigest()}"
raise ValueError(f"unsupported hash_algo: {hash_algo!r}")
def _compute_merkle_root(leaves: list[str], *, hash_algo: str) -> str:
if not leaves:
return _vmhash(b"empty", hash_algo=hash_algo)
if len(leaves) == 1:
return leaves[0]
level = leaves[:]
while len(level) > 1:
next_level: list[str] = []
for i in range(0, len(level), 2):
left = level[i]
right = level[i + 1] if i + 1 < len(level) else left
combined = (_hex_part(left) + _hex_part(right)).encode("utf-8")
next_level.append(_vmhash(combined, hash_algo=hash_algo))
level = next_level
return level[0]
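# Simple JSONL reader: one JSON object per non-empty line. The receipts loop in
# verify_bundle reads bytes directly instead so it can record byte offsets.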
def _iter_jsonl(path: Path) -> Iterable[dict]:
with path.open("r", encoding="utf-8") as f:
for line_no, line in enumerate(f, start=1):
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception as exc:
raise ValueError(
f"{path.name}:{line_no}: invalid JSON ({exc})"
) from exc
if not isinstance(obj, dict):
raise ValueError(f"{path.name}:{line_no}: expected JSON object")
yield obj
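# Load a schema from spec/sentinel/ when present, otherwise fall back to the
# embedded copy above so the verifier keeps working as a standalone offline tool.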
def _load_schema(filename: str) -> dict:
path = SCHEMA_DIR / filename
if path.exists():
return _load_json(path)
embedded = _EMBEDDED_SCHEMAS.get(filename)
if embedded is None:
raise FileNotFoundError(f"schema not found: {filename}")
return embedded
def _validate_schema(instance: Any, schema: dict, *, path: str = "$") -> list[Finding]:
"""
Minimal JSON Schema validator (subset) for Sentinel v1 verifier.
    Supports: type, required, properties, additionalProperties, enum, const, anyOf, items, minimum, minLength.
"""
findings: list[Finding] = []
if "const" in schema:
if instance != schema["const"]:
findings.append(
Finding(
ERROR_SCHEMA_INVALID,
f"expected const {schema['const']!r}, got {instance!r}",
path=path,
)
)
return findings
if "enum" in schema:
if instance not in schema["enum"]:
findings.append(
Finding(
ERROR_SCHEMA_INVALID,
f"expected one of {schema['enum']!r}, got {instance!r}",
path=path,
)
)
return findings
if "anyOf" in schema:
options = schema["anyOf"]
for opt in options:
if not _validate_schema(instance, opt, path=path):
return []
findings.append(
Finding(ERROR_SCHEMA_INVALID, "did not match anyOf schema", path=path)
)
return findings
schema_type = schema.get("type")
if schema_type == "object":
if not isinstance(instance, dict):
findings.append(Finding(ERROR_SCHEMA_INVALID, "expected object", path=path))
return findings
required = schema.get("required") or []
for key in required:
if key not in instance:
findings.append(
Finding(
ERROR_SCHEMA_INVALID,
f"missing required property: {key}",
path=path,
)
)
properties = schema.get("properties") or {}
additional = schema.get("additionalProperties", True)
for key, value in instance.items():
key_path = f"{path}.{key}"
if key in properties:
findings.extend(_validate_schema(value, properties[key], path=key_path))
else:
if additional is False:
findings.append(
Finding(
ERROR_SCHEMA_INVALID,
"unexpected additional property",
path=key_path,
)
)
return findings
if schema_type == "array":
if not isinstance(instance, list):
findings.append(Finding(ERROR_SCHEMA_INVALID, "expected array", path=path))
return findings
items_schema = schema.get("items")
if isinstance(items_schema, dict):
for i, item in enumerate(instance):
findings.extend(
_validate_schema(item, items_schema, path=f"{path}[{i}]")
)
return findings
if schema_type == "string":
if not isinstance(instance, str):
findings.append(Finding(ERROR_SCHEMA_INVALID, "expected string", path=path))
return findings
min_len = schema.get("minLength")
if isinstance(min_len, int) and len(instance) < min_len:
findings.append(
Finding(
ERROR_SCHEMA_INVALID,
f"minLength {min_len} violated",
path=path,
)
)
return findings
if schema_type == "integer":
if not isinstance(instance, int) or isinstance(instance, bool):
findings.append(
Finding(ERROR_SCHEMA_INVALID, "expected integer", path=path)
)
return findings
minimum = schema.get("minimum")
if isinstance(minimum, int) and instance < minimum:
findings.append(
Finding(
ERROR_SCHEMA_INVALID,
f"minimum {minimum} violated",
path=path,
)
)
return findings
if schema_type == "boolean":
if not isinstance(instance, bool):
findings.append(
Finding(ERROR_SCHEMA_INVALID, "expected boolean", path=path)
)
return findings
# If schema has no type, treat as permissive.
return findings
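# roots.txt format: one "seq=<int> root=<algo:hex>" entry per line; blank lines
# and lines starting with "#" are ignored.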
def _parse_roots_txt(path: Path) -> list[tuple[int, str]]:
roots: list[tuple[int, str]] = []
for line_no, line in enumerate(
path.read_text(encoding="utf-8").splitlines(), start=1
):
s = line.strip()
if not s or s.startswith("#"):
continue
if "seq=" in s and "root=" in s:
parts = dict(part.split("=", 1) for part in s.split() if "=" in part)
try:
seq = int(parts["seq"])
except Exception as exc:
raise ValueError(f"{path.name}:{line_no}: invalid seq ({exc})") from exc
root = parts.get("root")
if not root:
raise ValueError(f"{path.name}:{line_no}: missing root")
roots.append((seq, root))
else:
raise ValueError(
f"{path.name}:{line_no}: expected 'seq=<int> root=<algo:hex>'"
)
return roots
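# Write the machine-readable verification report (sorted keys, compact
# separators, trailing newline).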
def _write_report(path: Path, report: dict) -> None:
path.write_text(
json.dumps(report, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
+ "\n",
encoding="utf-8",
)
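# Verify one bundle directory end to end and return a process exit code:
#   1. parse and schema-check seal.json, then resolve the files it references;
#   2. check integrity.json digests, sizes, and coverage of the seal files;
#   3. cross-check verifier_manifest.json (format, hash_algo, canonicalization);
#   4. replay the receipts ledger: event schema, event_hash, prev_event_hash
#      chain, op_digest, seq range, and capability revocation usage;
#   5. recompute Merkle roots (only possible from seq 0 without a frontier
#      snapshot) and reconcile roots.txt against seal.root.end;
#   6. in --strict mode, also enforce trace linkage checks.
# The JSON report is written on both pass and fail paths before returning.
# Typical file names (the authoritative paths come from seal.json "files"):
#   seal.json, receipts.jsonl, roots.txt, integrity.json, verifier_manifest.json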
def verify_bundle(
bundle_dir: Path,
*,
strict: bool,
report_path: Path,
max_file_bytes: int,
) -> int:
errors: list[Finding] = []
warnings: list[Finding] = []
report: dict[str, Any] = {
"format": "vm-sentinel-verification-report-v1",
"ok": False,
"strict": strict,
"failure_code": None,
"inputs": {"bundle_dir": str(bundle_dir)},
"covered_seq_range": None,
"verified_ranges": [],
"observed_roots": {},
"computed_roots": {},
"observed_end_root": None,
"computed_end_root": None,
"mismatches": [],
"corruption_findings": [],
"versions": {},
"schema_versions_used": {},
"errors": [],
"warnings": [],
"verifier": {
"name": "vm_verify_sentinel_bundle.py",
"python": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
},
"declared_verifier": None,
}
seal_path = bundle_dir / "seal.json"
if not seal_path.exists():
errors.append(
Finding(
ERROR_MISSING_REQUIRED_FILE, "seal.json not found", path="seal.json"
)
)
_finalize_report_findings(report, errors=errors, warnings=warnings)
_write_report(report_path, report)
return 1
try:
seal = _load_json(seal_path)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"failed to parse seal.json: {exc}",
path="seal.json",
)
)
_finalize_report_findings(report, errors=errors, warnings=warnings)
_write_report(report_path, report)
return 1
try:
seal_schema = _load_schema("seal.schema.json")
errors.extend(_validate_schema(seal, seal_schema, path="seal.json"))
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"failed to load/validate seal.schema.json: {exc}",
path="seal.schema.json",
)
)
fmt = seal.get("format")
if fmt not in SUPPORTED_SEAL_FORMATS:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"seal.format unsupported: {fmt!r}",
path="seal.json.format",
)
)
hash_algo = seal.get("hash_algo")
if hash_algo not in ("blake3", "sha256"):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"seal.hash_algo invalid: {hash_algo!r}",
path="seal.json.hash_algo",
)
)
hash_algo = "sha256" # keep verifier progressing for report completeness
canonicalization_version = seal.get("canonicalization_version")
if not isinstance(canonicalization_version, str) or not canonicalization_version:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"seal.canonicalization_version missing",
path="seal.json.canonicalization_version",
)
)
canonicalization_version = ""
elif canonicalization_version not in SUPPORTED_CANONICALIZATION_VERSIONS:
errors.append(
Finding(
ERROR_CANON_VERSION_UNSUPPORTED,
f"unsupported canonicalization_version: {canonicalization_version!r}",
path="seal.json.canonicalization_version",
)
)
files = seal.get("files")
if not isinstance(files, dict):
errors.append(
Finding(ERROR_SCHEMA_INVALID, "seal.files missing", path="seal.json.files")
)
files = {}
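    # Resolve a path declared under seal.files relative to the bundle directory,
    # recording a finding and returning None if the key or the file is missing.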
def _file_from_seal(key: str) -> Optional[Path]:
rel = files.get(key)
if not isinstance(rel, str) or not rel:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"seal.files.{key} missing",
path=f"seal.json.files.{key}",
)
)
return None
p = (bundle_dir / rel).resolve()
if not p.exists():
errors.append(
Finding(
ERROR_MISSING_REQUIRED_FILE,
f"missing file referenced by seal.files.{key}: {rel}",
path=rel,
)
)
return None
return p
receipts_path = _file_from_seal("receipts")
roots_path = _file_from_seal("roots")
integrity_path = _file_from_seal("integrity")
verifier_manifest_path = _file_from_seal("verifier_manifest")
report["versions"] = {
"sentinel_version": seal.get("sentinel_version"),
"schema_version": seal.get("schema_version"),
"hash_algo": hash_algo,
"canonicalization_version": canonicalization_version,
}
report["schema_versions_used"]["seal"] = seal.get("schema_version")
integrity: dict[str, Any] | None = None
if integrity_path is not None:
try:
integrity = _load_json(integrity_path)
integrity_schema = _load_schema("integrity.schema.json")
errors.extend(
_validate_schema(
integrity, integrity_schema, path=str(integrity_path.name)
)
)
if integrity.get("format") not in SUPPORTED_INTEGRITY_FORMATS:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"integrity.format unsupported: {integrity.get('format')!r}",
path="integrity.json.format",
)
)
if integrity.get("hash_algo") != hash_algo:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"integrity.hash_algo != seal.hash_algo",
path="integrity.json.hash_algo",
)
)
listed: list[dict] = integrity.get("files") or []
if not isinstance(listed, list) or not listed:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"integrity.files missing or empty",
path="integrity.json.files",
)
)
listed = []
# Deterministic ordering for verification/reporting.
listed_sorted = sorted(
(e for e in listed if isinstance(e, dict)),
key=lambda e: str(e.get("path", "")),
)
listed_paths = {str(e.get("path")) for e in listed_sorted if "path" in e}
# Enforce that seal-referenced files are covered by integrity.json.
#
# Note: integrity.json MUST NOT be required to include a digest of itself (recursive),
# so we only require the other seal files here.
required_files = [
files.get("receipts"),
files.get("roots"),
files.get("verifier_manifest"),
]
for required_rel in required_files:
if (
isinstance(required_rel, str)
and required_rel
and required_rel not in listed_paths
):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"integrity.json does not cover required seal file: {required_rel}",
path="integrity.json.files",
)
)
# Recommended: cover seal.json too (strict mode enforces).
if "seal.json" not in listed_paths:
finding = Finding(
WARNING_UNLISTED_FILE,
"integrity.json does not cover seal.json (recommended)",
path="integrity.json.files",
)
if strict:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
finding.message,
path="integrity.json.files",
)
)
else:
warnings.append(finding)
# Hash verification
for entry in listed_sorted:
rel = entry.get("path")
digest = entry.get("digest")
if not isinstance(rel, str) or not rel:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"integrity.files entry missing path",
path="integrity.json.files",
)
)
continue
if not isinstance(digest, str) or not digest:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"integrity.files[{rel}] missing digest",
path="integrity.json.files",
)
)
continue
file_path = (bundle_dir / rel).resolve()
if not file_path.exists():
errors.append(
Finding(
ERROR_MISSING_REQUIRED_FILE,
f"integrity missing file: {rel}",
path=rel,
)
)
continue
size = file_path.stat().st_size
if size > max_file_bytes:
errors.append(
Finding(
ERROR_OVERSIZE_INPUT,
f"file exceeds max size ({size} > {max_file_bytes} bytes): {rel}",
path=rel,
)
)
continue
data = file_path.read_bytes()
computed = _vmhash(data, hash_algo=hash_algo)
if computed != digest:
errors.append(
Finding(
ERROR_MANIFEST_HASH_MISMATCH,
f"digest mismatch for {rel}: expected {digest}, got {computed}",
path=rel,
)
)
size_bytes = entry.get("size_bytes")
if isinstance(size_bytes, int) and size_bytes != len(data):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"size_bytes mismatch for {rel}: expected {size_bytes}, got {len(data)}",
path=rel,
)
)
# Extra files present but not listed in integrity.json
ignored = {".DS_Store", "verification_report.json", report_path.name}
integrity_rel = files.get("integrity")
if isinstance(integrity_rel, str) and integrity_rel:
ignored.add(integrity_rel)
for fp in sorted(bundle_dir.rglob("*")):
if fp.is_dir():
continue
rel = fp.relative_to(bundle_dir).as_posix()
if rel in ignored:
continue
if rel not in listed_paths:
finding = Finding(
WARNING_UNLISTED_FILE,
f"file present but not listed in integrity.json: {rel}",
path=rel,
)
if strict:
errors.append(
Finding(ERROR_SCHEMA_INVALID, finding.message, path=rel)
)
else:
warnings.append(finding)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"failed to verify integrity.json: {exc}",
path="integrity.json",
)
)
if verifier_manifest_path is not None:
try:
manifest = _load_json(verifier_manifest_path)
manifest_schema = _load_schema("verifier_manifest.schema.json")
errors.extend(
_validate_schema(
manifest, manifest_schema, path=str(verifier_manifest_path.name)
)
)
mfmt = manifest.get("format")
if mfmt not in SUPPORTED_VERIFIER_MANIFEST_FORMATS:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"verifier_manifest.format unsupported: {mfmt!r}",
path="verifier_manifest.json.format",
)
)
mv = manifest.get("canonicalization_version")
if (
isinstance(mv, str)
and canonicalization_version
and mv != canonicalization_version
):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"verifier_manifest.canonicalization_version != seal.canonicalization_version",
path="verifier_manifest.json.canonicalization_version",
)
)
mh = manifest.get("hash_algo")
if isinstance(mh, str) and mh != hash_algo:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"verifier_manifest.hash_algo != seal.hash_algo",
path="verifier_manifest.json.hash_algo",
)
)
report["schema_versions_used"]["verifier_manifest"] = manifest.get(
"schema_version"
)
dv = manifest.get("verifier")
if isinstance(dv, dict):
report["declared_verifier"] = dv
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"failed to parse verifier_manifest.json: {exc}",
path="verifier_manifest.json",
)
)
range_obj = seal.get("range") or {}
since_seq = range_obj.get("since_seq")
until_seq = range_obj.get("until_seq")
if not isinstance(since_seq, int) or not isinstance(until_seq, int):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"seal.range.since_seq/until_seq missing or invalid",
path="seal.json.range",
)
)
since_seq = 0
until_seq = -1
report["covered_seq_range"] = {"since_seq": since_seq, "until_seq": until_seq}
events: list[dict] = []
if receipts_path is not None:
if receipts_path.stat().st_size > max_file_bytes:
errors.append(
Finding(
ERROR_OVERSIZE_INPUT,
f"receipts file exceeds max size ({receipts_path.stat().st_size} > {max_file_bytes} bytes)",
path=receipts_path.name,
)
)
else:
event_schema = _load_schema("event.schema.json")
last_good_seq: int | None = None
last_good_line_no: int | None = None
byte_offset = 0
with receipts_path.open("rb") as f:
for line_no, raw in enumerate(f, start=1):
line_start = byte_offset
byte_offset += len(raw)
if not raw.strip():
continue
try:
text = raw.decode("utf-8").strip()
except UnicodeDecodeError as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"{receipts_path.name}:{line_no}: utf-8 decode error ({exc})",
path=receipts_path.name,
)
)
report["corruption_findings"].append(
{
"file": receipts_path.name,
"line_no": line_no,
"byte_offset": line_start,
"last_good_seq": last_good_seq,
"last_good_line_no": last_good_line_no,
"error": f"utf-8 decode error ({exc})",
"recommended_recovery": [
"Verify an older seal bundle that predates this range.",
"Restore receipts from WORM/immutable storage if available.",
"Compare integrity.json digests to an out-of-band copy.",
],
}
)
break
try:
ev = json.loads(text)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"{receipts_path.name}:{line_no}: invalid JSON ({exc})",
path=receipts_path.name,
)
)
report["corruption_findings"].append(
{
"file": receipts_path.name,
"line_no": line_no,
"byte_offset": line_start,
"last_good_seq": last_good_seq,
"last_good_line_no": last_good_line_no,
"error": f"invalid JSON ({exc})",
"recommended_recovery": [
"Verify an older seal bundle that predates this range.",
"Restore receipts from WORM/immutable storage if available.",
"Compare integrity.json digests to an out-of-band copy.",
],
}
)
break
if not isinstance(ev, dict):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"{receipts_path.name}:{line_no}: expected JSON object",
path=receipts_path.name,
)
)
report["corruption_findings"].append(
{
"file": receipts_path.name,
"line_no": line_no,
"byte_offset": line_start,
"last_good_seq": last_good_seq,
"last_good_line_no": last_good_line_no,
"error": "expected JSON object",
}
)
break
errors.extend(
_validate_schema(
ev, event_schema, path=f"{receipts_path.name}:{line_no}"
)
)
seq = ev.get("seq")
if isinstance(seq, int):
last_good_seq = seq
last_good_line_no = line_no
events.append(ev)
if events:
# Deterministic ordering: sort by seq, not file order.
by_seq: dict[int, list[dict]] = {}
for ev in events:
seq = ev.get("seq")
if isinstance(seq, int):
by_seq.setdefault(seq, []).append(ev)
dupes = sorted([seq for seq, lst in by_seq.items() if len(lst) > 1])
for seq in dupes:
errors.append(
Finding(
ERROR_SEQ_NON_MONOTONIC,
f"duplicate seq value: {seq}",
path=f"events.seq:{seq}",
)
)
ordered_seqs = sorted(by_seq.keys())
if ordered_seqs:
if ordered_seqs[0] != since_seq or ordered_seqs[-1] != until_seq:
errors.append(
Finding(
ERROR_RANGE_MISMATCH,
f"event seq range mismatch: got {ordered_seqs[0]}..{ordered_seqs[-1]}, expected {since_seq}..{until_seq}",
path="seal.json.range",
)
)
expected_count = until_seq - since_seq + 1
if expected_count != len(events):
errors.append(
Finding(
ERROR_RANGE_MISMATCH,
f"receipt count mismatch: expected {expected_count}, got {len(events)}",
path=receipts_path.name if receipts_path else "receipts",
)
)
missing = [s for s in range(since_seq, until_seq + 1) if s not in by_seq]
if missing:
errors.append(
Finding(
ERROR_RANGE_MISMATCH,
f"missing seq values in range: {missing[:20]}{'...' if len(missing) > 20 else ''}",
path="events.seq",
)
)
# Flatten events in seq order (deterministic).
events_ordered = [
by_seq[s][0] for s in range(since_seq, until_seq + 1) if s in by_seq
]
events = events_ordered
root_obj = seal.get("root") or {}
root_start = root_obj.get("start")
root_end = root_obj.get("end")
report["observed_roots"] = {"start": root_start, "end": root_end}
report["observed_end_root"] = root_end
if not isinstance(root_start, str) or not isinstance(root_end, str):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"seal.root.start/end missing or invalid",
path="seal.json.root",
)
)
# Event hashing, op_digest, and prev_event_hash chain verification.
computed_event_hashes: list[str] = []
leaves: list[str] = []
if events:
revoked_cap_hashes: set[str] = set()
if canonicalization_version not in SUPPORTED_CANONICALIZATION_VERSIONS:
errors.append(
Finding(
ERROR_CANON_VERSION_UNSUPPORTED,
f"unsupported canonicalization_version: {canonicalization_version!r}",
path="seal.json.canonicalization_version",
)
)
else:
for idx, ev in enumerate(events):
seq = ev.get("seq")
if not isinstance(seq, int):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"event.seq missing or invalid",
path=f"events[{idx}].seq",
)
)
continue
stored_event_hash = ev.get("event_hash")
if not isinstance(stored_event_hash, str) or not stored_event_hash:
errors.append(
Finding(
ERROR_EVENT_HASH_MISMATCH,
"event_hash missing",
path=f"events[{idx}].event_hash",
)
)
continue
ev_no_hash = dict(ev)
ev_no_hash.pop("event_hash", None)
try:
canon = _canonical_json_bytes(ev_no_hash)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"canonicalization failed: {exc}",
path=f"events[{idx}]",
)
)
continue
try:
computed_hash = _vmhash(canon, hash_algo=hash_algo)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"hashing failed: {exc}",
path=f"events[{idx}]",
)
)
continue
computed_event_hashes.append(computed_hash)
leaves.append(computed_hash)
if computed_hash != stored_event_hash:
errors.append(
Finding(
ERROR_EVENT_HASH_MISMATCH,
f"event_hash mismatch: expected {stored_event_hash}, got {computed_hash}",
path=f"events[{idx}].event_hash",
)
)
prev = ev.get("prev_event_hash")
if not isinstance(prev, str) or not prev:
errors.append(
Finding(
ERROR_CHAIN_DISCONTINUITY,
"prev_event_hash missing",
path=f"events[{idx}].prev_event_hash",
)
)
else:
if idx == 0:
if seq == 0:
if prev != "0":
errors.append(
Finding(
ERROR_CHAIN_DISCONTINUITY,
'prev_event_hash must be "0" for seq=0',
path=f"events[{idx}].prev_event_hash",
)
)
else:
finding = Finding(
WARNING_RANGE_ROOT_PARTIAL,
"first event is not seq=0; prev_event_hash cannot be verified without prior context",
path=f"events[{idx}].prev_event_hash",
)
if strict:
errors.append(
Finding(
ERROR_CHAIN_DISCONTINUITY,
finding.message,
path=f"events[{idx}].prev_event_hash",
)
)
else:
warnings.append(finding)
else:
                        # Only compare against the previous computed hash when no
                        # earlier event was skipped, so list indices stay aligned
                        # with event positions.
                        if (
                            len(computed_event_hashes) == idx + 1
                            and prev != computed_event_hashes[idx - 1]
                        ):
errors.append(
Finding(
ERROR_CHAIN_DISCONTINUITY,
"prev_event_hash does not match previous event_hash",
path=f"events[{idx}].prev_event_hash",
)
)
# op_digest verification (params convention: payload.params)
op = ev.get("op")
op_digest = ev.get("op_digest")
payload = (
ev.get("payload") if isinstance(ev.get("payload"), dict) else {}
)
params = payload.get("params", {})
if params is None:
params = {}
if not isinstance(op, str) or not op:
errors.append(
Finding(
ERROR_SCHEMA_INVALID, "op missing", path=f"events[{idx}].op"
)
)
elif not isinstance(op_digest, str) or not op_digest:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"op_digest missing",
path=f"events[{idx}].op_digest",
)
)
elif not isinstance(params, dict):
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
"payload.params must be an object",
path=f"events[{idx}].payload.params",
)
)
else:
try:
op_obj = {"op": op, "params": params}
op_bytes = _canonical_json_bytes(op_obj)
computed_op_digest = _vmhash(op_bytes, hash_algo=hash_algo)
if computed_op_digest != op_digest:
errors.append(
Finding(
ERROR_EVENT_HASH_MISMATCH,
f"op_digest mismatch: expected {op_digest}, got {computed_op_digest}",
path=f"events[{idx}].op_digest",
)
)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"op_digest computation failed: {exc}",
path=f"events[{idx}].op_digest",
)
)
# Capability revocation enforcement (v1 hardening):
# If a capability is revoked, subsequent action execution MUST NOT use it.
event_type = ev.get("event_type")
cap_hash = ev.get("cap_hash")
if event_type == "cap_revoke" and isinstance(payload, dict):
revoked = None
if isinstance(payload.get("revoked_cap_hash"), str):
revoked = payload.get("revoked_cap_hash")
elif isinstance(params, dict) and isinstance(
params.get("revoked_cap_hash"), str
):
revoked = params.get("revoked_cap_hash")
if isinstance(revoked, str) and revoked:
revoked_cap_hashes.add(revoked)
if (
event_type == "action_executed"
and isinstance(cap_hash, str)
and cap_hash in revoked_cap_hashes
):
errors.append(
Finding(
ERROR_REVOKED_CAPABILITY_USED,
f"action_executed uses revoked cap_hash: {cap_hash}",
path=f"events[{idx}].cap_hash",
)
)
report["verified_ranges"] = [{"since_seq": since_seq, "until_seq": until_seq}]
# Merkle verification (only possible from genesis without additional continuation state).
if (
isinstance(since_seq, int)
and since_seq == 0
and leaves
and isinstance(root_start, str)
and isinstance(root_end, str)
):
try:
expected_start = _vmhash(b"empty", hash_algo=hash_algo)
report["computed_roots"]["expected_start"] = expected_start
if root_start != expected_start:
errors.append(
Finding(
ERROR_ROOT_MISMATCH,
f"seal.root.start mismatch: expected {expected_start}, got {root_start}",
path="seal.json.root.start",
)
)
computed_end = _compute_merkle_root(leaves, hash_algo=hash_algo)
report["computed_roots"]["computed_end"] = computed_end
report["computed_end_root"] = computed_end
if computed_end != root_end:
errors.append(
Finding(
ERROR_ROOT_MISMATCH,
f"seal.root.end mismatch: expected {root_end}, got {computed_end}",
path="seal.json.root.end",
)
)
except Exception as exc:
errors.append(
Finding(
ERROR_ROOT_MISMATCH,
f"merkle verification failed: {exc}",
path="seal.json.root",
)
)
else:
if isinstance(since_seq, int) and since_seq > 0:
finding = Finding(
WARNING_RANGE_ROOT_PARTIAL,
"cannot recompute Merkle roots for since_seq>0 without a verifiable continuation state (frontier snapshot)",
path="seal.json.range",
)
if strict:
errors.append(
Finding(
ERROR_ROOT_MISMATCH, finding.message, path="seal.json.range"
)
)
else:
warnings.append(finding)
# roots.txt parsing (self-consistency with seal.root.end)
if roots_path is not None and isinstance(root_end, str):
try:
roots = _parse_roots_txt(roots_path)
if roots:
last_seq, last_root = roots[-1]
if last_seq != until_seq:
errors.append(
Finding(
ERROR_RANGE_MISMATCH,
f"roots.txt last seq mismatch: expected {until_seq}, got {last_seq}",
path=roots_path.name,
)
)
if last_root != root_end:
errors.append(
Finding(
ERROR_ROOT_MISMATCH,
"roots.txt last root does not match seal.root.end",
path=roots_path.name,
)
)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"failed to parse roots.txt: {exc}",
path=roots_path.name,
)
)
# Strict-mode trace linkage integrity checks (intent → executed/denied).
if strict and events:
try:
by_trace: dict[str, list[tuple[int, str]]] = {}
for ev in events:
trace_id = ev.get("trace_id")
event_type = ev.get("event_type")
seq = ev.get("seq")
if (
isinstance(trace_id, str)
and isinstance(event_type, str)
and isinstance(seq, int)
):
if event_type in (
"action_intent",
"action_executed",
"shadow_receipt",
):
by_trace.setdefault(trace_id, []).append((seq, event_type))
for trace_id, seq_types in sorted(by_trace.items()):
seq_types_sorted = sorted(seq_types, key=lambda t: t[0])
types = [t for _, t in seq_types_sorted]
has_intent = "action_intent" in types
has_exec = "action_executed" in types
has_shadow = "shadow_receipt" in types
if has_exec and not has_intent:
errors.append(
Finding(
ERROR_CHAIN_DISCONTINUITY,
f"execution without prior intent for trace_id {trace_id}",
path=f"trace_id:{trace_id}",
)
)
if has_intent and not (has_exec or has_shadow):
errors.append(
Finding(
ERROR_CHAIN_DISCONTINUITY,
f"intent without executed/denied outcome for trace_id {trace_id}",
path=f"trace_id:{trace_id}",
)
)
if has_exec and has_shadow:
errors.append(
Finding(
ERROR_CHAIN_DISCONTINUITY,
f"both action_executed and shadow_receipt present for trace_id {trace_id}",
path=f"trace_id:{trace_id}",
)
)
except Exception as exc:
errors.append(
Finding(
ERROR_SCHEMA_INVALID,
f"trace linkage verification failed: {exc}",
path="trace_id",
)
)
_finalize_report_findings(report, errors=errors, warnings=warnings)
_write_report(report_path, report)
# Console output: PASS/FAIL + stable codes
if errors:
for e in errors:
loc = f" ({e.path})" if e.path else ""
print(f"FAIL {e.code}{loc}: {e.message}", file=sys.stderr)
for w in warnings:
loc = f" ({w.path})" if w.path else ""
print(f"WARN {w.code}{loc}: {w.message}", file=sys.stderr)
return 1
for w in warnings:
loc = f" ({w.path})" if w.path else ""
print(f"WARN {w.code}{loc}: {w.message}", file=sys.stderr)
print("PASS")
return 0
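# CLI entry point: parse arguments, resolve paths, and map unexpected exceptions
# to exit code 2 (usage / unexpected error).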
def main(argv: list[str]) -> int:
p = argparse.ArgumentParser(
description="Verify a VaultMesh Sentinel v1 bundle directory."
)
p.add_argument(
"--bundle", required=True, help="Path to bundle directory (contains seal.json)"
)
p.add_argument(
"--strict",
action="store_true",
help="Treat warnings and partial verifications as failures.",
)
p.add_argument(
"--report",
help="Write machine-readable verification report JSON to this path (default: verification_report.json in bundle).",
)
p.add_argument(
"--max-file-bytes",
type=int,
default=50_000_000,
help="Reject any single input file larger than this many bytes (default: 50,000,000).",
)
args = p.parse_args(argv)
bundle_dir = Path(args.bundle).expanduser().resolve()
if not bundle_dir.exists() or not bundle_dir.is_dir():
print(f"[ERROR] --bundle must be a directory: {bundle_dir}", file=sys.stderr)
return 2
report_path = (
Path(args.report).expanduser().resolve()
if args.report
else (bundle_dir / "verification_report.json")
)
try:
return verify_bundle(
bundle_dir,
strict=bool(args.strict),
report_path=report_path,
max_file_bytes=int(args.max_file_bytes),
)
except Exception as exc:
print(f"[ERROR] unexpected error: {exc}", file=sys.stderr)
return 2
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))