commit 789397eb33
Author: Sovereign
Date:   2025-12-18 00:29:15 +01:00

chore: initial import

22 changed files with 5944 additions and 0 deletions

command-center/Cargo.toml

@@ -0,0 +1,23 @@
[package]
name = "vaultmesh-command-center"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = { version = "0.7", features = ["macros", "json"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tower = "0.4"
tower-http = { version = "0.5", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1", features = ["v4", "serde"] }
time = { version = "0.3", features = ["serde", "macros", "formatting", "parsing"] }
ed25519-dalek = { version = "2", features = ["rand_core"] }
rand = "0.8"
base64 = "0.22"
anyhow = "1"
clap = { version = "4", features = ["derive"] }
async-stream = "0.3"
futures-core = "0.3"

command-center/src/cli.rs

@@ -0,0 +1,61 @@
use clap::{Parser, Subcommand};
#[derive(Parser)]
#[command(name = "vm-cc")]
#[command(about = "VaultMesh Command Center")]
pub struct Cli {
/// Subcommand to run. If omitted, starts the HTTP server.
#[command(subcommand)]
pub command: Option<Commands>,
}
#[derive(Subcommand)]
pub enum Commands {
/// View and query event logs
Logs {
#[command(subcommand)]
action: LogsAction,
},
/// Start the HTTP server (default if no subcommand is provided)
Serve,
}
#[derive(Subcommand)]
pub enum LogsAction {
/// View recent log events
View {
/// Event kind: heartbeats, scans, commands (default: all)
#[arg(short, long)]
kind: Option<String>,
/// Filter by node ID (UUID)
        #[arg(long)] // long-only: `-n` is reserved for --limit below
node: Option<String>,
/// Time filter: 1h, 24h, 7d, 30m, etc.
#[arg(short, long)]
since: Option<String>,
/// Minimum severity for scans: info, low, medium, high, critical
#[arg(long)]
min_severity: Option<String>,
/// Number of events to show (default: 20)
#[arg(short = 'n', long, default_value = "20")]
limit: usize,
},
/// Follow logs in real-time
Tail {
/// Event kind to follow
#[arg(short, long)]
kind: Option<String>,
/// Filter by node ID (UUID)
#[arg(short, long)]
node: Option<String>,
},
/// Show log statistics
Stats,
}
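
// Example invocations (illustrative; the binary name comes from #[command(name = "vm-cc")]):
//
//   vm-cc                                                       # start the HTTP server (default)
//   vm-cc serve                                                  # same, explicitly
//   vm-cc logs view --kind scans --since 24h --min-severity high -n 50
//   vm-cc logs tail --kind heartbeats --node <uuid>
//   vm-cc logs stats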

command-center/src/logs.rs

@@ -0,0 +1,396 @@
// V0.6.1: CLI log reading and query commands
use crate::state::{CommandEvent, HeartbeatEvent, ScanEvent};
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
/// Reader for JSONL log files.
pub struct LogReader {
dir: PathBuf,
}
impl LogReader {
pub fn new() -> Self {
let dir = std::env::var("VAULTMESH_LOG_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/var/lib/vaultmesh/cc-logs"));
LogReader { dir }
}
pub fn read_heartbeats(&self) -> Vec<HeartbeatEvent> {
self.read_jsonl("heartbeats.jsonl")
}
pub fn read_scans(&self) -> Vec<ScanEvent> {
self.read_jsonl("scans.jsonl")
}
pub fn read_commands(&self) -> Vec<CommandEvent> {
self.read_jsonl("commands.jsonl")
}
fn read_jsonl<T: serde::de::DeserializeOwned>(&self, filename: &str) -> Vec<T> {
let path = self.dir.join(filename);
if !path.exists() {
return vec![];
}
let file = match std::fs::File::open(&path) {
Ok(f) => f,
Err(_) => return vec![],
};
BufReader::new(file)
.lines()
.filter_map(|line| line.ok())
.filter(|line| !line.trim().is_empty())
.filter_map(|line| serde_json::from_str(&line).ok())
.collect()
}
}
/// Parse a duration string like "1h", "24h", "7d", "30m".
pub fn parse_duration(s: &str) -> Option<Duration> {
let s = s.trim().to_lowercase();
if s.ends_with('h') {
s[..s.len() - 1].parse::<i64>().ok().map(Duration::hours)
} else if s.ends_with('d') {
s[..s.len() - 1].parse::<i64>().ok().map(Duration::days)
} else if s.ends_with('m') {
s[..s.len() - 1].parse::<i64>().ok().map(Duration::minutes)
} else {
None
}
}
/// Convert severity string to a rank for filtering.
pub fn severity_to_rank(s: &str) -> u8 {
match s.to_lowercase().as_str() {
"critical" => 5,
"high" => 4,
"medium" => 3,
"low" => 2,
"info" => 1,
_ => 0,
}
}
/// View recent log events with filtering.
pub fn cmd_logs_view(
kind: Option<String>,
node: Option<String>,
since: Option<String>,
min_severity: Option<String>,
limit: usize,
) {
let reader = LogReader::new();
let now = OffsetDateTime::now_utc();
let since_ts = since.as_ref().and_then(|s| parse_duration(s)).map(|d| now - d);
let node_filter: Option<Uuid> = node.as_ref().and_then(|s| s.parse().ok());
let min_sev_rank = min_severity.as_ref().map(|s| severity_to_rank(s)).unwrap_or(0);
// Collect events based on kind
let mut events: Vec<(OffsetDateTime, String)> = vec![];
if kind.is_none() || kind.as_deref() == Some("heartbeats") {
for e in reader.read_heartbeats() {
if let Some(ts) = since_ts {
if e.ts < ts {
continue;
}
}
if let Some(nf) = node_filter {
if e.node_id != nf {
continue;
}
}
let line = format!(
"[HB] {} {} {} cf={} svc={}",
format_ts(e.ts),
e.hostname,
e.os_profile,
if e.heartbeat.cloudflare_ok { "OK" } else { "DOWN" },
if e.heartbeat.services_ok { "OK" } else { "DOWN" }
);
events.push((e.ts, line));
}
}
if kind.is_none() || kind.as_deref() == Some("scans") {
for e in reader.read_scans() {
if let Some(ts) = since_ts {
if e.ts < ts {
continue;
}
}
if let Some(nf) = node_filter {
if e.node_id != nf {
continue;
}
}
let max_sev = if e.summary.critical > 0 {
5
} else if e.summary.high > 0 {
4
} else if e.summary.medium > 0 {
3
} else if e.summary.low > 0 {
2
} else {
1
};
if max_sev < min_sev_rank {
continue;
}
let source = if e.findings_real { "REAL" } else { "MOCK" };
let line = format!(
"[SCAN] {} C:{} H:{} M:{} L:{} [{}]",
format_ts(e.ts),
e.summary.critical,
e.summary.high,
e.summary.medium,
e.summary.low,
source
);
events.push((e.ts, line));
}
}
if kind.is_none() || kind.as_deref() == Some("commands") {
for e in reader.read_commands() {
if let Some(ts) = since_ts {
if e.ts < ts {
continue;
}
}
if let Some(nf) = node_filter {
if e.node_id != nf {
continue;
}
}
let line = format!(
"[CMD] {} {} status={} exit={:?}",
format_ts(e.ts),
e.cmd,
e.status,
e.exit_code
);
events.push((e.ts, line));
}
}
if events.is_empty() {
println!("No events found.");
return;
}
// Sort by timestamp descending, take limit
events.sort_by(|a, b| b.0.cmp(&a.0));
events.truncate(limit);
// Print in chronological order
for (_, line) in events.into_iter().rev() {
println!("{}", line);
}
}
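
// Example output (hypothetical hostnames, profiles, and counts):
//
//   [HB] 2025-12-18 00:25:01 edge-01 nixos-server cf=OK svc=OK
//   [SCAN] 2025-12-18 00:26:40 C:0 H:1 M:3 L:5 [REAL]
//   [CMD] 2025-12-18 00:27:12 sovereign-scan status=completed exit=Some(0)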
/// Show log statistics.
pub fn cmd_logs_stats() {
let reader = LogReader::new();
let heartbeats = reader.read_heartbeats();
let scans = reader.read_scans();
let commands = reader.read_commands();
println!("=== Log Statistics ===\n");
println!("Heartbeats: {}", heartbeats.len());
println!("Scans: {}", scans.len());
println!("Commands: {}", commands.len());
// Count by node
let mut by_node: HashMap<Uuid, (usize, usize, usize, String)> = HashMap::new();
for e in &heartbeats {
let entry = by_node.entry(e.node_id).or_insert((0, 0, 0, e.hostname.clone()));
entry.0 += 1;
}
for e in &scans {
let entry = by_node.entry(e.node_id).or_insert((0, 0, 0, String::new()));
entry.1 += 1;
}
for e in &commands {
let entry = by_node.entry(e.node_id).or_insert((0, 0, 0, String::new()));
entry.2 += 1;
}
if !by_node.is_empty() {
println!("\n=== By Node ===\n");
println!(
"{:<38} {:<16} {:>8} {:>8} {:>8}",
"Node ID", "Hostname", "HB", "Scans", "Cmds"
);
println!("{:-<80}", "");
for (node_id, (hb, sc, cmd, hostname)) in by_node {
println!(
"{:<38} {:<16} {:>8} {:>8} {:>8}",
node_id, hostname, hb, sc, cmd
);
}
}
}
/// Follow logs in real-time (tail -f style).
pub fn cmd_logs_tail(kind: Option<String>, node: Option<String>) {
use std::io::Seek;
use std::thread;
use std::time::Duration as StdDuration;
let node_filter: Option<Uuid> = node.as_ref().and_then(|s| s.parse().ok());
// Track file positions
let mut hb_pos: u64 = 0;
let mut scan_pos: u64 = 0;
let mut cmd_pos: u64 = 0;
// Seek to end of existing files
let log_dir = std::env::var("VAULTMESH_LOG_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/var/lib/vaultmesh/cc-logs"));
if let Ok(mut f) = std::fs::File::open(log_dir.join("heartbeats.jsonl")) {
hb_pos = f.seek(std::io::SeekFrom::End(0)).unwrap_or(0);
}
if let Ok(mut f) = std::fs::File::open(log_dir.join("scans.jsonl")) {
scan_pos = f.seek(std::io::SeekFrom::End(0)).unwrap_or(0);
}
if let Ok(mut f) = std::fs::File::open(log_dir.join("commands.jsonl")) {
cmd_pos = f.seek(std::io::SeekFrom::End(0)).unwrap_or(0);
}
println!("Following logs (Ctrl+C to stop)...\n");
loop {
// Check heartbeats
if kind.is_none() || kind.as_deref() == Some("heartbeats") {
if let Some((events, new_pos)) =
read_new_lines::<HeartbeatEvent>(&log_dir.join("heartbeats.jsonl"), hb_pos)
{
hb_pos = new_pos;
for e in events {
if let Some(nf) = node_filter {
if e.node_id != nf {
continue;
}
}
println!(
"[HB] {} {} {}",
format_ts(e.ts),
e.hostname,
e.os_profile
);
}
}
}
// Check scans
if kind.is_none() || kind.as_deref() == Some("scans") {
if let Some((events, new_pos)) =
read_new_lines::<ScanEvent>(&log_dir.join("scans.jsonl"), scan_pos)
{
scan_pos = new_pos;
for e in events {
if let Some(nf) = node_filter {
if e.node_id != nf {
continue;
}
}
let source = if e.findings_real { "REAL" } else { "MOCK" };
println!(
"[SCAN] {} C:{} H:{} M:{} L:{} [{}]",
format_ts(e.ts),
e.summary.critical,
e.summary.high,
e.summary.medium,
e.summary.low,
source
);
}
}
}
// Check commands
if kind.is_none() || kind.as_deref() == Some("commands") {
if let Some((events, new_pos)) =
read_new_lines::<CommandEvent>(&log_dir.join("commands.jsonl"), cmd_pos)
{
cmd_pos = new_pos;
for e in events {
if let Some(nf) = node_filter {
if e.node_id != nf {
continue;
}
}
println!(
"[CMD] {} {} status={}",
format_ts(e.ts),
e.cmd,
e.status
);
}
}
}
thread::sleep(StdDuration::from_secs(1));
}
}
/// Read new lines from a file starting at a given position.
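/// Returns `None` when nothing new is available; note that a truncated or rotated
/// file is not re-read until it grows past the previously recorded offset.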
fn read_new_lines<T: serde::de::DeserializeOwned>(
path: &PathBuf,
start_pos: u64,
) -> Option<(Vec<T>, u64)> {
use std::io::Seek;
let mut file = std::fs::File::open(path).ok()?;
let end_pos = file.seek(std::io::SeekFrom::End(0)).ok()?;
if end_pos <= start_pos {
return None;
}
file.seek(std::io::SeekFrom::Start(start_pos)).ok()?;
let reader = BufReader::new(file);
let mut events = Vec::new();
for line in reader.lines() {
if let Ok(line) = line {
if line.trim().is_empty() {
continue;
}
if let Ok(event) = serde_json::from_str(&line) {
events.push(event);
}
}
}
Some((events, end_pos))
}
/// Format timestamp for display.
fn format_ts(ts: OffsetDateTime) -> String {
format!(
"{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
ts.year(),
ts.month() as u8,
ts.day(),
ts.hour(),
ts.minute(),
ts.second()
)
}

command-center/src/main.rs

@@ -0,0 +1,174 @@
mod cli;
mod logs;
mod routes;
mod state;
use crate::cli::{Cli, Commands, LogsAction};
use crate::routes::app;
use crate::state::{AppState, CommandPayload, SignedCommand};
use base64::Engine;
use clap::Parser;
use ed25519_dalek::{Signer, SigningKey};
use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::net::TcpListener;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
/// Load an existing Ed25519 signing key or generate a new one.
fn load_or_init_signing_key() -> SigningKey {
let key_path = std::env::var("VAULTMESH_CC_KEY_PATH")
.unwrap_or_else(|_| "cc-ed25519.key".into());
let path = Path::new(&key_path);
if path.exists() {
let bytes = std::fs::read(path).expect("Failed to read signing key");
let key_bytes: [u8; 32] = bytes
.try_into()
.expect("Signing key must be exactly 32 bytes");
SigningKey::from_bytes(&key_bytes)
} else {
// Generate a new key
let mut csprng = rand::thread_rng();
let signing_key = SigningKey::generate(&mut csprng);
std::fs::write(path, signing_key.to_bytes()).expect("Failed to write signing key");
tracing::info!("Generated new signing key at {}", key_path);
signing_key
}
}
/// Background scheduler loop that auto-triggers scans on nodes.
async fn scheduler_loop(state: AppState) {
let tick_secs = std::env::var("VAULTMESH_SCHEDULER_TICK_SECONDS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(300);
tracing::info!("Scheduler started with tick interval: {}s", tick_secs);
loop {
if let Err(e) = run_scheduler_tick(&state).await {
tracing::warn!("scheduler tick failed: {e}");
}
tokio::time::sleep(Duration::from_secs(tick_secs)).await;
}
}
/// Run a single scheduler tick: check each node and queue scans if needed.
async fn run_scheduler_tick(state: &AppState) -> anyhow::Result<()> {
let now = OffsetDateTime::now_utc();
let latest = state.list_latest().await;
let last_scans = state.list_last_scans().await;
for hb in latest.iter() {
// Check policy allows sovereign-scan for this profile
if !state.is_command_allowed(&hb.os_profile, &hb.node_id, "sovereign-scan") {
continue;
}
// Check if scan needed (never scanned or interval elapsed)
let needs_scan = match last_scans.get(&hb.node_id) {
None => true,
Some(scan) => {
let elapsed = now - scan.ts;
elapsed >= state.scheduler_cfg.scan_interval
}
};
if needs_scan {
// Build and sign the command
let payload = CommandPayload {
node_id: hb.node_id,
ts: now,
nonce: Uuid::new_v4().to_string(),
cmd: "sovereign-scan".into(),
args: serde_json::json!({}),
};
let payload_json = serde_json::to_vec(&payload)?;
let signature = state.signing_key.sign(&payload_json);
let signature_b64 =
base64::engine::general_purpose::STANDARD.encode(signature.to_bytes());
let signed_cmd = SignedCommand {
payload,
signature: signature_b64,
};
state.queue_command(signed_cmd).await;
tracing::info!(
"scheduler: queued sovereign-scan for {} ({})",
hb.node_id,
hb.hostname
);
}
}
Ok(())
}
#[tokio::main]
async fn main() {
let cli = Cli::parse();
match cli.command {
Some(Commands::Logs { action }) => {
// CLI log commands - no tracing/server init needed
match action {
LogsAction::View {
kind,
node,
since,
min_severity,
limit,
} => {
logs::cmd_logs_view(kind, node, since, min_severity, limit);
}
LogsAction::Tail { kind, node } => {
logs::cmd_logs_tail(kind, node);
}
LogsAction::Stats => {
logs::cmd_logs_stats();
}
}
}
Some(Commands::Serve) | None => {
// Run the HTTP server (default)
run_server().await;
}
}
}
async fn run_server() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
let signing_key = load_or_init_signing_key();
let state = AppState::new(signing_key);
// V0.6: Replay logs to reconstruct state
state.replay_from_logs().await;
// Log the public key for distribution to nodes
tracing::info!("CC public key (base64): {}", state.public_key_b64);
// Spawn the scheduler background task
let scheduler_state = state.clone();
tokio::spawn(async move {
scheduler_loop(scheduler_state).await;
});
let app = app(state);
let addr: SocketAddr = "0.0.0.0:8088".parse().unwrap();
tracing::info!("VaultMesh Command Center listening on http://{}", addr);
let listener = TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
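
// Runtime configuration is read from the environment variables referenced above;
// the values shown are the built-in defaults (override as needed):
//
//   VAULTMESH_CC_KEY_PATH=cc-ed25519.key
//   VAULTMESH_LOG_DIR=/var/lib/vaultmesh/cc-logs
//   VAULTMESH_SCHEDULER_TICK_SECONDS=300
//   VAULTMESH_SCAN_INTERVAL_HOURS=24
//   VAULTMESH_SCAN_STALE_HOURS=48
//   VAULTMESH_HEARTBEAT_STALE_MINUTES=10
//   RUST_LOG=info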

command-center/src/routes.rs  (2069 lines)

File diff suppressed because it is too large

command-center/src/state.rs

@@ -0,0 +1,842 @@
use ed25519_dalek::SigningKey;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet, VecDeque},
fs::OpenOptions,
io::{BufRead, BufReader, Write},
path::PathBuf,
sync::Arc,
};
use time::{Duration, OffsetDateTime};
use tokio::sync::{broadcast, RwLock};
use uuid::Uuid;
/// How many heartbeats we keep per node (for history).
const MAX_HEARTBEATS_PER_NODE: usize = 50;
/// How many command records we keep per node.
const MAX_COMMAND_RECORDS_PER_NODE: usize = 50;
/// Metrics reported by the node agent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NodeMetrics {
pub uptime_seconds: Option<u64>,
pub load1: Option<f32>,
pub load5: Option<f32>,
pub load15: Option<f32>,
pub mem_total_mb: Option<u64>,
pub mem_used_mb: Option<u64>,
pub disk_root_total_gb: Option<f32>,
pub disk_root_used_gb: Option<f32>,
}
/// Heartbeat data reported by each node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeHeartbeat {
pub node_id: Uuid,
pub hostname: String,
pub os_profile: String,
pub cloudflare_ok: bool,
pub services_ok: bool,
pub vaultmesh_root: String,
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
#[serde(default)]
pub metrics: NodeMetrics,
}
/// A single node's heartbeat history.
#[derive(Clone)]
pub struct NodeHistory {
pub latest: NodeHeartbeat,
pub history: VecDeque<NodeHeartbeat>,
}
/// Command payload to be signed and sent to agents.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandPayload {
pub node_id: Uuid,
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub nonce: String,
pub cmd: String,
pub args: serde_json::Value,
}
/// Signed command delivered to the agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignedCommand {
pub payload: CommandPayload,
/// Base64-encoded Ed25519 signature of `payload` JSON bytes.
pub signature: String,
}
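/// Illustrative sketch only (hypothetical helper, unused by this crate): how an
/// agent could verify a `SignedCommand` against the CC public key that main.rs
/// logs at startup. The payload must be re-serialized exactly as it was signed.
#[allow(dead_code)]
pub fn verify_signed_command_sketch(pubkey_b64: &str, cmd: &SignedCommand) -> anyhow::Result<()> {
    use base64::Engine;
    use ed25519_dalek::{Signature, Verifier, VerifyingKey};
    let pk: [u8; 32] = base64::engine::general_purpose::STANDARD
        .decode(pubkey_b64)?
        .try_into()
        .map_err(|_| anyhow::anyhow!("public key must be 32 bytes"))?;
    let sig: [u8; 64] = base64::engine::general_purpose::STANDARD
        .decode(&cmd.signature)?
        .try_into()
        .map_err(|_| anyhow::anyhow!("signature must be 64 bytes"))?;
    // Re-serialize the payload exactly as the CC did before signing.
    let payload_json = serde_json::to_vec(&cmd.payload)?;
    VerifyingKey::from_bytes(&pk)?.verify(&payload_json, &Signature::from_bytes(&sig))?;
    Ok(())
}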
/// Result of executing a command on the node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandResult {
pub node_id: Uuid,
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub nonce: String,
pub cmd: String,
pub status: String,
pub exit_code: Option<i32>,
pub stdout: String,
pub stderr: String,
}
/// Summary counts from a sovereign scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScanSummary {
pub critical: u32,
pub high: u32,
pub medium: u32,
pub low: u32,
pub info: u32,
}
/// Last scan result for a node.
#[derive(Debug, Clone)]
pub struct LastScan {
pub ts: OffsetDateTime,
pub summary: ScanSummary,
pub findings_real: bool,
pub receipt_hash: String,
}
/// Scheduler configuration for automatic scans.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
pub scan_interval: Duration,
pub scan_stale: Duration,
pub heartbeat_stale: Duration,
}
/// Node attention status for dashboard.
#[derive(Debug, Clone)]
pub struct NodeAttentionStatus {
pub needs_attention: bool,
pub reasons: Vec<String>,
}
// ============================================================================
// V0.7: SSE Event Bus payloads
// ============================================================================
/// SSE payload for heartbeat events (wire-efficient subset).
#[derive(Debug, Clone, Serialize)]
pub struct HeartbeatEventPayload {
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub node_id: Uuid,
pub hostname: String,
pub os_profile: String,
pub cloudflare_ok: bool,
pub services_ok: bool,
pub vaultmesh_root: String,
pub uptime_secs: Option<u64>,
pub load_1: Option<f32>,
pub load_5: Option<f32>,
pub load_15: Option<f32>,
}
/// SSE payload for scan events.
#[derive(Debug, Clone, Serialize)]
pub struct ScanEventPayload {
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub node_id: Uuid,
pub hostname: String,
pub os_profile: String,
pub summary: ScanSummary,
pub findings_real: bool,
pub receipt_hash: String,
}
/// SSE payload for command events.
#[derive(Debug, Clone, Serialize)]
pub struct CommandEventPayload {
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub node_id: Uuid,
pub hostname: String,
pub os_profile: String,
pub cmd: String,
pub status: String,
pub exit_code: Option<i32>,
pub nonce: String,
}
/// SSE payload for attention state changes.
#[derive(Debug, Clone, Serialize)]
pub struct AttentionEventPayload {
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub node_id: Uuid,
pub hostname: String,
pub os_profile: String,
pub needs_attention: bool,
pub reasons: Vec<String>,
}
/// Server event types for SSE broadcast.
#[derive(Debug, Clone)]
pub enum ServerEvent {
Heartbeat(HeartbeatEventPayload),
Scan(ScanEventPayload),
Command(CommandEventPayload),
Attention(AttentionEventPayload),
/// V0.7.2: Generic envelope for comms events (note, incident, ack, tag, resolve)
Envelope(EventEnvelope),
}
// ============================================================================
// V0.7.2: Communication Layer - EventEnvelope
// ============================================================================
/// Canonical message format for all comms events.
/// Used for notes, incidents, acknowledgements, tags, and resolutions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventEnvelope {
/// Unique event ID (server-assigned)
pub id: Uuid,
/// Event kind: "note", "incident", "ack", "tag", "resolve"
pub kind: String,
/// Timestamp (server-assigned)
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
/// Node this event relates to (optional for global events)
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<Uuid>,
/// Author: "operator", "system", "vm-copilot", "scheduler", agent name
pub author: String,
/// Structured payload (kind-specific)
pub body: serde_json::Value,
}
/// Log entry wrapper for EventEnvelope (versioned for future compatibility).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventEnvelopeLogEntry {
pub version: u8,
pub event: EventEnvelope,
}
/// Maximum number of envelopes to keep in memory.
const MAX_ENVELOPES_IN_MEMORY: usize = 500;
// ============================================================================
// V0.6: Append-only log events
// ============================================================================
/// Event logged when a heartbeat is received.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatEvent {
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub node_id: Uuid,
pub hostname: String,
pub os_profile: String,
pub heartbeat: NodeHeartbeat,
}
/// Event logged when a sovereign-scan completes successfully.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScanEvent {
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub node_id: Uuid,
pub os_profile: String,
pub summary: ScanSummary,
pub findings_real: bool,
pub receipt_hash: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub receipt_path: Option<String>,
}
/// Event logged when any command result is reported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandEvent {
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub node_id: Uuid,
pub cmd: String,
pub status: String,
pub exit_code: Option<i32>,
pub nonce: String,
}
/// Append-only log sink for durable event storage.
#[derive(Clone)]
pub struct LogSink {
pub dir: PathBuf,
}
impl LogSink {
/// Create a new LogSink, initializing the directory if needed.
pub fn new_from_env() -> std::io::Result<Self> {
let dir = std::env::var("VAULTMESH_LOG_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/var/lib/vaultmesh/cc-logs"));
std::fs::create_dir_all(&dir)?;
Ok(LogSink { dir })
}
/// Get the full path for a log file.
pub fn file_path(&self, name: &str) -> PathBuf {
self.dir.join(name)
}
/// Append a JSON line to a log file.
pub fn append_json_line<T: Serialize>(&self, file: &str, value: &T) -> std::io::Result<()> {
let path = self.file_path(file);
let mut f = OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
serde_json::to_writer(&mut f, value).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
f.write_all(b"\n")?;
f.flush()?;
Ok(())
}
}
/// Command policy for allowed commands per profile.
#[derive(Clone)]
pub struct CommandPolicy {
/// Per-profile allowed command names.
pub per_profile: HashMap<String, HashSet<String>>,
/// Global fallback set (applied if no per-profile entry).
pub global_allowed: HashSet<String>,
}
impl CommandPolicy {
pub fn default_policy() -> Self {
let mut global = HashSet::new();
global.insert("service-status".to_string());
global.insert("tail-journal".to_string());
global.insert("sovereign-scan".to_string());
global.insert("restart-service".to_string());
CommandPolicy {
per_profile: HashMap::new(),
global_allowed: global,
}
}
pub fn is_allowed(&self, profile: &str, cmd: &str) -> bool {
if let Some(set) = self.per_profile.get(profile) {
set.contains(cmd)
} else {
self.global_allowed.contains(cmd)
}
}
}
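/// Illustrative sketch only (hypothetical profile name): a per-profile entry
/// replaces the global fallback entirely, so a restrictive profile must re-list
/// every command it still wants to allow.
#[allow(dead_code)]
fn example_readonly_policy() -> CommandPolicy {
    let mut policy = CommandPolicy::default_policy();
    policy.per_profile.insert(
        "edge-readonly".to_string(),
        ["service-status", "tail-journal"].iter().map(|s| s.to_string()).collect(),
    );
    // With the override in place, mutating commands fall outside the allowed set.
    debug_assert!(policy.is_allowed("edge-readonly", "service-status"));
    debug_assert!(!policy.is_allowed("edge-readonly", "restart-service"));
    policy
}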
/// Shared application state.
#[derive(Clone)]
pub struct AppState {
pub nodes: Arc<RwLock<HashMap<Uuid, NodeHistory>>>,
pub commands_pending: Arc<RwLock<HashMap<Uuid, VecDeque<SignedCommand>>>>,
pub command_results: Arc<RwLock<HashMap<Uuid, VecDeque<CommandResult>>>>,
pub last_scans: Arc<RwLock<HashMap<Uuid, LastScan>>>,
pub signing_key: Arc<SigningKey>,
pub public_key_b64: String,
pub scheduler_cfg: SchedulerConfig,
pub policy: CommandPolicy,
/// V0.6: Append-only log sink for persistence
pub logs: LogSink,
/// V0.7: SSE event broadcast channel
pub events_tx: broadcast::Sender<ServerEvent>,
/// V0.7.2: In-memory event envelope store
pub events: Arc<RwLock<Vec<EventEnvelope>>>,
}
impl AppState {
pub fn new(signing_key: SigningKey) -> Self {
use base64::Engine;
let pk_bytes = signing_key.verifying_key().to_bytes();
let public_key_b64 = base64::engine::general_purpose::STANDARD.encode(pk_bytes);
// Parse scheduler config from env (with sane defaults)
let scan_interval_hours = std::env::var("VAULTMESH_SCAN_INTERVAL_HOURS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(24);
let scan_stale_hours = std::env::var("VAULTMESH_SCAN_STALE_HOURS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(48);
let heartbeat_stale_minutes = std::env::var("VAULTMESH_HEARTBEAT_STALE_MINUTES")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(10);
let scheduler_cfg = SchedulerConfig {
scan_interval: Duration::hours(scan_interval_hours),
scan_stale: Duration::hours(scan_stale_hours),
heartbeat_stale: Duration::minutes(heartbeat_stale_minutes),
};
tracing::info!(
"Scheduler config: scan_interval={}h, scan_stale={}h, heartbeat_stale={}m",
scan_interval_hours,
scan_stale_hours,
heartbeat_stale_minutes
);
// V0.6: Initialize log sink
let logs = LogSink::new_from_env()
.expect("failed to initialize log directory");
tracing::info!("Log directory: {:?}", logs.dir);
// V0.7: Initialize SSE broadcast channel
let (events_tx, _) = broadcast::channel(1024);
AppState {
nodes: Arc::new(RwLock::new(HashMap::new())),
commands_pending: Arc::new(RwLock::new(HashMap::new())),
command_results: Arc::new(RwLock::new(HashMap::new())),
last_scans: Arc::new(RwLock::new(HashMap::new())),
signing_key: Arc::new(signing_key),
public_key_b64,
scheduler_cfg,
policy: CommandPolicy::default_policy(),
logs,
events_tx,
events: Arc::new(RwLock::new(Vec::new())),
}
}
/// Check if a command is allowed by policy.
pub fn is_command_allowed(&self, profile: &str, node_id: &Uuid, cmd: &str) -> bool {
let allowed = self.policy.is_allowed(profile, cmd);
if !allowed {
tracing::warn!(
"policy: command {} denied for node {} (profile={})",
cmd,
node_id,
profile
);
}
allowed
}
// ========================================================================
// V0.7: SSE Event Bus
// ========================================================================
/// Publish an event to all SSE subscribers.
pub fn publish(&self, event: ServerEvent) {
if let Err(e) = self.events_tx.send(event) {
tracing::debug!("no SSE listeners: {e}");
}
}
/// Recompute and publish attention status for a node.
pub async fn recompute_and_publish_attention(&self, node_id: Uuid) {
let now = OffsetDateTime::now_utc();
let latest = self.list_latest().await;
let last_scans = self.list_last_scans().await;
if let Some(hb) = latest.iter().find(|h| h.node_id == node_id) {
let attn = compute_attention(now, hb, last_scans.get(&node_id), &self.scheduler_cfg);
let payload = AttentionEventPayload {
ts: now,
node_id: hb.node_id,
hostname: hb.hostname.clone(),
os_profile: hb.os_profile.clone(),
needs_attention: attn.needs_attention,
reasons: attn.reasons,
};
self.publish(ServerEvent::Attention(payload));
}
}
// ========================================================================
// V0.7.2: Communication Layer
// ========================================================================
/// Record an EventEnvelope: log to JSONL, store in memory, broadcast via SSE.
pub async fn record_envelope(&self, ev: EventEnvelope) {
// 1) Log to JSONL
let entry = EventEnvelopeLogEntry { version: 1, event: ev.clone() };
if let Err(e) = self.logs.append_json_line("events.jsonl", &entry) {
tracing::warn!("failed to append events log: {e}");
}
// 2) In-memory store with cap
{
let mut events = self.events.write().await;
events.push(ev.clone());
if events.len() > MAX_ENVELOPES_IN_MEMORY {
let overflow = events.len() - MAX_ENVELOPES_IN_MEMORY;
events.drain(0..overflow);
}
}
// 3) SSE broadcast
self.publish(ServerEvent::Envelope(ev));
}
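    // Illustrative call from a route handler (hypothetical values): recording an
    // operator note appends it to events.jsonl, caches it in memory, and
    // broadcasts it to SSE subscribers.
    //
    //     state.record_envelope(EventEnvelope {
    //         id: Uuid::new_v4(),
    //         kind: "note".into(),
    //         ts: OffsetDateTime::now_utc(),
    //         node_id: Some(node_id),
    //         author: "operator".into(),
    //         body: serde_json::json!({ "text": "rotated TLS cert" }),
    //     }).await;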
/// Replay envelopes from events.jsonl on startup.
pub async fn replay_envelopes(&self) -> std::io::Result<()> {
let path = self.logs.file_path("events.jsonl");
if !path.exists() {
return Ok(());
}
let file = std::fs::File::open(&path)?;
let reader = BufReader::new(file);
let mut count = 0;
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let entry: EventEnvelopeLogEntry = match serde_json::from_str(&line) {
Ok(v) => v,
Err(e) => {
tracing::warn!("invalid events line: {e}");
continue;
}
};
// Store in memory (no broadcast during replay)
let mut events = self.events.write().await;
events.push(entry.event);
if events.len() > MAX_ENVELOPES_IN_MEMORY {
let overflow = events.len() - MAX_ENVELOPES_IN_MEMORY;
events.drain(0..overflow);
}
count += 1;
}
tracing::info!("replayed {} events from {:?}", count, path);
Ok(())
}
/// Insert a new heartbeat for a node, updating history.
/// Logs the event to heartbeats.jsonl.
pub async fn upsert_heartbeat(&self, hb: NodeHeartbeat) {
// V0.6: Log first
let event = HeartbeatEvent {
ts: hb.timestamp,
node_id: hb.node_id,
hostname: hb.hostname.clone(),
os_profile: hb.os_profile.clone(),
heartbeat: hb.clone(),
};
if let Err(e) = self.logs.append_json_line("heartbeats.jsonl", &event) {
tracing::warn!("failed to append heartbeat log: {e}");
}
self.upsert_heartbeat_no_log(hb).await;
}
/// Insert heartbeat without logging (used during replay).
async fn upsert_heartbeat_no_log(&self, hb: NodeHeartbeat) {
let mut nodes = self.nodes.write().await;
let entry = nodes.entry(hb.node_id).or_insert_with(|| NodeHistory {
latest: hb.clone(),
history: VecDeque::new(),
});
entry.history.push_front(entry.latest.clone());
if entry.history.len() > MAX_HEARTBEATS_PER_NODE {
entry.history.pop_back();
}
entry.latest = hb;
}
/// Get a snapshot of all latest heartbeats.
pub async fn list_latest(&self) -> Vec<NodeHeartbeat> {
let nodes = self.nodes.read().await;
nodes.values().map(|h| h.latest.clone()).collect()
}
/// Queue a signed command for a node.
pub async fn queue_command(&self, cmd: SignedCommand) {
let mut pending = self.commands_pending.write().await;
let entry = pending
.entry(cmd.payload.node_id)
.or_insert_with(VecDeque::new);
entry.push_back(cmd);
}
/// Take (and clear) all pending commands for a node.
pub async fn take_pending_commands(&self, node_id: Uuid) -> Vec<SignedCommand> {
let mut pending = self.commands_pending.write().await;
if let Some(mut queue) = pending.remove(&node_id) {
queue.drain(..).collect()
} else {
Vec::new()
}
}
/// Record a command result.
/// Logs the event to commands.jsonl.
pub async fn record_command_result(&self, result: CommandResult) {
// V0.6: Log command event
let event = CommandEvent {
ts: result.ts,
node_id: result.node_id,
cmd: result.cmd.clone(),
status: result.status.clone(),
exit_code: result.exit_code,
nonce: result.nonce.clone(),
};
if let Err(e) = self.logs.append_json_line("commands.jsonl", &event) {
tracing::warn!("failed to append command log: {e}");
}
self.record_command_result_no_log(result).await;
}
/// Record command result without logging (used during replay).
async fn record_command_result_no_log(&self, result: CommandResult) {
let mut results = self.command_results.write().await;
let entry = results
.entry(result.node_id)
.or_insert_with(VecDeque::new);
entry.push_front(result);
if entry.len() > MAX_COMMAND_RECORDS_PER_NODE {
entry.pop_back();
}
}
/// Get recent command results for a node.
pub async fn list_command_results(&self, node_id: Uuid) -> Vec<CommandResult> {
let results = self.command_results.read().await;
results
.get(&node_id)
.map(|q| q.iter().cloned().collect())
.unwrap_or_default()
}
/// Update the last scan result for a node.
/// Note: ScanEvent logging is handled in routes.rs where we have os_profile context.
pub async fn update_last_scan(&self, node_id: Uuid, scan: LastScan) {
let mut scans = self.last_scans.write().await;
scans.insert(node_id, scan);
}
/// Update last scan without logging (used during replay).
pub async fn update_last_scan_no_log(&self, node_id: Uuid, scan: LastScan) {
let mut scans = self.last_scans.write().await;
scans.insert(node_id, scan);
}
/// Get the last scan result for a node.
pub async fn get_last_scan(&self, node_id: &Uuid) -> Option<LastScan> {
let scans = self.last_scans.read().await;
scans.get(node_id).cloned()
}
/// Get all last scan results.
pub async fn list_last_scans(&self) -> HashMap<Uuid, LastScan> {
let scans = self.last_scans.read().await;
scans.clone()
}
// ========================================================================
// V0.6: Log replay on startup
// ========================================================================
/// Replay all logs to reconstruct state on startup.
pub async fn replay_from_logs(&self) {
if let Err(e) = self.replay_heartbeats().await {
tracing::warn!("failed to replay heartbeats: {e}");
}
if let Err(e) = self.replay_scans().await {
tracing::warn!("failed to replay scans: {e}");
}
if let Err(e) = self.replay_commands().await {
tracing::warn!("failed to replay commands: {e}");
}
// V0.7.2: Replay event envelopes
if let Err(e) = self.replay_envelopes().await {
tracing::warn!("failed to replay envelopes: {e}");
}
}
async fn replay_heartbeats(&self) -> std::io::Result<()> {
let path = self.logs.file_path("heartbeats.jsonl");
if !path.exists() {
return Ok(());
}
let file = std::fs::File::open(&path)?;
let reader = BufReader::new(file);
let mut count = 0;
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let event: HeartbeatEvent = match serde_json::from_str(&line) {
Ok(v) => v,
Err(e) => {
tracing::warn!("invalid heartbeat line: {e}");
continue;
}
};
// Reconstruct heartbeat from event
let hb = NodeHeartbeat {
node_id: event.node_id,
hostname: event.hostname,
os_profile: event.os_profile,
cloudflare_ok: event.heartbeat.cloudflare_ok,
services_ok: event.heartbeat.services_ok,
vaultmesh_root: event.heartbeat.vaultmesh_root,
timestamp: event.ts,
metrics: event.heartbeat.metrics,
};
self.upsert_heartbeat_no_log(hb).await;
count += 1;
}
tracing::info!("replayed {} heartbeats from {:?}", count, path);
Ok(())
}
async fn replay_scans(&self) -> std::io::Result<()> {
let path = self.logs.file_path("scans.jsonl");
if !path.exists() {
return Ok(());
}
let file = std::fs::File::open(&path)?;
let reader = BufReader::new(file);
let mut count = 0;
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let event: ScanEvent = match serde_json::from_str(&line) {
Ok(v) => v,
Err(e) => {
tracing::warn!("invalid scan line: {e}");
continue;
}
};
let scan = LastScan {
ts: event.ts,
summary: event.summary,
findings_real: event.findings_real,
receipt_hash: event.receipt_hash,
};
self.update_last_scan_no_log(event.node_id, scan).await;
count += 1;
}
tracing::info!("replayed {} scans from {:?}", count, path);
Ok(())
}
async fn replay_commands(&self) -> std::io::Result<()> {
let path = self.logs.file_path("commands.jsonl");
if !path.exists() {
return Ok(());
}
let file = std::fs::File::open(&path)?;
let reader = BufReader::new(file);
let mut count = 0;
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let event: CommandEvent = match serde_json::from_str(&line) {
Ok(v) => v,
Err(e) => {
tracing::warn!("invalid command line: {e}");
continue;
}
};
// Build minimal CommandResult for history (stdout/stderr not stored in log)
let result = CommandResult {
node_id: event.node_id,
ts: event.ts,
nonce: event.nonce,
cmd: event.cmd,
status: event.status,
exit_code: event.exit_code,
stdout: String::new(),
stderr: String::new(),
};
self.record_command_result_no_log(result).await;
count += 1;
}
tracing::info!("replayed {} commands from {:?}", count, path);
Ok(())
}
}
// ============================================================================
// V0.7: Attention computation (moved from routes.rs for SSE access)
// ============================================================================
/// Compute attention status for a node based on current state.
pub fn compute_attention(
now: OffsetDateTime,
hb: &NodeHeartbeat,
scan: Option<&LastScan>,
cfg: &SchedulerConfig,
) -> NodeAttentionStatus {
let mut reasons = Vec::new();
// Heartbeat staleness
if now - hb.timestamp > cfg.heartbeat_stale {
reasons.push("heartbeat_stale".to_string());
}
// Scan staleness / findings
match scan {
None => reasons.push("never_scanned".to_string()),
Some(s) => {
if now - s.ts > cfg.scan_stale {
reasons.push("scan_stale".to_string());
}
if s.summary.critical > 0 {
reasons.push("critical_findings".to_string());
} else if s.summary.high > 0 {
reasons.push("high_findings".to_string());
}
}
}
// Service flags
if !hb.cloudflare_ok {
reasons.push("cloudflare_down".to_string());
}
if !hb.services_ok {
reasons.push("services_down".to_string());
}
NodeAttentionStatus {
needs_attention: !reasons.is_empty(),
reasons,
}
}
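// Illustrative outcome: with the default SchedulerConfig (heartbeat_stale = 10m,
// scan_stale = 48h), a node whose last heartbeat is two minutes old but whose
// last scan is three days old with one high finding yields
// reasons = ["scan_stale", "high_findings"] and needs_attention = true.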