#!/usr/bin/env python3
"""
Terraform State Backup and Recovery Manager
Automated state management with versioning and rollback capabilities
"""

import argparse
import hashlib
import json
import os
import shutil
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional


class TerraformStateManager:
    """Manage Terraform state backups and recovery.

    Backups are stored in ``backup_dir`` as ``state_backup_<timestamp>.tfstate``
    files, each accompanied by a ``.json`` sidecar holding the creation
    timestamp, a description, the file size and a SHA256 hash.
    """

    # Format of the timestamp embedded in backup filenames and metadata.
    _TIMESTAMP_FORMAT = "%Y%m%d_%H%M%S"

    def __init__(
        self, terraform_dir: str = "terraform", backup_dir: str = "terraform_backups"
    ):
        """Set up paths and make sure the backup directory exists.

        Args:
            terraform_dir: Directory containing ``terraform.tfstate``.
            backup_dir: Directory where backups are written; created
                (including missing parents) if it does not exist.
        """
        self.terraform_dir = Path(terraform_dir)
        self.backup_dir = Path(backup_dir)
        self.state_file = self.terraform_dir / "terraform.tfstate"
        # parents=True so a nested backup location works on first use.
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def create_backup(self, description: str = "", auto_backup: bool = True) -> str:
        """Create a backup of the current Terraform state.

        Args:
            description: Free-text note stored in the metadata sidecar.
            auto_backup: Stored in the metadata; False for user-requested
                backups.

        Returns:
            ``"Backup created: <filename>"`` on success, or
            ``"No state file found to backup"`` if there is no state file.
        """
        if not self.state_file.exists():
            return "No state file found to backup"

        timestamp = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        # The timestamp only has one-second resolution; never overwrite an
        # existing backup taken within the same second.
        backup_path = self._unique_backup_path(timestamp)

        shutil.copy2(self.state_file, backup_path)

        metadata = {
            "timestamp": timestamp,
            "description": description,
            "auto_backup": auto_backup,
            "file_size": os.path.getsize(backup_path),
            "file_hash": self._calculate_file_hash(backup_path),
        }
        metadata_path = backup_path.with_suffix(".json")
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2)

        return f"Backup created: {backup_path.name}"

    def list_backups(self) -> List[Dict]:
        """List all available backups, newest first.

        Each entry contains ``filename``, ``path``, ``size``, ``modified``
        (file mtime), ``created`` (when the backup was taken) and, when a
        readable metadata sidecar exists, the metadata fields as well.
        Unreadable or malformed sidecars are ignored rather than raising.
        """
        backups = []
        for file in self.backup_dir.glob("state_backup_*.tfstate"):
            file_stat = file.stat()
            modified = datetime.fromtimestamp(file_stat.st_mtime)
            metadata = self._load_metadata(file.with_suffix(".json")) or {}

            # Metadata first, so file-derived fields can never be clobbered
            # by unexpected keys in the sidecar.
            backup_info = dict(metadata)
            backup_info.update(
                {
                    "filename": file.name,
                    "path": str(file),
                    "size": file_stat.st_size,
                    "modified": modified,
                    "created": self._creation_time(metadata, modified),
                }
            )
            backups.append(backup_info)

        # Sort by creation time, not mtime: copy2 preserves the state file's
        # mtime, so mtime says nothing about when the backup was taken.
        # The filename breaks ties between backups from the same second.
        backups.sort(key=lambda b: (b["created"], b["filename"]), reverse=True)
        return backups

    def restore_backup(self, backup_filename: str, dry_run: bool = False) -> str:
        """Restore a specific backup over the current state file.

        Unless ``dry_run`` is set, the current state (if any) is backed up
        first, and the state file is replaced atomically.

        Args:
            backup_filename: Name of a backup file inside the backup directory.
            dry_run: If True, only report what would be restored.

        Returns:
            A human-readable status message.
        """
        backup_path = self.backup_dir / backup_filename
        # is_file() rather than exists(): an empty filename resolves to the
        # backup directory itself, which must not count as a backup.
        if not backup_path.is_file():
            return f"Backup file not found: {backup_filename}"

        if dry_run:
            return f"Dry run: Would restore {backup_filename}"

        if self.state_file.exists():
            self.create_backup("Pre-restore backup", auto_backup=True)

        # Copy next to the target and swap it in, so a failed copy can never
        # leave a truncated state file behind.
        temp_path = self.state_file.with_name(self.state_file.name + ".restore_tmp")
        try:
            shutil.copy2(backup_path, temp_path)
            os.replace(temp_path, self.state_file)
        finally:
            if temp_path.exists():
                temp_path.unlink()

        return f"State restored from: {backup_filename}"

    def cleanup_old_backups(
        self, keep_days: int = 30, keep_count: int = 10
    ) -> List[str]:
        """Clean up old backups based on age and count.

        A backup is deleted when it was created more than ``keep_days`` days
        ago OR when it is not among the ``keep_count`` newest backups.

        Args:
            keep_days: Maximum age, in days, of backups to keep.
            keep_count: Maximum number of backups to keep.

        Returns:
            Names of the deleted files (state files and metadata sidecars),
            or ``["No backups found to clean up"]`` if no backups exist.

        Raises:
            ValueError: If ``keep_days`` or ``keep_count`` is negative.
        """
        if keep_days < 0 or keep_count < 0:
            raise ValueError("keep_days and keep_count must be non-negative")

        backups = self.list_backups()
        if not backups:
            return ["No backups found to clean up"]

        cutoff_date = datetime.now() - timedelta(days=keep_days)

        # Keyed by filename so a backup matching both rules is deleted once.
        doomed = {b["filename"]: b for b in backups if b["created"] < cutoff_date}
        # list_backups() is newest-first, so everything past keep_count is
        # surplus.
        for backup in backups[keep_count:]:
            doomed.setdefault(backup["filename"], backup)

        deleted_files = []
        for backup in doomed.values():
            state_file = Path(backup["path"])
            for target in (state_file, state_file.with_suffix(".json")):
                try:
                    target.unlink()
                except FileNotFoundError:
                    continue
                except OSError as e:
                    print(f"Error deleting {backup['filename']}: {e}")
                    # Leave the sidecar in place if the backup itself could
                    # not be removed.
                    break
                else:
                    deleted_files.append(target.name)

        return deleted_files

    def verify_backup_integrity(self, backup_filename: str) -> Dict[str, bool]:
        """Verify a backup against the size and hash in its metadata.

        Returns:
            A dict with ``exists``, ``metadata_exists`` and ``integrity``.
            When a metadata sidecar exists, ``size_matches`` and
            ``hash_matches`` are included too; an unreadable or malformed
            sidecar is reported as a failed check instead of raising.
        """
        backup_path = self.backup_dir / backup_filename
        if not backup_path.is_file():
            return {"exists": False, "metadata_exists": False, "integrity": False}

        metadata_path = backup_path.with_suffix(".json")
        if not metadata_path.exists():
            return {"exists": True, "metadata_exists": False, "integrity": False}

        metadata = self._load_metadata(metadata_path)
        if metadata is None:
            return {
                "exists": True,
                "metadata_exists": True,
                "size_matches": False,
                "hash_matches": False,
                "integrity": False,
            }

        # A missing field never matches (None is neither a size nor a hash).
        size_matches = backup_path.stat().st_size == metadata.get("file_size")
        hash_matches = self._calculate_file_hash(backup_path) == metadata.get(
            "file_hash"
        )

        return {
            "exists": True,
            "metadata_exists": True,
            "size_matches": size_matches,
            "hash_matches": hash_matches,
            "integrity": size_matches and hash_matches,
        }

    def get_state_statistics(self) -> Dict:
        """Get statistics about the current state and its backups.

        Returns:
            A dict with the current state's existence and size, the backup
            count, the oldest/newest backup creation times (None when there
            are no backups), the total backup size, and a list of backups
            that fail the integrity check.
        """
        backups = self.list_backups()
        state_exists = self.state_file.exists()
        created_times = [b["created"] for b in backups]

        stats = {
            "current_state_exists": state_exists,
            "current_state_size": self.state_file.stat().st_size
            if state_exists
            else 0,
            "backup_count": len(backups),
            "oldest_backup": min(created_times) if created_times else None,
            "newest_backup": max(created_times) if created_times else None,
            "total_backup_size": sum(b["size"] for b in backups),
            "backups_with_issues": [],
        }

        for backup in backups:
            integrity = self.verify_backup_integrity(backup["filename"])
            if not integrity["integrity"]:
                stats["backups_with_issues"].append(
                    {"filename": backup["filename"], "integrity": integrity}
                )

        return stats

    def _unique_backup_path(self, timestamp: str) -> Path:
        """Return a backup path for ``timestamp`` that does not exist yet."""
        candidate = self.backup_dir / f"state_backup_{timestamp}.tfstate"
        counter = 0
        while candidate.exists():
            counter += 1
            candidate = self.backup_dir / f"state_backup_{timestamp}_{counter}.tfstate"
        return candidate

    @staticmethod
    def _load_metadata(metadata_path: Path) -> Optional[Dict]:
        """Load a metadata sidecar; None if missing, unreadable or not an object."""
        try:
            with open(metadata_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None
        return metadata if isinstance(metadata, dict) else None

    def _creation_time(self, metadata: Dict, fallback: datetime) -> datetime:
        """Parse the metadata timestamp, falling back to ``fallback``."""
        try:
            return datetime.strptime(metadata.get("timestamp"), self._TIMESTAMP_FORMAT)
        except (TypeError, ValueError):
            return fallback

    def _calculate_file_hash(self, file_path: Path) -> str:
        """Calculate the SHA256 hex digest of a file, reading it in chunks."""
        hasher = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                hasher.update(chunk)
        return hasher.hexdigest()


def _non_negative_int(value: str) -> int:
    """argparse ``type=`` converter accepting only integers >= 0."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 0:
        raise argparse.ArgumentTypeError(f"must be non-negative: {value!r}")
    return number


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line interface for Terraform state management.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 when the requested action failed.
    """
    parser = argparse.ArgumentParser(
        description="Terraform State Backup and Recovery Manager"
    )
    parser.add_argument(
        "action",
        choices=["backup", "list", "restore", "cleanup", "stats", "verify"],
        help="Action to perform",
    )
    parser.add_argument("--filename", help="Backup filename for restore/verify")
    parser.add_argument("--description", help="Description for backup")
    parser.add_argument("--dry-run", action="store_true", help="Dry run mode")
    parser.add_argument(
        "--keep-days", type=_non_negative_int, default=30, help="Days to keep backups"
    )
    parser.add_argument(
        "--keep-count",
        type=_non_negative_int,
        default=10,
        help="Number of backups to keep",
    )
    parser.add_argument(
        "--terraform-dir", default="terraform", help="Terraform directory"
    )
    parser.add_argument(
        "--backup-dir", default="terraform_backups", help="Backup directory"
    )

    args = parser.parse_args(argv)

    manager = TerraformStateManager(args.terraform_dir, args.backup_dir)

    if args.action == "backup":
        result = manager.create_backup(
            args.description or "Manual backup", auto_backup=False
        )
        succeeded = result.startswith("Backup created")
        print(f"{'✅' if succeeded else '❌'} {result}")
        return 0 if succeeded else 1

    if args.action == "list":
        backups = manager.list_backups()
        print("📋 Available Backups:")
        print("-" * 80)
        for backup in backups:
            print(f"📁 {backup['filename']}")
            print(f" Size: {backup['size']:,} bytes")
            print(f" Modified: {backup['modified'].strftime('%Y-%m-%d %H:%M:%S')}")
            if "description" in backup:
                print(f" Description: {backup['description']}")
            print()
        return 0

    if args.action == "restore":
        if not args.filename:
            print("❌ Error: --filename argument required for restore")
            return 1
        result = manager.restore_backup(args.filename, args.dry_run)
        succeeded = not result.startswith("Backup file not found")
        print(f"{'🔁' if succeeded else '❌'} {result}")
        return 0 if succeeded else 1

    if args.action == "cleanup":
        deleted = manager.cleanup_old_backups(args.keep_days, args.keep_count)
        if deleted:
            print("🗑️ Cleaned up backups:")
            for filename in deleted:
                print(f" - {filename}")
        else:
            print("✅ No backups needed cleanup")
        return 0

    if args.action == "stats":
        stats = manager.get_state_statistics()
        print("📊 Terraform State Statistics")
        print("-" * 40)
        print(
            f"Current state exists: {'✅' if stats['current_state_exists'] else '❌'}"
        )
        print(f"Current state size: {stats['current_state_size']:,} bytes")
        print(f"Backup count: {stats['backup_count']}")
        if stats["oldest_backup"]:
            print(f"Oldest backup: {stats['oldest_backup'].strftime('%Y-%m-%d')}")
            print(f"Newest backup: {stats['newest_backup'].strftime('%Y-%m-%d')}")
        print(f"Total backup size: {stats['total_backup_size']:,} bytes")
        if stats["backups_with_issues"]:
            print(f"\n⚠️ Backups with issues: {len(stats['backups_with_issues'])}")
            for issue in stats["backups_with_issues"]:
                print(f" - {issue['filename']}")
        return 0

    # Only "verify" remains (argparse restricts the choices).
    if not args.filename:
        print("❌ Error: --filename argument required for verify")
        return 1
    integrity = manager.verify_backup_integrity(args.filename)
    print(f"🔍 Integrity check for {args.filename}")
    print(f" File exists: {'✅' if integrity['exists'] else '❌'}")
    print(f" Metadata exists: {'✅' if integrity['metadata_exists'] else '❌'}")
    if integrity["metadata_exists"]:
        print(f" Size matches: {'✅' if integrity['size_matches'] else '❌'}")
        print(f" Hash matches: {'✅' if integrity['hash_matches'] else '❌'}")
    print(f" Overall integrity: {'✅' if integrity['integrity'] else '❌'}")
    return 0 if integrity["integrity"] else 1


if __name__ == "__main__":
    sys.exit(main())