commit 789397eb3365768e4ced8bd5c0ef328681d17dae Author: Sovereign Date: Thu Dec 18 00:29:15 2025 +0100 chore: initial import diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..814e710 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/target +Cargo.lock +*.log +.env +/cc-ed25519.key diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..18c3aa9 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,260 @@ +# Changelog + +## V0.7.2 – Communication Layer + +### Unified Event API +- Added `EventEnvelope` as the canonical message format for all comms events. +- New event kinds: `note`, `incident`, `ack`, `tag`, `resolve`. +- Events support optional `node_id` for node-specific or global scope. +- Author field tracks origin: "operator", "system", "vm-copilot", etc. + +### New API Endpoints +- `POST /api/events` - Create a new event envelope. + - Server assigns `id` and `ts`. + - Author can be overridden via `X-VM-Author` header. +- `GET /api/events` - Query events with filtering: + - `?since=RFC3339` - Filter by timestamp. + - `?kind=note,incident` - Comma-separated kind filter. + - `?node_id=uuid` - Filter by node. + - `?limit=N` - Max results (default: 100). + +### SSE Integration +- Envelope events broadcast via SSE using their `kind` as the event name. +- E.g., `event: note` for note envelopes. + +### Persistence +- Events logged to `events.jsonl` in `$VAULTMESH_LOG_DIR`. +- Replayed on startup to restore in-memory state. +- Memory-bounded to 500 most recent envelopes. + +### Usage +```bash +# Post a note +curl -X POST http://127.0.0.1:8088/api/events \ + -H "Content-Type: application/json" \ + -d '{"kind":"note","node_id":"uuid","author":"operator","body":{"text":"Test","severity":"info"}}' + +# Query events +curl "http://127.0.0.1:8088/api/events?since=2025-01-01T00:00:00Z&kind=note" +``` + +> V0.7.2 transforms CC from a control plane into a coordination plane. + +--- + +## V0.7.1 – Mission Console + +### NASA-Style Dashboard +- Added `GET /console` endpoint with 3-panel Mission Console UI. +- Global Mission Bar with fleet KPIs: Total, Healthy, Attention, Critical, Last Scan. +- Left panel: Clickable node list with live status pills (OK/ATTN/CRIT). +- Center panel: Selected node telemetry with metrics cards and per-node event timeline. +- Right panel: Attention summary chips, global scan findings, and global event feed. + +### Live SSE Integration +- Real-time heartbeat indicator glow when nodes report in. +- Live status pill updates via `attention` events. +- Dynamic KPI recalculation without page refresh. +- Per-node and global event timelines populated from SSE stream. + +### Visual Design +- Dark NASA-inspired theme (#05070a background). +- Monospace typography (JetBrains Mono). +- CSS Grid 3-column layout with sticky headers. +- Animated pulse for critical nodes. +- Color-coded severity indicators (green/yellow/red). + +### Usage +```bash +# Open Mission Console +open http://127.0.0.1:8088/console + +# Original table dashboard still at / +open http://127.0.0.1:8088/ +``` + +> V0.7.1 transforms the basic table into an operator-grade control room. + +--- + +## V0.7 – SSE Event Bus + +### Real-Time Event Streaming +- Added `GET /events` endpoint for Server-Sent Events (SSE). +- Events are streamed in real-time as they occur (no page refresh needed). +- Broadcast channel (`tokio::sync::broadcast`) distributes events to all connected clients. + +### Event Types +Four named SSE events are published: +- `heartbeat` – when a node sends a heartbeat. 
+- `scan` – when a sovereign-scan completes. +- `command` – when any command result is reported. +- `attention` – when a node's attention status changes. + +### Wire Format +Each event is JSON-encoded with wire-efficient payloads: +``` +event: heartbeat +data: {"ts":"2024-01-15T10:30:00Z","node_id":"...","hostname":"vault-01","cloudflare_ok":true,...} + +event: attention +data: {"ts":"2024-01-15T10:30:00Z","node_id":"...","needs_attention":true,"reasons":["critical_findings"]} +``` + +### Dashboard Integration +- Minimal JS probe added to dashboard and node detail pages. +- Events are logged to browser console (`[SSE][HB]`, `[SSE][SCAN]`, etc.) for debugging. +- Foundation for V0.7.1 Mission Console with live row updates. + +### Keepalive +- SSE connection sends keepalive every 15 seconds to prevent timeouts. +- Clients that lag behind receive a warning and skip missed events. + +### Testing +```bash +# Connect to SSE stream +curl -N http://127.0.0.1:8088/events + +# Or open dashboard in browser, open DevTools Console +# Trigger heartbeat → see [SSE][HB] in console +``` + +> V0.7 provides the real-time infrastructure for live dashboards without page refresh. + +--- + +## V0.6.1 – Log Tools + +### CLI Log Commands +- Added `logs view` subcommand to query event logs with filters: + - `--kind heartbeats|scans|commands` – filter by event type + - `--node NODE_ID` – filter by node UUID + - `--since 1h|24h|7d|30m` – time-based filtering + - `--min-severity info|low|medium|high|critical` – severity filter (scans only) + - `-n|--limit N` – number of events to show (default: 20) +- Added `logs tail` subcommand for real-time log following +- Added `logs stats` subcommand for per-node and per-kind statistics +- Server mode unchanged: `vm-cc` or `vm-cc serve` starts HTTP server + +### Files Added +- `command-center/src/cli.rs` – CLI argument parsing with clap +- `command-center/src/logs.rs` – Log reading and query logic + +### Usage Examples +```bash +vm-cc logs view # last 20 events +vm-cc logs view --kind scans # last 20 scans +vm-cc logs view --since 1h --kind heartbeats +vm-cc logs view --kind scans --min-severity high +vm-cc logs stats # event counts by node +vm-cc logs tail # follow all logs +``` + +--- + +## V0.6 – Append-Only Persistence + +### Event Logging +- All CC state changes are now persisted to append-only JSONL log files. +- Log files are stored in `$VAULTMESH_LOG_DIR` (default: `/var/lib/vaultmesh/cc-logs`). +- Three event types: + - `heartbeats.jsonl` – HeartbeatEvent for each agent heartbeat. + - `scans.jsonl` – ScanEvent when sovereign-scan completes successfully. + - `commands.jsonl` – CommandEvent when agent reports any command result. + +### Replay on Startup +- CC replays all log files on startup to reconstruct in-memory state. +- Nodes, heartbeat history, scan results, and command history survive restarts. +- Log replay counts are printed at startup for visibility. + +### Ledger-Ready Format +- JSONL format is ledger-native; each line is a self-contained JSON object. +- Designed as foundation for V0.8 Ledger Bridge (Merkle tree over logs). +- No external database dependencies (SQLite not required). + +### Configuration + +| Variable | Default | Description | +|----------------------|------------------------------|----------------------------| +| `VAULTMESH_LOG_DIR` | `/var/lib/vaultmesh/cc-logs` | Directory for event logs | + +> With V0.6, CC state is durable across restarts. The append-only design provides an audit trail of all fleet events. 
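+
+For reference, a representative `heartbeats.jsonl` line (illustrative values,
+abbreviated — the schema is `HeartbeatEvent` from `state.rs`):
+
+```
+{"ts":"2025-01-15T10:30:00Z","node_id":"...","hostname":"vault-01","os_profile":"ArchVault","heartbeat":{"cloudflare_ok":true,"services_ok":true,...}}
+```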
+ +--- + +## V0.5 – Fleet Orchestrator + +### Scan Orchestrator +- Background scheduler runs every `VAULTMESH_SCHEDULER_TICK_SECONDS` (default: `300s`). +- Automatically queues `sovereign-scan` commands for nodes whose last scan is older than + `VAULTMESH_SCAN_INTERVAL_HOURS` (default: `24h`). +- Commands are signed with the CC Ed25519 key and added to the per-node command queue for agent pickup. + +### Staleness / Drift Detection +- New **Attention** column on the main dashboard for at-a-glance fleet health. +- Attention reasons: + - `never_scanned` – node has no scan history. + - `scan_stale` – last scan older than `VAULTMESH_SCAN_STALE_HOURS`. + - `heartbeat_stale` – no heartbeat within `VAULTMESH_HEARTBEAT_STALE_MINUTES`. + - `critical_findings` – last scan reported critical vulnerabilities. + - `high_findings` – last scan reported high-severity vulnerabilities. + - `cloudflare_down` – Cloudflare service flag is `false`. + - `services_down` – services flag is `false`. +- Visual cues: + - `.attn-ok` – green background, label `OK` for healthy nodes. + - `.attn-bad` – red background, comma-separated reasons for nodes needing attention. + +### Command Policy Enforcement +- Introduced `CommandPolicy` with: + - `global_allowed` commands. + - Optional `per_profile` allowlists keyed by OS profile. +- Default allowed commands: + - `service-status` + - `tail-journal` + - `sovereign-scan` + - `restart-service` +- Web UI returns HTTP `403 Forbidden` for disallowed commands. +- Scheduler respects policy and will not auto-queue scans if they are disallowed for the node's profile. + +### Configuration (Environment Variables) + +| Variable | Default | Description | +|------------------------------------|---------|-------------------------------------| +| `VAULTMESH_SCAN_INTERVAL_HOURS` | `24` | How often each node should be scanned | +| `VAULTMESH_SCAN_STALE_HOURS` | `48` | Scan staleness threshold | +| `VAULTMESH_HEARTBEAT_STALE_MINUTES`| `10` | Heartbeat staleness threshold | +| `VAULTMESH_SCHEDULER_TICK_SECONDS` | `300` | Scheduler loop interval | + +> With V0.5, the fleet tends itself; operators only need to investigate nodes marked red in the Attention column. + +--- + +## V0.4.1 – Scan Status UI + +- Dashboard columns: Last Scan, Crit/High, Source (REAL/MOCK badges). +- Parse `sovereign-scan` stdout to track `LastScan` in state. +- Visual indicators for critical (red) and high (orange) findings. + +## V0.4 – Sovereign Scan Integration + +- Node agent executes `sovereign-scan` command. +- Scan results reported via command-result API. +- ProofChain receipts written to `/var/lib/vaultmesh/proofchain/`. + +## V0.3 – Signed Commands + +- Ed25519 command signing (CC signs, agent verifies). +- Command queue per node with nonce replay protection. +- Commands: `service-status`, `restart-service`, `tail-journal`. + +## V0.2 – Node Metrics + +- Added system metrics to heartbeat (load, memory, disk). +- Node detail page with metrics cards. +- Heartbeat history tracking. + +## V0.1 – Initial Release + +- Command Center with HTML dashboard. +- Node Agent with heartbeat loop. +- Basic health monitoring (`cloudflare_ok`, `services_ok`). 
diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..455291c --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,7 @@ +[workspace] +members = [ + "command-center", + "node-agent", +] + +resolver = "2" diff --git a/README.md b/README.md new file mode 100644 index 0000000..5cbb7fb --- /dev/null +++ b/README.md @@ -0,0 +1,155 @@ +# VaultMesh Command Center + +Minimal, no-bloat control plane for VaultMesh nodes. + +- Rust backend (Axum) with server-rendered HTML (HTMX-ready) +- Node agent (Rust daemon) that runs on each VaultMesh node +- Zero external infra deps (no Redis, Kafka, k8s) +- Cloudflare-native: fronted by Cloudflare Tunnel + Access + +## Repository Layout + +```text +vaultmesh-command-center/ +├── Cargo.toml # Workspace manifest +├── README.md +├── .gitignore +├── command-center/ # Backend + Web UI +│ ├── Cargo.toml +│ └── src/ +│ ├── main.rs # Entry point, server setup +│ ├── routes.rs # HTTP handlers +│ └── state.rs # AppState, in-memory node store +├── node-agent/ # Daemon for each VaultMesh node +│ ├── Cargo.toml +│ └── src/ +│ ├── main.rs # Heartbeat loop +│ └── config.rs # Env config loader +├── docs/ +│ ├── ARCHITECTURE.md # How it all fits together +│ └── NODE_AGENT_CONTRACT.md # Agent API spec +└── systemd/ + ├── vaultmesh-command-center.service + └── vaultmesh-node-agent.service +``` + +## Quick Start + +```bash +# Clone and build +git clone vaultmesh-command-center +cd vaultmesh-command-center + +# Run the command center locally +cd command-center +RUST_LOG=info cargo run +# listens on 127.0.0.1:8088 + +# In another terminal, run the agent (pointing at local CC) +cd ../node-agent +RUST_LOG=info VAULTMESH_OS_PROFILE=ArchVault cargo run +``` + +Then: +- Put the Command Center behind a Cloudflare Tunnel. +- Protect it with Cloudflare Access. +- Install the node agent as a systemd service on each VaultMesh node. + +## Deployment + +### Command Center + +1. Build release binary: + ```bash + cargo build --release -p vaultmesh-command-center + ``` + +2. Copy to `/usr/local/bin/`: + ```bash + sudo cp target/release/vaultmesh-command-center /usr/local/bin/ + ``` + +3. Install systemd unit: + ```bash + sudo cp systemd/vaultmesh-command-center.service /etc/systemd/system/ + sudo systemctl daemon-reload + sudo systemctl enable --now vaultmesh-command-center + ``` + +4. Configure Cloudflare Tunnel to point at `http://127.0.0.1:8088`. + +### Node Agent + +1. Build release binary: + ```bash + cargo build --release -p vaultmesh-node-agent + ``` + +2. Copy to each node: + ```bash + sudo cp target/release/vaultmesh-node-agent /usr/local/bin/ + ``` + +3. Create environment file `/etc/vaultmesh/agent.env`: + ```bash + VAULTMESH_CC_URL=https://cc.your-domain.example + VAULTMESH_OS_PROFILE=ArchVault + VAULTMESH_ROOT=/var/lib/vaultmesh + VAULTMESH_HEARTBEAT_SECS=30 + ``` + +4. 
Install systemd unit: + ```bash + sudo cp systemd/vaultmesh-node-agent.service /etc/systemd/system/ + sudo systemctl daemon-reload + sudo systemctl enable --now vaultmesh-node-agent + ``` + +## API Endpoints + +| Method | Path | Description | +|--------|------------------------------------|------------------------------------------| +| GET | `/` | HTML dashboard showing all nodes | +| GET | `/nodes` | JSON array of all node heartbeats | +| GET | `/nodes/:id` | Node detail page (HTML) | +| POST | `/nodes/:id/commands` | Queue command for node (web form or API) | +| POST | `/api/agent/heartbeat` | Agent heartbeat endpoint | +| GET | `/api/agent/commands?node_id=` | Agent polls for pending commands | +| POST | `/api/agent/command-result` | Agent reports command execution result | + +## Fleet Operation Model (V0.5) + +**Green fleet:** + +- Attention = `OK`. +- Heartbeats fresh (age < `VAULTMESH_HEARTBEAT_STALE_MINUTES`). +- Last scan age < `VAULTMESH_SCAN_STALE_HOURS`. +- No critical or high findings. + +**Yellow/Red fleet:** + +Check Attention reasons in order: + +1. `heartbeat_stale` → connectivity / host issue. +2. `cloudflare_down` / `services_down` → control plane or local service failure. +3. `never_scanned` / `scan_stale` → wait for scheduler or trigger manual scan (if policy allows). +4. `critical_findings` / `high_findings` → prioritize remediation on that node. + +**Policy guardrail:** + +- Disallowed commands are blocked with HTTP 403. +- Scheduler respects policy; `sovereign-scan` must be allowed for a profile to be auto-queued. + +## Configuration + +| Variable | Default | Description | +|------------------------------------|-------------------|------------------------------| +| `VAULTMESH_SCAN_INTERVAL_HOURS` | `24` | Auto-scan interval | +| `VAULTMESH_SCAN_STALE_HOURS` | `48` | Scan staleness threshold | +| `VAULTMESH_HEARTBEAT_STALE_MINUTES`| `10` | Heartbeat staleness | +| `VAULTMESH_SCHEDULER_TICK_SECONDS` | `300` | Scheduler tick interval | +| `VAULTMESH_CC_KEY_PATH` | `cc-ed25519.key` | Command signing key path | + +## License + +MIT diff --git a/command-center/Cargo.toml b/command-center/Cargo.toml new file mode 100644 index 0000000..bd6334f --- /dev/null +++ b/command-center/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "vaultmesh-command-center" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = { version = "0.7", features = ["macros", "json"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tower = "0.4" +tower-http = { version = "0.5", features = ["trace"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +uuid = { version = "1", features = ["v4", "serde"] } +time = { version = "0.3", features = ["serde", "macros", "formatting", "parsing"] } +ed25519-dalek = { version = "2", features = ["rand_core"] } +rand = "0.8" +base64 = "0.22" +anyhow = "1" +clap = { version = "4", features = ["derive"] } +async-stream = "0.3" +futures-core = "0.3" diff --git a/command-center/src/cli.rs b/command-center/src/cli.rs new file mode 100644 index 0000000..8a3c598 --- /dev/null +++ b/command-center/src/cli.rs @@ -0,0 +1,61 @@ +use clap::{Parser, Subcommand}; + +#[derive(Parser)] +#[command(name = "vm-cc")] +#[command(about = "VaultMesh Command Center")] +pub struct Cli { + /// Subcommand to run. If omitted, starts the HTTP server. 
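+    /// `vm-cc` with no subcommand and `vm-cc serve` are equivalent.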
+ #[command(subcommand)] + pub command: Option, +} + +#[derive(Subcommand)] +pub enum Commands { + /// View and query event logs + Logs { + #[command(subcommand)] + action: LogsAction, + }, + /// Start the HTTP server (default if no subcommand is provided) + Serve, +} + +#[derive(Subcommand)] +pub enum LogsAction { + /// View recent log events + View { + /// Event kind: heartbeats, scans, commands (default: all) + #[arg(short, long)] + kind: Option, + + /// Filter by node ID (UUID) + #[arg(short, long)] + node: Option, + + /// Time filter: 1h, 24h, 7d, 30m, etc. + #[arg(short, long)] + since: Option, + + /// Minimum severity for scans: info, low, medium, high, critical + #[arg(long)] + min_severity: Option, + + /// Number of events to show (default: 20) + #[arg(short = 'n', long, default_value = "20")] + limit: usize, + }, + + /// Follow logs in real-time + Tail { + /// Event kind to follow + #[arg(short, long)] + kind: Option, + + /// Filter by node ID (UUID) + #[arg(short, long)] + node: Option, + }, + + /// Show log statistics + Stats, +} diff --git a/command-center/src/logs.rs b/command-center/src/logs.rs new file mode 100644 index 0000000..6211a93 --- /dev/null +++ b/command-center/src/logs.rs @@ -0,0 +1,396 @@ +// V0.6.1: CLI log reading and query commands + +use crate::state::{CommandEvent, HeartbeatEvent, ScanEvent}; +use std::collections::HashMap; +use std::io::{BufRead, BufReader}; +use std::path::PathBuf; +use time::{Duration, OffsetDateTime}; +use uuid::Uuid; + +/// Reader for JSONL log files. +pub struct LogReader { + dir: PathBuf, +} + +impl LogReader { + pub fn new() -> Self { + let dir = std::env::var("VAULTMESH_LOG_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/var/lib/vaultmesh/cc-logs")); + LogReader { dir } + } + + pub fn read_heartbeats(&self) -> Vec { + self.read_jsonl("heartbeats.jsonl") + } + + pub fn read_scans(&self) -> Vec { + self.read_jsonl("scans.jsonl") + } + + pub fn read_commands(&self) -> Vec { + self.read_jsonl("commands.jsonl") + } + + fn read_jsonl(&self, filename: &str) -> Vec { + let path = self.dir.join(filename); + if !path.exists() { + return vec![]; + } + + let file = match std::fs::File::open(&path) { + Ok(f) => f, + Err(_) => return vec![], + }; + + BufReader::new(file) + .lines() + .filter_map(|line| line.ok()) + .filter(|line| !line.trim().is_empty()) + .filter_map(|line| serde_json::from_str(&line).ok()) + .collect() + } +} + +/// Parse a duration string like "1h", "24h", "7d", "30m". +pub fn parse_duration(s: &str) -> Option { + let s = s.trim().to_lowercase(); + if s.ends_with('h') { + s[..s.len() - 1].parse::().ok().map(Duration::hours) + } else if s.ends_with('d') { + s[..s.len() - 1].parse::().ok().map(Duration::days) + } else if s.ends_with('m') { + s[..s.len() - 1].parse::().ok().map(Duration::minutes) + } else { + None + } +} + +/// Convert severity string to a rank for filtering. +pub fn severity_to_rank(s: &str) -> u8 { + match s.to_lowercase().as_str() { + "critical" => 5, + "high" => 4, + "medium" => 3, + "low" => 2, + "info" => 1, + _ => 0, + } +} + +/// View recent log events with filtering. 
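+///
+/// Filters combine conjunctively; for example,
+/// `vm-cc logs view --kind scans --since 24h --min-severity high`
+/// shows only the last day's scans with high or critical findings.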
+pub fn cmd_logs_view( + kind: Option, + node: Option, + since: Option, + min_severity: Option, + limit: usize, +) { + let reader = LogReader::new(); + let now = OffsetDateTime::now_utc(); + + let since_ts = since.as_ref().and_then(|s| parse_duration(s)).map(|d| now - d); + + let node_filter: Option = node.as_ref().and_then(|s| s.parse().ok()); + + let min_sev_rank = min_severity.as_ref().map(|s| severity_to_rank(s)).unwrap_or(0); + + // Collect events based on kind + let mut events: Vec<(OffsetDateTime, String)> = vec![]; + + if kind.is_none() || kind.as_deref() == Some("heartbeats") { + for e in reader.read_heartbeats() { + if let Some(ts) = since_ts { + if e.ts < ts { + continue; + } + } + if let Some(nf) = node_filter { + if e.node_id != nf { + continue; + } + } + let line = format!( + "[HB] {} {} {} cf={} svc={}", + format_ts(e.ts), + e.hostname, + e.os_profile, + if e.heartbeat.cloudflare_ok { "OK" } else { "DOWN" }, + if e.heartbeat.services_ok { "OK" } else { "DOWN" } + ); + events.push((e.ts, line)); + } + } + + if kind.is_none() || kind.as_deref() == Some("scans") { + for e in reader.read_scans() { + if let Some(ts) = since_ts { + if e.ts < ts { + continue; + } + } + if let Some(nf) = node_filter { + if e.node_id != nf { + continue; + } + } + let max_sev = if e.summary.critical > 0 { + 5 + } else if e.summary.high > 0 { + 4 + } else if e.summary.medium > 0 { + 3 + } else if e.summary.low > 0 { + 2 + } else { + 1 + }; + if max_sev < min_sev_rank { + continue; + } + let source = if e.findings_real { "REAL" } else { "MOCK" }; + let line = format!( + "[SCAN] {} C:{} H:{} M:{} L:{} [{}]", + format_ts(e.ts), + e.summary.critical, + e.summary.high, + e.summary.medium, + e.summary.low, + source + ); + events.push((e.ts, line)); + } + } + + if kind.is_none() || kind.as_deref() == Some("commands") { + for e in reader.read_commands() { + if let Some(ts) = since_ts { + if e.ts < ts { + continue; + } + } + if let Some(nf) = node_filter { + if e.node_id != nf { + continue; + } + } + let line = format!( + "[CMD] {} {} status={} exit={:?}", + format_ts(e.ts), + e.cmd, + e.status, + e.exit_code + ); + events.push((e.ts, line)); + } + } + + if events.is_empty() { + println!("No events found."); + return; + } + + // Sort by timestamp descending, take limit + events.sort_by(|a, b| b.0.cmp(&a.0)); + events.truncate(limit); + + // Print in chronological order + for (_, line) in events.into_iter().rev() { + println!("{}", line); + } +} + +/// Show log statistics. 
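+///
+/// Prints global event counts, then a per-node table of heartbeat, scan, and
+/// command counts (hostnames are taken from heartbeat events where available).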
+pub fn cmd_logs_stats() { + let reader = LogReader::new(); + + let heartbeats = reader.read_heartbeats(); + let scans = reader.read_scans(); + let commands = reader.read_commands(); + + println!("=== Log Statistics ===\n"); + println!("Heartbeats: {}", heartbeats.len()); + println!("Scans: {}", scans.len()); + println!("Commands: {}", commands.len()); + + // Count by node + let mut by_node: HashMap = HashMap::new(); + + for e in &heartbeats { + let entry = by_node.entry(e.node_id).or_insert((0, 0, 0, e.hostname.clone())); + entry.0 += 1; + } + for e in &scans { + let entry = by_node.entry(e.node_id).or_insert((0, 0, 0, String::new())); + entry.1 += 1; + } + for e in &commands { + let entry = by_node.entry(e.node_id).or_insert((0, 0, 0, String::new())); + entry.2 += 1; + } + + if !by_node.is_empty() { + println!("\n=== By Node ===\n"); + println!( + "{:<38} {:<16} {:>8} {:>8} {:>8}", + "Node ID", "Hostname", "HB", "Scans", "Cmds" + ); + println!("{:-<80}", ""); + for (node_id, (hb, sc, cmd, hostname)) in by_node { + println!( + "{:<38} {:<16} {:>8} {:>8} {:>8}", + node_id, hostname, hb, sc, cmd + ); + } + } +} + +/// Follow logs in real-time (tail -f style). +pub fn cmd_logs_tail(kind: Option, node: Option) { + use std::io::Seek; + use std::thread; + use std::time::Duration as StdDuration; + + let node_filter: Option = node.as_ref().and_then(|s| s.parse().ok()); + + // Track file positions + let mut hb_pos: u64 = 0; + let mut scan_pos: u64 = 0; + let mut cmd_pos: u64 = 0; + + // Seek to end of existing files + let log_dir = std::env::var("VAULTMESH_LOG_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/var/lib/vaultmesh/cc-logs")); + + if let Ok(mut f) = std::fs::File::open(log_dir.join("heartbeats.jsonl")) { + hb_pos = f.seek(std::io::SeekFrom::End(0)).unwrap_or(0); + } + if let Ok(mut f) = std::fs::File::open(log_dir.join("scans.jsonl")) { + scan_pos = f.seek(std::io::SeekFrom::End(0)).unwrap_or(0); + } + if let Ok(mut f) = std::fs::File::open(log_dir.join("commands.jsonl")) { + cmd_pos = f.seek(std::io::SeekFrom::End(0)).unwrap_or(0); + } + + println!("Following logs (Ctrl+C to stop)...\n"); + + loop { + // Check heartbeats + if kind.is_none() || kind.as_deref() == Some("heartbeats") { + if let Some((events, new_pos)) = + read_new_lines::(&log_dir.join("heartbeats.jsonl"), hb_pos) + { + hb_pos = new_pos; + for e in events { + if let Some(nf) = node_filter { + if e.node_id != nf { + continue; + } + } + println!( + "[HB] {} {} {}", + format_ts(e.ts), + e.hostname, + e.os_profile + ); + } + } + } + + // Check scans + if kind.is_none() || kind.as_deref() == Some("scans") { + if let Some((events, new_pos)) = + read_new_lines::(&log_dir.join("scans.jsonl"), scan_pos) + { + scan_pos = new_pos; + for e in events { + if let Some(nf) = node_filter { + if e.node_id != nf { + continue; + } + } + let source = if e.findings_real { "REAL" } else { "MOCK" }; + println!( + "[SCAN] {} C:{} H:{} M:{} L:{} [{}]", + format_ts(e.ts), + e.summary.critical, + e.summary.high, + e.summary.medium, + e.summary.low, + source + ); + } + } + } + + // Check commands + if kind.is_none() || kind.as_deref() == Some("commands") { + if let Some((events, new_pos)) = + read_new_lines::(&log_dir.join("commands.jsonl"), cmd_pos) + { + cmd_pos = new_pos; + for e in events { + if let Some(nf) = node_filter { + if e.node_id != nf { + continue; + } + } + println!( + "[CMD] {} {} status={}", + format_ts(e.ts), + e.cmd, + e.status + ); + } + } + } + + thread::sleep(StdDuration::from_secs(1)); + } +} + +/// Read new 
lines from a file starting at a given position. +fn read_new_lines( + path: &PathBuf, + start_pos: u64, +) -> Option<(Vec, u64)> { + use std::io::Seek; + + let mut file = std::fs::File::open(path).ok()?; + let end_pos = file.seek(std::io::SeekFrom::End(0)).ok()?; + + if end_pos <= start_pos { + return None; + } + + file.seek(std::io::SeekFrom::Start(start_pos)).ok()?; + let reader = BufReader::new(file); + let mut events = Vec::new(); + + for line in reader.lines() { + if let Ok(line) = line { + if line.trim().is_empty() { + continue; + } + if let Ok(event) = serde_json::from_str(&line) { + events.push(event); + } + } + } + + Some((events, end_pos)) +} + +/// Format timestamp for display. +fn format_ts(ts: OffsetDateTime) -> String { + format!( + "{:04}-{:02}-{:02} {:02}:{:02}:{:02}", + ts.year(), + ts.month() as u8, + ts.day(), + ts.hour(), + ts.minute(), + ts.second() + ) +} diff --git a/command-center/src/main.rs b/command-center/src/main.rs new file mode 100644 index 0000000..b9370f5 --- /dev/null +++ b/command-center/src/main.rs @@ -0,0 +1,174 @@ +mod cli; +mod logs; +mod routes; +mod state; + +use crate::cli::{Cli, Commands, LogsAction}; +use crate::routes::app; +use crate::state::{AppState, CommandPayload, SignedCommand}; +use base64::Engine; +use clap::Parser; +use ed25519_dalek::{Signer, SigningKey}; +use std::net::SocketAddr; +use std::path::Path; +use std::time::Duration; +use time::OffsetDateTime; +use tokio::net::TcpListener; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use uuid::Uuid; + +/// Load an existing Ed25519 signing key or generate a new one. +fn load_or_init_signing_key() -> SigningKey { + let key_path = std::env::var("VAULTMESH_CC_KEY_PATH") + .unwrap_or_else(|_| "cc-ed25519.key".into()); + let path = Path::new(&key_path); + + if path.exists() { + let bytes = std::fs::read(path).expect("Failed to read signing key"); + let key_bytes: [u8; 32] = bytes + .try_into() + .expect("Signing key must be exactly 32 bytes"); + SigningKey::from_bytes(&key_bytes) + } else { + // Generate a new key + let mut csprng = rand::thread_rng(); + let signing_key = SigningKey::generate(&mut csprng); + std::fs::write(path, signing_key.to_bytes()).expect("Failed to write signing key"); + tracing::info!("Generated new signing key at {}", key_path); + signing_key + } +} + +/// Background scheduler loop that auto-triggers scans on nodes. +async fn scheduler_loop(state: AppState) { + let tick_secs = std::env::var("VAULTMESH_SCHEDULER_TICK_SECONDS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(300); + + tracing::info!("Scheduler started with tick interval: {}s", tick_secs); + + loop { + if let Err(e) = run_scheduler_tick(&state).await { + tracing::warn!("scheduler tick failed: {e}"); + } + tokio::time::sleep(Duration::from_secs(tick_secs)).await; + } +} + +/// Run a single scheduler tick: check each node and queue scans if needed. 
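+///
+/// A scan is queued only when policy allows `sovereign-scan` for the node's
+/// profile and the node either has never been scanned or its last scan is
+/// older than `scan_interval`; scheduler commands are signed exactly like
+/// operator-issued ones.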
+async fn run_scheduler_tick(state: &AppState) -> anyhow::Result<()> { + let now = OffsetDateTime::now_utc(); + let latest = state.list_latest().await; + let last_scans = state.list_last_scans().await; + + for hb in latest.iter() { + // Check policy allows sovereign-scan for this profile + if !state.is_command_allowed(&hb.os_profile, &hb.node_id, "sovereign-scan") { + continue; + } + + // Check if scan needed (never scanned or interval elapsed) + let needs_scan = match last_scans.get(&hb.node_id) { + None => true, + Some(scan) => { + let elapsed = now - scan.ts; + elapsed >= state.scheduler_cfg.scan_interval + } + }; + + if needs_scan { + // Build and sign the command + let payload = CommandPayload { + node_id: hb.node_id, + ts: now, + nonce: Uuid::new_v4().to_string(), + cmd: "sovereign-scan".into(), + args: serde_json::json!({}), + }; + + let payload_json = serde_json::to_vec(&payload)?; + let signature = state.signing_key.sign(&payload_json); + let signature_b64 = + base64::engine::general_purpose::STANDARD.encode(signature.to_bytes()); + + let signed_cmd = SignedCommand { + payload, + signature: signature_b64, + }; + + state.queue_command(signed_cmd).await; + tracing::info!( + "scheduler: queued sovereign-scan for {} ({})", + hb.node_id, + hb.hostname + ); + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + + match cli.command { + Some(Commands::Logs { action }) => { + // CLI log commands - no tracing/server init needed + match action { + LogsAction::View { + kind, + node, + since, + min_severity, + limit, + } => { + logs::cmd_logs_view(kind, node, since, min_severity, limit); + } + LogsAction::Tail { kind, node } => { + logs::cmd_logs_tail(kind, node); + } + LogsAction::Stats => { + logs::cmd_logs_stats(); + } + } + } + Some(Commands::Serve) | None => { + // Run the HTTP server (default) + run_server().await; + } + } +} + +async fn run_server() { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let signing_key = load_or_init_signing_key(); + let state = AppState::new(signing_key); + + // V0.6: Replay logs to reconstruct state + state.replay_from_logs().await; + + // Log the public key for distribution to nodes + tracing::info!("CC public key (base64): {}", state.public_key_b64); + + // Spawn the scheduler background task + let scheduler_state = state.clone(); + tokio::spawn(async move { + scheduler_loop(scheduler_state).await; + }); + + let app = app(state); + + let addr: SocketAddr = "0.0.0.0:8088".parse().unwrap(); + tracing::info!("VaultMesh Command Center listening on http://{}", addr); + + let listener = TcpListener::bind(&addr).await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} diff --git a/command-center/src/routes.rs b/command-center/src/routes.rs new file mode 100644 index 0000000..a48dd6f --- /dev/null +++ b/command-center/src/routes.rs @@ -0,0 +1,2069 @@ +use axum::{ + extract::{Path, Query, State}, + response::{sse::Event, Html, IntoResponse, Sse}, + routing::{get, post}, + Json, Router, +}; +use ed25519_dalek::Signer; +use futures_core::Stream; +use serde::Deserialize; +use std::convert::Infallible; +use std::fmt::Write as _; +use time::OffsetDateTime; +use tokio::sync::broadcast; +use uuid::Uuid; + +use serde::Serialize; +use crate::state::{ + compute_attention, AppState, CommandEventPayload, CommandPayload, CommandResult, + EventEnvelope, HeartbeatEventPayload, LastScan, 
NodeHeartbeat, NodeHistory, ScanEvent, + ScanEventPayload, ScanSummary, ServerEvent, SignedCommand, +}; + +/// Request to send a command to a node (from HTML form). +#[derive(Debug, Deserialize)] +pub struct SendCommandRequest { + pub cmd: String, + #[serde(default)] + pub args: String, // JSON string from form, parsed later +} + +/// Query params for agent command polling. +#[derive(Debug, Deserialize)] +pub struct AgentCommandsQuery { + pub node_id: Uuid, +} + +// ============================================================================ +// V0.7.2: Communication Layer API types +// ============================================================================ + +/// Request body for POST /api/events +#[derive(Debug, Deserialize)] +pub struct PostEventRequest { + pub kind: String, + #[serde(default)] + pub node_id: Option, + #[serde(default)] + pub author: Option, + #[serde(default)] + pub body: serde_json::Value, +} + +/// Response for POST /api/events +#[derive(Debug, Serialize)] +pub struct PostEventResponse { + pub id: Uuid, + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, +} + +/// Query params for GET /api/events +#[derive(Debug, Deserialize)] +pub struct GetEventsQuery { + #[serde(default)] + pub since: Option, + #[serde(default)] + pub kind: Option, + #[serde(default)] + pub node_id: Option, + #[serde(default = "default_events_limit")] + pub limit: usize, +} + +fn default_events_limit() -> usize { + 100 +} + +pub fn app(state: AppState) -> Router { + Router::new() + .route("/", get(dashboard)) + .route("/console", get(mission_console)) + .route("/nodes", get(list_nodes)) + .route("/nodes/:id", get(node_detail)) + .route("/nodes/:id/commands", post(send_command)) + .route("/events", get(sse_events)) + // V0.7.2: Communication Layer API + .route("/api/events", get(get_events).post(post_event)) + .route("/api/agent/heartbeat", post(heartbeat)) + .route("/api/agent/commands", get(agent_poll_commands)) + .route("/api/agent/command-result", post(agent_report_result)) + .with_state(state) +} + +// Simple HTML dashboard (no JS framework, HTMX-ready later). 
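+// Rendered from `list_latest` and `list_last_scans` on every request; the V0.7
+// SSE probe only logs events to the browser console, while the live-updating
+// view is /console (V0.7.1).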
+pub async fn dashboard(State(state): State) -> Html { + let now = OffsetDateTime::now_utc(); + let latest = state.list_latest().await; + let last_scans = state.list_last_scans().await; + let mut rows = String::new(); + + for hb in latest.iter() { + let cf_class = if hb.cloudflare_ok { "status-ok" } else { "status-fail" }; + let svc_class = if hb.services_ok { "status-ok" } else { "status-fail" }; + + // Get scan info for this node + let (scan_ts, crit_high, source, crit_class, source_class) = match last_scans.get(&hb.node_id) { + Some(scan) => { + let ts = scan.ts.format(&time::format_description::well_known::Rfc3339).unwrap_or_else(|_| "-".into()); + let ch = format!("{} / {}", scan.summary.critical, scan.summary.high); + let src = if scan.findings_real { "REAL" } else { "MOCK" }; + let cc = if scan.summary.critical > 0 { + "crit-critical" + } else if scan.summary.high > 0 { + "crit-high" + } else { + "crit-ok" + }; + let sc = if scan.findings_real { "source-real" } else { "source-mock" }; + (ts, ch, src.to_string(), cc, sc) + } + None => ("-".into(), "-".into(), "-".into(), "", "") + }; + + // Compute attention status + let attn = compute_attention(now, hb, last_scans.get(&hb.node_id), &state.scheduler_cfg); + let attn_label = if attn.needs_attention { + attn.reasons.join(", ") + } else { + "OK".to_string() + }; + let attn_class = if attn.needs_attention { "attn-bad" } else { "attn-ok" }; + + let _ = write!( + rows, + r#" + {hostname} + {profile} + {attn_label} + {cf} + {svc} + {scan_ts} + {crit_high} + {source} + {root} + {ts} + "#, + id = hb.node_id, + hostname = hb.hostname, + profile = hb.os_profile, + attn_class = attn_class, + attn_label = attn_label, + cf_class = cf_class, + cf = hb.cloudflare_ok, + svc_class = svc_class, + svc = hb.services_ok, + scan_ts = scan_ts, + crit_class = crit_class, + crit_high = crit_high, + source_class = source_class, + source = source, + root = hb.vaultmesh_root, + ts = hb.timestamp + ); + } + + let node_count = latest.len(); + + let html = format!( + r#" + + + + VaultMesh Command Center + + + +

VaultMesh Command Center

+

Nodes online: {count}

+ + + + + + + + + + + + + + + + + {rows} + +
HostnameProfileAttentionCloudflareServicesLast ScanCrit/HighSourceVAULTMESH_ROOTLast heartbeat
+ + +"#, + count = node_count, + rows = rows + ); + + Html(html) +} + +pub async fn list_nodes(State(state): State) -> Json> { + let latest = state.list_latest().await; + Json(latest) +} + +pub async fn node_detail( + State(state): State, + Path(id): Path, +) -> impl IntoResponse { + let nodes = state.nodes.read().await; + let NodeHistory { latest, history } = match nodes.get(&id) { + Some(h) => h.clone(), + None => { + return ( + axum::http::StatusCode::NOT_FOUND, + Html("

Node not found

Back to dashboard

".to_string()), + ).into_response(); + } + }; + drop(nodes); + + // Get command results for this node + let cmd_results = state.list_command_results(id).await; + + // Metrics convenience + let m = &latest.metrics; + + let mut history_rows = String::new(); + for hb in history.iter().take(5) { + let cf_class = if hb.cloudflare_ok { "status-ok" } else { "status-fail" }; + let svc_class = if hb.services_ok { "status-ok" } else { "status-fail" }; + let _ = write!( + history_rows, + r#" + {ts} + {cf} + {svc} + "#, + ts = hb.timestamp, + cf_class = cf_class, + cf = hb.cloudflare_ok, + svc_class = svc_class, + svc = hb.services_ok, + ); + } + + // Build command results rows + let mut cmd_result_rows = String::new(); + for r in cmd_results.iter().take(10) { + let status_class = if r.status == "success" { "status-ok" } else { "status-fail" }; + let exit_str = r.exit_code.map(|c| c.to_string()).unwrap_or_else(|| "-".into()); + + // Special formatting for sovereign-scan + let (cmd_display, stdout_display) = if r.cmd == "sovereign-scan" && r.status == "success" { + if let Ok(scan) = serde_json::from_str::(&r.stdout) { + let source_badge = if scan["findings_real"].as_bool().unwrap_or(false) { + "REAL" + } else { + "MOCK" + }; + + let summary_str = format!( + "C/H/M/L/I = {}/{}/{}/{}/{}", + scan["summary"]["critical"].as_u64().unwrap_or(0), + scan["summary"]["high"].as_u64().unwrap_or(0), + scan["summary"]["medium"].as_u64().unwrap_or(0), + scan["summary"]["low"].as_u64().unwrap_or(0), + scan["summary"]["info"].as_u64().unwrap_or(0), + ); + + let hash = scan["receipt_hash"].as_str().unwrap_or("-"); + + let display = format!( + "Sovereign Scan {}
{}
{}", + source_badge, summary_str, hash + ); + + ("sovereign-scan".to_string(), display) + } else { + (r.cmd.clone(), format!("
{}
", html_escape(&r.stdout))) + } + } else { + (r.cmd.clone(), format!("
{}
", html_escape(&r.stdout))) + }; + + let _ = write!( + cmd_result_rows, + r#" + {ts} + {cmd} + {status} + {exit} + {stdout} +
{stderr}
+ "#, + ts = r.ts, + cmd = cmd_display, + status_class = status_class, + status = r.status, + exit = exit_str, + stdout = stdout_display, + stderr = html_escape(&r.stderr), + ); + } + + let cf_status = if latest.cloudflare_ok { "OK" } else { "DOWN" }; + let svc_status = if latest.services_ok { "OK" } else { "DOWN" }; + let cf_class = if latest.cloudflare_ok { "status-ok" } else { "status-fail" }; + let svc_class = if latest.services_ok { "status-ok" } else { "status-fail" }; + + // Format uptime nicely + let uptime_str = match m.uptime_seconds { + Some(secs) => { + let days = secs / 86400; + let hours = (secs % 86400) / 3600; + let mins = (secs % 3600) / 60; + format!("{}d {}h {}m", days, hours, mins) + } + None => "unknown".to_string(), + }; + + // Memory percentage + let mem_pct = match (m.mem_used_mb, m.mem_total_mb) { + (Some(used), Some(total)) if total > 0 => { + format!("{:.1}%", (used as f64 / total as f64) * 100.0) + } + _ => "N/A".to_string(), + }; + + // Disk percentage + let disk_pct = match (m.disk_root_used_gb, m.disk_root_total_gb) { + (Some(used), Some(total)) if total > 0.0 => { + format!("{:.1}%", (used / total) * 100.0) + } + _ => "N/A".to_string(), + }; + + let html = format!( + r#" + + + + Node {hostname} — VaultMesh Command Center + + + +

← Back to dashboard

+

Node: {hostname}

+ +
+
Node ID
+
{node_id}
+
Profile
+
{profile}
+
VAULTMESH_ROOT
+
{root}
+
Last heartbeat
+
{ts}
+
Cloudflare
+
{cf_status}
+
Services
+
{svc_status}
+
+ +

Metrics

+
+
+
Uptime
+
{uptime}
+
+
+
Load (1/5/15)
+
{load1:.2}
+
{load5:.2} / {load15:.2}
+
+
+
Memory
+
{mem_pct}
+
{mem_used} / {mem_total} MiB
+
+
+
Disk (/)
+
{disk_pct}
+
{disk_used:.1} / {disk_total:.1} GiB
+
+
+ +

Send Command

+
+
+ + + +
+ +
+ +

Recent Command Results

+ + + + + + + + + + + + + {cmd_result_rows} + +
TimestampCommandStatusExitstdoutstderr
+ +

Heartbeat History (last 5)

+ + + + + + + + + + {history_rows} + +
TimestampCloudflareServices
+ + +"#, + hostname = latest.hostname, + node_id = latest.node_id, + profile = latest.os_profile, + root = latest.vaultmesh_root, + ts = latest.timestamp, + cf_class = cf_class, + cf_status = cf_status, + svc_class = svc_class, + svc_status = svc_status, + uptime = uptime_str, + load1 = m.load1.unwrap_or(0.0), + load5 = m.load5.unwrap_or(0.0), + load15 = m.load15.unwrap_or(0.0), + mem_pct = mem_pct, + mem_used = m.mem_used_mb.unwrap_or(0), + mem_total = m.mem_total_mb.unwrap_or(0), + disk_pct = disk_pct, + disk_used = m.disk_root_used_gb.unwrap_or(0.0), + disk_total = m.disk_root_total_gb.unwrap_or(0.0), + cmd_result_rows = cmd_result_rows, + history_rows = history_rows, + ); + + Html(html).into_response() +} + +/// Queue a signed command for a node (from web UI). +pub async fn send_command( + State(state): State, + Path(node_id): Path, + axum::Form(form): axum::Form, +) -> impl IntoResponse { + // Get the node's profile for policy check + let nodes = state.nodes.read().await; + let profile = nodes + .get(&node_id) + .map(|h| h.latest.os_profile.clone()) + .unwrap_or_default(); + drop(nodes); + + // Validate command via policy + if !state.is_command_allowed(&profile, &node_id, &form.cmd) { + return ( + axum::http::StatusCode::FORBIDDEN, + Html(format!( + "

Command not allowed by policy

Command '{}' is not allowed for profile '{}'

Back

", + form.cmd, profile, node_id + )), + ).into_response(); + } + + // Parse args from JSON string + let args: serde_json::Value = if form.args.is_empty() { + serde_json::json!({}) + } else { + serde_json::from_str(&form.args).unwrap_or_else(|_| serde_json::json!({})) + }; + + // Build the payload + let nonce = Uuid::new_v4().to_string(); + let payload = CommandPayload { + node_id, + ts: OffsetDateTime::now_utc(), + nonce, + cmd: form.cmd, + args, + }; + + // Serialize and sign + let payload_bytes = serde_json::to_vec(&payload).unwrap(); + let signature = state.signing_key.sign(&payload_bytes); + let signature_b64 = base64::Engine::encode( + &base64::engine::general_purpose::STANDARD, + signature.to_bytes(), + ); + + let signed_cmd = SignedCommand { + payload, + signature: signature_b64, + }; + + tracing::info!("Queuing command '{}' for node {}", signed_cmd.payload.cmd, node_id); + state.queue_command(signed_cmd).await; + + // Redirect back to node detail + ( + axum::http::StatusCode::SEE_OTHER, + [("Location", format!("/nodes/{}", node_id))], + Html(String::new()), + ).into_response() +} + +/// Agent polls for pending commands. +pub async fn agent_poll_commands( + State(state): State, + Query(q): Query, +) -> Json> { + let cmds = state.take_pending_commands(q.node_id).await; + if !cmds.is_empty() { + tracing::info!("Agent {} polling: {} command(s)", q.node_id, cmds.len()); + } + Json(cmds) +} + +/// Agent reports command result. +pub async fn agent_report_result( + State(state): State, + Json(result): Json, +) -> impl IntoResponse { + tracing::info!( + "Command result from {}: {} -> {} (exit={:?})", + result.node_id, + result.cmd, + result.status, + result.exit_code + ); + + // V0.7: Get node info for SSE payloads + let (hostname, os_profile) = { + let nodes = state.nodes.read().await; + nodes + .get(&result.node_id) + .map(|h| (h.latest.hostname.clone(), h.latest.os_profile.clone())) + .unwrap_or_else(|| ("unknown".to_string(), "unknown".to_string())) + }; + + // V0.7: Publish command event to SSE + let cmd_payload = CommandEventPayload { + ts: result.ts, + node_id: result.node_id, + hostname: hostname.clone(), + os_profile: os_profile.clone(), + cmd: result.cmd.clone(), + status: result.status.clone(), + exit_code: result.exit_code, + nonce: result.nonce.clone(), + }; + state.publish(ServerEvent::Command(cmd_payload)); + + // Parse sovereign-scan results to track last scan + if result.cmd == "sovereign-scan" && result.status == "success" { + if let Ok(scan_output) = serde_json::from_str::(&result.stdout) { + let summary = ScanSummary { + critical: scan_output["summary"]["critical"].as_u64().unwrap_or(0) as u32, + high: scan_output["summary"]["high"].as_u64().unwrap_or(0) as u32, + medium: scan_output["summary"]["medium"].as_u64().unwrap_or(0) as u32, + low: scan_output["summary"]["low"].as_u64().unwrap_or(0) as u32, + info: scan_output["summary"]["info"].as_u64().unwrap_or(0) as u32, + }; + + let findings_real = scan_output["findings_real"].as_bool().unwrap_or(false); + let receipt_hash = scan_output["receipt_hash"] + .as_str() + .unwrap_or("") + .to_string(); + + // V0.6: Log scan event + let scan_event = ScanEvent { + ts: result.ts, + node_id: result.node_id, + os_profile: os_profile.clone(), + summary: summary.clone(), + findings_real, + receipt_hash: receipt_hash.clone(), + receipt_path: scan_output + .get("receipt_path") + .and_then(|v| v.as_str()) + .map(String::from), + }; + if let Err(e) = state.logs.append_json_line("scans.jsonl", &scan_event) { + tracing::warn!("failed to 
append scan log: {e}"); + } + + // V0.7: Publish scan event to SSE + let scan_payload = ScanEventPayload { + ts: result.ts, + node_id: result.node_id, + hostname: hostname.clone(), + os_profile: os_profile.clone(), + summary: summary.clone(), + findings_real, + receipt_hash: receipt_hash.clone(), + }; + state.publish(ServerEvent::Scan(scan_payload)); + + let last_scan = LastScan { + ts: result.ts, + summary, + findings_real, + receipt_hash, + }; + + tracing::info!( + "Tracking scan for node {}: C={} H={} real={}", + result.node_id, + last_scan.summary.critical, + last_scan.summary.high, + last_scan.findings_real + ); + state.update_last_scan(result.node_id, last_scan).await; + + // V0.7: Recompute and publish attention after scan + state.recompute_and_publish_attention(result.node_id).await; + } + } + + state.record_command_result(result).await; + Json(serde_json::json!({ "status": "ok" })) +} + +pub async fn heartbeat( + State(state): State, + Json(hb): Json, +) -> impl IntoResponse { + tracing::info!("Heartbeat from {} ({})", hb.hostname, hb.node_id); + + // V0.7: Publish heartbeat event to SSE + let payload = HeartbeatEventPayload { + ts: hb.timestamp, + node_id: hb.node_id, + hostname: hb.hostname.clone(), + os_profile: hb.os_profile.clone(), + cloudflare_ok: hb.cloudflare_ok, + services_ok: hb.services_ok, + vaultmesh_root: hb.vaultmesh_root.clone(), + uptime_secs: hb.metrics.uptime_seconds, + load_1: hb.metrics.load1, + load_5: hb.metrics.load5, + load_15: hb.metrics.load15, + }; + state.publish(ServerEvent::Heartbeat(payload)); + + let node_id = hb.node_id; + state.upsert_heartbeat(hb).await; + + // Recompute and publish attention after heartbeat is stored + state.recompute_and_publish_attention(node_id).await; + + Json(serde_json::json!({ "status": "ok" })) +} + +// ============================================================================ +// V0.7: SSE Event Bus +// ============================================================================ + +/// SSE endpoint for real-time event streaming. +pub async fn sse_events( + State(state): State, +) -> Sse>> { + let mut rx = state.events_tx.subscribe(); + + let stream = async_stream::stream! { + loop { + match rx.recv().await { + Ok(ev) => { + let (event_name, payload_json) = match &ev { + ServerEvent::Heartbeat(p) => ("heartbeat".to_string(), serde_json::to_string(p).unwrap()), + ServerEvent::Scan(p) => ("scan".to_string(), serde_json::to_string(p).unwrap()), + ServerEvent::Command(p) => ("command".to_string(), serde_json::to_string(p).unwrap()), + ServerEvent::Attention(p) => ("attention".to_string(), serde_json::to_string(p).unwrap()), + // V0.7.2: Envelope events use their kind as the SSE event name + ServerEvent::Envelope(env) => (env.kind.clone(), serde_json::to_string(env).unwrap()), + }; + yield Ok(Event::default().event(&event_name).data(payload_json)); + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("SSE client lagged, skipped {n} events"); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + }; + + Sse::new(stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(std::time::Duration::from_secs(15)) + .text("keepalive"), + ) +} + +/// Simple HTML escaping for output display. 
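+///
+/// Deliberately minimal: escapes `&`, `<`, `>`, and `"`, enough for text nodes
+/// and double-quoted attribute values; not a general-purpose sanitizer.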
+fn html_escape(s: &str) -> String {
+    s.replace('&', "&amp;")
+        .replace('<', "&lt;")
+        .replace('>', "&gt;")
+        .replace('"', "&quot;")
+}
+
+// ============================================================================
+// V0.7.2: Communication Layer API
+// ============================================================================
+
+/// POST /api/events - Create a new event envelope
+pub async fn post_event(
+    State(state): State<AppState>,
+    headers: axum::http::HeaderMap,
+    Json(req): Json<PostEventRequest>,
+) -> impl IntoResponse {
+    let now = OffsetDateTime::now_utc();
+    let id = Uuid::new_v4();
+
+    // Author can be overridden via X-VM-Author header
+    let author = headers
+        .get("X-VM-Author")
+        .and_then(|v| v.to_str().ok())
+        .map(String::from)
+        .or(req.author)
+        .unwrap_or_else(|| "operator".to_string());
+
+    let envelope = EventEnvelope {
+        id,
+        kind: req.kind.clone(),
+        ts: now,
+        node_id: req.node_id,
+        author: author.clone(),
+        body: req.body,
+    };
+
+    tracing::info!(
+        "POST /api/events: kind={}, node_id={:?}, author={}",
+        req.kind,
+        req.node_id,
+        author
+    );
+
+    state.record_envelope(envelope).await;
+
+    (
+        axum::http::StatusCode::CREATED,
+        Json(PostEventResponse { id, ts: now }),
+    )
+}
+
+/// GET /api/events - Query event envelopes with filtering
+pub async fn get_events(
+    State(state): State<AppState>,
+    Query(query): Query<GetEventsQuery>,
+) -> impl IntoResponse {
+    let events = state.events.read().await;
+
+    // Parse since timestamp if provided
+    let since = query.since.as_ref().and_then(|s| {
+        time::OffsetDateTime::parse(s, &time::format_description::well_known::Rfc3339).ok()
+    });
+
+    // Parse kind filter (comma-separated list)
+    let kinds: Option<Vec<&str>> = query.kind.as_ref().map(|k| k.split(',').collect());
+
+    // Filter and clone events
+    let filtered: Vec<EventEnvelope> = events
+        .iter()
+        .rev() // Most recent first
+        .filter(|ev| {
+            // Filter by since
+            if let Some(since_ts) = since {
+                if ev.ts < since_ts {
+                    return false;
+                }
+            }
+
+            // Filter by kind
+            if let Some(ref ks) = kinds {
+                if !ks.contains(&ev.kind.as_str()) {
+                    return false;
+                }
+            }
+
+            // Filter by node_id
+            if let Some(filter_node) = query.node_id {
+                match ev.node_id {
+                    Some(ev_node) if ev_node == filter_node => {}
+                    _ => return false,
+                }
+            }
+
+            true
+        })
+        .take(query.limit)
+        .cloned()
+        .collect();
+
+    Json(filtered)
+}
+
+// ============================================================================
+// V0.7.1: Mission Console
+// ============================================================================
+
+/// NASA-style Mission Console with live SSE updates.
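+///
+/// Server-renders the initial three-panel layout and KPI counts; subsequent
+/// updates arrive over the `/events` SSE stream without a page refresh.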
+pub async fn mission_console(State(state): State) -> Html { + let now = OffsetDateTime::now_utc(); + let latest = state.list_latest().await; + let last_scans = state.list_last_scans().await; + + // Global KPIs + let total = latest.len(); + let mut ok_count = 0; + let mut attention_count = 0; + let mut critical_count = 0; + let mut last_scan_ts: Option = None; + + // Build node list items + let mut node_items = String::new(); + + for hb in latest.iter() { + let attn = compute_attention(now, hb, last_scans.get(&hb.node_id), &state.scheduler_cfg); + let scan = last_scans.get(&hb.node_id); + + // Track global KPIs + if attn.needs_attention { + attention_count += 1; + if attn.reasons.iter().any(|r| r.contains("critical")) { + critical_count += 1; + } + } else { + ok_count += 1; + } + if let Some(s) = scan { + if last_scan_ts.is_none() || s.ts > last_scan_ts.unwrap() { + last_scan_ts = Some(s.ts); + } + } + + // Status pill class + let (status_class, status_label) = if attn.reasons.iter().any(|r| r.contains("critical")) { + ("status-crit", "CRIT") + } else if attn.needs_attention { + ("status-attn", "ATTN") + } else { + ("status-ok", "OK") + }; + + // Build per-node scan info + let (crit, high, scan_ts_str) = match scan { + Some(s) => { + let ts = s.ts.format(&time::format_description::well_known::Rfc3339) + .unwrap_or_else(|_| "-".into()); + (s.summary.critical, s.summary.high, ts) + } + None => (0, 0, "-".to_string()), + }; + + let _ = write!( + node_items, + r#"
  • + {status_label} + {hostname} + {profile} + +
  • "#, + node_id = hb.node_id, + hostname = hb.hostname, + profile = hb.os_profile, + status_class = status_class, + status_label = status_label, + ); + } + + // Last scan timestamp for global KPI + let last_scan_str = last_scan_ts + .and_then(|ts| ts.format(&time::format_description::well_known::Rfc3339).ok()) + .unwrap_or_else(|| "Never".into()); + + let html = format!( + r##" + + + + VaultMesh Mission Console + + + + +
    +
    VAULTMESH COMMAND CENTER
    +
    +
    +
    {total}
    +
    Total Nodes
    +
    +
    +
    {ok_count}
    +
    Healthy
    +
    +
    +
    {attention_count}
    +
    Attention
    +
    +
    +
    {critical_count}
    +
    Critical
    +
    +
    +
    {last_scan_str}
    +
    Last Scan
    +
    +
    +
    + + +
    + + + + +
    +
    +
    +
    Select a node to view telemetry
    +
    +
    + + + +
    + + + +"##, + total = total, + ok_count = ok_count, + attention_count = attention_count, + critical_count = critical_count, + last_scan_str = last_scan_str, + node_items = node_items, + ); + + Html(html) +} diff --git a/command-center/src/state.rs b/command-center/src/state.rs new file mode 100644 index 0000000..4b7f61b --- /dev/null +++ b/command-center/src/state.rs @@ -0,0 +1,842 @@ +use ed25519_dalek::SigningKey; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + fs::OpenOptions, + io::{BufRead, BufReader, Write}, + path::PathBuf, + sync::Arc, +}; +use time::{Duration, OffsetDateTime}; +use tokio::sync::{broadcast, RwLock}; +use uuid::Uuid; + +/// How many heartbeats we keep per node (for history). +const MAX_HEARTBEATS_PER_NODE: usize = 50; + +/// How many command records we keep per node. +const MAX_COMMAND_RECORDS_PER_NODE: usize = 50; + +/// Metrics reported by the node agent. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct NodeMetrics { + pub uptime_seconds: Option, + pub load1: Option, + pub load5: Option, + pub load15: Option, + pub mem_total_mb: Option, + pub mem_used_mb: Option, + pub disk_root_total_gb: Option, + pub disk_root_used_gb: Option, +} + +/// Heartbeat data reported by each node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeHeartbeat { + pub node_id: Uuid, + pub hostname: String, + pub os_profile: String, + pub cloudflare_ok: bool, + pub services_ok: bool, + pub vaultmesh_root: String, + #[serde(with = "time::serde::rfc3339")] + pub timestamp: OffsetDateTime, + #[serde(default)] + pub metrics: NodeMetrics, +} + +/// A single node's heartbeat history. +#[derive(Clone)] +pub struct NodeHistory { + pub latest: NodeHeartbeat, + pub history: VecDeque, +} + +/// Command payload to be signed and sent to agents. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandPayload { + pub node_id: Uuid, + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub nonce: String, + pub cmd: String, + pub args: serde_json::Value, +} + +/// Signed command delivered to the agent. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SignedCommand { + pub payload: CommandPayload, + /// Base64-encoded Ed25519 signature of `payload` JSON bytes. + pub signature: String, +} + +/// Result of executing a command on the node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandResult { + pub node_id: Uuid, + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub nonce: String, + pub cmd: String, + pub status: String, + pub exit_code: Option, + pub stdout: String, + pub stderr: String, +} + +/// Summary counts from a sovereign scan. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScanSummary { + pub critical: u32, + pub high: u32, + pub medium: u32, + pub low: u32, + pub info: u32, +} + +/// Last scan result for a node. +#[derive(Debug, Clone)] +pub struct LastScan { + pub ts: OffsetDateTime, + pub summary: ScanSummary, + pub findings_real: bool, + pub receipt_hash: String, +} + +/// Scheduler configuration for automatic scans. +#[derive(Debug, Clone)] +pub struct SchedulerConfig { + pub scan_interval: Duration, + pub scan_stale: Duration, + pub heartbeat_stale: Duration, +} + +/// Node attention status for dashboard. 
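+///
+/// Produced by `compute_attention` from heartbeat freshness, scan staleness,
+/// findings severity, and service flags; `reasons` carries tags such as
+/// `scan_stale` or `critical_findings` for the dashboard and SSE payloads.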
+#[derive(Debug, Clone)] +pub struct NodeAttentionStatus { + pub needs_attention: bool, + pub reasons: Vec, +} + +// ============================================================================ +// V0.7: SSE Event Bus payloads +// ============================================================================ + +/// SSE payload for heartbeat events (wire-efficient subset). +#[derive(Debug, Clone, Serialize)] +pub struct HeartbeatEventPayload { + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub node_id: Uuid, + pub hostname: String, + pub os_profile: String, + pub cloudflare_ok: bool, + pub services_ok: bool, + pub vaultmesh_root: String, + pub uptime_secs: Option, + pub load_1: Option, + pub load_5: Option, + pub load_15: Option, +} + +/// SSE payload for scan events. +#[derive(Debug, Clone, Serialize)] +pub struct ScanEventPayload { + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub node_id: Uuid, + pub hostname: String, + pub os_profile: String, + pub summary: ScanSummary, + pub findings_real: bool, + pub receipt_hash: String, +} + +/// SSE payload for command events. +#[derive(Debug, Clone, Serialize)] +pub struct CommandEventPayload { + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub node_id: Uuid, + pub hostname: String, + pub os_profile: String, + pub cmd: String, + pub status: String, + pub exit_code: Option, + pub nonce: String, +} + +/// SSE payload for attention state changes. +#[derive(Debug, Clone, Serialize)] +pub struct AttentionEventPayload { + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub node_id: Uuid, + pub hostname: String, + pub os_profile: String, + pub needs_attention: bool, + pub reasons: Vec, +} + +/// Server event types for SSE broadcast. +#[derive(Debug, Clone)] +pub enum ServerEvent { + Heartbeat(HeartbeatEventPayload), + Scan(ScanEventPayload), + Command(CommandEventPayload), + Attention(AttentionEventPayload), + /// V0.7.2: Generic envelope for comms events (note, incident, ack, tag, resolve) + Envelope(EventEnvelope), +} + +// ============================================================================ +// V0.7.2: Communication Layer - EventEnvelope +// ============================================================================ + +/// Canonical message format for all comms events. +/// Used for notes, incidents, acknowledgements, tags, and resolutions. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventEnvelope { + /// Unique event ID (server-assigned) + pub id: Uuid, + /// Event kind: "note", "incident", "ack", "tag", "resolve" + pub kind: String, + /// Timestamp (server-assigned) + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + /// Node this event relates to (optional for global events) + #[serde(skip_serializing_if = "Option::is_none")] + pub node_id: Option, + /// Author: "operator", "system", "vm-copilot", "scheduler", agent name + pub author: String, + /// Structured payload (kind-specific) + pub body: serde_json::Value, +} + +/// Log entry wrapper for EventEnvelope (versioned for future compatibility). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventEnvelopeLogEntry { + pub version: u8, + pub event: EventEnvelope, +} + +/// Maximum number of envelopes to keep in memory. 
+const MAX_ENVELOPES_IN_MEMORY: usize = 500; + +// ============================================================================ +// V0.6: Append-only log events +// ============================================================================ + +/// Event logged when a heartbeat is received. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HeartbeatEvent { + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub node_id: Uuid, + pub hostname: String, + pub os_profile: String, + pub heartbeat: NodeHeartbeat, +} + +/// Event logged when a sovereign-scan completes successfully. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScanEvent { + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub node_id: Uuid, + pub os_profile: String, + pub summary: ScanSummary, + pub findings_real: bool, + pub receipt_hash: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub receipt_path: Option, +} + +/// Event logged when any command result is reported. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandEvent { + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub node_id: Uuid, + pub cmd: String, + pub status: String, + pub exit_code: Option, + pub nonce: String, +} + +/// Append-only log sink for durable event storage. +#[derive(Clone)] +pub struct LogSink { + pub dir: PathBuf, +} + +impl LogSink { + /// Create a new LogSink, initializing the directory if needed. + pub fn new_from_env() -> std::io::Result { + let dir = std::env::var("VAULTMESH_LOG_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/var/lib/vaultmesh/cc-logs")); + + std::fs::create_dir_all(&dir)?; + Ok(LogSink { dir }) + } + + /// Get the full path for a log file. + pub fn file_path(&self, name: &str) -> PathBuf { + self.dir.join(name) + } + + /// Append a JSON line to a log file. + pub fn append_json_line(&self, file: &str, value: &T) -> std::io::Result<()> { + let path = self.file_path(file); + let mut f = OpenOptions::new() + .create(true) + .append(true) + .open(path)?; + serde_json::to_writer(&mut f, value).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + f.write_all(b"\n")?; + f.flush()?; + Ok(()) + } +} + +/// Command policy for allowed commands per profile. +#[derive(Clone)] +pub struct CommandPolicy { + /// Per-profile allowed command names. + pub per_profile: HashMap>, + /// Global fallback set (applied if no per-profile entry). + pub global_allowed: HashSet, +} + +impl CommandPolicy { + pub fn default_policy() -> Self { + let mut global = HashSet::new(); + global.insert("service-status".to_string()); + global.insert("tail-journal".to_string()); + global.insert("sovereign-scan".to_string()); + global.insert("restart-service".to_string()); + + CommandPolicy { + per_profile: HashMap::new(), + global_allowed: global, + } + } + + pub fn is_allowed(&self, profile: &str, cmd: &str) -> bool { + if let Some(set) = self.per_profile.get(profile) { + set.contains(cmd) + } else { + self.global_allowed.contains(cmd) + } + } +} + +/// Shared application state. 
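+// Policy sketch (illustrative, not part of this commit): a per-profile entry
+// takes precedence over the global set, so restricting DebianVault to
+// read-only commands would look like:
+//
+//   let mut policy = CommandPolicy::default_policy();
+//   policy.per_profile.insert(
+//       "DebianVault".to_string(),
+//       ["service-status", "tail-journal"].iter().map(|s| s.to_string()).collect(),
+//   );
+//   assert!(!policy.is_allowed("DebianVault", "restart-service"));
+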
+#[derive(Clone)] +pub struct AppState { + pub nodes: Arc>>, + pub commands_pending: Arc>>>, + pub command_results: Arc>>>, + pub last_scans: Arc>>, + pub signing_key: Arc, + pub public_key_b64: String, + pub scheduler_cfg: SchedulerConfig, + pub policy: CommandPolicy, + /// V0.6: Append-only log sink for persistence + pub logs: LogSink, + /// V0.7: SSE event broadcast channel + pub events_tx: broadcast::Sender, + /// V0.7.2: In-memory event envelope store + pub events: Arc>>, +} + +impl AppState { + pub fn new(signing_key: SigningKey) -> Self { + use base64::Engine; + let pk_bytes = signing_key.verifying_key().to_bytes(); + let public_key_b64 = base64::engine::general_purpose::STANDARD.encode(pk_bytes); + + // Parse scheduler config from env (with sane defaults) + let scan_interval_hours = std::env::var("VAULTMESH_SCAN_INTERVAL_HOURS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(24); + + let scan_stale_hours = std::env::var("VAULTMESH_SCAN_STALE_HOURS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(48); + + let heartbeat_stale_minutes = std::env::var("VAULTMESH_HEARTBEAT_STALE_MINUTES") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(10); + + let scheduler_cfg = SchedulerConfig { + scan_interval: Duration::hours(scan_interval_hours), + scan_stale: Duration::hours(scan_stale_hours), + heartbeat_stale: Duration::minutes(heartbeat_stale_minutes), + }; + + tracing::info!( + "Scheduler config: scan_interval={}h, scan_stale={}h, heartbeat_stale={}m", + scan_interval_hours, + scan_stale_hours, + heartbeat_stale_minutes + ); + + // V0.6: Initialize log sink + let logs = LogSink::new_from_env() + .expect("failed to initialize log directory"); + tracing::info!("Log directory: {:?}", logs.dir); + + // V0.7: Initialize SSE broadcast channel + let (events_tx, _) = broadcast::channel(1024); + + AppState { + nodes: Arc::new(RwLock::new(HashMap::new())), + commands_pending: Arc::new(RwLock::new(HashMap::new())), + command_results: Arc::new(RwLock::new(HashMap::new())), + last_scans: Arc::new(RwLock::new(HashMap::new())), + signing_key: Arc::new(signing_key), + public_key_b64, + scheduler_cfg, + policy: CommandPolicy::default_policy(), + logs, + events_tx, + events: Arc::new(RwLock::new(Vec::new())), + } + } + + /// Check if a command is allowed by policy. + pub fn is_command_allowed(&self, profile: &str, node_id: &Uuid, cmd: &str) -> bool { + let allowed = self.policy.is_allowed(profile, cmd); + if !allowed { + tracing::warn!( + "policy: command {} denied for node {} (profile={})", + cmd, + node_id, + profile + ); + } + allowed + } + + // ======================================================================== + // V0.7: SSE Event Bus + // ======================================================================== + + /// Publish an event to all SSE subscribers. + pub fn publish(&self, event: ServerEvent) { + if let Err(e) = self.events_tx.send(event) { + tracing::debug!("no SSE listeners: {e}"); + } + } + + /// Recompute and publish attention status for a node. 
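+    // Illustrative scheduler tuning via environment (values are examples only):
+    //
+    //   VAULTMESH_SCAN_INTERVAL_HOURS=12     # request a scan twice per day
+    //   VAULTMESH_SCAN_STALE_HOURS=24        # flag nodes unscanned for a day
+    //   VAULTMESH_HEARTBEAT_STALE_MINUTES=5  # flag nodes silent for 5 minutes
+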
+ pub async fn recompute_and_publish_attention(&self, node_id: Uuid) { + let now = OffsetDateTime::now_utc(); + let latest = self.list_latest().await; + let last_scans = self.list_last_scans().await; + + if let Some(hb) = latest.iter().find(|h| h.node_id == node_id) { + let attn = compute_attention(now, hb, last_scans.get(&node_id), &self.scheduler_cfg); + let payload = AttentionEventPayload { + ts: now, + node_id: hb.node_id, + hostname: hb.hostname.clone(), + os_profile: hb.os_profile.clone(), + needs_attention: attn.needs_attention, + reasons: attn.reasons, + }; + self.publish(ServerEvent::Attention(payload)); + } + } + + // ======================================================================== + // V0.7.2: Communication Layer + // ======================================================================== + + /// Record an EventEnvelope: log to JSONL, store in memory, broadcast via SSE. + pub async fn record_envelope(&self, ev: EventEnvelope) { + // 1) Log to JSONL + let entry = EventEnvelopeLogEntry { version: 1, event: ev.clone() }; + if let Err(e) = self.logs.append_json_line("events.jsonl", &entry) { + tracing::warn!("failed to append events log: {e}"); + } + + // 2) In-memory store with cap + { + let mut events = self.events.write().await; + events.push(ev.clone()); + if events.len() > MAX_ENVELOPES_IN_MEMORY { + let overflow = events.len() - MAX_ENVELOPES_IN_MEMORY; + events.drain(0..overflow); + } + } + + // 3) SSE broadcast + self.publish(ServerEvent::Envelope(ev)); + } + + /// Replay envelopes from events.jsonl on startup. + pub async fn replay_envelopes(&self) -> std::io::Result<()> { + let path = self.logs.file_path("events.jsonl"); + if !path.exists() { + return Ok(()); + } + + let file = std::fs::File::open(&path)?; + let reader = BufReader::new(file); + let mut count = 0; + + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + + let entry: EventEnvelopeLogEntry = match serde_json::from_str(&line) { + Ok(v) => v, + Err(e) => { + tracing::warn!("invalid events line: {e}"); + continue; + } + }; + + // Store in memory (no broadcast during replay) + let mut events = self.events.write().await; + events.push(entry.event); + if events.len() > MAX_ENVELOPES_IN_MEMORY { + let overflow = events.len() - MAX_ENVELOPES_IN_MEMORY; + events.drain(0..overflow); + } + + count += 1; + } + + tracing::info!("replayed {} events from {:?}", count, path); + Ok(()) + } + + /// Insert a new heartbeat for a node, updating history. + /// Logs the event to heartbeats.jsonl. + pub async fn upsert_heartbeat(&self, hb: NodeHeartbeat) { + // V0.6: Log first + let event = HeartbeatEvent { + ts: hb.timestamp, + node_id: hb.node_id, + hostname: hb.hostname.clone(), + os_profile: hb.os_profile.clone(), + heartbeat: hb.clone(), + }; + if let Err(e) = self.logs.append_json_line("heartbeats.jsonl", &event) { + tracing::warn!("failed to append heartbeat log: {e}"); + } + + self.upsert_heartbeat_no_log(hb).await; + } + + /// Insert heartbeat without logging (used during replay). + async fn upsert_heartbeat_no_log(&self, hb: NodeHeartbeat) { + let mut nodes = self.nodes.write().await; + let entry = nodes.entry(hb.node_id).or_insert_with(|| NodeHistory { + latest: hb.clone(), + history: VecDeque::new(), + }); + + entry.history.push_front(entry.latest.clone()); + if entry.history.len() > MAX_HEARTBEATS_PER_NODE { + entry.history.pop_back(); + } + + entry.latest = hb; + } + + /// Get a snapshot of all latest heartbeats. 
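+    // One `events.jsonl` line as appended by `record_envelope` above, for
+    // reference (values illustrative; `id` and `ts` are server-assigned):
+    //
+    //   {"version":1,"event":{"id":"<uuid>","kind":"note","ts":"<rfc3339>",
+    //    "node_id":"<uuid>","author":"operator","body":{"text":"..."}}}
+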
+ pub async fn list_latest(&self) -> Vec { + let nodes = self.nodes.read().await; + nodes.values().map(|h| h.latest.clone()).collect() + } + + /// Queue a signed command for a node. + pub async fn queue_command(&self, cmd: SignedCommand) { + let mut pending = self.commands_pending.write().await; + let entry = pending + .entry(cmd.payload.node_id) + .or_insert_with(VecDeque::new); + entry.push_back(cmd); + } + + /// Take (and clear) all pending commands for a node. + pub async fn take_pending_commands(&self, node_id: Uuid) -> Vec { + let mut pending = self.commands_pending.write().await; + if let Some(mut queue) = pending.remove(&node_id) { + queue.drain(..).collect() + } else { + Vec::new() + } + } + + /// Record a command result. + /// Logs the event to commands.jsonl. + pub async fn record_command_result(&self, result: CommandResult) { + // V0.6: Log command event + let event = CommandEvent { + ts: result.ts, + node_id: result.node_id, + cmd: result.cmd.clone(), + status: result.status.clone(), + exit_code: result.exit_code, + nonce: result.nonce.clone(), + }; + if let Err(e) = self.logs.append_json_line("commands.jsonl", &event) { + tracing::warn!("failed to append command log: {e}"); + } + + self.record_command_result_no_log(result).await; + } + + /// Record command result without logging (used during replay). + async fn record_command_result_no_log(&self, result: CommandResult) { + let mut results = self.command_results.write().await; + let entry = results + .entry(result.node_id) + .or_insert_with(VecDeque::new); + entry.push_front(result); + if entry.len() > MAX_COMMAND_RECORDS_PER_NODE { + entry.pop_back(); + } + } + + /// Get recent command results for a node. + pub async fn list_command_results(&self, node_id: Uuid) -> Vec { + let results = self.command_results.read().await; + results + .get(&node_id) + .map(|q| q.iter().cloned().collect()) + .unwrap_or_default() + } + + /// Update the last scan result for a node. + /// Note: ScanEvent logging is handled in routes.rs where we have os_profile context. + pub async fn update_last_scan(&self, node_id: Uuid, scan: LastScan) { + let mut scans = self.last_scans.write().await; + scans.insert(node_id, scan); + } + + /// Update last scan without logging (used during replay). + pub async fn update_last_scan_no_log(&self, node_id: Uuid, scan: LastScan) { + let mut scans = self.last_scans.write().await; + scans.insert(node_id, scan); + } + + /// Get the last scan result for a node. + pub async fn get_last_scan(&self, node_id: &Uuid) -> Option { + let scans = self.last_scans.read().await; + scans.get(node_id).cloned() + } + + /// Get all last scan results. + pub async fn list_last_scans(&self) -> HashMap { + let scans = self.last_scans.read().await; + scans.clone() + } + + // ======================================================================== + // V0.6: Log replay on startup + // ======================================================================== + + /// Replay all logs to reconstruct state on startup. 
+ pub async fn replay_from_logs(&self) { + if let Err(e) = self.replay_heartbeats().await { + tracing::warn!("failed to replay heartbeats: {e}"); + } + if let Err(e) = self.replay_scans().await { + tracing::warn!("failed to replay scans: {e}"); + } + if let Err(e) = self.replay_commands().await { + tracing::warn!("failed to replay commands: {e}"); + } + // V0.7.2: Replay event envelopes + if let Err(e) = self.replay_envelopes().await { + tracing::warn!("failed to replay envelopes: {e}"); + } + } + + async fn replay_heartbeats(&self) -> std::io::Result<()> { + let path = self.logs.file_path("heartbeats.jsonl"); + if !path.exists() { + return Ok(()); + } + + let file = std::fs::File::open(&path)?; + let reader = BufReader::new(file); + let mut count = 0; + + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + + let event: HeartbeatEvent = match serde_json::from_str(&line) { + Ok(v) => v, + Err(e) => { + tracing::warn!("invalid heartbeat line: {e}"); + continue; + } + }; + + // Reconstruct heartbeat from event + let hb = NodeHeartbeat { + node_id: event.node_id, + hostname: event.hostname, + os_profile: event.os_profile, + cloudflare_ok: event.heartbeat.cloudflare_ok, + services_ok: event.heartbeat.services_ok, + vaultmesh_root: event.heartbeat.vaultmesh_root, + timestamp: event.ts, + metrics: event.heartbeat.metrics, + }; + + self.upsert_heartbeat_no_log(hb).await; + count += 1; + } + + tracing::info!("replayed {} heartbeats from {:?}", count, path); + Ok(()) + } + + async fn replay_scans(&self) -> std::io::Result<()> { + let path = self.logs.file_path("scans.jsonl"); + if !path.exists() { + return Ok(()); + } + + let file = std::fs::File::open(&path)?; + let reader = BufReader::new(file); + let mut count = 0; + + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + + let event: ScanEvent = match serde_json::from_str(&line) { + Ok(v) => v, + Err(e) => { + tracing::warn!("invalid scan line: {e}"); + continue; + } + }; + + let scan = LastScan { + ts: event.ts, + summary: event.summary, + findings_real: event.findings_real, + receipt_hash: event.receipt_hash, + }; + + self.update_last_scan_no_log(event.node_id, scan).await; + count += 1; + } + + tracing::info!("replayed {} scans from {:?}", count, path); + Ok(()) + } + + async fn replay_commands(&self) -> std::io::Result<()> { + let path = self.logs.file_path("commands.jsonl"); + if !path.exists() { + return Ok(()); + } + + let file = std::fs::File::open(&path)?; + let reader = BufReader::new(file); + let mut count = 0; + + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + + let event: CommandEvent = match serde_json::from_str(&line) { + Ok(v) => v, + Err(e) => { + tracing::warn!("invalid command line: {e}"); + continue; + } + }; + + // Build minimal CommandResult for history (stdout/stderr not stored in log) + let result = CommandResult { + node_id: event.node_id, + ts: event.ts, + nonce: event.nonce, + cmd: event.cmd, + status: event.status, + exit_code: event.exit_code, + stdout: String::new(), + stderr: String::new(), + }; + + self.record_command_result_no_log(result).await; + count += 1; + } + + tracing::info!("replayed {} commands from {:?}", count, path); + Ok(()) + } +} + +// ============================================================================ +// V0.7: Attention computation (moved from routes.rs for SSE access) +// ============================================================================ + 
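+// A minimal sketch of the attention rules (illustrative test, not part of this
+// commit's suite): a fresh heartbeat with critical scan findings should yield
+// exactly the "critical_findings" reason.
+#[cfg(test)]
+mod attention_sketch {
+    use super::*;
+
+    #[test]
+    fn critical_findings_need_attention() {
+        let now = OffsetDateTime::now_utc();
+        let hb = NodeHeartbeat {
+            node_id: Uuid::new_v4(),
+            hostname: "vault-01".into(),
+            os_profile: "ArchVault".into(),
+            cloudflare_ok: true,
+            services_ok: true,
+            vaultmesh_root: "/var/lib/vaultmesh".into(),
+            timestamp: now,
+            metrics: NodeMetrics::default(),
+        };
+        let scan = LastScan {
+            ts: now,
+            summary: ScanSummary { critical: 1, high: 0, medium: 0, low: 0, info: 0 },
+            findings_real: true,
+            receipt_hash: "blake3:test".into(),
+        };
+        let cfg = SchedulerConfig {
+            scan_interval: Duration::hours(24),
+            scan_stale: Duration::hours(48),
+            heartbeat_stale: Duration::minutes(10),
+        };
+        let attn = compute_attention(now, &hb, Some(&scan), &cfg);
+        assert!(attn.needs_attention);
+        assert_eq!(attn.reasons, vec!["critical_findings".to_string()]);
+    }
+}
+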
+/// Compute attention status for a node based on current state. +pub fn compute_attention( + now: OffsetDateTime, + hb: &NodeHeartbeat, + scan: Option<&LastScan>, + cfg: &SchedulerConfig, +) -> NodeAttentionStatus { + let mut reasons = Vec::new(); + + // Heartbeat staleness + if now - hb.timestamp > cfg.heartbeat_stale { + reasons.push("heartbeat_stale".to_string()); + } + + // Scan staleness / findings + match scan { + None => reasons.push("never_scanned".to_string()), + Some(s) => { + if now - s.ts > cfg.scan_stale { + reasons.push("scan_stale".to_string()); + } + if s.summary.critical > 0 { + reasons.push("critical_findings".to_string()); + } else if s.summary.high > 0 { + reasons.push("high_findings".to_string()); + } + } + } + + // Service flags + if !hb.cloudflare_ok { + reasons.push("cloudflare_down".to_string()); + } + if !hb.services_ok { + reasons.push("services_down".to_string()); + } + + NodeAttentionStatus { + needs_attention: !reasons.is_empty(), + reasons, + } +} diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md new file mode 100644 index 0000000..5615e8d --- /dev/null +++ b/docs/ARCHITECTURE.md @@ -0,0 +1,182 @@ +# VaultMesh Command Center Architecture + +## Overview + +The VaultMesh Command Center is a minimal control plane for monitoring and managing VaultMesh nodes. It consists of two components: + +1. **Command Center (CC)** - Central Rust/Axum web server +2. **Node Agent** - Lightweight daemon running on each VaultMesh node + +## Communication Model + +``` +┌─────────────────┐ HTTPS (Cloudflare Tunnel) ┌─────────────────┐ +│ Node Agent │ ─────────────────────────────────▶│ Command Center │ +│ (ArchVault) │ POST /api/agent/heartbeat │ (Axum) │ +└─────────────────┘ └─────────────────┘ + │ +┌─────────────────┐ HTTPS (Cloudflare Tunnel) │ +│ Node Agent │ ─────────────────────────────────────────┘ +│ (DebianVault) │ POST /api/agent/heartbeat +└─────────────────┘ + + ┌─────────────────────────────────┐ + │ Cloudflare Access │ + │ (Zero Trust Authentication) │ + └─────────────────────────────────┘ + │ + ▼ + ┌─────────────────────────────────┐ + │ Admin Browser │ + │ GET / (Dashboard) │ + └─────────────────────────────────┘ +``` + +## Security Model + +### Outbound-Only from Nodes + +Nodes only make **outbound** connections to the Command Center. No inbound ports are opened on VaultMesh nodes for CC communication. + +### Cloudflare Zero Trust + +- **Cloudflare Tunnel**: CC is exposed via `cloudflared` tunnel, not a public IP +- **Cloudflare Access**: Admin dashboard protected by Cloudflare Access policies +- **Agent Authentication**: Future: signed JWTs or shared secrets for agent auth + +### Network Flow + +1. Node agent wakes up every N seconds +2. Agent collects local health metrics +3. Agent POSTs heartbeat JSON to CC via Cloudflare Tunnel +4. CC stores heartbeat in memory (future: SQLite) +5. Admin views dashboard via Cloudflare Access-protected URL + +## Data Model + +### NodeHeartbeat + +```json +{ + "node_id": "550e8400-e29b-41d4-a716-446655440000", + "hostname": "vault-node-01", + "os_profile": "ArchVault", + "cloudflare_ok": true, + "services_ok": true, + "vaultmesh_root": "/var/lib/vaultmesh", + "timestamp": "2024-01-15T10:30:00Z" +} +``` + +### Storage (V1) + +In-memory `HashMap` wrapped in `Arc>`. + +Nodes are keyed by `node_id`. Each heartbeat overwrites the previous entry for that node. 
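+
+A sketch of that storage shape (a simplification of `state.rs`; the heartbeat
+type is abbreviated here):
+
+```rust
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
+use uuid::Uuid;
+
+struct NodeHeartbeat { /* hostname, status flags, timestamp, ... */ }
+
+type Nodes = Arc<RwLock<HashMap<Uuid, NodeHeartbeat>>>;
+
+async fn upsert(nodes: &Nodes, id: Uuid, hb: NodeHeartbeat) {
+    // Last write wins: each heartbeat replaces the node's previous entry.
+    nodes.write().await.insert(id, hb);
+}
+```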
+ +## Component Details + +### Command Center + +| Component | Technology | +|-----------|------------| +| Web Framework | Axum 0.7 | +| Async Runtime | Tokio | +| Serialization | Serde + serde_json | +| Logging | tracing + tracing-subscriber | +| HTML | Server-rendered (HTMX-ready) | + +### Node Agent + +| Component | Technology | +|-----------|------------| +| HTTP Client | reqwest (rustls-tls) | +| Async Runtime | Tokio | +| System Checks | systemctl calls | +| Config | Environment variables | + +## Deployment + +### Command Center Deployment + +1. Build: `cargo build --release -p vaultmesh-command-center` +2. Install binary to `/usr/local/bin/` +3. Install systemd unit +4. Configure Cloudflare Tunnel to `http://127.0.0.1:8088` +5. Configure Cloudflare Access policies + +### Node Agent Deployment + +1. Build: `cargo build --release -p vaultmesh-node-agent` +2. Install binary to `/usr/local/bin/` +3. Create `/etc/vaultmesh/agent.env` with: + - `VAULTMESH_CC_URL=https://cc.your-domain.example` + - `VAULTMESH_OS_PROFILE=ArchVault` +4. Install systemd unit + +## Version History + +### V0.7.2: Communication Layer (Current) +- Unified `EventEnvelope` as canonical message format for comms events. +- `POST /api/events` - Ingest endpoint for operators, agents, and bots. +- `GET /api/events` - Query endpoint with since/kind/node_id/limit filtering. +- New event kinds: `note`, `incident`, `ack`, `tag`, `resolve`. +- SSE broadcast of envelope events by their `kind` name. +- Durable persistence to `events.jsonl` with replay on startup. +- Memory-bounded in-memory store (500 most recent envelopes). + +### V0.7.1: Mission Console +- NASA-style 3-panel dashboard at `GET /console`. +- Global Mission Bar with fleet KPIs (Total/Healthy/Attention/Critical). +- Left panel: Node list with status pills (OK/ATTN/CRIT), live heartbeat glow. +- Center panel: Selected node telemetry + per-node event timeline. +- Right panel: Attention summary, scan findings, global event feed. +- Full SSE wiring for real-time DOM updates without page refresh. + +### V0.7: SSE Event Bus +- Real-time event streaming via `GET /events` (Server-Sent Events). +- Named events: `heartbeat`, `scan`, `command`, `attention`. +- Broadcast channel distributes events to all connected SSE clients. +- Keepalive every 15s to prevent connection timeouts. +- JS probe in dashboard for console.log debugging. + +### V0.6.1: Log Tools +- CLI subcommands for querying JSONL event logs. +- `logs view` with filters: --kind, --node, --since, --min-severity, --limit. +- `logs tail` for real-time log following. +- `logs stats` for per-node event statistics. + +### V0.6: Append-Only Persistence +- Event logging to JSONL files (heartbeats, scans, commands). +- State replay on startup from `$VAULTMESH_LOG_DIR`. +- Foundation for V0.8 Ledger Bridge (Merkle over logs). + +### V0.5: Fleet Orchestrator +- Background scheduler for autonomous scans. +- Attention model with staleness / drift detection. +- Command policy enforcement (per-profile allowlists). + +### V0.4: Commands & Sovereign Scan +- Push commands to nodes (Ed25519 signed). +- Command receipts with proof chain. +- Sovereign scan execution and reporting. + +### V0.3: Signed Commands +- Ed25519 key generation and command signing. +- Agent signature verification. +- Nonce-based replay protection. + +### V0.2: Node Metrics +- System metrics in heartbeat (load, memory, disk). +- Heartbeat history tracking. 
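+
+Illustrative metrics block added to the heartbeat in V0.2 (values are
+examples; field names follow the agent's `NodeMetrics`):
+
+```json
+{
+  "metrics": {
+    "uptime_seconds": 86400,
+    "load1": 0.42,
+    "load5": 0.35,
+    "load15": 0.30,
+    "mem_total_mb": 7934,
+    "mem_used_mb": 2048,
+    "disk_root_total_gb": 118.0,
+    "disk_root_used_gb": 42.5
+  }
+}
+```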
+ +## Future Extensions + +### V0.7.1: Mission Console +- NASA-inspired dashboard layout with node tree sidebar. +- Live row updates via SSE (no page refresh). +- Attention summary panel with fleet health overview. + +### V0.8: Ledger Bridge +- Merkle tree over `sovereign-scans.jsonl`. +- `ROOT.txt` + `PROOFCHAIN.json` for VaultMesh proof layer. diff --git a/docs/EVENT_GENERATION.md b/docs/EVENT_GENERATION.md new file mode 100644 index 0000000..a85eb94 --- /dev/null +++ b/docs/EVENT_GENERATION.md @@ -0,0 +1,83 @@ +# VaultMesh Command Center: Event Generation Mechanism + +## Overview + +The VaultMesh Command Center generates events through a sophisticated, multi-layered mechanism designed for real-time monitoring and fleet management. + +## Event Types + +### 1. Heartbeat Events +- **Trigger**: Node heartbeat submission +- **Payload Includes**: + * Timestamp + * Node ID + * Hostname + * OS Profile + * Cloudflare Status + * Services Status + * VaultMesh Root Path + * System Metrics (uptime, load averages) + +### 2. Scan Events +- **Trigger**: Successful scan result submission +- **Payload Includes**: + * Timestamp + * Node ID + * Hostname + * OS Profile + * Scan Summary (critical/high/medium/low findings) + * Real/Mock Findings Flag + * Receipt Hash + +### 3. Command Events +- **Trigger**: Command execution result +- **Payload Includes**: + * Timestamp + * Node ID + * Hostname + * OS Profile + * Command Name + * Execution Status + * Exit Code + * Nonce (for replay protection) + +## Event Generation Flow + +1. **Data Collection** + - Node agents submit heartbeats and scan results + - Command results are reported back to the Command Center + +2. **Event Processing** + - Raw data is transformed into structured event payloads + - Events are published to a broadcast channel + - Server-Sent Events (SSE) distribute events to connected clients + +3. **State Management** + - Events trigger state updates (node history, last scan, etc.) + - Attention status is recomputed based on new events + +## Advanced Features + +- **Automatic Scan Scheduling** + - Periodic scans triggered based on node profile and last scan timestamp + - Configurable scan intervals + +- **Attention Computation** + - Dynamic assessment of node health + - Tracks critical findings, heartbeat staleness, service status + +## Security Considerations + +- Ed25519 key signing for commands +- Nonce-based replay protection +- Configurable command policies per node profile + +## Performance Characteristics + +- In-memory event storage (500 most recent events) +- Optional JSONL log persistence +- Low-overhead event broadcasting + +## Extensibility + +The event system supports easy addition of new event types and payloads through the `ServerEvent` enum and corresponding payload structures. \ No newline at end of file diff --git a/docs/EVENT_PROCESSING.md b/docs/EVENT_PROCESSING.md new file mode 100644 index 0000000..590f360 --- /dev/null +++ b/docs/EVENT_PROCESSING.md @@ -0,0 +1,170 @@ +# VaultMesh Command Center: Event Processing Architecture + +## Overview + +The Command Center implements a sophisticated, multi-layered event processing system designed for robust, real-time fleet management. + +## Key Components + +### 1. Event Types +- `NodeHeartbeat`: Node status updates +- `ScanEvent`: Scan findings and results +- `CommandEvent`: Command execution outcomes +- `EventEnvelope`: Generic communication events + +### 2. Processing Stages + +#### a. 
Ingestion +- Raw event data received via HTTP endpoints +- Validated and transformed into structured payloads +- Logged to append-only JSONL files for durability + +#### b. State Update +- In-memory state updated with latest information +- Maintain sliding window of recent events (max 50 per node) +- Compute derived states (attention status, last scan) + +#### c. Broadcasting +- Events published via Server-Sent Events (SSE) +- Broadcast to all connected clients +- Low-latency, real-time updates + +## Event Processing Workflow + +### Heartbeat Processing +1. Receive heartbeat data +2. Log to `heartbeats.jsonl` +3. Update node history +4. Publish heartbeat event +5. Recompute node attention status +6. Broadcast attention event + +```rust +pub async fn upsert_heartbeat(&self, hb: NodeHeartbeat) { + // Log event + let event = HeartbeatEvent { ... }; + self.logs.append_json_line("heartbeats.jsonl", &event); + + // Update in-memory state + self.upsert_heartbeat_no_log(hb).await; +} +``` + +### Scan Result Processing +1. Receive scan results +2. Log to `scans.jsonl` +3. Update last scan information +4. Publish scan event +5. Recompute node attention status +6. Broadcast attention event + +```rust +pub async fn update_last_scan(&self, node_id: Uuid, scan: LastScan) { + // Update scan history + let mut scans = self.last_scans.write().await; + scans.insert(node_id, scan); +} +``` + +### Command Result Processing +1. Receive command result +2. Log to `commands.jsonl` +3. Store command history +4. Publish command event +5. Optionally trigger additional actions + +```rust +pub async fn record_command_result(&self, result: CommandResult) { + // Log command event + let event = CommandEvent { ... }; + self.logs.append_json_line("commands.jsonl", &event); + + // Update command result history + self.record_command_result_no_log(result).await; +} +``` + +## Attention Computation + +The system dynamically computes a node's attention status based on multiple factors: + +- Heartbeat staleness +- Scan staleness +- Scan findings severity +- Service status +- Cloudflare tunnel status + +```rust +pub fn compute_attention( + now: OffsetDateTime, + hb: &NodeHeartbeat, + scan: Option<&LastScan>, + cfg: &SchedulerConfig, +) -> NodeAttentionStatus { + let mut reasons = Vec::new(); + + // Check heartbeat age + if now - hb.timestamp > cfg.heartbeat_stale { + reasons.push("heartbeat_stale"); + } + + // Check scan status + match scan { + None => reasons.push("never_scanned"), + Some(s) => { + if now - s.ts > cfg.scan_stale { + reasons.push("scan_stale"); + } + if s.summary.critical > 0 { + reasons.push("critical_findings"); + } + } + } + + // Check service flags + if !hb.cloudflare_ok { + reasons.push("cloudflare_down"); + } + + NodeAttentionStatus { + needs_attention: !reasons.is_empty(), + reasons, + } +} +``` + +## Persistence and State Replay + +### Log Replay Mechanism +- On startup, reconstruct in-memory state from JSONL logs +- Replay events in chronological order +- Recreate node history, scan results, and command results + +```rust +pub async fn replay_from_logs(&self) { + self.replay_heartbeats().await; + self.replay_scans().await; + self.replay_commands().await; + self.replay_envelopes().await; +} +``` + +## Performance Characteristics + +- In-memory event store: 500 most recent events +- Append-only logging for durability +- Non-blocking event processing +- Low-overhead broadcasting + +## Security Considerations + +- Ed25519 key signing for commands +- Configurable command policies +- Nonce-based replay 
protection +- No sensitive data stored in logs + +## Extensibility + +- Easy to add new event types +- Flexible attention computation +- Modular event processing pipeline \ No newline at end of file diff --git a/docs/NODE_AGENT_CONTRACT.md b/docs/NODE_AGENT_CONTRACT.md new file mode 100644 index 0000000..6c21f5a --- /dev/null +++ b/docs/NODE_AGENT_CONTRACT.md @@ -0,0 +1,163 @@ +# Node Agent Contract + +This document defines the API contract between VaultMesh Node Agents and the Command Center. + +## Heartbeat Endpoint + +### Request + +``` +POST /api/agent/heartbeat +Content-Type: application/json +``` + +### Heartbeat Schema + +```json +{ + "node_id": "UUID v4", + "hostname": "string", + "os_profile": "string", + "cloudflare_ok": "boolean", + "services_ok": "boolean", + "vaultmesh_root": "string", + "timestamp": "ISO 8601 / RFC 3339" +} +``` + +### Field Definitions + +| Field | Type | Description | +|-------|------|-------------| +| `node_id` | UUID v4 | Unique identifier for this node. Should persist across reboots. | +| `hostname` | String | System hostname (from `hostname::get()` or `/etc/hostname`) | +| `os_profile` | String | VaultMesh profile name: `ArchVault`, `DebianVault`, etc. | +| `cloudflare_ok` | Boolean | `true` if `cloudflared` service is active | +| `services_ok` | Boolean | `true` if VAULTMESH_ROOT exists and is healthy | +| `vaultmesh_root` | String | Path to VAULTMESH_ROOT (e.g., `/var/lib/vaultmesh`) | +| `timestamp` | RFC 3339 | UTC timestamp when heartbeat was generated | + +### Response + +**Success (200 OK)**: +```json +{ + "status": "ok" +} +``` + +**Error (4xx/5xx)**: +```json +{ + "error": "description" +} +``` + +## Environment Variables + +The node agent is configured via environment variables, typically set in `/etc/vaultmesh/agent.env`. + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `VAULTMESH_NODE_ID` | No | Auto-generated UUID v4 | Persistent node identifier | +| `VAULTMESH_CC_URL` | No | `http://127.0.0.1:8088` | Command Center base URL | +| `VAULTMESH_OS_PROFILE` | No | `ArchVault` | OS profile name to report | +| `VAULTMESH_ROOT` | No | `/var/lib/vaultmesh` | Path to check for services_ok | +| `VAULTMESH_HEARTBEAT_SECS` | No | `30` | Seconds between heartbeats | +| `RUST_LOG` | No | `info` | Log level (trace, debug, info, warn, error) | + +### Example `/etc/vaultmesh/agent.env` + +```bash +VAULTMESH_NODE_ID=550e8400-e29b-41d4-a716-446655440000 +VAULTMESH_CC_URL=https://cc.vaultmesh.example +VAULTMESH_OS_PROFILE=ArchVault +VAULTMESH_ROOT=/var/lib/vaultmesh +VAULTMESH_HEARTBEAT_SECS=30 +RUST_LOG=info +``` + +## Node Registration + +Nodes self-register on first heartbeat. There is no explicit registration endpoint. + +When the Command Center receives a heartbeat with a new `node_id`, it creates a new entry. Subsequent heartbeats update the existing entry. + +### Node ID Persistence + +For consistent tracking, the `VAULTMESH_NODE_ID` should be persisted. Options: + +1. **Environment file**: Set in `/etc/vaultmesh/agent.env` +2. **Machine ID**: Could derive from `/etc/machine-id` +3. 
**Auto-generated**: If not set, agent generates a new UUID on each start (not recommended for production) + +**Recommended**: Generate a UUID once during node bootstrap and store in `agent.env`: + +```bash +# During node bootstrap +echo "VAULTMESH_NODE_ID=$(uuidgen)" >> /etc/vaultmesh/agent.env +``` + +## Health Checks + +### cloudflare_ok + +The agent runs: +```bash +systemctl is-active --quiet cloudflared +``` + +Returns `true` if exit code is 0 (service active). + +### services_ok + +The agent checks if `VAULTMESH_ROOT` exists and is a directory: +```rust +std::path::Path::new(vaultmesh_root).is_dir() +``` + +Future versions may add additional checks: +- Disk space +- Key services running +- ProofChain integrity + +## Error Handling + +### Network Errors + +If the agent cannot reach the Command Center: +- Log error at WARN/ERROR level +- Sleep for heartbeat interval +- Retry on next cycle + +No exponential backoff in V1. The agent will retry every `VAULTMESH_HEARTBEAT_SECS` seconds indefinitely. + +### Invalid Response + +If CC returns non-2xx status: +- Log warning with status code +- Continue normal operation +- Retry on next cycle + +## Security Considerations + +### Transport Security + +- Agent should connect to CC via HTTPS (Cloudflare Tunnel) +- `reqwest` configured with `rustls-tls` (no OpenSSL dependency) + +### Authentication (Future) + +V1 has no agent authentication. Future versions may add: +- Signed JWTs +- Shared secrets +- mTLS + +### Data Sensitivity + +Heartbeat data is low-sensitivity: +- No secrets or credentials +- No PII +- No file contents + +The `vaultmesh_root` path and hostname are the most identifying fields. diff --git a/node-agent/Cargo.toml b/node-agent/Cargo.toml new file mode 100644 index 0000000..f1f9c30 --- /dev/null +++ b/node-agent/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "vaultmesh-node-agent" +version = "0.1.0" +edition = "2021" + +[dependencies] +reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "process"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +uuid = { version = "1", features = ["v4", "serde"] } +time = { version = "0.3", features = ["serde", "macros", "formatting", "parsing"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +hostname = "0.4" +anyhow = "1" +ed25519-dalek = "2" +base64 = "0.22" +blake3 = "1" diff --git a/node-agent/src/config.rs b/node-agent/src/config.rs new file mode 100644 index 0000000..1689eaa --- /dev/null +++ b/node-agent/src/config.rs @@ -0,0 +1,52 @@ +use std::env; +use uuid::Uuid; + +/// Agent configuration loaded from environment variables. +#[derive(Debug, Clone)] +pub struct Config { + pub node_id: Uuid, + pub cc_url: String, + pub os_profile: String, + pub vaultmesh_root: String, + pub heartbeat_secs: u64, +} + +impl Config { + /// Load configuration from environment variables. 
+ /// + /// | Variable | Default | + /// |---------------------------|----------------------------| + /// | VAULTMESH_NODE_ID | auto-generated UUID | + /// | VAULTMESH_CC_URL | http://127.0.0.1:8088 | + /// | VAULTMESH_OS_PROFILE | ArchVault | + /// | VAULTMESH_ROOT | /var/lib/vaultmesh | + /// | VAULTMESH_HEARTBEAT_SECS | 30 | + pub fn from_env() -> Self { + let node_id = env::var("VAULTMESH_NODE_ID") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or_else(Uuid::new_v4); + + let cc_url = env::var("VAULTMESH_CC_URL") + .unwrap_or_else(|_| "http://127.0.0.1:8088".into()); + + let os_profile = env::var("VAULTMESH_OS_PROFILE") + .unwrap_or_else(|_| "ArchVault".into()); + + let vaultmesh_root = env::var("VAULTMESH_ROOT") + .unwrap_or_else(|_| "/var/lib/vaultmesh".into()); + + let heartbeat_secs = env::var("VAULTMESH_HEARTBEAT_SECS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(30); + + Self { + node_id, + cc_url, + os_profile, + vaultmesh_root, + heartbeat_secs, + } + } +} diff --git a/node-agent/src/main.rs b/node-agent/src/main.rs new file mode 100644 index 0000000..f2e3b75 --- /dev/null +++ b/node-agent/src/main.rs @@ -0,0 +1,818 @@ +mod config; + +use crate::config::Config; +use anyhow::{bail, Result}; +use ed25519_dalek::{Signature, Verifier, VerifyingKey}; +use serde::{Deserialize, Serialize}; +use std::{fs, path::Path, process::Command as StdCommand}; +use time::OffsetDateTime; +use tokio::time::{sleep, Duration}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use uuid::Uuid; + +/// Metrics collected from the node. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct NodeMetrics { + pub uptime_seconds: Option, + pub load1: Option, + pub load5: Option, + pub load15: Option, + pub mem_total_mb: Option, + pub mem_used_mb: Option, + pub disk_root_total_gb: Option, + pub disk_root_used_gb: Option, +} + +/// Heartbeat data sent to the Command Center. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeHeartbeat { + pub node_id: Uuid, + pub hostname: String, + pub os_profile: String, + pub cloudflare_ok: bool, + pub services_ok: bool, + pub vaultmesh_root: String, + #[serde(with = "time::serde::rfc3339")] + pub timestamp: OffsetDateTime, + pub metrics: NodeMetrics, +} + +/// Command payload from CC. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandPayload { + pub node_id: Uuid, + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub nonce: String, + pub cmd: String, + pub args: serde_json::Value, +} + +/// Signed command from CC. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SignedCommand { + pub payload: CommandPayload, + pub signature: String, +} + +/// Command execution result sent back to CC. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandResult { + pub node_id: Uuid, + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub nonce: String, + pub cmd: String, + pub status: String, + pub exit_code: Option, + pub stdout: String, + pub stderr: String, +} + +/// Summary of scan findings by severity. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct ScanSummary { + pub critical: u32, + pub high: u32, + pub medium: u32, + pub low: u32, + pub info: u32, +} + +impl ScanSummary { + fn merge(&mut self, other: &ScanSummary) { + self.critical += other.critical; + self.high += other.high; + self.medium += other.medium; + self.low += other.low; + self.info += other.info; + } +} + +/// Result from running a single scanner. 
+#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct ScannerResult { + pub available: bool, + pub ran: bool, + pub version: Option, + pub findings_real: bool, + pub findings: u32, + pub output_file: Option, + pub error: Option, + pub summary: ScanSummary, +} + +/// Check if cloudflared service is active via systemctl. +fn check_cloudflared() -> bool { + StdCommand::new("systemctl") + .args(["is-active", "--quiet", "cloudflared"]) + .status() + .map(|s| s.success()) + .unwrap_or(false) +} + +/// Check if essential VaultMesh services are healthy. +fn check_services(vaultmesh_root: &str) -> bool { + Path::new(vaultmesh_root).is_dir() +} + +/// Gather system metrics from /proc and df. +fn gather_metrics() -> NodeMetrics { + NodeMetrics { + uptime_seconds: read_uptime_seconds(), + load1: read_loadavg_index(0), + load5: read_loadavg_index(1), + load15: read_loadavg_index(2), + mem_total_mb: read_meminfo_total_mb(), + mem_used_mb: read_meminfo_used_mb(), + disk_root_total_gb: read_disk_root_total_gb(), + disk_root_used_gb: read_disk_root_used_gb(), + } +} + +fn read_uptime_seconds() -> Option { + let contents = fs::read_to_string("/proc/uptime").ok()?; + let first = contents.split_whitespace().next()?; + let secs_str = first.split('.').next().unwrap_or(first); + secs_str.parse().ok() +} + +fn read_loadavg_index(idx: usize) -> Option { + let contents = fs::read_to_string("/proc/loadavg").ok()?; + let mut parts = contents.split_whitespace(); + parts.nth(idx)?.parse().ok() +} + +fn read_meminfo_total_mb() -> Option { + read_meminfo_field_kb("MemTotal").map(|kb| kb / 1024) +} + +fn read_meminfo_used_mb() -> Option { + let total_kb = read_meminfo_field_kb("MemTotal")?; + let avail_kb = read_meminfo_field_kb("MemAvailable")?; + let used_kb = total_kb.saturating_sub(avail_kb); + Some(used_kb / 1024) +} + +fn read_meminfo_field_kb(field: &str) -> Option { + let contents = fs::read_to_string("/proc/meminfo").ok()?; + for line in contents.lines() { + if line.starts_with(field) { + let val = line.split_whitespace().nth(1)?; + return val.parse().ok(); + } + } + None +} + +fn read_disk_root_total_gb() -> Option { + read_df_root_column(1) +} + +fn read_disk_root_used_gb() -> Option { + read_df_root_column(2) +} + +fn read_df_root_column(col_index: usize) -> Option { + let output = StdCommand::new("df") + .arg("-B1") + .arg("/") + .output() + .ok()?; + if !output.status.success() { + return None; + } + let stdout = String::from_utf8_lossy(&output.stdout); + let mut lines = stdout.lines(); + lines.next()?; + let line = lines.next()?; + let cols: Vec<&str> = line.split_whitespace().collect(); + let val: f64 = cols.get(col_index)?.parse().ok()?; + Some((val / (1024.0 * 1024.0 * 1024.0)) as f32) +} + +/// Load CC public key from file. +fn load_cc_public_key(path: &str) -> Result { + use base64::Engine; + let p = Path::new(path); + if !p.exists() { + bail!("CC public key file not found: {}", path); + } + let contents = fs::read_to_string(p)?.trim().to_string(); + let bytes = base64::engine::general_purpose::STANDARD.decode(&contents)?; + let key_bytes: [u8; 32] = bytes + .try_into() + .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes"))?; + Ok(VerifyingKey::from_bytes(&key_bytes)?) +} + +/// Verify a signed command. 
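+// Sign-side counterpart (sketch): CC signs the exact serde_json byte encoding
+// of the payload (via the ed25519-dalek Signer trait) and base64-encodes the
+// 64-byte signature:
+//
+//   let sig = signing_key.sign(&serde_json::to_vec(&payload)?);
+//   let signature = base64::engine::general_purpose::STANDARD.encode(sig.to_bytes());
+//
+// Verification below must therefore re-serialize the payload identically;
+// any field reordering would invalidate the signature.
+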
+fn verify_command(cmd: &SignedCommand, pubkey: &VerifyingKey) -> Result<()> { + use base64::Engine; + let payload_bytes = serde_json::to_vec(&cmd.payload)?; + let sig_bytes = base64::engine::general_purpose::STANDARD.decode(&cmd.signature)?; + let sig_arr: [u8; 64] = sig_bytes + .try_into() + .map_err(|_| anyhow::anyhow!("Signature must be 64 bytes"))?; + let signature = Signature::from_bytes(&sig_arr); + pubkey.verify(&payload_bytes, &signature)?; + Ok(()) +} + +/// Execute a whitelisted command. +fn execute_command(cmd: &SignedCommand, config: &Config) -> CommandResult { + let payload = &cmd.payload; + let allowed = ["restart-service", "service-status", "tail-journal", "sovereign-scan"]; + + if !allowed.contains(&payload.cmd.as_str()) { + return CommandResult { + node_id: payload.node_id, + ts: OffsetDateTime::now_utc(), + nonce: payload.nonce.clone(), + cmd: payload.cmd.clone(), + status: "rejected".into(), + exit_code: None, + stdout: String::new(), + stderr: format!("Command '{}' not in whitelist", payload.cmd), + }; + } + + match payload.cmd.as_str() { + "service-status" => { + let service = payload.args["service"].as_str().unwrap_or("cloudflared"); + let output = StdCommand::new("systemctl") + .args(["status", service]) + .output(); + make_result(payload, output) + } + "restart-service" => { + let service = payload.args["service"].as_str().unwrap_or("cloudflared"); + let output = StdCommand::new("systemctl") + .args(["restart", service]) + .output(); + make_result(payload, output) + } + "tail-journal" => { + let unit = payload.args["unit"].as_str().unwrap_or("cloudflared"); + let lines = payload.args["lines"].as_u64().unwrap_or(50); + let output = StdCommand::new("journalctl") + .args(["-u", unit, "-n", &lines.to_string(), "--no-pager"]) + .output(); + make_result(payload, output) + } + "sovereign-scan" => { + run_sovereign_scan(payload, config) + } + _ => CommandResult { + node_id: payload.node_id, + ts: OffsetDateTime::now_utc(), + nonce: payload.nonce.clone(), + cmd: payload.cmd.clone(), + status: "error".into(), + exit_code: None, + stdout: String::new(), + stderr: "Unknown command".into(), + }, + } +} + +/// Convert command output to CommandResult. +fn make_result(payload: &CommandPayload, output: std::io::Result) -> CommandResult { + match output { + Ok(out) => CommandResult { + node_id: payload.node_id, + ts: OffsetDateTime::now_utc(), + nonce: payload.nonce.clone(), + cmd: payload.cmd.clone(), + status: if out.status.success() { "success".into() } else { "failed".into() }, + exit_code: out.status.code(), + stdout: String::from_utf8_lossy(&out.stdout).into_owned(), + stderr: String::from_utf8_lossy(&out.stderr).into_owned(), + }, + Err(e) => CommandResult { + node_id: payload.node_id, + ts: OffsetDateTime::now_utc(), + nonce: payload.nonce.clone(), + cmd: payload.cmd.clone(), + status: "error".into(), + exit_code: None, + stdout: String::new(), + stderr: format!("Failed to execute: {}", e), + }, + } +} + +/// Check if a scanner binary is available on the system. +fn scanner_available(name: &str) -> bool { + StdCommand::new("which") + .arg(name) + .output() + .map(|o| o.status.success()) + .unwrap_or(false) +} + +/// Get scanner version string. 
+fn get_scanner_version(name: &str) -> Option { + let output = StdCommand::new(name) + .arg("--version") + .output() + .ok()?; + if output.status.success() { + let stdout = String::from_utf8_lossy(&output.stdout); + Some(stdout.lines().next().unwrap_or("unknown").to_string()) + } else { + None + } +} + +/// Run nuclei scanner or generate mock results. +fn run_nuclei(nonce: &str, profile: &str, receipts_dir: &str) -> ScannerResult { + if !scanner_available("nuclei") { + tracing::info!("nuclei not available, generating mock results"); + return generate_mock_nuclei_result(nonce, receipts_dir); + } + + let output_file = format!("{}/nuclei-{}.json", receipts_dir, nonce); + let templates = match profile { + "fast" => "cves/", + "deep" => ".", + _ => "cves/,vulnerabilities/", + }; + + let output = StdCommand::new("nuclei") + .args(["-t", templates, "-silent", "-json", "-o", &output_file]) + .args(["-target", "localhost"]) + .output(); + + match output { + Ok(out) if out.status.success() || out.status.code() == Some(0) => { + let summary = parse_nuclei_findings(&output_file); + ScannerResult { + available: true, + ran: true, + version: get_scanner_version("nuclei"), + findings_real: true, + findings: summary.critical + summary.high + summary.medium + summary.low + summary.info, + output_file: Some(format!("nuclei-{}.json", nonce)), + error: None, + summary, + } + } + Ok(out) => ScannerResult { + available: true, + ran: true, + version: get_scanner_version("nuclei"), + findings_real: false, + findings: 0, + output_file: None, + error: Some(format!("nuclei exited with code {:?}: {}", + out.status.code(), + String::from_utf8_lossy(&out.stderr))), + summary: ScanSummary::default(), + }, + Err(e) => ScannerResult { + available: true, + ran: false, + version: None, + findings_real: false, + findings: 0, + output_file: None, + error: Some(format!("Failed to run nuclei: {}", e)), + summary: ScanSummary::default(), + }, + } +} + +/// Generate mock nuclei results when scanner not available. +fn generate_mock_nuclei_result(nonce: &str, receipts_dir: &str) -> ScannerResult { + let output_file = format!("{}/nuclei-{}.json", receipts_dir, nonce); + let mock_findings = serde_json::json!([ + {"info": {"severity": "high", "name": "MOCK: Example CVE Detection"}, "host": "localhost", "matched-at": "localhost:80"}, + {"info": {"severity": "medium", "name": "MOCK: Outdated Software Version"}, "host": "localhost", "matched-at": "localhost:443"}, + {"info": {"severity": "low", "name": "MOCK: Information Disclosure"}, "host": "localhost", "matched-at": "localhost:22"} + ]); + + fs::write(&output_file, serde_json::to_string_pretty(&mock_findings).unwrap()).ok(); + + ScannerResult { + available: false, + ran: false, + version: None, + findings_real: false, + findings: 3, + output_file: Some(format!("nuclei-{}.json", nonce)), + error: Some("Scanner not installed - mock data generated".into()), + summary: ScanSummary { + critical: 0, + high: 1, + medium: 1, + low: 1, + info: 0, + }, + } +} + +/// Parse nuclei JSON output for findings summary. 
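+// Expected input is nuclei's JSONL output: one finding object per line, e.g.
+// (abridged, matching the mock generator above):
+//
+//   {"info":{"severity":"high","name":"..."},"host":"localhost","matched-at":"localhost:80"}
+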
+fn parse_nuclei_findings(output_file: &str) -> ScanSummary { + let mut summary = ScanSummary::default(); + + if let Ok(contents) = fs::read_to_string(output_file) { + for line in contents.lines() { + if let Ok(finding) = serde_json::from_str::(line) { + if let Some(severity) = finding["info"]["severity"].as_str() { + match severity.to_lowercase().as_str() { + "critical" => summary.critical += 1, + "high" => summary.high += 1, + "medium" => summary.medium += 1, + "low" => summary.low += 1, + "info" => summary.info += 1, + _ => {} + } + } + } + } + } + + summary +} + +/// Run trivy scanner or generate mock results. +fn run_trivy(nonce: &str, profile: &str, receipts_dir: &str) -> ScannerResult { + if !scanner_available("trivy") { + tracing::info!("trivy not available, generating mock results"); + return generate_mock_trivy_result(nonce, receipts_dir); + } + + let output_file = format!("{}/trivy-{}.json", receipts_dir, nonce); + let severity = match profile { + "fast" => "CRITICAL,HIGH", + "deep" => "CRITICAL,HIGH,MEDIUM,LOW", + _ => "CRITICAL,HIGH,MEDIUM", + }; + + let output = StdCommand::new("trivy") + .args(["fs", "/", "--format", "json", "--output", &output_file]) + .args(["--severity", severity]) + .args(["--scanners", "vuln"]) + .output(); + + match output { + Ok(out) if out.status.success() || out.status.code() == Some(0) => { + let summary = parse_trivy_findings(&output_file); + ScannerResult { + available: true, + ran: true, + version: get_scanner_version("trivy"), + findings_real: true, + findings: summary.critical + summary.high + summary.medium + summary.low + summary.info, + output_file: Some(format!("trivy-{}.json", nonce)), + error: None, + summary, + } + } + Ok(out) => ScannerResult { + available: true, + ran: true, + version: get_scanner_version("trivy"), + findings_real: false, + findings: 0, + output_file: None, + error: Some(format!("trivy exited with code {:?}: {}", + out.status.code(), + String::from_utf8_lossy(&out.stderr))), + summary: ScanSummary::default(), + }, + Err(e) => ScannerResult { + available: true, + ran: false, + version: None, + findings_real: false, + findings: 0, + output_file: None, + error: Some(format!("Failed to run trivy: {}", e)), + summary: ScanSummary::default(), + }, + } +} + +/// Generate mock trivy results when scanner not available. 
+fn generate_mock_trivy_result(nonce: &str, receipts_dir: &str) -> ScannerResult { + let output_file = format!("{}/trivy-{}.json", receipts_dir, nonce); + let mock_findings = serde_json::json!({ + "Results": [{ + "Target": "/", + "Vulnerabilities": [ + {"VulnerabilityID": "MOCK-CVE-2024-0001", "Severity": "CRITICAL", "Title": "MOCK: Critical System Vulnerability"}, + {"VulnerabilityID": "MOCK-CVE-2024-0002", "Severity": "HIGH", "Title": "MOCK: High Risk Package"}, + {"VulnerabilityID": "MOCK-CVE-2024-0003", "Severity": "HIGH", "Title": "MOCK: Outdated Dependency"}, + {"VulnerabilityID": "MOCK-CVE-2024-0004", "Severity": "MEDIUM", "Title": "MOCK: Medium Risk Finding"}, + {"VulnerabilityID": "MOCK-CVE-2024-0005", "Severity": "MEDIUM", "Title": "MOCK: Configuration Issue"} + ] + }] + }); + + fs::write(&output_file, serde_json::to_string_pretty(&mock_findings).unwrap()).ok(); + + ScannerResult { + available: false, + ran: false, + version: None, + findings_real: false, + findings: 5, + output_file: Some(format!("trivy-{}.json", nonce)), + error: Some("Scanner not installed - mock data generated".into()), + summary: ScanSummary { + critical: 1, + high: 2, + medium: 2, + low: 0, + info: 0, + }, + } +} + +/// Parse trivy JSON output for findings summary. +fn parse_trivy_findings(output_file: &str) -> ScanSummary { + let mut summary = ScanSummary::default(); + + if let Ok(contents) = fs::read_to_string(output_file) { + if let Ok(report) = serde_json::from_str::(&contents) { + if let Some(results) = report["Results"].as_array() { + for result in results { + if let Some(vulns) = result["Vulnerabilities"].as_array() { + for vuln in vulns { + if let Some(severity) = vuln["Severity"].as_str() { + match severity.to_uppercase().as_str() { + "CRITICAL" => summary.critical += 1, + "HIGH" => summary.high += 1, + "MEDIUM" => summary.medium += 1, + "LOW" => summary.low += 1, + _ => summary.info += 1, + } + } + } + } + } + } + } + } + + summary +} + +/// Execute the sovereign-scan command. 
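+// Illustrative command payload routed to this handler (`profile` and
+// `scanners` are the only args read here; both fall back to defaults when
+// absent):
+//
+//   {"cmd":"sovereign-scan","args":{"profile":"fast","scanners":["nuclei","trivy"]}}
+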
+fn run_sovereign_scan(payload: &CommandPayload, config: &Config) -> CommandResult { + use time::format_description::well_known::Rfc3339; + + let profile = payload.args["profile"].as_str().unwrap_or("default"); + let scanners: Vec<&str> = payload.args["scanners"] + .as_array() + .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect()) + .unwrap_or_else(|| vec!["nuclei", "trivy"]); + + tracing::info!("Running sovereign-scan: profile={}, scanners={:?}", profile, scanners); + + // Create receipts directory + let receipts_dir = format!("{}/receipts/sovereign", config.vaultmesh_root); + if let Err(e) = fs::create_dir_all(&receipts_dir) { + return CommandResult { + node_id: payload.node_id, + ts: OffsetDateTime::now_utc(), + nonce: payload.nonce.clone(), + cmd: "sovereign-scan".into(), + status: "error".into(), + exit_code: None, + stdout: String::new(), + stderr: format!("Failed to create receipts directory: {}", e), + }; + } + + let mut scanner_results = serde_json::Map::new(); + let mut summary = ScanSummary::default(); + + // Run each requested scanner + for scanner in &scanners { + match *scanner { + "nuclei" => { + let result = run_nuclei(&payload.nonce, profile, &receipts_dir); + summary.merge(&result.summary); + scanner_results.insert("nuclei".into(), serde_json::to_value(&result).unwrap()); + } + "trivy" => { + let result = run_trivy(&payload.nonce, profile, &receipts_dir); + summary.merge(&result.summary); + scanner_results.insert("trivy".into(), serde_json::to_value(&result).unwrap()); + } + _ => { + tracing::warn!("Unknown scanner requested: {}", scanner); + } + } + } + + // Build receipt (without hash first) + let mut receipt = serde_json::json!({ + "kind": "sovereign-scan", + "node_id": payload.node_id, + "nonce": payload.nonce, + "timestamp": OffsetDateTime::now_utc().format(&Rfc3339).unwrap(), + "os_profile": config.os_profile, + "vaultmesh_root": config.vaultmesh_root, + "profile": profile, + "scanners": scanner_results, + "summary": summary + }); + + // Hash the receipt content (before adding hash field) + let receipt_bytes = serde_json::to_vec(&receipt).unwrap(); + let hash = blake3::hash(&receipt_bytes); + let hash_hex = format!("blake3:{}", hash.to_hex()); + receipt["receipt_hash"] = serde_json::Value::String(hash_hex.clone()); + + // Write receipt file + let receipt_path = format!("{}/SCAN-{}.json", receipts_dir, payload.nonce); + if let Err(e) = fs::write(&receipt_path, serde_json::to_string_pretty(&receipt).unwrap()) { + tracing::error!("Failed to write receipt: {}", e); + } + + tracing::info!("Sovereign scan complete: receipt_hash={}", hash_hex); + + // Append to proofchain log + let proofchain_dir = format!("{}/proofchain", config.vaultmesh_root); + fs::create_dir_all(&proofchain_dir).ok(); + let proofchain_entry = serde_json::json!({ + "ts": OffsetDateTime::now_utc().format(&Rfc3339).unwrap(), + "node_id": payload.node_id, + "nonce": payload.nonce, + "receipt_hash": hash_hex + }); + let proofchain_path = format!("{}/sovereign-scans.jsonl", proofchain_dir); + if let Ok(mut file) = fs::OpenOptions::new() + .create(true) + .append(true) + .open(&proofchain_path) + { + use std::io::Write; + writeln!(file, "{}", serde_json::to_string(&proofchain_entry).unwrap()).ok(); + } + + // Determine if any scanner had real findings + let any_real = scanner_results.values().any(|v| { + v.get("findings_real").and_then(|f| f.as_bool()).unwrap_or(false) + }); + + // Return summary as stdout + let stdout = serde_json::to_string_pretty(&serde_json::json!({ + "status": "completed", + 
"receipt_path": receipt_path, + "receipt_hash": hash_hex, + "summary": summary, + "findings_real": any_real + })) + .unwrap(); + + CommandResult { + node_id: payload.node_id, + ts: OffsetDateTime::now_utc(), + nonce: payload.nonce.clone(), + cmd: "sovereign-scan".into(), + status: "success".into(), + exit_code: Some(0), + stdout, + stderr: String::new(), + } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let config = Config::from_env(); + let hostname = hostname::get() + .map(|h| h.to_string_lossy().to_string()) + .unwrap_or_else(|_| "unknown".into()); + + tracing::info!( + "VaultMesh Node Agent starting (node_id={}, hostname={})", + config.node_id, + hostname + ); + tracing::info!("Command Center URL: {}", config.cc_url); + tracing::info!("Heartbeat interval: {}s", config.heartbeat_secs); + + // Load CC public key for signature verification + let cc_pubkey_path = std::env::var("VAULTMESH_CC_PUBKEY") + .unwrap_or_else(|_| "/etc/vaultmesh/cc-ed25519.pub".into()); + let cc_pubkey = match load_cc_public_key(&cc_pubkey_path) { + Ok(pk) => { + tracing::info!("Loaded CC public key from {}", cc_pubkey_path); + Some(pk) + } + Err(e) => { + tracing::warn!("Could not load CC public key: {} - commands disabled", e); + None + } + }; + + let client = reqwest::Client::new(); + let heartbeat_url = format!("{}/api/agent/heartbeat", config.cc_url); + let commands_url = format!("{}/api/agent/commands?node_id={}", config.cc_url, config.node_id); + let result_url = format!("{}/api/agent/command-result", config.cc_url); + + loop { + // Send heartbeat + let metrics = gather_metrics(); + let heartbeat = NodeHeartbeat { + node_id: config.node_id, + hostname: hostname.clone(), + os_profile: config.os_profile.clone(), + cloudflare_ok: check_cloudflared(), + services_ok: check_services(&config.vaultmesh_root), + vaultmesh_root: config.vaultmesh_root.clone(), + timestamp: OffsetDateTime::now_utc(), + metrics, + }; + + tracing::debug!("Sending heartbeat: {:?}", heartbeat); + + match client.post(&heartbeat_url).json(&heartbeat).send().await { + Ok(resp) if resp.status().is_success() => { + tracing::info!( + "Heartbeat sent (cf={}, svc={}, load={:.2}, mem={}MiB, disk={:.1}GiB)", + heartbeat.cloudflare_ok, + heartbeat.services_ok, + heartbeat.metrics.load1.unwrap_or(0.0), + heartbeat.metrics.mem_used_mb.unwrap_or(0), + heartbeat.metrics.disk_root_used_gb.unwrap_or(0.0), + ); + } + Ok(resp) => { + tracing::warn!("Heartbeat returned non-success status: {}", resp.status()); + } + Err(e) => { + tracing::error!("Failed to send heartbeat: {}", e); + } + } + + // Poll for commands (only if we have the public key) + if let Some(ref pubkey) = cc_pubkey { + match client.get(&commands_url).send().await { + Ok(resp) if resp.status().is_success() => { + match resp.json::>().await { + Ok(commands) => { + for cmd in commands { + tracing::info!("Received command: {} (nonce={})", cmd.payload.cmd, cmd.payload.nonce); + + // Verify signature + if let Err(e) = verify_command(&cmd, pubkey) { + tracing::error!("Signature verification failed: {}", e); + let result = CommandResult { + node_id: cmd.payload.node_id, + ts: OffsetDateTime::now_utc(), + nonce: cmd.payload.nonce.clone(), + cmd: cmd.payload.cmd.clone(), + status: "rejected".into(), + exit_code: None, + stdout: String::new(), + stderr: format!("Signature verification failed: {}", e), + }; + let 
_ = client.post(&result_url).json(&result).send().await; + continue; + } + + tracing::info!("Signature verified, executing command"); + let result = execute_command(&cmd, &config); + tracing::info!( + "Command '{}' completed: {} (exit={:?})", + result.cmd, + result.status, + result.exit_code + ); + + // Report result + if let Err(e) = client.post(&result_url).json(&result).send().await { + tracing::error!("Failed to report command result: {}", e); + } + } + } + Err(e) => { + tracing::warn!("Failed to parse commands response: {}", e); + } + } + } + Ok(resp) => { + tracing::warn!("Commands endpoint returned: {}", resp.status()); + } + Err(e) => { + tracing::debug!("Failed to poll commands: {}", e); + } + } + } + + sleep(Duration::from_secs(config.heartbeat_secs)).await; + } +} diff --git a/scripts/event_generation_demo.py b/scripts/event_generation_demo.py new file mode 100755 index 0000000..f790975 --- /dev/null +++ b/scripts/event_generation_demo.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +""" +VaultMesh Command Center: Event Generation Demonstration + +This script simulates the event generation mechanism of the VaultMesh Command Center. +""" + +import uuid +import json +import time +from datetime import datetime, timezone +import requests + +class EventGenerator: + def __init__(self, cc_url="http://localhost:8088"): + self.cc_url = cc_url + self.node_id = str(uuid.uuid4()) + self.hostname = "demo-node" + self.os_profile = "DemoVault" + + def generate_heartbeat(self): + """Generate a heartbeat event payload.""" + return { + "node_id": self.node_id, + "hostname": self.hostname, + "os_profile": self.os_profile, + "cloudflare_ok": True, + "services_ok": True, + "vaultmesh_root": "/var/lib/vaultmesh", + "timestamp": datetime.now(timezone.utc).isoformat(), + "metrics": { + "uptime_seconds": int(time.time()), + "load1": 0.5, + "load5": 0.3, + "load15": 0.2 + } + } + + def generate_scan_result(self): + """Generate a mock scan result event payload.""" + return { + "cmd": "sovereign-scan", + "node_id": self.node_id, + "ts": datetime.now(timezone.utc).isoformat(), + "status": "success", + "stdout": json.dumps({ + "summary": { + "critical": 1, + "high": 3, + "medium": 5, + "low": 10, + "info": 15 + }, + "findings_real": False, + "receipt_hash": str(uuid.uuid4()) + }), + "exit_code": 0 + } + + def generate_command_result(self): + """Generate a mock command result event payload.""" + return { + "node_id": self.node_id, + "cmd": "service-status", + "ts": datetime.now(timezone.utc).isoformat(), + "status": "success", + "stdout": "cloudflared: active\nnginx: active", + "exit_code": 0, + "nonce": str(uuid.uuid4()) + } + + def send_event(self, event_type, event_data): + """Send an event to the Command Center.""" + try: + response = requests.post( + f"{self.cc_url}/api/events", + json={ + "kind": event_type, + "node_id": self.node_id, + "body": event_data + }, + headers={"Content-Type": "application/json"} + ) + response.raise_for_status() + print(f"[SUCCESS] Sent {event_type} event for node {self.node_id}") + return response.json() + except requests.RequestException as e: + print(f"[ERROR] Failed to send {event_type} event: {e}") + return None + +def main(): + generator = EventGenerator() + + print("🚀 VaultMesh Command Center Event Generation Demo") + print("---------------------------------------------") + + # Simulate event generation + events = [ + ("heartbeat", generator.generate_heartbeat()), + ("scan", generator.generate_scan_result()), + ("command", generator.generate_command_result()) + ] + + for 
event_type, event_data in events: + print(f"\nGenerating {event_type.upper()} event:") + print(json.dumps(event_data, indent=2)) + result = generator.send_event(event_type, event_data) + if result: + print(f"Event ID: {result.get('id')}") + time.sleep(1) # Add a small delay between events + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/systemd/vaultmesh-command-center.service b/systemd/vaultmesh-command-center.service new file mode 100644 index 0000000..441a563 --- /dev/null +++ b/systemd/vaultmesh-command-center.service @@ -0,0 +1,23 @@ +[Unit] +Description=VaultMesh Command Center +After=network-online.target cloudflared.service +Wants=network-online.target + +[Service] +Type=exec +ExecStart=/usr/local/bin/vaultmesh-command-center +Restart=on-failure +RestartSec=5 +Environment=RUST_LOG=info + +# Hardening +NoNewPrivileges=yes +ProtectSystem=strict +ProtectHome=yes +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes + +[Install] +WantedBy=multi-user.target diff --git a/systemd/vaultmesh-node-agent.service b/systemd/vaultmesh-node-agent.service new file mode 100644 index 0000000..698f56a --- /dev/null +++ b/systemd/vaultmesh-node-agent.service @@ -0,0 +1,23 @@ +[Unit] +Description=VaultMesh Node Agent +After=network-online.target cloudflared.service +Wants=network-online.target + +[Service] +Type=exec +ExecStart=/usr/local/bin/vaultmesh-node-agent +Restart=on-failure +RestartSec=10 +EnvironmentFile=/etc/vaultmesh/agent.env + +# Hardening +NoNewPrivileges=yes +ProtectSystem=strict +ProtectHome=yes +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes + +[Install] +WantedBy=multi-user.target diff --git a/vm-copilot/requirements.txt b/vm-copilot/requirements.txt new file mode 100644 index 0000000..a8608b2 --- /dev/null +++ b/vm-copilot/requirements.txt @@ -0,0 +1 @@ +requests>=2.28.0 diff --git a/vm-copilot/vm_copilot_agent.py b/vm-copilot/vm_copilot_agent.py new file mode 100644 index 0000000..a5f02a3 --- /dev/null +++ b/vm-copilot/vm_copilot_agent.py @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 +""" +VaultMesh vm-copilot Agent + +A bot that monitors Command Center nodes and posts incidents/notes +when attention conditions are detected. 
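+
+Each cycle, the agent fetches node state from GET /nodes, pulls recent
+events from GET /api/events, applies the attention rules below, and posts
+deduplicated incident/note events back via POST /api/events.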
+ +Usage: + python vm_copilot_agent.py + +Environment variables: + VAULTMESH_CC_URL - Command Center URL (default: http://127.0.0.1:8088) + VM_COPILOT_INTERVAL - Poll interval in seconds (default: 60) + VM_COPILOT_NODES - Comma-separated list of node IDs to watch +""" + +import os +import sys +import time +import logging +from datetime import datetime, timezone +from typing import Optional + +import requests + +# ----------------------------------------------------------------------------- +# Configuration +# ----------------------------------------------------------------------------- + +CC_URL = os.environ.get("VAULTMESH_CC_URL", "http://127.0.0.1:8088") +POLL_INTERVAL = int(os.environ.get("VM_COPILOT_INTERVAL", "60")) + +# Node IDs to watch (can be overridden via env var) +DEFAULT_NODES = [ + "b5d5a72e-71d9-46d7-a4d7-a1c89d53224b", # gamma (machine-id derived) +] +WATCH_NODES = os.environ.get("VM_COPILOT_NODES", ",".join(DEFAULT_NODES)).split(",") +WATCH_NODES = [n.strip() for n in WATCH_NODES if n.strip()] + +AUTHOR = "vm-copilot" + +# Logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("vm-copilot") + +# ----------------------------------------------------------------------------- +# State Tracking +# ----------------------------------------------------------------------------- + +# Track conditions we've already alerted on: node_id -> set of condition keys +alerted_conditions: dict[str, set[str]] = {} + +# Track last event timestamp per node for incremental fetching +last_event_ts: dict[str, str] = {} + +# ----------------------------------------------------------------------------- +# HTTP Helpers +# ----------------------------------------------------------------------------- + +def get_node(node_id: str) -> Optional[dict]: + """Fetch a single node's state from CC by querying the nodes list.""" + try: + # /nodes returns JSON array of all nodes + resp = requests.get(f"{CC_URL}/nodes", timeout=10) + if resp.status_code == 200: + nodes = resp.json() + # Find the node by ID + for node in nodes: + if node.get("node_id") == node_id: + return node + log.warning(f"Node {node_id} not found in nodes list") + return None + else: + log.error(f"Failed to get nodes: {resp.status_code}") + return None + except requests.RequestException as e: + log.error(f"Request error getting nodes: {e}") + return None + + +def get_events( + node_id: str, + since: Optional[str] = None, + kinds: Optional[list[str]] = None, + limit: int = 50, +) -> list[dict]: + """Query events for a node from CC.""" + params = {"node_id": node_id, "limit": str(limit)} + if since: + params["since"] = since + if kinds: + params["kind"] = ",".join(kinds) + + try: + resp = requests.get(f"{CC_URL}/api/events", params=params, timeout=10) + if resp.status_code == 200: + return resp.json() + else: + log.error(f"Failed to get events: {resp.status_code}") + return [] + except requests.RequestException as e: + log.error(f"Request error getting events: {e}") + return [] + + +def post_event(kind: str, node_id: str, body: dict) -> Optional[dict]: + """Post an event to CC.""" + payload = { + "kind": kind, + "node_id": node_id, + "author": AUTHOR, + "body": body, + } + try: + resp = requests.post( + f"{CC_URL}/api/events", + json=payload, + headers={"Content-Type": "application/json"}, + timeout=10, + ) + if resp.status_code == 200: + result = resp.json() + log.info(f"Posted {kind} event: {result.get('id', 'unknown')}") + return result 
+ else: + log.error(f"Failed to post event: {resp.status_code} - {resp.text}") + return None + except requests.RequestException as e: + log.error(f"Request error posting event: {e}") + return None + + +# ----------------------------------------------------------------------------- +# Rule Engine +# ----------------------------------------------------------------------------- + +def analyze_node(node: dict, recent_events: list[dict]) -> list[tuple[str, str, dict]]: + """ + Analyze a node's state and return events to emit. + + Returns: + List of (kind, condition_key, body) tuples + """ + to_emit = [] + attention = node.get("attention", []) + hostname = node.get("hostname", "unknown") + node_id = node.get("node_id", "unknown") + + # Rule 1: critical_findings - needs attention + if "critical_findings" in attention: + scan_summary = node.get("last_scan_summary") + crit_count = 0 + if scan_summary: + crit_count = scan_summary.get("critical", 0) + scan_summary.get("high", 0) + + to_emit.append(( + "incident", + "critical_findings", + { + "title": "Critical Findings Detected", + "description": f"Node {hostname} has {crit_count} critical/high scan findings requiring attention", + "severity": "high", + }, + )) + to_emit.append(( + "note", + "critical_findings_note", + { + "text": f"Auto-detected critical_findings on node {hostname}. Recommend manual review of scan results.", + "severity": "warn", + }, + )) + + # Rule 2: heartbeat_stale - node may be offline + if "heartbeat_stale" in attention: + last_seen = node.get("last_seen", "unknown") + to_emit.append(( + "incident", + "heartbeat_stale", + { + "title": "Node Heartbeat Stale", + "description": f"Node {hostname} has not sent a heartbeat recently. Last seen: {last_seen}", + "severity": "high", + }, + )) + + # Rule 3: cloudflare_down - informational note + if "cloudflare_down" in attention: + to_emit.append(( + "note", + "cloudflare_down", + { + "text": f"Node {hostname} reports Cloudflare tunnel is down.", + "severity": "warn", + }, + )) + + # Rule 4: never_scanned - needs initial scan + if "never_scanned" in attention: + to_emit.append(( + "note", + "never_scanned", + { + "text": f"Node {hostname} has never been scanned. 
Recommend triggering initial sovereign scan.", + "severity": "info", + }, + )) + + return to_emit + + +def has_recent_operator_ack(events: list[dict]) -> bool: + """Check if there's a recent ack from someone other than vm-copilot.""" + for ev in events: + if ev.get("kind") == "ack": + author = ev.get("author", "") + if author and author != AUTHOR: + return True + return False + + +# ----------------------------------------------------------------------------- +# Main Loop +# ----------------------------------------------------------------------------- + +def run_cycle(): + """Run a single monitoring cycle.""" + for node_id in WATCH_NODES: + log.debug(f"Checking node {node_id}") + + # Fetch node state + node = get_node(node_id) + if not node: + continue + + hostname = node.get("hostname", node_id[:8]) + attention = node.get("attention", []) + log.info(f"Node {hostname}: attention={attention}") + + # Fetch recent events + since = last_event_ts.get(node_id) + events = get_events( + node_id, + since=since, + kinds=["note", "incident", "ack", "resolve"], + ) + + # Update last seen timestamp + if events: + # Events are returned oldest-first by default, get the latest + latest_ts = max(ev.get("ts", "") for ev in events) + if latest_ts: + last_event_ts[node_id] = latest_ts + + # Check for operator acks + recent_ack = has_recent_operator_ack(events) + if recent_ack: + log.info(f"Node {hostname}: operator ack detected, resetting alerts") + alerted_conditions.pop(node_id, None) + + # Analyze and emit events + to_emit = analyze_node(node, events) + + for kind, condition_key, body in to_emit: + # Skip if we've already alerted on this condition + node_alerts = alerted_conditions.setdefault(node_id, set()) + if condition_key in node_alerts: + log.debug(f"Already alerted on {condition_key} for {hostname}") + continue + + # Post the event + result = post_event(kind, node_id, body) + if result: + node_alerts.add(condition_key) + + +def main(): + """Main entry point.""" + log.info("=" * 60) + log.info("VaultMesh vm-copilot starting") + log.info(f" CC URL: {CC_URL}") + log.info(f" Poll interval: {POLL_INTERVAL}s") + log.info(f" Watching nodes: {WATCH_NODES}") + log.info("=" * 60) + + while True: + try: + run_cycle() + except Exception as e: + log.exception(f"Error in monitoring cycle: {e}") + + log.info(f"Sleeping {POLL_INTERVAL}s...") + time.sleep(POLL_INTERVAL) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + log.info("Shutting down...") + sys.exit(0)