#!/usr/bin/env python3
"""
Drift Remediation PR Bot for Cloudflare GitOps
Phase 6 - PR Workflows

Creates Merge Requests when Terraform drift is detected.
Can be triggered by:
- Alertmanager webhooks
- Scheduled CI jobs
- Manual invocation
"""

import argparse
import json
import os
import re
import subprocess
import sys
import textwrap
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

try:
    import requests
    import yaml
except ImportError:
    print("ERROR: pip install requests pyyaml", file=sys.stderr)
    sys.exit(1)

HERE = Path(__file__).resolve().parent
CONFIG_PATH = HERE / "config.yml"

# Seconds to wait on GitLab/GitHub API calls; without a timeout `requests`
# blocks forever on a stalled connection and the CI job never finishes.
HTTP_TIMEOUT = 30

# Matches ${VAR} and ${VAR:-default}.
_ENV_REF = re.compile(r"\$\{([^}]+?)(?::-([^}]*))?\}")

# Static tail of the MR description. Dedented here, *before* any dynamic
# content is interpolated, so multi-line markdown cannot defeat the dedent.
_MR_FOOTER = textwrap.dedent(
    """\
    ## Review Checklist

    - [ ] Verified changes match expected drift
    - [ ] No conflicting manual changes in Cloudflare dashboard
    - [ ] Compliance requirements satisfied
    - [ ] Tested in staging (if applicable)

    ## Notes

    - This MR was auto-generated by the GitOps drift remediation bot
    - Please review especially **HIGH** and **CRITICAL** risk resources
    - Apply only after confirming no conflicting manual changes

    ---

    *Generated by Cloudflare Mesh Observatory - Phase 6 GitOps*
    """
)


def load_config() -> Dict[str, Any]:
    """Load the gitops configuration from CONFIG_PATH with env expansion.

    String values may reference environment variables as ``${VAR}`` or
    ``${VAR:-default}``. A string that is exactly one reference resolves to
    the variable's value, the default, or ``None`` when neither exists.
    References embedded in a longer string are substituted in place, with an
    unset variable and no default expanding to the empty string.

    Returns:
        The parsed configuration; an empty dict when the file is empty.
    """
    with open(CONFIG_PATH, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    def expand_env(obj):
        if isinstance(obj, str):
            whole = _ENV_REF.fullmatch(obj)
            if whole:
                var, default = whole.groups()
                return os.environ.get(var, default)

            def substitute(match):
                fallback = match.group(2)
                if fallback is None:
                    fallback = ""
                return os.environ.get(match.group(1), fallback)

            return _ENV_REF.sub(substitute, obj)
        if isinstance(obj, dict):
            return {k: expand_env(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [expand_env(i) for i in obj]
        return obj

    # yaml.safe_load returns None for an empty document.
    return expand_env(config if config is not None else {})


def run_cmd(
    cmd: List[str],
    cwd: Optional[Path] = None,
    check: bool = True,
    capture: bool = False,
) -> subprocess.CompletedProcess:
    """Echo and run a command (argument list, no shell).

    Args:
        cmd: Program and arguments.
        cwd: Working directory for the child process.
        check: Raise ``subprocess.CalledProcessError`` on non-zero exit.
        capture: Capture stdout/stderr as text on the returned object.
    """
    print(f"+ {' '.join(cmd)}")
    return subprocess.run(
        cmd,
        cwd=cwd,
        check=check,
        text=True,
        capture_output=capture,
    )


class GitLabClient:
    """Minimal GitLab REST (v4) client for branches, MRs and pipelines."""

    def __init__(self, base_url: str, project_id: str, token: str):
        self.base_url = base_url.rstrip("/")
        self.project_id = project_id
        self.token = token
        self.headers = {"PRIVATE-TOKEN": token}
        # A namespaced path ("group/project") must be URL-encoded in the
        # request path; "%" is kept safe so an already-encoded value is not
        # encoded twice.
        encoded = quote(str(project_id), safe="%")
        self._project_url = f"{self.base_url}/api/v4/projects/{encoded}"

    def create_branch(self, branch: str, ref: str) -> Dict:
        """Create ``branch`` from ``ref``.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        resp = requests.post(
            f"{self._project_url}/repository/branches",
            headers=self.headers,
            data={"branch": branch, "ref": ref},
            timeout=HTTP_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json()

    def create_merge_request(
        self,
        source_branch: str,
        target_branch: str,
        title: str,
        description: str,
        labels: Optional[List[str]] = None,
        reviewers: Optional[List[str]] = None,
        remove_source_branch: bool = True,
    ) -> Dict:
        """Create a merge request and return the API response body.

        Args:
            labels: Label names; sent as a comma-separated string.
            reviewers: GitLab *user IDs* (not usernames).

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        data: Dict[str, Any] = {
            "source_branch": source_branch,
            "target_branch": target_branch,
            "title": title,
            "description": description,
            "remove_source_branch": remove_source_branch,
        }
        if labels:
            data["labels"] = ",".join(labels)
        if reviewers:
            data["reviewer_ids"] = list(reviewers)

        # JSON body: form encoding would flatten the reviewer list into
        # repeated scalar fields and stringify the boolean.
        resp = requests.post(
            f"{self._project_url}/merge_requests",
            headers=self.headers,
            json=data,
            timeout=HTTP_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json()

    def trigger_pipeline(
        self, ref: str, token: str, variables: Optional[Dict] = None
    ) -> Dict:
        """Trigger a pipeline on ``ref`` using a pipeline trigger token.

        Raises:
            requests.HTTPError: On a non-2xx response.
        """
        data = {"ref": ref, "token": token}
        for key, value in (variables or {}).items():
            data[f"variables[{key}]"] = value

        resp = requests.post(
            f"{self._project_url}/trigger/pipeline",
            data=data,
            timeout=HTTP_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json()


class GitHubClient:
    """GitHub API client (alternative to GitLab)."""

    def __init__(self, owner: str, repo: str, token: str):
        self.base_url = "https://api.github.com"
        self.owner = owner
        self.repo = repo
        self.headers = {
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.v3+json",
        }

    def create_pull_request(
        self,
        head: str,
        base: str,
        title: str,
        body: str,
        labels: Optional[List[str]] = None,
    ) -> Dict:
        """Create a pull request and optionally label it.

        Labelling is best-effort: a failure is reported on stderr but does
        not fail the call, because the pull request already exists.

        Raises:
            requests.HTTPError: If creating the pull request fails.
        """
        repo_url = f"{self.base_url}/repos/{self.owner}/{self.repo}"
        resp = requests.post(
            f"{repo_url}/pulls",
            headers=self.headers,
            json={"head": head, "base": base, "title": title, "body": body},
            timeout=HTTP_TIMEOUT,
        )
        resp.raise_for_status()
        pr = resp.json()

        if labels:
            try:
                label_resp = requests.post(
                    f"{repo_url}/issues/{pr['number']}/labels",
                    headers=self.headers,
                    json={"labels": labels},
                    timeout=HTTP_TIMEOUT,
                )
                label_resp.raise_for_status()
            except requests.RequestException as e:
                print(f"WARNING: could not add labels: {e}", file=sys.stderr)

        return pr


def run_terraform_plan(tf_dir: Path, plan_file: str) -> tuple[bool, str]:
    """
    Run terraform plan and return (has_changes, plan_output)

    Uses -detailed-exitcode: 0=no changes, 1=error, 2=changes.
    Exits the process with status 1 when the plan itself fails.
    """
    run_cmd(["terraform", "init", "-input=false"], cwd=tf_dir)

    # check=False: exit code 2 means "changes present", not failure.
    result = run_cmd(
        [
            "terraform",
            "plan",
            "-input=false",
            "-no-color",
            "-out",
            plan_file,
            "-detailed-exitcode",
        ],
        cwd=tf_dir,
        check=False,
        capture=True,
    )

    if result.returncode == 0:
        return False, result.stdout
    if result.returncode == 2:
        return True, result.stdout

    print(f"Terraform plan failed:\n{result.stderr}", file=sys.stderr)
    sys.exit(1)


def get_plan_summary(cfg: Dict[str, Any]) -> tuple[str, Dict]:
    """Run plan_summarizer.py twice and return (markdown, parsed_json).

    ``cfg`` is accepted for interface compatibility and is not used.
    """
    markdown = run_cmd(
        ["python3", "plan_summarizer.py", "--format", "markdown"],
        cwd=HERE,
        capture=True,
    ).stdout

    raw_json = run_cmd(
        ["python3", "plan_summarizer.py", "--format", "json"],
        cwd=HERE,
        capture=True,
    ).stdout

    return markdown, json.loads(raw_json)


def _as_list(value: Any) -> List[Any]:
    """Normalize a config value (None, scalar, or sequence) to a list."""
    if value is None:
        return []
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return [value]


def get_reviewers(cfg: Dict[str, Any], summary: Dict) -> List[str]:
    """Determine reviewers based on affected categories.

    Looks up each key of ``summary["by_category"]`` in
    ``cfg["drift_pr"]["reviewer_mapping"]``; falls back to the mapping's
    ``"default"`` entry when no category matched. The result is de-duplicated
    and keeps first-seen order so repeated runs produce the same list.
    """
    mapping = (cfg.get("drift_pr") or {}).get("reviewer_mapping") or {}
    categories = summary.get("by_category") or {}

    # dict preserves insertion order, giving a deterministic de-dupe.
    seen: Dict[Any, None] = {}
    for category in categories:
        for reviewer in _as_list(mapping.get(category)):
            seen.setdefault(reviewer, None)

    if not seen:
        for reviewer in _as_list(mapping.get("default")):
            seen.setdefault(reviewer, None)

    return list(seen)


def notify_slack(cfg: Dict[str, Any], title: str, url: str, risk: str, changes: int):
    """Send a best-effort Slack notification about the created PR.

    Does nothing unless ``slack.webhook_url`` is set and
    ``slack.notify_on.pr_created`` is truthy. Template or delivery problems
    are reported on stderr and never raised, since the MR already exists by
    the time this runs.
    """
    slack_cfg = cfg.get("slack") or {}
    webhook_url = slack_cfg.get("webhook_url")
    notify_on = slack_cfg.get("notify_on") or {}

    if not webhook_url or not notify_on.get("pr_created"):
        return

    template = (slack_cfg.get("templates") or {}).get(
        "pr_created", "PR Created: {title}"
    )
    try:
        message = template.format(
            title=title,
            url=url,
            risk_level=risk,
            change_count=changes,
        )
    except (KeyError, IndexError, ValueError, AttributeError) as e:
        # A malformed user-supplied template must not abort the run.
        print(f"Slack template invalid ({e!r}); using default", file=sys.stderr)
        message = f"PR Created: {title}"

    risk_colors = {
        "LOW": "good",
        "MEDIUM": "warning",
        "HIGH": "danger",
        "CRITICAL": "danger",
    }
    payload = {
        "channel": slack_cfg.get("channel", "#cloudflare-gitops"),
        "text": message,
        "attachments": [
            {
                "color": risk_colors.get(risk, "#808080"),
                "fields": [
                    {"title": "Risk Level", "value": risk, "short": True},
                    {"title": "Changes", "value": str(changes), "short": True},
                ],
                "actions": [
                    {
                        "type": "button",
                        "text": "View MR",
                        "url": url,
                    }
                ],
            }
        ],
    }

    try:
        resp = requests.post(webhook_url, json=payload, timeout=10)
        # requests does not raise on 4xx/5xx by itself.
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f"Slack notification failed: {e}", file=sys.stderr)


def create_mr_description(
    cfg: Dict[str, Any],
    summary_md: str,
    summary_json: Dict,
    trigger_source: str = "scheduled",
) -> str:
    """Generate the markdown MR description.

    The text is assembled from flush-left lines rather than dedenting an
    indented f-string: interpolated multi-line markdown would reduce the
    common margin to zero and leave the template indented.

    Args:
        cfg: Loaded configuration; ``drift_pr.title_prefix`` sets the heading.
        summary_md: Markdown plan summary embedded verbatim.
        summary_json: Plan summary; ``compliance_violations`` adds a notice.
        trigger_source: Human-readable description of what started the run.
    """
    drift_cfg = cfg.get("drift_pr") or {}
    title_prefix = drift_cfg.get("title_prefix", "Drift Remediation")
    # Naive-UTC ISO string plus "Z", produced without deprecated utcnow().
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    lines = [
        f"## {title_prefix}",
        "",
        "Detected by Phase 6 GitOps automation.",
        "",
        f"**Trigger:** {trigger_source}",
        f"**Timestamp:** {timestamp}Z",
        "",
    ]

    compliance = summary_json.get("compliance_violations") or []
    if compliance:
        frameworks = ", ".join(str(item) for item in compliance)
        lines += [
            "> **Compliance Notice:** This change affects the following "
            f"frameworks: {frameworks}",
            "> Please ensure appropriate review and approval processes "
            "are followed.",
            "",
        ]

    lines += [
        "---",
        "",
        (summary_md or "").strip(),
        "",
        "---",
        "",
        _MR_FOOTER,
    ]
    return "\n".join(lines).strip()


def _reviewer_ids(reviewers: List[Any]) -> List[int]:
    """Return the numeric GitLab user IDs among ``reviewers``.

    GitLab's ``reviewer_ids`` accepts only user IDs, so non-numeric entries
    (e.g. usernames) are skipped with a warning instead of failing the MR.
    """
    ids: List[int] = []
    skipped: List[Any] = []
    for reviewer in reviewers:
        text = str(reviewer).strip()
        if text.isascii() and text.isdigit():
            ids.append(int(text))
        else:
            skipped.append(reviewer)
    if skipped:
        print(
            f"WARNING: skipping non-numeric reviewers (user IDs required): {skipped}",
            file=sys.stderr,
        )
    return ids


def main():
    """Main entry point: plan, summarize, and open a drift-remediation MR."""
    parser = argparse.ArgumentParser(
        description="Create drift remediation MR"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=os.environ.get("GITOPS_DRY_RUN", "false").lower() == "true",
        help="Don't actually create MR",
    )
    parser.add_argument(
        "--trigger-source",
        default=os.environ.get("GITOPS_TRIGGER_SOURCE", "scheduled"),
        help="What triggered this run (alert, scheduled, manual)",
    )
    parser.add_argument(
        "--alert-name",
        help="Name of alert that triggered this (for alert triggers)",
    )
    args = parser.parse_args()

    # `or {}` / `or default`: a key that exists but expands to None (unset
    # env var) should fall back exactly like a missing key.
    cfg = load_config()
    tf_cfg = cfg.get("terraform") or {}
    gitlab_cfg = cfg.get("gitlab") or {}
    drift_cfg = cfg.get("drift_pr") or {}

    tf_dir = HERE.parent / (tf_cfg.get("working_dir") or "terraform")
    plan_file = tf_cfg.get("plan_file") or "plan.tfplan"

    print("Running terraform plan...")
    has_changes, _plan_output = run_terraform_plan(tf_dir, plan_file)

    if not has_changes:
        print("No changes detected. Nothing to do.")
        return

    print("Changes detected. Generating summary...")
    summary_md, summary_json = get_plan_summary(cfg)

    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H%M%SZ")
    branch_prefix = drift_cfg.get("branch_prefix", "drift/remediation-")
    branch = f"{branch_prefix}{now}"
    title_prefix = drift_cfg.get("title_prefix", "Drift Remediation")
    title = f"{title_prefix}: {now}"

    trigger_source = args.trigger_source
    if args.alert_name:
        trigger_source = f"Alert: {args.alert_name}"

    description = create_mr_description(cfg, summary_md, summary_json, trigger_source)

    reviewers = get_reviewers(cfg, summary_json)
    labels = drift_cfg.get("labels", ["drift", "terraform"])

    if args.dry_run:
        print("\n" + "=" * 60)
        print("[DRY RUN] Would create MR:")
        print(f"  Branch: {branch}")
        print(f"  Title: {title}")
        print(f"  Labels: {labels}")
        print(f"  Reviewers: {reviewers}")
        print(f"  Risk: {summary_json.get('overall_risk')}")
        print(f"  Changes: {summary_json.get('total_changes')}")
        print("=" * 60)
        print("\nDescription:")
        print(description)
        return

    base_url = (
        gitlab_cfg.get("base_url")
        or os.environ.get("GITLAB_BASE_URL")
        or "https://gitlab.com"
    )
    project_id = gitlab_cfg.get("project_id") or os.environ.get("GITLAB_PROJECT_ID")
    token = os.environ.get("GITLAB_TOKEN")
    default_branch = gitlab_cfg.get("default_branch") or "main"

    if not project_id or not token:
        print("ERROR: GITLAB_PROJECT_ID and GITLAB_TOKEN required", file=sys.stderr)
        sys.exit(1)

    client = GitLabClient(base_url, project_id, token)

    # NOTE(review): the branch is created at the tip of the default branch
    # and nothing in this script commits to it - confirm another step pushes
    # the remediation changes, otherwise the MR has an empty diff.
    print(f"Creating branch {branch}...")
    try:
        client.create_branch(branch, default_branch)
    except requests.HTTPError as e:
        if e.response is not None and e.response.status_code == 400:
            # GitLab answers 400 when the branch already exists.
            print(f"Branch {branch} already exists, using it")
        else:
            raise

    print(f"Creating MR: {title}")
    mr = client.create_merge_request(
        source_branch=branch,
        target_branch=default_branch,
        title=title,
        description=description,
        labels=labels,
        # Previously computed and shown in dry-run but never sent.
        reviewers=_reviewer_ids(reviewers) or None,
        remove_source_branch=True,
    )

    mr_url = mr.get("web_url", "")
    print(f"\nCreated MR: {mr_url}")

    notify_slack(
        cfg,
        title=title,
        url=mr_url,
        risk=summary_json.get("overall_risk", "UNKNOWN"),
        changes=summary_json.get("total_changes", 0),
    )

    print("\nDone!")


if __name__ == "__main__":
    main()