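# VaultMesh Command Center: Event Processing Architecture

## Overview

The Command Center processes fleet events in three stages: ingestion, state update, and broadcasting. Every event is written to an append-only log, applied to in-memory state, and pushed to connected clients in real time. This lets the Command Center manage the fleet live and rebuild its state after a restart.

## Key Components
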
### 1. Event Types

- `NodeHeartbeat`: Node status updates
- `ScanEvent`: Scan findings and results
- `CommandEvent`: Command execution outcomes
- `EventEnvelope`: Generic communication events

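One way to let a single log writer and broadcaster handle all four event types is a Rust enum. The following is a hypothetical sketch, not the real definitions. The variant fields, the `u128` node id standing in for `Uuid`, the `u64` Unix-second timestamps, and the `envelopes.jsonl` file name are all assumptions.

```rust
/// Hypothetical, simplified payloads. The real structs carry more fields
/// and use `Uuid` / `OffsetDateTime` instead of these std stand-ins.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// Node status update (cf. `NodeHeartbeat`).
    Heartbeat { node_id: u128, ts: u64, cloudflare_ok: bool },
    /// Scan findings (cf. `ScanEvent`).
    Scan { node_id: u128, ts: u64, critical: u32 },
    /// Command execution outcome (cf. `CommandEvent`).
    Command { node_id: u128, ts: u64, exit_code: i32 },
    /// Generic message (cf. `EventEnvelope`).
    Envelope { ts: u64, kind: String, body: String },
}

impl Event {
    /// JSONL file this event would be appended to.
    /// `envelopes.jsonl` is an assumed name; the other three appear in the text.
    pub fn log_file(&self) -> &'static str {
        match self {
            Event::Heartbeat { .. } => "heartbeats.jsonl",
            Event::Scan { .. } => "scans.jsonl",
            Event::Command { .. } => "commands.jsonl",
            Event::Envelope { .. } => "envelopes.jsonl",
        }
    }

    /// Node the event concerns, if any (envelopes are assumed fleet-wide here).
    pub fn node_id(&self) -> Option<u128> {
        match self {
            Event::Heartbeat { node_id, .. }
            | Event::Scan { node_id, .. }
            | Event::Command { node_id, .. } => Some(*node_id),
            Event::Envelope { .. } => None,
        }
    }
}
```

Routing by variant keeps the log-file choice in one `match`, so the compiler flags every place that must change when a new event type is added.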
### 2. Processing Stages

#### a. Ingestion

- Raw event data received via HTTP endpoints
- Validated and transformed into structured payloads
- Logged to append-only JSONL files for durability

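Append-only JSONL logging comes down to opening the file in append mode and writing exactly one JSON object per line. Below is a minimal std-only sketch. It assumes the payload has already been serialized to a single-line JSON string, which sidesteps the serializer the real code presumably uses. The signature is hypothetical and only borrows the `append_json_line` name used later in this document.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Append one pre-serialized JSON object as a line to `path`.
/// Hypothetical signature: the real `append_json_line` takes a
/// serializable value rather than a pre-rendered string.
pub fn append_json_line(path: &Path, json: &str) -> io::Result<()> {
    // JSONL requires exactly one record per line.
    if json.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "JSONL records must not contain newlines",
        ));
    }
    // `append(true)` sends every write to the end of the file,
    // so existing records are never overwritten.
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    // Build the whole line first so it goes out in a single write_all call.
    let mut line = String::with_capacity(json.len() + 1);
    line.push_str(json);
    line.push('\n');
    file.write_all(line.as_bytes())?;
    // Also call `file.sync_data()` if a record must survive power loss,
    // not just a process crash.
    file.flush()
}
```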
#### b. State Update

- In-memory state updated with the latest information
- Sliding window of recent events maintained per node (max 50 per node)
- Derived state computed (attention status, last scan)

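A per-node sliding window maps naturally onto a `VecDeque` that drops its oldest entry once it holds 50 items. This is a minimal sketch. `NodeHistory` and `MAX_EVENTS_PER_NODE` are hypothetical names, and `u128` stands in for the node's `Uuid`.

```rust
use std::collections::{HashMap, VecDeque};

/// Window size from the text: at most 50 recent events per node.
pub const MAX_EVENTS_PER_NODE: usize = 50;

/// Hypothetical per-node history; `u128` stands in for `Uuid`.
pub struct NodeHistory<E> {
    per_node: HashMap<u128, VecDeque<E>>,
}

impl<E> NodeHistory<E> {
    pub fn new() -> Self {
        Self { per_node: HashMap::new() }
    }

    /// Record an event, evicting the oldest one when the window is full.
    pub fn push(&mut self, node_id: u128, event: E) {
        let window = self
            .per_node
            .entry(node_id)
            .or_insert_with(|| VecDeque::with_capacity(MAX_EVENTS_PER_NODE));
        if window.len() == MAX_EVENTS_PER_NODE {
            window.pop_front(); // drop the oldest event
        }
        window.push_back(event);
    }

    /// Most recent event for a node, if any (input for derived state).
    pub fn latest(&self, node_id: u128) -> Option<&E> {
        self.per_node.get(&node_id).and_then(|w| w.back())
    }

    /// Number of events currently retained for a node.
    pub fn len(&self, node_id: u128) -> usize {
        self.per_node.get(&node_id).map_or(0, |w| w.len())
    }
}
```

Both eviction and insertion are O(1), so memory per node stays bounded no matter how often a node reports.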
#### c. Broadcasting

- Events published via Server-Sent Events (SSE)
- Broadcast to all connected clients
- Low-latency, real-time updates

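An SSE message is plain text: an optional `event:` line naming the type, one `data:` line per payload line, and a blank line to end the event. The sketch below formats frames that way and fans them out to every connected client over std `mpsc` channels. Clients whose receiver has gone away are dropped. The `Broadcaster` type and the channel-per-client design are assumptions for illustration. This document does not describe the real server's transport.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Render one Server-Sent Events frame. Multi-line payloads become
/// several `data:` lines, and a blank line terminates the event.
pub fn sse_frame(event: &str, data: &str) -> String {
    let mut frame = format!("event: {event}\n");
    for line in data.split('\n') {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push('\n');
    frame
}

/// Hypothetical fan-out hub: one channel per connected SSE client.
pub struct Broadcaster {
    clients: Vec<Sender<String>>,
}

impl Broadcaster {
    pub fn new() -> Self {
        Self { clients: Vec::new() }
    }

    /// Register a client; an HTTP handler would stream from the receiver.
    pub fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.push(tx);
        rx
    }

    /// Send a frame to every client. `send` on an unbounded channel never
    /// blocks and fails only if the receiver was dropped, so those clients
    /// are removed. Returns the number of clients reached.
    pub fn broadcast(&mut self, event: &str, data: &str) -> usize {
        let frame = sse_frame(event, data);
        self.clients.retain(|tx| tx.send(frame.clone()).is_ok());
        self.clients.len()
    }
}
```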
## Event Processing Workflow

### Heartbeat Processing

1. Receive heartbeat data
2. Log to `heartbeats.jsonl`
3. Update node history
4. Publish heartbeat event
5. Recompute node attention status
6. Broadcast attention event

```rust
pub async fn upsert_heartbeat(&self, hb: NodeHeartbeat) {
    // Log the event to the append-only heartbeat log
    let event = HeartbeatEvent { ... };
    self.logs.append_json_line("heartbeats.jsonl", &event);

    // Update in-memory state without writing a second log entry
    self.upsert_heartbeat_no_log(hb).await;
}
```

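The split between `upsert_heartbeat` and `upsert_heartbeat_no_log` appears designed so the live path logs and then applies an event, while replay (see Persistence and State Replay) can apply the same event without appending it again. The synchronous sketch below illustrates that pattern. `Store`, the `Vec<String>` standing in for `heartbeats.jsonl`, and the `u64` timestamps are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical heartbeat: node id (stand-in for `Uuid`) and timestamp.
#[derive(Clone, Debug, PartialEq)]
pub struct Heartbeat {
    pub node_id: u128,
    pub ts: u64,
}

/// Simplified store; `log` stands in for `heartbeats.jsonl`.
pub struct Store {
    pub log: Vec<String>,
    pub last_seen: HashMap<u128, u64>,
}

impl Store {
    pub fn new() -> Self {
        Self { log: Vec::new(), last_seen: HashMap::new() }
    }

    /// Live path: persist first, then update memory.
    pub fn upsert_heartbeat(&mut self, hb: Heartbeat) {
        self.log.push(format!("{{\"node_id\":{},\"ts\":{}}}", hb.node_id, hb.ts));
        self.upsert_heartbeat_no_log(hb);
    }

    /// Memory-only path, safe to call while replaying the log.
    pub fn upsert_heartbeat_no_log(&mut self, hb: Heartbeat) {
        self.last_seen.insert(hb.node_id, hb.ts);
    }

    /// Rebuild memory from already-logged heartbeats without re-logging.
    pub fn replay(&mut self, logged: &[Heartbeat]) {
        for hb in logged {
            self.upsert_heartbeat_no_log(hb.clone());
        }
    }
}
```

Logging before mutating memory means a crash between the two steps loses nothing: the event is on disk and will be applied on the next replay.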
### Scan Result Processing

1. Receive scan results
2. Log to `scans.jsonl`
3. Update last scan information
4. Publish scan event
5. Recompute node attention status
6. Broadcast attention event

Step 3 keeps one entry per node, so each new scan replaces the previous one:

```rust
pub async fn update_last_scan(&self, node_id: Uuid, scan: LastScan) {
    // Take the write lock on the per-node "last scan" map
    let mut scans = self.last_scans.write().await;
    // `insert` overwrites any earlier scan recorded for this node
    scans.insert(node_id, scan);
}
```

### Command Result Processing

1. Receive command result
2. Log to `commands.jsonl`
3. Store command history
4. Publish command event
5. Optionally trigger additional actions

```rust
pub async fn record_command_result(&self, result: CommandResult) {
    // Log command event
    let event = CommandEvent { ... };
    self.logs.append_json_line("commands.jsonl", &event);

    // Update command result history
    self.record_command_result_no_log(result).await;
}
```

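This document does not say how the optional follow-up actions in step 5 are triggered. One common pattern is a list of registered hooks that inspect each stored result and may return an action to schedule. The sketch below shows that pattern. `CommandLog`, the hook type, and the `CommandResult` fields (`node_id`, `exit_code`) are invented for illustration.

```rust
/// Hypothetical command outcome; the real `CommandResult` is not shown.
#[derive(Clone, Debug)]
pub struct CommandResult {
    pub node_id: u128,
    pub exit_code: i32,
}

/// A follow-up hook: inspects a stored result and may name an action to run.
type Hook = Box<dyn Fn(&CommandResult) -> Option<String>>;

/// Stores results (step 3) and collects optional follow-ups (step 5).
pub struct CommandLog {
    history: Vec<CommandResult>,
    hooks: Vec<Hook>,
}

impl CommandLog {
    pub fn new() -> Self {
        Self { history: Vec::new(), hooks: Vec::new() }
    }

    /// Register a hook that runs after every recorded result.
    pub fn on_result<F>(&mut self, hook: F)
    where
        F: Fn(&CommandResult) -> Option<String> + 'static,
    {
        self.hooks.push(Box::new(hook));
    }

    /// Store the result first, then ask each hook for a follow-up action.
    pub fn record(&mut self, result: CommandResult) -> Vec<String> {
        self.history.push(result);
        let stored = self.history.last().expect("result was just pushed");
        self.hooks.iter().filter_map(|hook| hook(stored)).collect()
    }

    pub fn history_len(&self) -> usize {
        self.history.len()
    }
}
```

Returning actions instead of executing them inside `record` keeps the store free of side effects. The caller decides whether and when to run them.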
## Attention Computation

The system computes each node's attention status from several factors:

- Heartbeat staleness
- Scan staleness
- Scan findings severity
- Service status
- Cloudflare tunnel status

A node needs attention if any check produces a reason. The excerpt below shows the heartbeat, scan, and Cloudflare tunnel checks:

```rust
pub fn compute_attention(
    now: OffsetDateTime,
    hb: &NodeHeartbeat,
    scan: Option<&LastScan>,
    cfg: &SchedulerConfig,
) -> NodeAttentionStatus {
    let mut reasons = Vec::new();

    // Check heartbeat age
    if now - hb.timestamp > cfg.heartbeat_stale {
        reasons.push("heartbeat_stale");
    }

    // Check scan status: missing, stale, or with critical findings
    match scan {
        None => reasons.push("never_scanned"),
        Some(s) => {
            if now - s.ts > cfg.scan_stale {
                reasons.push("scan_stale");
            }
            if s.summary.critical > 0 {
                reasons.push("critical_findings");
            }
        }
    }

    // Check service flags (only the Cloudflare tunnel flag is shown here)
    if !hb.cloudflare_ok {
        reasons.push("cloudflare_down");
    }

    NodeAttentionStatus {
        needs_attention: !reasons.is_empty(),
        reasons,
    }
}
```

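The excerpt above depends on the `time` crate and on types not shown here. The std-only sketch below applies the same rules so they can be run and tested in isolation. Timestamps are Unix seconds (`u64`) and thresholds are `std::time::Duration`. `Heartbeat`, `LastScan`, `Thresholds`, and `Attention` are hypothetical mirrors of `NodeHeartbeat`, `LastScan`, `SchedulerConfig`, and `NodeAttentionStatus`.

```rust
use std::time::Duration;

/// Hypothetical stand-ins for the real types.
pub struct Heartbeat { pub ts: u64, pub cloudflare_ok: bool }
pub struct LastScan { pub ts: u64, pub critical: u32 }
pub struct Thresholds { pub heartbeat_stale: Duration, pub scan_stale: Duration }

#[derive(Debug, PartialEq)]
pub struct Attention {
    pub needs_attention: bool,
    pub reasons: Vec<&'static str>,
}

/// Age of a timestamp; one in the future (clock skew) counts as zero age.
fn age(now: u64, ts: u64) -> Duration {
    Duration::from_secs(now.saturating_sub(ts))
}

pub fn compute_attention(
    now: u64,
    hb: &Heartbeat,
    scan: Option<&LastScan>,
    cfg: &Thresholds,
) -> Attention {
    let mut reasons = Vec::new();
    if age(now, hb.ts) > cfg.heartbeat_stale {
        reasons.push("heartbeat_stale");
    }
    match scan {
        None => reasons.push("never_scanned"),
        Some(s) => {
            if age(now, s.ts) > cfg.scan_stale {
                reasons.push("scan_stale");
            }
            if s.critical > 0 {
                reasons.push("critical_findings");
            }
        }
    }
    if !hb.cloudflare_ok {
        reasons.push("cloudflare_down");
    }
    Attention { needs_attention: !reasons.is_empty(), reasons }
}
```

Because the function is pure (time is passed in as `now`), each rule can be tested at fixed instants without mocking a clock.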
## Persistence and State Replay

### Log Replay Mechanism

- On startup, in-memory state is reconstructed from the JSONL logs
- Each log is replayed in turn (heartbeats, scans, commands, then envelopes), and events within a log are replayed in the chronological order in which they were appended
- Node history, scan results, and command results are recreated

```rust
pub async fn replay_from_logs(&self) {
    // Replay each log in turn; within a file, lines are in append order
    self.replay_heartbeats().await;
    self.replay_scans().await;
    self.replay_commands().await;
    self.replay_envelopes().await;
}
```

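Replay has to cope with a log whose last line was cut short by a crash mid-write. The std-only sketch below reads one JSONL file line by line and passes each record to an `apply` callback. It skips blank lines and counts unparseable records instead of aborting startup. `replay_jsonl`, `ReplayStats`, and this skip-and-count policy are assumptions. This document does not say how malformed lines are handled.

```rust
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::path::Path;

/// Counts reported after replaying one log file.
#[derive(Debug, Default, PartialEq)]
pub struct ReplayStats {
    pub applied: usize,
    pub skipped: usize,
}

/// Feed each non-empty line of a JSONL file to `apply`, in file order.
/// `apply` parses and applies one record, returning `Err` for a malformed
/// line (e.g. one truncated by a crash); such lines are counted and skipped.
/// A missing file is treated as an empty log. Invalid UTF-8 aborts with an error.
pub fn replay_jsonl<F>(path: &Path, mut apply: F) -> io::Result<ReplayStats>
where
    F: FnMut(&str) -> Result<(), String>,
{
    let file = match File::open(path) {
        Ok(f) => f,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(ReplayStats::default()),
        Err(e) => return Err(e),
    };
    let mut stats = ReplayStats::default();
    for line in BufReader::new(file).lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        match apply(&line) {
            Ok(()) => stats.applied += 1,
            Err(_) => stats.skipped += 1,
        }
    }
    Ok(stats)
}
```

Lines are applied in file order, which is chronological as long as the file was only ever appended to.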
## Performance Characteristics

- In-memory event store: 500 most recent events
- Append-only logging for durability
- Non-blocking event processing
- Low-overhead broadcasting

## Security Considerations

- Commands signed with Ed25519 keys
- Configurable command policies
- Nonce-based replay protection
- No sensitive data stored in logs

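Nonce-based replay protection is commonly paired with a freshness window. A command is rejected if its timestamp falls outside the window or its nonce was already accepted within it, and nonces older than the window can be forgotten because any replay of them would fail the timestamp check. The std-only sketch below follows that scheme. `NonceGuard`, the window length, and Unix-second timestamps are assumptions. It also assumes the Ed25519 signature, covering both nonce and timestamp, has already been verified.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum Reject {
    /// Timestamp is outside the freshness window.
    Stale,
    /// Nonce was already accepted inside the window.
    Replayed,
}

/// Hypothetical replay guard keyed by nonce.
pub struct NonceGuard {
    window_secs: u64,
    seen: HashMap<String, u64>, // nonce -> timestamp of the accepted command
}

impl NonceGuard {
    pub fn new(window_secs: u64) -> Self {
        Self { window_secs, seen: HashMap::new() }
    }

    /// Accept or reject a command's (nonce, timestamp) pair at time `now`.
    pub fn check(&mut self, nonce: &str, ts: u64, now: u64) -> Result<(), Reject> {
        let window = self.window_secs;
        // Too old or too far in the future.
        if now.abs_diff(ts) > window {
            return Err(Reject::Stale);
        }
        // Forget nonces whose timestamps have left the window.
        self.seen.retain(|_, seen_ts| now.abs_diff(*seen_ts) <= window);
        if self.seen.contains_key(nonce) {
            return Err(Reject::Replayed);
        }
        self.seen.insert(nonce.to_string(), ts);
        Ok(())
    }

    /// Number of nonces currently remembered.
    pub fn tracked(&self) -> usize {
        self.seen.len()
    }
}
```

The window bounds memory. Only nonces from the last `window_secs` are kept, at the cost of requiring roughly synchronized clocks between nodes and the Command Center.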
## Extensibility

- Easy to add new event types
- Flexible attention computation
- Modular event processing pipeline