vm-cloudflare/observatory/metrics-exporter.py
Vault Sovereign 37a867c485 Initial commit: Cloudflare infrastructure with WAF Intelligence
- Complete Cloudflare Terraform configuration (DNS, WAF, tunnels, access)
- WAF Intelligence MCP server with threat analysis and ML classification
- GitOps automation with PR workflows and drift detection
- Observatory monitoring stack with Prometheus/Grafana
- IDE operator rules for governed development
- Security playbooks and compliance frameworks
- Autonomous remediation and state reconciliation
2025-12-16 18:31:53 +00:00

#!/usr/bin/env python3
"""
Cloudflare Metrics Exporter for Prometheus
Exports Cloudflare state and invariant status as Prometheus metrics.
Usage:
    python3 metrics-exporter.py --port 9100

Environment Variables:
    CLOUDFLARE_API_TOKEN  - API token
    CLOUDFLARE_ZONE_ID    - Zone ID
    CLOUDFLARE_ACCOUNT_ID - Account ID
    SNAPSHOT_DIR          - Directory containing state snapshots
    ANOMALY_DIR           - Directory containing invariant reports
"""
import argparse
import glob
import json
import os
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, Optional
import requests
# Configuration
CF_API_BASE = "https://api.cloudflare.com/client/v4"
DEFAULT_PORT = 9100
SCRAPE_INTERVAL = 60 # seconds
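
# Note: collect_all() only refreshes metrics if at least SCRAPE_INTERVAL seconds
# have passed since the previous collection; scrapes arriving sooner are served
# from the last collected values.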

class CloudflareMetricsCollector:
    """Collects Cloudflare metrics for Prometheus export."""

    def __init__(self, api_token: str, zone_id: str, account_id: str,
                 snapshot_dir: str, anomaly_dir: str):
        self.api_token = api_token
        self.zone_id = zone_id
        self.account_id = account_id
        self.snapshot_dir = snapshot_dir
        self.anomaly_dir = anomaly_dir
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json"
        })
        self.metrics: Dict[str, Any] = {}
        self.last_scrape = 0

    def _cf_request(self, endpoint: str) -> Dict[str, Any]:
        """Make Cloudflare API request."""
        url = f"{CF_API_BASE}{endpoint}"
        response = self.session.get(url)
        response.raise_for_status()
        return response.json()

    def _get_latest_file(self, pattern: str) -> Optional[str]:
        """Get most recent file matching pattern."""
        files = glob.glob(pattern)
        if not files:
            return None
        return max(files, key=os.path.getmtime)

    def collect_dns_metrics(self):
        """Collect DNS record metrics."""
        try:
            data = self._cf_request(f"/zones/{self.zone_id}/dns_records?per_page=500")
            records = data.get("result", [])
            # Count by type
            type_counts = {}
            proxied_count = 0
            unproxied_count = 0
            for r in records:
                rtype = r.get("type", "UNKNOWN")
                type_counts[rtype] = type_counts.get(rtype, 0) + 1
                if r.get("proxied"):
                    proxied_count += 1
                else:
                    unproxied_count += 1
            self.metrics["dns_records_total"] = len(records)
            self.metrics["dns_records_proxied"] = proxied_count
            self.metrics["dns_records_unproxied"] = unproxied_count
            for rtype, count in type_counts.items():
                self.metrics[f"dns_records_by_type{{type=\"{rtype}\"}}"] = count
        except Exception:
            self.metrics["dns_scrape_errors_total"] = self.metrics.get("dns_scrape_errors_total", 0) + 1

    def collect_dnssec_metrics(self):
        """Collect DNSSEC status."""
        try:
            data = self._cf_request(f"/zones/{self.zone_id}/dnssec")
            result = data.get("result", {})
            status = result.get("status", "unknown")
            self.metrics["dnssec_enabled"] = 1 if status == "active" else 0
        except Exception:
            self.metrics["dnssec_enabled"] = -1

    def collect_tunnel_metrics(self):
        """Collect tunnel metrics."""
        try:
            data = self._cf_request(f"/accounts/{self.account_id}/cfd_tunnel")
            tunnels = data.get("result", [])
            active = 0
            healthy = 0
            total_connections = 0
            for t in tunnels:
                if not t.get("deleted_at"):
                    active += 1
                    # Check connections
                    try:
                        conn_data = self._cf_request(
                            f"/accounts/{self.account_id}/cfd_tunnel/{t['id']}/connections"
                        )
                        conns = conn_data.get("result", [])
                        if conns:
                            healthy += 1
                            total_connections += len(conns)
                    except Exception:
                        pass
            self.metrics["tunnels_total"] = active
            self.metrics["tunnels_healthy"] = healthy
            self.metrics["tunnels_unhealthy"] = active - healthy
            self.metrics["tunnel_connections_total"] = total_connections
        except Exception:
            self.metrics["tunnel_scrape_errors_total"] = self.metrics.get("tunnel_scrape_errors_total", 0) + 1

    def collect_access_metrics(self):
        """Collect Access app metrics."""
        try:
            data = self._cf_request(f"/accounts/{self.account_id}/access/apps")
            apps = data.get("result", [])
            self.metrics["access_apps_total"] = len(apps)
            # Count by type
            type_counts = {}
            for app in apps:
                app_type = app.get("type", "unknown")
                type_counts[app_type] = type_counts.get(app_type, 0) + 1
            for app_type, count in type_counts.items():
                self.metrics[f"access_apps_by_type{{type=\"{app_type}\"}}"] = count
        except Exception:
            self.metrics["access_scrape_errors_total"] = self.metrics.get("access_scrape_errors_total", 0) + 1

    def collect_zone_settings_metrics(self):
        """Collect zone security settings."""
        try:
            data = self._cf_request(f"/zones/{self.zone_id}/settings")
            settings = {s["id"]: s["value"] for s in data.get("result", [])}
            # TLS settings
            ssl = settings.get("ssl", "unknown")
            self.metrics["zone_ssl_strict"] = 1 if ssl in ("strict", "full_strict") else 0
            min_tls = settings.get("min_tls_version", "unknown")
            self.metrics["zone_tls_version_secure"] = 1 if min_tls in ("1.2", "1.3") else 0
            # Security features
            self.metrics["zone_always_https"] = 1 if settings.get("always_use_https") == "on" else 0
            self.metrics["zone_browser_check"] = 1 if settings.get("browser_check") == "on" else 0
        except Exception:
            pass

    def collect_snapshot_metrics(self):
        """Collect metrics from state snapshots."""
        latest = self._get_latest_file(os.path.join(self.snapshot_dir, "cloudflare-*.json"))
        if not latest:
            self.metrics["snapshot_age_seconds"] = -1
            return
        try:
            mtime = os.path.getmtime(latest)
            age = time.time() - mtime
            self.metrics["snapshot_age_seconds"] = int(age)
            with open(latest) as f:
                snapshot = json.load(f)
            integrity = snapshot.get("integrity", {})
            self.metrics["snapshot_merkle_root_set"] = 1 if integrity.get("merkle_root") else 0
        except Exception:
            self.metrics["snapshot_age_seconds"] = -1

    def collect_invariant_metrics(self):
        """Collect metrics from invariant reports."""
        latest = self._get_latest_file(os.path.join(self.anomaly_dir, "invariant-report-*.json"))
        if not latest:
            self.metrics["invariants_total"] = 0
            self.metrics["invariants_passed"] = 0
            self.metrics["invariants_failed"] = 0
            return
        try:
            with open(latest) as f:
                report = json.load(f)
            summary = report.get("summary", {})
            self.metrics["invariants_total"] = summary.get("total", 0)
            self.metrics["invariants_passed"] = summary.get("passed", 0)
            self.metrics["invariants_failed"] = summary.get("failed", 0)
            self.metrics["invariants_pass_rate"] = summary.get("pass_rate", 0)
            # Report age
            mtime = os.path.getmtime(latest)
            self.metrics["invariant_report_age_seconds"] = int(time.time() - mtime)
        except Exception:
            pass

    def collect_anomaly_metrics(self):
        """Count anomaly receipts."""
        anomaly_files = glob.glob(os.path.join(self.anomaly_dir, "anomaly-*.json"))
        self.metrics["anomalies_total"] = len(anomaly_files)
        # Recent anomalies (last 24h)
        recent = 0
        day_ago = time.time() - 86400
        for f in anomaly_files:
            if os.path.getmtime(f) > day_ago:
                recent += 1
        self.metrics["anomalies_last_24h"] = recent

    def collect_all(self):
        """Collect all metrics."""
        now = time.time()
        if now - self.last_scrape < SCRAPE_INTERVAL:
            return  # Rate limit
        self.last_scrape = now
        self.metrics = {"scrape_timestamp": int(now)}
        self.collect_dns_metrics()
        self.collect_dnssec_metrics()
        self.collect_tunnel_metrics()
        self.collect_access_metrics()
        self.collect_zone_settings_metrics()
        self.collect_snapshot_metrics()
        self.collect_invariant_metrics()
        self.collect_anomaly_metrics()
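
    # Illustrative output (made-up values): with the "cloudflare_" prefix applied
    # below, the /metrics endpoint emits exposition-format lines such as:
    #   cloudflare_dns_records_total 42
    #   cloudflare_tunnels_healthy 3
    #   cloudflare_invariants_failed 0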
    def format_prometheus(self) -> str:
        """Format metrics as Prometheus exposition format."""
        lines = [
            "# HELP cloudflare_dns_records_total Total DNS records",
            "# TYPE cloudflare_dns_records_total gauge",
            "# HELP cloudflare_tunnels_total Total active tunnels",
            "# TYPE cloudflare_tunnels_total gauge",
            "# HELP cloudflare_tunnels_healthy Healthy tunnels with connections",
            "# TYPE cloudflare_tunnels_healthy gauge",
            "# HELP cloudflare_invariants_passed Invariants passing",
            "# TYPE cloudflare_invariants_passed gauge",
            "# HELP cloudflare_invariants_failed Invariants failing",
            "# TYPE cloudflare_invariants_failed gauge",
            "",
        ]
        for key, value in self.metrics.items():
            if isinstance(value, (int, float)):
                # Labels, if any, are already embedded in the key
                # (e.g. dns_records_by_type{type="A"}).
                lines.append(f"cloudflare_{key} {value}")
        # Exposition text format requires a trailing newline
        return "\n".join(lines) + "\n"

class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler for Prometheus scrapes."""

    collector: Optional[CloudflareMetricsCollector] = None

    def do_GET(self):
        if self.path == "/metrics":
            self.collector.collect_all()
            output = self.collector.format_prometheus()
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; charset=utf-8")
            self.end_headers()
            self.wfile.write(output.encode())
        elif self.path == "/health":
            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        pass  # Suppress default logging

def main():
    parser = argparse.ArgumentParser(description="Cloudflare Metrics Exporter")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT,
                        help=f"Port to listen on (default: {DEFAULT_PORT})")
    parser.add_argument("--zone-id", default=os.environ.get("CLOUDFLARE_ZONE_ID"))
    parser.add_argument("--account-id", default=os.environ.get("CLOUDFLARE_ACCOUNT_ID"))
    parser.add_argument("--snapshot-dir",
                        default=os.environ.get("SNAPSHOT_DIR", "../snapshots"))
    parser.add_argument("--anomaly-dir",
                        default=os.environ.get("ANOMALY_DIR", "../anomalies"))
    args = parser.parse_args()

    api_token = os.environ.get("CLOUDFLARE_API_TOKEN")
    if not api_token:
        print("Error: CLOUDFLARE_API_TOKEN required")
        return 1
    if not args.zone_id or not args.account_id:
        print("Error: Zone ID and Account ID required")
        return 1

    # Initialize collector
    collector = CloudflareMetricsCollector(
        api_token, args.zone_id, args.account_id,
        args.snapshot_dir, args.anomaly_dir
    )
    MetricsHandler.collector = collector

    # Start server
    server = HTTPServer(("0.0.0.0", args.port), MetricsHandler)
    print(f"Cloudflare Metrics Exporter listening on :{args.port}")
    print("  /metrics - Prometheus metrics")
    print("  /health  - Health check")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
        server.shutdown()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())