Files
vm-cloudflare/scripts/state_reconciler_py.py
Vault Sovereign 37a867c485 Initial commit: Cloudflare infrastructure with WAF Intelligence
- Complete Cloudflare Terraform configuration (DNS, WAF, tunnels, access)
- WAF Intelligence MCP server with threat analysis and ML classification
- GitOps automation with PR workflows and drift detection
- Observatory monitoring stack with Prometheus/Grafana
- IDE operator rules for governed development
- Security playbooks and compliance frameworks
- Autonomous remediation and state reconciliation
2025-12-16 18:31:53 +00:00

156 lines
3.5 KiB
Python

#!/usr/bin/env python3
"""
Cloudflare State Reconciler (Pure Technical)
Generates a canonical JSON snapshot + Merkle root (currently a single SHA-256 over the sorted-key JSON) representing:
- DNS records
- DNSSEC + registrar lock status (not currently collected by build_snapshot())
- WAF rules
- Firewall rules
- Zero-Trust Access apps + policies
- Tunnels + status metadata
- API token metadata (non-secret)
Outputs:
snapshots/cloudflare-<ts>.json
receipts/cloudflare-state-<ts>-<first 8 hex chars of merkle root>.json
"""
import os
import json
import hashlib
import requests
from datetime import datetime, timezone
# Base URL of the Cloudflare v4 REST API; endpoint paths are appended to it.
CF_API = "https://api.cloudflare.com/client/v4"

# Credentials and output location are read from the environment at import
# time. CF_TOKEN / CF_ACCOUNT are None when their variables are unset.
CF_TOKEN = os.environ.get("CF_API_TOKEN")
CF_ACCOUNT = os.environ.get("CF_ACCOUNT_ID")
OUT_ROOT = os.environ.get("VM_STATE_ROOT", "./cloudflare_state")

# Headers sent with every API request issued by this script.
HEADERS = {
    "Authorization": f"Bearer {CF_TOKEN}",
    "Content-Type": "application/json",
}

# Create both output directories up front so later writes cannot fail on a
# missing parent directory.
os.makedirs(f"{OUT_ROOT}/snapshots", exist_ok=True)
os.makedirs(f"{OUT_ROOT}/receipts", exist_ok=True)
def merkle_root(obj):
    """Return a hex SHA-256 fingerprint of *obj*.

    The object is serialized to JSON with sorted keys so that dictionaries
    with the same content hash identically regardless of insertion order.
    Despite the name, no hash tree is built: the result is one SHA-256
    digest over the whole serialized document.
    """
    canonical = json.dumps(obj, sort_keys=True)
    hasher = hashlib.sha256()
    hasher.update(canonical.encode())
    return hasher.hexdigest()
def cf(endpoint, timeout=30):
    """GET a Cloudflare API endpoint and return its ``result`` payload.

    Args:
        endpoint: Path appended to ``CF_API`` (e.g. ``"/zones"``).
        timeout: Seconds to wait for each HTTP request. Without a timeout
            ``requests`` can block forever on a stalled connection.

    Returns:
        The ``result`` value of the JSON response (``{}`` if the key is
        missing). When ``result`` is a list and the response carries
        ``result_info`` describing further pages, the remaining pages are
        fetched and concatenated, so callers see the complete collection
        rather than only the first page.

    Raises:
        requests.HTTPError: If any request returns a 4xx/5xx status.
    """
    url = f"{CF_API}{endpoint}"

    def _get(params=None):
        # One request/validate/decode round trip.
        resp = requests.get(url, headers=HEADERS, params=params, timeout=timeout)
        resp.raise_for_status()
        return resp.json()

    payload = _get()
    result = payload.get("result", {})
    if not isinstance(result, list):
        # Single-object responses are never paginated.
        return result

    info = payload.get("result_info") or {}
    page = info.get("page") or 1
    total_pages = info.get("total_pages")
    if not total_pages:
        # Some responses only report counts; derive the page count from them.
        per_page = info.get("per_page") or 0
        total_count = info.get("total_count") or 0
        total_pages = -(-total_count // per_page) if per_page else 1

    items = list(result)
    while page < total_pages:
        page += 1
        more = _get({"page": page}).get("result") or []
        if not more:
            # Defensive stop: the server reported more pages than it served.
            break
        items.extend(more)
    return items
# -------------------------------
# Fetch Cloudflare State Sections
# -------------------------------
def fetch_dns(zones):
    """Collect DNS records for every zone.

    Args:
        zones: Iterable of zone dicts; each must provide ``"id"`` and
            ``"name"``.

    Returns:
        Dict mapping zone name to whatever ``cf`` returns for that zone's
        ``dns_records`` endpoint.
    """
    records_by_zone = {}
    for zone in zones:
        zone_id = zone["id"]
        records_by_zone[zone["name"]] = cf(f"/zones/{zone_id}/dns_records")
    return records_by_zone
def fetch_zones():
    """Return the result of the ``/zones`` listing as delivered by ``cf``."""
    endpoint = "/zones"
    return cf(endpoint)
def fetch_waf(zones):
    """Collect WAF packages for every zone.

    Args:
        zones: Iterable of zone dicts; each must provide ``"id"`` and
            ``"name"``.

    Returns:
        Dict mapping zone name to the ``firewall/waf/packages`` result for
        that zone.
    """
    # NOTE(review): /firewall/waf/packages looks like the legacy WAF
    # managed-rules API -- confirm it is still served for these zones. An
    # HTTP error here propagates out of cf() and aborts the whole run.
    packages_by_zone = {}
    for zone in zones:
        zone_id = zone["id"]
        packages = cf(f"/zones/{zone_id}/firewall/waf/packages")
        packages_by_zone[zone["name"]] = packages
    return packages_by_zone
def fetch_firewall_rules(zones):
    """Collect firewall rules for every zone.

    Args:
        zones: Iterable of zone dicts; each must provide ``"id"`` and
            ``"name"``.

    Returns:
        Dict mapping zone name to the ``firewall/rules`` result for that
        zone.
    """
    # NOTE(review): presumably the older Firewall Rules API -- verify it is
    # still the right source for these zones.
    rules_by_zone = {}
    for zone in zones:
        zone_id = zone["id"]
        zone_rules = cf(f"/zones/{zone_id}/firewall/rules")
        rules_by_zone[zone["name"]] = zone_rules
    return rules_by_zone
def fetch_tunnels():
    """Return the ``cfd_tunnel`` listing for the configured account."""
    endpoint = f"/accounts/{CF_ACCOUNT}/cfd_tunnel"
    return cf(endpoint)
def fetch_access_apps():
    """Return the Access applications listing for the configured account."""
    endpoint = f"/accounts/{CF_ACCOUNT}/access/apps"
    return cf(endpoint)
def fetch_access_policies():
    """Return the Access policies listing for the configured account."""
    endpoint = f"/accounts/{CF_ACCOUNT}/access/policies"
    return cf(endpoint)
def fetch_api_tokens():
    """Return API token metadata from ``/user/tokens``, best effort.

    Unlike the other fetchers this does not go through ``cf``: any non-200
    response yields an empty list instead of raising, so a failure on this
    one endpoint does not abort the snapshot.

    Returns:
        The ``result`` value of the response (``[]`` if the key is missing),
        or ``[]`` when the status code is not 200.
    """
    # Metadata only, not secrets
    # A timeout keeps a stalled connection from hanging the run forever;
    # requests applies none by default.
    r = requests.get(f"{CF_API}/user/tokens", headers=HEADERS, timeout=30)
    if r.status_code != 200:
        return []
    return r.json().get("result", [])
# -------------------------------
# Snapshot Assembly
# -------------------------------
def build_snapshot():
    """Assemble the full state snapshot as a dict.

    Zones are fetched first because the per-zone sections need them. The
    ``ts`` field is a UTC timestamp formatted ``YYYY-MM-DDTHH:MM:SSZ``, taken
    after the zone listing and before the remaining sections are fetched.

    Returns:
        Dict with keys, in insertion order: ``ts``, ``zones``, ``dns``,
        ``waf``, ``firewall_rules``, ``access_apps``, ``access_policies``,
        ``tunnels``, ``api_tokens``.
    """
    zones = fetch_zones()
    captured_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    snapshot = {"ts": captured_at, "zones": zones}

    # Sections that are collected once per zone.
    per_zone_sections = (
        ("dns", fetch_dns),
        ("waf", fetch_waf),
        ("firewall_rules", fetch_firewall_rules),
    )
    for key, fetch in per_zone_sections:
        snapshot[key] = fetch(zones)

    # Sections that take no zone argument.
    global_sections = (
        ("access_apps", fetch_access_apps),
        ("access_policies", fetch_access_policies),
        ("tunnels", fetch_tunnels),
        ("api_tokens", fetch_api_tokens),
    )
    for key, fetch in global_sections:
        snapshot[key] = fetch()

    return snapshot
# -------------------------------
# Main
# -------------------------------
def main():
    """Build a snapshot, write it and its receipt to disk, and print a summary.

    The snapshot goes to ``<OUT_ROOT>/snapshots`` and the receipt to
    ``<OUT_ROOT>/receipts``. Colons in the timestamp are replaced with
    hyphens for the filenames, and the receipt filename carries the first
    eight hex characters of the digest. The digest is computed from the
    in-memory snapshot via ``merkle_root``, not from the bytes of the
    indented file written here.
    """
    snapshot = build_snapshot()
    digest = merkle_root(snapshot)
    stamp = snapshot["ts"].replace(":", "-")

    snap_path = f"{OUT_ROOT}/snapshots/cloudflare-{stamp}.json"
    with open(snap_path, "w") as out:
        json.dump(snapshot, out, indent=2)

    receipt_path = f"{OUT_ROOT}/receipts/cloudflare-state-{stamp}-{digest[:8]}.json"
    receipt = {
        "ts": snapshot["ts"],
        "merkle_root": digest,
        "snapshot_file": os.path.basename(snap_path),
    }
    with open(receipt_path, "w") as out:
        json.dump(receipt, out, indent=2)

    summary = (
        ("Snapshot:", snap_path),
        ("Receipt:", receipt_path),
        ("Merkle root:", digest),
    )
    for label, value in summary:
        print(label, value)


if __name__ == "__main__":
    main()