Files
vm-core/engines/console/approvals.py
2025-12-27 00:10:32 +00:00

210 lines
6.7 KiB
Python

"""
Console Engine Approval Manager
Handles approval requests, pending state, and decision recording.
State is derived from scroll on init, kept in memory during runtime.
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Literal
import json
from .receipts import emit_console_receipt, get_emitter
ApprovalStatus = Literal["pending", "approved", "rejected", "expired"]
@dataclass
class ApprovalRequest:
"""An approval request waiting for decision."""
approval_id: str
session_id: str
action_type: str
action_details: Dict[str, Any]
requested_by: str
approvers: List[str]
expires_at: str
status: ApprovalStatus = "pending"
created_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
class ApprovalManager:
"""
Manages pending approval requests.
State is derived from scroll on init, kept in memory during runtime.
"""
def __init__(self, vaultmesh_root: Optional[str] = None):
self.vaultmesh_root = vaultmesh_root
self._pending: Dict[str, ApprovalRequest] = {}
self._load_pending_from_scroll()
# --------------------------------------------------------------------- #
# Internal helpers
# --------------------------------------------------------------------- #
def _load_pending_from_scroll(self) -> None:
"""
Reconstruct pending approvals from Console scroll.
Algorithm:
- Scan all receipts.
- Collect all console_approval_request receipts as candidates.
- Remove any whose approval_id appears in a console_approval receipt.
- Drop any that are expired.
"""
emitter = get_emitter(self.vaultmesh_root)
events_path = Path(emitter.events_path)
if not events_path.exists():
return
requests: Dict[str, ApprovalRequest] = {}
decided_ids: set = set()
for line in events_path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
try:
r = json.loads(line)
except Exception:
continue
r_type = r.get("type")
payload = r.get("payload") or {}
if r_type == "console_approval_request":
approval_id = payload.get("approval_id")
if not approval_id:
continue
req = ApprovalRequest(
approval_id=approval_id,
session_id=payload.get("session_id") or r.get("session_id"),
action_type=payload.get("action_type", ""),
action_details=payload.get("action_details", {}),
requested_by=payload.get("requested_by", ""),
approvers=payload.get("approvers", []),
expires_at=payload.get("expires_at", ""),
status=payload.get("status", "pending"),
created_at=payload.get("created_at", r.get("ts")),
)
requests[approval_id] = req
elif r_type == "console_approval":
approval_id = payload.get("approval_id")
if approval_id:
decided_ids.add(approval_id)
# Filter to pending and not expired
now = datetime.now(timezone.utc)
for approval_id, req in requests.items():
if approval_id in decided_ids:
continue
try:
exp = datetime.fromisoformat(req.expires_at.replace("Z", "+00:00"))
except Exception:
exp = now
if exp < now:
continue # expired; skip
req.status = "pending"
self._pending[approval_id] = req
# --------------------------------------------------------------------- #
# Public API
# --------------------------------------------------------------------- #
def request_approval(
self,
session_id: str,
action_type: str,
action_details: Dict[str, Any],
requested_by: str,
approvers: List[str],
timeout_minutes: int = 60,
) -> ApprovalRequest:
"""Create a new approval request and emit receipt."""
now = datetime.now(timezone.utc)
approval_id = f"approval-{now.strftime('%Y-%m-%d-%H%M%S')}"
expires_at = (now + timedelta(minutes=timeout_minutes)).isoformat()
request = ApprovalRequest(
approval_id=approval_id,
session_id=session_id,
action_type=action_type,
action_details=action_details,
requested_by=requested_by,
approvers=approvers,
expires_at=expires_at,
)
emit_console_receipt(
"console_approval_request",
asdict(request),
session_id=session_id,
)
self._pending[approval_id] = request
return request
def decide(
self,
approval_id: str,
approved: bool,
approver: str,
reason: str = "",
) -> bool:
"""Record approval decision and emit receipt."""
if approval_id not in self._pending:
# Let caller decide whether to treat this as error
raise KeyError(f"Approval not found: {approval_id}")
request = self._pending[approval_id]
if approver not in request.approvers:
raise PermissionError(f"{approver} not in approver pool")
request.status = "approved" if approved else "rejected"
emit_console_receipt(
"console_approval",
{
"approval_id": approval_id,
"action_type": request.action_type,
"approved": approved,
"approver": approver,
"reason": reason,
"decided_at": datetime.now(timezone.utc).isoformat(),
},
session_id=request.session_id,
)
del self._pending[approval_id]
return True
def list_pending(self, session_id: Optional[str] = None) -> List[ApprovalRequest]:
"""List pending approval requests."""
requests = list(self._pending.values())
if session_id:
requests = [r for r in requests if r.session_id == session_id]
return requests
# Singleton
_manager: Optional[ApprovalManager] = None
def get_approval_manager(vaultmesh_root: Optional[str] = None) -> ApprovalManager:
"""Get or create the ApprovalManager singleton."""
global _manager
if _manager is None:
_manager = ApprovalManager(vaultmesh_root)
return _manager