from __future__ import annotations

import argparse
import hashlib
import json
import os
import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable

from .pattern_store import PatternStore, write_pattern_snapshot
from .shadow_classifier import Classification, ShadowClassifier

THIS_FILE = Path(__file__).resolve()
LAYER0_DIR = THIS_FILE.parent
REPO_ROOT = LAYER0_DIR.parent.parent


def _utc_now_iso_z() -> str:
    return (
        datetime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace("+00:00", "Z")
    )


def _default_db_path() -> Path:
    for key in ("LEDGER_DB_PATH", "VAULTMESH_LEDGER_DB"):
        v = (os.environ.get(key) or "").strip()
        if v:
            return Path(v).expanduser().resolve()
    return (REPO_ROOT / ".state" / "ledger.sqlite").resolve()


def _read_jsonl(paths: Iterable[Path], *, limit: int | None) -> list[dict[str, Any]]:
    rows: list[dict[str, Any]] = []
    for path in paths:
        if not path.exists():
            continue
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                continue
            if isinstance(obj, dict):
                rows.append(obj)
    if limit is not None and limit > 0 and len(rows) > limit:
        return rows[-limit:]
    return rows


def _telemetry_query(event: dict[str, Any]) -> str:
    v = event.get("query") or event.get("prompt") or event.get("input")
    return v if isinstance(v, str) else ""


def _outcome(event: dict[str, Any]) -> str | None:
    v = event.get("outcome") or event.get("result") or event.get("status")
    if isinstance(v, str) and v.strip():
        return v.strip()
    return None


def _ground_truth(event: dict[str, Any]) -> Classification | None:
    outcome = (_outcome(event) or "").lower()
    if outcome in {"success", "ok"}:
        return Classification.BLESSED
    if outcome in {"blocked_by_guardrails", "blocked_by_policy", "blocked", "denied"}:
        return Classification.FORBIDDEN
    if outcome in {"fail_closed", "catastrophic", "blocked_catastrophic"}:
        return Classification.CATASTROPHIC
    return None


def _ensure_ledger_schema(conn: sqlite3.Connection) -> None:
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        """
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS proof_artifacts (
            id TEXT PRIMARY KEY,
            ts TEXT NOT NULL DEFAULT (datetime('now')),
            kind TEXT NOT NULL,
            path TEXT,
            sha256_hex TEXT,
            blake3_hex TEXT,
            size_bytes INTEGER,
            meta_json TEXT,
            trace_id TEXT
        );
        """
    )


def _log_artifact(
    *,
    kind: str,
    path: Path | None,
    meta: dict[str, Any],
    trace_id: str | None,
    db_path: Path,
) -> str:
    try:
        from ledger.db import log_proof_artifact  # type: ignore

        return log_proof_artifact(
            kind=kind,
            path=path,
            meta=meta,
            trace_id=trace_id,
            db_path=db_path,
        )
    except Exception:
        pass

    artifact_id = str(uuid.uuid4())
    rel_path: str | None = None
    sha256_hex: str | None = None
    size_bytes: int | None = None
    if path is not None:
        try:
            rel_path = str(path.resolve().relative_to(REPO_ROOT))
        except Exception:
            rel_path = str(path)
        if path.exists() and path.is_file():
            data = path.read_bytes()
            sha256_hex = hashlib.sha256(data).hexdigest()
            size_bytes = len(data)

    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path), isolation_level=None)
    try:
        _ensure_ledger_schema(conn)
        conn.execute(
            """
            INSERT INTO proof_artifacts (
                id, ts, kind, path, sha256_hex, blake3_hex, size_bytes, meta_json, trace_id
            ) VALUES (?, ?, ?, ?, ?, NULL, ?, ?, ?);
            """,
            (
                artifact_id,
                _utc_now_iso_z(),
                kind,
                rel_path,
                sha256_hex,
                size_bytes,
                json.dumps(meta, ensure_ascii=False, sort_keys=True),
                trace_id,
            ),
        )
    finally:
        conn.close()
    return artifact_id


def _load_patterns_file(path: Path) -> list[dict[str, Any]]:
    if not path.exists():
        return []
    data = json.loads(path.read_text(encoding="utf-8"))
    items = data.get("patterns") if isinstance(data, dict) else data
    return items if isinstance(items, list) else []


def _merge_patterns(
    active: list[dict[str, Any]], extra: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """
    Candidate patterns win on identical (mode, tokens_all, classification).
    """

    def key(p: dict[str, Any]) -> tuple[str, tuple[str, ...], str]:
        mode = str(p.get("mode") or "escalate")
        cls = str(p.get("classification") or "")
        tokens = p.get("tokens_all") or p.get("tokens") or []
        if not isinstance(tokens, list):
            tokens = []
        return (mode, tuple(str(t).lower() for t in tokens), cls)

    merged: dict[tuple[str, tuple[str, ...], str], dict[str, Any]] = {}
    for p in active:
        if isinstance(p, dict):
            merged[key(p)] = p
    for p in extra:
        if isinstance(p, dict):
            merged[key(p)] = p
    return list(merged.values())


@dataclass
class ReplayMetrics:
    total: int
    baseline_false_pos: int
    baseline_false_neg: int
    candidate_false_pos: int
    candidate_false_neg: int
    catastrophic_boundary_unchanged: bool


def _is_false_positive(pred: Classification, truth: Classification) -> bool:
    return truth == Classification.BLESSED and pred in {
        Classification.FORBIDDEN,
        Classification.CATASTROPHIC,
    }


def _is_false_negative(pred: Classification, truth: Classification) -> bool:
    return truth in {
        Classification.FORBIDDEN,
        Classification.CATASTROPHIC,
    } and pred in {
        Classification.BLESSED,
        Classification.AMBIGUOUS,
    }


def _compute_metrics(
    events: list[dict[str, Any]],
    baseline: ShadowClassifier,
    candidate: ShadowClassifier,
) -> ReplayMetrics:
    total = 0
    b_fp = b_fn = 0
    c_fp = c_fn = 0
    catastrophic_ok = True
    for ev in events:
        truth = _ground_truth(ev)
        if truth is None:
            continue
        q = _telemetry_query(ev)
        total += 1
        b = baseline.classify(q).classification
        c = candidate.classify(q).classification
        if _is_false_positive(b, truth):
            b_fp += 1
        if _is_false_negative(b, truth):
            b_fn += 1
        if _is_false_positive(c, truth):
            c_fp += 1
        if _is_false_negative(c, truth):
            c_fn += 1
        if b == Classification.CATASTROPHIC and c != Classification.CATASTROPHIC:
            catastrophic_ok = False
    return ReplayMetrics(
        total=total,
        baseline_false_pos=b_fp,
        baseline_false_neg=b_fn,
        candidate_false_pos=c_fp,
        candidate_false_neg=c_fn,
        catastrophic_boundary_unchanged=catastrophic_ok,
    )


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(
        description="Layer0: replay candidate patterns against recent telemetry."
    )
    parser.add_argument(
        "--candidate",
        required=True,
        help="Candidate snapshot JSON (from layer0.learn).",
    )
    parser.add_argument(
        "--telemetry-jsonl",
        action="append",
        default=[],
        help=(
            "Path to telemetry JSONL (repeatable). Must include "
            "outcome=success|blocked_by_guardrails|... for scoring."
        ),
    )
    parser.add_argument("--limit", type=int, default=2000)
    parser.add_argument(
        "--active",
        type=str,
        default=None,
        help="Active patterns snapshot (defaults to .state).",
    )
    parser.add_argument("--db", type=str, default=None)
    parser.add_argument("--report-out", type=str, default=None)
    parser.add_argument(
        "--promote",
        action="store_true",
        help="If replay passes, write active snapshot update.",
    )
    parser.add_argument(
        "--allow-relaxations",
        action="store_true",
        help="Allow promotion of relaxation-mode patterns (requires replay pass).",
    )
    parser.add_argument("--max-fp-increase", type=int, default=0)
    args = parser.parse_args(argv)

    telemetry_paths = [Path(p).expanduser() for p in args.telemetry_jsonl if p]
    if not telemetry_paths:
        default_preboot = REPO_ROOT / "anomalies" / "preboot_shield.jsonl"
        if default_preboot.exists():
            telemetry_paths = [default_preboot]
    events = _read_jsonl(telemetry_paths, limit=int(args.limit))

    active_path = (
        Path(args.active).expanduser().resolve()
        if args.active
        else PatternStore().active_path
    )
    active_patterns = _load_patterns_file(active_path)

    candidate_path = Path(args.candidate).expanduser().resolve()
    candidate_patterns_all = _load_patterns_file(candidate_path)
    candidate_patterns = [
        p
        for p in candidate_patterns_all
        if isinstance(p, dict)
        and (args.allow_relaxations or str(p.get("mode") or "escalate") != "relax")
    ]

    baseline_classifier = ShadowClassifier(
        pattern_store=PatternStore(active_path=active_path)
    )

    merged = _merge_patterns(active_patterns, candidate_patterns)
    merged_path = (
        REPO_ROOT / ".state" / "layer0_patterns_merged_replay.json"
    ).resolve()
    write_pattern_snapshot(merged_path, merged)
    candidate_classifier = ShadowClassifier(
        pattern_store=PatternStore(active_path=merged_path)
    )

    metrics = _compute_metrics(events, baseline_classifier, candidate_classifier)

    passes = (
        metrics.catastrophic_boundary_unchanged
        and metrics.candidate_false_pos
        <= metrics.baseline_false_pos + int(args.max_fp_increase)
        and metrics.candidate_false_neg <= metrics.baseline_false_neg
    )

    report = {
        "generated_at": _utc_now_iso_z(),
        "telemetry_inputs": [str(p) for p in telemetry_paths],
        "candidate_snapshot": str(candidate_path),
        "active_snapshot": str(active_path),
        "merged_snapshot": str(merged_path),
        "allow_relaxations": bool(args.allow_relaxations),
        "max_fp_increase": int(args.max_fp_increase),
        "metrics": {
            "total_scored": metrics.total,
            "baseline_false_positives": metrics.baseline_false_pos,
            "baseline_false_negatives": metrics.baseline_false_neg,
            "candidate_false_positives": metrics.candidate_false_pos,
            "candidate_false_negatives": metrics.candidate_false_neg,
            "catastrophic_boundary_unchanged": metrics.catastrophic_boundary_unchanged,
        },
        "passes": passes,
        "promotion": {
            "requested": bool(args.promote),
            "performed": False,
            "active_written_to": str(active_path),
            "patterns_added": len(candidate_patterns),
        },
    }

    report_out = (
        Path(args.report_out).expanduser().resolve()
        if args.report_out
        else (REPO_ROOT / ".state" / "layer0_shadow_replay_report.json").resolve()
    )
    report_out.parent.mkdir(parents=True, exist_ok=True)
    report_out.write_text(
        json.dumps(report, ensure_ascii=False, sort_keys=True, indent=2) + "\n",
        encoding="utf-8",
    )

    db_path = Path(args.db).expanduser().resolve() if args.db else _default_db_path()
    report_artifact_id = _log_artifact(
        kind="shadow_replay_report",
        path=report_out,
        meta={
            "passes": passes,
            "total_scored": metrics.total,
            "baseline_fp": metrics.baseline_false_pos,
            "baseline_fn": metrics.baseline_false_neg,
            "candidate_fp": metrics.candidate_false_pos,
            "candidate_fn": metrics.candidate_false_neg,
        },
        trace_id=None,
        db_path=db_path,
    )

    if args.promote and passes:
        # Promotion = merged active snapshot (existing + candidates), written atomically.
        tmp_path = active_path.with_suffix(active_path.suffix + ".tmp")
        write_pattern_snapshot(tmp_path, merged)
        tmp_path.replace(active_path)
        promo_artifact_id = _log_artifact(
            kind="shadow_pattern_promotion",
            path=active_path,
            meta={
                "added": len(candidate_patterns),
                "source_candidate": str(candidate_path),
                "merged_snapshot": str(merged_path),
            },
            trace_id=None,
            db_path=db_path,
        )
        report["promotion"]["performed"] = True
        report["promotion"]["artifact_id"] = promo_artifact_id
        report_out.write_text(
            json.dumps(report, ensure_ascii=False, sort_keys=True, indent=2) + "\n",
            encoding="utf-8",
        )

    print(f"Replay report: {report_out} (passes={passes})")
    print(f"Logged artifact {report_artifact_id} to {db_path}")
    if args.promote:
        print(
            f"Promotion {'performed' if (args.promote and passes) else 'skipped'}; active={active_path}"
        )

    return 0 if passes else 2


if __name__ == "__main__":
    raise SystemExit(main())
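

# Usage sketch (illustrative only). The CLI surface comes from the argparse setup
# in main(); the module import path "layer0.shadow_replay" and the candidate file
# name below are assumptions for illustration, not confirmed by this file:
#
#   python -m layer0.shadow_replay \
#       --candidate .state/layer0_candidate_patterns.json \
#       --telemetry-jsonl anomalies/preboot_shield.jsonl \
#       --promote
#
# Only telemetry events with a recognizable outcome are scored; per
# _telemetry_query() and _ground_truth(), a minimal scorable JSONL line is:
#
#   {"query": "some prompt text", "outcome": "blocked_by_guardrails"}
#
# The process exits 0 when the replay gate passes and 2 otherwise, so callers can
# treat a non-zero exit as "do not promote".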