#!/usr/bin/env python3
"""
VaultMesh Command Center: Event Generation Demonstration

This script simulates the event generation mechanism of the VaultMesh Command Center.
"""
|
|
|
|
import uuid
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
import requests
|
|
|
|
class EventGenerator:
    """Build mock VaultMesh node events and post them to a Command Center.

    Each instance stands for one simulated node: it is given a random UUID
    as ``node_id`` and fixed demo values for ``hostname`` and ``os_profile``.

    Args:
        cc_url: Base URL of the Command Center. Events are posted to
            ``<cc_url>/api/events``; a trailing slash is tolerated.
        timeout: Seconds to wait for the Command Center on each request
            before the send is reported as failed.
    """

    def __init__(self, cc_url="http://localhost:8088", timeout=10.0):
        self.cc_url = cc_url
        self.timeout = timeout
        self.node_id = str(uuid.uuid4())
        self.hostname = "demo-node"
        self.os_profile = "DemoVault"
        # Monotonic reference point: lets the heartbeat report an elapsed
        # duration that is unaffected by wall-clock adjustments.
        self._started_at = time.monotonic()

    def _events_url(self):
        """Return the events endpoint URL without a doubled slash."""
        return f"{self.cc_url.rstrip('/')}/api/events"

    def generate_heartbeat(self):
        """Generate a heartbeat event payload.

        Returns:
            A dict with the node identity, hard-coded health flags, a
            timezone-aware UTC ISO-8601 ``timestamp`` and a ``metrics``
            dict holding fixed load averages plus ``uptime_seconds``.
        """
        return {
            "node_id": self.node_id,
            "hostname": self.hostname,
            "os_profile": self.os_profile,
            "cloudflare_ok": True,
            "services_ok": True,
            "vaultmesh_root": "/var/lib/vaultmesh",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metrics": {
                # Seconds since this generator was created. The previous
                # int(time.time()) reported the Unix epoch time, which is
                # not an uptime.
                "uptime_seconds": int(time.monotonic() - self._started_at),
                "load1": 0.5,
                "load5": 0.3,
                "load15": 0.2,
            },
        }

    def generate_scan_result(self):
        """Generate a mock scan result event payload.

        Returns:
            A dict describing a successful ``sovereign-scan`` command. Its
            ``stdout`` value is a JSON-encoded string (not a nested dict)
            containing fixed severity counts, ``findings_real`` set to
            false and a random UUID as ``receipt_hash``.
        """
        return {
            "cmd": "sovereign-scan",
            "node_id": self.node_id,
            "ts": datetime.now(timezone.utc).isoformat(),
            "status": "success",
            "stdout": json.dumps({
                "summary": {
                    "critical": 1,
                    "high": 3,
                    "medium": 5,
                    "low": 10,
                    "info": 15,
                },
                "findings_real": False,
                "receipt_hash": str(uuid.uuid4()),
            }),
            "exit_code": 0,
        }

    def generate_command_result(self):
        """Generate a mock command result event payload.

        Returns:
            A dict describing a successful ``service-status`` command with
            fixed ``stdout`` text and a fresh random UUID as ``nonce``.
        """
        return {
            "node_id": self.node_id,
            "cmd": "service-status",
            "ts": datetime.now(timezone.utc).isoformat(),
            "status": "success",
            "stdout": "cloudflared: active\nnginx: active",
            "exit_code": 0,
            "nonce": str(uuid.uuid4()),
        }

    def send_event(self, event_type, event_data):
        """Send an event to the Command Center.

        The payload is wrapped in an envelope with ``kind``, ``node_id``
        and ``payload`` keys and posted as JSON to the events endpoint.

        Args:
            event_type: Value sent as the envelope's ``kind``.
            event_data: JSON-serialisable payload for the event.

        Returns:
            The decoded JSON response body, or ``None`` when the request
            fails with a ``requests.RequestException`` (the failure is
            printed rather than raised).
        """
        try:
            response = requests.post(
                self._events_url(),
                json={
                    "kind": event_type,
                    "node_id": self.node_id,
                    "payload": event_data,
                },
                headers={"Content-Type": "application/json"},
                # Without a timeout, requests waits indefinitely on an
                # unresponsive server and the demo would hang.
                timeout=self.timeout,
            )
            response.raise_for_status()
            print(f"[SUCCESS] Sent {event_type} event for node {self.node_id}")
            return response.json()
        except requests.RequestException as e:
            print(f"[ERROR] Failed to send {event_type} event: {e}")
            return None
|
|
|
|
def main():
    """Run the demo: build one event of each kind, echo it and send it."""
    node = EventGenerator()

    for banner_line in (
        "🚀 VaultMesh Command Center Event Generation Demo",
        "---------------------------------------------",
    ):
        print(banner_line)

    # All three payloads are built up front, before anything is sent.
    heartbeat_payload = node.generate_heartbeat()
    scan_payload = node.generate_scan_result()
    command_payload = node.generate_command_result()
    kinds = ("heartbeat", "scan", "command")
    payloads = (heartbeat_payload, scan_payload, command_payload)

    for kind, payload in zip(kinds, payloads):
        print(f"\nGenerating {kind.upper()} event:")
        print(json.dumps(payload, indent=2))
        reply = node.send_event(kind, payload)
        # send_event returns None on failure; only a truthy reply is shown.
        if reply:
            print(f"Event ID: {reply.get('id')}")
        # Short pause after every event so they are not sent back to back.
        time.sleep(1)
|
|
|
|
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|