# Local SQLite ledger helpers: database path resolution, connection and
# transaction management, and insert/query functions for the tool_invocations,
# mcp_calls, proof_artifacts and shadow_receipts tables.
from __future__ import annotations

import json
import os
import sqlite3
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterator, Mapping, Sequence

from ledger.migrate import migrate
from ledger.redact import redact_json_for_storage

THIS_FILE = Path(__file__).resolve()
PKG_DIR = THIS_FILE.parent
REPO_ROOT = PKG_DIR.parent


def default_db_path() -> Path:
    """Return the ledger database path derived from the environment.

    Resolution order:
      1. ``LEDGER_DB_PATH``, then ``VAULTMESH_LEDGER_DB`` (explicit file path).
      2. ``$VAULTMESH_ROOT/.state/ledger.sqlite``.
      3. ``<REPO_ROOT>/.state/ledger.sqlite``.
    """
    configured = os.environ.get("LEDGER_DB_PATH") or os.environ.get(
        "VAULTMESH_LEDGER_DB"
    )
    if configured:
        return Path(configured).expanduser().resolve()

    vaultmesh_root = os.environ.get("VAULTMESH_ROOT")
    if vaultmesh_root:
        return (
            Path(vaultmesh_root).expanduser().resolve() / ".state" / "ledger.sqlite"
        ).resolve()

    return (REPO_ROOT / ".state" / "ledger.sqlite").resolve()


def new_id() -> str:
    """Return a fresh random UUID4 string for use as a row id."""
    return str(uuid.uuid4())


def new_trace_id() -> str:
    """Return a fresh random UUID4 string for use as a trace id."""
    return str(uuid.uuid4())


def _apply_pragmas(conn: sqlite3.Connection) -> None:
    """Apply the connection-level PRAGMA settings used by the ledger."""
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    conn.execute("PRAGMA foreign_keys=ON;")
    conn.execute("PRAGMA busy_timeout=5000;")
    conn.execute("PRAGMA temp_store=MEMORY;")


def connect(db_path: Path | str | None = None) -> sqlite3.Connection:
    """Open a configured connection to the ledger database.

    Args:
        db_path: Database file to open; ``None`` uses ``default_db_path()``.

    Returns:
        A connection in autocommit mode (``isolation_level=None``) with
        ``sqlite3.Row`` as row factory and the ledger PRAGMAs applied.
        The parent directory is created if missing. The caller owns the
        connection and must close it.
    """
    path = Path(db_path) if db_path is not None else default_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(path), isolation_level=None)
    try:
        conn.row_factory = sqlite3.Row
        _apply_pragmas(conn)
    except BaseException:
        # Do not leak the handle when configuration fails (for example when
        # the file is not a valid SQLite database).
        conn.close()
        raise
    return conn


@contextmanager
def open_db(db_path: Path | str | None = None) -> Iterator[sqlite3.Connection]:
    """Context manager yielding a connection from ``connect`` and closing it on exit."""
    conn = connect(db_path)
    try:
        yield conn
    finally:
        conn.close()


@contextmanager
def txn(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """Run the ``with`` body inside an explicit BEGIN/COMMIT transaction.

    The transaction is committed when the body finishes normally. On any
    exception it is rolled back and the exception is re-raised.
    """
    conn.execute("BEGIN;")
    try:
        yield conn
        conn.execute("COMMIT;")
    except BaseException:
        # BaseException so that KeyboardInterrupt/SystemExit also roll back
        # instead of leaving the transaction open on a caller-owned connection.
        # The in_transaction guard keeps a failed ROLLBACK ("no transaction is
        # active") from masking the original error.
        if conn.in_transaction:
            conn.execute("ROLLBACK;")
        raise


def ensure_migrated(conn: sqlite3.Connection) -> None:
    """Run ``ledger.migrate.migrate`` on ``conn``."""
    migrate(conn)


def _utc_now_iso_z() -> str:
    """Return the current UTC time as ISO-8601 with second precision and a ``Z`` suffix."""
    return (
        datetime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace("+00:00", "Z")
    )


def _json_dumps(value: Any) -> str:
    """Serialize ``value`` as compact JSON with sorted keys and non-ASCII kept as-is."""
    return json.dumps(value, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def _normalize_action(value: str | None) -> str | None:
    """Strip ``value``; return ``None`` for ``None`` or whitespace-only input."""
    if value is None:
        return None
    value = value.strip()
    return value or None


def _sha256_hex(data: bytes) -> str:
    """Return the SHA-256 hex digest of ``data``."""
    import hashlib

    return hashlib.sha256(data).hexdigest()


def _blake3_hex(data: bytes) -> str | None:
    """Return the BLAKE3 hex digest of ``data``, or ``None`` if ``blake3`` cannot be imported."""
    try:
        import blake3  # type: ignore
    except Exception:
        # Optional dependency: deliberately best-effort.
        return None
    return blake3.blake3(data).hexdigest()


@dataclass(frozen=True)
class LedgerEvent:
    """Immutable record with the common fields of a ledger event."""

    id: str
    ts: str
    kind: str
    status: str
    label: str
    duration_ms: int | None
    trace_id: str | None
    error_text: str | None


@dataclass(frozen=True)
class ShadowReceiptRow:
    """Immutable view of one row of the ``shadow_receipts`` table."""

    id: str
    ts: str
    horizon_id: str
    counterfactual_hash: str
    entropy_delta: float | None
    reason_unrealized: str
    observer_signature: str | None
    trace_id: str | None
    meta_json: str | None


def log_tool_invocation(
    *,
    tool_name: str,
    action: str | None = None,
    status: str,
    duration_ms: int | None = None,
    input_payload: Any | None = None,
    output_payload: Any | None = None,
    error_text: str | None = None,
    trace_id: str | None = None,
    actor: str | None = None,
    db_path: Path | str | None = None,
) -> str:
    """Insert one row into ``tool_invocations`` and return its generated id.

    ``input_payload`` and ``output_payload`` are passed through
    ``redact_json_for_storage`` before storage; the metadata it returns is
    stored as JSON when non-empty, otherwise as NULL. ``action`` is stripped,
    and blank values are stored as NULL. The timestamp is the current UTC time.

    Args:
        db_path: Database file to write to; ``None`` uses ``default_db_path()``.
    """
    invocation_id = new_id()
    redacted_input, input_meta = redact_json_for_storage(input_payload)
    redacted_output, output_meta = redact_json_for_storage(output_payload)
    input_meta_json = _json_dumps(input_meta) if input_meta else None
    output_meta_json = _json_dumps(output_meta) if output_meta else None

    with open_db(db_path) as conn:
        ensure_migrated(conn)
        with txn(conn):
            conn.execute(
                """
                INSERT INTO tool_invocations (
                  id, ts, tool_name, action, status, duration_ms,
                  input_json, output_json, error_text, trace_id, actor,
                  input_meta_json, output_meta_json
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """,
                (
                    invocation_id,
                    _utc_now_iso_z(),
                    tool_name,
                    _normalize_action(action),
                    status,
                    duration_ms,
                    redacted_input,
                    redacted_output,
                    error_text,
                    trace_id,
                    actor,
                    input_meta_json,
                    output_meta_json,
                ),
            )
    return invocation_id


def log_mcp_call(
    *,
    server_name: str,
    method: str,
    tool_name: str | None = None,
    status: str,
    duration_ms: int | None = None,
    request_payload: Any | None = None,
    response_payload: Any | None = None,
    error_text: str | None = None,
    trace_id: str | None = None,
    client_id: str | None = None,
    db_path: Path | str | None = None,
) -> str:
    """Insert one row into ``mcp_calls`` and return its generated id.

    ``request_payload`` and ``response_payload`` are passed through
    ``redact_json_for_storage`` before storage; the metadata it returns is
    stored as JSON when non-empty, otherwise as NULL. ``tool_name`` is
    stripped, and blank values are stored as NULL. The timestamp is the
    current UTC time.

    Args:
        db_path: Database file to write to; ``None`` uses ``default_db_path()``.
    """
    call_id = new_id()
    redacted_request, request_meta = redact_json_for_storage(request_payload)
    redacted_response, response_meta = redact_json_for_storage(response_payload)
    request_meta_json = _json_dumps(request_meta) if request_meta else None
    response_meta_json = _json_dumps(response_meta) if response_meta else None

    with open_db(db_path) as conn:
        ensure_migrated(conn)
        with txn(conn):
            conn.execute(
                """
                INSERT INTO mcp_calls (
                  id, ts, server_name, method, tool_name, status, duration_ms,
                  request_json, response_json, error_text, trace_id, client_id,
                  request_meta_json, response_meta_json
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """,
                (
                    call_id,
                    _utc_now_iso_z(),
                    server_name,
                    method,
                    _normalize_action(tool_name),
                    status,
                    duration_ms,
                    redacted_request,
                    redacted_response,
                    error_text,
                    trace_id,
                    client_id,
                    request_meta_json,
                    response_meta_json,
                ),
            )
    return call_id


def log_proof_artifact(
    *,
    kind: str,
    path: str | Path | None = None,
    meta: Mapping[str, Any] | None = None,
    trace_id: str | None = None,
    db_path: Path | str | None = None,
) -> str:
    """Insert one row into ``proof_artifacts`` and return its generated id.

    When ``path`` is given, it is stored relative to ``REPO_ROOT`` if it can
    be expressed that way, otherwise as given. If it points to an existing
    regular file, the file's SHA-256 digest, BLAKE3 digest (when the optional
    ``blake3`` module is available) and size in bytes are recorded as well.
    ``meta`` is passed through ``redact_json_for_storage`` before storage.

    Args:
        db_path: Database file to write to; ``None`` uses ``default_db_path()``.
    """
    artifact_id = new_id()
    rel_path: str | None = None
    sha256_hex: str | None = None
    blake3_hex: str | None = None
    size_bytes: int | None = None

    if path is not None:
        p = Path(path)
        try:
            rel_path = str(p.resolve().relative_to(REPO_ROOT))
        except Exception:
            # Outside the repo (or unresolvable): keep the path as provided.
            rel_path = str(p)
        if p.exists() and p.is_file():
            data = p.read_bytes()
            sha256_hex = _sha256_hex(data)
            blake3_hex = _blake3_hex(data)
            size_bytes = len(data)

    meta_json, _ = redact_json_for_storage(meta)

    with open_db(db_path) as conn:
        ensure_migrated(conn)
        with txn(conn):
            conn.execute(
                """
                INSERT INTO proof_artifacts (
                  id, ts, kind, path, sha256_hex, blake3_hex, size_bytes,
                  meta_json, trace_id
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
                """,
                (
                    artifact_id,
                    _utc_now_iso_z(),
                    kind,
                    rel_path,
                    sha256_hex,
                    blake3_hex,
                    size_bytes,
                    meta_json,
                    trace_id,
                ),
            )
    return artifact_id


@contextmanager
def timed_operation() -> Iterator[dict[str, Any]]:
    """Yield a dict that receives ``"duration_ms"`` when the ``with`` body exits.

    The elapsed time is measured with ``time.perf_counter`` and truncated to
    whole milliseconds. It is recorded even if the body raises.
    """
    start = time.perf_counter()
    info: dict[str, Any] = {}
    try:
        yield info
    finally:
        info["duration_ms"] = int((time.perf_counter() - start) * 1000)


def insert_shadow_receipt(
    *,
    horizon_id: str,
    counterfactual_hash: str,
    reason_unrealized: str,
    entropy_delta: float | None = None,
    observer_signature: str | None = None,
    trace_id: str | None = None,
    meta: Mapping[str, Any] | None = None,
    db_path: Path | str | None = None,
) -> str:
    """
    Insert a ShadowReceipt (proof of restraint / unrealized futures) into the local SQLite ledger.

    Notes:
    - `meta` is redacted via `redact_json_for_storage` before storage.
    - Callers should pass `trace_id` to correlate with tool_invocations/mcp_calls/proof_artifacts.

    Returns the generated receipt id.
    """
    receipt_id = new_id()
    meta_json_redacted, _ = redact_json_for_storage(meta)

    with open_db(db_path) as conn:
        ensure_migrated(conn)
        with txn(conn):
            conn.execute(
                """
                INSERT INTO shadow_receipts (
                  id, ts, horizon_id, counterfactual_hash, entropy_delta,
                  reason_unrealized, observer_signature, trace_id, meta_json
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
                """,
                (
                    receipt_id,
                    _utc_now_iso_z(),
                    horizon_id,
                    counterfactual_hash,
                    entropy_delta,
                    reason_unrealized,
                    observer_signature,
                    trace_id,
                    meta_json_redacted,
                ),
            )
    return receipt_id


def _shadow_receipt_from_row(row: sqlite3.Row) -> ShadowReceiptRow:
    """Build a ``ShadowReceiptRow`` from a ``shadow_receipts`` result row."""
    return ShadowReceiptRow(
        id=row["id"],
        ts=row["ts"],
        horizon_id=row["horizon_id"],
        counterfactual_hash=row["counterfactual_hash"],
        entropy_delta=row["entropy_delta"],
        reason_unrealized=row["reason_unrealized"],
        observer_signature=row["observer_signature"],
        trace_id=row["trace_id"],
        meta_json=row["meta_json"],
    )


def get_shadow_receipts_by_trace(
    trace_id: str, *, db_path: Path | str | None = None
) -> list[ShadowReceiptRow]:
    """Return all shadow receipts with the given ``trace_id``, oldest first.

    Rows are ordered by timestamp ascending, with ties broken by id ascending.
    """
    with open_db(db_path) as conn:
        ensure_migrated(conn)
        rows = conn.execute(
            """
            SELECT
              id, ts, horizon_id, counterfactual_hash, entropy_delta,
              reason_unrealized, observer_signature, trace_id, meta_json
            FROM shadow_receipts
            WHERE trace_id = ?
            ORDER BY datetime(ts) ASC, id ASC;
            """,
            (trace_id,),
        ).fetchall()
    return [_shadow_receipt_from_row(r) for r in rows]


def get_shadow_receipts_recent(
    n: int = 50, *, db_path: Path | str | None = None
) -> list[ShadowReceiptRow]:
    """Return up to ``n`` shadow receipts, newest first.

    Rows are ordered by timestamp descending, with ties broken by id
    descending. ``n`` is coerced with ``int()`` and passed to ``LIMIT``
    unchanged; per the SQLite documentation a negative LIMIT means no upper
    bound on the number of rows returned.
    """
    with open_db(db_path) as conn:
        ensure_migrated(conn)
        rows = conn.execute(
            """
            SELECT
              id, ts, horizon_id, counterfactual_hash, entropy_delta,
              reason_unrealized, observer_signature, trace_id, meta_json
            FROM shadow_receipts
            ORDER BY datetime(ts) DESC, id DESC
            LIMIT ?;
            """,
            (int(n),),
        ).fetchall()
    return [_shadow_receipt_from_row(r) for r in rows]