"""Layer0 candidate-pattern builder.

Reads telemetry JSONL, infers escalation/relaxation candidates per event,
aggregates them into buckets, writes a candidate pattern snapshot, and logs a
proof artifact to the ledger database.
"""

from __future__ import annotations

import argparse
import hashlib
import json
import os
import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable

from .pattern_store import (
    normalize_query_for_matching,
    pattern_dict,
    write_pattern_snapshot,
)

THIS_FILE = Path(__file__).resolve()
LAYER0_DIR = THIS_FILE.parent
REPO_ROOT = LAYER0_DIR.parent.parent


def _utc_now_iso_z() -> str:
    return (
        datetime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace("+00:00", "Z")
    )


def _default_db_path() -> Path:
    for key in ("LEDGER_DB_PATH", "VAULTMESH_LEDGER_DB"):
        v = (os.environ.get(key) or "").strip()
        if v:
            return Path(v).expanduser().resolve()
    return (REPO_ROOT / ".state" / "ledger.sqlite").resolve()


def _default_candidate_path() -> Path:
    return (REPO_ROOT / ".state" / "layer0_patterns_candidate.json").resolve()


def _read_jsonl(paths: Iterable[Path]) -> list[dict[str, Any]]:
    """Read JSONL files, skipping missing files, blank lines, and bad records."""
    events: list[dict[str, Any]] = []
    for path in paths:
        if not path.exists():
            continue
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                continue
            if isinstance(obj, dict):
                events.append(obj)
    return events


def _telemetry_actor(event: dict[str, Any]) -> str | None:
    v = event.get("actor") or event.get("user") or event.get("account")
    if isinstance(v, str) and v.strip():
        return v.strip()
    meta = event.get("metadata")
    if isinstance(meta, dict):
        v2 = meta.get("actor") or meta.get("account")
        if isinstance(v2, str) and v2.strip():
            return v2.strip()
    return None


def _telemetry_trace_id(event: dict[str, Any]) -> str | None:
    for k in ("trace_id", "layer0_trace_id", "trace", "id"):
        v = event.get(k)
        if isinstance(v, str) and v.strip():
            return v.strip()
    return None


def _telemetry_ts(event: dict[str, Any]) -> str | None:
    for k in ("timestamp", "ts", "time"):
        v = event.get(k)
        if isinstance(v, str) and v.strip():
            return v.strip()
    return None


def _telemetry_query(event: dict[str, Any]) -> str:
    v = event.get("query") or event.get("prompt") or event.get("input")
    if isinstance(v, str):
        return v
    meta = event.get("metadata")
    if isinstance(meta, dict) and isinstance(meta.get("query"), str):
        return str(meta.get("query"))
    return ""


def _outcome(event: dict[str, Any]) -> str | None:
    v = event.get("outcome") or event.get("result") or event.get("status")
    if isinstance(v, str) and v.strip():
        return v.strip()
    return None


def _layer0_classification(event: dict[str, Any]) -> str | None:
    v = event.get("layer0_classification") or event.get("classification")
    if isinstance(v, str) and v.strip():
        return v.strip()
    return None


def _infer_target_from_event(
    event: dict[str, Any], *, include_relaxations: bool
) -> tuple[str, str] | None:
    """
    Returns (mode, classification) or None.

    mode:
      - "escalate": adds/strengthens detection immediately
      - "relax": can reduce severity only after replay + explicit approval
    """
    outcome = (_outcome(event) or "").lower()
    l0 = (_layer0_classification(event) or "").lower()

    # Ground-truth blocked downstream: L0 should tighten.
    if outcome in {
        "blocked_by_guardrails",
        "blocked_by_policy",
        "blocked",
        "denied",
    } and l0 in {"blessed", "ambiguous"}:
        return ("escalate", "forbidden")

    if (
        outcome in {"fail_closed", "catastrophic", "blocked_catastrophic"}
        and l0 != "catastrophic"
    ):
        return ("escalate", "catastrophic")

    # Preboot logs (already blocked) can still be used to learn more specific signatures.
    if not outcome and l0 in {"forbidden", "catastrophic"}:
        return ("escalate", l0)

    # False positives: relax only after replay + approval.
    if include_relaxations and outcome in {"success", "ok"} and l0 in {"forbidden"}:
        return ("relax", "blessed")

    return None


def _default_risk_score(classification: str) -> int:
    if classification == "catastrophic":
        return 5
    if classification == "forbidden":
        return 3
    if classification == "ambiguous":
        return 1
    return 0


@dataclass
class _Bucket:
    traces: set[str]
    actors: set[str]
    last_seen: str | None


def _ensure_ledger_schema(conn: sqlite3.Connection) -> None:
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        """
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS proof_artifacts (
            id TEXT PRIMARY KEY,
            ts TEXT NOT NULL DEFAULT (datetime('now')),
            kind TEXT NOT NULL,
            path TEXT,
            sha256_hex TEXT,
            blake3_hex TEXT,
            size_bytes INTEGER,
            meta_json TEXT,
            trace_id TEXT
        );
        """
    )


def _log_artifact(
    *,
    kind: str,
    path: Path | None,
    meta: dict[str, Any],
    trace_id: str | None,
    db_path: Path,
) -> str:
    # Prefer the canonical ledger helper when it is importable.
    try:
        from ledger.db import log_proof_artifact  # type: ignore

        return log_proof_artifact(
            kind=kind,
            path=path,
            meta=meta,
            trace_id=trace_id,
            db_path=db_path,
        )
    except Exception:
        pass

    # Fallback: write the proof_artifacts row directly.
    artifact_id = str(uuid.uuid4())
    rel_path: str | None = None
    sha256_hex: str | None = None
    size_bytes: int | None = None
    if path is not None:
        try:
            rel_path = str(path.resolve().relative_to(REPO_ROOT))
        except Exception:
            rel_path = str(path)
        if path.exists() and path.is_file():
            data = path.read_bytes()
            sha256_hex = hashlib.sha256(data).hexdigest()
            size_bytes = len(data)

    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path), isolation_level=None)
    try:
        _ensure_ledger_schema(conn)
        conn.execute(
            """
            INSERT INTO proof_artifacts (
                id, ts, kind, path, sha256_hex, blake3_hex, size_bytes, meta_json, trace_id
            ) VALUES (?, ?, ?, ?, ?, NULL, ?, ?, ?);
            """,
            (
                artifact_id,
                _utc_now_iso_z(),
                kind,
                rel_path,
                sha256_hex,
                size_bytes,
                json.dumps(meta, ensure_ascii=False, sort_keys=True),
                trace_id,
            ),
        )
    finally:
        conn.close()
    return artifact_id


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(
        description="Layer0: build candidate patterns from telemetry."
    )
    parser.add_argument(
        "--telemetry-jsonl",
        action="append",
        default=[],
        help=(
            "Path to telemetry JSONL (repeatable). Defaults include "
            "anomalies/preboot_shield.jsonl if present."
        ),
    )
    parser.add_argument("--min-support", type=int, default=3)
    parser.add_argument("--min-actors", type=int, default=2)
    parser.add_argument("--max-tokens", type=int, default=8)
    parser.add_argument(
        "--include-relaxations",
        action="store_true",
        help="Generate relaxation candidates (still requires replay + explicit promotion).",
    )
    parser.add_argument("--out", type=str, default=str(_default_candidate_path()))
    parser.add_argument("--db", type=str, default=None)
    args = parser.parse_args(argv)

    paths: list[Path] = []
    for p in args.telemetry_jsonl:
        if p:
            paths.append(Path(p).expanduser())
    default_preboot = REPO_ROOT / "anomalies" / "preboot_shield.jsonl"
    if default_preboot.exists() and default_preboot not in paths:
        paths.append(default_preboot)

    events = _read_jsonl(paths)

    # Bucket events by (mode, target classification, normalized token tuple).
    buckets: dict[tuple[str, str, tuple[str, ...]], _Bucket] = {}
    for ev in events:
        inferred = _infer_target_from_event(
            ev, include_relaxations=bool(args.include_relaxations)
        )
        if not inferred:
            continue
        mode, target = inferred
        norm = normalize_query_for_matching(_telemetry_query(ev))
        tokens = norm.split()
        if len(tokens) < 2:
            continue
        if args.max_tokens and len(tokens) > args.max_tokens:
            tokens = tokens[: int(args.max_tokens)]
        key = (mode, target, tuple(tokens))
        b = buckets.get(key)
        if b is None:
            b = _Bucket(traces=set(), actors=set(), last_seen=None)
            buckets[key] = b
        trace = _telemetry_trace_id(ev)
        if trace:
            b.traces.add(trace)
        actor = _telemetry_actor(ev)
        if actor:
            b.actors.add(actor)
        ts = _telemetry_ts(ev)
        if ts and (b.last_seen is None or ts > b.last_seen):
            b.last_seen = ts

    # Keep only buckets with enough distinct traces (and actors, when known).
    patterns: list[dict[str, Any]] = []
    for (mode, target, tokens), bucket in buckets.items():
        support = len(bucket.traces) if bucket.traces else 0
        actors = len(bucket.actors)
        if support < int(args.min_support):
            continue
        if actors and actors < int(args.min_actors):
            continue
        patterns.append(
            pattern_dict(
                tokens_all=tokens,
                classification=target,
                reason="telemetry_learned",
                risk_score=_default_risk_score(target),
                flags=["telemetry_learned"],
                min_support=support,
                last_seen=bucket.last_seen,
                source={"support_traces": support, "support_actors": actors},
                mode=mode,
                pattern_id=str(uuid.uuid4()),
            )
        )

    # Deterministic ordering: most severe, then most specific/support.
    severity_rank = {
        "blessed": 0,
        "ambiguous": 1,
        "forbidden": 2,
        "catastrophic": 3,
    }
    patterns.sort(
        key=lambda p: (
            severity_rank.get(p["classification"], 0),
            int(p.get("specificity_score") or 0),
            int(p.get("min_support") or 0),
            str(p.get("last_seen") or ""),
        ),
        reverse=True,
    )

    out_path = Path(args.out).expanduser().resolve()
    write_pattern_snapshot(out_path, patterns)

    db_path = Path(args.db).expanduser().resolve() if args.db else _default_db_path()
    artifact_id = _log_artifact(
        kind="shadow_pattern_candidate",
        path=out_path,
        meta={
            "patterns": len(patterns),
            "min_support": int(args.min_support),
            "min_actors": int(args.min_actors),
            "inputs": [str(p) for p in paths],
        },
        trace_id=None,
        db_path=db_path,
    )

    print(f"Wrote {len(patterns)} candidate patterns to {out_path}")
    print(f"Logged artifact {artifact_id} to {db_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())