from __future__ import annotations

import hashlib
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

AKASH_SITE_BASE = "https://akash.network"
AKASH_DOCS_BASE = f"{AKASH_SITE_BASE}/docs"
AKASH_DOCS_GITHUB_OWNER = "akash-network"
AKASH_DOCS_GITHUB_REPO = "website-revamp"
AKASH_DOCS_GITHUB_REF_DEFAULT = "main"
AKASH_DOCS_GITHUB_DOCS_ROOT = "src/content/Docs"

MAX_BYTES_DEFAULT = 32_000


def _repo_root() -> Path:
    # server.py -> akash_docs -> mcp -> cloudflare -> <repo root>
    return Path(__file__).resolve().parents[3]


def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _max_bytes() -> int:
    raw = (os.getenv("VM_MCP_MAX_BYTES") or "").strip()
    if not raw:
        return MAX_BYTES_DEFAULT
    try:
        return max(4_096, int(raw))
    except ValueError:
        return MAX_BYTES_DEFAULT


def _sha256_hex(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def _http_get(url: str, *, timeout: int = 30) -> str:
    req = urllib.request.Request(
        url=url,
        headers={
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "User-Agent": "work-core-akamcp/0.1 (+https://akash.network)",
        },
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8", "replace")


def _normalize_route(route_or_url: str) -> Tuple[str, str]:
    """
    Returns (route, canonical_url).

    route: "getting-started/what-is-akash" (no leading/trailing slashes)
    canonical_url: https://akash.network/docs/<route>
    """
    raw = (route_or_url or "").strip()
    if not raw:
        return "", AKASH_DOCS_BASE + "/"
    if raw.startswith("http://") or raw.startswith("https://"):
        parsed = urllib.parse.urlparse(raw)
        path = parsed.path or ""
        # Normalize to a docs route if possible.
        if path in ("/docs", "/docs/"):
            return "", AKASH_DOCS_BASE + "/"
        if path.startswith("/docs/"):
            route = path[len("/docs/") :].strip("/")
            return route, f"{AKASH_DOCS_BASE}/{route}"
        return path.strip("/"), raw
    # Accept "/docs/..." or "docs/..." as well as bare routes.
    route = raw.lstrip("/")
    if route in ("docs", "docs/"):
        return "", AKASH_DOCS_BASE + "/"
    if route.startswith("docs/"):
        route = route[len("docs/") :]
    route = route.strip("/")
    return route, f"{AKASH_DOCS_BASE}/{route}" if route else AKASH_DOCS_BASE + "/"


def _strip_frontmatter(markdown: str) -> str:
    # Remove leading YAML frontmatter: ---\n...\n---\n
    if not markdown.startswith("---"):
        return markdown
    m = re.match(r"^---\s*\n.*?\n---\s*\n", markdown, flags=re.S)
    if not m:
        return markdown
    return markdown[m.end() :]


def _github_candidates(route: str) -> List[str]:
    base = f"{AKASH_DOCS_GITHUB_DOCS_ROOT}/{route}".rstrip("/")
    candidates = [
        f"{base}/index.md",
        f"{base}/index.mdx",
        f"{base}.md",
        f"{base}.mdx",
    ]
    # Handle the root docs landing page when the route is empty.
    if not route:
        candidates = [
            f"{AKASH_DOCS_GITHUB_DOCS_ROOT}/index.md",
            f"{AKASH_DOCS_GITHUB_DOCS_ROOT}/index.mdx",
        ]
    return candidates


def _fetch_markdown_from_github(route: str, *, ref: str) -> Tuple[str, str, str]:
    """
    Returns (markdown, raw_url, repo_path) or raises urllib.error.HTTPError.
    """
    last_err: Optional[urllib.error.HTTPError] = None
    for repo_path in _github_candidates(route):
        raw_url = (
            f"https://raw.githubusercontent.com/{AKASH_DOCS_GITHUB_OWNER}/"
            f"{AKASH_DOCS_GITHUB_REPO}/{ref}/{repo_path}"
        )
        try:
            return _http_get(raw_url), raw_url, repo_path
        except urllib.error.HTTPError as e:
            if e.code == 404:
                last_err = e
                continue
            raise
    if last_err:
        raise last_err
    raise urllib.error.HTTPError(url="", code=404, msg="Not Found", hdrs=None, fp=None)

def _extract_article_html(page_html: str) -> str:
    m = re.search(r"<article[^>]*>(.*?)</article>", page_html, flags=re.S | re.I)
    if m:
        return m.group(1)
    m = re.search(r"<main[^>]*>(.*?)</main>", page_html, flags=re.S | re.I)
    if m:
        return m.group(1)
    return page_html


def _html_to_text(article_html: str) -> str:
    # Drop scripts/styles entirely.
    cleaned = re.sub(
        r"<(script|style)\b[^>]*>.*?</\1>", "", article_html, flags=re.S | re.I
    )

    # Preserve code blocks a bit better (Astro emits one <span> per code line
    # inside <pre>).
    def _pre_repl(match: re.Match[str]) -> str:
        pre = match.group(0)
        pre = re.sub(r"</span>\s*", "\n", pre, flags=re.I)
        pre = re.sub(r"<span[^>]*>", "", pre, flags=re.I)
        pre = re.sub(r"<br\s*/?>", "\n", pre, flags=re.I)
        pre = re.sub(r"<[^>]+>", "", pre)
        return "\n```\n" + _html_unescape(pre).strip() + "\n```\n"

    cleaned = re.sub(r"<pre[^>]*>.*?</pre>", _pre_repl, cleaned, flags=re.S | re.I)
    # Newlines for common block tags.
    cleaned = re.sub(
        r"</(p|div|section|li|ul|ol|h[1-6]|table|tr|blockquote)>",
        "\n",
        cleaned,
        flags=re.I,
    )
    cleaned = re.sub(r"<br\s*/?>", "\n", cleaned, flags=re.I)
    cleaned = re.sub(r"<hr[^>]*>", "\n---\n", cleaned, flags=re.I)
    # Strip remaining tags.
    cleaned = re.sub(r"<[^>]+>", "", cleaned)
    text = _html_unescape(cleaned)
    lines = [ln.rstrip() for ln in text.splitlines()]
    # Collapse excessive blank lines.
    out: List[str] = []
    blank = False
    for ln in lines:
        if ln.strip() == "":
            if blank:
                continue
            blank = True
            out.append("")
            continue
        blank = False
        out.append(ln.strip())
    return "\n".join(out).strip()


def _html_unescape(text: str) -> str:
    # Local import keeps the module-level import list small.
    import html as _html

    return _html.unescape(text)

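
# Comment-only sketch of _html_to_text above (tag handling as reconstructed in
# the regexes; output shown roughly). Given the article fragment
#   <h2>Install</h2><pre><span>akash version</span></pre>
# the closing heading tag becomes a newline and the <pre> block is re-fenced:
#   Install
#   ```
#   akash version
#   ```
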
def _discover_routes_from_docs_index() -> List[str]:
    html = _http_get(AKASH_DOCS_BASE + "/")
    hrefs = set(re.findall(r'href="(/docs/[^"#?]+)"', html))
    routes: List[str] = []
    for href in sorted(hrefs):
        route, _url = _normalize_route(href)
        if route:
            routes.append(route)
    return routes


@dataclass(frozen=True)
class CachedDoc:
    cache_key: str
    fetched_at: str
    source: str
    route: str
    url: str
    ref: str
    content_path: str


class DocStore:
    def __init__(self, root_dir: Path) -> None:
        self.root_dir = root_dir
        self.pages_dir = root_dir / "pages"
        self.index_path = root_dir / "index.json"
        self.pages_dir.mkdir(parents=True, exist_ok=True)
        self._index: Dict[str, Dict[str, Any]] = {}
        if self.index_path.exists():
            try:
                self._index = json.loads(self.index_path.read_text(encoding="utf-8"))
            except Exception:
                self._index = {}

    def _write_index(self) -> None:
        tmp = self.index_path.with_suffix(".tmp")
        tmp.write_text(
            json.dumps(self._index, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
        tmp.replace(self.index_path)

    def get(self, cache_key: str) -> Optional[CachedDoc]:
        raw = self._index.get(cache_key)
        if not raw:
            return None
        path = Path(raw.get("content_path") or "")
        if not path.exists():
            return None
        return CachedDoc(
            cache_key=cache_key,
            fetched_at=str(raw.get("fetched_at") or ""),
            source=str(raw.get("source") or ""),
            route=str(raw.get("route") or ""),
            url=str(raw.get("url") or ""),
            ref=str(raw.get("ref") or ""),
            content_path=str(path),
        )

    def save(
        self,
        *,
        cache_key: str,
        source: str,
        route: str,
        url: str,
        ref: str,
        content: str,
    ) -> CachedDoc:
        content_hash = _sha256_hex(f"{source}:{ref}:{url}")[:20]
        path = self.pages_dir / f"{content_hash}.txt"
        path.write_text(content, encoding="utf-8")
        entry = {
            "fetched_at": _utc_now_iso(),
            "source": source,
            "route": route,
            "url": url,
            "ref": ref,
            "content_path": str(path),
        }
        self._index[cache_key] = entry
        self._write_index()
        return self.get(cache_key) or CachedDoc(
            cache_key=cache_key,
            fetched_at=entry["fetched_at"],
            source=source,
            route=route,
            url=url,
            ref=ref,
            content_path=str(path),
        )


def _default_state_dir() -> Path:
    return _repo_root() / "archive_runtime" / "akash_docs_mcp"


def _truncate_to_max_bytes(text: str, *, max_bytes: int) -> Tuple[str, bool]:
    blob = text.encode("utf-8")
    if len(blob) <= max_bytes:
        return text, False
    # Reserve a bit of room for the truncation notice.
    reserve = min(512, max_bytes // 10)
    head = blob[: max(0, max_bytes - reserve)].decode("utf-8", "replace")
    head = head.rstrip() + "\n\n[TRUNCATED: response exceeded VM_MCP_MAX_BYTES]\n"
    return head, True

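
# Shape of one index.json entry written by DocStore.save above (keys per the
# `entry` dict; the route, timestamp, and hash below are illustrative only):
#   "github:main:foo/bar": {
#     "fetched_at": "2024-01-01T00:00:00+00:00",
#     "source": "github",
#     "route": "foo/bar",
#     "url": "https://raw.githubusercontent.com/akash-network/website-revamp/main/src/content/Docs/foo/bar/index.md",
#     "ref": "main",
#     "content_path": "<state_dir>/pages/<20-hex-sha256-prefix>.txt"
#   }
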
def _mcp_text_result(text: str, *, is_error: bool = False) -> Dict[str, Any]:
    text, _truncated = _truncate_to_max_bytes(text, max_bytes=_max_bytes())
    result: Dict[str, Any] = {"content": [{"type": "text", "text": text}]}
    if is_error:
        result["isError"] = True
    return result

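
# Sizing note on the helpers above: _mcp_text_result caps its text via
# _truncate_to_max_bytes, keeping at most max_bytes - min(512, max_bytes // 10)
# head bytes (decoded with errors="replace", so a split UTF-8 sequence cannot
# raise) plus the [TRUNCATED: ...] notice. Tool payload content is capped
# separately via the max_chars argument of akash_docs_fetch below.
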
class AkashDocsTools:
    def __init__(self) -> None:
        state_dir = Path(
            os.getenv("VM_AKASH_DOCS_MCP_STATE_DIR") or _default_state_dir()
        )
        self.store = DocStore(state_dir)

    def akash_docs_list_routes(self) -> Dict[str, Any]:
        routes = _discover_routes_from_docs_index()
        return {
            "ok": True,
            "summary": f"Discovered {len(routes)} docs route(s) from {AKASH_DOCS_BASE}/.",
            "data": {"routes": routes},
            "next_steps": ["akash_docs_fetch(route_or_url=...)"],
        }

    def akash_docs_fetch(
        self,
        *,
        route_or_url: str,
        source: str = "auto",
        ref: str = AKASH_DOCS_GITHUB_REF_DEFAULT,
        max_chars: int = 12_000,
        refresh: bool = False,
        strip_frontmatter: bool = True,
    ) -> Dict[str, Any]:
        route, canonical_url = _normalize_route(route_or_url)
        source_norm = (source or "auto").strip().lower()
        if source_norm not in ("auto", "github", "site"):
            raise ValueError("source must be one of: auto, github, site")
        max_chars_int = max(0, int(max_chars))
        # Avoid flooding clients; open content_path for the full content.
        max_chars_int = min(max_chars_int, max(2_000, _max_bytes() - 8_000))
        cache_key = f"{source_norm}:{ref}:{route or canonical_url}"
        cached = self.store.get(cache_key)
        if cached and not refresh:
            content = Path(cached.content_path).read_text(encoding="utf-8")
            if strip_frontmatter and cached.source == "github":
                content = _strip_frontmatter(content)
            truncated = len(content) > max_chars_int
            return {
                "ok": True,
                "summary": "Returned cached docs content.",
                "data": {
                    "source": cached.source,
                    "route": cached.route,
                    "url": cached.url,
                    "ref": cached.ref,
                    "cached": True,
                    "fetched_at": cached.fetched_at,
                    "content": content[:max_chars_int],
                    "truncated": truncated,
                    "content_path": cached.content_path,
                },
                "next_steps": ["Set refresh=true to refetch."],
            }

        attempted: List[Dict[str, Any]] = []

        def _try_github() -> Optional[Tuple[str, str, str]]:
            try:
                md, raw_url, repo_path = _fetch_markdown_from_github(route, ref=ref)
                return md, raw_url, repo_path
            except urllib.error.HTTPError as e:
                attempted.append(
                    {
                        "source": "github",
                        "status": getattr(e, "code", None),
                        "detail": str(e),
                    }
                )
                return None

        def _try_site() -> Optional[Tuple[str, str]]:
            try:
                html = _http_get(canonical_url)
                article = _extract_article_html(html)
                text = _html_to_text(article)
                return text, canonical_url
            except urllib.error.HTTPError as e:
                attempted.append(
                    {
                        "source": "site",
                        "status": getattr(e, "code", None),
                        "detail": str(e),
                    }
                )
                return None

        content: str
        final_source: str
        final_url: str
        extra: Dict[str, Any] = {}
        if source_norm in ("auto", "github"):
            gh = _try_github()
            if gh:
                content, final_url, repo_path = gh
                final_source = "github"
                extra["repo_path"] = repo_path
            elif source_norm == "github":
                raise ValueError(
                    "GitHub fetch failed; try source='site' or verify the route/ref."
                )
            else:
                site = _try_site()
                if not site:
                    raise ValueError(
                        f"Fetch failed for route_or_url={route_or_url!r}. "
                        f"Attempts: {attempted}"
                    )
                content, final_url = site
                final_source = "site"
        else:
            site = _try_site()
            if not site:
                raise ValueError(
                    f"Site fetch failed for route_or_url={route_or_url!r}. "
                    f"Attempts: {attempted}"
                )
            content, final_url = site
            final_source = "site"

        cached_doc = self.store.save(
            cache_key=cache_key,
            source=final_source,
            route=route,
            url=final_url,
            ref=ref,
            content=content,
        )
        content_view = content
        if strip_frontmatter and final_source == "github":
            content_view = _strip_frontmatter(content_view)
        truncated = len(content_view) > max_chars_int
        content_out = content_view[:max_chars_int]
        return {
            "ok": True,
            "summary": f"Fetched docs via {final_source}.",
            "data": {
                "source": final_source,
                "route": route,
                "url": final_url,
                "ref": ref,
                "cached": False,
                "fetched_at": cached_doc.fetched_at,
                "content": content_out,
                "truncated": truncated,
                "content_path": cached_doc.content_path,
                "attempts": attempted,
                **extra,
            },
            "next_steps": [
                "akash_docs_search(query=..., refresh=false)",
            ],
        }

    def akash_docs_search(
        self,
        *,
        query: str,
        limit: int = 10,
        refresh: bool = False,
        ref: str = AKASH_DOCS_GITHUB_REF_DEFAULT,
    ) -> Dict[str, Any]:
        q = (query or "").strip()
        if not q:
            raise ValueError("query is required")
        limit = max(1, min(50, int(limit)))
        routes = _discover_routes_from_docs_index()
        hits: List[Dict[str, Any]] = []
        for route in routes:
            try:
                doc = self.akash_docs_fetch(
                    route_or_url=route,
                    source="github",
                    ref=ref,
                    max_chars=0,  # search reads full content from content_path
                    refresh=refresh,
                    strip_frontmatter=True,
                )
            except ValueError:
                # A route without a matching GitHub page should not abort the
                # whole search.
                continue
            data = doc.get("data") or {}
            content_path = data.get("content_path")
            if not content_path:
                continue
            try:
                content = Path(str(content_path)).read_text(encoding="utf-8")
                content = _strip_frontmatter(content)
            except Exception:
                continue
            idx = content.lower().find(q.lower())
            if idx == -1:
                continue
            start = max(0, idx - 80)
            end = min(len(content), idx + 160)
            snippet = content[start:end].replace("\n", " ").strip()
            hits.append(
                {
                    "route": route,
                    "url": data.get("url"),
                    "source": data.get("source"),
                    "snippet": snippet,
                }
            )
            if len(hits) >= limit:
                break
        return {
            "ok": True,
            "summary": f"Found {len(hits)} hit(s) across {len(routes)} route(s).",
            "data": {"query": q, "hits": hits, "routes_searched": len(routes)},
            "next_steps": ["akash_docs_fetch(route_or_url=hits[0].route)"],
        }

    def akash_sdl_snippet(
        self,
        *,
        service_name: str,
        container_image: str,
        port: int,
        cpu_units: float = 0.5,
        memory_size: str = "512Mi",
        storage_size: str = "512Mi",
        denom: str = "uakt",
        price_amount: int = 100,
    ) -> Dict[str, Any]:
        svc = (service_name or "").strip()
        img = (container_image or "").strip()
        if not svc:
            raise ValueError("service_name is required")
        if not img:
            raise ValueError("container_image is required")
        port_int = int(port)
        if port_int <= 0 or port_int > 65535:
            raise ValueError("port must be 1..65535")
        sdl = f"""version: "2.0"

services:
  {svc}:
    image: {img}
    expose:
      - port: {port_int}
        to:
          - global: true

profiles:
  compute:
    {svc}:
      resources:
        cpu:
          units: {cpu_units}
        memory:
          size: {memory_size}
        storage:
          size: {storage_size}
  placement:
    akash:
      pricing:
        {svc}:
          denom: {denom}
          amount: {int(price_amount)}

deployment:
  {svc}:
    akash:
      profile: {svc}
      count: 1
"""
        return {
            "ok": True,
            "summary": "Generated an Akash SDL template.",
            "data": {
                "service_name": svc,
                "container_image": img,
                "port": port_int,
                "sdl": sdl,
            },
            "next_steps": [
                "Save as deploy.yaml and deploy via Akash Console or the akash CLI.",
            ],
        }

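
# Illustrative head of akash_sdl_snippet output for hypothetical inputs
# service_name="web", container_image="nginx:1.27", port=80 (defaults apply):
#
#   version: "2.0"
#
#   services:
#     web:
#       image: nginx:1.27
#       expose:
#         - port: 80
#           to:
#             - global: true
#
# The profiles/placement/deployment stanzas follow the same template above.
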
TOOLS: List[Dict[str, Any]] = [
    {
        "name": "akash_docs_list_routes",
        "description": "Discover common Akash docs routes by scraping https://akash.network/docs/ (SSR HTML).",
        "inputSchema": {"type": "object", "properties": {}},
    },
    {
        "name": "akash_docs_fetch",
        "description": (
            "Fetch an Akash docs page (prefers GitHub markdown in "
            "akash-network/website-revamp; falls back to site HTML)."
        ),
        "inputSchema": {
            "type": "object",
            "properties": {
                "route_or_url": {"type": "string"},
                "source": {
                    "type": "string",
                    "description": "auto|github|site",
                    "default": "auto",
                },
                "ref": {"type": "string", "default": AKASH_DOCS_GITHUB_REF_DEFAULT},
                "max_chars": {"type": "integer", "default": 12000},
                "refresh": {"type": "boolean", "default": False},
                "strip_frontmatter": {"type": "boolean", "default": True},
            },
            "required": ["route_or_url"],
        },
    },
    {
        "name": "akash_docs_search",
        "description": "Keyword search across routes discovered from /docs (fetches + caches GitHub markdown).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"type": "integer", "default": 10},
                "refresh": {"type": "boolean", "default": False},
                "ref": {"type": "string", "default": AKASH_DOCS_GITHUB_REF_DEFAULT},
            },
            "required": ["query"],
        },
    },
    {
        "name": "akash_sdl_snippet",
        "description": "Generate a minimal Akash SDL manifest for a single service exposing one port.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "service_name": {"type": "string"},
                "container_image": {"type": "string"},
                "port": {"type": "integer"},
                "cpu_units": {"type": "number", "default": 0.5},
                "memory_size": {"type": "string", "default": "512Mi"},
                "storage_size": {"type": "string", "default": "512Mi"},
                "denom": {"type": "string", "default": "uakt"},
                "price_amount": {"type": "integer", "default": 100},
            },
            "required": ["service_name", "container_image", "port"],
        },
    },
]

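
# A matching MCP tools/call request, as a client would send it and as main()
# below reads params["name"] / params["arguments"] (values illustrative):
#   {"jsonrpc": "2.0", "id": 1, "method": "tools/call",
#    "params": {"name": "akash_sdl_snippet",
#               "arguments": {"service_name": "web",
#                             "container_image": "nginx:1.27", "port": 80}}}
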
class StdioJsonRpc:
    def __init__(self) -> None:
        self._in = sys.stdin.buffer
        self._out = sys.stdout.buffer
        self._mode: Optional[str] = None  # "headers" | "line"

    def read_message(self) -> Optional[Dict[str, Any]]:
        while True:
            if self._mode == "line":
                line = self._in.readline()
                if not line:
                    return None
                raw = line.decode("utf-8", "replace").strip()
                if not raw:
                    continue
                try:
                    msg = json.loads(raw)
                except Exception:
                    continue
                if isinstance(msg, dict):
                    return msg
                continue
            first = self._in.readline()
            if not first:
                return None
            if first in (b"\r\n", b"\n"):
                continue
            # Auto-detect newline-delimited JSON framing.
            if self._mode is None and first.lstrip().startswith(b"{"):
                try:
                    msg = json.loads(first.decode("utf-8", "replace"))
                except Exception:
                    msg = None
                if isinstance(msg, dict):
                    self._mode = "line"
                    return msg
            headers: Dict[str, str] = {}
            try:
                text = first.decode("utf-8", "replace").strip()
            except Exception:
                continue
            if ":" not in text:
                continue
            k, v = text.split(":", 1)
            headers[k.lower().strip()] = v.strip()
            while True:
                line = self._in.readline()
                if not line:
                    return None
                if line in (b"\r\n", b"\n"):
                    break
                try:
                    text = line.decode("utf-8", "replace").strip()
                except Exception:
                    continue
                if ":" not in text:
                    continue
                k, v = text.split(":", 1)
                headers[k.lower().strip()] = v.strip()
            if "content-length" not in headers:
                return None
            try:
                length = int(headers["content-length"])
            except ValueError:
                return None
            body = self._in.read(length)
            if not body:
                return None
            self._mode = "headers"
            msg = json.loads(body.decode("utf-8", "replace"))
            if isinstance(msg, dict):
                return msg
            return None

    def write_message(self, message: Dict[str, Any]) -> None:
        if self._mode == "line":
            payload = json.dumps(
                message, ensure_ascii=False, separators=(",", ":"), default=str
            ).encode("utf-8")
            self._out.write(payload + b"\n")
            self._out.flush()
            return
        body = json.dumps(message, ensure_ascii=False, separators=(",", ":")).encode(
            "utf-8"
        )
        header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
        self._out.write(header)
        self._out.write(body)
        self._out.flush()

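
# Both stdio framings accepted above, for reference (illustrative payload):
#   Content-Length: 37\r\n\r\n{"jsonrpc":"2.0","id":1,"method":"x"}
#   {"jsonrpc":"2.0","id":1,"method":"x"}\n
# The first complete message locks self._mode for the rest of the session.
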
def main() -> None:
    tools = AkashDocsTools()
    rpc = StdioJsonRpc()
    handlers: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
        "akash_docs_list_routes": lambda a: tools.akash_docs_list_routes(),
        "akash_docs_fetch": lambda a: tools.akash_docs_fetch(**a),
        "akash_docs_search": lambda a: tools.akash_docs_search(**a),
        "akash_sdl_snippet": lambda a: tools.akash_sdl_snippet(**a),
    }
    while True:
        msg = rpc.read_message()
        if msg is None:
            return
        method = msg.get("method")
        msg_id = msg.get("id")
        params = msg.get("params") or {}
        try:
            if method == "initialize":
                result = {
                    "protocolVersion": "2024-11-05",
                    "serverInfo": {"name": "akash_docs", "version": "0.1.0"},
                    "capabilities": {"tools": {}},
                }
                rpc.write_message({"jsonrpc": "2.0", "id": msg_id, "result": result})
                continue
            if method == "tools/list":
                rpc.write_message(
                    {"jsonrpc": "2.0", "id": msg_id, "result": {"tools": TOOLS}}
                )
                continue
            if method == "tools/call":
                tool_name = str(params.get("name") or "")
                args = params.get("arguments") or {}
                if tool_name not in handlers:
                    rpc.write_message(
                        {
                            "jsonrpc": "2.0",
                            "id": msg_id,
                            "result": _mcp_text_result(
                                f"Unknown tool: {tool_name}\n"
                                f"Known tools: {', '.join(sorted(handlers.keys()))}",
                                is_error=True,
                            ),
                        }
                    )
                    continue
                try:
                    payload = handlers[tool_name](args)
                    # Split the payload: meta JSON plus optional raw content.
                    # If payload["data"]["content"] exists, emit it as a second
                    # text block for readability.
                    data = payload.get("data") if isinstance(payload, dict) else None
                    content_text = None
                    if isinstance(data, dict) and isinstance(data.get("content"), str):
                        content_text = data["content"]
                        data = dict(data)
                        data.pop("content", None)
                        payload = dict(payload)
                        payload["data"] = data
                    blocks = [json.dumps(payload, ensure_ascii=False, indent=2)]
                    if content_text:
                        blocks.append(content_text)
                    result: Dict[str, Any] = {
                        "content": [{"type": "text", "text": b} for b in blocks]
                    }
                    rpc.write_message(
                        {"jsonrpc": "2.0", "id": msg_id, "result": result}
                    )
                except Exception as e:  # noqa: BLE001
                    rpc.write_message(
                        {
                            "jsonrpc": "2.0",
                            "id": msg_id,
                            "result": _mcp_text_result(f"Error: {e}", is_error=True),
                        }
                    )
                continue
            # Ignore notifications.
            if msg_id is None:
                continue
            rpc.write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg_id,
                    "result": _mcp_text_result(
                        f"Unsupported method: {method}", is_error=True
                    ),
                }
            )
        except Exception as e:  # noqa: BLE001
            # Last resort: avoid crashing the server on a single bad message.
            if msg_id is not None:
                rpc.write_message(
                    {
                        "jsonrpc": "2.0",
                        "id": msg_id,
                        "result": _mcp_text_result(
                            f"fatal error: {e}", is_error=True
                        ),
                    }
                )

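
if __name__ == "__main__":
    # Assumed entry point: run the stdio JSON-RPC loop when executed directly.
    main()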