#!/usr/bin/env python3
# Conformance-suite runner.
#
# Reads a manifest (JSON content stored as manifest.yaml), runs
# tools/vm_verify_sentinel_bundle.py once per declared test, compares the
# exit code and the written verification report against the manifest's
# expectations, and writes per-test records plus suite-level reports under
# <suite_dir>/out.
#
# Exit status of main(): 0 = every test passed, 1 = at least one failure,
# 2 = the manifest declares no tests.
from __future__ import annotations

import argparse
import json
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class TestExpect:
    """Expected outcome of one verifier run, as declared under ``expect``."""

    # The name starts with "Test" but this is not a pytest test class.
    __test__ = False

    exit: int  # expected verifier process exit code
    failure_code: str | None  # expected report["failure_code"]; None = no failure
    violated_contract_ids: list[str]  # ids that must appear in the report
    error_path_contains: str | None = None  # substring expected in some error "path"


@dataclass(frozen=True)
class TestCase:
    """One manifest entry: which fixture to verify, how, and what to expect."""

    # The name starts with "Test" but this is not a pytest test class.
    __test__ = False

    id: str
    name: str
    category: str
    fixture: str  # path relative to the suite directory
    strict: bool  # pass --strict to the verifier
    max_file_bytes: int | None  # pass --max-file-bytes when not None
    mode: str  # "verify_fixture" or "verify_clean_copy"
    expect: TestExpect


def _repo_root_from(suite_dir: Path) -> Path:
    """Return the resolved parent of *suite_dir*, used as the repository root."""
    return suite_dir.parent.resolve()


def _load_manifest(path: Path) -> dict:
    """Parse the manifest at *path* as JSON.

    Raises:
        ValueError: if the stripped content does not start with ``{``
            (``json.JSONDecodeError`` for malformed JSON is a subclass).
    """
    raw = path.read_text(encoding="utf-8").strip()
    if not raw.startswith("{"):
        raise ValueError(
            "manifest.yaml is expected to be JSON (YAML 1.2 compatible) to avoid external YAML deps"
        )
    return json.loads(raw)


def _import_sentinel_codes(repo_root: Path) -> tuple[set[str], set[str]]:
    """Return ``(failure_code_values, warning_code_values)``.

    The values are read from ``FailureCode`` and ``WarningCode`` in
    ``<repo_root>/tools/sentinel_failure_codes.py``; the tools directory is
    put first on ``sys.path`` so that module can be imported.
    """
    tools_dir = repo_root / "tools"
    sys.path.insert(0, str(tools_dir))
    from sentinel_failure_codes import FailureCode, WarningCode  # type: ignore

    return ({c.value for c in FailureCode}, {c.value for c in WarningCode})


def _parse_tests(manifest: dict) -> list[TestCase]:
    """Convert the manifest's ``tests`` entries into ``TestCase`` objects.

    Raises:
        KeyError: if an entry lacks ``id`` or ``fixture``, or its ``expect``
            section lacks ``exit``.
    """
    tests: list[TestCase] = []
    for t in manifest.get("tests") or []:
        expect = t.get("expect") or {}
        verifier = t.get("verifier") or {}
        max_file_bytes = verifier.get("max_file_bytes")
        failure_code = expect.get("failure_code")
        error_path_contains = expect.get("error_path_contains")
        tests.append(
            TestCase(
                id=str(t["id"]),
                name=str(t.get("name") or t["id"]),
                category=str(t.get("category") or "uncategorized"),
                fixture=str(t["fixture"]),
                strict=bool(verifier.get("strict", True)),
                max_file_bytes=(
                    None if max_file_bytes is None else int(max_file_bytes)
                ),
                mode=str(t.get("mode") or "verify_fixture"),
                expect=TestExpect(
                    exit=int(expect["exit"]),
                    failure_code=(
                        None if failure_code is None else str(failure_code)
                    ),
                    violated_contract_ids=list(
                        expect.get("violated_contract_ids") or []
                    ),
                    error_path_contains=(
                        None
                        if error_path_contains is None
                        else str(error_path_contains)
                    ),
                ),
            )
        )
    return tests


def _run_verifier(
    *,
    repo_root: Path,
    bundle_dir: Path,
    report_path: Path,
    strict: bool,
    max_file_bytes: int | None,
) -> tuple[int, str, str]:
    """Run the bundle verifier in a subprocess.

    Returns:
        ``(exit_code, stdout, stderr)`` of the verifier process.
    """
    verifier = repo_root / "tools" / "vm_verify_sentinel_bundle.py"
    cmd = [sys.executable, str(verifier), "--bundle", str(bundle_dir)]
    if strict:
        cmd.append("--strict")
    cmd += ["--report", str(report_path)]
    if max_file_bytes is not None:
        cmd += ["--max-file-bytes", str(max_file_bytes)]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    return proc.returncode, proc.stdout, proc.stderr


def _read_json(path: Path) -> dict:
    """Read *path* as UTF-8 and parse it as JSON."""
    return json.loads(path.read_text(encoding="utf-8"))


def _write_text(path: Path, text: str) -> None:
    """Write *text* to *path* as UTF-8, creating parent directories first."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")


def _stable_json(obj: object) -> str:
    """Serialize *obj* as indented, key-sorted JSON with a trailing newline."""
    return json.dumps(obj, indent=2, sort_keys=True, ensure_ascii=False) + "\n"


def _display_path(path: Path, base: Path) -> str:
    """Return *path* relative to *base*, or *path* itself if it is not under *base*."""
    try:
        return str(path.relative_to(base))
    except ValueError:
        # e.g. a fixture declared as "../shared/x": keep the absolute path
        # in the record instead of aborting the whole suite.
        return str(path)


def _load_report(path: Path) -> tuple[dict | None, str | None]:
    """Load a verification report.

    Returns:
        ``(report, None)`` on success, or ``(None, message)`` when the file
        is missing, unreadable, not valid JSON, or not a JSON object.
    """
    if not path.exists():
        return None, f"verifier did not write report: {path}"
    try:
        report = _read_json(path)
    except (OSError, ValueError) as exc:  # JSONDecodeError is a ValueError
        return None, f"verifier report is not readable JSON: {path} ({exc})"
    if not isinstance(report, dict):
        return None, f"verifier report is not a JSON object: {path}"
    return report, None


def _evaluate_report(
    t: TestCase,
    exit_code: int,
    report: dict,
    known_failures: set[str],
    known_warnings: set[str],
) -> list[str]:
    """Compare one verifier run against the test's expectations.

    Returns:
        Failure messages without the test-id prefix; empty means the test
        passed.
    """
    messages: list[str] = []
    failure_code = report.get("failure_code")
    errors = report.get("errors") or []
    warnings = report.get("warnings") or []
    violated = report.get("violated_contract_ids") or []
    warned = report.get("warned_contract_ids") or []

    # Basic invariants: only known Sentinel codes appear.
    if failure_code is not None and failure_code not in known_failures:
        messages.append(f"unknown failure_code in report: {failure_code}")
    for w in warnings:
        wc = w.get("code") if isinstance(w, dict) else None
        if isinstance(wc, str) and wc not in known_warnings:
            messages.append(f"unknown warning code in report: {wc}")

    # Expectations declared in the manifest.
    if exit_code != t.expect.exit:
        messages.append(f"exit={exit_code} expected={t.expect.exit}")
    if failure_code != t.expect.failure_code:
        messages.append(
            f"failure_code={failure_code!r} expected={t.expect.failure_code!r}"
        )
    missing = set(t.expect.violated_contract_ids) - set(violated)
    if missing:
        messages.append(
            f"violated_contract_ids missing expected entries: {sorted(missing)}"
        )
    needle = t.expect.error_path_contains
    if needle and not any(
        needle in str(e.get("path") or "") for e in errors if isinstance(e, dict)
    ):
        messages.append(f"expected an error path containing {needle!r}")

    # PASS hygiene: strict PASS implies no findings.
    if t.expect.exit == 0 and t.expect.failure_code is None:
        if not bool(report.get("ok")):
            messages.append("expected ok:true in verification_report.json")
        if errors:
            messages.append("expected errors:[] in verification_report.json")
        if warnings:
            messages.append("expected warnings:[] in verification_report.json")
        if violated:
            messages.append(
                "expected violated_contract_ids:[] in verification_report.json"
            )
        if warned:
            messages.append(
                "expected warned_contract_ids:[] in verification_report.json"
            )
    return messages


def _format_listing(manifest: dict, tests: list[TestCase]) -> str:
    """Render the ``--list`` output: suite header plus one line per test."""
    suite = manifest.get("suite") or {}
    lines = [
        f"Suite: {suite.get('name', 'MERIDIAN_V1_CONFORMANCE_TEST_SUITE')}",
        f"Version: {suite.get('version', '')}".rstrip(),
        f"Tests: {len(tests)}",
        "",
    ]
    for t in tests:
        exp = t.expect
        lines.append(
            f"{t.id} | expect.exit={exp.exit} failure_code={exp.failure_code} violated={exp.violated_contract_ids} | fixture={t.fixture} | strict={t.strict}"
        )
    return "\n".join(lines)


def _format_summary(counts: dict, failures: list[str]) -> str:
    """Render the plain-text suite summary written to the ``.txt`` reports."""
    lines = [
        "MERIDIAN v1 Conformance Suite",
        f"Tests: {counts['tests']}",
        f"Passed: {counts['passed']}",
        f"Failed: {counts['failed']}",
    ]
    if failures:
        lines.append("")
        lines.append("Failures:")
        lines.extend(f"- {f}" for f in failures)
    return "\n".join(lines) + "\n"


def main(argv: list[str]) -> int:
    """Run (or list) the conformance suite described by the manifest.

    Args:
        argv: command-line arguments without the program name.

    Returns:
        0 when every test passed (or ``--list`` was given), 1 when at least
        one failure was recorded, 2 when the manifest contains no tests.
    """
    p = argparse.ArgumentParser()
    default_manifest = (Path(__file__).resolve().parent / "manifest.yaml").as_posix()
    p.add_argument(
        "--manifest",
        default=default_manifest,
        help="Path to manifest.yaml (JSON content). Defaults to manifest.yaml beside this script.",
    )
    p.add_argument(
        "--list",
        action="store_true",
        help="List tests and expected outcomes (no execution).",
    )
    args = p.parse_args(argv)

    manifest_path = Path(args.manifest).resolve()
    suite_dir = manifest_path.parent.resolve()
    repo_root = _repo_root_from(suite_dir)
    out_dir = suite_dir / "out"
    reports_dir = out_dir / "sentinel_reports"
    stdio_dir = out_dir / "sentinel_stdio"
    clean_room_dir = out_dir / "clean_room"
    records_dir = out_dir / "tests"

    manifest = _load_manifest(manifest_path)
    tests = sorted(_parse_tests(manifest), key=lambda t: t.id)

    if args.list:
        # Listing is read-only: no output directories are created.
        print(_format_listing(manifest, tests))
        return 0

    for directory in (reports_dir, stdio_dir, records_dir):
        directory.mkdir(parents=True, exist_ok=True)

    if not tests:
        (out_dir / "meridian_v1_conformance_report.txt").write_text(
            "MERIDIAN v1 Conformance Suite\n\n[FAIL] manifest.yaml contains zero tests\n",
            encoding="utf-8",
        )
        return 2

    known_failures, known_warnings = _import_sentinel_codes(repo_root)
    suite_results: list[dict] = []
    failures: list[str] = []

    for t in tests:
        fixture_src = (suite_dir / t.fixture).resolve()
        if not fixture_src.exists():
            failures.append(f"{t.id}: missing fixture path {t.fixture}")
            continue

        if t.mode == "verify_fixture":
            bundle_dir = fixture_src
        elif t.mode == "verify_clean_copy":
            target = (clean_room_dir / t.id).resolve()
            if target.exists():
                shutil.rmtree(target)
            shutil.copytree(fixture_src, target)
            bundle_dir = target
        else:
            failures.append(f"{t.id}: unknown mode {t.mode!r}")
            continue

        report_path = (reports_dir / f"{t.id}.verification_report.json").resolve()
        # Drop any report left by a previous run so a verifier that dies
        # before writing cannot be judged against stale output.
        if report_path.exists():
            report_path.unlink()

        code, stdout, stderr = _run_verifier(
            repo_root=repo_root,
            bundle_dir=bundle_dir,
            report_path=report_path,
            strict=t.strict,
            max_file_bytes=t.max_file_bytes,
        )
        stdout_path = stdio_dir / f"{t.id}.stdout.txt"
        stderr_path = stdio_dir / f"{t.id}.stderr.txt"
        _write_text(stdout_path, stdout)
        _write_text(stderr_path, stderr)

        report, load_error = _load_report(report_path)
        if report is None:
            failures.append(f"{t.id}: {load_error}")
            continue

        messages = _evaluate_report(t, code, report, known_failures, known_warnings)
        failures.extend(f"{t.id}: {msg}" for msg in messages)
        ok = not messages

        observed_failure_code = report.get("failure_code")
        violated = report.get("violated_contract_ids") or []
        observed_warned_contract_ids = report.get("warned_contract_ids") or []

        artifacts = {
            "bundle_dir": _display_path(bundle_dir, suite_dir),
            "verification_report_json": _display_path(report_path, suite_dir),
            "stdout_txt": _display_path(stdout_path, suite_dir),
            "stderr_txt": _display_path(stderr_path, suite_dir),
        }
        record_obj = {
            "format": "meridian-v1-test-record-v1",
            "id": t.id,
            "name": t.name,
            "category": t.category,
            "fixture": t.fixture,
            "mode": t.mode,
            "strict": t.strict,
            "max_file_bytes": t.max_file_bytes,
            "expected": {
                "exit": t.expect.exit,
                "failure_code": t.expect.failure_code,
                "violated_contract_ids": t.expect.violated_contract_ids,
                "error_path_contains": t.expect.error_path_contains,
            },
            "observed": {
                "exit": code,
                "failure_code": observed_failure_code,
                "violated_contract_ids": violated,
                "warned_contract_ids": observed_warned_contract_ids,
            },
            "artifacts": artifacts,
            "ok": ok,
            "failure_messages": messages,
        }
        (records_dir / f"{t.id}.record.json").write_text(
            _stable_json(record_obj), encoding="utf-8"
        )

        suite_results.append(
            {
                "id": t.id,
                "name": t.name,
                "category": t.category,
                "fixture": t.fixture,
                "mode": t.mode,
                "strict": t.strict,
                "max_file_bytes": t.max_file_bytes,
                "expected": {
                    "exit": t.expect.exit,
                    "failure_code": t.expect.failure_code,
                    "violated_contract_ids": t.expect.violated_contract_ids,
                },
                "observed": {
                    "exit": code,
                    "failure_code": observed_failure_code,
                    "violated_contract_ids": violated,
                },
                "artifacts": artifacts,
                "ok": ok,
            }
        )

    passed = sum(1 for r in suite_results if r.get("ok") is True)
    report_obj = {
        "format": "meridian-v1-conformance-report-v1",
        "suite": manifest.get("suite") or {},
        "counts": {
            "tests": len(tests),
            "passed": passed,
            # Tests that never reached evaluation (missing fixture, unknown
            # mode, unusable report) have no result entry but still failed.
            "failed": len(tests) - passed,
        },
        "results": suite_results,
        "failures": failures,
    }

    out_json = out_dir / "meridian_v1_conformance_report.json"
    out_txt = out_dir / "meridian_v1_conformance_report.txt"
    out_json.write_text(_stable_json(report_obj), encoding="utf-8")
    # Insurer/auditor-friendly canonical names.
    (out_dir / "report.json").write_text(_stable_json(report_obj), encoding="utf-8")

    summary_text = _format_summary(report_obj["counts"], failures)
    out_txt.write_text(summary_text, encoding="utf-8")
    (out_dir / "report.txt").write_text(summary_text, encoding="utf-8")

    return 1 if failures else 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))