VaultMesh Command Center: Event Processing Architecture
Overview
The Command Center processes fleet events in three stages: ingestion over HTTP, in-memory state update, and real-time broadcast to connected clients over Server-Sent Events (SSE).
Key Components
1. Event Types
- NodeHeartbeat: Node status updates
- ScanEvent: Scan findings and results
- CommandEvent: Command execution outcomes
- EventEnvelope: Generic communication events
2. Processing Stages
a. Ingestion
- Raw event data received via HTTP endpoints
- Validated and transformed into structured payloads
- Logged to append-only JSONL files for durability
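As a rough illustration of the append-only logging step, the sketch below writes one pre-serialized JSON record per line using only the standard library. The helper name `append_line` is hypothetical and is not the project's `append_json_line`; serialization of the payload is assumed to have happened already.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Appends one pre-serialized JSON document as a single line.
/// `append_line` is a hypothetical helper used only for illustration.
pub fn append_line(path: &Path, json: &str) -> io::Result<()> {
    // A JSONL record must occupy exactly one line, so reject embedded newlines.
    if json.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "record contains a newline",
        ));
    }
    // `append(true)` makes every write land at the end of the file;
    // `create(true)` creates the log on first use.
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    // Write the record and its terminator in a single call.
    file.write_all(format!("{json}\n").as_bytes())?;
    Ok(())
}
```

Opening the file in append mode means existing records are never rewritten, which is what lets the log be replayed later.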
b. State Update
- In-memory state updated with latest information
- Sliding window of recent events maintained (max 50 per node)
- Derived states computed (attention status, last scan)
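The per-node sliding window could be kept with a bounded `VecDeque`, as in the minimal sketch below. `NodeHistory`, the `u64` node key, and the constant name are assumptions made for illustration; only the cap of 50 comes from the description above.

```rust
use std::collections::{HashMap, VecDeque};

/// Cap taken from the description above; the constant name is hypothetical.
const MAX_EVENTS_PER_NODE: usize = 50;

/// Hypothetical per-node history keyed by a numeric node id.
pub struct NodeHistory<E> {
    per_node: HashMap<u64, VecDeque<E>>,
}

impl<E> NodeHistory<E> {
    pub fn new() -> Self {
        Self { per_node: HashMap::new() }
    }

    /// Adds an event, evicting the oldest one once the window is full.
    pub fn push(&mut self, node_id: u64, event: E) {
        let window = self.per_node.entry(node_id).or_default();
        if window.len() == MAX_EVENTS_PER_NODE {
            window.pop_front();
        }
        window.push_back(event);
    }

    /// Iterates a node's retained events from oldest to newest.
    pub fn recent(&self, node_id: u64) -> impl Iterator<Item = &E> {
        self.per_node.get(&node_id).into_iter().flatten()
    }
}
```

Pushing 60 events for one node leaves the 50 newest in the window, and an unknown node simply yields an empty iterator.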
c. Broadcasting
- Events published via Server-Sent Events (SSE)
- Broadcast to all connected clients
- Low-latency, real-time updates
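The fan-out behind the broadcasting stage can be pictured with the in-process sketch below, where each subscriber gets its own channel and every published event is cloned to all of them. This is a stand-in built on `std::sync::mpsc`, not the SSE transport itself; `Broadcaster` and its methods are hypothetical names, and a real service would more likely use an async broadcast channel feeding HTTP streams.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical in-process fan-out: one sender per connected subscriber.
pub struct Broadcaster<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    pub fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    /// Registers a new subscriber and returns its receiving end.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of the event to every subscriber, drops subscribers
    /// whose receiver is gone, and returns how many received the event.
    pub fn publish(&mut self, event: T) -> usize {
        self.subscribers.retain(|tx| tx.send(event.clone()).is_ok());
        self.subscribers.len()
    }
}
```

Pruning disconnected subscribers during `publish` keeps the subscriber list from growing as clients come and go.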
Event Processing Workflow
Heartbeat Processing
- Receive heartbeat data
- Log to heartbeats.jsonl
- Update node history
- Publish heartbeat event
- Recompute node attention status
- Broadcast attention event
pub async fn upsert_heartbeat(&self, hb: NodeHeartbeat) {
    // Log event
    let event = HeartbeatEvent { ... };
    self.logs.append_json_line("heartbeats.jsonl", &event);
    // Update in-memory state
    self.upsert_heartbeat_no_log(hb).await;
}
Scan Result Processing
- Receive scan results
- Log to scans.jsonl
- Update last scan information
- Publish scan event
- Recompute node attention status
- Broadcast attention event
pub async fn update_last_scan(&self, node_id: Uuid, scan: LastScan) {
    // Update scan history
    let mut scans = self.last_scans.write().await;
    scans.insert(node_id, scan);
}
Command Result Processing
- Receive command result
- Log to commands.jsonl
- Store command history
- Publish command event
- Optionally trigger additional actions
pub async fn record_command_result(&self, result: CommandResult) {
    // Log command event
    let event = CommandEvent { ... };
    self.logs.append_json_line("commands.jsonl", &event);
    // Update command result history
    self.record_command_result_no_log(result).await;
}
Attention Computation
The system dynamically computes a node's attention status based on multiple factors:
- Heartbeat staleness
- Scan staleness
- Scan findings severity
- Service status
- Cloudflare tunnel status
pub fn compute_attention(
    now: OffsetDateTime,
    hb: &NodeHeartbeat,
    scan: Option<&LastScan>,
    cfg: &SchedulerConfig,
) -> NodeAttentionStatus {
    let mut reasons = Vec::new();
    // Check heartbeat age
    if now - hb.timestamp > cfg.heartbeat_stale {
        reasons.push("heartbeat_stale");
    }
    // Check scan status
    match scan {
        None => reasons.push("never_scanned"),
        Some(s) => {
            if now - s.ts > cfg.scan_stale {
                reasons.push("scan_stale");
            }
            if s.summary.critical > 0 {
                reasons.push("critical_findings");
            }
        }
    }
    // Check service flags
    if !hb.cloudflare_ok {
        reasons.push("cloudflare_down");
    }
    NodeAttentionStatus {
        needs_attention: !reasons.is_empty(),
        reasons,
    }
}
Persistence and State Replay
Log Replay Mechanism
- On startup, reconstruct in-memory state from JSONL logs
- Replay events in chronological order
- Recreate node history, scan results, and command results
pub async fn replay_from_logs(&self) {
    self.replay_heartbeats().await;
    self.replay_scans().await;
    self.replay_commands().await;
    self.replay_envelopes().await;
}
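To make the chronological replay concrete, here is a minimal sketch of rebuilding a "latest heartbeat per node" map from already-parsed log records. `LoggedHeartbeat`, its fields, and this free-standing `replay_heartbeats` function are assumptions for illustration and are not the method called above; JSON parsing is deliberately left out.

```rust
use std::collections::HashMap;

/// Hypothetical parsed log record; the real event carries more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct LoggedHeartbeat {
    pub node_id: u64,
    pub ts: u64, // seconds since the epoch, assumed for this sketch
    pub hostname: String,
}

/// Rebuilds the latest heartbeat per node from logged records.
pub fn replay_heartbeats(mut events: Vec<LoggedHeartbeat>) -> HashMap<u64, LoggedHeartbeat> {
    // The sort is stable, so records with equal timestamps keep log order.
    events.sort_by_key(|e| e.ts);
    let mut latest = HashMap::new();
    for event in events {
        // Later records overwrite earlier ones for the same node.
        latest.insert(event.node_id, event);
    }
    latest
}
```

Sorting before applying guards against records that were appended out of order; if the log is trusted to be strictly ordered, the sort could be dropped.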
Performance Characteristics
- In-memory event store: 500 most recent events
- Append-only logging for durability
- Non-blocking event processing
- Low-overhead broadcasting
Security Considerations
- Ed25519 key signing for commands
- Configurable command policies
- Nonce-based replay protection
- No sensitive data stored in logs
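Nonce-based replay protection can be illustrated with the sketch below, which accepts each nonce once and rejects repeats. `NonceGuard` is a hypothetical type; Ed25519 signature verification is omitted, and a real deployment would presumably bound the set (for example by expiring old nonces) rather than let it grow without limit.

```rust
use std::collections::HashSet;

/// Hypothetical replay guard that remembers every nonce it has accepted.
pub struct NonceGuard {
    seen: HashSet<String>,
}

impl NonceGuard {
    pub fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns true the first time a nonce is presented and false on any
    /// repeat, so a replayed command can be rejected by the caller.
    pub fn accept(&mut self, nonce: &str) -> bool {
        // `HashSet::insert` returns false when the value was already present.
        self.seen.insert(nonce.to_owned())
    }
}
```

The check only helps if it runs after the signature has been verified, since otherwise an attacker could burn valid nonces with forged commands.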
Extensibility
- Easy to add new event types
- Flexible attention computation
- Modular event processing pipeline