#!/usr/bin/env python3
"""
Cloudflare State Reconciler (Pure Technical)

Generates a canonical JSON snapshot + Merkle root representing:
  - Zones
  - DNS records
  - WAF packages
  - Firewall rules
  - Zero-Trust Access apps + policies
  - Tunnels + status metadata
  - API token metadata (non-secret)

NOTE: DNSSEC and registrar lock status are stated goals of this tool but are
not collected by the code below.

Outputs (under $VM_STATE_ROOT, default ./cloudflare_state):
  snapshots/cloudflare-<ts>.json
  receipts/cloudflare-state-<ts>-<root[:8]>.json

Environment variables:
  CF_API_TOKEN   API token sent as a Bearer credential (required by main()).
  CF_ACCOUNT_ID  Account id used in the account-scoped endpoints
                 (required by main()).
  VM_STATE_ROOT  Root directory for the output files (optional).
"""

import hashlib
import json
import os
import sys
from datetime import datetime, timezone

import requests

CF_API = "https://api.cloudflare.com/client/v4"
CF_TOKEN = os.getenv("CF_API_TOKEN")
CF_ACCOUNT = os.getenv("CF_ACCOUNT_ID")
OUT_ROOT = os.getenv("VM_STATE_ROOT", "./cloudflare_state")

# Seconds to wait for each HTTP request; without a timeout `requests` can
# block forever on a stalled connection.
REQUEST_TIMEOUT = 30

HEADERS = {
    "Authorization": f"Bearer {CF_TOKEN}",
    "Content-Type": "application/json",
}


def merkle_root(obj):
    """Return the SHA-256 hex digest of *obj* serialized as canonical JSON.

    "Canonical" here means ``json.dumps(obj, sort_keys=True)`` with the
    default separators.  Despite the name, this is a single flat hash of the
    whole document, not a tree of per-section hashes.  The serialization is
    deliberately left unchanged so digests stay comparable with receipts
    written by earlier runs.
    """
    data = json.dumps(obj, sort_keys=True).encode()
    return hashlib.sha256(data).hexdigest()


def _request(endpoint, page=None):
    """Issue one GET against the Cloudflare API and return the raw response.

    ``page`` is only sent when given, so the first request of every call is
    identical to an un-paginated request.
    """
    params = None if page is None else {"page": page}
    return requests.get(
        f"{CF_API}{endpoint}",
        headers=HEADERS,
        params=params,
        timeout=REQUEST_TIMEOUT,
    )


def _total_pages(payload):
    """Return how many pages the response envelope *payload* advertises.

    Uses ``result_info.total_pages`` when present, otherwise derives it from
    ``total_count`` and ``per_page``.  Falls back to 1 when the envelope
    carries no usable paging information.
    """
    info = payload.get("result_info") or {}
    total = info.get("total_pages")
    if total is None:
        per_page = info.get("per_page")
        total_count = info.get("total_count")
        if per_page and total_count:
            total = -(-total_count // per_page)  # ceiling division
    if not isinstance(total, int):
        return 1
    return max(total, 1)


def _collect(endpoint, first_payload, default):
    """Return the full ``result`` for *endpoint*, following pagination.

    ``first_payload`` is the decoded JSON of the already-fetched first page.
    Non-list results (single objects, or a missing ``result`` key, which
    yields *default*) are returned untouched.  For list results every further
    page announced by the envelope is fetched and appended.

    Raises:
        requests.HTTPError: if a follow-up page returns an error status.
    """
    result = first_payload.get("result", default)
    if not isinstance(result, list):
        return result

    items = list(result)
    for page in range(2, _total_pages(first_payload) + 1):
        response = _request(endpoint, page=page)
        response.raise_for_status()
        chunk = response.json().get("result")
        # Stop early if the server has fewer pages than it advertised.
        if not isinstance(chunk, list) or not chunk:
            break
        items.extend(chunk)
    return items


def cf(endpoint):
    """GET *endpoint* (a path relative to ``CF_API``) and return its ``result``.

    List results are returned complete: all pages are fetched, not just the
    first one.  A response without a ``result`` key yields ``{}``.

    Raises:
        requests.HTTPError: on any non-2xx response.
    """
    response = _request(endpoint)
    response.raise_for_status()
    return _collect(endpoint, response.json(), default={})


# -------------------------------
# Fetch Cloudflare State Sections
# -------------------------------

def fetch_dns(zones):
    """Return ``{zone name: DNS records}`` for every zone in *zones*."""
    return {z["name"]: cf(f"/zones/{z['id']}/dns_records") for z in zones}


def fetch_zones():
    """Return the zones visible to the configured API token."""
    return cf("/zones")


def fetch_waf(zones):
    """Return ``{zone name: WAF packages}`` for every zone in *zones*."""
    return {
        z["name"]: cf(f"/zones/{z['id']}/firewall/waf/packages") for z in zones
    }


def fetch_firewall_rules(zones):
    """Return ``{zone name: firewall rules}`` for every zone in *zones*."""
    return {z["name"]: cf(f"/zones/{z['id']}/firewall/rules") for z in zones}


def fetch_tunnels():
    """Return the tunnels of the configured account."""
    return cf(f"/accounts/{CF_ACCOUNT}/cfd_tunnel")


def fetch_access_apps():
    """Return the Access applications of the configured account."""
    return cf(f"/accounts/{CF_ACCOUNT}/access/apps")


def fetch_access_policies():
    """Return the Access policies of the configured account."""
    return cf(f"/accounts/{CF_ACCOUNT}/access/policies")


def fetch_api_tokens():
    """Return API token metadata (not secrets), or ``[]`` if unavailable.

    This section is best-effort: any non-200 answer yields an empty list
    instead of aborting the whole snapshot.  The failure is reported on
    stderr so an empty ``api_tokens`` section is not mistaken for "no
    tokens exist".
    """
    endpoint = "/user/tokens"
    response = _request(endpoint)
    if response.status_code != 200:
        print(
            f"warning: {endpoint} returned HTTP {response.status_code}; "
            "api_tokens will be empty",
            file=sys.stderr,
        )
        return []
    try:
        return _collect(endpoint, response.json(), default=[])
    except requests.HTTPError as exc:
        # A later page failed; stay best-effort like the first-page check.
        print(
            f"warning: {endpoint} pagination failed ({exc}); "
            "api_tokens will be empty",
            file=sys.stderr,
        )
        return []


# -------------------------------
# Snapshot Assembly
# -------------------------------

def build_snapshot():
    """Fetch every state section and return them as one snapshot dict.

    The ``ts`` field (UTC, second resolution) is part of the snapshot and
    therefore of the hash computed over it, so roots taken at different
    times differ even when the Cloudflare state itself is unchanged.
    """
    zones = fetch_zones()

    snapshot = {
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "zones": zones,
        "dns": fetch_dns(zones),
        "waf": fetch_waf(zones),
        "firewall_rules": fetch_firewall_rules(zones),
        "access_apps": fetch_access_apps(),
        "access_policies": fetch_access_policies(),
        "tunnels": fetch_tunnels(),
        "api_tokens": fetch_api_tokens(),
    }

    return snapshot


# -------------------------------
# Main
# -------------------------------

def main():
    """Build a snapshot, write it plus its receipt, and print their paths.

    Exits with an error message when a required environment variable is
    missing, instead of sending requests with ``Bearer None`` credentials.
    """
    required = (("CF_API_TOKEN", CF_TOKEN), ("CF_ACCOUNT_ID", CF_ACCOUNT))
    missing = [name for name, value in required if not value]
    if missing:
        raise SystemExit(
            f"Missing required environment variable(s): {', '.join(missing)}"
        )

    # Created here rather than at import time so importing this module has
    # no filesystem side effects.
    os.makedirs(f"{OUT_ROOT}/snapshots", exist_ok=True)
    os.makedirs(f"{OUT_ROOT}/receipts", exist_ok=True)

    snap = build_snapshot()
    root = merkle_root(snap)

    # ':' is not allowed in file names on some filesystems.
    ts = snap["ts"].replace(":", "-")
    snap_path = f"{OUT_ROOT}/snapshots/cloudflare-{ts}.json"

    # The file is pretty-printed; the root is computed over the sorted-key
    # serialization in merkle_root(), so verify by loading the file and
    # re-hashing the object, not by hashing the file bytes.
    with open(snap_path, "w", encoding="utf-8") as f:
        json.dump(snap, f, indent=2)

    receipt = {
        "ts": snap["ts"],
        "merkle_root": root,
        "snapshot_file": os.path.basename(snap_path),
    }

    receipt_path = f"{OUT_ROOT}/receipts/cloudflare-state-{ts}-{root[:8]}.json"
    with open(receipt_path, "w", encoding="utf-8") as f:
        json.dump(receipt, f, indent=2)

    print("Snapshot:", snap_path)
    print("Receipt:", receipt_path)
    print("Merkle root:", root)


if __name__ == "__main__":
    main()