#!/usr/bin/env python3
"""
vm_verify_sentinel_bundle.py

Offline verifier for VaultMesh Sentinel v1 seal bundles.

Usage:
    python3 vm_verify_sentinel_bundle.py --bundle /path/to/bundle_dir [--strict] [--report out.json]

Exit codes:
    0 - verification OK
    1 - verification failed
    2 - usage / unexpected error
"""

from __future__ import annotations

import argparse
import hashlib
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable, Optional

from sentinel_failure_codes import FailureCode, WarningCode

try:
    import blake3  # type: ignore
except ImportError:  # pragma: no cover
    blake3 = None


SUPPORTED_SEAL_FORMATS = {"vm-sentinel-seal-v1"}
SUPPORTED_INTEGRITY_FORMATS = {"vm-sentinel-integrity-v1"}
SUPPORTED_VERIFIER_MANIFEST_FORMATS = {"vm-sentinel-verifier-manifest-v1"}
SUPPORTED_CANONICALIZATION_VERSIONS = {"sentinel-event-jcs-v1"}

ERROR_SCHEMA_INVALID = FailureCode.SCHEMA_INVALID.value
ERROR_MANIFEST_HASH_MISMATCH = FailureCode.MANIFEST_HASH_MISMATCH.value
ERROR_MISSING_REQUIRED_FILE = FailureCode.MISSING_REQUIRED_FILE.value
ERROR_EVENT_HASH_MISMATCH = FailureCode.EVENT_HASH_MISMATCH.value
ERROR_CHAIN_DISCONTINUITY = FailureCode.CHAIN_DISCONTINUITY.value
ERROR_SEQ_NON_MONOTONIC = FailureCode.SEQ_NON_MONOTONIC.value
ERROR_ROOT_MISMATCH = FailureCode.ROOT_MISMATCH.value
ERROR_RANGE_MISMATCH = FailureCode.RANGE_MISMATCH.value
ERROR_CANON_VERSION_UNSUPPORTED = FailureCode.CANON_VERSION_UNSUPPORTED.value
ERROR_OVERSIZE_INPUT = FailureCode.OVERSIZE_INPUT.value
ERROR_REVOKED_CAPABILITY_USED = FailureCode.REVOKED_CAPABILITY_USED.value

WARNING_UNLISTED_FILE = WarningCode.FILE_NOT_IN_MANIFEST.value
WARNING_RANGE_ROOT_PARTIAL = WarningCode.RANGE_ROOT_PARTIAL.value


REPO_ROOT = Path(__file__).resolve().parents[1]
SCHEMA_DIR = REPO_ROOT / "spec" / "sentinel"

_EMBEDDED_SCHEMAS: dict[str, dict[str, Any]] = {
    "event.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Event",
        "type": "object",
        "additionalProperties": False,
        "required": [
            "event_id",
            "seq",
            "ts",
            "event_type",
            "actor",
            "cap_hash",
            "op",
            "op_digest",
            "result",
            "trace_id",
            "prev_event_hash",
            "event_hash",
            "payload",
        ],
        "properties": {
            "event_id": {"type": "string"},
            "seq": {"type": "integer", "minimum": 0},
            "ts": {
                "description": "Monotonic + wallclock if available. Accepts ISO-8601 Z or a structured object.",
                "anyOf": [
                    {"type": "string"},
                    {
                        "type": "object",
                        "additionalProperties": False,
                        "required": ["wall"],
                        "properties": {
                            "wall": {"type": "string"},
                            "mono_ns": {"type": "integer", "minimum": 0},
                        },
                    },
                ],
            },
            "event_type": {
                "type": "string",
                "enum": [
                    "action_intent",
                    "policy_decision",
                    "action_executed",
                    "shadow_receipt",
                    "cap_grant",
                    "cap_revoke",
                    "seal_created",
                    "root_published",
                    "corruption_detected",
                    "tamper_signal",
                    "boot_event",
                    "health_event",
                ],
            },
            "actor": {"type": "string", "minLength": 1},
            "cap_hash": {"type": "string", "minLength": 1},
            "op": {"type": "string", "minLength": 1},
            "op_digest": {"type": "string", "minLength": 1},
            "result": {"type": "string", "enum": ["ok", "deny", "error"]},
            "root_before": {"type": "string"},
            "root_after": {"type": "string"},
            "trace_id": {"type": "string"},
            "prev_event_hash": {"type": "string", "minLength": 1},
            "event_hash": {"type": "string"},
            "payload": {"type": "object"},
        },
    },
    "seal.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Seal Bundle (seal.json)",
        "type": "object",
        "additionalProperties": False,
        "required": [
            "format",
            "sentinel_version",
            "schema_version",
            "hash_algo",
            "canonicalization_version",
            "seal_id",
            "created_at",
            "range",
            "root",
            "files",
        ],
        "properties": {
            "format": {"type": "string", "const": "vm-sentinel-seal-v1"},
            "sentinel_version": {"type": "string"},
            "schema_version": {"type": "string"},
            "hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
            "canonicalization_version": {"type": "string"},
            "seal_id": {"type": "string"},
            "created_at": {"type": "string"},
            "instance_id": {"type": "string"},
            "ledger_type": {"type": "string", "enum": ["sqlite", "jsonl"]},
            "range": {
                "type": "object",
                "additionalProperties": False,
                "required": ["since_seq", "until_seq"],
                "properties": {
                    "since_seq": {"type": "integer", "minimum": 0},
                    "until_seq": {"type": "integer", "minimum": 0},
                    "since_ts": {"type": "string"},
                    "until_ts": {"type": "string"},
                },
            },
            "root": {
                "type": "object",
                "additionalProperties": False,
                "required": ["start", "end"],
                "properties": {
                    "start": {"type": "string"},
                    "end": {"type": "string"},
                    "seq": {"type": "integer", "minimum": 0},
                },
            },
            "files": {
                "type": "object",
                "additionalProperties": False,
                "required": ["receipts", "roots", "integrity", "verifier_manifest"],
                "properties": {
                    "receipts": {"type": "string"},
                    "roots": {"type": "string"},
                    "integrity": {"type": "string"},
                    "verifier_manifest": {"type": "string"},
                },
            },
            "notes": {"type": "string"},
        },
    },
    "integrity.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Integrity Manifest (integrity.json)",
        "type": "object",
        "additionalProperties": False,
        "required": ["format", "hash_algo", "files"],
        "properties": {
            "format": {"type": "string", "const": "vm-sentinel-integrity-v1"},
            "hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
            "files": {
                "type": "array",
                "items": {
                    "type": "object",
                    "additionalProperties": False,
                    "required": ["path", "digest"],
                    "properties": {
                        "path": {"type": "string"},
                        "digest": {"type": "string"},
                        "size_bytes": {"type": "integer", "minimum": 0},
                    },
                },
            },
        },
    },
    "verifier_manifest.schema.json": {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "VaultMesh Sentinel v1 Verifier Manifest (verifier_manifest.json)",
        "type": "object",
        "additionalProperties": False,
        "required": [
            "format",
            "sentinel_version",
            "schema_version",
            "canonicalization_version",
        ],
        "properties": {
            "format": {
                "type": "string",
                "const": "vm-sentinel-verifier-manifest-v1",
            },
            "sentinel_version": {"type": "string"},
            "schema_version": {"type": "string"},
            "hash_algo": {"type": "string", "enum": ["blake3", "sha256"]},
            "canonicalization_version": {"type": "string"},
            "verifier": {
                "type": "object",
                "additionalProperties": True,
                "properties": {
                    "name": {"type": "string"},
                    "version": {"type": "string"},
                    "sha256": {"type": "string"},
                },
            },
        },
    },
}
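# Illustrative only (not emitted by this module): a minimal event accepted by the embedded
# event.schema.json above carries every required field, e.g.
#   {"event_id": "...", "seq": 0, "ts": "2025-01-01T00:00:00Z", "event_type": "boot_event",
#    "actor": "sentinel", "cap_hash": "...", "op": "boot", "op_digest": "sha256:...",
#    "result": "ok", "trace_id": "...", "prev_event_hash": "0", "event_hash": "sha256:...",
#    "payload": {}}
# Field values here are placeholders; only the shape is taken from the schema. On-disk
# schemas under SCHEMA_DIR take precedence; these embedded copies are the offline fallback.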
@dataclass(frozen=True)
class Finding:
    code: str
    message: str
    path: str | None = None

    def as_dict(self) -> dict:
        d = {"code": self.code, "message": self.message}
        if self.path:
            d["path"] = self.path
        return d
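# Example (illustrative): Finding(ERROR_SCHEMA_INVALID, "expected object", path="seal.json")
# serializes via as_dict() to {"code": <SCHEMA_INVALID value>, "message": "expected object",
# "path": "seal.json"}; the "path" key is omitted when path is None.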
def _contract_ids_for_finding(finding: Finding) -> list[str]:
    """
    Best-effort mapping from verifier findings -> Contract Matrix IDs.

    This is meant to make verification_report.json auditor-friendly without
    requiring readers to inspect verifier source code.
    """

    contract_ids: list[str] = []
    code = finding.code
    path = finding.path or ""
    message = finding.message

    def add(contract_id: str) -> None:
        if contract_id not in contract_ids:
            contract_ids.append(contract_id)

    # Bundle-level
    if code == ERROR_MISSING_REQUIRED_FILE:
        add("B-1")
    if code == ERROR_OVERSIZE_INPUT:
        add("B-3")

    # Seal bundle / seal.json
    if path == "seal.json":
        add("B-1")
    if path.startswith("seal.json.format"):
        add("S-1")
    if path.startswith("seal.json.hash_algo"):
        add("S-2")
    if path.startswith("seal.json.range"):
        add("S-3")
    if path.startswith("seal.json.root"):
        add("S-4")
    if path.startswith("seal.json.files"):
        add("S-5")
        add("B-1")
    if "missing file referenced by seal.files." in message:
        add("S-5")
        add("B-1")
    if path.startswith("seal.json.canonicalization_version"):
        add("S-6")
    if code == ERROR_CANON_VERSION_UNSUPPORTED:
        add("S-6")

    # integrity.json
    if path.startswith("integrity.json.format"):
        add("I-1")
    if path.startswith("integrity.json.hash_algo"):
        add("I-2")
    if path.startswith("integrity.json.files"):
        add("I-3")
    if code == ERROR_MANIFEST_HASH_MISMATCH:
        add("I-3")
    if "size_bytes mismatch" in message:
        add("I-4")
    if message.startswith("file present but not listed in integrity.json:"):
        add("I-5")
    if message.startswith("integrity.json does not cover required seal file:"):
        add("I-6")

    # verifier_manifest.json
    if path.startswith("verifier_manifest.json.format"):
        add("V-1")
    if path.startswith("verifier_manifest.json.hash_algo"):
        add("V-3")
    if path.startswith("verifier_manifest.json") and not (
        path.startswith("verifier_manifest.json.format")
        or path.startswith("verifier_manifest.json.hash_algo")
    ):
        add("V-2")

    # Event ledger
    if code == ERROR_SCHEMA_INVALID and path.endswith(".jsonl"):
        add("E-1")
    if code == ERROR_EVENT_HASH_MISMATCH:
        add("E-2")
    if code == ERROR_CHAIN_DISCONTINUITY:
        add("E-3")
    if code == ERROR_SEQ_NON_MONOTONIC:
        add("E-4")
    if code == ERROR_ROOT_MISMATCH:
        add("E-5")
    if code == ERROR_RANGE_MISMATCH:
        add("E-6")
    if code == ERROR_REVOKED_CAPABILITY_USED:
        add("E-7")

    return contract_ids
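# Example (illustrative): a MANIFEST_HASH_MISMATCH finding maps to contract ID "I-3",
# a CHAIN_DISCONTINUITY finding maps to "E-3", and a single finding can map to several
# IDs (e.g. a missing seal.files entry yields both "S-5" and "B-1").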
def _finding_to_report_dict(finding: Finding) -> dict:
    d = finding.as_dict()
    contract_ids = _contract_ids_for_finding(finding)
    if contract_ids:
        d["contract_ids"] = contract_ids
    return d


def _finalize_report_findings(
    report: dict[str, Any], *, errors: list[Finding], warnings: list[Finding]
) -> None:
    report["errors"] = [_finding_to_report_dict(e) for e in errors]
    report["warnings"] = [_finding_to_report_dict(w) for w in warnings]
    report["ok"] = not errors
    report["failure_code"] = errors[0].code if errors else None
    report["violated_contract_ids"] = sorted(
        {cid for e in errors for cid in _contract_ids_for_finding(e)}
    )
    report["warned_contract_ids"] = sorted(
        {cid for w in warnings for cid in _contract_ids_for_finding(w)}
    )


def _load_json(path: Path) -> dict:
    return json.loads(path.read_text(encoding="utf-8"))


def _hex_part(value: str) -> str:
    return value.split(":", 1)[-1]


def _require_no_floats(value: Any, *, path: str = "$") -> None:
    if isinstance(value, float):
        raise ValueError(f"float not allowed in canonical JSON at {path}")
    if isinstance(value, dict):
        for k, v in value.items():
            _require_no_floats(v, path=f"{path}.{k}")
    elif isinstance(value, list):
        for i, v in enumerate(value):
            _require_no_floats(v, path=f"{path}[{i}]")


def _canonical_json_bytes(obj: Any) -> bytes:
    """
    Deterministic canonical JSON bytes for Sentinel v1 hashing.

    This verifier enforces a strict subset compatible with sentinel-event-jcs-v1
    for Sentinel artifacts:
    - UTF-8
    - object keys sorted
    - separators (",", ":")
    - no NaN/Infinity
    - no floats (represent decimals as strings instead)
    """
    _require_no_floats(obj)
    encoded = json.dumps(
        obj,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
        allow_nan=False,
    ).encode("utf-8")
    return encoded
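# Example (illustrative): _canonical_json_bytes({"b": 1, "a": "x"}) == b'{"a":"x","b":1}'
# -- keys are sorted, separators carry no whitespace, and floats are rejected, so the
# same logical object always yields the same bytes and therefore the same digest.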
def _vmhash(data: bytes, *, hash_algo: str) -> str:
    if hash_algo == "blake3":
        if blake3 is None:
            raise RuntimeError(
                "Missing dependency: blake3 (required for blake3 bundles)"
            )
        return f"blake3:{blake3.blake3(data).hexdigest()}"
    if hash_algo == "sha256":
        return f"sha256:{hashlib.sha256(data).hexdigest()}"
    raise ValueError(f"unsupported hash_algo: {hash_algo!r}")
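# Digests are algorithm-prefixed strings, e.g. (illustrative)
#   _vmhash(b"", hash_algo="sha256")
#   == "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
# _hex_part() strips the "sha256:" / "blake3:" prefix when digests are combined.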
def _compute_merkle_root(leaves: list[str], *, hash_algo: str) -> str:
    if not leaves:
        return _vmhash(b"empty", hash_algo=hash_algo)
    if len(leaves) == 1:
        return leaves[0]

    level = leaves[:]
    while len(level) > 1:
        next_level: list[str] = []
        for i in range(0, len(level), 2):
            left = level[i]
            right = level[i + 1] if i + 1 < len(level) else left
            combined = (_hex_part(left) + _hex_part(right)).encode("utf-8")
            next_level.append(_vmhash(combined, hash_algo=hash_algo))
        level = next_level
    return level[0]
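# Tree shape (illustrative): with three leaves [h1, h2, h3] the first level pairs
# (h1, h2) and duplicates the odd leaf as (h3, h3); each parent is
# _vmhash(hex(left) + hex(right)), and the loop repeats until one root remains.
# An empty leaf list hashes the literal bytes b"empty", which is how seal.root.start
# is defined for a ledger sealed from genesis.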
def _iter_jsonl(path: Path) -> Iterable[dict]:
    with path.open("r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception as exc:
                raise ValueError(
                    f"{path.name}:{line_no}: invalid JSON ({exc})"
                ) from exc
            if not isinstance(obj, dict):
                raise ValueError(f"{path.name}:{line_no}: expected JSON object")
            yield obj


def _load_schema(filename: str) -> dict:
    path = SCHEMA_DIR / filename
    if path.exists():
        return _load_json(path)
    embedded = _EMBEDDED_SCHEMAS.get(filename)
    if embedded is None:
        raise FileNotFoundError(f"schema not found: {filename}")
    return embedded
def _validate_schema(instance: Any, schema: dict, *, path: str = "$") -> list[Finding]:
    """
    Minimal JSON Schema validator (subset) for Sentinel v1 verifier.

    Supports: type, required, properties, additionalProperties, enum, const, anyOf, items, minimum.
    """
    findings: list[Finding] = []

    if "const" in schema:
        if instance != schema["const"]:
            findings.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"expected const {schema['const']!r}, got {instance!r}",
                    path=path,
                )
            )
        return findings

    if "enum" in schema:
        if instance not in schema["enum"]:
            findings.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"expected one of {schema['enum']!r}, got {instance!r}",
                    path=path,
                )
            )
        return findings

    if "anyOf" in schema:
        options = schema["anyOf"]
        for opt in options:
            if not _validate_schema(instance, opt, path=path):
                return []
        findings.append(
            Finding(ERROR_SCHEMA_INVALID, "did not match anyOf schema", path=path)
        )
        return findings

    schema_type = schema.get("type")
    if schema_type == "object":
        if not isinstance(instance, dict):
            findings.append(Finding(ERROR_SCHEMA_INVALID, "expected object", path=path))
            return findings

        required = schema.get("required") or []
        for key in required:
            if key not in instance:
                findings.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        f"missing required property: {key}",
                        path=path,
                    )
                )

        properties = schema.get("properties") or {}
        additional = schema.get("additionalProperties", True)

        for key, value in instance.items():
            key_path = f"{path}.{key}"
            if key in properties:
                findings.extend(_validate_schema(value, properties[key], path=key_path))
            else:
                if additional is False:
                    findings.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "unexpected additional property",
                            path=key_path,
                        )
                    )

        return findings

    if schema_type == "array":
        if not isinstance(instance, list):
            findings.append(Finding(ERROR_SCHEMA_INVALID, "expected array", path=path))
            return findings
        items_schema = schema.get("items")
        if isinstance(items_schema, dict):
            for i, item in enumerate(instance):
                findings.extend(
                    _validate_schema(item, items_schema, path=f"{path}[{i}]")
                )
        return findings

    if schema_type == "string":
        if not isinstance(instance, str):
            findings.append(Finding(ERROR_SCHEMA_INVALID, "expected string", path=path))
            return findings
        min_len = schema.get("minLength")
        if isinstance(min_len, int) and len(instance) < min_len:
            findings.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"minLength {min_len} violated",
                    path=path,
                )
            )
        return findings

    if schema_type == "integer":
        if not isinstance(instance, int) or isinstance(instance, bool):
            findings.append(
                Finding(ERROR_SCHEMA_INVALID, "expected integer", path=path)
            )
            return findings
        minimum = schema.get("minimum")
        if isinstance(minimum, int) and instance < minimum:
            findings.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"minimum {minimum} violated",
                    path=path,
                )
            )
        return findings

    if schema_type == "boolean":
        if not isinstance(instance, bool):
            findings.append(
                Finding(ERROR_SCHEMA_INVALID, "expected boolean", path=path)
            )
        return findings

    # If schema has no type, treat as permissive.
    return findings
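# Example (illustrative):
#   _validate_schema({"seq": -1},
#                    {"type": "object", "properties": {"seq": {"type": "integer", "minimum": 0}}},
#                    path="seal.json")
# returns a single SCHEMA_INVALID finding at path "seal.json.seq" ("minimum 0 violated");
# an empty list means the instance passed every supported check.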
def _parse_roots_txt(path: Path) -> list[tuple[int, str]]:
    roots: list[tuple[int, str]] = []
    for line_no, line in enumerate(
        path.read_text(encoding="utf-8").splitlines(), start=1
    ):
        s = line.strip()
        if not s or s.startswith("#"):
            continue
        if "seq=" in s and "root=" in s:
            parts = dict(part.split("=", 1) for part in s.split() if "=" in part)
            try:
                seq = int(parts["seq"])
            except Exception as exc:
                raise ValueError(f"{path.name}:{line_no}: invalid seq ({exc})") from exc
            root = parts.get("root")
            if not root:
                raise ValueError(f"{path.name}:{line_no}: missing root")
            roots.append((seq, root))
        else:
            raise ValueError(
                f"{path.name}:{line_no}: expected 'seq=<int> root=<algo:hex>'"
            )
    return roots
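# Accepted roots.txt line shape (illustrative values):
#   seq=41 root=sha256:ab12cd34...
# Blank lines and lines starting with "#" are skipped; any other shape raises ValueError.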
def _write_report(path: Path, report: dict) -> None:
    path.write_text(
        json.dumps(report, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
        + "\n",
        encoding="utf-8",
    )
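# Abridged shape of the report written by verify_bundle() (illustrative values only):
#   {
#     "format": "vm-sentinel-verification-report-v1",
#     "ok": false,
#     "strict": true,
#     "failure_code": "<code of the first error, or null>",
#     "covered_seq_range": {"since_seq": 0, "until_seq": 41},
#     "observed_end_root": "sha256:...",
#     "computed_end_root": "sha256:...",
#     "errors": [{"code": "...", "message": "...", "path": "...", "contract_ids": ["..."]}],
#     "warnings": [],
#     "violated_contract_ids": ["..."],
#     ...
#   }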
def verify_bundle(
    bundle_dir: Path,
    *,
    strict: bool,
    report_path: Path,
    max_file_bytes: int,
) -> int:
    errors: list[Finding] = []
    warnings: list[Finding] = []

    report: dict[str, Any] = {
        "format": "vm-sentinel-verification-report-v1",
        "ok": False,
        "strict": strict,
        "failure_code": None,
        "inputs": {"bundle_dir": str(bundle_dir)},
        "covered_seq_range": None,
        "verified_ranges": [],
        "observed_roots": {},
        "computed_roots": {},
        "observed_end_root": None,
        "computed_end_root": None,
        "mismatches": [],
        "corruption_findings": [],
        "versions": {},
        "schema_versions_used": {},
        "errors": [],
        "warnings": [],
        "verifier": {
            "name": "vm_verify_sentinel_bundle.py",
            "python": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
        },
        "declared_verifier": None,
    }

    seal_path = bundle_dir / "seal.json"
    if not seal_path.exists():
        errors.append(
            Finding(
                ERROR_MISSING_REQUIRED_FILE, "seal.json not found", path="seal.json"
            )
        )
        _finalize_report_findings(report, errors=errors, warnings=warnings)
        _write_report(report_path, report)
        return 1

    try:
        seal = _load_json(seal_path)
    except Exception as exc:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"failed to parse seal.json: {exc}",
                path="seal.json",
            )
        )
        _finalize_report_findings(report, errors=errors, warnings=warnings)
        _write_report(report_path, report)
        return 1

    try:
        seal_schema = _load_schema("seal.schema.json")
        errors.extend(_validate_schema(seal, seal_schema, path="seal.json"))
    except Exception as exc:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"failed to load/validate seal.schema.json: {exc}",
                path="seal.schema.json",
            )
        )

    fmt = seal.get("format")
    if fmt not in SUPPORTED_SEAL_FORMATS:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"seal.format unsupported: {fmt!r}",
                path="seal.json.format",
            )
        )

    hash_algo = seal.get("hash_algo")
    if hash_algo not in ("blake3", "sha256"):
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                f"seal.hash_algo invalid: {hash_algo!r}",
                path="seal.json.hash_algo",
            )
        )
        hash_algo = "sha256"  # keep verifier progressing for report completeness

    canonicalization_version = seal.get("canonicalization_version")
    if not isinstance(canonicalization_version, str) or not canonicalization_version:
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                "seal.canonicalization_version missing",
                path="seal.json.canonicalization_version",
            )
        )
        canonicalization_version = ""
    elif canonicalization_version not in SUPPORTED_CANONICALIZATION_VERSIONS:
        errors.append(
            Finding(
                ERROR_CANON_VERSION_UNSUPPORTED,
                f"unsupported canonicalization_version: {canonicalization_version!r}",
                path="seal.json.canonicalization_version",
            )
        )

    files = seal.get("files")
    if not isinstance(files, dict):
        errors.append(
            Finding(ERROR_SCHEMA_INVALID, "seal.files missing", path="seal.json.files")
        )
        files = {}

    def _file_from_seal(key: str) -> Optional[Path]:
        rel = files.get(key)
        if not isinstance(rel, str) or not rel:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"seal.files.{key} missing",
                    path=f"seal.json.files.{key}",
                )
            )
            return None
        p = (bundle_dir / rel).resolve()
        if not p.exists():
            errors.append(
                Finding(
                    ERROR_MISSING_REQUIRED_FILE,
                    f"missing file referenced by seal.files.{key}: {rel}",
                    path=rel,
                )
            )
            return None
        return p

    receipts_path = _file_from_seal("receipts")
    roots_path = _file_from_seal("roots")
    integrity_path = _file_from_seal("integrity")
    verifier_manifest_path = _file_from_seal("verifier_manifest")

    report["versions"] = {
        "sentinel_version": seal.get("sentinel_version"),
        "schema_version": seal.get("schema_version"),
        "hash_algo": hash_algo,
        "canonicalization_version": canonicalization_version,
    }
    report["schema_versions_used"]["seal"] = seal.get("schema_version")
    integrity: dict[str, Any] | None = None
    if integrity_path is not None:
        try:
            integrity = _load_json(integrity_path)

            integrity_schema = _load_schema("integrity.schema.json")
            errors.extend(
                _validate_schema(
                    integrity, integrity_schema, path=str(integrity_path.name)
                )
            )

            if integrity.get("format") not in SUPPORTED_INTEGRITY_FORMATS:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        f"integrity.format unsupported: {integrity.get('format')!r}",
                        path="integrity.json.format",
                    )
                )
            if integrity.get("hash_algo") != hash_algo:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "integrity.hash_algo != seal.hash_algo",
                        path="integrity.json.hash_algo",
                    )
                )

            listed: list[dict] = integrity.get("files") or []
            if not isinstance(listed, list) or not listed:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "integrity.files missing or empty",
                        path="integrity.json.files",
                    )
                )
                listed = []

            # Deterministic ordering for verification/reporting.
            listed_sorted = sorted(
                (e for e in listed if isinstance(e, dict)),
                key=lambda e: str(e.get("path", "")),
            )

            listed_paths = {str(e.get("path")) for e in listed_sorted if "path" in e}

            # Enforce that seal-referenced files are covered by integrity.json.
            #
            # Note: integrity.json MUST NOT be required to include a digest of itself (recursive),
            # so we only require the other seal files here.
            required_files = [
                files.get("receipts"),
                files.get("roots"),
                files.get("verifier_manifest"),
            ]
            for required_rel in required_files:
                if (
                    isinstance(required_rel, str)
                    and required_rel
                    and required_rel not in listed_paths
                ):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"integrity.json does not cover required seal file: {required_rel}",
                            path="integrity.json.files",
                        )
                    )

            # Recommended: cover seal.json too (strict mode enforces).
            if "seal.json" not in listed_paths:
                finding = Finding(
                    WARNING_UNLISTED_FILE,
                    "integrity.json does not cover seal.json (recommended)",
                    path="integrity.json.files",
                )
                if strict:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            finding.message,
                            path="integrity.json.files",
                        )
                    )
                else:
                    warnings.append(finding)

            # Hash verification
            for entry in listed_sorted:
                rel = entry.get("path")
                digest = entry.get("digest")
                if not isinstance(rel, str) or not rel:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "integrity.files entry missing path",
                            path="integrity.json.files",
                        )
                    )
                    continue
                if not isinstance(digest, str) or not digest:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"integrity.files[{rel}] missing digest",
                            path="integrity.json.files",
                        )
                    )
                    continue

                file_path = (bundle_dir / rel).resolve()
                if not file_path.exists():
                    errors.append(
                        Finding(
                            ERROR_MISSING_REQUIRED_FILE,
                            f"integrity missing file: {rel}",
                            path=rel,
                        )
                    )
                    continue

                size = file_path.stat().st_size
                if size > max_file_bytes:
                    errors.append(
                        Finding(
                            ERROR_OVERSIZE_INPUT,
                            f"file exceeds max size ({size} > {max_file_bytes} bytes): {rel}",
                            path=rel,
                        )
                    )
                    continue

                data = file_path.read_bytes()
                computed = _vmhash(data, hash_algo=hash_algo)
                if computed != digest:
                    errors.append(
                        Finding(
                            ERROR_MANIFEST_HASH_MISMATCH,
                            f"digest mismatch for {rel}: expected {digest}, got {computed}",
                            path=rel,
                        )
                    )

                size_bytes = entry.get("size_bytes")
                if isinstance(size_bytes, int) and size_bytes != len(data):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"size_bytes mismatch for {rel}: expected {size_bytes}, got {len(data)}",
                            path=rel,
                        )
                    )

            # Extra files present but not listed in integrity.json
            ignored = {".DS_Store", "verification_report.json", report_path.name}
            integrity_rel = files.get("integrity")
            if isinstance(integrity_rel, str) and integrity_rel:
                ignored.add(integrity_rel)
            for fp in sorted(bundle_dir.rglob("*")):
                if fp.is_dir():
                    continue
                rel = fp.relative_to(bundle_dir).as_posix()
                if rel in ignored:
                    continue
                if rel not in listed_paths:
                    finding = Finding(
                        WARNING_UNLISTED_FILE,
                        f"file present but not listed in integrity.json: {rel}",
                        path=rel,
                    )
                    if strict:
                        errors.append(
                            Finding(ERROR_SCHEMA_INVALID, finding.message, path=rel)
                        )
                    else:
                        warnings.append(finding)
        except Exception as exc:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"failed to verify integrity.json: {exc}",
                    path="integrity.json",
                )
            )

    if verifier_manifest_path is not None:
        try:
            manifest = _load_json(verifier_manifest_path)
            manifest_schema = _load_schema("verifier_manifest.schema.json")
            errors.extend(
                _validate_schema(
                    manifest, manifest_schema, path=str(verifier_manifest_path.name)
                )
            )

            mfmt = manifest.get("format")
            if mfmt not in SUPPORTED_VERIFIER_MANIFEST_FORMATS:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        f"verifier_manifest.format unsupported: {mfmt!r}",
                        path="verifier_manifest.json.format",
                    )
                )

            mv = manifest.get("canonicalization_version")
            if (
                isinstance(mv, str)
                and canonicalization_version
                and mv != canonicalization_version
            ):
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "verifier_manifest.canonicalization_version != seal.canonicalization_version",
                        path="verifier_manifest.json.canonicalization_version",
                    )
                )

            mh = manifest.get("hash_algo")
            if isinstance(mh, str) and mh != hash_algo:
                errors.append(
                    Finding(
                        ERROR_SCHEMA_INVALID,
                        "verifier_manifest.hash_algo != seal.hash_algo",
                        path="verifier_manifest.json.hash_algo",
                    )
                )
            report["schema_versions_used"]["verifier_manifest"] = manifest.get(
                "schema_version"
            )

            dv = manifest.get("verifier")
            if isinstance(dv, dict):
                report["declared_verifier"] = dv
        except Exception as exc:
            errors.append(
                Finding(
                    ERROR_SCHEMA_INVALID,
                    f"failed to parse verifier_manifest.json: {exc}",
                    path="verifier_manifest.json",
                )
            )

    range_obj = seal.get("range") or {}
    since_seq = range_obj.get("since_seq")
    until_seq = range_obj.get("until_seq")
    if not isinstance(since_seq, int) or not isinstance(until_seq, int):
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                "seal.range.since_seq/until_seq missing or invalid",
                path="seal.json.range",
            )
        )
        since_seq = 0
        until_seq = -1

    report["covered_seq_range"] = {"since_seq": since_seq, "until_seq": until_seq}
    events: list[dict] = []
    if receipts_path is not None:
        if receipts_path.stat().st_size > max_file_bytes:
            errors.append(
                Finding(
                    ERROR_OVERSIZE_INPUT,
                    f"receipts file exceeds max size ({receipts_path.stat().st_size} > {max_file_bytes} bytes)",
                    path=receipts_path.name,
                )
            )
        else:
            event_schema = _load_schema("event.schema.json")
            last_good_seq: int | None = None
            last_good_line_no: int | None = None
            byte_offset = 0
            with receipts_path.open("rb") as f:
                for line_no, raw in enumerate(f, start=1):
                    line_start = byte_offset
                    byte_offset += len(raw)
                    if not raw.strip():
                        continue
                    try:
                        text = raw.decode("utf-8").strip()
                    except UnicodeDecodeError as exc:
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"{receipts_path.name}:{line_no}: utf-8 decode error ({exc})",
                                path=receipts_path.name,
                            )
                        )
                        report["corruption_findings"].append(
                            {
                                "file": receipts_path.name,
                                "line_no": line_no,
                                "byte_offset": line_start,
                                "last_good_seq": last_good_seq,
                                "last_good_line_no": last_good_line_no,
                                "error": f"utf-8 decode error ({exc})",
                                "recommended_recovery": [
                                    "Verify an older seal bundle that predates this range.",
                                    "Restore receipts from WORM/immutable storage if available.",
                                    "Compare integrity.json digests to an out-of-band copy.",
                                ],
                            }
                        )
                        break
                    try:
                        ev = json.loads(text)
                    except Exception as exc:
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"{receipts_path.name}:{line_no}: invalid JSON ({exc})",
                                path=receipts_path.name,
                            )
                        )
                        report["corruption_findings"].append(
                            {
                                "file": receipts_path.name,
                                "line_no": line_no,
                                "byte_offset": line_start,
                                "last_good_seq": last_good_seq,
                                "last_good_line_no": last_good_line_no,
                                "error": f"invalid JSON ({exc})",
                                "recommended_recovery": [
                                    "Verify an older seal bundle that predates this range.",
                                    "Restore receipts from WORM/immutable storage if available.",
                                    "Compare integrity.json digests to an out-of-band copy.",
                                ],
                            }
                        )
                        break

                    if not isinstance(ev, dict):
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"{receipts_path.name}:{line_no}: expected JSON object",
                                path=receipts_path.name,
                            )
                        )
                        report["corruption_findings"].append(
                            {
                                "file": receipts_path.name,
                                "line_no": line_no,
                                "byte_offset": line_start,
                                "last_good_seq": last_good_seq,
                                "last_good_line_no": last_good_line_no,
                                "error": "expected JSON object",
                            }
                        )
                        break

                    errors.extend(
                        _validate_schema(
                            ev, event_schema, path=f"{receipts_path.name}:{line_no}"
                        )
                    )
                    seq = ev.get("seq")
                    if isinstance(seq, int):
                        last_good_seq = seq
                        last_good_line_no = line_no
                    events.append(ev)

    if events:
        # Deterministic ordering: sort by seq, not file order.
        by_seq: dict[int, list[dict]] = {}
        for ev in events:
            seq = ev.get("seq")
            if isinstance(seq, int):
                by_seq.setdefault(seq, []).append(ev)

        dupes = sorted([seq for seq, lst in by_seq.items() if len(lst) > 1])
        for seq in dupes:
            errors.append(
                Finding(
                    ERROR_SEQ_NON_MONOTONIC,
                    f"duplicate seq value: {seq}",
                    path=f"events.seq:{seq}",
                )
            )

        ordered_seqs = sorted(by_seq.keys())
        if ordered_seqs:
            if ordered_seqs[0] != since_seq or ordered_seqs[-1] != until_seq:
                errors.append(
                    Finding(
                        ERROR_RANGE_MISMATCH,
                        f"event seq range mismatch: got {ordered_seqs[0]}..{ordered_seqs[-1]}, expected {since_seq}..{until_seq}",
                        path="seal.json.range",
                    )
                )

        expected_count = until_seq - since_seq + 1
        if expected_count != len(events):
            errors.append(
                Finding(
                    ERROR_RANGE_MISMATCH,
                    f"receipt count mismatch: expected {expected_count}, got {len(events)}",
                    path=receipts_path.name if receipts_path else "receipts",
                )
            )

        missing = [s for s in range(since_seq, until_seq + 1) if s not in by_seq]
        if missing:
            errors.append(
                Finding(
                    ERROR_RANGE_MISMATCH,
                    f"missing seq values in range: {missing[:20]}{'...' if len(missing) > 20 else ''}",
                    path="events.seq",
                )
            )

        # Flatten events in seq order (deterministic).
        events_ordered = [
            by_seq[s][0] for s in range(since_seq, until_seq + 1) if s in by_seq
        ]
        events = events_ordered

    root_obj = seal.get("root") or {}
    root_start = root_obj.get("start")
    root_end = root_obj.get("end")
    report["observed_roots"] = {"start": root_start, "end": root_end}
    report["observed_end_root"] = root_end
    if not isinstance(root_start, str) or not isinstance(root_end, str):
        errors.append(
            Finding(
                ERROR_SCHEMA_INVALID,
                "seal.root.start/end missing or invalid",
                path="seal.json.root",
            )
        )
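    # Chain rule checked below (illustrative restatement of the code that follows):
    # for each event in seq order, event_hash == _vmhash(canonical(event without "event_hash"))
    # and prev_event_hash == event_hash of the previous event; the genesis event (seq=0)
    # must carry the literal prev_event_hash "0".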
    # Event hashing, op_digest, and prev_event_hash chain verification.
    computed_event_hashes: list[str] = []
    leaves: list[str] = []
    if events:
        revoked_cap_hashes: set[str] = set()
        if canonicalization_version not in SUPPORTED_CANONICALIZATION_VERSIONS:
            errors.append(
                Finding(
                    ERROR_CANON_VERSION_UNSUPPORTED,
                    f"unsupported canonicalization_version: {canonicalization_version!r}",
                    path="seal.json.canonicalization_version",
                )
            )
        else:
            for idx, ev in enumerate(events):
                seq = ev.get("seq")
                if not isinstance(seq, int):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "event.seq missing or invalid",
                            path=f"events[{idx}].seq",
                        )
                    )
                    continue

                stored_event_hash = ev.get("event_hash")
                if not isinstance(stored_event_hash, str) or not stored_event_hash:
                    errors.append(
                        Finding(
                            ERROR_EVENT_HASH_MISMATCH,
                            "event_hash missing",
                            path=f"events[{idx}].event_hash",
                        )
                    )
                    continue

                ev_no_hash = dict(ev)
                ev_no_hash.pop("event_hash", None)
                try:
                    canon = _canonical_json_bytes(ev_no_hash)
                except Exception as exc:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"canonicalization failed: {exc}",
                            path=f"events[{idx}]",
                        )
                    )
                    continue

                try:
                    computed_hash = _vmhash(canon, hash_algo=hash_algo)
                except Exception as exc:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            f"hashing failed: {exc}",
                            path=f"events[{idx}]",
                        )
                    )
                    continue

                computed_event_hashes.append(computed_hash)
                leaves.append(computed_hash)

                if computed_hash != stored_event_hash:
                    errors.append(
                        Finding(
                            ERROR_EVENT_HASH_MISMATCH,
                            f"event_hash mismatch: expected {stored_event_hash}, got {computed_hash}",
                            path=f"events[{idx}].event_hash",
                        )
                    )

                prev = ev.get("prev_event_hash")
                if not isinstance(prev, str) or not prev:
                    errors.append(
                        Finding(
                            ERROR_CHAIN_DISCONTINUITY,
                            "prev_event_hash missing",
                            path=f"events[{idx}].prev_event_hash",
                        )
                    )
                else:
                    if idx == 0:
                        if seq == 0:
                            if prev != "0":
                                errors.append(
                                    Finding(
                                        ERROR_CHAIN_DISCONTINUITY,
                                        'prev_event_hash must be "0" for seq=0',
                                        path=f"events[{idx}].prev_event_hash",
                                    )
                                )
                        else:
                            finding = Finding(
                                WARNING_RANGE_ROOT_PARTIAL,
                                "first event is not seq=0; prev_event_hash cannot be verified without prior context",
                                path=f"events[{idx}].prev_event_hash",
                            )
                            if strict:
                                errors.append(
                                    Finding(
                                        ERROR_CHAIN_DISCONTINUITY,
                                        finding.message,
                                        path=f"events[{idx}].prev_event_hash",
                                    )
                                )
                            else:
                                warnings.append(finding)
                    else:
                        if prev != computed_event_hashes[idx - 1]:
                            errors.append(
                                Finding(
                                    ERROR_CHAIN_DISCONTINUITY,
                                    "prev_event_hash does not match previous event_hash",
                                    path=f"events[{idx}].prev_event_hash",
                                )
                            )

                # op_digest verification (params convention: payload.params)
                op = ev.get("op")
                op_digest = ev.get("op_digest")
                payload = (
                    ev.get("payload") if isinstance(ev.get("payload"), dict) else {}
                )
                params = payload.get("params", {})
                if params is None:
                    params = {}
                if not isinstance(op, str) or not op:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID, "op missing", path=f"events[{idx}].op"
                        )
                    )
                elif not isinstance(op_digest, str) or not op_digest:
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "op_digest missing",
                            path=f"events[{idx}].op_digest",
                        )
                    )
                elif not isinstance(params, dict):
                    errors.append(
                        Finding(
                            ERROR_SCHEMA_INVALID,
                            "payload.params must be an object",
                            path=f"events[{idx}].payload.params",
                        )
                    )
                else:
                    try:
                        op_obj = {"op": op, "params": params}
                        op_bytes = _canonical_json_bytes(op_obj)
                        computed_op_digest = _vmhash(op_bytes, hash_algo=hash_algo)
                        if computed_op_digest != op_digest:
                            errors.append(
                                Finding(
                                    ERROR_EVENT_HASH_MISMATCH,
                                    f"op_digest mismatch: expected {op_digest}, got {computed_op_digest}",
                                    path=f"events[{idx}].op_digest",
                                )
                            )
                    except Exception as exc:
                        errors.append(
                            Finding(
                                ERROR_SCHEMA_INVALID,
                                f"op_digest computation failed: {exc}",
                                path=f"events[{idx}].op_digest",
                            )
                        )

                # Capability revocation enforcement (v1 hardening):
                # If a capability is revoked, subsequent action execution MUST NOT use it.
                event_type = ev.get("event_type")
                cap_hash = ev.get("cap_hash")
                if event_type == "cap_revoke" and isinstance(payload, dict):
                    revoked = None
                    if isinstance(payload.get("revoked_cap_hash"), str):
                        revoked = payload.get("revoked_cap_hash")
                    elif isinstance(params, dict) and isinstance(
                        params.get("revoked_cap_hash"), str
                    ):
                        revoked = params.get("revoked_cap_hash")
                    if isinstance(revoked, str) and revoked:
                        revoked_cap_hashes.add(revoked)

                if (
                    event_type == "action_executed"
                    and isinstance(cap_hash, str)
                    and cap_hash in revoked_cap_hashes
                ):
                    errors.append(
                        Finding(
                            ERROR_REVOKED_CAPABILITY_USED,
                            f"action_executed uses revoked cap_hash: {cap_hash}",
                            path=f"events[{idx}].cap_hash",
                        )
                    )
report["verified_ranges"] = [{"since_seq": since_seq, "until_seq": until_seq}]
|
|
|
|
# Merkle verification (only possible from genesis without additional continuation state).
|
|
if (
|
|
isinstance(since_seq, int)
|
|
and since_seq == 0
|
|
and leaves
|
|
and isinstance(root_start, str)
|
|
and isinstance(root_end, str)
|
|
):
|
|
try:
|
|
expected_start = _vmhash(b"empty", hash_algo=hash_algo)
|
|
report["computed_roots"]["expected_start"] = expected_start
|
|
if root_start != expected_start:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_ROOT_MISMATCH,
|
|
f"seal.root.start mismatch: expected {expected_start}, got {root_start}",
|
|
path="seal.json.root.start",
|
|
)
|
|
)
|
|
|
|
computed_end = _compute_merkle_root(leaves, hash_algo=hash_algo)
|
|
report["computed_roots"]["computed_end"] = computed_end
|
|
report["computed_end_root"] = computed_end
|
|
if computed_end != root_end:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_ROOT_MISMATCH,
|
|
f"seal.root.end mismatch: expected {root_end}, got {computed_end}",
|
|
path="seal.json.root.end",
|
|
)
|
|
)
|
|
except Exception as exc:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_ROOT_MISMATCH,
|
|
f"merkle verification failed: {exc}",
|
|
path="seal.json.root",
|
|
)
|
|
)
|
|
else:
|
|
if isinstance(since_seq, int) and since_seq > 0:
|
|
finding = Finding(
|
|
WARNING_RANGE_ROOT_PARTIAL,
|
|
"cannot recompute Merkle roots for since_seq>0 without a verifiable continuation state (frontier snapshot)",
|
|
path="seal.json.range",
|
|
)
|
|
if strict:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_ROOT_MISMATCH, finding.message, path="seal.json.range"
|
|
)
|
|
)
|
|
else:
|
|
warnings.append(finding)
|
|
|
|
# roots.txt parsing (self-consistency with seal.root.end)
|
|
if roots_path is not None and isinstance(root_end, str):
|
|
try:
|
|
roots = _parse_roots_txt(roots_path)
|
|
if roots:
|
|
last_seq, last_root = roots[-1]
|
|
if last_seq != until_seq:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_RANGE_MISMATCH,
|
|
f"roots.txt last seq mismatch: expected {until_seq}, got {last_seq}",
|
|
path=roots_path.name,
|
|
)
|
|
)
|
|
if last_root != root_end:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_ROOT_MISMATCH,
|
|
"roots.txt last root does not match seal.root.end",
|
|
path=roots_path.name,
|
|
)
|
|
)
|
|
except Exception as exc:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_SCHEMA_INVALID,
|
|
f"failed to parse roots.txt: {exc}",
|
|
path=roots_path.name,
|
|
)
|
|
)
|
|
|
|
# Strict-mode trace linkage integrity checks (intent → executed/denied).
|
|
if strict and events:
|
|
try:
|
|
by_trace: dict[str, list[tuple[int, str]]] = {}
|
|
for ev in events:
|
|
trace_id = ev.get("trace_id")
|
|
event_type = ev.get("event_type")
|
|
seq = ev.get("seq")
|
|
if (
|
|
isinstance(trace_id, str)
|
|
and isinstance(event_type, str)
|
|
and isinstance(seq, int)
|
|
):
|
|
if event_type in (
|
|
"action_intent",
|
|
"action_executed",
|
|
"shadow_receipt",
|
|
):
|
|
by_trace.setdefault(trace_id, []).append((seq, event_type))
|
|
|
|
for trace_id, seq_types in sorted(by_trace.items()):
|
|
seq_types_sorted = sorted(seq_types, key=lambda t: t[0])
|
|
types = [t for _, t in seq_types_sorted]
|
|
has_intent = "action_intent" in types
|
|
has_exec = "action_executed" in types
|
|
has_shadow = "shadow_receipt" in types
|
|
|
|
if has_exec and not has_intent:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_CHAIN_DISCONTINUITY,
|
|
f"execution without prior intent for trace_id {trace_id}",
|
|
path=f"trace_id:{trace_id}",
|
|
)
|
|
)
|
|
|
|
if has_intent and not (has_exec or has_shadow):
|
|
errors.append(
|
|
Finding(
|
|
ERROR_CHAIN_DISCONTINUITY,
|
|
f"intent without executed/denied outcome for trace_id {trace_id}",
|
|
path=f"trace_id:{trace_id}",
|
|
)
|
|
)
|
|
|
|
if has_exec and has_shadow:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_CHAIN_DISCONTINUITY,
|
|
f"both action_executed and shadow_receipt present for trace_id {trace_id}",
|
|
path=f"trace_id:{trace_id}",
|
|
)
|
|
)
|
|
except Exception as exc:
|
|
errors.append(
|
|
Finding(
|
|
ERROR_SCHEMA_INVALID,
|
|
f"trace linkage verification failed: {exc}",
|
|
path="trace_id",
|
|
)
|
|
)
|
|
|
|
_finalize_report_findings(report, errors=errors, warnings=warnings)
|
|
|
|
_write_report(report_path, report)
|
|
|
|
# Console output: PASS/FAIL + stable codes
|
|
if errors:
|
|
for e in errors:
|
|
loc = f" ({e.path})" if e.path else ""
|
|
print(f"FAIL {e.code}{loc}: {e.message}", file=sys.stderr)
|
|
for w in warnings:
|
|
loc = f" ({w.path})" if w.path else ""
|
|
print(f"WARN {w.code}{loc}: {w.message}", file=sys.stderr)
|
|
return 1
|
|
|
|
for w in warnings:
|
|
loc = f" ({w.path})" if w.path else ""
|
|
print(f"WARN {w.code}{loc}: {w.message}", file=sys.stderr)
|
|
|
|
print("PASS")
|
|
return 0
|
|
|
|
|
|
def main(argv: list[str]) -> int:
|
|
p = argparse.ArgumentParser(
|
|
description="Verify a VaultMesh Sentinel v1 bundle directory."
|
|
)
|
|
p.add_argument(
|
|
"--bundle", required=True, help="Path to bundle directory (contains seal.json)"
|
|
)
|
|
p.add_argument(
|
|
"--strict",
|
|
action="store_true",
|
|
help="Treat warnings and partial verifications as failures.",
|
|
)
|
|
p.add_argument(
|
|
"--report",
|
|
help="Write machine-readable verification report JSON to this path (default: verification_report.json in bundle).",
|
|
)
|
|
p.add_argument(
|
|
"--max-file-bytes",
|
|
type=int,
|
|
default=50_000_000,
|
|
help="Reject any single input file larger than this many bytes (default: 50,000,000).",
|
|
)
|
|
args = p.parse_args(argv)
|
|
|
|
bundle_dir = Path(args.bundle).expanduser().resolve()
|
|
if not bundle_dir.exists() or not bundle_dir.is_dir():
|
|
print(f"[ERROR] --bundle must be a directory: {bundle_dir}", file=sys.stderr)
|
|
return 2
|
|
|
|
report_path = (
|
|
Path(args.report).expanduser().resolve()
|
|
if args.report
|
|
else (bundle_dir / "verification_report.json")
|
|
)
|
|
|
|
try:
|
|
return verify_bundle(
|
|
bundle_dir,
|
|
strict=bool(args.strict),
|
|
report_path=report_path,
|
|
max_file_bytes=int(args.max_file_bytes),
|
|
)
|
|
except Exception as exc:
|
|
print(f"[ERROR] unexpected error: {exc}", file=sys.stderr)
|
|
return 2
|
|
|
|
|
|
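# Example invocation (illustrative paths):
#   python3 vm_verify_sentinel_bundle.py --bundle ./seal_bundle --strict --report report.json
#   echo $?   # 0 = verified, 1 = verification failed, 2 = usage / unexpected error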
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))