diff --git a/CHANGELOG.md b/CHANGELOG.md index 18c3aa9..3c8c3b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## V0.7.3 – Envelope Canonicalization + +- Added `format` and `schema` to `EventEnvelope` for stable codec/semantics pinning. +- Renamed `body` → `payload` in the comms event API (server still accepts `body` as an alias). +- Canonicalization enforced before persistence/broadcast: + - Timestamps truncated to RFC3339 UTC `Z` with seconds precision + - `payload` object keys recursively sorted (arrays preserve order) +- `events.jsonl` now stores one canonical `EventEnvelope` per line. + ## V0.7.2 – Communication Layer ### Unified Event API @@ -32,7 +41,7 @@ # Post a note curl -X POST http://127.0.0.1:8088/api/events \ -H "Content-Type: application/json" \ - -d '{"kind":"note","node_id":"uuid","author":"operator","body":{"text":"Test","severity":"info"}}' + -d '{"kind":"note","node_id":"uuid","author":"operator","payload":{"text":"Test","severity":"info"}}' # Query events curl "http://127.0.0.1:8088/api/events?since=2025-01-01T00:00:00Z&kind=note" diff --git a/COOLING_CHECKLIST.md b/COOLING_CHECKLIST.md new file mode 100644 index 0000000..e12332e --- /dev/null +++ b/COOLING_CHECKLIST.md @@ -0,0 +1,4 @@ +# Cooling Checklist (30 Days) + +For the next 30 days, treat these as frozen invariants: do not change `EventEnvelope` `format`/`schema` semantics, do not reorder top-level envelope fields, do not change timestamp precision (UTC `Z`, seconds-only), do not change payload key-sorting rules (objects sorted recursively; arrays preserve order), do not change the JSONL newline byte contract (one LF per line), and do not silently change hash algorithms when/if leaf hashing is introduced—only evolve via an explicit schema/version bump. 
+ diff --git a/command-center/src/main.rs b/command-center/src/main.rs index b9370f5..97b5dbb 100644 --- a/command-center/src/main.rs +++ b/command-center/src/main.rs @@ -5,14 +5,13 @@ mod state; use crate::cli::{Cli, Commands, LogsAction}; use crate::routes::app; -use crate::state::{AppState, CommandPayload, SignedCommand}; +use crate::state::{now_utc_seconds, AppState, CommandPayload, SignedCommand}; use base64::Engine; use clap::Parser; use ed25519_dalek::{Signer, SigningKey}; use std::net::SocketAddr; use std::path::Path; use std::time::Duration; -use time::OffsetDateTime; use tokio::net::TcpListener; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use uuid::Uuid; @@ -58,7 +57,7 @@ async fn scheduler_loop(state: AppState) { /// Run a single scheduler tick: check each node and queue scans if needed. async fn run_scheduler_tick(state: &AppState) -> anyhow::Result<()> { - let now = OffsetDateTime::now_utc(); + let now = now_utc_seconds(); let latest = state.list_latest().await; let last_scans = state.list_last_scans().await; diff --git a/command-center/src/routes.rs b/command-center/src/routes.rs index a48dd6f..0644b50 100644 --- a/command-center/src/routes.rs +++ b/command-center/src/routes.rs @@ -16,7 +16,7 @@ use uuid::Uuid; use serde::Serialize; use crate::state::{ compute_attention, AppState, CommandEventPayload, CommandPayload, CommandResult, - EventEnvelope, HeartbeatEventPayload, LastScan, NodeHeartbeat, NodeHistory, ScanEvent, + now_utc_seconds, EventEnvelope, HeartbeatEventPayload, LastScan, NodeHeartbeat, NodeHistory, ScanEvent, ScanEventPayload, ScanSummary, ServerEvent, SignedCommand, }; @@ -46,8 +46,12 @@ pub struct PostEventRequest { pub node_id: Option<Uuid>, #[serde(default)] pub author: Option<String>, - #[serde(default)] - pub body: serde_json::Value, + #[serde(default = "default_json_object", alias = "body")] + pub payload: serde_json::Value, +} + +fn default_json_object() -> serde_json::Value { +
serde_json::Value::Object(serde_json::Map::new()) } /// Response for POST /api/events @@ -93,7 +97,7 @@ pub fn app(state: AppState) -> Router { // Simple HTML dashboard (no JS framework, HTMX-ready later). pub async fn dashboard(State(state): State<AppState>) -> Html<String> { - let now = OffsetDateTime::now_utc(); + let now = now_utc_seconds(); let latest = state.list_latest().await; let last_scans = state.list_last_scans().await; let mut rows = String::new(); @@ -576,7 +580,7 @@ pub async fn send_command( let nonce = Uuid::new_v4().to_string(); let payload = CommandPayload { node_id, - ts: OffsetDateTime::now_utc(), + ts: now_utc_seconds(), nonce, cmd: form.cmd, args, @@ -812,7 +816,7 @@ pub async fn post_event( headers: axum::http::HeaderMap, Json(req): Json<PostEventRequest>, ) -> impl IntoResponse { - let now = OffsetDateTime::now_utc(); + let now = now_utc_seconds(); let id = Uuid::new_v4(); // Author can be overridden via X-VM-Author header @@ -823,14 +827,14 @@ pub async fn post_event( .or(req.author) .unwrap_or_else(|| "operator".to_string()); - let envelope = EventEnvelope { + let envelope = EventEnvelope::new( id, - kind: req.kind.clone(), - ts: now, - node_id: req.node_id, - author: author.clone(), - body: req.body, - }; + now, + req.kind.clone(), + req.node_id, + author.clone(), + req.payload, + ); tracing::info!( "POST /api/events: kind={}, node_id={:?}, author={}", @@ -904,7 +908,7 @@ pub async fn get_events( /// NASA-style Mission Console with live SSE updates.
pub async fn mission_console(State(state): State<AppState>) -> Html<String> { - let now = OffsetDateTime::now_utc(); + let now = now_utc_seconds(); let latest = state.list_latest().await; let last_scans = state.list_last_scans().await; @@ -1632,8 +1636,8 @@ body {{ appendCommsEvent(env); // Also add to node timeline and global feed - const body = env.body || {{}}; - const text = body.text || body.description || body.title || kind; + const payload = env.payload || env.body || {{}}; + const text = payload.text || payload.description || payload.title || kind; addNodeTimelineEvent(env.node_id, kind, text.substring(0, 50)); // Get hostname for global feed @@ -1944,7 +1948,7 @@ body {{ body: JSON.stringify({{ kind: "note", node_id: selectedNodeId, - body: {{ text, severity }} + payload: {{ text, severity }} }}) }}); @@ -1999,9 +2003,9 @@ body {{ const ts = new Date(ev.ts).toLocaleString(); const kind = ev.kind || "note"; const author = ev.author || "unknown"; - const body = ev.body || {{}}; - const severity = body.severity || "info"; - const text = body.text || body.description || body.title || JSON.stringify(body); + const payload = ev.payload || ev.body || {{}}; + const severity = payload.severity || "info"; + const text = payload.text || payload.description || payload.title || JSON.stringify(payload); return `
diff --git a/command-center/src/state.rs b/command-center/src/state.rs index 4b7f61b..29f4cf9 100644 --- a/command-center/src/state.rs +++ b/command-center/src/state.rs @@ -11,6 +11,10 @@ use time::{Duration, OffsetDateTime}; use tokio::sync::{broadcast, RwLock}; use uuid::Uuid; +pub fn now_utc_seconds() -> OffsetDateTime { + OffsetDateTime::now_utc().replace_nanosecond(0).unwrap() +} + /// How many heartbeats we keep per node (for history). const MAX_HEARTBEATS_PER_NODE: usize = 50; @@ -194,24 +198,111 @@ pub enum ServerEvent { // V0.7.2: Communication Layer - EventEnvelope // ============================================================================ +const EVENT_ENVELOPE_FORMAT_V0: &str = "vm-event-envelope-v0"; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct EventEnvelopeSchema { + pub envelope: u32, + pub payload: u32, +} + +impl Default for EventEnvelopeSchema { + fn default() -> Self { + Self { + envelope: 0, + payload: 0, + } + } +} + +fn default_event_envelope_format() -> String { + EVENT_ENVELOPE_FORMAT_V0.to_string() +} + +fn default_empty_object() -> serde_json::Value { + serde_json::Value::Object(serde_json::Map::new()) +} + +fn truncate_to_seconds_utc(ts: OffsetDateTime) -> OffsetDateTime { + ts.to_offset(time::UtcOffset::UTC) + .replace_nanosecond(0) + .unwrap() +} + +fn normalize_json_value(v: serde_json::Value) -> serde_json::Value { + use serde_json::{Map, Value}; + use std::collections::BTreeMap; + + match v { + Value::Object(map) => { + let mut sorted: BTreeMap<String, Value> = BTreeMap::new(); + for (k, v2) in map { + sorted.insert(k, normalize_json_value(v2)); + } + let mut out = Map::new(); + for (k, v2) in sorted { + out.insert(k, v2); + } + Value::Object(out) + } + Value::Array(arr) => Value::Array(arr.into_iter().map(normalize_json_value).collect()), + other => other, + } +} + /// Canonical message format for all comms events. /// Used for notes, incidents, acknowledgements, tags, and resolutions.
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct EventEnvelope { + /// Codec discriminator (stable forever) + #[serde(default = "default_event_envelope_format")] + pub format: String, + /// Semantics discriminator (stable forever) + #[serde(default)] + pub schema: EventEnvelopeSchema, /// Unique event ID (server-assigned) pub id: Uuid, - /// Event kind: "note", "incident", "ack", "tag", "resolve" - pub kind: String, /// Timestamp (server-assigned) #[serde(with = "time::serde::rfc3339")] pub ts: OffsetDateTime, + /// Event kind: "note", "incident", "ack", "tag", "resolve" + pub kind: String, /// Node this event relates to (optional for global events) #[serde(skip_serializing_if = "Option::is_none")] pub node_id: Option<Uuid>, /// Author: "operator", "system", "vm-copilot", "scheduler", agent name pub author: String, /// Structured payload (kind-specific) - pub body: serde_json::Value, + #[serde(default = "default_empty_object", alias = "body")] + pub payload: serde_json::Value, +} + +impl EventEnvelope { + pub fn new( + id: Uuid, + ts: OffsetDateTime, + kind: String, + node_id: Option<Uuid>, + author: String, + payload: serde_json::Value, + ) -> Self { + Self { + format: EVENT_ENVELOPE_FORMAT_V0.to_string(), + schema: EventEnvelopeSchema::default(), + id, + ts, + kind, + node_id, + author, + payload, + } + } + + pub fn canonicalize_in_place(&mut self) { + self.format = EVENT_ENVELOPE_FORMAT_V0.to_string(); + self.ts = truncate_to_seconds_utc(self.ts); + self.payload = normalize_json_value(std::mem::take(&mut self.payload)); + } } /// Log entry wrapper for EventEnvelope (versioned for future compatibility). @@ -437,7 +528,7 @@ impl AppState { /// Recompute and publish attention status for a node.
pub async fn recompute_and_publish_attention(&self, node_id: Uuid) { - let now = OffsetDateTime::now_utc(); + let now = now_utc_seconds(); let latest = self.list_latest().await; let last_scans = self.list_last_scans().await; @@ -461,9 +552,11 @@ impl AppState { /// Record an EventEnvelope: log to JSONL, store in memory, broadcast via SSE. pub async fn record_envelope(&self, ev: EventEnvelope) { + let mut ev = ev; + ev.canonicalize_in_place(); + // 1) Log to JSONL - let entry = EventEnvelopeLogEntry { version: 1, event: ev.clone() }; - if let Err(e) = self.logs.append_json_line("events.jsonl", &entry) { + if let Err(e) = self.logs.append_json_line("events.jsonl", &ev) { tracing::warn!("failed to append events log: {e}"); } @@ -498,17 +591,22 @@ impl AppState { continue; } - let entry: EventEnvelopeLogEntry = match serde_json::from_str(&line) { + let event: EventEnvelope = match serde_json::from_str(&line) { Ok(v) => v, - Err(e) => { - tracing::warn!("invalid events line: {e}"); - continue; - } + Err(_) => match serde_json::from_str::<EventEnvelopeLogEntry>(&line) { + Ok(v) => v.event, + Err(e) => { + tracing::warn!("invalid events line: {e}"); + continue; + } + }, }; + let mut event = event; + event.canonicalize_in_place(); // Store in memory (no broadcast during replay) let mut events = self.events.write().await; - events.push(entry.event); + events.push(event); if events.len() > MAX_ENVELOPES_IN_MEMORY { let overflow = events.len() - MAX_ENVELOPES_IN_MEMORY; events.drain(0..overflow); @@ -840,3 +938,74 @@ pub fn compute_attention( reasons, } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_event_envelope_canonicalization_bytes() { + let id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); + let ts = OffsetDateTime::parse( + "2025-12-17T23:07:10.123Z", + &time::format_description::well_known::Rfc3339, + ) + .unwrap(); + + let payload = serde_json::json!({ + "z": 1, + "a": { "d": 1, "b": 2 }, + "m": [{ "y": 1, "x": 2 }] + }); + + let mut ev =
EventEnvelope::new( + id, + ts, + "note".to_string(), + None, + "operator".to_string(), + payload, + ); + ev.canonicalize_in_place(); + + assert_eq!(ev.format, "vm-event-envelope-v0"); + assert_eq!(ev.schema.envelope, 0); + assert_eq!(ev.schema.payload, 0); + assert_eq!( + ev.ts.format(&time::format_description::well_known::Rfc3339) + .unwrap(), + "2025-12-17T23:07:10Z" + ); + + let json = serde_json::to_string(&ev).unwrap(); + let expected = concat!( + "{\"format\":\"vm-event-envelope-v0\",", + "\"schema\":{\"envelope\":0,\"payload\":0},", + "\"id\":\"00000000-0000-0000-0000-000000000001\",", + "\"ts\":\"2025-12-17T23:07:10Z\",", + "\"kind\":\"note\",", + "\"author\":\"operator\",", + "\"payload\":{\"a\":{\"b\":2,\"d\":1},\"m\":[{\"x\":2,\"y\":1}],\"z\":1}}" + ); + assert_eq!(json, expected); + + let mut line = Vec::new(); + serde_json::to_writer(&mut line, &ev).unwrap(); + line.push(b'\n'); + assert!(line.ends_with(b"\n")); + assert!(line.len() >= 2); + assert_ne!(line[line.len() - 2], b'\n'); + assert_eq!(&line[..line.len() - 1], expected.as_bytes()); + + let legacy_json = concat!( + "{\"id\":\"00000000-0000-0000-0000-000000000001\",", + "\"ts\":\"2025-12-17T23:07:10.123Z\",", + "\"kind\":\"note\",", + "\"author\":\"operator\",", + "\"body\":{\"z\":1,\"a\":{\"d\":1,\"b\":2},\"m\":[{\"y\":1,\"x\":2}]}}" + ); + let mut legacy_ev: EventEnvelope = serde_json::from_str(legacy_json).unwrap(); + legacy_ev.canonicalize_in_place(); + assert_eq!(serde_json::to_string(&legacy_ev).unwrap(), expected); + } +} diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 5615e8d..6ff9444 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -124,6 +124,7 @@ Nodes are keyed by `node_id`. Each heartbeat overwrites the previous entry for t - SSE broadcast of envelope events by their `kind` name. - Durable persistence to `events.jsonl` with replay on startup. - Memory-bounded in-memory store (500 most recent envelopes). 
+- Canonicalization rules for audit-grade stability (see `docs/EVENT_ENVELOPE.md`). ### V0.7.1: Mission Console - NASA-style 3-panel dashboard at `GET /console`. diff --git a/docs/EVENT_ENVELOPE.md b/docs/EVENT_ENVELOPE.md new file mode 100644 index 0000000..4a06de7 --- /dev/null +++ b/docs/EVENT_ENVELOPE.md @@ -0,0 +1,56 @@ +# VaultMesh EventEnvelope (v0) – Canonical Spec + +This document defines the stable, audit-friendly contract for `EventEnvelope` as used by the Command Center: + +- HTTP API: `POST /api/events`, `GET /api/events` +- SSE stream: `GET /events` (event name = `kind`) +- Durable log: `$VAULTMESH_LOG_DIR/events.jsonl` (one envelope per line) + +## Envelope Shape + +`EventEnvelope` is a single JSON object with the following fields: + +Required: +- `format`: string, must be `"vm-event-envelope-v0"` +- `schema`: object, must be `{ "envelope": 0, "payload": 0 }` +- `id`: UUID string (server-assigned) +- `ts`: RFC3339 UTC timestamp with **seconds precision** (server-assigned), e.g. `"2025-12-17T23:07:10Z"` +- `kind`: string (e.g. `"note"`, `"incident"`, `"ack"`, `"tag"`, `"resolve"`) +- `author`: string (e.g. `"operator"`, `"system"`, `"vm-copilot"`) +- `payload`: JSON value (kind-specific; usually an object) + +Optional: +- `node_id`: UUID string (omit if global event) + +Compatibility: +- Incoming requests/log lines may use `body` instead of `payload`; Command Center treats `body` as an alias for `payload`. + +## Timestamp Rules + +- Canonical timestamps are **UTC `Z`**, **seconds precision only**. +- If a timestamp contains fractional seconds, Command Center truncates to seconds during canonicalization. + +## Canonical JSON Ordering + +To keep bytes stable forever (for hashing, Merkle roots, and diffability), Command Center canonicalizes envelopes before persistence and broadcast: + +- Top-level field order is fixed by the envelope struct definition. +- `payload` is recursively normalized by sorting **object keys** lexicographically. 
+- Arrays preserve order (arrays are never sorted). +- Optional fields are omitted when absent (no `field: null` unless semantically meaningful). + +## Canonical Bytes + Newline + +The canonical byte representation of an event is: + +- UTF-8 bytes of the canonical JSON serialization of the envelope +- followed by a single LF newline byte (`0x0A`) + +`events.jsonl` is the concatenation of these canonical envelope line bytes in file order. + +## Hashing (v0) + +When hashing a canonical event line (leaf hashing), use: + +- `SHA-256(canonical_event_line_bytes)` + diff --git a/scripts/event_generation_demo.py b/scripts/event_generation_demo.py index f790975..81cbdd9 100755 --- a/scripts/event_generation_demo.py +++ b/scripts/event_generation_demo.py @@ -77,7 +77,7 @@ class EventGenerator: json={ "kind": event_type, "node_id": self.node_id, - "body": event_data + "payload": event_data }, headers={"Content-Type": "application/json"} ) @@ -110,4 +110,4 @@ def main(): time.sleep(1) # Add a small delay between events if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/vm-copilot/vm_copilot_agent.py b/vm-copilot/vm_copilot_agent.py index a5f02a3..d747f6f 100644 --- a/vm-copilot/vm_copilot_agent.py +++ b/vm-copilot/vm_copilot_agent.py @@ -113,7 +113,7 @@ def post_event(kind: str, node_id: str, body: dict) -> Optional[dict]: "kind": kind, "node_id": node_id, "author": AUTHOR, - "body": body, + "payload": body, } try: resp = requests.post( @@ -122,7 +122,7 @@ def post_event(kind: str, node_id: str, body: dict) -> Optional[dict]: headers={"Content-Type": "application/json"}, timeout=10, ) - if resp.status_code == 200: + if resp.status_code in (200, 201): result = resp.json() log.info(f"Posted {kind} event: {result.get('id', 'unknown')}") return result