# VaultMesh Command Center: Event Processing Architecture

## Overview

The Command Center processes fleet events in three stages (ingestion, state update, and broadcasting) to support real-time fleet management.

## Key Components

### 1. Event Types

- `NodeHeartbeat`: Node status updates
- `ScanEvent`: Scan findings and results
- `CommandEvent`: Command execution outcomes
- `EventEnvelope`: Generic communication events

### 2. Processing Stages

#### a. Ingestion

- Raw event data is received via HTTP endpoints
- Data is validated and transformed into structured payloads
- Events are logged to append-only JSONL files for durability

#### b. State Update

- In-memory state is updated with the latest information
- A sliding window of recent events is maintained (max 50 per node)
- Derived states are computed (attention status, last scan)

#### c. Broadcasting

- Events are published via Server-Sent Events (SSE)
- Each event is broadcast to all connected clients
- Updates are delivered with low latency, in real time

## Event Processing Workflow

### Heartbeat Processing

1. Receive heartbeat data
2. Log to `heartbeats.jsonl`
3. Update node history
4. Publish heartbeat event
5. Recompute node attention status
6. Broadcast attention event

```rust
pub async fn upsert_heartbeat(&self, hb: NodeHeartbeat) {
    // Log the event (fields elided)
    let event = HeartbeatEvent { ... };
    self.logs.append_json_line("heartbeats.jsonl", &event);

    // Update in-memory state
    self.upsert_heartbeat_no_log(hb).await;
}
```

### Scan Result Processing

1. Receive scan results
2. Log to `scans.jsonl`
3. Update last scan information
4. Publish scan event
5. Recompute node attention status
6. Broadcast attention event

```rust
pub async fn update_last_scan(&self, node_id: Uuid, scan: LastScan) {
    // Record the latest scan for this node (step 3 above)
    let mut scans = self.last_scans.write().await;
    scans.insert(node_id, scan);
}
```

### Command Result Processing

1. Receive command result
2. Log to `commands.jsonl`
3. Store command history
4. Publish command event
5. Optionally trigger additional actions

```rust
pub async fn record_command_result(&self, result: CommandResult) {
    // Log the command event (fields elided)
    let event = CommandEvent { ... };
    self.logs.append_json_line("commands.jsonl", &event);

    // Update command result history
    self.record_command_result_no_log(result).await;
}
```

## Attention Computation

The system computes each node's attention status from several factors:

- Heartbeat staleness
- Scan staleness
- Scan findings severity
- Service status
- Cloudflare tunnel status

```rust
pub fn compute_attention(
    now: OffsetDateTime,
    hb: &NodeHeartbeat,
    scan: Option<&LastScan>,
    cfg: &SchedulerConfig,
) -> NodeAttentionStatus {
    let mut reasons = Vec::new();

    // Check heartbeat age
    if now - hb.timestamp > cfg.heartbeat_stale {
        reasons.push("heartbeat_stale");
    }

    // Check scan status: missing, stale, or carrying critical findings
    match scan {
        None => reasons.push("never_scanned"),
        Some(s) => {
            if now - s.ts > cfg.scan_stale {
                reasons.push("scan_stale");
            }
            if s.summary.critical > 0 {
                reasons.push("critical_findings");
            }
        }
    }

    // Check service flags
    if !hb.cloudflare_ok {
        reasons.push("cloudflare_down");
    }

    // Any collected reason means the node needs attention
    NodeAttentionStatus {
        needs_attention: !reasons.is_empty(),
        reasons,
    }
}
```

## Persistence and State Replay

### Log Replay Mechanism

- On startup, in-memory state is reconstructed from the JSONL logs
- Events are replayed in chronological order
- Node history, scan results, and command results are recreated

```rust
pub async fn replay_from_logs(&self) {
    // Replay each log in turn to rebuild in-memory state
    self.replay_heartbeats().await;
    self.replay_scans().await;
    self.replay_commands().await;
    self.replay_envelopes().await;
}
```

## Performance Characteristics

- In-memory event store: 500 most recent events
- Append-only logging for durability
- Non-blocking event processing
- Low-overhead broadcasting

## Security Considerations

- Ed25519 key signing for commands
- Configurable command policies
- Nonce-based replay protection
- No sensitive data stored in logs

## Extensibility

- New event types are easy to add
- Attention computation is flexible
- The event processing pipeline is modular
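
The per-node sliding window mentioned under State Update (max 50 events per node) could be kept with a bounded queue. The following is a minimal sketch under stated assumptions, not the Command Center's actual implementation: `NodeHistory`, `MAX_EVENTS_PER_NODE`, `push`, and `recent` are hypothetical names, and plain `String` values stand in for the real `Uuid` node IDs and event types.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical per-node cap, taken from the "max 50 per node" figure.
const MAX_EVENTS_PER_NODE: usize = 50;

/// Hypothetical store: node id -> bounded queue of recent events.
/// `String` ids and payloads stand in for the real `Uuid` and event types.
#[derive(Default)]
pub struct NodeHistory {
    events: HashMap<String, VecDeque<String>>,
}

impl NodeHistory {
    /// Append an event, evicting the oldest one once the cap is reached.
    pub fn push(&mut self, node_id: &str, event: String) {
        let queue = self.events.entry(node_id.to_string()).or_default();
        if queue.len() == MAX_EVENTS_PER_NODE {
            queue.pop_front();
        }
        queue.push_back(event);
    }

    /// Events for a node, oldest first (empty if the node is unknown).
    pub fn recent(&self, node_id: &str) -> Vec<&str> {
        self.events
            .get(node_id)
            .map(|q| q.iter().map(String::as_str).collect())
            .unwrap_or_default()
    }
}
```

A `VecDeque` makes both the append and the eviction constant-time, so the window stays cheap to maintain on every heartbeat. In an async handler the map would presumably sit behind a lock, as `last_scans` does in the scan example.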