mod config;

use crate::config::Config;
use anyhow::{bail, Result};
use ed25519_dalek::{Signature, Verifier, VerifyingKey};
use serde::{Deserialize, Serialize};
use std::{fs, path::Path, process::Command as StdCommand};
use time::OffsetDateTime;
use tokio::time::{sleep, Duration};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;

/// Metrics collected from the node.
///
/// Every field is optional; a reading that could not be obtained is `None`.
// NOTE(review): the numeric type arguments were missing from the source and
// were reconstructed from how the values are produced (integer parsing for
// uptime/memory, an `as f32` cast for disk) and consumed (`unwrap_or(0.0)` for
// load). Confirm the widths against the Command Center schema.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NodeMetrics {
    /// Whole seconds since boot.
    pub uptime_seconds: Option<u64>,
    /// 1-minute load average.
    pub load1: Option<f32>,
    /// 5-minute load average.
    pub load5: Option<f32>,
    /// 15-minute load average.
    pub load15: Option<f32>,
    /// Total memory (kB value divided by 1024).
    pub mem_total_mb: Option<u64>,
    /// Memory in use, computed as total minus available (kB divided by 1024).
    pub mem_used_mb: Option<u64>,
    /// Size of the root filesystem (bytes divided by 1024^3).
    pub disk_root_total_gb: Option<f32>,
    /// Used space on the root filesystem (bytes divided by 1024^3).
    pub disk_root_used_gb: Option<f32>,
}

/// Heartbeat data sent to the Command Center.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHeartbeat {
    pub node_id: Uuid,
    pub hostname: String,
    pub os_profile: String,
    /// Whether the `cloudflared` systemd unit is active.
    pub cloudflare_ok: bool,
    /// Whether the VaultMesh root directory exists.
    pub services_ok: bool,
    pub vaultmesh_root: String,
    /// Time the heartbeat was built, serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub timestamp: OffsetDateTime,
    pub metrics: NodeMetrics,
}

/// Command payload from CC.
///
/// This is the part of a [`SignedCommand`] that the signature covers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandPayload {
    pub node_id: Uuid,
    /// Timestamp supplied by the Command Center, serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
    /// Per-command identifier; echoed back in the result and used in receipt file names.
    pub nonce: String,
    /// Command name, e.g. `"service-status"`.
    pub cmd: String,
    /// Command-specific arguments as free-form JSON.
    pub args: serde_json::Value,
}

/// Signed command from CC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignedCommand {
    pub payload: CommandPayload,
    /// Base64-encoded 64-byte Ed25519 signature over the JSON-serialized payload.
    pub signature: String,
}

/// Command execution result sent back to CC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandResult {
    pub node_id: Uuid,
    /// Time the result was produced, serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
    pub nonce: String,
    pub cmd: String,
    /// One of `"success"`, `"failed"`, `"error"` or `"rejected"`.
    pub status: String,
    /// Process exit code, when a process ran and reported one.
    pub exit_code: Option<i32>,
    pub stdout: String,
    pub stderr: String,
}

/// Summary of scan findings by severity.
///
/// Each counter holds the number of findings reported at that severity.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ScanSummary {
    pub critical: u32,
    pub high: u32,
    pub medium: u32,
    pub low: u32,
    pub info: u32,
}

impl ScanSummary {
    /// Adds every counter of `other` onto the matching counter of `self`.
    fn merge(&mut self, other: &ScanSummary) {
        self.critical += other.critical;
        self.high += other.high;
        self.medium += other.medium;
        self.low += other.low;
        self.info += other.info;
    }
}

/// Result from running a single scanner.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ScannerResult {
    /// Whether the scanner binary was found on the system.
    pub available: bool,
    /// Whether the scanner process was actually started.
    pub ran: bool,
    /// First line of the scanner's `--version` output, when it could be obtained.
    pub version: Option<String>,
    /// `true` only when the findings come from a real scanner run (not mock data).
    pub findings_real: bool,
    /// Total number of findings across all severities.
    pub findings: u32,
    /// File name of the scanner output inside the receipts directory.
    pub output_file: Option<String>,
    /// Human-readable failure description, or a note that mock data was generated.
    pub error: Option<String>,
    pub summary: ScanSummary,
}

/// Check if cloudflared service is active via systemctl.
///
/// Returns `false` when `systemctl` cannot be spawned or reports a
/// non-success exit status.
fn check_cloudflared() -> bool {
    StdCommand::new("systemctl")
        .args(["is-active", "--quiet", "cloudflared"])
        .status()
        .map(|status| status.success())
        .unwrap_or(false)
}

/// Check if essential VaultMesh services are healthy.
///
/// Currently this only verifies that `vaultmesh_root` is an existing directory.
fn check_services(vaultmesh_root: &str) -> bool {
    Path::new(vaultmesh_root).is_dir()
}

/// Gather system metrics from /proc and df.
///
/// Each reading is taken independently; a failed reading becomes `None`
/// without affecting the others.
fn gather_metrics() -> NodeMetrics {
    NodeMetrics {
        uptime_seconds: read_uptime_seconds(),
        load1: read_loadavg_index(0),
        load5: read_loadavg_index(1),
        load15: read_loadavg_index(2),
        mem_total_mb: read_meminfo_total_mb(),
        mem_used_mb: read_meminfo_used_mb(),
        disk_root_total_gb: read_disk_root_total_gb(),
        disk_root_used_gb: read_disk_root_used_gb(),
    }
}

/// Reads the first value of `/proc/uptime`, truncated to whole seconds.
// NOTE(review): `u64` is reconstructed (the type argument was missing) — confirm.
fn read_uptime_seconds() -> Option<u64> {
    let contents = fs::read_to_string("/proc/uptime").ok()?;
    let first = contents.split_whitespace().next()?;
    // The value is fractional ("12345.67"); keep only the integer part.
    let whole = first.split('.').next().unwrap_or(first);
    whole.parse().ok()
}

/// Reads the `idx`-th whitespace-separated value of `/proc/loadavg`.
// NOTE(review): `f32` is reconstructed (the type argument was missing) — confirm.
fn read_loadavg_index(idx: usize) -> Option<f32> {
    let contents = fs::read_to_string("/proc/loadavg").ok()?;
    contents.split_whitespace().nth(idx)?.parse().ok()
}

/// Total memory: `MemTotal` (kB) divided by 1024.
fn read_meminfo_total_mb() -> Option<u64> {
    read_meminfo_field_kb("MemTotal").map(|kb| kb / 1024)
}

/// Used memory: `MemTotal - MemAvailable` (kB, saturating) divided by 1024.
fn read_meminfo_used_mb() -> Option<u64> {
    let total_kb = read_meminfo_field_kb("MemTotal")?;
    let avail_kb = read_meminfo_field_kb("MemAvailable")?;
    Some(total_kb.saturating_sub(avail_kb) / 1024)
}

/// Returns the numeric value of the `/proc/meminfo` line whose key is exactly `field`.
///
/// The key must be followed directly by `:` so that a field name which is a
/// prefix of another key cannot match the wrong line.
fn read_meminfo_field_kb(field: &str) -> Option<u64> {
    let contents = fs::read_to_string("/proc/meminfo").ok()?;
    contents.lines().find_map(|line| {
        let value = line.strip_prefix(field)?.strip_prefix(':')?;
        value.split_whitespace().next()?.parse().ok()
    })
}

/// Size of the root filesystem, from column 1 of `df` output.
fn read_disk_root_total_gb() -> Option<f32> {
    read_df_root_column(1)
}

/// Used space on the root filesystem, from column 2 of `df` output.
fn read_disk_root_used_gb() -> Option<f32> {
    read_df_root_column(2)
}

/// Runs `df -B1 /` and converts column `col_index` of the first data row
/// from bytes to units of 1024^3 bytes.
///
/// Returns `None` if `df` cannot be run, exits unsuccessfully, or the output
/// does not have the expected header-plus-data-row shape.
fn read_df_root_column(col_index: usize) -> Option<f32> {
    let output = StdCommand::new("df").arg("-B1").arg("/").output().ok()?;
    if !output.status.success() {
        return None;
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    // Skip the header row; the row after it describes `/`.
    let row = stdout.lines().nth(1)?;
    let bytes: f64 = row.split_whitespace().nth(col_index)?.parse().ok()?;
    Some((bytes / (1024.0 * 1024.0 * 1024.0)) as f32)
}

/// Load CC public key from file.
fn load_cc_public_key(path: &str) -> Result { use base64::Engine; let p = Path::new(path); if !p.exists() { bail!("CC public key file not found: {}", path); } let contents = fs::read_to_string(p)?.trim().to_string(); let bytes = base64::engine::general_purpose::STANDARD.decode(&contents)?; let key_bytes: [u8; 32] = bytes .try_into() .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes"))?; Ok(VerifyingKey::from_bytes(&key_bytes)?) } /// Verify a signed command. fn verify_command(cmd: &SignedCommand, pubkey: &VerifyingKey) -> Result<()> { use base64::Engine; let payload_bytes = serde_json::to_vec(&cmd.payload)?; let sig_bytes = base64::engine::general_purpose::STANDARD.decode(&cmd.signature)?; let sig_arr: [u8; 64] = sig_bytes .try_into() .map_err(|_| anyhow::anyhow!("Signature must be 64 bytes"))?; let signature = Signature::from_bytes(&sig_arr); pubkey.verify(&payload_bytes, &signature)?; Ok(()) } /// Execute a whitelisted command. fn execute_command(cmd: &SignedCommand, config: &Config) -> CommandResult { let payload = &cmd.payload; let allowed = ["restart-service", "service-status", "tail-journal", "sovereign-scan"]; if !allowed.contains(&payload.cmd.as_str()) { return CommandResult { node_id: payload.node_id, ts: OffsetDateTime::now_utc(), nonce: payload.nonce.clone(), cmd: payload.cmd.clone(), status: "rejected".into(), exit_code: None, stdout: String::new(), stderr: format!("Command '{}' not in whitelist", payload.cmd), }; } match payload.cmd.as_str() { "service-status" => { let service = payload.args["service"].as_str().unwrap_or("cloudflared"); let output = StdCommand::new("systemctl") .args(["status", service]) .output(); make_result(payload, output) } "restart-service" => { let service = payload.args["service"].as_str().unwrap_or("cloudflared"); let output = StdCommand::new("systemctl") .args(["restart", service]) .output(); make_result(payload, output) } "tail-journal" => { let unit = payload.args["unit"].as_str().unwrap_or("cloudflared"); let lines 
= payload.args["lines"].as_u64().unwrap_or(50); let output = StdCommand::new("journalctl") .args(["-u", unit, "-n", &lines.to_string(), "--no-pager"]) .output(); make_result(payload, output) } "sovereign-scan" => { run_sovereign_scan(payload, config) } _ => CommandResult { node_id: payload.node_id, ts: OffsetDateTime::now_utc(), nonce: payload.nonce.clone(), cmd: payload.cmd.clone(), status: "error".into(), exit_code: None, stdout: String::new(), stderr: "Unknown command".into(), }, } } /// Convert command output to CommandResult. fn make_result(payload: &CommandPayload, output: std::io::Result) -> CommandResult { match output { Ok(out) => CommandResult { node_id: payload.node_id, ts: OffsetDateTime::now_utc(), nonce: payload.nonce.clone(), cmd: payload.cmd.clone(), status: if out.status.success() { "success".into() } else { "failed".into() }, exit_code: out.status.code(), stdout: String::from_utf8_lossy(&out.stdout).into_owned(), stderr: String::from_utf8_lossy(&out.stderr).into_owned(), }, Err(e) => CommandResult { node_id: payload.node_id, ts: OffsetDateTime::now_utc(), nonce: payload.nonce.clone(), cmd: payload.cmd.clone(), status: "error".into(), exit_code: None, stdout: String::new(), stderr: format!("Failed to execute: {}", e), }, } } /// Check if a scanner binary is available on the system. fn scanner_available(name: &str) -> bool { StdCommand::new("which") .arg(name) .output() .map(|o| o.status.success()) .unwrap_or(false) } /// Get scanner version string. fn get_scanner_version(name: &str) -> Option { let output = StdCommand::new(name) .arg("--version") .output() .ok()?; if output.status.success() { let stdout = String::from_utf8_lossy(&output.stdout); Some(stdout.lines().next().unwrap_or("unknown").to_string()) } else { None } } /// Run nuclei scanner or generate mock results. 
///
/// `profile` selects the template set (`"fast"`, `"deep"`, anything else is
/// the default set). Output is written to `nuclei-<nonce>.json` inside
/// `receipts_dir`. When the binary is not installed, mock results are
/// produced instead.
fn run_nuclei(nonce: &str, profile: &str, receipts_dir: &str) -> ScannerResult {
    if !scanner_available("nuclei") {
        tracing::info!("nuclei not available, generating mock results");
        return generate_mock_nuclei_result(nonce, receipts_dir);
    }

    let output_file = format!("{}/nuclei-{}.json", receipts_dir, nonce);
    let templates = match profile {
        "fast" => "cves/",
        "deep" => ".",
        _ => "cves/,vulnerabilities/",
    };

    let run = StdCommand::new("nuclei")
        .args([
            "-t",
            templates,
            "-silent",
            "-json",
            "-o",
            &output_file,
            "-target",
            "localhost",
        ])
        .output();

    // The process could not be started at all.
    let out = match run {
        Ok(out) => out,
        Err(e) => {
            return ScannerResult {
                available: true,
                ran: false,
                error: Some(format!("Failed to run nuclei: {}", e)),
                ..ScannerResult::default()
            };
        }
    };

    // The process ran but reported failure.
    if !out.status.success() {
        return ScannerResult {
            available: true,
            ran: true,
            version: get_scanner_version("nuclei"),
            error: Some(format!(
                "nuclei exited with code {:?}: {}",
                out.status.code(),
                String::from_utf8_lossy(&out.stderr)
            )),
            ..ScannerResult::default()
        };
    }

    let summary = parse_nuclei_findings(&output_file);
    let version = get_scanner_version("nuclei");
    let findings = summary.critical + summary.high + summary.medium + summary.low + summary.info;
    ScannerResult {
        available: true,
        ran: true,
        version,
        findings_real: true,
        findings,
        output_file: Some(format!("nuclei-{}.json", nonce)),
        error: None,
        summary,
    }
}

/// Generate mock nuclei results when scanner not available.
///
/// Writes three clearly labelled `MOCK:` findings (one high, one medium, one
/// low) to `nuclei-<nonce>.json` inside `receipts_dir` and returns a result
/// with `findings_real` set to `false`. A failed write is logged and
/// otherwise ignored.
fn generate_mock_nuclei_result(nonce: &str, receipts_dir: &str) -> ScannerResult {
    let output_file = format!("{}/nuclei-{}.json", receipts_dir, nonce);
    let mock_findings = serde_json::json!([
        {"info": {"severity": "high", "name": "MOCK: Example CVE Detection"}, "host": "localhost", "matched-at": "localhost:80"},
        {"info": {"severity": "medium", "name": "MOCK: Outdated Software Version"}, "host": "localhost", "matched-at": "localhost:443"},
        {"info": {"severity": "low", "name": "MOCK: Information Disclosure"}, "host": "localhost", "matched-at": "localhost:22"}
    ]);

    // Serializing a `json!` value cannot fail, hence the unwrap.
    let rendered = serde_json::to_string_pretty(&mock_findings).unwrap();
    if let Err(e) = fs::write(&output_file, rendered) {
        tracing::warn!("Failed to write mock nuclei results to {}: {}", output_file, e);
    }

    ScannerResult {
        available: false,
        ran: false,
        version: None,
        findings_real: false,
        findings: 3,
        output_file: Some(format!("nuclei-{}.json", nonce)),
        error: Some("Scanner not installed - mock data generated".into()),
        summary: ScanSummary {
            critical: 0,
            high: 1,
            medium: 1,
            low: 1,
            info: 0,
        },
    }
}

/// Parse nuclei JSON output for findings summary.
///
/// The file is read line by line, each line being parsed as one JSON
/// document whose `info.severity` string (case-insensitive) selects the
/// counter to increment. Unreadable files, unparsable lines and unknown
/// severities are skipped, so the result may be all zeros.
fn parse_nuclei_findings(output_file: &str) -> ScanSummary {
    let mut summary = ScanSummary::default();
    let contents = match fs::read_to_string(output_file) {
        Ok(contents) => contents,
        Err(_) => return summary,
    };

    let severities = contents
        .lines()
        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
        .filter_map(|finding| finding["info"]["severity"].as_str().map(str::to_lowercase));

    for severity in severities {
        match severity.as_str() {
            "critical" => summary.critical += 1,
            "high" => summary.high += 1,
            "medium" => summary.medium += 1,
            "low" => summary.low += 1,
            "info" => summary.info += 1,
            _ => {}
        }
    }
    summary
}

/// Run trivy scanner or generate mock results.
///
/// `profile` selects the severity filter (`"fast"`, `"deep"`, anything else
/// is the default filter). Output is written to `trivy-<nonce>.json` inside
/// `receipts_dir`. When the binary is not installed, mock results are
/// produced instead.
fn run_trivy(nonce: &str, profile: &str, receipts_dir: &str) -> ScannerResult {
    if !scanner_available("trivy") {
        tracing::info!("trivy not available, generating mock results");
        return generate_mock_trivy_result(nonce, receipts_dir);
    }

    let output_file = format!("{}/trivy-{}.json", receipts_dir, nonce);
    let severity = match profile {
        "fast" => "CRITICAL,HIGH",
        "deep" => "CRITICAL,HIGH,MEDIUM,LOW",
        _ => "CRITICAL,HIGH,MEDIUM",
    };

    let run = StdCommand::new("trivy")
        .args([
            "fs",
            "/",
            "--format",
            "json",
            "--output",
            &output_file,
            "--severity",
            severity,
            "--scanners",
            "vuln",
        ])
        .output();

    // The process could not be started at all.
    let out = match run {
        Ok(out) => out,
        Err(e) => {
            return ScannerResult {
                available: true,
                ran: false,
                error: Some(format!("Failed to run trivy: {}", e)),
                ..ScannerResult::default()
            };
        }
    };

    // The process ran but reported failure.
    if !out.status.success() {
        return ScannerResult {
            available: true,
            ran: true,
            version: get_scanner_version("trivy"),
            error: Some(format!(
                "trivy exited with code {:?}: {}",
                out.status.code(),
                String::from_utf8_lossy(&out.stderr)
            )),
            ..ScannerResult::default()
        };
    }

    let summary = parse_trivy_findings(&output_file);
    let version = get_scanner_version("trivy");
    let findings = summary.critical + summary.high + summary.medium + summary.low + summary.info;
    ScannerResult {
        available: true,
        ran: true,
        version,
        findings_real: true,
        findings,
        output_file: Some(format!("trivy-{}.json", nonce)),
        error: None,
        summary,
    }
}

/// Generate mock trivy results when scanner not available.
///
/// Writes five clearly labelled `MOCK` vulnerabilities (one critical, two
/// high, two medium) to `trivy-<nonce>.json` inside `receipts_dir` and
/// returns a result with `findings_real` set to `false`. A failed write is
/// logged and otherwise ignored.
fn generate_mock_trivy_result(nonce: &str, receipts_dir: &str) -> ScannerResult {
    let output_file = format!("{}/trivy-{}.json", receipts_dir, nonce);
    let mock_findings = serde_json::json!({
        "Results": [{
            "Target": "/",
            "Vulnerabilities": [
                {"VulnerabilityID": "MOCK-CVE-2024-0001", "Severity": "CRITICAL", "Title": "MOCK: Critical System Vulnerability"},
                {"VulnerabilityID": "MOCK-CVE-2024-0002", "Severity": "HIGH", "Title": "MOCK: High Risk Package"},
                {"VulnerabilityID": "MOCK-CVE-2024-0003", "Severity": "HIGH", "Title": "MOCK: Outdated Dependency"},
                {"VulnerabilityID": "MOCK-CVE-2024-0004", "Severity": "MEDIUM", "Title": "MOCK: Medium Risk Finding"},
                {"VulnerabilityID": "MOCK-CVE-2024-0005", "Severity": "MEDIUM", "Title": "MOCK: Configuration Issue"}
            ]
        }]
    });

    // Serializing a `json!` value cannot fail, hence the unwrap.
    let rendered = serde_json::to_string_pretty(&mock_findings).unwrap();
    if let Err(e) = fs::write(&output_file, rendered) {
        tracing::warn!("Failed to write mock trivy results to {}: {}", output_file, e);
    }

    ScannerResult {
        available: false,
        ran: false,
        version: None,
        findings_real: false,
        findings: 5,
        output_file: Some(format!("trivy-{}.json", nonce)),
        error: Some("Scanner not installed - mock data generated".into()),
        summary: ScanSummary {
            critical: 1,
            high: 2,
            medium: 2,
            low: 0,
            info: 0,
        },
    }
}

/// Parse trivy JSON output for findings summary.
///
/// The whole file is parsed as one JSON document; every entry of every
/// `Results[].Vulnerabilities[]` array with a string `Severity` is counted
/// (case-insensitive). Severities other than critical/high/medium/low are
/// counted as `info`. An unreadable or unparsable file yields all zeros.
fn parse_trivy_findings(output_file: &str) -> ScanSummary {
    let mut summary = ScanSummary::default();
    let report = match fs::read_to_string(output_file)
        .ok()
        .and_then(|contents| serde_json::from_str::<serde_json::Value>(&contents).ok())
    {
        Some(report) => report,
        None => return summary,
    };

    let vulnerabilities = report["Results"]
        .as_array()
        .into_iter()
        .flatten()
        .filter_map(|result| result["Vulnerabilities"].as_array())
        .flatten();

    for vuln in vulnerabilities {
        if let Some(severity) = vuln["Severity"].as_str() {
            match severity.to_uppercase().as_str() {
                "CRITICAL" => summary.critical += 1,
                "HIGH" => summary.high += 1,
                "MEDIUM" => summary.medium += 1,
                "LOW" => summary.low += 1,
                _ => summary.info += 1,
            }
        }
    }
    summary
}

/// Execute the sovereign-scan command.
fn run_sovereign_scan(payload: &CommandPayload, config: &Config) -> CommandResult { use time::format_description::well_known::Rfc3339; let profile = payload.args["profile"].as_str().unwrap_or("default"); let scanners: Vec<&str> = payload.args["scanners"] .as_array() .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect()) .unwrap_or_else(|| vec!["nuclei", "trivy"]); tracing::info!("Running sovereign-scan: profile={}, scanners={:?}", profile, scanners); // Create receipts directory let receipts_dir = format!("{}/receipts/sovereign", config.vaultmesh_root); if let Err(e) = fs::create_dir_all(&receipts_dir) { return CommandResult { node_id: payload.node_id, ts: OffsetDateTime::now_utc(), nonce: payload.nonce.clone(), cmd: "sovereign-scan".into(), status: "error".into(), exit_code: None, stdout: String::new(), stderr: format!("Failed to create receipts directory: {}", e), }; } let mut scanner_results = serde_json::Map::new(); let mut summary = ScanSummary::default(); // Run each requested scanner for scanner in &scanners { match *scanner { "nuclei" => { let result = run_nuclei(&payload.nonce, profile, &receipts_dir); summary.merge(&result.summary); scanner_results.insert("nuclei".into(), serde_json::to_value(&result).unwrap()); } "trivy" => { let result = run_trivy(&payload.nonce, profile, &receipts_dir); summary.merge(&result.summary); scanner_results.insert("trivy".into(), serde_json::to_value(&result).unwrap()); } _ => { tracing::warn!("Unknown scanner requested: {}", scanner); } } } // Build receipt (without hash first) let mut receipt = serde_json::json!({ "kind": "sovereign-scan", "node_id": payload.node_id, "nonce": payload.nonce, "timestamp": OffsetDateTime::now_utc().format(&Rfc3339).unwrap(), "os_profile": config.os_profile, "vaultmesh_root": config.vaultmesh_root, "profile": profile, "scanners": scanner_results, "summary": summary }); // Hash the receipt content (before adding hash field) let receipt_bytes = serde_json::to_vec(&receipt).unwrap(); let hash 
= blake3::hash(&receipt_bytes); let hash_hex = format!("blake3:{}", hash.to_hex()); receipt["receipt_hash"] = serde_json::Value::String(hash_hex.clone()); // Write receipt file let receipt_path = format!("{}/SCAN-{}.json", receipts_dir, payload.nonce); if let Err(e) = fs::write(&receipt_path, serde_json::to_string_pretty(&receipt).unwrap()) { tracing::error!("Failed to write receipt: {}", e); } tracing::info!("Sovereign scan complete: receipt_hash={}", hash_hex); // Append to proofchain log let proofchain_dir = format!("{}/proofchain", config.vaultmesh_root); fs::create_dir_all(&proofchain_dir).ok(); let proofchain_entry = serde_json::json!({ "ts": OffsetDateTime::now_utc().format(&Rfc3339).unwrap(), "node_id": payload.node_id, "nonce": payload.nonce, "receipt_hash": hash_hex }); let proofchain_path = format!("{}/sovereign-scans.jsonl", proofchain_dir); if let Ok(mut file) = fs::OpenOptions::new() .create(true) .append(true) .open(&proofchain_path) { use std::io::Write; writeln!(file, "{}", serde_json::to_string(&proofchain_entry).unwrap()).ok(); } // Determine if any scanner had real findings let any_real = scanner_results.values().any(|v| { v.get("findings_real").and_then(|f| f.as_bool()).unwrap_or(false) }); // Return summary as stdout let stdout = serde_json::to_string_pretty(&serde_json::json!({ "status": "completed", "receipt_path": receipt_path, "receipt_hash": hash_hex, "summary": summary, "findings_real": any_real })) .unwrap(); CommandResult { node_id: payload.node_id, ts: OffsetDateTime::now_utc(), nonce: payload.nonce.clone(), cmd: "sovereign-scan".into(), status: "success".into(), exit_code: Some(0), stdout, stderr: String::new(), } } #[tokio::main] async fn main() { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), )) .with(tracing_subscriber::fmt::layer()) .init(); let config = Config::from_env(); let hostname = hostname::get() .map(|h| 
h.to_string_lossy().to_string()) .unwrap_or_else(|_| "unknown".into()); tracing::info!( "VaultMesh Node Agent starting (node_id={}, hostname={})", config.node_id, hostname ); tracing::info!("Command Center URL: {}", config.cc_url); tracing::info!("Heartbeat interval: {}s", config.heartbeat_secs); // Load CC public key for signature verification let cc_pubkey_path = std::env::var("VAULTMESH_CC_PUBKEY") .unwrap_or_else(|_| "/etc/vaultmesh/cc-ed25519.pub".into()); let cc_pubkey = match load_cc_public_key(&cc_pubkey_path) { Ok(pk) => { tracing::info!("Loaded CC public key from {}", cc_pubkey_path); Some(pk) } Err(e) => { tracing::warn!("Could not load CC public key: {} - commands disabled", e); None } }; let client = reqwest::Client::new(); let heartbeat_url = format!("{}/api/agent/heartbeat", config.cc_url); let commands_url = format!("{}/api/agent/commands?node_id={}", config.cc_url, config.node_id); let result_url = format!("{}/api/agent/command-result", config.cc_url); loop { // Send heartbeat let metrics = gather_metrics(); let heartbeat = NodeHeartbeat { node_id: config.node_id, hostname: hostname.clone(), os_profile: config.os_profile.clone(), cloudflare_ok: check_cloudflared(), services_ok: check_services(&config.vaultmesh_root), vaultmesh_root: config.vaultmesh_root.clone(), timestamp: OffsetDateTime::now_utc(), metrics, }; tracing::debug!("Sending heartbeat: {:?}", heartbeat); match client.post(&heartbeat_url).json(&heartbeat).send().await { Ok(resp) if resp.status().is_success() => { tracing::info!( "Heartbeat sent (cf={}, svc={}, load={:.2}, mem={}MiB, disk={:.1}GiB)", heartbeat.cloudflare_ok, heartbeat.services_ok, heartbeat.metrics.load1.unwrap_or(0.0), heartbeat.metrics.mem_used_mb.unwrap_or(0), heartbeat.metrics.disk_root_used_gb.unwrap_or(0.0), ); } Ok(resp) => { tracing::warn!("Heartbeat returned non-success status: {}", resp.status()); } Err(e) => { tracing::error!("Failed to send heartbeat: {}", e); } } // Poll for commands (only if we have the 
public key) if let Some(ref pubkey) = cc_pubkey { match client.get(&commands_url).send().await { Ok(resp) if resp.status().is_success() => { match resp.json::>().await { Ok(commands) => { for cmd in commands { tracing::info!("Received command: {} (nonce={})", cmd.payload.cmd, cmd.payload.nonce); // Verify signature if let Err(e) = verify_command(&cmd, pubkey) { tracing::error!("Signature verification failed: {}", e); let result = CommandResult { node_id: cmd.payload.node_id, ts: OffsetDateTime::now_utc(), nonce: cmd.payload.nonce.clone(), cmd: cmd.payload.cmd.clone(), status: "rejected".into(), exit_code: None, stdout: String::new(), stderr: format!("Signature verification failed: {}", e), }; let _ = client.post(&result_url).json(&result).send().await; continue; } tracing::info!("Signature verified, executing command"); let result = execute_command(&cmd, &config); tracing::info!( "Command '{}' completed: {} (exit={:?})", result.cmd, result.status, result.exit_code ); // Report result if let Err(e) = client.post(&result_url).json(&result).send().await { tracing::error!("Failed to report command result: {}", e); } } } Err(e) => { tracing::warn!("Failed to parse commands response: {}", e); } } } Ok(resp) => { tracing::warn!("Commands endpoint returned: {}", resp.status()); } Err(e) => { tracing::debug!("Failed to poll commands: {}", e); } } } sleep(Duration::from_secs(config.heartbeat_secs)).await; } }