Files
vm-control/node-agent/src/main.rs
2025-12-18 00:29:15 +01:00

819 lines
29 KiB
Rust

mod config;
use crate::config::Config;
use anyhow::{bail, Result};
use ed25519_dalek::{Signature, Verifier, VerifyingKey};
use serde::{Deserialize, Serialize};
use std::{fs, path::Path, process::Command as StdCommand};
use time::OffsetDateTime;
use tokio::time::{sleep, Duration};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
/// Metrics collected from the node.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NodeMetrics {
pub uptime_seconds: Option<u64>,
pub load1: Option<f32>,
pub load5: Option<f32>,
pub load15: Option<f32>,
pub mem_total_mb: Option<u64>,
pub mem_used_mb: Option<u64>,
pub disk_root_total_gb: Option<f32>,
pub disk_root_used_gb: Option<f32>,
}
/// Heartbeat data sent to the Command Center.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHeartbeat {
pub node_id: Uuid,
pub hostname: String,
pub os_profile: String,
pub cloudflare_ok: bool,
pub services_ok: bool,
pub vaultmesh_root: String,
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
pub metrics: NodeMetrics,
}
/// Command payload from CC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandPayload {
pub node_id: Uuid,
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub nonce: String,
pub cmd: String,
pub args: serde_json::Value,
}
/// Signed command from CC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignedCommand {
pub payload: CommandPayload,
pub signature: String,
}
/// Command execution result sent back to CC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandResult {
pub node_id: Uuid,
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub nonce: String,
pub cmd: String,
pub status: String,
pub exit_code: Option<i32>,
pub stdout: String,
pub stderr: String,
}
/// Summary of scan findings by severity.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ScanSummary {
pub critical: u32,
pub high: u32,
pub medium: u32,
pub low: u32,
pub info: u32,
}
impl ScanSummary {
fn merge(&mut self, other: &ScanSummary) {
self.critical += other.critical;
self.high += other.high;
self.medium += other.medium;
self.low += other.low;
self.info += other.info;
}
}
/// Result from running a single scanner.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ScannerResult {
pub available: bool,
pub ran: bool,
pub version: Option<String>,
pub findings_real: bool,
pub findings: u32,
pub output_file: Option<String>,
pub error: Option<String>,
pub summary: ScanSummary,
}
/// Check if cloudflared service is active via systemctl.
fn check_cloudflared() -> bool {
StdCommand::new("systemctl")
.args(["is-active", "--quiet", "cloudflared"])
.status()
.map(|s| s.success())
.unwrap_or(false)
}
/// Check if essential VaultMesh services are healthy.
fn check_services(vaultmesh_root: &str) -> bool {
Path::new(vaultmesh_root).is_dir()
}
/// Gather system metrics from /proc and df.
fn gather_metrics() -> NodeMetrics {
NodeMetrics {
uptime_seconds: read_uptime_seconds(),
load1: read_loadavg_index(0),
load5: read_loadavg_index(1),
load15: read_loadavg_index(2),
mem_total_mb: read_meminfo_total_mb(),
mem_used_mb: read_meminfo_used_mb(),
disk_root_total_gb: read_disk_root_total_gb(),
disk_root_used_gb: read_disk_root_used_gb(),
}
}
fn read_uptime_seconds() -> Option<u64> {
let contents = fs::read_to_string("/proc/uptime").ok()?;
let first = contents.split_whitespace().next()?;
let secs_str = first.split('.').next().unwrap_or(first);
secs_str.parse().ok()
}
fn read_loadavg_index(idx: usize) -> Option<f32> {
let contents = fs::read_to_string("/proc/loadavg").ok()?;
let mut parts = contents.split_whitespace();
parts.nth(idx)?.parse().ok()
}
fn read_meminfo_total_mb() -> Option<u64> {
read_meminfo_field_kb("MemTotal").map(|kb| kb / 1024)
}
fn read_meminfo_used_mb() -> Option<u64> {
let total_kb = read_meminfo_field_kb("MemTotal")?;
let avail_kb = read_meminfo_field_kb("MemAvailable")?;
let used_kb = total_kb.saturating_sub(avail_kb);
Some(used_kb / 1024)
}
fn read_meminfo_field_kb(field: &str) -> Option<u64> {
let contents = fs::read_to_string("/proc/meminfo").ok()?;
for line in contents.lines() {
if line.starts_with(field) {
let val = line.split_whitespace().nth(1)?;
return val.parse().ok();
}
}
None
}
fn read_disk_root_total_gb() -> Option<f32> {
read_df_root_column(1)
}
fn read_disk_root_used_gb() -> Option<f32> {
read_df_root_column(2)
}
fn read_df_root_column(col_index: usize) -> Option<f32> {
let output = StdCommand::new("df")
.arg("-B1")
.arg("/")
.output()
.ok()?;
if !output.status.success() {
return None;
}
let stdout = String::from_utf8_lossy(&output.stdout);
let mut lines = stdout.lines();
lines.next()?;
let line = lines.next()?;
let cols: Vec<&str> = line.split_whitespace().collect();
let val: f64 = cols.get(col_index)?.parse().ok()?;
Some((val / (1024.0 * 1024.0 * 1024.0)) as f32)
}
/// Load CC public key from file.
fn load_cc_public_key(path: &str) -> Result<VerifyingKey> {
use base64::Engine;
let p = Path::new(path);
if !p.exists() {
bail!("CC public key file not found: {}", path);
}
let contents = fs::read_to_string(p)?.trim().to_string();
let bytes = base64::engine::general_purpose::STANDARD.decode(&contents)?;
let key_bytes: [u8; 32] = bytes
.try_into()
.map_err(|_| anyhow::anyhow!("Public key must be 32 bytes"))?;
Ok(VerifyingKey::from_bytes(&key_bytes)?)
}
/// Verify a signed command.
fn verify_command(cmd: &SignedCommand, pubkey: &VerifyingKey) -> Result<()> {
use base64::Engine;
let payload_bytes = serde_json::to_vec(&cmd.payload)?;
let sig_bytes = base64::engine::general_purpose::STANDARD.decode(&cmd.signature)?;
let sig_arr: [u8; 64] = sig_bytes
.try_into()
.map_err(|_| anyhow::anyhow!("Signature must be 64 bytes"))?;
let signature = Signature::from_bytes(&sig_arr);
pubkey.verify(&payload_bytes, &signature)?;
Ok(())
}
/// Execute a whitelisted command.
fn execute_command(cmd: &SignedCommand, config: &Config) -> CommandResult {
let payload = &cmd.payload;
let allowed = ["restart-service", "service-status", "tail-journal", "sovereign-scan"];
if !allowed.contains(&payload.cmd.as_str()) {
return CommandResult {
node_id: payload.node_id,
ts: OffsetDateTime::now_utc(),
nonce: payload.nonce.clone(),
cmd: payload.cmd.clone(),
status: "rejected".into(),
exit_code: None,
stdout: String::new(),
stderr: format!("Command '{}' not in whitelist", payload.cmd),
};
}
match payload.cmd.as_str() {
"service-status" => {
let service = payload.args["service"].as_str().unwrap_or("cloudflared");
let output = StdCommand::new("systemctl")
.args(["status", service])
.output();
make_result(payload, output)
}
"restart-service" => {
let service = payload.args["service"].as_str().unwrap_or("cloudflared");
let output = StdCommand::new("systemctl")
.args(["restart", service])
.output();
make_result(payload, output)
}
"tail-journal" => {
let unit = payload.args["unit"].as_str().unwrap_or("cloudflared");
let lines = payload.args["lines"].as_u64().unwrap_or(50);
let output = StdCommand::new("journalctl")
.args(["-u", unit, "-n", &lines.to_string(), "--no-pager"])
.output();
make_result(payload, output)
}
"sovereign-scan" => {
run_sovereign_scan(payload, config)
}
_ => CommandResult {
node_id: payload.node_id,
ts: OffsetDateTime::now_utc(),
nonce: payload.nonce.clone(),
cmd: payload.cmd.clone(),
status: "error".into(),
exit_code: None,
stdout: String::new(),
stderr: "Unknown command".into(),
},
}
}
/// Convert command output to CommandResult.
fn make_result(payload: &CommandPayload, output: std::io::Result<std::process::Output>) -> CommandResult {
match output {
Ok(out) => CommandResult {
node_id: payload.node_id,
ts: OffsetDateTime::now_utc(),
nonce: payload.nonce.clone(),
cmd: payload.cmd.clone(),
status: if out.status.success() { "success".into() } else { "failed".into() },
exit_code: out.status.code(),
stdout: String::from_utf8_lossy(&out.stdout).into_owned(),
stderr: String::from_utf8_lossy(&out.stderr).into_owned(),
},
Err(e) => CommandResult {
node_id: payload.node_id,
ts: OffsetDateTime::now_utc(),
nonce: payload.nonce.clone(),
cmd: payload.cmd.clone(),
status: "error".into(),
exit_code: None,
stdout: String::new(),
stderr: format!("Failed to execute: {}", e),
},
}
}
/// Check if a scanner binary is available on the system.
fn scanner_available(name: &str) -> bool {
StdCommand::new("which")
.arg(name)
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
/// Get scanner version string.
fn get_scanner_version(name: &str) -> Option<String> {
let output = StdCommand::new(name)
.arg("--version")
.output()
.ok()?;
if output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout);
Some(stdout.lines().next().unwrap_or("unknown").to_string())
} else {
None
}
}
/// Run nuclei scanner or generate mock results.
fn run_nuclei(nonce: &str, profile: &str, receipts_dir: &str) -> ScannerResult {
if !scanner_available("nuclei") {
tracing::info!("nuclei not available, generating mock results");
return generate_mock_nuclei_result(nonce, receipts_dir);
}
let output_file = format!("{}/nuclei-{}.json", receipts_dir, nonce);
let templates = match profile {
"fast" => "cves/",
"deep" => ".",
_ => "cves/,vulnerabilities/",
};
let output = StdCommand::new("nuclei")
.args(["-t", templates, "-silent", "-json", "-o", &output_file])
.args(["-target", "localhost"])
.output();
match output {
Ok(out) if out.status.success() || out.status.code() == Some(0) => {
let summary = parse_nuclei_findings(&output_file);
ScannerResult {
available: true,
ran: true,
version: get_scanner_version("nuclei"),
findings_real: true,
findings: summary.critical + summary.high + summary.medium + summary.low + summary.info,
output_file: Some(format!("nuclei-{}.json", nonce)),
error: None,
summary,
}
}
Ok(out) => ScannerResult {
available: true,
ran: true,
version: get_scanner_version("nuclei"),
findings_real: false,
findings: 0,
output_file: None,
error: Some(format!("nuclei exited with code {:?}: {}",
out.status.code(),
String::from_utf8_lossy(&out.stderr))),
summary: ScanSummary::default(),
},
Err(e) => ScannerResult {
available: true,
ran: false,
version: None,
findings_real: false,
findings: 0,
output_file: None,
error: Some(format!("Failed to run nuclei: {}", e)),
summary: ScanSummary::default(),
},
}
}
/// Generate mock nuclei results when scanner not available.
fn generate_mock_nuclei_result(nonce: &str, receipts_dir: &str) -> ScannerResult {
let output_file = format!("{}/nuclei-{}.json", receipts_dir, nonce);
let mock_findings = serde_json::json!([
{"info": {"severity": "high", "name": "MOCK: Example CVE Detection"}, "host": "localhost", "matched-at": "localhost:80"},
{"info": {"severity": "medium", "name": "MOCK: Outdated Software Version"}, "host": "localhost", "matched-at": "localhost:443"},
{"info": {"severity": "low", "name": "MOCK: Information Disclosure"}, "host": "localhost", "matched-at": "localhost:22"}
]);
fs::write(&output_file, serde_json::to_string_pretty(&mock_findings).unwrap()).ok();
ScannerResult {
available: false,
ran: false,
version: None,
findings_real: false,
findings: 3,
output_file: Some(format!("nuclei-{}.json", nonce)),
error: Some("Scanner not installed - mock data generated".into()),
summary: ScanSummary {
critical: 0,
high: 1,
medium: 1,
low: 1,
info: 0,
},
}
}
/// Parse nuclei JSON output for findings summary.
fn parse_nuclei_findings(output_file: &str) -> ScanSummary {
let mut summary = ScanSummary::default();
if let Ok(contents) = fs::read_to_string(output_file) {
for line in contents.lines() {
if let Ok(finding) = serde_json::from_str::<serde_json::Value>(line) {
if let Some(severity) = finding["info"]["severity"].as_str() {
match severity.to_lowercase().as_str() {
"critical" => summary.critical += 1,
"high" => summary.high += 1,
"medium" => summary.medium += 1,
"low" => summary.low += 1,
"info" => summary.info += 1,
_ => {}
}
}
}
}
}
summary
}
/// Run trivy scanner or generate mock results.
fn run_trivy(nonce: &str, profile: &str, receipts_dir: &str) -> ScannerResult {
if !scanner_available("trivy") {
tracing::info!("trivy not available, generating mock results");
return generate_mock_trivy_result(nonce, receipts_dir);
}
let output_file = format!("{}/trivy-{}.json", receipts_dir, nonce);
let severity = match profile {
"fast" => "CRITICAL,HIGH",
"deep" => "CRITICAL,HIGH,MEDIUM,LOW",
_ => "CRITICAL,HIGH,MEDIUM",
};
let output = StdCommand::new("trivy")
.args(["fs", "/", "--format", "json", "--output", &output_file])
.args(["--severity", severity])
.args(["--scanners", "vuln"])
.output();
match output {
Ok(out) if out.status.success() || out.status.code() == Some(0) => {
let summary = parse_trivy_findings(&output_file);
ScannerResult {
available: true,
ran: true,
version: get_scanner_version("trivy"),
findings_real: true,
findings: summary.critical + summary.high + summary.medium + summary.low + summary.info,
output_file: Some(format!("trivy-{}.json", nonce)),
error: None,
summary,
}
}
Ok(out) => ScannerResult {
available: true,
ran: true,
version: get_scanner_version("trivy"),
findings_real: false,
findings: 0,
output_file: None,
error: Some(format!("trivy exited with code {:?}: {}",
out.status.code(),
String::from_utf8_lossy(&out.stderr))),
summary: ScanSummary::default(),
},
Err(e) => ScannerResult {
available: true,
ran: false,
version: None,
findings_real: false,
findings: 0,
output_file: None,
error: Some(format!("Failed to run trivy: {}", e)),
summary: ScanSummary::default(),
},
}
}
/// Generate mock trivy results when scanner not available.
fn generate_mock_trivy_result(nonce: &str, receipts_dir: &str) -> ScannerResult {
let output_file = format!("{}/trivy-{}.json", receipts_dir, nonce);
let mock_findings = serde_json::json!({
"Results": [{
"Target": "/",
"Vulnerabilities": [
{"VulnerabilityID": "MOCK-CVE-2024-0001", "Severity": "CRITICAL", "Title": "MOCK: Critical System Vulnerability"},
{"VulnerabilityID": "MOCK-CVE-2024-0002", "Severity": "HIGH", "Title": "MOCK: High Risk Package"},
{"VulnerabilityID": "MOCK-CVE-2024-0003", "Severity": "HIGH", "Title": "MOCK: Outdated Dependency"},
{"VulnerabilityID": "MOCK-CVE-2024-0004", "Severity": "MEDIUM", "Title": "MOCK: Medium Risk Finding"},
{"VulnerabilityID": "MOCK-CVE-2024-0005", "Severity": "MEDIUM", "Title": "MOCK: Configuration Issue"}
]
}]
});
fs::write(&output_file, serde_json::to_string_pretty(&mock_findings).unwrap()).ok();
ScannerResult {
available: false,
ran: false,
version: None,
findings_real: false,
findings: 5,
output_file: Some(format!("trivy-{}.json", nonce)),
error: Some("Scanner not installed - mock data generated".into()),
summary: ScanSummary {
critical: 1,
high: 2,
medium: 2,
low: 0,
info: 0,
},
}
}
/// Parse trivy JSON output for findings summary.
fn parse_trivy_findings(output_file: &str) -> ScanSummary {
let mut summary = ScanSummary::default();
if let Ok(contents) = fs::read_to_string(output_file) {
if let Ok(report) = serde_json::from_str::<serde_json::Value>(&contents) {
if let Some(results) = report["Results"].as_array() {
for result in results {
if let Some(vulns) = result["Vulnerabilities"].as_array() {
for vuln in vulns {
if let Some(severity) = vuln["Severity"].as_str() {
match severity.to_uppercase().as_str() {
"CRITICAL" => summary.critical += 1,
"HIGH" => summary.high += 1,
"MEDIUM" => summary.medium += 1,
"LOW" => summary.low += 1,
_ => summary.info += 1,
}
}
}
}
}
}
}
}
summary
}
/// Execute the sovereign-scan command.
fn run_sovereign_scan(payload: &CommandPayload, config: &Config) -> CommandResult {
use time::format_description::well_known::Rfc3339;
let profile = payload.args["profile"].as_str().unwrap_or("default");
let scanners: Vec<&str> = payload.args["scanners"]
.as_array()
.map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
.unwrap_or_else(|| vec!["nuclei", "trivy"]);
tracing::info!("Running sovereign-scan: profile={}, scanners={:?}", profile, scanners);
// Create receipts directory
let receipts_dir = format!("{}/receipts/sovereign", config.vaultmesh_root);
if let Err(e) = fs::create_dir_all(&receipts_dir) {
return CommandResult {
node_id: payload.node_id,
ts: OffsetDateTime::now_utc(),
nonce: payload.nonce.clone(),
cmd: "sovereign-scan".into(),
status: "error".into(),
exit_code: None,
stdout: String::new(),
stderr: format!("Failed to create receipts directory: {}", e),
};
}
let mut scanner_results = serde_json::Map::new();
let mut summary = ScanSummary::default();
// Run each requested scanner
for scanner in &scanners {
match *scanner {
"nuclei" => {
let result = run_nuclei(&payload.nonce, profile, &receipts_dir);
summary.merge(&result.summary);
scanner_results.insert("nuclei".into(), serde_json::to_value(&result).unwrap());
}
"trivy" => {
let result = run_trivy(&payload.nonce, profile, &receipts_dir);
summary.merge(&result.summary);
scanner_results.insert("trivy".into(), serde_json::to_value(&result).unwrap());
}
_ => {
tracing::warn!("Unknown scanner requested: {}", scanner);
}
}
}
// Build receipt (without hash first)
let mut receipt = serde_json::json!({
"kind": "sovereign-scan",
"node_id": payload.node_id,
"nonce": payload.nonce,
"timestamp": OffsetDateTime::now_utc().format(&Rfc3339).unwrap(),
"os_profile": config.os_profile,
"vaultmesh_root": config.vaultmesh_root,
"profile": profile,
"scanners": scanner_results,
"summary": summary
});
// Hash the receipt content (before adding hash field)
let receipt_bytes = serde_json::to_vec(&receipt).unwrap();
let hash = blake3::hash(&receipt_bytes);
let hash_hex = format!("blake3:{}", hash.to_hex());
receipt["receipt_hash"] = serde_json::Value::String(hash_hex.clone());
// Write receipt file
let receipt_path = format!("{}/SCAN-{}.json", receipts_dir, payload.nonce);
if let Err(e) = fs::write(&receipt_path, serde_json::to_string_pretty(&receipt).unwrap()) {
tracing::error!("Failed to write receipt: {}", e);
}
tracing::info!("Sovereign scan complete: receipt_hash={}", hash_hex);
// Append to proofchain log
let proofchain_dir = format!("{}/proofchain", config.vaultmesh_root);
fs::create_dir_all(&proofchain_dir).ok();
let proofchain_entry = serde_json::json!({
"ts": OffsetDateTime::now_utc().format(&Rfc3339).unwrap(),
"node_id": payload.node_id,
"nonce": payload.nonce,
"receipt_hash": hash_hex
});
let proofchain_path = format!("{}/sovereign-scans.jsonl", proofchain_dir);
if let Ok(mut file) = fs::OpenOptions::new()
.create(true)
.append(true)
.open(&proofchain_path)
{
use std::io::Write;
writeln!(file, "{}", serde_json::to_string(&proofchain_entry).unwrap()).ok();
}
// Determine if any scanner had real findings
let any_real = scanner_results.values().any(|v| {
v.get("findings_real").and_then(|f| f.as_bool()).unwrap_or(false)
});
// Return summary as stdout
let stdout = serde_json::to_string_pretty(&serde_json::json!({
"status": "completed",
"receipt_path": receipt_path,
"receipt_hash": hash_hex,
"summary": summary,
"findings_real": any_real
}))
.unwrap();
CommandResult {
node_id: payload.node_id,
ts: OffsetDateTime::now_utc(),
nonce: payload.nonce.clone(),
cmd: "sovereign-scan".into(),
status: "success".into(),
exit_code: Some(0),
stdout,
stderr: String::new(),
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
let config = Config::from_env();
let hostname = hostname::get()
.map(|h| h.to_string_lossy().to_string())
.unwrap_or_else(|_| "unknown".into());
tracing::info!(
"VaultMesh Node Agent starting (node_id={}, hostname={})",
config.node_id,
hostname
);
tracing::info!("Command Center URL: {}", config.cc_url);
tracing::info!("Heartbeat interval: {}s", config.heartbeat_secs);
// Load CC public key for signature verification
let cc_pubkey_path = std::env::var("VAULTMESH_CC_PUBKEY")
.unwrap_or_else(|_| "/etc/vaultmesh/cc-ed25519.pub".into());
let cc_pubkey = match load_cc_public_key(&cc_pubkey_path) {
Ok(pk) => {
tracing::info!("Loaded CC public key from {}", cc_pubkey_path);
Some(pk)
}
Err(e) => {
tracing::warn!("Could not load CC public key: {} - commands disabled", e);
None
}
};
let client = reqwest::Client::new();
let heartbeat_url = format!("{}/api/agent/heartbeat", config.cc_url);
let commands_url = format!("{}/api/agent/commands?node_id={}", config.cc_url, config.node_id);
let result_url = format!("{}/api/agent/command-result", config.cc_url);
loop {
// Send heartbeat
let metrics = gather_metrics();
let heartbeat = NodeHeartbeat {
node_id: config.node_id,
hostname: hostname.clone(),
os_profile: config.os_profile.clone(),
cloudflare_ok: check_cloudflared(),
services_ok: check_services(&config.vaultmesh_root),
vaultmesh_root: config.vaultmesh_root.clone(),
timestamp: OffsetDateTime::now_utc(),
metrics,
};
tracing::debug!("Sending heartbeat: {:?}", heartbeat);
match client.post(&heartbeat_url).json(&heartbeat).send().await {
Ok(resp) if resp.status().is_success() => {
tracing::info!(
"Heartbeat sent (cf={}, svc={}, load={:.2}, mem={}MiB, disk={:.1}GiB)",
heartbeat.cloudflare_ok,
heartbeat.services_ok,
heartbeat.metrics.load1.unwrap_or(0.0),
heartbeat.metrics.mem_used_mb.unwrap_or(0),
heartbeat.metrics.disk_root_used_gb.unwrap_or(0.0),
);
}
Ok(resp) => {
tracing::warn!("Heartbeat returned non-success status: {}", resp.status());
}
Err(e) => {
tracing::error!("Failed to send heartbeat: {}", e);
}
}
// Poll for commands (only if we have the public key)
if let Some(ref pubkey) = cc_pubkey {
match client.get(&commands_url).send().await {
Ok(resp) if resp.status().is_success() => {
match resp.json::<Vec<SignedCommand>>().await {
Ok(commands) => {
for cmd in commands {
tracing::info!("Received command: {} (nonce={})", cmd.payload.cmd, cmd.payload.nonce);
// Verify signature
if let Err(e) = verify_command(&cmd, pubkey) {
tracing::error!("Signature verification failed: {}", e);
let result = CommandResult {
node_id: cmd.payload.node_id,
ts: OffsetDateTime::now_utc(),
nonce: cmd.payload.nonce.clone(),
cmd: cmd.payload.cmd.clone(),
status: "rejected".into(),
exit_code: None,
stdout: String::new(),
stderr: format!("Signature verification failed: {}", e),
};
let _ = client.post(&result_url).json(&result).send().await;
continue;
}
tracing::info!("Signature verified, executing command");
let result = execute_command(&cmd, &config);
tracing::info!(
"Command '{}' completed: {} (exit={:?})",
result.cmd,
result.status,
result.exit_code
);
// Report result
if let Err(e) = client.post(&result_url).json(&result).send().await {
tracing::error!("Failed to report command result: {}", e);
}
}
}
Err(e) => {
tracing::warn!("Failed to parse commands response: {}", e);
}
}
}
Ok(resp) => {
tracing::warn!("Commands endpoint returned: {}", resp.status());
}
Err(e) => {
tracing::debug!("Failed to poll commands: {}", e);
}
}
}
sleep(Duration::from_secs(config.heartbeat_secs)).await;
}
}