""" Console Engine Approval Manager Handles approval requests, pending state, and decision recording. State is derived from scroll on init, kept in memory during runtime. """ from __future__ import annotations from dataclasses import dataclass, field, asdict from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Literal import json from .receipts import emit_console_receipt, get_emitter ApprovalStatus = Literal["pending", "approved", "rejected", "expired"] @dataclass class ApprovalRequest: """An approval request waiting for decision.""" approval_id: str session_id: str action_type: str action_details: Dict[str, Any] requested_by: str approvers: List[str] expires_at: str status: ApprovalStatus = "pending" created_at: str = field( default_factory=lambda: datetime.now(timezone.utc).isoformat() ) class ApprovalManager: """ Manages pending approval requests. State is derived from scroll on init, kept in memory during runtime. """ def __init__(self, vaultmesh_root: Optional[str] = None): self.vaultmesh_root = vaultmesh_root self._pending: Dict[str, ApprovalRequest] = {} self._load_pending_from_scroll() # --------------------------------------------------------------------- # # Internal helpers # --------------------------------------------------------------------- # def _load_pending_from_scroll(self) -> None: """ Reconstruct pending approvals from Console scroll. Algorithm: - Scan all receipts. - Collect all console_approval_request receipts as candidates. - Remove any whose approval_id appears in a console_approval receipt. - Drop any that are expired. """ emitter = get_emitter(self.vaultmesh_root) events_path = Path(emitter.events_path) if not events_path.exists(): return requests: Dict[str, ApprovalRequest] = {} decided_ids: set = set() for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: r = json.loads(line) except Exception: continue r_type = r.get("type") payload = r.get("payload") or {} if r_type == "console_approval_request": approval_id = payload.get("approval_id") if not approval_id: continue req = ApprovalRequest( approval_id=approval_id, session_id=payload.get("session_id") or r.get("session_id"), action_type=payload.get("action_type", ""), action_details=payload.get("action_details", {}), requested_by=payload.get("requested_by", ""), approvers=payload.get("approvers", []), expires_at=payload.get("expires_at", ""), status=payload.get("status", "pending"), created_at=payload.get("created_at", r.get("ts")), ) requests[approval_id] = req elif r_type == "console_approval": approval_id = payload.get("approval_id") if approval_id: decided_ids.add(approval_id) # Filter to pending and not expired now = datetime.now(timezone.utc) for approval_id, req in requests.items(): if approval_id in decided_ids: continue try: exp = datetime.fromisoformat(req.expires_at.replace("Z", "+00:00")) except Exception: exp = now if exp < now: continue # expired; skip req.status = "pending" self._pending[approval_id] = req # --------------------------------------------------------------------- # # Public API # --------------------------------------------------------------------- # def request_approval( self, session_id: str, action_type: str, action_details: Dict[str, Any], requested_by: str, approvers: List[str], timeout_minutes: int = 60, ) -> ApprovalRequest: """Create a new approval request and emit receipt.""" now = datetime.now(timezone.utc) approval_id = f"approval-{now.strftime('%Y-%m-%d-%H%M%S')}" expires_at = (now + timedelta(minutes=timeout_minutes)).isoformat() request = ApprovalRequest( approval_id=approval_id, session_id=session_id, action_type=action_type, action_details=action_details, requested_by=requested_by, approvers=approvers, expires_at=expires_at, ) emit_console_receipt( "console_approval_request", asdict(request), session_id=session_id, ) self._pending[approval_id] = request return request def decide( self, approval_id: str, approved: bool, approver: str, reason: str = "", ) -> bool: """Record approval decision and emit receipt.""" if approval_id not in self._pending: # Let caller decide whether to treat this as error raise KeyError(f"Approval not found: {approval_id}") request = self._pending[approval_id] if approver not in request.approvers: raise PermissionError(f"{approver} not in approver pool") request.status = "approved" if approved else "rejected" emit_console_receipt( "console_approval", { "approval_id": approval_id, "action_type": request.action_type, "approved": approved, "approver": approver, "reason": reason, "decided_at": datetime.now(timezone.utc).isoformat(), }, session_id=request.session_id, ) del self._pending[approval_id] return True def list_pending(self, session_id: Optional[str] = None) -> List[ApprovalRequest]: """List pending approval requests.""" requests = list(self._pending.values()) if session_id: requests = [r for r in requests if r.session_id == session_id] return requests # Singleton _manager: Optional[ApprovalManager] = None def get_approval_manager(vaultmesh_root: Optional[str] = None) -> ApprovalManager: """Get or create the ApprovalManager singleton.""" global _manager if _manager is None: _manager = ApprovalManager(vaultmesh_root) return _manager