Files
vm-control/scripts/event_generation_demo.py
2025-12-18 00:29:15 +01:00

113 lines
3.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
VaultMesh Command Center: Event Generation Demonstration
This script simulates the event generation mechanism of the VaultMesh Command Center.
"""
import uuid
import json
import time
from datetime import datetime, timezone
import requests
class EventGenerator:
    """Build mock VaultMesh node events and post them to a Command Center.

    Each instance stands for one simulated node: a random ``node_id`` is
    drawn at construction time and reused in every payload it produces.
    """

    def __init__(self, cc_url="http://localhost:8088", *, timeout=10.0):
        """Create a simulated node.

        Args:
            cc_url: Base URL of the Command Center that events are posted to.
            timeout: Seconds to wait on each HTTP request before giving up.
                Without a timeout ``requests`` would block indefinitely on
                an unresponsive server.
        """
        self.cc_url = cc_url
        self.timeout = timeout
        self.node_id = str(uuid.uuid4())
        self.hostname = "demo-node"
        self.os_profile = "DemoVault"

    def _events_url(self):
        """Return the events endpoint URL, tolerating a trailing slash in ``cc_url``."""
        return f"{self.cc_url.rstrip('/')}/api/events"

    def generate_heartbeat(self) -> dict:
        """Generate a heartbeat event payload.

        Returns:
            A dict with the node identity, fixed health flags, a UTC ISO-8601
            timestamp and a ``metrics`` sub-dict of hard-coded load figures.
        """
        return {
            "node_id": self.node_id,
            "hostname": self.hostname,
            "os_profile": self.os_profile,
            "cloudflare_ok": True,
            "services_ok": True,
            "vaultmesh_root": "/var/lib/vaultmesh",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metrics": {
                # NOTE: placeholder value -- this is the current Unix epoch
                # time in seconds, not a measured system uptime.
                "uptime_seconds": int(time.time()),
                "load1": 0.5,
                "load5": 0.3,
                "load15": 0.2,
            },
        }

    def generate_scan_result(self) -> dict:
        """Generate a mock scan result event payload.

        Returns:
            A dict describing a successful ``sovereign-scan`` run. ``stdout``
            holds a JSON-encoded string (not a nested dict) containing the
            fixed finding counts and a random ``receipt_hash``.
        """
        scan_output = {
            "summary": {
                "critical": 1,
                "high": 3,
                "medium": 5,
                "low": 10,
                "info": 15,
            },
            "findings_real": False,
            "receipt_hash": str(uuid.uuid4()),
        }
        return {
            "cmd": "sovereign-scan",
            "node_id": self.node_id,
            "ts": datetime.now(timezone.utc).isoformat(),
            "status": "success",
            "stdout": json.dumps(scan_output),
            "exit_code": 0,
        }

    def generate_command_result(self) -> dict:
        """Generate a mock command result event payload.

        Returns:
            A dict describing a successful ``service-status`` run, including
            a fresh random ``nonce`` on every call.
        """
        return {
            "node_id": self.node_id,
            "cmd": "service-status",
            "ts": datetime.now(timezone.utc).isoformat(),
            "status": "success",
            "stdout": "cloudflared: active\nnginx: active",
            "exit_code": 0,
            "nonce": str(uuid.uuid4()),
        }

    def send_event(self, event_type, event_data):
        """Send an event to the Command Center.

        The payload is wrapped in an envelope of ``kind``, ``node_id`` and
        ``body`` and posted as JSON to ``<cc_url>/api/events``.

        Args:
            event_type: Value sent as the envelope's ``kind`` field.
            event_data: JSON-serializable payload sent as the ``body`` field.

        Returns:
            The decoded JSON response on success, or ``None`` when the
            request fails, times out, returns an error status, or the
            response body is not valid JSON. Errors are printed, not raised.
        """
        envelope = {
            "kind": event_type,
            "node_id": self.node_id,
            "body": event_data,
        }
        try:
            response = requests.post(
                self._events_url(),
                json=envelope,
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"[ERROR] Failed to send {event_type} event: {e}")
            return None

        print(f"[SUCCESS] Sent {event_type} event for node {self.node_id}")
        try:
            return response.json()
        except ValueError as e:
            # Older ``requests`` releases raise a plain ValueError here, which
            # is not a RequestException; catch it so a non-JSON reply cannot
            # crash the caller.
            print(f"[ERROR] Response to {event_type} event was not valid JSON: {e}")
            return None
def main():
    """Run the demo: build one event of each kind, print it, and post it.

    The three payloads are generated up front, then each one is echoed as
    indented JSON, sent to the Command Center, and followed by a one-second
    pause. When the send returns a truthy result, its ``id`` field is printed.
    """
    generator = EventGenerator()

    print("🚀 VaultMesh Command Center Event Generation Demo")
    print("---------------------------------------------")

    # Build every payload before sending anything, so all timestamps are
    # taken ahead of the first network call.
    heartbeat_payload = generator.generate_heartbeat()
    scan_payload = generator.generate_scan_result()
    command_payload = generator.generate_command_result()
    pending = (
        ("heartbeat", heartbeat_payload),
        ("scan", scan_payload),
        ("command", command_payload),
    )

    for kind, payload in pending:
        print(f"\nGenerating {kind.upper()} event:")
        print(json.dumps(payload, indent=2))

        reply = generator.send_event(kind, payload)
        if reply:
            print(f"Event ID: {reply.get('id')}")

        # Short pause so consecutive events are spaced out.
        time.sleep(1)
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    main()