#!/usr/bin/env python3 """ VaultMesh Unified CLI (minimal) vm skill validate - Validate the VaultMesh Claude skill and emit receipts. """ import json import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path from typing import NoReturn, Optional def _missing_dep(dep: str, *, pip_name: str | None = None) -> NoReturn: pkg = pip_name or dep raise SystemExit( f"Missing dependency: {dep}. Install with: python3 -m pip install {pkg}" ) try: import base58 # type: ignore except ModuleNotFoundError: # pragma: no cover _missing_dep("base58") try: import blake3 # type: ignore except ModuleNotFoundError: # pragma: no cover _missing_dep("blake3") try: import click # type: ignore except ModuleNotFoundError: # pragma: no cover _missing_dep("click") try: from nacl import signing # type: ignore except ModuleNotFoundError: # pragma: no cover _missing_dep("pynacl", pip_name="pynacl") # ============================================================================ # Self-rooting: CLI auto-detects repo root # ============================================================================ THIS_FILE = Path(__file__).resolve() CLI_DIR = THIS_FILE.parent # /root/work/vaultmesh/cli REPO_ROOT = THIS_FILE.parents[1] # /root/work/vaultmesh # Allow override via env var, but default to auto-detected repo root VAULTMESH_ROOT = Path(os.environ.get("VAULTMESH_ROOT", REPO_ROOT)).resolve() RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts" # Add VAULTMESH_ROOT to sys.path for engine imports if str(VAULTMESH_ROOT) not in sys.path: sys.path.insert(0, str(VAULTMESH_ROOT)) # ============================================================================ # Guardian / Scroll config and Merkle helpers # ============================================================================ SCROLLS = { "drills": { "jsonl": RECEIPTS_ROOT / "drills" / "drill_runs.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.drills.txt", }, "compliance": { "jsonl": RECEIPTS_ROOT / "compliance" / "oracle_answers.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.compliance.txt", }, "guardian": { "jsonl": RECEIPTS_ROOT / "guardian" / "anchor_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.guardian.txt", }, "treasury": { "jsonl": RECEIPTS_ROOT / "treasury" / "treasury_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.treasury.txt", }, "mesh": { "jsonl": RECEIPTS_ROOT / "mesh" / "mesh_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.mesh.txt", }, "offsec": { "jsonl": RECEIPTS_ROOT / "offsec" / "offsec_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.offsec.txt", }, "identity": { "jsonl": RECEIPTS_ROOT / "identity" / "identity_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.identity.txt", }, "observability": { "jsonl": RECEIPTS_ROOT / "observability" / "observability_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.observability.txt", }, "automation": { "jsonl": RECEIPTS_ROOT / "automation" / "automation_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.automation.txt", }, "psi": { "jsonl": RECEIPTS_ROOT / "psi" / "psi_events.jsonl", "root_file": VAULTMESH_ROOT / "ROOT.psi.txt", }, "console": { "jsonl": RECEIPTS_ROOT / "console" / "console_events.jsonl", "root_file": RECEIPTS_ROOT / "console" / "ROOT.console.txt", }, } def _vmhash_blake3(data: bytes) -> str: """VaultMesh hash: blake3:.""" return f"blake3:{blake3.blake3(data).hexdigest()}" def _now_iso_z() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def _read_root_value(root_file: Path) -> Optional[str]: if not root_file.exists(): return None text = root_file.read_text(encoding="utf-8").strip() if not text: return None # Structured root file (e.g., Console engine root) for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue if line.startswith("merkle_root="): value = line.split("=", 1)[1].strip() return value or None # Plain root file: first non-empty, non-comment line for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue return line return None def _write_root_value( root_file: Path, root_value: str, *, leaves: int | None = None ) -> None: root_file.parent.mkdir(parents=True, exist_ok=True) # Console root file is a structured key=value document used by the HTTP bridge. if root_file.name == "ROOT.console.txt": events = int(leaves or 0) root_file.write_text( "\n".join( [ "# VaultMesh Console Root", "engine_id=engine:console", f"merkle_root={root_value}", f"events={events}", f"updated_at={_now_iso_z()}", "", ] ), encoding="utf-8", ) return root_file.write_text(root_value + "\n", encoding="utf-8") def get_guardian_did() -> str: """ Resolve the DID that represents the Guardian anchor actor. Priority: 1) VAULTMESH_GUARDIAN_DID env var 2) keys/identity/guardian-local.json 3) default 'did:vm:guardian:local' """ env_did = os.environ.get("VAULTMESH_GUARDIAN_DID") if env_did: return env_did.strip() # Try key file from the identity bootstrap key_path = VAULTMESH_ROOT / "keys" / "identity" / "guardian-local.json" if key_path.exists(): try: data = json.loads(key_path.read_text(encoding="utf-8")) did = data.get("did") if did: return did except Exception: pass # Fallback return "did:vm:guardian:local" def _merkle_root_from_hashes(hashes: list[str]) -> str: """ Compute Merkle root using the same pattern as the Rust VmHash::merkle_root: - If empty: blake3("empty") - If 1 element: that hash - Else: * group in pairs, duplicate last if odd * each parent = blake3( left_hex + right_hex ) """ if not hashes: return _vmhash_blake3(b"empty") if len(hashes) == 1: return hashes[0] current = hashes[:] while len(current) > 1: next_level: list[str] = [] for i in range(0, len(current), 2): left = current[i] right = current[i + 1] if i + 1 < len(current) else current[i] combined = (left.split(":", 1)[-1] + right.split(":", 1)[-1]).encode( "utf-8" ) next_level.append(_vmhash_blake3(combined)) current = next_level return current[0] def _compute_scroll_root(scroll_name: str) -> dict: """ Compute Merkle root for a given scroll's JSONL file. Returns dict: { "scroll": str, "path": Path, "root": str, "leaves": int, "exists": bool, } """ cfg = SCROLLS[scroll_name] path = cfg["jsonl"] if not path.exists(): return { "scroll": scroll_name, "path": path, "root": _vmhash_blake3(b"empty"), "leaves": 0, "exists": False, } hashes: list[str] = [] leaves = 0 with path.open("r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue leaves += 1 hashes.append(_vmhash_blake3(line.encode("utf-8"))) root = _merkle_root_from_hashes(hashes) return { "scroll": scroll_name, "path": path, "root": root, "leaves": leaves, "exists": True, } def _compute_all_roots(selected: Optional[list[str]] = None) -> list[dict]: names = selected or list(SCROLLS.keys()) results = [] for name in names: if name not in SCROLLS: continue results.append(_compute_scroll_root(name)) return results # ============================================================================ # Main CLI Group # ============================================================================ @click.group() @click.version_option(version="0.1.0") def cli() -> None: """VaultMesh Civilization Ledger CLI (minimal edition).""" pass # ============================================================================ # Skill Commands # ============================================================================ @cli.group() def skill() -> None: """Skill utilities.""" pass @skill.command("validate") @click.option( "--skill-dir", type=click.Path(), help="Override skill directory (default: ~/.claude/skills/vaultmesh)", ) @click.option( "--validator-path", type=click.Path(), help=( "Path to vm_validate_vaultmesh_skill.py " "(default: VAULTMESH_ROOT or same directory as this CLI)" ), ) def skill_validate(skill_dir: Optional[str], validator_path: Optional[str]) -> None: """ Validate the VaultMesh Claude skill and emit an Automation receipt. This wraps vm_validate_vaultmesh_skill.py, which: - Checks ~/.claude/skills/vaultmesh/ (or provided skill-dir) - Ensures SKILL.md frontmatter + links - Maintains VAULTMESH_SKILL_ROOT.txt - Writes Automation receipts to receipts/automation/automation_events.jsonl """ # ------------------------------------------------------------------ # Locate validator script (skill_validator.py in cli/) # ------------------------------------------------------------------ if validator_path: validator = Path(validator_path).expanduser() else: # Primary: cli/skill_validator.py (renamed copy) candidate = CLI_DIR / "skill_validator.py" if candidate.exists(): validator = candidate else: # Fallback: legacy name in same directory fallback = CLI_DIR / "vm_validate_vaultmesh_skill.py" if fallback.exists(): validator = fallback else: click.echo( "Could not find skill_validator.py\n" " Tried:\n" f" {candidate}\n" f" {fallback}", err=True, ) sys.exit(2) # ------------------------------------------------------------------ # Build command # ------------------------------------------------------------------ cmd = ["python3", str(validator)] if skill_dir: cmd.append(str(Path(skill_dir).expanduser())) # ------------------------------------------------------------------ # Run validator # ------------------------------------------------------------------ result = subprocess.run( cmd, capture_output=True, text=True, ) if not result.stdout.strip(): click.echo("Validator produced no output", err=True) if result.stderr: click.echo(result.stderr, err=True) sys.exit(result.returncode or 2) # ------------------------------------------------------------------ # Parse JSON report # ------------------------------------------------------------------ try: report = json.loads(result.stdout) except json.JSONDecodeError: click.echo("Failed to parse validator JSON output", err=True) click.echo("Raw stdout:", err=True) click.echo(result.stdout, err=True) if result.stderr: click.echo("\nStderr:", err=True) click.echo(result.stderr, err=True) sys.exit(result.returncode or 2) status = report.get("overall_status", "unknown") algo = report.get("hash_algorithm", "unknown") root = report.get("root_hash", "unknown") skill_dir_used = report.get("skill_dir", "(unknown)") # ------------------------------------------------------------------ # Human-friendly summary # ------------------------------------------------------------------ click.echo("VaultMesh Skill Validation") click.echo("----------------------------------------") click.echo(f" Skill dir: {skill_dir_used}") click.echo(f" Status: {status}") click.echo(f" Hash algo: {algo}") click.echo(f" Root hash: {root}") click.echo("----------------------------------------") checks = report.get("checks", []) ok = sum(1 for c in checks if c.get("status") == "ok") warn = sum(1 for c in checks if c.get("status") == "warn") fail = sum(1 for c in checks if c.get("status") == "fail") click.echo(f" Checks: {ok} ok, {warn} warn, {fail} fail") # Show Automation scroll location receipts_path = RECEIPTS_ROOT / "automation" / "automation_events.jsonl" click.echo(f" Receipt scroll: {receipts_path}") if result.returncode == 0: click.echo("[ok] Automation receipt emitted (status: success)") elif result.returncode == 1: click.echo("[warn] Automation receipt emitted (status: warning)") else: click.echo("[fail] Validation failed (see receipt + logs)") # Propagate exit code so CI / callers can react sys.exit(result.returncode) # ============================================================================ # Guardian Commands # ============================================================================ @cli.group() def guardian() -> None: """Guardian Engine - Audit and anchor.""" pass @guardian.command("automation-log") @click.option( "--limit", default=10, show_default=True, type=int, help="Number of recent Automation receipts to show", ) def guardian_automation_log(limit: int) -> None: """ Show recent Automation scroll events (e.g. skill validation runs). Reads the Automation scroll: receipts/automation/automation_events.jsonl and prints the last N events with sequence, status, and hash summary. """ receipts_path = RECEIPTS_ROOT / "automation" / "automation_events.jsonl" if not receipts_path.exists(): click.echo(f"No Automation scroll found at: {receipts_path}") sys.exit(0) lines = receipts_path.read_text(encoding="utf-8").splitlines() lines = [ln for ln in lines if ln.strip()] if not lines: click.echo("Automation scroll is empty.") sys.exit(0) # Take last N recent = lines[-limit:] records = [] for raw in recent: try: r = json.loads(raw) except Exception: continue records.append(r) if not records: click.echo("No valid Automation receipts parsed.") sys.exit(0) click.echo("Guardian Audit View - Automation Scroll") click.echo("========================================") click.echo(f"File: {receipts_path}") click.echo(f"Showing last {len(records)} event(s):") click.echo() # Print a compact table header = f"{'SEQ':>4} {'TIME':19} {'TYPE':36} {'STATUS':7} {'HASH':18}" click.echo(header) click.echo("-" * len(header)) for r in records: meta = r.get("meta", {}) hdr = r.get("header", {}) body = r.get("body", {}) seq = meta.get("sequence", 0) ts = r.get("timestamp", "")[:19] rtype = str(r.get("type", ""))[:36] status_tag = "?" # Status is encoded in tags like "status:ok" for t in hdr.get("tags", []): if t.startswith("status:"): status_tag = t.split(":", 1)[1] break # Fallback to body.overall_status if present if status_tag == "?" and "overall_status" in body: status_tag = body.get("overall_status") rhash = hdr.get("root_hash", "")[:18] click.echo(f"{seq:>4} {ts:19} {rtype:36} {status_tag:7} {rhash:18}") click.echo() click.echo("Hint: use --limit N to adjust how many events are shown.") @guardian.command("compute-roots") @click.option( "--scroll", "scrolls", multiple=True, help="Limit to one or more scrolls (e.g. --scroll automation --scroll treasury)", ) def guardian_compute_roots(scrolls: tuple[str, ...]) -> None: """Compute Merkle root for each scroll's JSONL but do NOT write ROOT.* files.""" selected = list(scrolls) if scrolls else None results = _compute_all_roots(selected) click.echo("Guardian - Computed Scroll Roots") click.echo("================================") for r in results: status = "OK" if r["exists"] else "MISSING" click.echo( f"- {r['scroll']:14} | leaves: {r['leaves']:4d} | " f"status: {status:7} | root: {r['root']}" ) if not results: click.echo("No matching scrolls (check --scroll names).") @guardian.command("update-roots") @click.option( "--scroll", "scrolls", multiple=True, help="Limit to one or more scrolls (e.g. --scroll automation --scroll psi)", ) def guardian_update_roots(scrolls: tuple[str, ...]) -> None: """ Compute Merkle roots for scrolls and write them into ROOT.*.txt files. This is the canonical 'scroll -> Merkle root -> ROOT.*.txt' step. """ selected = list(scrolls) if scrolls else None results = _compute_all_roots(selected) click.echo("Guardian - Updating ROOT.*.txt Files") click.echo("====================================") for r in results: cfg = SCROLLS[r["scroll"]] root_file = cfg["root_file"] _write_root_value(root_file, r["root"], leaves=r["leaves"]) click.echo( f"- {r['scroll']:14} -> {root_file.name:22} | " f"leaves: {r['leaves']:4d} | root: {r['root']}" ) if not results: click.echo("No matching scrolls (check --scroll names).") @guardian.command("anchor") @click.option( "--backend", default="local", show_default=True, help="Anchor backend identifier (e.g. local, ots, eth, btc).", ) def guardian_anchor(backend: str) -> None: """ Run a full anchor cycle: - Compute Merkle root for each scroll - Update ROOT.*.txt files - Emit a Guardian anchor receipt in receipts/guardian/anchor_events.jsonl """ results = _compute_all_roots(None) # Update ROOT files for r in results: cfg = SCROLLS[r["scroll"]] root_file = cfg["root_file"] _write_root_value(root_file, r["root"], leaves=r["leaves"]) # Build anchor payload now = datetime.now(timezone.utc) ts = now.isoformat(timespec="seconds").replace("+00:00", "Z") anchor_id = f"anchor-{now.strftime('%Y%m%d%H%M%S')}" anchor_epoch = int(now.timestamp()) # Get the Guardian DID for attribution guardian_did = get_guardian_did() roots_map = {r["scroll"]: r["root"] for r in results} anchor_body = { "schema_version": "2.0.0", "type": "guardian_anchor", "timestamp": ts, "anchor_id": anchor_id, "backend": backend, "anchor_by": guardian_did, "anchor_epoch": anchor_epoch, "roots": roots_map, "scrolls": list(roots_map.keys()), } anchor_json = json.dumps(anchor_body, sort_keys=True).encode("utf-8") anchor_hash = _vmhash_blake3(anchor_json) record = { **anchor_body, "anchor_hash": anchor_hash, } guardian_path = SCROLLS["guardian"]["jsonl"] guardian_path.parent.mkdir(parents=True, exist_ok=True) with guardian_path.open("a", encoding="utf-8") as f: f.write(json.dumps(record) + "\n") click.echo("Guardian Anchor") click.echo("===============") click.echo(f" Anchor ID: {anchor_id}") click.echo(f" Backend: {backend}") click.echo(f" Anchor by: {guardian_did}") click.echo(f" Anchor hash: {anchor_hash}") click.echo() click.echo(" Scroll roots:") for name, root in roots_map.items(): click.echo(f" - {name:14} {root}") click.echo() click.echo(f" Receipt: {guardian_path}") @guardian.command("status") def guardian_status() -> None: """ Show current ROOT.*.txt hashes and whether they match the latest JSONL Merkle root. """ click.echo("Guardian Status - ROOT Files vs Computed Roots") click.echo("==============================================") computed = {r["scroll"]: r for r in _compute_all_roots(None)} for name, cfg in SCROLLS.items(): root_file = cfg["root_file"] on_disk = _read_root_value(root_file) comp = computed.get(name) comp_root = comp["root"] if comp else None if on_disk is None or on_disk == "": state = "MISSING_ROOT" elif comp_root is None: state = "NO_COMPUTED" elif on_disk == comp_root: state = "OK" else: state = "STALE" # Truncate hashes for display on_disk_short = ( (on_disk[:30] + "...") if on_disk and len(on_disk) > 30 else (on_disk or "-") ) comp_short = ( (comp_root[:30] + "...") if comp_root and len(comp_root) > 30 else (comp_root or "-") ) click.echo( f"- {name:14} | {state:12} | disk: {on_disk_short} | computed: {comp_short}" ) # ============================================================================ # Identity Commands # ============================================================================ def _store_identity_keypair( did: str, keypair: dict, explicit_path: Optional[Path] = None ) -> Path: """ Store the DID keypair JSON to disk. Default: VAULTMESH_ROOT/keys/identity/{did_type}-{name}.json """ if explicit_path is not None: out_path = explicit_path else: parts = did.split(":") did_type = parts[2] if len(parts) >= 4 else "unknown" name = parts[3] if len(parts) >= 4 else "unknown" key_dir = VAULTMESH_ROOT / "keys" / "identity" key_dir.mkdir(parents=True, exist_ok=True) out_path = key_dir / f"{did_type}-{name}.json" tmp = { "did": did, "created_at": datetime.now(timezone.utc) .isoformat(timespec="seconds") .replace("+00:00", "Z"), "public_key_multibase": keypair["public_key_multibase"], "public_key_hex": keypair["public_key_hex"], "secret_key_hex": keypair["secret_key_hex"], } out_path.write_text(json.dumps(tmp, indent=2), encoding="utf-8") try: os.chmod(out_path, 0o600) except PermissionError: pass # Best-effort return out_path @cli.group() def identity() -> None: """Identity Engine - DIDs and credentials.""" pass @identity.command("did-create") @click.option( "--type", "did_type", required=True, type=click.Choice( ["node", "human", "agent", "service", "mesh", "portal", "guardian", "skill"], case_sensitive=False, ), help="DID type (node/human/agent/service/mesh/portal/guardian/skill)", ) @click.option( "--name", "name", required=True, help="DID name (e.g. 'shield', 'local', 'karol')", ) @click.option( "--display-name", "display_name", help="Human-friendly display name for this DID", ) @click.option( "--role", "role", help="Logical role (e.g. 'portal', 'guardian', 'skill', 'auditor')", ) @click.option( "--controller", "controller_did", help="Controller DID (optional, default: creator DID)", ) @click.option( "--creator", "creator_did", required=True, help="DID of the actor creating this DID (e.g. did:vm:human:karol)", ) @click.option( "--key-out", "key_out", type=click.Path(dir_okay=False, writable=True, path_type=Path), help="Explicit path to write keypair JSON (default: keys/identity/{type}-{name}.json)", ) def identity_did_create( did_type: str, name: str, display_name: Optional[str], role: Optional[str], controller_did: Optional[str], creator_did: str, key_out: Optional[Path], ) -> None: """ Create a VaultMesh DID + Ed25519 keypair and emit identity_did_create receipt. Example: vm identity did-create \\ --type portal \\ --name shield \\ --display-name "VaultMesh Auditor Portal (shield)" \\ --role portal \\ --creator did:vm:human:karol """ # Assemble DID: did:vm:{type}:{name} did_str = f"did:vm:{did_type.lower()}:{name}" controller = controller_did or creator_did # 1) Generate Ed25519 keypair signing_key = signing.SigningKey.generate() verify_key = signing_key.verify_key public_bytes = bytes(verify_key) secret_bytes = bytes(signing_key) # Multibase base58-btc with 'z' prefix (Ed25519VerificationKey2020 style) public_key_b58 = base58.b58encode(public_bytes).decode("ascii") public_key_multibase = f"z{public_key_b58}" keypair_info = { "public_key_multibase": public_key_multibase, "public_key_hex": public_bytes.hex(), "secret_key_hex": secret_bytes.hex(), } # 2) Build receipt body aligned with Rust DidCreateReceipt doc_like = { "id": did_str, "controller": controller, "public_key_multibase": public_key_multibase, "display_name": display_name, "role": role, } doc_json = json.dumps(doc_like, sort_keys=True).encode("utf-8") doc_hash = _vmhash_blake3(doc_json) receipt_body = { "did": did_str, "did_type": did_type.lower(), "controller": controller, "created_by": creator_did, "display_name": display_name, "role": role, "public_key_type": "Ed25519VerificationKey2020", "public_key_multibase": public_key_multibase, "initial_keys": [f"{did_str}#key-1"], "did_document_hash": doc_hash, } # 3) Emit receipt into identity scroll now = datetime.now(timezone.utc) ts = now.isoformat(timespec="seconds").replace("+00:00", "Z") body_json = json.dumps(receipt_body, sort_keys=True).encode("utf-8") root_hash = _vmhash_blake3(body_json) receipt = { "schema_version": "2.0.0", "type": "identity_did_create", "timestamp": ts, "scroll": "identity", "tags": ["identity", "did", "create", did_type.lower()], "root_hash": root_hash, "body": receipt_body, } # Write to identity scroll identity_scroll = SCROLLS["identity"]["jsonl"] identity_scroll.parent.mkdir(parents=True, exist_ok=True) with identity_scroll.open("a", encoding="utf-8") as f: f.write(json.dumps(receipt) + "\n") # Update ROOT.identity.txt (scroll Merkle root) root_file = SCROLLS["identity"]["root_file"] root_info = _compute_scroll_root("identity") _write_root_value(root_file, root_info["root"], leaves=root_info["leaves"]) # 4) Store key material safely on disk key_path = _store_identity_keypair(did_str, keypair_info, explicit_path=key_out) click.echo("Identity DID Created") click.echo("====================") click.echo(f" DID: {did_str}") click.echo(f" Type: {did_type.lower()}") if display_name: click.echo(f" Display: {display_name}") if role: click.echo(f" Role: {role}") click.echo(f" Controller: {controller}") click.echo(f" Creator: {creator_did}") click.echo() click.echo(f" Public key: {public_key_multibase}") click.echo(f" Key file: {key_path}") click.echo(f" Receipt: {root_hash[:40]}...") click.echo(f" Scroll: {identity_scroll}") @identity.command("list") def identity_list() -> None: """List all DIDs in the identity scroll.""" identity_scroll = SCROLLS["identity"]["jsonl"] if not identity_scroll.exists(): click.echo(f"No identity scroll found at: {identity_scroll}") return lines = identity_scroll.read_text(encoding="utf-8").splitlines() lines = [ln for ln in lines if ln.strip()] if not lines: click.echo("Identity scroll is empty.") return dids = [] for raw in lines: try: r = json.loads(raw) if r.get("type") == "identity_did_create": body = r.get("body", {}) dids.append( { "did": body.get("did", "?"), "type": body.get("did_type", "?"), "display_name": body.get("display_name", ""), "role": body.get("role", ""), "created_by": body.get("created_by", "?"), "timestamp": r.get("timestamp", "?"), } ) except Exception: continue if not dids: click.echo("No DIDs found in identity scroll.") return click.echo("Identity Registry") click.echo("=================") click.echo(f"{'DID':40} {'TYPE':10} {'ROLE':12} {'DISPLAY NAME'}") click.echo("-" * 80) for d in dids: did_short = d["did"][:40] if len(d["did"]) > 40 else d["did"] click.echo( f"{did_short:40} {d['type']:10} {d['role'] or '-':12} {d['display_name'] or '-'}" ) # ============================================================================ # OffSec Commands # ============================================================================ def _resolve_actor_did(explicit: Optional[str] = None) -> str: """ Resolve the actor DID for OffSec operations. Priority: 1) explicit --actor-did option 2) VAULTMESH_ACTOR_DID env var 3) raise error """ if explicit: return explicit env = os.environ.get("VAULTMESH_ACTOR_DID") if not env: raise click.UsageError( "Actor DID required; pass --actor-did or set VAULTMESH_ACTOR_DID" ) return env def _emit_offsec_receipt( vm_root: Path, receipt_type: str, body: dict, tags: list[str], ) -> dict: """ Append an OffSec receipt to receipts/offsec/offsec_events.jsonl with hash chaining (previous_hash -> root_hash). """ scroll_path = vm_root / "receipts" / "offsec" / "offsec_events.jsonl" scroll_path.parent.mkdir(parents=True, exist_ok=True) previous_hash = None if scroll_path.exists() and scroll_path.stat().st_size > 0: text = scroll_path.read_text(encoding="utf-8").strip() if text: last_line = text.splitlines()[-1] try: last_obj = json.loads(last_line) previous_hash = last_obj.get("header", {}).get("root_hash") except json.JSONDecodeError: previous_hash = None timestamp = ( datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") ) envelope = { "schema_version": "1.0.0", "type": receipt_type, "timestamp": timestamp, "scroll": "offsec", "header": { "previous_hash": previous_hash, "tags": tags, "root_hash": None, }, "body": body, } # Compute root_hash over canonical JSON (with root_hash=None) serialized = json.dumps( envelope, sort_keys=True, separators=(",", ":"), ).encode("utf-8") root_hash = "blake3:" + blake3.blake3(serialized).hexdigest() envelope["header"]["root_hash"] = root_hash with scroll_path.open("a", encoding="utf-8") as f: f.write(json.dumps(envelope, sort_keys=True) + "\n") return envelope # ============================================================================ # OffSec Phase 2 Helpers # ============================================================================ def _generate_offsec_id(prefix: str) -> str: """ Generate unique ID like VULN-2025-12-001, RT-2025-12-001, INTEL-2025-12-001. Counts existing IDs with same prefix+date to get sequence number. """ now = datetime.now(timezone.utc) date_part = now.strftime("%Y-%m") scroll_path = VAULTMESH_ROOT / "receipts" / "offsec" / "offsec_events.jsonl" seq = 1 if scroll_path.exists(): for line in scroll_path.read_text(encoding="utf-8").splitlines(): if line.strip(): try: obj = json.loads(line) body = obj.get("body", {}) existing_id = ( body.get("vuln_id") or body.get("exercise_id") or body.get("intel_id") ) if existing_id and existing_id.startswith(f"{prefix}-{date_part}"): seq += 1 except Exception: pass return f"{prefix}-{date_part}-{seq:03d}" def _parse_severity_breakdown(json_str: str) -> dict: """Parse severity breakdown JSON string.""" try: breakdown = json.loads(json_str) required = {"critical", "high", "medium", "low", "info"} missing = required - set(breakdown.keys()) if missing: # Fill missing with 0 for key in missing: breakdown[key] = 0 return breakdown except json.JSONDecodeError as e: raise click.UsageError(f"Invalid severity breakdown JSON: {e}") def _parse_affected_products(json_str: str) -> list: """Parse affected products JSON array.""" try: products = json.loads(json_str) if not isinstance(products, list): raise ValueError("affected-products must be a JSON array") return products except json.JSONDecodeError as e: raise click.UsageError(f"Invalid affected-products JSON: {e}") def _parse_comma_list(value: Optional[str]) -> list: """Parse comma-separated string into list.""" if not value: return [] return [v.strip() for v in value.split(",") if v.strip()] def _validate_cvss_vector(vector: str) -> bool: """Validate CVSS 3.1 vector format (basic check).""" if not vector: return True # Optional field return vector.startswith("CVSS:3.1/") or vector.startswith("CVSS:3.0/") def _resolve_asset_identifier(asset: str) -> tuple: """ Resolve asset to (DID, hostname) tuple. Input can be: - DID: did:vm:node:brick-02 -> (did:vm:node:brick-02, None) - Hostname: brick-02.mesh.local -> (attempt DID lookup, brick-02.mesh.local) - Short: brick-02 -> (did:vm:node:brick-02, brick-02) """ if asset.startswith("did:vm:"): return (asset, None) # Check if it's a known node name in identity scroll identity_scroll = VAULTMESH_ROOT / "receipts" / "identity" / "identity_events.jsonl" if identity_scroll.exists(): for line in identity_scroll.read_text(encoding="utf-8").splitlines(): if line.strip(): try: obj = json.loads(line) if obj.get("type") == "identity_did_create": did = obj.get("body", {}).get("did", "") short_name = asset.split(".")[0] if did.endswith(f":{short_name}"): return (did, asset if "." in asset else None) except Exception: pass # Fallback: construct DID from short name short_name = asset.split(".")[0] return (f"did:vm:node:{short_name}", asset if "." in asset else None) @cli.group() def offsec(): """OffSec Engine - incidents, forensics, remediation.""" pass @offsec.command("agents") def offsec_agents() -> None: """List agents on the Shield Node (requires OFFSEC_NODE_URL).""" import asyncio # Lazy import to avoid breaking CLI if aiohttp not installed try: from scripts.offsec_node_client import OffsecNodeClient, OffsecNodeError except ImportError: click.echo( "[offsec] Error: aiohttp not installed. Run: pip install aiohttp", err=True ) raise SystemExit(1) async def _run() -> None: client = OffsecNodeClient.from_env() click.echo(f"[offsec] Connecting to {client.base_url}...") resp = await client.command("agents list") if resp.get("status") == "ok": for line in resp.get("lines", []): click.echo(line) else: click.echo(json.dumps(resp, indent=2)) try: asyncio.run(_run()) except OffsecNodeError as e: click.echo(f"[offsec] Shield node error: {e}", err=True) raise SystemExit(1) except Exception as e: click.echo(f"[offsec] Connection failed: {e}", err=True) click.echo( "[offsec] Set OFFSEC_NODE_URL to Shield Node address (e.g. http://shield-vm:8081)" ) raise SystemExit(1) @offsec.command("shield-status") def offsec_shield_status() -> None: """Get Shield Node status (requires OFFSEC_NODE_URL).""" import asyncio try: from scripts.offsec_node_client import OffsecNodeClient, OffsecNodeError except ImportError: click.echo( "[offsec] Error: aiohttp not installed. Run: pip install aiohttp", err=True ) raise SystemExit(1) async def _run() -> None: client = OffsecNodeClient.from_env() click.echo(f"[offsec] Connecting to {client.base_url}...") # Get health health = await client.health() click.echo(f"\n SHIELD NODE HEALTH") click.echo(f" Status: {health.get('status', 'unknown')}") click.echo(f" Nodes: {health.get('nodes', 0)}") click.echo(f" Proofs: {health.get('proofs', 0)}") click.echo(f" Uptime: {health.get('uptime', 'unknown')}") # Get shield status resp = await client.command("shield status") if resp.get("status") == "ok": click.echo("") for line in resp.get("lines", []): click.echo(line) try: asyncio.run(_run()) except OffsecNodeError as e: click.echo(f"[offsec] Shield node error: {e}", err=True) raise SystemExit(1) except Exception as e: click.echo(f"[offsec] Connection failed: {e}", err=True) raise SystemExit(1) @offsec.command("incident-open") @click.option( "--id", "incident_id", required=True, help="Incident ID (e.g. INC-2025-12-001)" ) @click.option("--title", required=True, help="Short incident title") @click.option( "--severity", type=click.Choice(["low", "medium", "high", "critical"]), required=True, ) @click.option("--summary", required=True, help="Initial summary of the incident") @click.option("--device", default="unknown", help="Device or node label (e.g. shield)") @click.option( "--actor-did", required=False, help="Override actor DID (else VAULTMESH_ACTOR_DID)" ) def offsec_incident_open( incident_id: str, title: str, severity: str, summary: str, device: str, actor_did: Optional[str], ) -> None: """Open a new security incident.""" actor = _resolve_actor_did(actor_did) body = { "incident_id": incident_id, "title": title, "severity": severity, "status": "reported", "opened_by": actor, "device": device, "affected_nodes": [], "summary": summary, } tags = ["offsec", "incident", "open", incident_id, actor] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_incident_open", body, tags) click.echo(f"[offsec] incident opened: {incident_id}") click.echo(f" severity: {severity}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec.command("incident-update") @click.option("--id", "incident_id", required=True, help="Incident ID") @click.option("--field", required=True, help="Field to update (e.g. status)") @click.option("--value", required=True, help="New value for the field") @click.option("--old-value", required=False, help="Previous value (optional)") @click.option("--actor-did", required=False, help="Override actor DID") def offsec_incident_update( incident_id: str, field: str, value: str, old_value: Optional[str], actor_did: Optional[str], ) -> None: """Record an incident field change (e.g. status).""" actor = _resolve_actor_did(actor_did) body = { "incident_id": incident_id, "field": field, "old_value": old_value, "new_value": value, "updated_by": actor, } tags = ["offsec", "incident", "update", incident_id, actor, f"field:{field}"] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_incident_update", body, tags) click.echo(f"[offsec] incident updated: {incident_id} {field} -> {value}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec.command("incident-close") @click.option("--id", "incident_id", required=True, help="Incident ID") @click.option( "--resolution", required=True, help="Resolution code (e.g. credentials_revoked)" ) @click.option("--lessons", required=True, help="Lessons learned summary") @click.option("--phases-completed", type=int, default=4, show_default=True) @click.option("--timeline-hash", required=False, help="Optional hash of timeline.json") @click.option("--actor-did", required=False, help="Override actor DID") def offsec_incident_close( incident_id: str, resolution: str, lessons: str, phases_completed: int, timeline_hash: Optional[str], actor_did: Optional[str], ) -> None: """Close an incident with resolution and lessons learned.""" actor = _resolve_actor_did(actor_did) body = { "incident_id": incident_id, "closed_by": actor, "resolution": resolution, "lessons_learned": lessons, "phases_completed": phases_completed, "timeline_hash": timeline_hash, } tags = ["offsec", "incident", "close", incident_id, actor, resolution] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_incident_close", body, tags) click.echo(f"[offsec] incident closed: {incident_id} ({resolution})") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec.command("snapshot") @click.option("--incident", "incident_id", required=True, help="Incident ID") @click.option("--snapshot-id", required=True, help="Snapshot ID (e.g. SNAP-001)") @click.option("--summary", required=True, help="What this snapshot captures") @click.option("--path", "evidence_path", required=True, help="Path to evidence file") @click.option("--actor-did", required=False, help="Override actor DID") def offsec_snapshot( incident_id: str, snapshot_id: str, summary: str, evidence_path: str, actor_did: Optional[str], ) -> None: """Record a forensic snapshot for an incident.""" actor = _resolve_actor_did(actor_did) ev_path = Path(evidence_path) if not ev_path.exists(): raise click.UsageError(f"Evidence path does not exist: {evidence_path}") # Hash the evidence file hasher = blake3.blake3() with ev_path.open("rb") as f: for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) file_hash = "blake3:" + hasher.hexdigest() body = { "incident_id": incident_id, "snapshot_id": snapshot_id, "summary": summary, "collected_by": actor, "path": str(ev_path), "file_hash": file_hash, } tags = ["offsec", "forensic", "snapshot", incident_id, actor, snapshot_id] receipt = _emit_offsec_receipt( VAULTMESH_ROOT, "offsec_forensic_snapshot", body, tags ) click.echo(f"[offsec] snapshot recorded: {incident_id} / {snapshot_id}") click.echo(f" evidence_hash: {file_hash}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec.command("recover") @click.option("--incident", "incident_id", required=True, help="Incident ID") @click.option("--action-id", required=True, help="Action ID (e.g. ACT-001)") @click.option("--description", required=True, help="Description of recovery action") @click.option("--actor-did", required=False, help="Override actor DID") def offsec_recover( incident_id: str, action_id: str, description: str, actor_did: Optional[str] ) -> None: """Record a remediation / recovery action for an incident.""" actor = _resolve_actor_did(actor_did) body = { "incident_id": incident_id, "action_id": action_id, "description": description, "executed_by": actor, } tags = ["offsec", "remediation", incident_id, actor, action_id] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_remediation", body, tags) click.echo(f"[offsec] remediation recorded: {incident_id} / {action_id}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec.command("status") @click.option("--incident", "incident_id", required=False, help="Filter by incident ID") @click.option( "--limit", type=int, default=20, show_default=True, help="Max receipts to show" ) def offsec_status(incident_id: Optional[str], limit: int) -> None: """Show recent OffSec receipts (optionally filtered by incident).""" scroll_path = VAULTMESH_ROOT / "receipts" / "offsec" / "offsec_events.jsonl" if not scroll_path.exists() or scroll_path.stat().st_size == 0: click.echo("[offsec] no OffSec receipts yet") return lines = scroll_path.read_text(encoding="utf-8").strip().splitlines() lines = [ln for ln in lines if ln.strip()] lines = lines[-limit:] click.echo(f"[offsec] last {len(lines)} receipts") click.echo() click.echo(f"{'TIMESTAMP':20} {'TYPE':28} {'INCIDENT':20} {'DETAIL'}") click.echo("-" * 90) for ln in lines: try: obj = json.loads(ln) except json.JSONDecodeError: continue body = obj.get("body", {}) inc = body.get("incident_id") or "" if incident_id and inc != incident_id: continue ts = obj.get("timestamp", "")[:19] rtype = obj.get("type", "") # Extract a useful detail detail = "" if "status" in body: detail = f"status={body['status']}" elif "new_value" in body: detail = f"{body.get('field', '?')}={body['new_value']}" elif "resolution" in body: detail = f"resolution={body['resolution']}" elif "action_id" in body: detail = f"action={body['action_id']}" elif "snapshot_id" in body: detail = f"snap={body['snapshot_id']}" click.echo(f"{ts:20} {rtype:28} {inc:20} {detail}") # ============================================================================ # OffSec Phase 2: Vulnerability Commands # ============================================================================ @offsec.group("vuln") def offsec_vuln(): """Vulnerability discovery and tracking.""" pass @offsec_vuln.command("report") @click.option( "--id", "vuln_id", required=False, help="Vulnerability ID (auto-generated if not provided)", ) @click.option("--title", required=True, help="Vulnerability title") @click.option( "--severity", type=click.Choice(["critical", "high", "medium", "low", "info"]), required=True, ) @click.option("--description", required=False, help="Detailed description") @click.option("--cvss-score", type=float, required=False, help="CVSS score (0.0-10.0)") @click.option("--cvss-vector", required=False, help="CVSS 3.1 vector string") @click.option("--asset", required=True, help="Affected asset (DID or hostname)") @click.option("--component", required=False, help="Affected software component") @click.option("--version", "affected_version", required=False, help="Affected version") @click.option("--cve", "cve_id", required=False, help="CVE ID if known") @click.option("--cwe", "cwe_id", required=False, help="CWE ID if known") @click.option( "--method", type=click.Choice(["pentest", "scan", "manual", "bug_bounty", "threat_intel"]), default="manual", ) @click.option( "--engagement", "engagement_id", required=False, help="Link to red team exercise ID" ) @click.option( "--incident", "incident_id", required=False, help="Link to incident ID if exploited" ) @click.option("--actor-did", required=False, help="Override actor DID") def vuln_report( vuln_id: Optional[str], title: str, severity: str, description: Optional[str], cvss_score: Optional[float], cvss_vector: Optional[str], asset: str, component: Optional[str], affected_version: Optional[str], cve_id: Optional[str], cwe_id: Optional[str], method: str, engagement_id: Optional[str], incident_id: Optional[str], actor_did: Optional[str], ) -> None: """Report a new vulnerability discovery.""" actor = _resolve_actor_did(actor_did) if not vuln_id: vuln_id = _generate_offsec_id("VULN") if cvss_vector and not _validate_cvss_vector(cvss_vector): raise click.UsageError("Invalid CVSS vector format (expected CVSS:3.1/...)") asset_did, asset_hostname = _resolve_asset_identifier(asset) body = { "vuln_id": vuln_id, "title": title, "description": description, "severity": severity, "cvss_score": cvss_score, "cvss_vector": cvss_vector, "affected_asset": asset_did, "affected_hostname": asset_hostname, "affected_component": component, "affected_version": affected_version, "cve_id": cve_id, "cwe_id": cwe_id, "discovery_method": method, "discovered_by": actor, "engagement_id": engagement_id, "incident_id": incident_id, "status": "reported", "remediation_status": "pending", "proof_of_concept": False, "exploited_in_wild": False, } tags = ["offsec", "vulnerability", vuln_id, severity, actor] if cve_id: tags.append(cve_id) receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_vuln_discovery", body, tags) click.echo(f"[offsec] vulnerability reported: {vuln_id}") click.echo(f" severity: {severity}") click.echo(f" asset: {asset_did}") if cve_id: click.echo(f" cve: {cve_id}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_vuln.command("update") @click.option("--id", "vuln_id", required=True, help="Vulnerability ID") @click.option( "--status", type=click.Choice( ["reported", "confirmed", "remediated", "accepted", "false_positive"] ), required=False, ) @click.option( "--remediation-status", type=click.Choice(["pending", "in_progress", "completed", "wont_fix"]), required=False, ) @click.option("--poc/--no-poc", "has_poc", default=None, help="Proof of concept exists") @click.option( "--exploited/--not-exploited", "exploited", default=None, help="Exploited in the wild", ) @click.option("--actor-did", required=False, help="Override actor DID") def vuln_update( vuln_id: str, status: Optional[str], remediation_status: Optional[str], has_poc: Optional[bool], exploited: Optional[bool], actor_did: Optional[str], ) -> None: """Update vulnerability status.""" actor = _resolve_actor_did(actor_did) if not any( [status, remediation_status, has_poc is not None, exploited is not None] ): raise click.UsageError("At least one field must be updated") body = { "vuln_id": vuln_id, "updated_by": actor, "status": status, "remediation_status": remediation_status, "proof_of_concept": has_poc, "exploited_in_wild": exploited, } # Remove None values body = {k: v for k, v in body.items() if v is not None} tags = ["offsec", "vulnerability", "update", vuln_id, actor] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_vuln_update", body, tags) click.echo(f"[offsec] vulnerability updated: {vuln_id}") if status: click.echo(f" status: {status}") if remediation_status: click.echo(f" remediation: {remediation_status}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_vuln.command("link-incident") @click.option("--id", "vuln_id", required=True, help="Vulnerability ID") @click.option("--incident", "incident_id", required=True, help="Incident ID to link") @click.option("--actor-did", required=False, help="Override actor DID") def vuln_link_incident( vuln_id: str, incident_id: str, actor_did: Optional[str] ) -> None: """Link a vulnerability to an incident.""" actor = _resolve_actor_did(actor_did) body = { "vuln_id": vuln_id, "incident_id": incident_id, "linked_by": actor, } tags = ["offsec", "vulnerability", "link", vuln_id, incident_id, actor] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_vuln_link", body, tags) click.echo(f"[offsec] vulnerability linked: {vuln_id} <-> {incident_id}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_vuln.command("list") @click.option("--severity", required=False, help="Filter by severity (comma-separated)") @click.option( "--remediation-status", required=False, help="Filter by remediation status" ) @click.option("--limit", type=int, default=20, show_default=True) def vuln_list( severity: Optional[str], remediation_status: Optional[str], limit: int ) -> None: """List vulnerabilities.""" scroll_path = VAULTMESH_ROOT / "receipts" / "offsec" / "offsec_events.jsonl" if not scroll_path.exists(): click.echo("[offsec] no vulnerabilities yet") return severity_filter = set(_parse_comma_list(severity)) if severity else None vulns = {} for line in scroll_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: obj = json.loads(line) if obj.get("type") == "offsec_vuln_discovery": body = obj.get("body", {}) vid = body.get("vuln_id") if vid: vulns[vid] = { "vuln_id": vid, "title": body.get("title", ""), "severity": body.get("severity", ""), "status": body.get("status", ""), "remediation_status": body.get("remediation_status", ""), "cve_id": body.get("cve_id", ""), "asset": body.get("affected_asset", ""), "timestamp": obj.get("timestamp", ""), } elif obj.get("type") == "offsec_vuln_update": body = obj.get("body", {}) vid = body.get("vuln_id") if vid and vid in vulns: if body.get("status"): vulns[vid]["status"] = body["status"] if body.get("remediation_status"): vulns[vid]["remediation_status"] = body["remediation_status"] except Exception: continue # Apply filters results = list(vulns.values()) if severity_filter: results = [v for v in results if v["severity"] in severity_filter] if remediation_status: results = [v for v in results if v["remediation_status"] == remediation_status] results = results[-limit:] click.echo(f"[offsec] {len(results)} vulnerabilities") click.echo() click.echo( f"{'VULN_ID':20} {'SEVERITY':10} {'STATUS':15} {'REMEDIATION':12} {'CVE':18} {'TITLE'}" ) click.echo("-" * 100) for v in results: click.echo( f"{v['vuln_id']:20} {v['severity']:10} {v['status']:15} {v['remediation_status']:12} {v['cve_id'] or '-':18} {v['title'][:30]}" ) # ============================================================================ # OffSec Phase 2: Red Team Commands # ============================================================================ @offsec.group("redteam") def offsec_redteam(): """Red team exercise management.""" pass @offsec_redteam.command("start") @click.option( "--id", "exercise_id", required=False, help="Exercise ID (auto-generated if not provided)", ) @click.option("--title", required=True, help="Exercise title") @click.option( "--type", "engagement_type", type=click.Choice( ["external_pentest", "internal_pentest", "red_team", "purple_team", "tabletop"] ), required=True, ) @click.option("--scope-in", required=False, help="In-scope targets (comma-separated)") @click.option( "--scope-out", required=False, help="Out-of-scope targets (comma-separated)" ) @click.option("--roe", required=False, help="Rules of engagement") @click.option("--team", required=False, help="Team member DIDs (comma-separated)") @click.option("--authorized-by", required=False, help="DID of authorizing party") @click.option("--actor-did", required=False, help="Override actor DID") def redteam_start( exercise_id: Optional[str], title: str, engagement_type: str, scope_in: Optional[str], scope_out: Optional[str], roe: Optional[str], team: Optional[str], authorized_by: Optional[str], actor_did: Optional[str], ) -> None: """Start a new red team exercise.""" actor = _resolve_actor_did(actor_did) if not exercise_id: exercise_id = _generate_offsec_id("RT") now = ( datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") ) body = { "exercise_id": exercise_id, "title": title, "engagement_type": engagement_type, "scope": { "in_scope": _parse_comma_list(scope_in), "out_of_scope": _parse_comma_list(scope_out), "rules_of_engagement": roe, }, "team_dids": _parse_comma_list(team), "authorized_by": authorized_by, "executed_by": actor, "started_at": now, "ended_at": None, "status": "in_progress", "tools_used": [], "findings_count": 0, "severity_breakdown": { "critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0, }, "vulns_created": [], "incidents_triggered": [], } tags = ["offsec", "redteam", "start", exercise_id, engagement_type, actor] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_redteam_start", body, tags) click.echo(f"[offsec] red team exercise started: {exercise_id}") click.echo(f" type: {engagement_type}") click.echo(f" title: {title}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_redteam.command("update") @click.option("--id", "exercise_id", required=True, help="Exercise ID") @click.option("--tools-used", required=False, help="Tools used (comma-separated)") @click.option("--findings", type=int, required=False, help="Current findings count") @click.option("--severity-breakdown", required=False, help="Severity breakdown JSON") @click.option("--actor-did", required=False, help="Override actor DID") def redteam_update( exercise_id: str, tools_used: Optional[str], findings: Optional[int], severity_breakdown: Optional[str], actor_did: Optional[str], ) -> None: """Update red team exercise progress.""" actor = _resolve_actor_did(actor_did) body = { "exercise_id": exercise_id, "updated_by": actor, "status": "in_progress", } if tools_used: body["tools_used"] = _parse_comma_list(tools_used) if findings is not None: body["findings_count"] = findings if severity_breakdown: body["severity_breakdown"] = _parse_severity_breakdown(severity_breakdown) tags = ["offsec", "redteam", "update", exercise_id, actor] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_redteam_update", body, tags) click.echo(f"[offsec] red team exercise updated: {exercise_id}") if findings is not None: click.echo(f" findings: {findings}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_redteam.command("complete") @click.option("--id", "exercise_id", required=True, help="Exercise ID") @click.option("--tools-used", required=False, help="Tools used (comma-separated)") @click.option("--findings", type=int, required=True, help="Total findings count") @click.option("--severity-breakdown", required=True, help="Severity breakdown JSON") @click.option( "--vulns-created", required=False, help="Created vulnerability IDs (comma-separated)", ) @click.option( "--incidents-triggered", required=False, help="Triggered incident IDs (comma-separated)", ) @click.option("--report-path", required=False, help="Path to final report") @click.option("--actor-did", required=False, help="Override actor DID") def redteam_complete( exercise_id: str, tools_used: Optional[str], findings: int, severity_breakdown: str, vulns_created: Optional[str], incidents_triggered: Optional[str], report_path: Optional[str], actor_did: Optional[str], ) -> None: """Complete a red team exercise.""" actor = _resolve_actor_did(actor_did) now = ( datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") ) body = { "exercise_id": exercise_id, "executed_by": actor, "ended_at": now, "status": "completed", "tools_used": _parse_comma_list(tools_used), "findings_count": findings, "severity_breakdown": _parse_severity_breakdown(severity_breakdown), "vulns_created": _parse_comma_list(vulns_created), "incidents_triggered": _parse_comma_list(incidents_triggered), } if report_path: rp = Path(report_path) if rp.exists(): hasher = blake3.blake3() with rp.open("rb") as f: for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) body["report_hash"] = "blake3:" + hasher.hexdigest() body["report_path"] = str(rp) tags = ["offsec", "redteam", "complete", exercise_id, actor] receipt = _emit_offsec_receipt( VAULTMESH_ROOT, "offsec_redteam_complete", body, tags ) click.echo(f"[offsec] red team exercise completed: {exercise_id}") click.echo(f" findings: {findings}") click.echo(f" vulns: {len(_parse_comma_list(vulns_created))}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_redteam.command("list") @click.option( "--status", type=click.Choice(["in_progress", "completed", "cancelled"]), required=False, ) @click.option("--limit", type=int, default=10, show_default=True) def redteam_list(status: Optional[str], limit: int) -> None: """List red team exercises.""" scroll_path = VAULTMESH_ROOT / "receipts" / "offsec" / "offsec_events.jsonl" if not scroll_path.exists(): click.echo("[offsec] no red team exercises yet") return exercises = {} for line in scroll_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: obj = json.loads(line) rtype = obj.get("type", "") if rtype.startswith("offsec_redteam"): body = obj.get("body", {}) eid = body.get("exercise_id") if eid: if eid not in exercises: exercises[eid] = { "exercise_id": eid, "title": body.get("title", ""), "type": body.get("engagement_type", ""), "status": "in_progress", "findings": 0, "timestamp": obj.get("timestamp", ""), } if body.get("status"): exercises[eid]["status"] = body["status"] if body.get("findings_count"): exercises[eid]["findings"] = body["findings_count"] if body.get("title"): exercises[eid]["title"] = body["title"] if body.get("engagement_type"): exercises[eid]["type"] = body["engagement_type"] except Exception: continue results = list(exercises.values()) if status: results = [e for e in results if e["status"] == status] results = results[-limit:] click.echo(f"[offsec] {len(results)} red team exercises") click.echo() click.echo( f"{'EXERCISE_ID':20} {'TYPE':18} {'STATUS':12} {'FINDINGS':10} {'TITLE'}" ) click.echo("-" * 90) for e in results: click.echo( f"{e['exercise_id']:20} {e['type']:18} {e['status']:12} {e['findings']:<10} {e['title'][:30]}" ) # ============================================================================ # OffSec Phase 2: Threat Intel Commands # ============================================================================ @offsec.group("intel") def offsec_intel(): """Threat intelligence management.""" pass @offsec_intel.command("ingest") @click.option( "--id", "intel_id", required=False, help="Intel ID (auto-generated if not provided)" ) @click.option("--title", required=True, help="Intelligence title") @click.option( "--type", "intel_type", type=click.Choice(["cve", "advisory", "ioc", "ttp", "campaign"]), required=True, ) @click.option( "--source", required=True, help="Source (NVD, CISA, vendor, MISP, OTX, custom)" ) @click.option("--source-url", required=False, help="Source URL") @click.option("--cve", "cve_id", required=False, help="CVE ID") @click.option("--advisory-id", required=False, help="Advisory ID") @click.option("--cvss-score", type=float, required=False, help="CVSS score") @click.option("--cvss-vector", required=False, help="CVSS vector") @click.option( "--affected-products", required=False, help="Affected products JSON array" ) @click.option( "--priority", type=click.Choice(["critical", "high", "medium", "low", "info"]), default="medium", ) @click.option( "--action-required/--no-action", default=False, help="Action required flag" ) @click.option( "--mitre", required=False, help="MITRE ATT&CK techniques (comma-separated)" ) @click.option("--actor-did", required=False, help="Override actor DID") def intel_ingest( intel_id: Optional[str], title: str, intel_type: str, source: str, source_url: Optional[str], cve_id: Optional[str], advisory_id: Optional[str], cvss_score: Optional[float], cvss_vector: Optional[str], affected_products: Optional[str], priority: str, action_required: bool, mitre: Optional[str], actor_did: Optional[str], ) -> None: """Ingest external threat intelligence.""" actor = _resolve_actor_did(actor_did) if not intel_id: intel_id = _generate_offsec_id("INTEL") now = ( datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") ) body = { "intel_id": intel_id, "title": title, "intel_type": intel_type, "source": source, "source_url": source_url, "cve_id": cve_id, "advisory_id": advisory_id, "cvss_score": cvss_score, "cvss_vector": cvss_vector, "affected_products": _parse_affected_products(affected_products) if affected_products else [], "ingested_by": actor, "ingested_at": now, "action_required": action_required, "priority": priority, "matched_assets": [], "vulns_created": [], "indicators": [], "mitre_techniques": _parse_comma_list(mitre), } tags = ["offsec", "threat_intel", intel_id, source, intel_type, actor] if cve_id: tags.append(cve_id) if action_required: tags.append("action_required") receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_threat_intel", body, tags) click.echo(f"[offsec] threat intel ingested: {intel_id}") click.echo(f" type: {intel_type}") click.echo(f" source: {source}") click.echo(f" priority: {priority}") if cve_id: click.echo(f" cve: {cve_id}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_intel.command("add-ioc") @click.option("--id", "intel_id", required=True, help="Intel ID to add IOC to") @click.option( "--ioc-type", required=True, help="IOC type (ip, domain, hash, url, email)" ) @click.option("--ioc-value", required=True, help="IOC value") @click.option("--context", required=False, help="Context/description for the IOC") @click.option("--actor-did", required=False, help="Override actor DID") def intel_add_ioc( intel_id: str, ioc_type: str, ioc_value: str, context: Optional[str], actor_did: Optional[str], ) -> None: """Add an IOC to a threat intel record.""" actor = _resolve_actor_did(actor_did) body = { "intel_id": intel_id, "added_by": actor, "indicator": { "type": ioc_type, "value": ioc_value, "context": context, }, } tags = ["offsec", "threat_intel", "ioc", intel_id, ioc_type, actor] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_intel_ioc_add", body, tags) click.echo(f"[offsec] IOC added to {intel_id}") click.echo(f" type: {ioc_type}") click.echo(f" value: {ioc_value}") click.echo(f" root_hash: {receipt['header']['root_hash']}") @offsec_intel.command("match") @click.option( "--id", "intel_id", required=True, help="Intel ID to match against assets" ) @click.option( "--auto-create-vulns/--no-vulns", default=False, help="Auto-create vulnerabilities for matches", ) @click.option("--actor-did", required=False, help="Override actor DID") def intel_match( intel_id: str, auto_create_vulns: bool, actor_did: Optional[str] ) -> None: """Match threat intel against mesh assets.""" actor = _resolve_actor_did(actor_did) # This is a placeholder - real implementation would query mesh inventory matched_assets: list = [] body = { "intel_id": intel_id, "matched_by": actor, "matched_assets": matched_assets, "vulns_created": [], "auto_create_vulns": auto_create_vulns, } tags = ["offsec", "threat_intel", "match", intel_id, actor] receipt = _emit_offsec_receipt(VAULTMESH_ROOT, "offsec_intel_match", body, tags) click.echo(f"[offsec] intel matched: {intel_id}") click.echo(f" matched_assets: {len(matched_assets)}") click.echo(f" root_hash: {receipt['header']['root_hash']}") click.echo() click.echo("Note: Asset matching requires Mesh engine integration (Phase 3)") @offsec_intel.command("search") @click.option("--cve", "cve_id", required=False, help="Search by CVE ID") @click.option("--source", required=False, help="Filter by source (comma-separated)") @click.option("--limit", type=int, default=10, show_default=True) def intel_search(cve_id: Optional[str], source: Optional[str], limit: int) -> None: """Search threat intel records.""" scroll_path = VAULTMESH_ROOT / "receipts" / "offsec" / "offsec_events.jsonl" if not scroll_path.exists(): click.echo("[offsec] no threat intel yet") return source_filter = set(_parse_comma_list(source)) if source else None results = [] for line in scroll_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: obj = json.loads(line) if obj.get("type") == "offsec_threat_intel": body = obj.get("body", {}) if cve_id and body.get("cve_id") != cve_id: continue if source_filter and body.get("source") not in source_filter: continue results.append( { "intel_id": body.get("intel_id", ""), "title": body.get("title", ""), "type": body.get("intel_type", ""), "source": body.get("source", ""), "cve_id": body.get("cve_id", ""), "priority": body.get("priority", ""), "action_required": body.get("action_required", False), } ) except Exception: continue results = results[-limit:] click.echo(f"[offsec] {len(results)} intel records found") click.echo() click.echo( f"{'INTEL_ID':20} {'TYPE':10} {'SOURCE':10} {'CVE':18} {'PRIORITY':10} {'ACTION'}" ) click.echo("-" * 90) for r in results: action = "YES" if r["action_required"] else "-" click.echo( f"{r['intel_id']:20} {r['type']:10} {r['source']:10} {r['cve_id'] or '-':18} {r['priority']:10} {action}" ) @offsec_intel.command("list") @click.option("--priority", required=False, help="Filter by priority (comma-separated)") @click.option( "--action-required/--all", default=False, help="Only show action required" ) @click.option("--limit", type=int, default=20, show_default=True) def intel_list(priority: Optional[str], action_required: bool, limit: int) -> None: """List threat intel records.""" scroll_path = VAULTMESH_ROOT / "receipts" / "offsec" / "offsec_events.jsonl" if not scroll_path.exists(): click.echo("[offsec] no threat intel yet") return priority_filter = set(_parse_comma_list(priority)) if priority else None results = [] for line in scroll_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: obj = json.loads(line) if obj.get("type") == "offsec_threat_intel": body = obj.get("body", {}) if priority_filter and body.get("priority") not in priority_filter: continue if action_required and not body.get("action_required"): continue results.append( { "intel_id": body.get("intel_id", ""), "title": body.get("title", ""), "type": body.get("intel_type", ""), "source": body.get("source", ""), "priority": body.get("priority", ""), "action_required": body.get("action_required", False), "timestamp": obj.get("timestamp", ""), } ) except Exception: continue results = results[-limit:] click.echo(f"[offsec] {len(results)} intel records") click.echo() click.echo( f"{'INTEL_ID':20} {'TYPE':10} {'SOURCE':10} {'PRIORITY':10} {'ACTION':8} {'TITLE'}" ) click.echo("-" * 100) for r in results: action = "YES" if r["action_required"] else "-" click.echo( f"{r['intel_id']:20} {r['type']:10} {r['source']:10} {r['priority']:10} {action:8} {r['title'][:30]}" ) # ============================================================================ # OffSec Phase 2: Enhanced Status & Summary # ============================================================================ @offsec.command("summary") def offsec_summary() -> None: """Show OffSec security posture summary.""" scroll_path = VAULTMESH_ROOT / "receipts" / "offsec" / "offsec_events.jsonl" if not scroll_path.exists(): click.echo("[offsec] no receipts yet") return # Track latest state per ID incident_states: dict = {} vuln_states: dict = {} exercise_states: dict = {} intel_stats = {"total": 0, "action_required": 0, "by_source": {}} for line in scroll_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: obj = json.loads(line) body = obj.get("body", {}) rtype = obj.get("type", "") if rtype.startswith("offsec_incident"): inc_id = body.get("incident_id") if inc_id: if inc_id not in incident_states: incident_states[inc_id] = {"severity": None, "closed": False} if body.get("severity"): incident_states[inc_id]["severity"] = body["severity"] if rtype == "offsec_incident_close": incident_states[inc_id]["closed"] = True elif rtype == "offsec_vuln_discovery": vuln_id = body.get("vuln_id") if vuln_id: vuln_states[vuln_id] = { "severity": body.get("severity"), "status": body.get("remediation_status", "pending"), } elif rtype == "offsec_vuln_update": vuln_id = body.get("vuln_id") if vuln_id and vuln_id in vuln_states: if body.get("remediation_status"): vuln_states[vuln_id]["status"] = body["remediation_status"] elif rtype.startswith("offsec_redteam"): ex_id = body.get("exercise_id") if ex_id: if ex_id not in exercise_states: exercise_states[ex_id] = { "status": "in_progress", "findings": 0, } if body.get("status"): exercise_states[ex_id]["status"] = body["status"] if body.get("findings_count"): exercise_states[ex_id]["findings"] = body["findings_count"] elif rtype == "offsec_threat_intel": intel_stats["total"] += 1 if body.get("action_required"): intel_stats["action_required"] += 1 src = body.get("source", "unknown") intel_stats["by_source"][src] = intel_stats["by_source"].get(src, 0) + 1 except Exception: pass # Compute aggregates incidents_open = sum(1 for i in incident_states.values() if not i["closed"]) incidents_closed = sum(1 for i in incident_states.values() if i["closed"]) incidents_by_sev: dict = {} for i in incident_states.values(): sev = i.get("severity") or "unknown" incidents_by_sev[sev] = incidents_by_sev.get(sev, 0) + 1 vulns_open = sum( 1 for v in vuln_states.values() if v["status"] not in ("completed", "remediated") ) vulns_remediated = sum( 1 for v in vuln_states.values() if v["status"] in ("completed", "remediated") ) vulns_by_sev: dict = {} for v in vuln_states.values(): sev = v.get("severity") or "unknown" vulns_by_sev[sev] = vulns_by_sev.get(sev, 0) + 1 redteam_active = sum( 1 for e in exercise_states.values() if e["status"] != "completed" ) redteam_completed = sum( 1 for e in exercise_states.values() if e["status"] == "completed" ) total_findings = sum(e["findings"] for e in exercise_states.values()) # Display click.echo("OffSec Security Posture Summary") click.echo("=" * 40) click.echo() click.echo(f"INCIDENTS: {incidents_open} open, {incidents_closed} closed") for sev in ["critical", "high", "medium", "low"]: if sev in incidents_by_sev: click.echo(f" {sev}: {incidents_by_sev[sev]}") click.echo() click.echo(f"VULNERABILITIES: {vulns_open} open, {vulns_remediated} remediated") for sev in ["critical", "high", "medium", "low", "info"]: if sev in vulns_by_sev: click.echo(f" {sev}: {vulns_by_sev[sev]}") click.echo() click.echo(f"RED TEAM: {redteam_active} active, {redteam_completed} completed") click.echo(f" total findings: {total_findings}") click.echo() click.echo( f"THREAT INTEL: {intel_stats['total']} total, {intel_stats['action_required']} action required" ) for src, cnt in sorted(intel_stats["by_source"].items()): click.echo(f" {src}: {cnt}") # ============================================================================ # Console Commands # ============================================================================ @cli.group() def console() -> None: """Console Engine - AI agent session management.""" pass @console.command("receipts") @click.option("--limit", default=20, show_default=True, type=int) @click.option("--type", "receipt_type", default=None, help="Filter by receipt type") def console_receipts(limit: int, receipt_type: Optional[str]) -> None: """Show recent Console receipts.""" from engines.console.receipts import get_emitter emitter = get_emitter(str(VAULTMESH_ROOT)) events_path = Path(emitter.events_path) if not events_path.exists(): click.echo("No Console scroll found.") return lines = events_path.read_text(encoding="utf-8").splitlines() lines = [ln for ln in lines if ln.strip()] records = [] for raw in reversed(lines): try: r = json.loads(raw) if receipt_type and r.get("type") != receipt_type: continue records.append(r) if len(records) >= limit: break except Exception: continue for r in reversed(records): click.echo(json.dumps(r, separators=(",", ":"))) @console.command("sessions") def console_sessions() -> None: """List known Console sessions.""" from engines.console.receipts import get_emitter emitter = get_emitter(str(VAULTMESH_ROOT)) events_path = Path(emitter.events_path) if not events_path.exists(): click.echo("No Console scroll found.") return sessions: dict = {} for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: r = json.loads(line) sid = r.get("session_id") if not sid: continue t = r.get("type") if sid not in sessions: sessions[sid] = { "session_id": sid, "started": None, "ended": None, "events": 0, } sessions[sid]["events"] += 1 if t == "console_session_start": sessions[sid]["started"] = r["ts"] elif t == "console_session_end": sessions[sid]["ended"] = r["ts"] except Exception: continue click.echo("Console Sessions") click.echo("================") for s in sessions.values(): status = "ended" if s["ended"] else "active" click.echo(f" {s['session_id']}: {status} ({s['events']} events)") @console.command("history") @click.option("--session", required=True, help="Session ID to show history for") def console_history(session: str) -> None: """Show history for a specific session.""" from engines.console.receipts import get_emitter emitter = get_emitter(str(VAULTMESH_ROOT)) events_path = Path(emitter.events_path) if not events_path.exists(): click.echo("No Console scroll found.") return click.echo(f"History for session: {session}") click.echo("=" * 60) for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: r = json.loads(line) if r.get("session_id") == session: click.echo(json.dumps(r, separators=(",", ":"))) except Exception: continue @console.command("root") def console_root() -> None: """Show Console scroll Merkle root info.""" from engines.console.receipts import get_emitter emitter = get_emitter(str(VAULTMESH_ROOT)) info = emitter.get_root_info() click.echo("Console Scroll Root") click.echo("===================") click.echo(f" engine_id: {info.get('engine_id', 'N/A')}") merkle_root = info.get("merkle_root", "0") click.echo( f" merkle_root: {merkle_root[:32]}..." if len(merkle_root) > 32 else f" merkle_root: {merkle_root}" ) click.echo(f" events: {info.get('events', 0)}") click.echo(f" updated_at: {info.get('updated_at', 'N/A')}") @console.command("approvals") @click.option("--session", default=None, help="Filter by session ID") def console_approvals(session: Optional[str]) -> None: """List pending approval requests.""" from engines.console.approvals import get_approval_manager manager = get_approval_manager(str(VAULTMESH_ROOT)) pending = manager.list_pending(session) if not pending: click.echo("No pending approvals.") return click.echo("Pending Approvals") click.echo("=================") for r in pending: click.echo(f" {r.approval_id}: {r.action_type}") click.echo(f" session: {r.session_id}") click.echo(f" approvers: {', '.join(r.approvers)}") click.echo(f" expires: {r.expires_at}") click.echo("") @console.command("approve") @click.argument("approval_id") @click.option("--reason", default="", help="Reason for approval") @click.option("--actor-did", required=False, help="Override actor DID") def console_approve(approval_id: str, reason: str, actor_did: Optional[str]) -> None: """Approve a pending action.""" from engines.console.approvals import get_approval_manager actor = actor_did or os.environ.get("VAULTMESH_ACTOR_DID", "did:vm:human:unknown") manager = get_approval_manager(str(VAULTMESH_ROOT)) try: success = manager.decide( approval_id, approved=True, approver=actor, reason=reason, ) if success: click.echo(f"[console] approved: {approval_id}") else: click.echo(f"[console] approval not found: {approval_id}") except PermissionError as e: click.echo(f"[console] permission denied: {e}", err=True) except KeyError: click.echo(f"[console] approval not found: {approval_id}", err=True) @console.command("reject") @click.argument("approval_id") @click.option("--reason", required=True, help="Reason for rejection") @click.option("--actor-did", required=False, help="Override actor DID") def console_reject(approval_id: str, reason: str, actor_did: Optional[str]) -> None: """Reject a pending action.""" from engines.console.approvals import get_approval_manager actor = actor_did or os.environ.get("VAULTMESH_ACTOR_DID", "did:vm:human:unknown") manager = get_approval_manager(str(VAULTMESH_ROOT)) try: success = manager.decide( approval_id, approved=False, approver=actor, reason=reason, ) if success: click.echo(f"[console] rejected: {approval_id}") else: click.echo(f"[console] approval not found: {approval_id}") except PermissionError as e: click.echo(f"[console] permission denied: {e}", err=True) except KeyError: click.echo(f"[console] approval not found: {approval_id}", err=True) @console.command("story") @click.argument("session_id") @click.option( "--format", "fmt", default="narrative", type=click.Choice(["narrative", "json", "timeline"]), help="Output format", ) def console_story(session_id: str, fmt: str) -> None: """ Generate a human-readable narrative of a Console session. Reads all receipts for a session and presents them as a coherent story, showing the sequence of events from start to finish. """ from engines.console.receipts import get_emitter emitter = get_emitter(str(VAULTMESH_ROOT)) events_path = Path(emitter.events_path) if not events_path.exists(): click.echo(f"No Console scroll found at {events_path}") return # Collect all receipts for this session receipts = [] for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: r = json.loads(line) if r.get("session_id") == session_id: receipts.append(r) except json.JSONDecodeError: continue if not receipts: click.echo(f"No receipts found for session: {session_id}") return if fmt == "json": click.echo(json.dumps(receipts, indent=2)) return if fmt == "timeline": click.echo(f"Timeline: {session_id}") click.echo("=" * 60) for r in receipts: ts = r.get("ts", "")[:19] # Trim to seconds rtype = r.get("type", "unknown") payload = r.get("payload", {}) # One-line summary if rtype == "console_session_start": agent = payload.get("agent_type", "unknown") caller = payload.get("caller", "unknown") click.echo(f"{ts} START {agent} by {caller}") elif rtype == "console_session_end": reason = payload.get("exit_reason", "unknown") click.echo(f"{ts} END {reason}") elif rtype == "console_command": cmd = payload.get("command", "?") exit_code = payload.get("exit_code", "?") click.echo(f"{ts} CMD {cmd} (exit: {exit_code})") elif rtype == "console_file_edit": path = payload.get("file_path", "?") edit_type = payload.get("edit_type", "?") click.echo(f"{ts} EDIT {edit_type} {path}") elif rtype == "console_approval_request": action = payload.get("action_type", "?") aid = payload.get("approval_id", "?") click.echo(f"{ts} REQUEST {action} [{aid}]") elif rtype == "console_approval": action = payload.get("action_type", "?") approved = "APPROVED" if payload.get("approved") else "REJECTED" approver = payload.get("approver", "?") click.echo(f"{ts} {approved} {action} by {approver}") else: click.echo(f"{ts} {rtype}") return # Default: narrative format click.echo("") click.echo("=" * 70) click.echo(f" CONSOLE SESSION STORY: {session_id}") click.echo("=" * 70) click.echo("") # Find session metadata session_start = None session_end = None commands = [] edits = [] approval_requests = [] approvals = [] for r in receipts: rtype = r.get("type", "") payload = r.get("payload", {}) if rtype == "console_session_start": session_start = r elif rtype == "console_session_end": session_end = r elif rtype == "console_command": commands.append(r) elif rtype == "console_file_edit": edits.append(r) elif rtype == "console_approval_request": approval_requests.append(r) elif rtype == "console_approval": approvals.append(r) # Narrator header if session_start: p = session_start.get("payload", {}) agent = p.get("agent_type", "unknown agent") caller = p.get("caller", "unknown caller") project = p.get("project_path", "unknown project") ts = session_start.get("ts", "")[:19] click.echo(f" On {ts}, a session began.") click.echo(f" Agent: {agent}") click.echo(f" Caller: {caller}") click.echo(f" Project: {project}") click.echo("") # Commands if commands: click.echo(f" The session executed {len(commands)} command(s):") for c in commands: p = c.get("payload", {}) cmd = p.get("command", "?") exit_code = p.get("exit_code", "?") ts = c.get("ts", "")[:19] status = "successfully" if exit_code == 0 else f"with exit code {exit_code}" click.echo(f" - [{ts}] {cmd} completed {status}") click.echo("") # Edits if edits: click.echo(f" The session modified {len(edits)} file(s):") for e in edits: p = e.get("payload", {}) path = p.get("file_path", "?") edit_type = p.get("edit_type", "modified") lines = p.get("lines_changed", 0) click.echo(f" - {edit_type} {path} ({lines} lines changed)") click.echo("") # Approval requests if approval_requests: click.echo(f" The session requested {len(approval_requests)} approval(s):") for ar in approval_requests: p = ar.get("payload", {}) action = p.get("action_type", "?") aid = p.get("approval_id", "?") approvers = ", ".join(p.get("approvers", [])) ts = ar.get("ts", "")[:19] click.echo(f" - [{ts}] Requested approval for '{action}'") click.echo(f" Approval ID: {aid}") click.echo(f" Required approvers: {approvers}") click.echo("") # Approvals if approvals: click.echo(f" Approval decisions made during session:") for a in approvals: p = a.get("payload", {}) action = p.get("action_type", "?") approved = p.get("approved", False) approver = p.get("approver", "?") reason = p.get("reason", "") ts = a.get("ts", "")[:19] decision = "APPROVED" if approved else "REJECTED" click.echo(f" - [{ts}] {decision}: '{action}'") click.echo(f" By: {approver}") if reason: click.echo(f" Reason: {reason}") click.echo("") # Session end if session_end: p = session_end.get("payload", {}) reason = p.get("exit_reason", "unknown") ts = session_end.get("ts", "")[:19] click.echo(f" The session ended at {ts}.") click.echo(f" Exit reason: {reason}") click.echo("") # Summary click.echo("-" * 70) click.echo(f" Summary: {len(receipts)} total events") click.echo( f" Commands: {len(commands)} | Edits: {len(edits)} | Approvals: {len(approvals)}/{len(approval_requests)}" ) click.echo("=" * 70) click.echo("") # ============================================================================ # Entry Point # ============================================================================ if __name__ == "__main__": cli()