Files
vm-cloudflare/generate_capability_registry.py
Vault Sovereign f0b8d962de
Some checks failed
WAF Intelligence Guardrail / waf-intel (push) Waiting to run
Cloudflare Registry Validation / validate-registry (push) Has been cancelled
chore: pre-migration snapshot
Layer0, MCP servers, Terraform consolidation
2025-12-27 01:52:27 +00:00

351 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Cloudflare Control Plane Capability Registry Generator
Generates a machine-readable registry of all MCP server capabilities,
Terraform resources, and operational tools for auditability and documentation.
"""
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Any
# Registry structure
CAPABILITY_REGISTRY = {
"metadata": {
"generated_at": datetime.now(timezone.utc).isoformat(),
"version": "1.0.0",
"scope": "Cloudflare Control Plane",
},
"mcp_servers": {},
"terraform_resources": {},
"gitops_tools": {},
"security_framework": {},
"operational_tools": {},
}
# MCP Server capabilities (from analysis)
MCP_CAPABILITIES = {
"cloudflare_safe": {
"module": "cloudflare.mcp.cloudflare_safe",
"entrypoint": "cloudflare.mcp.cloudflare_safe",
"purpose": "Secure Cloudflare API operations",
"tools": [
"cf_snapshot (read/write token required)",
"cf_refresh (write token required)",
"cf_config_diff (read; requires snapshot_id)",
"cf_export_config (read)",
"cf_tunnel_status (read)",
"cf_tunnel_ingress_summary (read)",
"cf_access_policy_list (read)"
],
"auth_env": ["CLOUDFLARE_API_TOKEN", "CLOUDFLARE_ACCOUNT_ID"],
"side_effects": "read-only unless token present; cf_refresh/cf_snapshot are mutating",
"outputs": ["json", "terraform_hcl"],
"capabilities": [
"dns_record_management",
"waf_rule_configuration",
"tunnel_health_monitoring",
"zone_analytics_query",
"terraform_state_synchronization"
],
"security": {
"token_redaction": True,
"error_handling": True,
"rate_limiting": True
}
},
"waf_intelligence": {
"module": "cloudflare.mcp.waf_intelligence",
"entrypoint": "cloudflare.mcp.waf_intelligence.mcp_server",
"purpose": "WAF rule analysis and synthesis",
"tools": [
"waf_capabilities (read)",
"waf_analyze (read)",
"waf_assess (read)",
"waf_generate_gitops_proposals (propose)"
],
"auth_env": [],
"side_effects": "propose-only; generates GitOps proposals",
"outputs": ["json", "terraform_hcl", "gitops_mr"],
"capabilities": [
"waf_config_analysis",
"threat_intelligence_integration",
"compliance_mapping",
"rule_gap_identification",
"terraform_ready_rule_generation"
],
"intelligence": {
"ml_classification": True,
"threat_intel": True,
"compliance_frameworks": ["PCI-DSS 6.6", "OWASP-ASVS 13"]
}
},
"oracle_answer": {
"module": "cloudflare.mcp.oracle_answer",
"entrypoint": "cloudflare.mcp.oracle_answer",
"purpose": "Security decision support",
"tools": ["oracle_answer (read)"],
"auth_env": [],
"side_effects": "read-only; security classification only",
"outputs": ["json", "security_classification"],
"capabilities": [
"security_classification",
"routing_decision_support",
"threat_assessment",
"pre_execution_screening"
],
"integration": {
"layer0_framework": True,
"shadow_classifier": True,
"preboot_logging": True
}
}
}
},
},
"waf_intelligence": {
"module": "cloudflare.mcp.waf_intelligence",
"purpose": "WAF rule analysis and synthesis",
"capabilities": [
"waf_config_analysis",
"threat_intelligence_integration",
"compliance_mapping",
"rule_gap_identification",
"terraform_ready_rule_generation",
],
"intelligence": {
"ml_classification": True,
"threat_intel": True,
"compliance_frameworks": ["PCI-DSS 6.6", "OWASP-ASVS 13"],
},
},
"oracle_answer": {
"module": "cloudflare.mcp.oracle_answer",
"purpose": "Security decision support",
"capabilities": [
"security_classification",
"routing_decision_support",
"threat_assessment",
"pre_execution_screening",
],
"integration": {
"layer0_framework": True,
"shadow_classifier": True,
"preboot_logging": True,
},
},
}
# Terraform resources (from analysis)
TERRAFORM_RESOURCES = {
"dns_management": {
"files": ["dns.tf"],
"resources": ["cloudflare_record", "cloudflare_zone"],
"capabilities": [
"automated_dns_provisioning",
"spf_dmarc_mx_configuration",
"tunnel_based_routing",
"proxied_record_management",
],
},
"waf_security": {
"files": ["waf.tf"],
"resources": ["cloudflare_ruleset", "cloudflare_bot_management"],
"capabilities": [
"custom_waf_rules",
"managed_ruleset_integration",
"bot_management",
"rate_limiting",
"country_blocking",
],
},
"tunnel_infrastructure": {
"files": ["tunnels.tf"],
"resources": ["cloudflare_tunnel", "cloudflare_tunnel_config"],
"capabilities": [
"multi_service_tunnel_routing",
"ingress_rule_management",
"health_monitoring",
"credential_rotation",
],
},
}
# GitOps tools
GITOPS_TOOLS = {
"waf_rule_proposer": {
"file": "gitops/waf_rule_proposer.py",
"purpose": "Automated WAF rule generation",
"capabilities": [
"threat_intel_driven_rules",
"gitlab_ci_integration",
"automated_mr_creation",
"compliance_mapping",
],
},
"invariant_checker": {
"file": "scripts/invariant_checker_py.py",
"purpose": "Real-time state validation",
"capabilities": [
"dns_integrity_checks",
"waf_compliance_validation",
"tunnel_health_monitoring",
"drift_detection",
],
},
"drift_guardian": {
"file": "scripts/drift_guardian_py.py",
"purpose": "Automated remediation",
"capabilities": [
"state_reconciliation",
"auto_remediation",
"ops_notification",
],
},
}
# Security framework
SECURITY_FRAMEWORK = {
"layer0": {
"components": ["entrypoint.py", "shadow_classifier.py", "preboot_logger.py"],
"capabilities": [
"pre_execution_security_classification",
"threat_assessment",
"security_event_logging",
"routing_decision_support",
],
"classification_levels": ["catastrophic", "forbidden", "ambiguous", "blessed"],
}
}
# Operational tools
OPERATIONAL_TOOLS = {
"systemd_services": {
"services": ["autonomous-remediator", "drift-guardian", "tunnel-rotation"],
"capabilities": [
"continuous_monitoring",
"automated_remediation",
"scheduled_operations",
],
},
"test_suites": {
"suites": ["layer0_validation", "mcp_integration", "cloudflare_safe_ingress"],
"capabilities": [
"security_classification_testing",
"mcp_server_validation",
"api_integration_testing",
],
},
}
def generate_registry():
"""Generate the complete capability registry."""
CAPABILITY_REGISTRY["mcp_servers"] = MCP_CAPABILITIES
CAPABILITY_REGISTRY["terraform_resources"] = TERRAFORM_RESOURCES
CAPABILITY_REGISTRY["gitops_tools"] = GITOPS_TOOLS
CAPABILITY_REGISTRY["security_framework"] = SECURITY_FRAMEWORK
CAPABILITY_REGISTRY["operational_tools"] = OPERATIONAL_TOOLS
return CAPABILITY_REGISTRY
def save_registry_formats():
"""Save registry in multiple formats for different use cases."""
registry = generate_registry()
# JSON format (machine-readable)
with open("capability_registry.json", "w") as f:
json.dump(registry, f, indent=2)
# Markdown format (documentation)
markdown_content = generate_markdown_doc(registry)
with open("CAPABILITY_REGISTRY.md", "w") as f:
f.write(markdown_content)
print("✅ Capability registry generated:")
print(" - capability_registry.json (machine-readable)")
print(" - CAPABILITY_REGISTRY.md (documentation)")
def generate_markdown_doc(registry: Dict[str, Any]) -> str:
"""Generate Markdown documentation from registry."""
md = f"""# Cloudflare Control Plane Capability Registry
Generated: {registry["metadata"]["generated_at"]}
Version: {registry["metadata"]["version"]}
## MCP Servers
"""
for server_name, server_info in registry["mcp_servers"].items():
md += f"### {server_name}\n"
md += f"**Module**: `{server_info['module']}` \n"
md += f"**Purpose**: {server_info['purpose']} \n\n"
md += "**Capabilities**:\n"
for cap in server_info["capabilities"]:
md += f"- {cap}\n"
md += "\n"
md += "## Terraform Resources\n\n"
for resource_name, resource_info in registry["terraform_resources"].items():
md += f"### {resource_name}\n"
md += f"**Files**: {', '.join(resource_info['files'])} \n\n"
md += "**Capabilities**:\n"
for cap in resource_info["capabilities"]:
md += f"- {cap}\n"
md += "\n"
md += "## GitOps Tools\n\n"
for tool_name, tool_info in registry["gitops_tools"].items():
md += f"### {tool_name}\n"
md += f"**File**: {tool_info['file']} \n"
md += f"**Purpose**: {tool_info['purpose']} \n\n"
md += "**Capabilities**:\n"
for cap in tool_info["capabilities"]:
md += f"- {cap}\n"
md += "\n"
md += "## Security Framework\n\n"
for framework_name, framework_info in registry["security_framework"].items():
md += f"### {framework_name}\n"
md += f"**Components**: {', '.join(framework_info['components'])} \n\n"
md += "**Capabilities**:\n"
for cap in framework_info["capabilities"]:
md += f"- {cap}\n"
md += "\n"
md += "**Classification Levels**:\n"
for level in framework_info["classification_levels"]:
md += f"- {level}\n"
md += "\n"
md += "## Operational Tools\n\n"
for tool_category, tool_info in registry["operational_tools"].items():
md += f"### {tool_category}\n"
if "services" in tool_info:
md += f"**Services**: {', '.join(tool_info['services'])} \n\n"
elif "suites" in tool_info:
md += f"**Test Suites**: {', '.join(tool_info['suites'])} \n\n"
md += "**Capabilities**:\n"
for cap in tool_info["capabilities"]:
md += f"- {cap}\n"
md += "\n"
return md
if __name__ == "__main__":
save_registry_formats()