#!/usr/bin/env python3 """ VaultMesh Console Receipts HTTP Bridge A minimal FastAPI server that exposes the Console receipt emitter for the OpenCode plugin to call via HTTP. Usage: python scripts/console_receipts_server.py # Or with uvicorn directly: uvicorn scripts.console_receipts_server:app --host 127.0.0.1 --port 9110 """ import os import sys from pathlib import Path from typing import Any, Dict, List, Literal, Optional # Add parent directory to path for imports SCRIPT_DIR = Path(__file__).parent.absolute() VAULTMESH_ROOT = SCRIPT_DIR.parent sys.path.insert(0, str(VAULTMESH_ROOT)) # Set environment variable for the emitter os.environ.setdefault("VAULTMESH_ROOT", str(VAULTMESH_ROOT)) from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from dataclasses import asdict from engines.console.receipts import ( ConsoleReceiptEmitter, ReceiptType, emit_console_receipt, get_emitter, ) from engines.console.approvals import get_approval_manager, ApprovalRequest # ============================================================================ # FastAPI App # ============================================================================ app = FastAPI( title="VaultMesh Console Receipts API", description="HTTP bridge for emitting Console receipts to the Civilization Ledger", version="0.1.0", ) # Allow CORS for local development app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============================================================================ # Request/Response Models # ============================================================================ class ReceiptIn(BaseModel): """Request body for emitting a receipt.""" type: ReceiptType session_id: Optional[str] = None payload: Dict[str, Any] class ReceiptOut(BaseModel): """Response after emitting a receipt.""" ok: bool record: Dict[str, Any] class RootInfo(BaseModel): """Console scroll root information.""" engine_id: str merkle_root: str events: int updated_at: Optional[str] = None class AnchorRequest(BaseModel): """Request to trigger Guardian anchor.""" scrolls: Optional[List[str]] = None class SearchRequest(BaseModel): """Request to search receipts.""" scroll: Optional[str] = "Console" receipt_type: Optional[str] = None limit: int = 50 class ApprovalRequestIn(BaseModel): """Request to create an approval.""" session_id: str action_type: str action_details: Dict[str, Any] requested_by: str approvers: List[str] timeout_minutes: int = 60 class ApprovalDecisionIn(BaseModel): """Request to decide on an approval.""" approved: bool approver: str reason: str = "" # ============================================================================ # Endpoints # ============================================================================ @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "ok", "engine": "console"} @app.post("/v1/console/receipt", response_model=ReceiptOut) async def post_receipt(req: ReceiptIn): """ Emit a Console receipt. This is the main endpoint called by the OpenCode plugin. """ try: record = emit_console_receipt( receipt_type=req.type, payload=req.payload, session_id=req.session_id, ) return ReceiptOut(ok=True, record=record) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/v1/console/root", response_model=RootInfo) async def get_root(): """ Get Console scroll Merkle root info. """ try: emitter = get_emitter() info = emitter.get_root_info() return RootInfo(**info) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/v1/guardian/anchor") async def trigger_anchor(req: AnchorRequest): """ Trigger a Guardian anchor cycle. NOTE: This is a stub - in production, this would call the actual Guardian engine to anchor the specified scrolls. """ scrolls = req.scrolls or ["Console"] # TODO: Integrate with actual Guardian engine return { "ok": True, "message": f"Anchor requested for scrolls: {scrolls}", "note": "Stub implementation - Guardian integration pending", } @app.post("/v1/receipts/search") async def search_receipts(req: SearchRequest): """ Search Console receipts. NOTE: This is a simple implementation that reads from JSONL. In production, use a proper index or database. """ import json from pathlib import Path emitter = get_emitter() events_path = Path(emitter.events_path) if not events_path.exists(): return {"results": [], "total": 0} results = [] with open(events_path, "r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: record = json.loads(line) # Filter by receipt type if specified if req.receipt_type and record.get("type") != req.receipt_type: continue results.append(record) except json.JSONDecodeError: continue # Return most recent first, with limit results = list(reversed(results))[:req.limit] return { "results": results, "total": len(results), "scroll": req.scroll, } @app.get("/v1/console/receipts") async def list_receipts(limit: int = 20, offset: int = 0): """ List Console receipts with pagination. """ import json from pathlib import Path emitter = get_emitter() events_path = Path(emitter.events_path) if not events_path.exists(): return {"receipts": [], "total": 0, "limit": limit, "offset": offset} all_receipts = [] with open(events_path, "r", encoding="utf-8") as f: for line in f: if not line.strip(): continue try: all_receipts.append(json.loads(line)) except json.JSONDecodeError: continue # Most recent first all_receipts = list(reversed(all_receipts)) total = len(all_receipts) page = all_receipts[offset : offset + limit] return { "receipts": page, "total": total, "limit": limit, "offset": offset, } # ============================================================================ # Session Query Endpoints # ============================================================================ @app.get("/v1/console/sessions") async def list_sessions(status: str = "all", limit: int = 20): """List Console sessions.""" import json emitter = get_emitter() events_path = Path(emitter.events_path) if not events_path.exists(): return {"sessions": [], "total": 0} sessions: Dict[str, Dict[str, Any]] = {} for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: r = json.loads(line) except Exception: continue sid = r.get("session_id") if not sid: continue if sid not in sessions: sessions[sid] = { "session_id": sid, "status": "active", "started_at": None, "ended_at": None, "events": 0, } sessions[sid]["events"] += 1 r_type = r.get("type") if r_type == "console_session_start": sessions[sid]["started_at"] = r["ts"] elif r_type == "console_session_end": sessions[sid]["ended_at"] = r["ts"] sessions[sid]["status"] = "ended" results = list(sessions.values()) if status == "active": results = [s for s in results if s["status"] == "active"] elif status == "ended": results = [s for s in results if s["status"] == "ended"] return {"sessions": results[-limit:], "total": len(results)} @app.get("/v1/console/sessions/{session_id}") async def get_session(session_id: str): """Get detailed session status.""" import json emitter = get_emitter() events_path = Path(emitter.events_path) if not events_path.exists(): raise HTTPException(status_code=404, detail="Session not found") session: Optional[Dict[str, Any]] = None for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: r = json.loads(line) except Exception: continue if r.get("session_id") != session_id: continue r_type = r.get("type") payload = r.get("payload") or {} if r_type == "console_session_start": session = { "session_id": session_id, "status": "active", "started_at": r["ts"], "agent_type": payload.get("agent_type"), "model_id": payload.get("model_id"), "caller": payload.get("caller"), } elif r_type == "console_session_end" and session is not None: session["status"] = "ended" session["ended_at"] = r["ts"] if session is None: raise HTTPException(status_code=404, detail="Session not found") return session # ============================================================================ # Approval Endpoints # ============================================================================ @app.post("/v1/console/approvals/request") async def request_approval(req: ApprovalRequestIn): """Request approval for an action.""" manager = get_approval_manager() request = manager.request_approval( session_id=req.session_id, action_type=req.action_type, action_details=req.action_details, requested_by=req.requested_by, approvers=req.approvers, timeout_minutes=req.timeout_minutes, ) return {"ok": True, "approval_id": request.approval_id} @app.get("/v1/console/approvals/pending") async def list_pending_approvals(session_id: Optional[str] = None): """List pending approval requests.""" manager = get_approval_manager() pending = manager.list_pending(session_id) return {"pending": [asdict(r) for r in pending]} @app.post("/v1/console/approvals/{approval_id}/decide") async def decide_approval(approval_id: str, req: ApprovalDecisionIn): """Approve or reject a pending action.""" manager = get_approval_manager() try: success = manager.decide( approval_id=approval_id, approved=req.approved, approver=req.approver, reason=req.reason, ) return { "ok": success, "decision": "approved" if req.approved else "rejected", } except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) except KeyError: raise HTTPException(status_code=404, detail="Approval not found") # ============================================================================ # GitLab Webhook Handler # ============================================================================ @app.post("/gitlab/webhook") async def gitlab_webhook(request: Request): """ Handle GitLab webhook events and emit Console receipts. Supports: - Push events → console_gitlab_push - Merge request events → console_gitlab_mr - Pipeline events → console_gitlab_pipeline Configure in GitLab: Settings → Webhooks """ event_type = request.headers.get("X-Gitlab-Event", "unknown") try: body = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON body") emitter = get_emitter() if event_type == "Push Hook": # Push event project = body.get("project", {}) commits = body.get("commits", []) session_id = f"gitlab-push-{body.get('checkout_sha', 'unknown')[:12]}" emit_console_receipt( "console_command", # Reuse console_command for push events { "command": "git_push", "args_hash": body.get("checkout_sha", ""), "exit_code": 0, "duration_ms": 0, "gitlab_event": "push", "project_path": project.get("path_with_namespace", ""), "ref": body.get("ref", ""), "commits_count": len(commits), "user": body.get("user_name", ""), }, session_id=session_id, ) return {"ok": True, "event": "push", "session_id": session_id} elif event_type == "Merge Request Hook": # Merge request event mr = body.get("object_attributes", {}) project = body.get("project", {}) session_id = f"gitlab-mr-{mr.get('iid', 'unknown')}" action = mr.get("action", "update") # open, close, merge, update, etc. emit_console_receipt( "console_command", { "command": f"mr_{action}", "args_hash": mr.get("last_commit", {}).get("id", ""), "exit_code": 0, "duration_ms": 0, "gitlab_event": "merge_request", "project_path": project.get("path_with_namespace", ""), "mr_iid": mr.get("iid"), "mr_title": mr.get("title", ""), "mr_state": mr.get("state", ""), "source_branch": mr.get("source_branch", ""), "target_branch": mr.get("target_branch", ""), "user": body.get("user", {}).get("name", ""), }, session_id=session_id, ) return {"ok": True, "event": "merge_request", "action": action, "session_id": session_id} elif event_type == "Pipeline Hook": # Pipeline event pipeline = body.get("object_attributes", {}) project = body.get("project", {}) session_id = f"gitlab-pipeline-{pipeline.get('id', 'unknown')}" status = pipeline.get("status", "unknown") # Only emit for significant status changes if status in ("running", "success", "failed", "canceled"): receipt_type = "console_session_start" if status == "running" else "console_session_end" if status == "running": emit_console_receipt( "console_session_start", { "agent_type": "gitlab-ci", "model_id": "none", "caller": "did:vm:service:gitlab-webhook", "project_path": project.get("path_with_namespace", ""), "pipeline_id": pipeline.get("id"), "ref": pipeline.get("ref", ""), "commit": pipeline.get("sha", ""), }, session_id=session_id, ) else: emit_console_receipt( "console_session_end", { "duration_ms": pipeline.get("duration", 0) * 1000 if pipeline.get("duration") else 0, "commands_executed": 0, "files_modified": 0, "exit_reason": f"pipeline-{status}", }, session_id=session_id, ) return {"ok": True, "event": "pipeline", "status": status, "session_id": session_id} else: # Unknown event type - log but don't fail return {"ok": True, "event": event_type, "note": "Unhandled event type"} # ============================================================================ # HTML Dashboard # ============================================================================ from fastapi.responses import HTMLResponse def format_time_ago(ts_str: str) -> str: """Format timestamp as relative time.""" from datetime import datetime, timezone try: ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) now = datetime.now(timezone.utc) diff = now - ts seconds = diff.total_seconds() if seconds < 60: return f"{int(seconds)}s ago" elif seconds < 3600: return f"{int(seconds/60)}m ago" elif seconds < 86400: return f"{int(seconds/3600)}h ago" else: return f"{int(seconds/86400)}d ago" except: return ts_str[:19] def get_event_color(event_type: str) -> str: """Get color for event type.""" colors = { "console_session_start": "#22c55e", # green "console_session_end": "#6b7280", # gray "console_command": "#3b82f6", # blue "console_file_edit": "#a855f7", # purple "console_tool_call": "#06b6d4", # cyan "console_approval_request": "#f59e0b", # amber "console_approval": "#10b981", # emerald "console_git_commit": "#ec4899", # pink } return colors.get(event_type, "#9ca3af") def get_event_icon(event_type: str) -> str: """Get icon for event type.""" icons = { "console_session_start": "▶", "console_session_end": "■", "console_command": "⌘", "console_file_edit": "✎", "console_tool_call": "⚡", "console_approval_request": "⏳", "console_approval": "✓", "console_git_commit": "⬆", } return icons.get(event_type, "•") @app.get("/console/dashboard", response_class=HTMLResponse) async def console_dashboard(): """ HTML dashboard showing Console status at a glance. Shows: - Active and recent sessions - Pending approvals - Recent events stream """ import json from datetime import datetime, timezone emitter = get_emitter() events_path = Path(emitter.events_path) # Collect data sessions: Dict[str, Dict[str, Any]] = {} all_events: List[Dict[str, Any]] = [] if events_path.exists(): for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: r = json.loads(line) all_events.append(r) sid = r.get("session_id") if sid: if sid not in sessions: sessions[sid] = { "session_id": sid, "status": "active", "started_at": None, "ended_at": None, "events": 0, "agent_type": None, "caller": None, } sessions[sid]["events"] += 1 if r.get("type") == "console_session_start": sessions[sid]["started_at"] = r.get("ts") payload = r.get("payload", {}) sessions[sid]["agent_type"] = payload.get("agent_type") sessions[sid]["caller"] = payload.get("caller") elif r.get("type") == "console_session_end": sessions[sid]["ended_at"] = r.get("ts") sessions[sid]["status"] = "ended" except: continue # Get pending approvals manager = get_approval_manager() pending = manager.list_pending() # Get root info root_info = emitter.get_root_info() # Sort sessions by most recent activity session_list = sorted( sessions.values(), key=lambda s: s.get("started_at") or "", reverse=True )[:20] # Recent events (last 30) recent_events = list(reversed(all_events[-30:])) # Build HTML html = f"""