Files
Vault Sovereign f0b8d962de
Some checks failed
WAF Intelligence Guardrail / waf-intel (push) Waiting to run
Cloudflare Registry Validation / validate-registry (push) Has been cancelled
chore: pre-migration snapshot
Layer0, MCP servers, Terraform consolidation
2025-12-27 01:52:27 +00:00

726 lines
25 KiB
Python

from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
from .cloudflare_api import (
CloudflareClient,
CloudflareContext,
CloudflareError,
SnapshotStore,
ingress_summary_from_file,
)
MAX_BYTES_DEFAULT = 32_000
def _repo_root() -> Path:
    """Return the repository root directory.

    Layout assumption: server.py -> cloudflare_safe -> mcp -> <repo root>,
    i.e. the root is three directories above this file's package.
    """
    here = Path(__file__).resolve()
    return here.parents[3]
def _max_bytes() -> int:
    """Resolve the response size cap in bytes.

    Reads VM_MCP_MAX_BYTES from the environment; a valid integer is clamped
    to at least 4096. Empty/unset or unparsable values fall back to
    MAX_BYTES_DEFAULT.
    """
    configured = (os.getenv("VM_MCP_MAX_BYTES") or "").strip()
    if configured:
        try:
            return max(4_096, int(configured))
        except ValueError:
            pass  # non-numeric override: ignore and use the default
    return MAX_BYTES_DEFAULT
def _redact(obj: Any) -> Any:
sensitive_keys = ("token", "secret", "password", "private", "key", "certificate")
if isinstance(obj, dict):
out: Dict[str, Any] = {}
for k, v in obj.items():
if any(s in str(k).lower() for s in sensitive_keys):
out[k] = "<REDACTED>"
else:
out[k] = _redact(v)
return out
if isinstance(obj, list):
return [_redact(v) for v in obj]
if isinstance(obj, str):
if obj.startswith("ghp_") or obj.startswith("github_pat_"):
return "<REDACTED>"
return obj
return obj
def _safe_json(payload: Dict[str, Any]) -> str:
    """Serialize *payload* to pretty-printed JSON, redacted and size-capped.

    The payload is first passed through _redact(). If the pretty-printed
    form fits within _max_bytes() it is returned unchanged; otherwise a
    small stand-in object carrying only ok/summary/next_steps (with
    ``truncated: True``) is returned instead.
    """
    payload = _redact(payload)
    pretty = json.dumps(payload, ensure_ascii=False, indent=2)
    # Bug fix: measure the representation we actually return. The previous
    # code sized the *compact* encoding but returned the larger *indented*
    # one, so responses could exceed the configured byte cap.
    if len(pretty.encode("utf-8")) <= _max_bytes():
        return pretty
    # Too large: keep only summary + next_steps and flag the truncation.
    truncated = {
        "ok": payload.get("ok", True),
        "truncated": True,
        "summary": payload.get("summary", "Response exceeded max size; truncated."),
        "next_steps": payload.get(
            "next_steps",
            [
                "request a narrower scope (e.g., scopes=['tunnels'])",
                "request an export path instead of inline content",
            ],
        ),
    }
    return json.dumps(truncated, ensure_ascii=False, indent=2)
def _mcp_text_result(
    payload: Dict[str, Any], *, is_error: bool = False
) -> Dict[str, Any]:
    """Wrap *payload* as an MCP tool-call result with a single text item.

    The payload is rendered through _safe_json (redaction + size cap).
    When *is_error* is true the result also carries ``isError: True``.
    """
    text_item = {"type": "text", "text": _safe_json(payload)}
    result: Dict[str, Any] = {"content": [text_item]}
    if is_error:
        result["isError"] = True
    return result
def _default_state_dir() -> Path:
    """Default on-disk state directory for Cloudflare MCP snapshots."""
    return Path(_repo_root(), "archive_runtime", "cloudflare_mcp")
class CloudflareSafeTools:
    """Summary-first Cloudflare tool implementations backed by on-disk snapshots.

    Every tool returns a small dict with the same shape
    (``ok`` / ``summary`` / ``data`` / ``truncated`` / ``next_steps``);
    large payloads are written to disk by ``SnapshotStore`` and referenced
    by path rather than returned inline.
    """

    def __init__(self) -> None:
        # State directory is overridable via VM_CF_MCP_STATE_DIR; otherwise
        # snapshots live under <repo>/archive_runtime/cloudflare_mcp.
        self.store = SnapshotStore(
            Path(os.getenv("VM_CF_MCP_STATE_DIR") or _default_state_dir())
        )

    def cf_snapshot(
        self,
        *,
        scopes: Optional[Sequence[str]] = None,
        zone_id: Optional[str] = None,
        zone_name: Optional[str] = None,
        dns_max_pages: int = 1,
    ) -> Dict[str, Any]:
        """Capture a new snapshot and return its id, path, and per-scope counts.

        Defaults to scopes=['tunnels', 'access_apps'] when none are given.
        Credentials/account come from the environment via
        ``CloudflareContext.from_env()``.
        """
        scopes_use = list(scopes or ["tunnels", "access_apps"])
        ctx = CloudflareContext.from_env()
        client = CloudflareClient(api_token=ctx.api_token)
        meta, snapshot = self.store.create_snapshot(
            client=client,
            ctx=ctx,
            scopes=scopes_use,
            zone_id=zone_id,
            zone_name=zone_name,
            dns_max_pages=dns_max_pages,
        )
        summary = (
            f"Snapshot {meta.snapshot_id} captured "
            f"(scopes={','.join(meta.scopes)}) and written to {meta.snapshot_path}."
        )
        return {
            "ok": True,
            "summary": summary,
            "data": {
                "snapshot_id": meta.snapshot_id,
                "created_at": meta.created_at,
                "scopes": meta.scopes,
                "snapshot_path": meta.snapshot_path,
                # Counts only -- the full objects stay in the on-disk snapshot.
                "counts": {
                    "zones": len(snapshot.get("zones") or []),
                    "tunnels": len(snapshot.get("tunnels") or []),
                    "access_apps": len(snapshot.get("access_apps") or []),
                },
            },
            "truncated": False,
            "next_steps": [
                "cf_config_diff(from_snapshot_id=..., to_snapshot_id=...)",
                "cf_export_config(full=false, snapshot_id=...)",
            ],
        }

    def cf_refresh(
        self,
        *,
        snapshot_id: str,
        scopes: Optional[Sequence[str]] = None,
        dns_max_pages: int = 1,
    ) -> Dict[str, Any]:
        """Re-capture a prior snapshot under a new snapshot_id.

        When *scopes* is omitted, reuses the scopes recorded in the prior
        snapshot's meta; the DNS zone id/name are also carried over from the
        prior snapshot's ``dns`` section.
        """
        before_meta = self.store.get(snapshot_id)
        before = self.store.load_snapshot(snapshot_id)
        scopes_use = list(scopes or (before.get("meta", {}).get("scopes") or []))
        ctx = CloudflareContext.from_env()
        client = CloudflareClient(api_token=ctx.api_token)
        meta, _snapshot = self.store.create_snapshot(
            client=client,
            ctx=ctx,
            scopes=scopes_use,
            zone_id=(before.get("dns") or {}).get("zone_id"),
            zone_name=(before.get("dns") or {}).get("zone_name"),
            dns_max_pages=dns_max_pages,
        )
        return {
            "ok": True,
            "summary": f"Refreshed {before_meta.snapshot_id} -> {meta.snapshot_id} (scopes={','.join(meta.scopes)}).",
            "data": {
                "from_snapshot_id": before_meta.snapshot_id,
                "to_snapshot_id": meta.snapshot_id,
                "snapshot_path": meta.snapshot_path,
            },
            "truncated": False,
            "next_steps": [
                "cf_config_diff(from_snapshot_id=..., to_snapshot_id=...)",
            ],
        }

    def cf_config_diff(
        self,
        *,
        from_snapshot_id: str,
        to_snapshot_id: str,
        scopes: Optional[Sequence[str]] = None,
    ) -> Dict[str, Any]:
        """Diff two snapshots; return per-scope counts inline, full diff on disk."""
        diff = self.store.diff(
            from_snapshot_id=from_snapshot_id,
            to_snapshot_id=to_snapshot_id,
            scopes=scopes,
        )
        # Keep the response small; point to diff_path for full detail.
        changes = diff.get("changes") or {}
        counts = {
            scope: (changes.get(scope) or {}).get("counts")
            for scope in sorted(changes.keys())
        }
        return {
            "ok": True,
            "summary": f"Diff computed and written to {diff.get('diff_path')}.",
            "data": {
                "from_snapshot_id": from_snapshot_id,
                "to_snapshot_id": to_snapshot_id,
                "scopes": diff.get("scopes"),
                "counts": counts,
                "diff_path": diff.get("diff_path"),
            },
            "truncated": False,
            "next_steps": [
                "Use filesystem MCP to open diff_path for full details",
                "Run cf_export_config(full=false, snapshot_id=...) for a safe export path",
            ],
        }

    def cf_export_config(
        self,
        *,
        snapshot_id: Optional[str] = None,
        full: bool = False,
        scopes: Optional[Sequence[str]] = None,
    ) -> Dict[str, Any]:
        """Export a snapshot's config.

        Without *snapshot_id* a fresh snapshot is captured first. Default is
        summary-only (paths, no data); ``full=True`` returns the snapshot
        inline (redaction and the size cap are applied downstream by
        ``_safe_json`` when the result is serialized).
        """
        if snapshot_id is None:
            snap = self.cf_snapshot(scopes=scopes)
            snapshot_id = str((snap.get("data") or {}).get("snapshot_id"))
        meta = self.store.get(snapshot_id)
        if not full:
            return {
                "ok": True,
                "summary": "Export is summary-first; full config requires full=true.",
                "data": {
                    "snapshot_id": meta.snapshot_id,
                    "snapshot_path": meta.snapshot_path,
                },
                "truncated": False,
                "next_steps": [
                    "Use filesystem MCP to open snapshot_path",
                    "If you truly need inline data, call cf_export_config(full=true, snapshot_id=...)",
                ],
            }
        snapshot = self.store.load_snapshot(snapshot_id)
        return {
            "ok": True,
            "summary": "Full snapshot export (redacted + size-capped). Prefer snapshot_path for large data.",
            "data": snapshot,
            "truncated": False,
            "next_steps": [
                f"Snapshot file: {meta.snapshot_path}",
            ],
        }

    def cf_tunnel_status(
        self,
        *,
        snapshot_id: Optional[str] = None,
        tunnel_name: Optional[str] = None,
        tunnel_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Return a tunnel status summary, optionally filtered by id/name.

        Reads tunnels from the given snapshot, or captures a fresh
        scopes=['tunnels'] snapshot when *snapshot_id* is omitted. Returns
        ok=False when a requested tunnel is not present in the snapshot.
        """
        if snapshot_id:
            snap = self.store.load_snapshot(snapshot_id)
            tunnels = snap.get("tunnels") or []
        else:
            snap = self.cf_snapshot(scopes=["tunnels"])
            sid = str((snap.get("data") or {}).get("snapshot_id"))
            tunnels = self.store.load_snapshot(sid).get("tunnels") or []

        def matches(t: Dict[str, Any]) -> bool:
            # A filter that was not supplied never excludes a tunnel.
            if tunnel_id and str(t.get("id")) != str(tunnel_id):
                return False
            if tunnel_name and str(t.get("name")) != str(tunnel_name):
                return False
            return True

        filtered = [t for t in tunnels if isinstance(t, dict) and matches(t)]
        if not filtered and (tunnel_id or tunnel_name):
            return {
                "ok": False,
                "summary": "Tunnel not found in snapshot.",
                "data": {"tunnel_id": tunnel_id, "tunnel_name": tunnel_name},
                "truncated": False,
                "next_steps": ["Call cf_snapshot(scopes=['tunnels']) and retry."],
            }
        # Only integer connector counts participate in the total.
        connectors = [t.get("connector_count") for t in filtered if isinstance(t, dict)]
        connectors = [c for c in connectors if isinstance(c, int)]
        return {
            "ok": True,
            "summary": f"Returned {len(filtered)} tunnel(s).",
            "data": {
                "tunnels": [
                    {
                        "id": t.get("id"),
                        "name": t.get("name"),
                        "status": t.get("status"),
                        "connector_count": t.get("connector_count"),
                        "last_seen": t.get("last_seen"),
                    }
                    for t in filtered
                ],
                "connectors_total": sum(connectors) if connectors else 0,
            },
            "truncated": False,
            "next_steps": [
                "For local ingress hostnames, use cf_tunnel_ingress_summary(config_path='/etc/cloudflared/config.yml')",
            ],
        }

    def cf_tunnel_ingress_summary(
        self,
        *,
        config_path: str = "/etc/cloudflared/config.yml",
        full: bool = False,
        max_rules: int = 50,
    ) -> Dict[str, Any]:
        """Parse ingress hostnames from a local cloudflared config file.

        Default response carries hostnames/counts only; ``full=True`` returns
        the whole summary produced by ``ingress_summary_from_file`` (still
        capped at *max_rules* rules by that helper).
        """
        summary = ingress_summary_from_file(
            config_path=config_path, max_rules=max_rules
        )
        if not full:
            return {
                "ok": True,
                "summary": f"Parsed ingress hostnames from {config_path}.",
                "data": {
                    "config_path": summary["config_path"],
                    "ingress_rule_count": summary["ingress_rule_count"],
                    "hostnames": summary["hostnames"],
                    "truncated": summary["truncated"],
                },
                "truncated": False,
                "next_steps": [
                    "Call cf_tunnel_ingress_summary(full=true, ...) to include service mappings (still capped).",
                ],
            }
        return {
            "ok": True,
            "summary": f"Ingress summary (full=true) for {config_path}.",
            "data": summary,
            "truncated": False,
            "next_steps": [],
        }

    def cf_access_policy_list(
        self,
        *,
        app_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """List Access apps, or the policies of one app when *app_id* is given.

        Both branches return minimized records (id/name plus a few summary
        fields) rather than the full API objects.
        """
        ctx = CloudflareContext.from_env()
        client = CloudflareClient(api_token=ctx.api_token)
        if not app_id:
            apps = client.list_access_apps(ctx.account_id)
            apps_min = [
                {
                    "id": a.get("id"),
                    "name": a.get("name"),
                    "domain": a.get("domain"),
                    "type": a.get("type"),
                }
                for a in apps
            ]
            return {
                "ok": True,
                "summary": f"Returned {len(apps_min)} Access app(s). Provide app_id to list policies.",
                "data": {"apps": apps_min},
                "truncated": False,
                "next_steps": [
                    "Call cf_access_policy_list(app_id=...)",
                ],
            }
        policies = client.list_access_policies(ctx.account_id, app_id)
        policies_min = [
            {
                "id": p.get("id"),
                "name": p.get("name"),
                "decision": p.get("decision"),
                "precedence": p.get("precedence"),
            }
            for p in policies
        ]
        return {
            "ok": True,
            "summary": f"Returned {len(policies_min)} policy/policies for app_id={app_id}.",
            "data": {"app_id": app_id, "policies": policies_min},
            "truncated": False,
            "next_steps": [],
        }
# MCP tool manifest served by the "tools/list" method. Each entry mirrors a
# CloudflareSafeTools method of the same name; "inputSchema" describes the
# keyword arguments that "tools/call" forwards to that method.
TOOLS: List[Dict[str, Any]] = [
    {
        "name": "cf_snapshot",
        "description": "Create a summary-first Cloudflare state snapshot (writes JSON to disk; returns snapshot_id + paths).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "scopes": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Scopes to fetch (default: ['tunnels','access_apps']). Supported: zones,tunnels,access_apps,dns",
                },
                "zone_id": {"type": "string"},
                "zone_name": {"type": "string"},
                "dns_max_pages": {"type": "integer", "default": 1},
            },
        },
    },
    {
        "name": "cf_refresh",
        "description": "Refresh a prior snapshot (creates a new snapshot_id).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "snapshot_id": {"type": "string"},
                "scopes": {"type": "array", "items": {"type": "string"}},
                "dns_max_pages": {"type": "integer", "default": 1},
            },
            "required": ["snapshot_id"],
        },
    },
    {
        "name": "cf_config_diff",
        "description": "Diff two snapshots (summary counts inline; full diff written to disk).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "from_snapshot_id": {"type": "string"},
                "to_snapshot_id": {"type": "string"},
                "scopes": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["from_snapshot_id", "to_snapshot_id"],
        },
    },
    {
        "name": "cf_export_config",
        "description": "Export snapshot config. Defaults to summary-only; full=true returns redacted + size-capped data.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "snapshot_id": {"type": "string"},
                # Python False serializes to JSON false when the manifest is sent.
                "full": {"type": "boolean", "default": False},
                "scopes": {"type": "array", "items": {"type": "string"}},
            },
        },
    },
    {
        "name": "cf_tunnel_status",
        "description": "Return tunnel status summary (connector count, last seen).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "snapshot_id": {"type": "string"},
                "tunnel_name": {"type": "string"},
                "tunnel_id": {"type": "string"},
            },
        },
    },
    {
        "name": "cf_tunnel_ingress_summary",
        "description": "Parse cloudflared ingress hostnames from a local config file (never dumps full YAML unless full=true, still capped).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "config_path": {
                    "type": "string",
                    "default": "/etc/cloudflared/config.yml",
                },
                "full": {"type": "boolean", "default": False},
                "max_rules": {"type": "integer", "default": 50},
            },
        },
    },
    {
        "name": "cf_access_policy_list",
        "description": "List Access apps, or policies for a specific app_id (summary-only).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "app_id": {"type": "string"},
            },
        },
    },
]
class StdioJsonRpc:
    """Minimal JSON-RPC transport over stdio.

    Supports two framings and auto-detects which one the client uses from
    the first message:

    * ``"headers"`` -- LSP-style ``Content-Length:`` header framing.
    * ``"line"``    -- newline-delimited JSON objects.

    Once detected, the same framing is used for all subsequent reads and
    writes.
    """

    def __init__(self) -> None:
        self._in = sys.stdin.buffer
        self._out = sys.stdout.buffer
        # None until the first client message reveals the framing.
        self._mode: Optional[str] = None  # "headers" | "line"

    def read_message(self) -> Optional[Dict[str, Any]]:
        """Read the next JSON-RPC message dict; None on EOF or broken framing."""
        while True:
            if self._mode == "line":
                line = self._in.readline()
                if not line:
                    return None  # EOF
                raw = line.decode("utf-8", "replace").strip()
                if not raw:
                    continue
                try:
                    msg = json.loads(raw)
                except Exception:
                    continue  # skip malformed lines rather than crash
                if isinstance(msg, dict):
                    return msg
                continue
            first = self._in.readline()
            if not first:
                return None  # EOF
            if first in (b"\r\n", b"\n"):
                continue
            # Auto-detect newline-delimited JSON framing: a first line that is
            # itself a JSON object means the client is not using headers.
            if self._mode is None and first.lstrip().startswith(b"{"):
                try:
                    msg = json.loads(first.decode("utf-8", "replace"))
                except Exception:
                    msg = None
                if isinstance(msg, dict):
                    self._mode = "line"
                    return msg
            # Header framing: collect "Key: Value" lines until the blank line.
            headers: Dict[str, str] = {}
            try:
                text = first.decode("utf-8", "replace").strip()
            except Exception:
                continue
            if ":" not in text:
                continue
            k, v = text.split(":", 1)
            headers[k.lower().strip()] = v.strip()
            while True:
                line = self._in.readline()
                if not line:
                    return None
                if line in (b"\r\n", b"\n"):
                    break
                try:
                    text = line.decode("utf-8", "replace").strip()
                except Exception:
                    continue
                if ":" not in text:
                    continue
                k, v = text.split(":", 1)
                headers[k.lower().strip()] = v.strip()
            if "content-length" not in headers:
                return None
            try:
                length = int(headers["content-length"])
            except ValueError:
                return None
            body = self._in.read(length)
            # Robustness fix: a short read means EOF mid-body; treat it as end
            # of stream instead of handing a truncated buffer to json.loads.
            if not body or len(body) < length:
                return None
            self._mode = "headers"
            # Robustness fix: a malformed body previously raised out of this
            # method (which main() calls outside its try block) and killed
            # the server; fail soft instead.
            try:
                msg = json.loads(body.decode("utf-8", "replace"))
            except Exception:
                return None
            if isinstance(msg, dict):
                return msg
            return None

    def write_message(self, message: Dict[str, Any]) -> None:
        """Serialize *message* compactly and write it using the detected framing."""
        if self._mode == "line":
            payload = json.dumps(
                message, ensure_ascii=False, separators=(",", ":"), default=str
            ).encode("utf-8")
            self._out.write(payload + b"\n")
            self._out.flush()
            return
        # Consistency fix: pass default=str here too (line mode already did),
        # so a non-JSON-serializable value cannot crash header-framed writes.
        body = json.dumps(
            message, ensure_ascii=False, separators=(",", ":"), default=str
        ).encode("utf-8")
        header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
        self._out.write(header)
        self._out.write(body)
        self._out.flush()
def main() -> None:
    """Run the stdio JSON-RPC loop for the cloudflare_safe MCP server.

    Handles ``initialize``, ``tools/list``, and ``tools/call``; tool errors
    are returned as MCP text results with ``isError`` set rather than as
    JSON-RPC error objects. Notifications (messages without an id) are
    ignored. Returns when stdin reaches EOF.
    """
    tools = CloudflareSafeTools()
    rpc = StdioJsonRpc()
    # Dispatch table: tool name -> callable taking the "arguments" dict.
    handlers: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
        "cf_snapshot": lambda a: tools.cf_snapshot(**a),
        "cf_refresh": lambda a: tools.cf_refresh(**a),
        "cf_config_diff": lambda a: tools.cf_config_diff(**a),
        "cf_export_config": lambda a: tools.cf_export_config(**a),
        "cf_tunnel_status": lambda a: tools.cf_tunnel_status(**a),
        "cf_tunnel_ingress_summary": lambda a: tools.cf_tunnel_ingress_summary(**a),
        "cf_access_policy_list": lambda a: tools.cf_access_policy_list(**a),
    }
    while True:
        msg = rpc.read_message()
        if msg is None:
            # EOF / client closed the stream: shut down cleanly.
            return
        method = msg.get("method")
        msg_id = msg.get("id")
        params = msg.get("params") or {}
        try:
            if method == "initialize":
                result = {
                    "protocolVersion": "2024-11-05",
                    "serverInfo": {"name": "cloudflare_safe", "version": "0.1.0"},
                    "capabilities": {"tools": {}},
                }
                rpc.write_message({"jsonrpc": "2.0", "id": msg_id, "result": result})
                continue
            if method == "tools/list":
                rpc.write_message(
                    {"jsonrpc": "2.0", "id": msg_id, "result": {"tools": TOOLS}}
                )
                continue
            if method == "tools/call":
                tool_name = str(params.get("name") or "")
                args = params.get("arguments") or {}
                if tool_name not in handlers:
                    rpc.write_message(
                        {
                            "jsonrpc": "2.0",
                            "id": msg_id,
                            "result": _mcp_text_result(
                                {
                                    "ok": False,
                                    "summary": f"Unknown tool: {tool_name}",
                                    "data": {"known_tools": sorted(handlers.keys())},
                                    "truncated": False,
                                    "next_steps": ["Call tools/list"],
                                },
                                is_error=True,
                            ),
                        }
                    )
                    continue
                try:
                    payload = handlers[tool_name](args)
                    rpc.write_message(
                        {
                            "jsonrpc": "2.0",
                            "id": msg_id,
                            "result": _mcp_text_result(payload),
                        }
                    )
                except CloudflareError as e:
                    # Known API/credential failures: actionable guidance.
                    rpc.write_message(
                        {
                            "jsonrpc": "2.0",
                            "id": msg_id,
                            "result": _mcp_text_result(
                                {
                                    "ok": False,
                                    "summary": str(e),
                                    "truncated": False,
                                    "next_steps": [
                                        "Verify CLOUDFLARE_API_TOKEN and CLOUDFLARE_ACCOUNT_ID are set",
                                        "Retry with a narrower scope",
                                    ],
                                },
                                is_error=True,
                            ),
                        }
                    )
                except Exception as e:  # noqa: BLE001
                    # Anything else from a tool: report, keep serving.
                    rpc.write_message(
                        {
                            "jsonrpc": "2.0",
                            "id": msg_id,
                            "result": _mcp_text_result(
                                {
                                    "ok": False,
                                    "summary": f"Unhandled error: {e}",
                                    "truncated": False,
                                    "next_steps": ["Retry with a narrower scope"],
                                },
                                is_error=True,
                            ),
                        }
                    )
                continue
            # Ignore notifications.
            if msg_id is None:
                continue
            rpc.write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg_id,
                    "result": _mcp_text_result(
                        {
                            "ok": False,
                            "summary": f"Unsupported method: {method}",
                            "truncated": False,
                        },
                        is_error=True,
                    ),
                }
            )
        except Exception as e:  # noqa: BLE001
            # Last-resort: avoid crashing the server.
            if msg_id is not None:
                rpc.write_message(
                    {
                        "jsonrpc": "2.0",
                        "id": msg_id,
                        "result": _mcp_text_result(
                            {
                                "ok": False,
                                "summary": f"fatal error: {e}",
                                "truncated": False,
                            },
                        ),
                    }
                )