Files
vm-cloudflare/layer0/preboot_logger.py
Vault Sovereign f0b8d962de
Some checks failed
WAF Intelligence Guardrail / waf-intel (push) Waiting to run
Cloudflare Registry Validation / validate-registry (push) Has been cancelled
chore: pre-migration snapshot
Layer0, MCP servers, Terraform consolidation
2025-12-27 01:52:27 +00:00

154 lines
5.1 KiB
Python

import datetime
import hashlib
import json
import os
import re
import sqlite3
from typing import Optional
from .pattern_store import normalize_query_for_matching
from .shadow_classifier import Classification, ShadowEvalResult
class PrebootLogger:
LOG_PATH = "anomalies/preboot_shield.jsonl"
@staticmethod
def _ledger_db_path() -> str | None:
return os.getenv("VAULTMESH_LEDGER_DB") or os.getenv("LEDGER_DB_PATH")
@staticmethod
def _normalize_for_shadow_receipt(query: str) -> str:
"""
Poison-resistant normalizer for ShadowReceipt emission.
Goals:
- Normalize casing/whitespace
- Replace common secret/identifier carriers with placeholders
- Keep output stable and compact
"""
s = (query or "").lower().strip()
s = re.sub(r"\s+", " ", s)
s = re.sub(r"\bhttps?://\S+\b", "<URL>", s)
s = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "<IP>", s)
s = re.sub(
r"\b[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}\b",
"<HEX>",
s,
flags=re.IGNORECASE,
)
s = re.sub(r"(?:(?:\.\.?/)|/|~\/)[A-Za-z0-9._~/-]{2,}", "<PATH>", s)
s = re.sub(r"\b[0-9a-f]{16,}\b", "<HEX>", s, flags=re.IGNORECASE)
s = re.sub(r"\b\d+\b", "<N>", s)
return s.strip()
@staticmethod
def _sha256_hex(text: str) -> str:
return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()
@staticmethod
def _try_emit_shadow_receipt(
*,
query: str,
classification: str,
reason: str | None,
flags: list[str],
trace_id: str | None,
) -> None:
"""
Best-effort ShadowReceipt emission into the local-first SQLite ledger.
Hard constraints:
- No dependency on vaultmesh-orgine-mobile code
- Fail silently on any error (Layer 0 must never crash)
"""
db_path = PrebootLogger._ledger_db_path()
if not db_path:
return
try:
norm = PrebootLogger._normalize_for_shadow_receipt(query)
cf_hash = PrebootLogger._sha256_hex(norm)
placeholders: list[str] = []
for p in ("<URL>", "<IP>", "<PATH>", "<HEX>", "<N>"):
if p in norm:
placeholders.append(p)
meta = {
"ts_utc": datetime.datetime.now(datetime.timezone.utc)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z"),
"classification": classification,
"reason": reason,
"flags": (flags or [])[:64],
"normalized_query_features": {
"placeholders": placeholders,
"length": len(norm),
},
}
conn = sqlite3.connect(db_path, timeout=0.25)
try:
conn.execute("PRAGMA foreign_keys=ON;")
conn.execute(
"""
INSERT INTO shadow_receipts (
id, horizon_id, counterfactual_hash, entropy_delta,
reason_unrealized, observer_signature, trace_id, meta_json
)
VALUES (?, ?, ?, NULL, ?, NULL, ?, ?);
""",
(
PrebootLogger._sha256_hex(
meta["ts_utc"] + "|" + (trace_id or "") + "|" + cf_hash
),
"layer0_block",
cf_hash,
"layer0_block",
trace_id,
json.dumps(meta, separators=(",", ":"), ensure_ascii=False),
),
)
conn.commit()
finally:
conn.close()
except Exception:
return
@staticmethod
def log(event: ShadowEvalResult, query: str, reason_override: Optional[str] = None):
if event.classification not in (
Classification.CATASTROPHIC,
Classification.FORBIDDEN,
):
return # Only violations get logged
record = {
"timestamp": datetime.datetime.utcnow().isoformat() + "Z",
# Store a normalized, low-leakage representation (never raw strings).
"query": normalize_query_for_matching(query),
"classification": event.classification.value,
"reason": reason_override or event.reason,
"trace_id": event.trace_id,
"metadata": {
"risk_score": event.risk_score,
"flags": event.flags,
"source": "layer0",
},
}
os.makedirs(os.path.dirname(PrebootLogger.LOG_PATH), exist_ok=True)
with open(PrebootLogger.LOG_PATH, "a", encoding="utf-8") as f:
f.write(json.dumps(record) + "\n")
PrebootLogger._try_emit_shadow_receipt(
query=query,
classification=event.classification.value,
reason=reason_override or event.reason,
flags=event.flags,
trace_id=event.trace_id,
)