import datetime
import hashlib
import json
import os
import re
import sqlite3
from typing import Optional

from .pattern_store import normalize_query_for_matching
from .shadow_classifier import Classification, ShadowEvalResult


class PrebootLogger:
    """Record Layer 0 violations to a JSONL file and, best-effort, to SQLite.

    Only events classified as CATASTROPHIC or FORBIDDEN are recorded. Each
    one is appended to ``LOG_PATH`` and then offered to the ``shadow_receipts``
    table of the ledger database, if one is configured via environment.
    """

    LOG_PATH = "anomalies/preboot_shield.jsonl"

    # Ordered (pattern, token) redaction rules. Order matters: URLs and IPs
    # are replaced before the generic path / hex / integer rules so the more
    # specific carrier wins. Tokens are upper-case and inserted *after* the
    # query has been lower-cased, so a token in the output can only come from
    # a substitution, never from text the user typed.
    # NOTE(review): the token spellings were previously empty strings, which
    # made the "placeholders" feature meaningless. Restoring tokens changes
    # counterfactual_hash for queries that contain a carrier -- confirm the
    # spellings against any existing ledger consumers before deploying.
    _REDACTION_RULES = (
        (re.compile(r"\bhttps?://\S+\b"), "<URL>"),
        (re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b"), "<IP>"),
        (
            re.compile(
                r"\b[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}\b",
                flags=re.IGNORECASE,
            ),
            "<UUID>",
        ),
        (re.compile(r"(?:(?:\.\.?/)|/|~\/)[A-Za-z0-9._~/-]{2,}"), "<PATH>"),
        (re.compile(r"\b[0-9a-f]{16,}\b", flags=re.IGNORECASE), "<HEX>"),
        (re.compile(r"\b\d+\b"), "<N>"),
    )

    # Tokens surfaced in the receipt's "placeholders" feature list. The list
    # keeps the original five-entry shape; bare integers ("<N>") are left out.
    # NOTE(review): confirm that omitting "<N>" matches the intended schema.
    _REPORTED_PLACEHOLDERS = ("<URL>", "<IP>", "<UUID>", "<PATH>", "<HEX>")

    @staticmethod
    def _ledger_db_path() -> str | None:
        """Return the ledger DB path from the environment, or None if unset.

        ``VAULTMESH_LEDGER_DB`` takes precedence over ``LEDGER_DB_PATH``; an
        empty value is treated the same as an unset one.
        """
        return os.getenv("VAULTMESH_LEDGER_DB") or os.getenv("LEDGER_DB_PATH")

    @staticmethod
    def _normalize_for_shadow_receipt(query: str) -> str:
        """
        Poison-resistant normalizer for ShadowReceipt emission.

        Goals:
        - Normalize casing/whitespace
        - Replace common secret/identifier carriers with placeholders
        - Keep output stable and compact

        Args:
            query: Raw query text; ``None`` or empty is treated as "".

        Returns:
            The lower-cased, whitespace-collapsed query with URLs, IPv4
            addresses, UUIDs, filesystem paths, long hex runs and bare
            integers replaced by fixed tokens.
        """
        s = (query or "").lower().strip()
        s = re.sub(r"\s+", " ", s)
        for pattern, token in PrebootLogger._REDACTION_RULES:
            s = pattern.sub(token, s)
        return s.strip()

    @staticmethod
    def _sha256_hex(text: str) -> str:
        """Return the hex SHA-256 of *text* encoded as UTF-8.

        Unencodable code points (e.g. lone surrogates) are dropped rather
        than raising, so hashing never fails on malformed input.
        """
        return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()

    @staticmethod
    def _try_emit_shadow_receipt(
        *,
        query: str,
        classification: str,
        reason: str | None,
        flags: list[str],
        trace_id: str | None,
    ) -> None:
        """
        Best-effort ShadowReceipt emission into the local-first SQLite ledger.

        Hard constraints:
        - No dependency on vaultmesh-orgine-mobile code
        - Fail silently on any error (Layer 0 must never crash)

        Args:
            query: Raw query; only its normalized form is hashed and measured.
            classification: Classification value stored in the receipt meta.
            reason: Human-readable reason stored in the receipt meta.
            flags: Flags stored in the receipt meta (at most the first 64).
            trace_id: Trace identifier stored with the row; may be None.
        """
        db_path = PrebootLogger._ledger_db_path()
        if not db_path:
            return

        try:
            norm = PrebootLogger._normalize_for_shadow_receipt(query)
            cf_hash = PrebootLogger._sha256_hex(norm)

            placeholders: list[str] = [
                token
                for token in PrebootLogger._REPORTED_PLACEHOLDERS
                if token in norm
            ]

            meta = {
                # Second-resolution UTC timestamp with a trailing "Z".
                "ts_utc": datetime.datetime.now(datetime.timezone.utc)
                .replace(microsecond=0)
                .isoformat()
                .replace("+00:00", "Z"),
                "classification": classification,
                "reason": reason,
                "flags": (flags or [])[:64],
                "normalized_query_features": {
                    "placeholders": placeholders,
                    "length": len(norm),
                },
            }

            # Short timeout: a locked ledger must not stall the caller.
            conn = sqlite3.connect(db_path, timeout=0.25)
            try:
                conn.execute("PRAGMA foreign_keys=ON;")
                conn.execute(
                    """
                    INSERT INTO shadow_receipts (
                      id, horizon_id, counterfactual_hash, entropy_delta,
                      reason_unrealized, observer_signature, trace_id, meta_json
                    )
                    VALUES (?, ?, ?, NULL, ?, NULL, ?, ?);
                    """,
                    (
                        # Row id is derived from timestamp, trace id and hash.
                        PrebootLogger._sha256_hex(
                            meta["ts_utc"] + "|" + (trace_id or "") + "|" + cf_hash
                        ),
                        "layer0_block",
                        cf_hash,
                        "layer0_block",
                        trace_id,
                        json.dumps(meta, separators=(",", ":"), ensure_ascii=False),
                    ),
                )
                conn.commit()
            finally:
                conn.close()
        except Exception:
            # Deliberately swallowed: receipt emission is strictly best-effort.
            return

    @staticmethod
    def log(
        event: ShadowEvalResult,
        query: str,
        reason_override: Optional[str] = None,
    ) -> None:
        """Append a violation record to ``LOG_PATH`` and emit a ShadowReceipt.

        Events whose classification is neither CATASTROPHIC nor FORBIDDEN are
        ignored.

        Args:
            event: Evaluation result supplying classification, reason,
                trace id, risk score and flags.
            query: Raw query; only a normalized form is written to the log.
            reason_override: If truthy, replaces ``event.reason`` in both the
                log record and the receipt.

        Raises:
            OSError: If the log directory or file cannot be created/written.
        """
        if event.classification not in (
            Classification.CATASTROPHIC,
            Classification.FORBIDDEN,
        ):
            return  # Only violations get logged

        reason = reason_override or event.reason

        # Timezone-aware "now" (utcnow() is deprecated); tzinfo is stripped so
        # the serialized form stays "<naive ISO timestamp>Z" as before.
        timestamp = (
            datetime.datetime.now(datetime.timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
            + "Z"
        )

        record = {
            "timestamp": timestamp,
            # Store a normalized, low-leakage representation (never raw strings).
            "query": normalize_query_for_matching(query),
            "classification": event.classification.value,
            "reason": reason,
            "trace_id": event.trace_id,
            "metadata": {
                "risk_score": event.risk_score,
                "flags": event.flags,
                "source": "layer0",
            },
        }

        # dirname() is "" for a bare filename, and makedirs("") would raise.
        log_dir = os.path.dirname(PrebootLogger.LOG_PATH)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        with open(PrebootLogger.LOG_PATH, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

        PrebootLogger._try_emit_shadow_receipt(
            query=query,
            classification=event.classification.value,
            reason=reason,
            flags=event.flags,
            trace_id=event.trace_id,
        )