#!/usr/bin/env python3
"""
Drift Guardian — Real-Time Cloudflare Drift Detection
Pure technical (D1)

Purpose:
  • Poll Cloudflare state at short intervals
  • Compare live state → latest snapshot → invariants
  • Detect unauthorized modifications
  • Trigger remediation (optional hook)
  • Emit VaultMesh anomaly receipts

The Guardian   = fast, reactive layer.
The Remediator = corrective, authoritative layer.
The Reconciler = canonical truth layer.
"""

import os
import json
import time
import hashlib
import requests
from datetime import datetime, timezone

CF_API = "https://api.cloudflare.com/client/v4"
CF_TOKEN = os.getenv("CF_API_TOKEN")
CF_ACCOUNT = os.getenv("CF_ACCOUNT_ID")
STATE_ROOT = os.getenv("VM_STATE_ROOT", "./cloudflare_state")
SNAP_DIR = f"{STATE_ROOT}/snapshots"
RECEIPT_DIR = f"{STATE_ROOT}/receipts"
ANOM_DIR = f"{STATE_ROOT}/anomalies"

# Seconds to wait on a single HTTP request before giving up; without a
# timeout a stalled connection would block the polling loop forever.
REQUEST_TIMEOUT = 30
# Seconds between drift checks, and between retries while no snapshot exists.
POLL_INTERVAL = 120
NO_SNAPSHOT_INTERVAL = 60

HEADERS = {
    "Authorization": f"Bearer {CF_TOKEN}",
    "Content-Type": "application/json",
}

os.makedirs(RECEIPT_DIR, exist_ok=True)
os.makedirs(ANOM_DIR, exist_ok=True)


# -----------------------------
# Helpers
# -----------------------------

def _get_json(url, params=None, timeout=REQUEST_TIMEOUT):
    """GET *url* with the Cloudflare auth headers and return the decoded JSON body."""
    r = requests.get(url, headers=HEADERS, params=params, timeout=timeout)
    r.raise_for_status()
    return r.json()


def _total_pages(info):
    """Return the page count described by a ``result_info`` dict (1 if unknown)."""
    pages = info.get("total_pages")
    if pages:
        return int(pages)
    total = info.get("total_count")
    per_page = info.get("per_page")
    if total and per_page:
        return -(-int(total) // int(per_page))  # ceiling division
    return 1


def _stable_list(values):
    """Return *values* as a list in a deterministic order.

    Sorting by ``repr`` avoids ``TypeError`` when entries mix ``None`` and
    strings (e.g. a DNS record without ``content``).
    """
    return sorted(values, key=repr)


def cf(endpoint, timeout=REQUEST_TIMEOUT):
    """Fetch ``result`` from a Cloudflare v4 API endpoint.

    For list results, every page advertised in the response's
    ``result_info`` is fetched and concatenated, so callers see the full
    collection rather than only the first page.

    Args:
        endpoint: Path appended to ``CF_API`` (e.g. ``"/zones"``).
        timeout: Per-request timeout in seconds.

    Returns:
        The ``result`` payload: a list for list endpoints, otherwise
        whatever the API returned; ``{}`` when ``result`` is absent or null.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
    """
    url = f"{CF_API}{endpoint}"
    payload = _get_json(url, timeout=timeout)
    result = payload.get("result")
    if result is None:
        return {}
    if not isinstance(result, list):
        return result

    items = list(result)
    last_page = _total_pages(payload.get("result_info") or {})
    for page in range(2, last_page + 1):
        more = _get_json(url, params={"page": page}, timeout=timeout).get("result")
        items.extend(more or [])
    return items


def load_latest_snapshot():
    """Load the snapshot whose filename sorts last in ``SNAP_DIR``.

    NOTE(review): "latest" relies on snapshot filenames sorting
    chronologically — confirm against the snapshot writer.

    Returns:
        The parsed JSON snapshot, or ``None`` when ``SNAP_DIR`` is missing
        or empty.
    """
    try:
        snaps = sorted(os.listdir(SNAP_DIR))
    except FileNotFoundError:
        # SNAP_DIR is never created by this module; treat as "no snapshot yet".
        return None
    if not snaps:
        return None
    latest = snaps[-1]
    with open(f"{SNAP_DIR}/{latest}", encoding="utf-8") as f:
        return json.load(f)


def emit_anomaly(event_type, details):
    """Write one anomaly record to ``ANOM_DIR`` and return its file path.

    The filename embeds the UTC timestamp and the first 8 hex digits of the
    SHA-256 of the record's canonical (sorted-keys) JSON.
    """
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    anomaly = {"ts": ts, "event_type": event_type, "details": details}
    h = hashlib.sha256(json.dumps(anomaly, sort_keys=True).encode()).hexdigest()
    file_path = f"{ANOM_DIR}/drift-{ts}-{h[:8]}.json"
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(anomaly, f, indent=2)
    print(f"[GUARDIAN] Drift detected → {file_path}")
    return file_path


# -----------------------------
# Drift Detection Logic
# -----------------------------

def _record_key(record):
    """Identity of a DNS record for diffing: (type, name, content)."""
    return (record["type"], record["name"], record.get("content"))


def detect_dns_drift(snapshot, zones_live=None):
    """Compare live zones and DNS records against *snapshot*.

    Args:
        snapshot: Parsed snapshot dict; reads its ``zones`` and ``dns`` keys.
        zones_live: Optional pre-fetched live zone list; fetched from
            ``/zones`` when omitted.

    Returns:
        A list of anomaly dicts of type ``zone_added``, ``zone_removed``,
        ``dns_added`` or ``dns_removed``.
    """
    anomalies = []
    if zones_live is None:
        zones_live = cf("/zones")

    # index snapshot zones by name
    snap_zones = {z["name"]: z for z in snapshot.get("zones", [])}
    live_names = set()

    for z in zones_live:
        name = z["name"]
        zid = z["id"]
        live_names.add(name)

        if name not in snap_zones:
            anomalies.append({"type": "zone_added", "zone": name})
            continue

        # DNS record diff
        live_recs = cf(f"/zones/{zid}/dns_records")
        snap_recs = snapshot.get("dns", {}).get(name, [])

        live_set = {_record_key(r) for r in live_recs}
        snap_set = {_record_key(r) for r in snap_recs}

        added = live_set - snap_set
        removed = snap_set - live_set

        if added:
            anomalies.append(
                {"type": "dns_added", "zone": name, "records": _stable_list(added)}
            )
        if removed:
            anomalies.append(
                {"type": "dns_removed", "zone": name, "records": _stable_list(removed)}
            )

    # Zones recorded in the snapshot but no longer present in the account.
    for name in _stable_list(snap_zones.keys() - live_names):
        anomalies.append({"type": "zone_removed", "zone": name})

    return anomalies


def detect_waf_drift(snapshot, zones_live=None):
    """Compare each live zone's WAF package names against *snapshot*.

    Args:
        snapshot: Parsed snapshot dict; reads its ``waf`` key (zone name →
            list of package dicts).
        zones_live: Optional pre-fetched live zone list; fetched from
            ``/zones`` when omitted.

    Returns:
        A list of ``waf_ruleset_drift`` anomaly dicts, one per zone whose
        set of package names differs.
    """
    anomalies = []
    if zones_live is None:
        zones_live = cf("/zones")
    snap_waf = snapshot.get("waf", {})

    for z in zones_live:
        zname = z["name"]
        zid = z["id"]

        live_pkgs = cf(f"/zones/{zid}/firewall/waf/packages")
        snap_pkgs = snap_waf.get(zname, [])

        live_names = {p.get("name") for p in live_pkgs}
        snap_names = {p.get("name") for p in snap_pkgs}

        if live_names != snap_names:
            anomalies.append({
                "type": "waf_ruleset_drift",
                "zone": zname,
                "expected": _stable_list(snap_names),
                "found": _stable_list(live_names),
            })

    return anomalies


def detect_access_drift(snapshot):
    """Compare live Access apps (by name and type) against *snapshot*.

    Returns:
        A list holding at most one ``access_app_drift`` anomaly dict.
    """
    anomalies = []
    live_apps = cf(f"/accounts/{CF_ACCOUNT}/access/apps")
    snap_apps = snapshot.get("access_apps", [])

    live_set = {(a.get("name"), a.get("type")) for a in live_apps}
    snap_set = {(a.get("name"), a.get("type")) for a in snap_apps}

    if live_set != snap_set:
        anomalies.append({
            "type": "access_app_drift",
            "expected": _stable_list(snap_set),
            "found": _stable_list(live_set),
        })

    return anomalies


def detect_tunnel_drift(snapshot):
    """Compare live tunnel IDs against *snapshot* and flag unhealthy tunnels.

    Returns:
        A list with an optional ``tunnel_id_drift`` anomaly followed by one
        ``tunnel_unhealthy`` anomaly per live tunnel whose ``status`` is
        neither ``"active"`` nor ``"healthy"``.
    """
    anomalies = []
    live = cf(f"/accounts/{CF_ACCOUNT}/cfd_tunnel")
    snap = snapshot.get("tunnels", [])

    live_ids = {t.get("id") for t in live}
    snap_ids = {t.get("id") for t in snap}

    if live_ids != snap_ids:
        anomalies.append({
            "type": "tunnel_id_drift",
            "expected": _stable_list(snap_ids),
            "found": _stable_list(live_ids),
        })

    # health drift
    for t in live:
        if t.get("status") not in ("active", "healthy"):
            anomalies.append({"type": "tunnel_unhealthy", "tunnel": t})

    return anomalies


# -----------------------------
# Main Guardian Loop
# -----------------------------

def _collect_anomalies(snapshot):
    """Run every detector against *snapshot* and return the combined anomalies."""
    zones_live = cf("/zones")  # fetched once, shared by the zone-based detectors
    anomalies = []
    anomalies += detect_dns_drift(snapshot, zones_live)
    anomalies += detect_waf_drift(snapshot, zones_live)
    anomalies += detect_access_drift(snapshot)
    anomalies += detect_tunnel_drift(snapshot)
    return anomalies


def _run_cycle():
    """Perform one drift check and return the seconds to sleep before the next."""
    snapshot = load_latest_snapshot()
    if not snapshot:
        print("[GUARDIAN] No snapshot found — run state-reconciler first.")
        return NO_SNAPSHOT_INTERVAL

    anomalies = _collect_anomalies(snapshot)
    if anomalies:
        for a in anomalies:
            emit_anomaly(a.get("type"), a)
    else:
        print("[GUARDIAN] No drift detected.")
    return POLL_INTERVAL  # check every 2 minutes


def main():
    """Poll Cloudflare forever, emitting an anomaly file for each drift found.

    Raises:
        SystemExit: If ``CF_API_TOKEN`` or ``CF_ACCOUNT_ID`` is not set.
    """
    if not CF_TOKEN or not CF_ACCOUNT:
        raise SystemExit("[GUARDIAN] CF_API_TOKEN and CF_ACCOUNT_ID must be set.")

    print("[GUARDIAN] Drift Guardian active…")

    while True:
        try:
            delay = _run_cycle()
        except (requests.RequestException, OSError, ValueError) as exc:
            # A transient API/network failure or an unreadable snapshot must
            # not stop the guardian; report it and try again next cycle.
            print(f"[GUARDIAN] Check failed — retrying next cycle: {exc}")
            delay = POLL_INTERVAL
        time.sleep(delay)


if __name__ == "__main__":
    main()