492 lines
15 KiB
Python
492 lines
15 KiB
Python
"""
|
|
Cognitive MCP Tools - Claude as VaultMesh Cognitive Organ

These tools enable Claude to operate as the 7th Organ of VaultMesh:
- Reason over mesh state with full context
- Make attested decisions with Ed25519 proofs
- Invoke Tem for threat transmutation
- Persist memory across sessions via CRDT realm
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import secrets
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional, List, Dict
|
|
|
|
import blake3
|
|
|
|
# VaultMesh root: the VAULTMESH_ROOT environment variable when it is set,
# otherwise an ancestor directory of this file (three levels above its folder).
#
# The fallback is only computed when the variable is absent. Passing it as the
# default argument of os.environ.get() evaluated it eagerly, so a module that
# lives fewer than four directories deep raised IndexError at import time even
# though VAULTMESH_ROOT was provided.
_env_root = os.environ.get("VAULTMESH_ROOT")
if _env_root is not None:
    VAULTMESH_ROOT = Path(_env_root).resolve()
else:
    _ancestors = Path(__file__).parents
    # NOTE(review): when the file sits in a shallower tree than expected, the
    # topmost available ancestor is used instead of failing the import.
    # Set VAULTMESH_ROOT explicitly for such layouts.
    VAULTMESH_ROOT = Path(_ancestors[min(3, len(_ancestors) - 1)]).resolve()

# Directory holding the per-scroll receipt files.
RECEIPTS_ROOT = VAULTMESH_ROOT / "receipts"

# Directory holding persisted cognitive state (memory files).
COGNITIVE_REALM = VAULTMESH_ROOT / "realms" / "cognitive"
|
|
|
|
|
|
def _vmhash_blake3(data: bytes) -> str:
    """Return the VaultMesh hash string for *data*.

    The result is the BLAKE3 hex digest of the bytes, prefixed with
    ``blake3:``.
    """
    hex_digest = blake3.blake3(data).hexdigest()
    return "blake3:" + hex_digest
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current moment as a timezone-aware UTC ISO-8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
|
|
|
|
|
|
def _emit_cognitive_receipt(receipt_type: str, body: dict, scroll: str = "cognitive") -> dict:
    """Append a receipt for a cognitive operation to a scroll and return it.

    The receipt is written as a single JSON line to
    ``RECEIPTS_ROOT/<scroll>/<scroll>_events.jsonl``; missing parent
    directories are created first.

    Args:
        receipt_type: Stored under ``type`` and included in ``tags``.
        body: JSON-serialisable payload. ``root_hash`` is the VaultMesh hash
            of its key-sorted JSON encoding.
        scroll: Scroll name, used for both the sub-directory and file prefix.

    Returns:
        The receipt dict that was appended to the scroll.
    """
    events_file = RECEIPTS_ROOT / scroll / f"{scroll}_events.jsonl"
    events_file.parent.mkdir(parents=True, exist_ok=True)

    issued_at = _now_iso()
    # Key-sorted encoding so the hash does not depend on dict insertion order.
    canonical_body = json.dumps(body, sort_keys=True).encode()

    receipt = {
        "schema_version": "2.0.0",
        "type": receipt_type,
        "timestamp": issued_at,
        "scroll": scroll,
        "tags": ["cognitive", receipt_type],
        "root_hash": _vmhash_blake3(canonical_body),
        "body": body,
    }

    serialized = json.dumps(receipt)
    with open(events_file, "a") as sink:
        sink.write(f"{serialized}\n")

    return receipt
|
|
|
|
|
|
def _load_json_file(path: Path) -> dict:
    """Load a JSON document from *path*, or return ``{}`` when it is missing.

    The file is opened directly rather than checked with ``exists()`` first,
    so a file removed between the check and the read cannot raise. It is
    decoded as UTF-8 regardless of the platform's default encoding.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded JSON value, or an empty dict if the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid
            JSON.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # NotADirectoryError: a parent component is a regular file, which
        # Path.exists() also reports as "does not exist".
        return {}
|
|
|
|
|
|
def _save_json_file(path: Path, data: dict) -> None:
    """Write *data* to *path* as indented, key-sorted JSON.

    The document is serialised completely before the target is opened.
    Streaming ``json.dump`` into a file opened with ``"w"`` truncates the
    file first, so a value that cannot be serialised would destroy the
    previous contents and leave a partial, unparseable document behind.

    Args:
        path: Destination file; missing parent directories are created.
        data: JSON-serialisable mapping to store.

    Raises:
        TypeError: If *data* contains a value that JSON cannot encode. The
            existing file, if any, is left untouched in that case.
    """
    payload = json.dumps(data, indent=2, sort_keys=True)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)
|
|
|
|
|
|
# =============================================================================
# COGNITIVE TOOLS - The 8 Tools of AI Reasoning
# =============================================================================
|
|
|
|
|
|
def _read_recent_receipts(jsonl_path: Path, limit: int = 10) -> List[dict]:
    """Parse the receipts found in the last *limit* lines of a JSONL scroll.

    Blank and malformed lines are skipped, so a single corrupt entry does not
    make the whole scroll unreadable.
    """
    if limit <= 0:
        return []
    tail = jsonl_path.read_text(encoding="utf-8").strip().split("\n")[-limit:]
    receipts = []
    for raw in tail:
        raw = raw.strip()
        if not raw:
            continue
        try:
            receipts.append(json.loads(raw))
        except json.JSONDecodeError:
            # Same policy as cognitive_audit_trail: ignore unparseable lines.
            continue
    return receipts


def cognitive_context(
    include: Optional[List[str]] = None,
    session_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Read current VaultMesh context for AI reasoning.

    Aggregates state from several files under the VaultMesh root into one
    dict. Sections whose backing file is missing are returned empty.

    Args:
        include: Sections to gather. Recognised names are ``alerts``,
            ``health``, ``receipts``, ``threats``, ``treasury``,
            ``governance`` and ``memory``; all of them when omitted.
        session_id: Session whose ``context.json`` memory file is read. The
            ``memory`` section is only produced when this is provided.

    Returns:
        A dict with ``timestamp``, ``session_id`` and ``vaultmesh_root`` plus
        one entry per gathered section. The ``receipts`` section is returned
        under the ``recent_receipts`` key.
    """
    if include is None:
        include = ["alerts", "health", "receipts", "threats", "treasury", "governance", "memory"]

    context: Dict[str, Any] = {
        "timestamp": _now_iso(),
        "session_id": session_id,
        "vaultmesh_root": str(VAULTMESH_ROOT),
    }

    if "alerts" in include:
        alerts_path = RECEIPTS_ROOT / "mesh" / "alerts.json"
        context["alerts"] = _load_json_file(alerts_path).get("active", [])

    if "health" in include:
        health = {"status": "operational", "organs": {}}
        for organ in ["guardian", "treasury", "mesh", "identity", "observability"]:
            organ_path = RECEIPTS_ROOT / organ
            organ_present = organ_path.exists()
            health["organs"][organ] = {
                "exists": organ_present,
                # Counts the .jsonl files in the organ directory, not the
                # individual receipts inside them.
                "receipt_count": len(list(organ_path.glob("*.jsonl"))) if organ_present else 0,
            }
        context["health"] = health

    if "receipts" in include:
        recent = {}
        for scroll in ["guardian", "treasury", "mesh", "cognitive"]:
            jsonl_path = RECEIPTS_ROOT / scroll / f"{scroll}_events.jsonl"
            if jsonl_path.exists():
                recent[scroll] = _read_recent_receipts(jsonl_path, limit=10)
        context["recent_receipts"] = recent

    if "threats" in include:
        threats_path = RECEIPTS_ROOT / "offsec" / "threats.json"
        context["threats"] = _load_json_file(threats_path).get("active", [])

    if "treasury" in include:
        budgets_path = RECEIPTS_ROOT / "treasury" / "budgets.json"
        context["treasury"] = _load_json_file(budgets_path)

    if "governance" in include:
        governance_path = VAULTMESH_ROOT / "constitution" / "active_proposals.json"
        context["governance"] = _load_json_file(governance_path)

    if "memory" in include and session_id:
        # NOTE(review): session_id is joined into the path as-is; confirm that
        # callers cannot supply values containing path separators or "..".
        memory_path = COGNITIVE_REALM / "memory" / session_id / "context.json"
        context["memory"] = _load_json_file(memory_path)

    return context
|
|
|
|
|
|
def cognitive_decide(
    reasoning_chain: List[str],
    decision: str,
    confidence: float,
    evidence: Optional[List[str]] = None,
    operator_did: str = "did:vm:cognitive:claude",
    auto_action_threshold: float = 0.95,
) -> Dict[str, Any]:
    """
    Record a reasoned decision as a hashed receipt.

    Args:
        reasoning_chain: Non-empty list of reasoning steps; stored in the
            receipt together with its VaultMesh hash.
        decision: Decision type. ``invoke_tem`` and ``alert`` yield a
            predefined execution plan; ``treasury_large``,
            ``governance_change`` and ``mesh_restructure`` are flagged as
            requiring governance.
        confidence: Value in ``[0.0, 1.0]``.
        evidence: Optional supporting references; stored as ``[]`` if absent.
        operator_did: Identifier recorded as the deciding operator.
        auto_action_threshold: Confidence at or above which the decision is
            marked ``auto_approved``.

    Returns:
        ``{"error": ...}`` when validation fails. Otherwise a dict with the
        decision id, the emitted receipt, the approval flag, the list of
        approvals still required, the execution plan and a summary message.
    """
    # Validation happens before any id is generated or receipt is written.
    confidence_in_range = 0.0 <= confidence <= 1.0
    if not confidence_in_range:
        return {"error": "Confidence must be between 0.0 and 1.0"}
    if not reasoning_chain:
        return {"error": "Reasoning chain cannot be empty"}

    decision_id = f"dec_{secrets.token_hex(8)}"
    reasoning_hash = _vmhash_blake3(json.dumps(reasoning_chain).encode())

    auto_approved = confidence >= auto_action_threshold
    needs_governance = decision in ("treasury_large", "governance_change", "mesh_restructure")

    body = {
        "decision_id": decision_id,
        "operator_did": operator_did,
        "decision_type": decision,
        "confidence": confidence,
        "reasoning_hash": reasoning_hash,
        "reasoning_chain": reasoning_chain,
        "evidence": evidence if evidence else [],
        "auto_approved": auto_approved,
        "requires_governance": needs_governance,
    }

    receipt = _emit_cognitive_receipt("cognitive_decision", body)

    # Each approval is listed only when its condition holds, in this order.
    approval_rules = (
        ("governance_vote", needs_governance),
        ("operator_confirmation", not auto_approved),
    )
    required_approvals = [name for name, needed in approval_rules if needed]

    # Decision types with a predefined plan; every other type gets none.
    plans_by_decision = {
        "invoke_tem": [
            {"step": 1, "action": "validate_threat", "tool": "shield_status"},
            {"step": 2, "action": "invoke_transmutation", "tool": "cognitive_invoke_tem"},
            {"step": 3, "action": "deploy_capability", "tool": "mesh_deploy"},
            {"step": 4, "action": "attest_outcome", "tool": "cognitive_attest"},
        ],
        "alert": [
            {"step": 1, "action": "emit_alert", "tool": "mesh_alert"},
            {"step": 2, "action": "notify_operators", "tool": "notify"},
        ],
    }
    execution_plan = plans_by_decision.get(decision, [])

    return {
        "success": True,
        "decision_id": decision_id,
        "receipt": receipt,
        "auto_approved": auto_approved,
        "required_approvals": required_approvals,
        "execution_plan": execution_plan,
        "message": f"Decision {decision_id} recorded with confidence {confidence:.2%}",
    }
|
|
|
|
|
|
def cognitive_invoke_tem(
    threat_type: str,
    threat_id: str,
    target: str,
    evidence: List[str],
    recommended_transmutation: Optional[str] = None,
    operator_did: str = "did:vm:cognitive:claude",
) -> Dict[str, Any]:
    """
    Record a Tem invocation for a detected threat and register a capability.

    A ``tem_invocation`` receipt is emitted, then a capability entry named
    after the chosen transmutation is added to ``mesh/capabilities.json``
    under the receipts root.

    Args:
        threat_type: Threat category used to pick the default transmutation.
        threat_id: Identifier of the threat the capability is forged from.
        target: Scope the capability applies to.
        evidence: Supporting references stored in the receipt.
        recommended_transmutation: Overrides the default for the threat type
            when non-empty.
        operator_did: Identifier recorded as the invoking operator.

    Returns:
        A dict with the invocation id, the receipt, the capability record and
        a summary message.
    """
    invocation_id = f"tem_{secrets.token_hex(8)}"

    # Default transmutation per known threat type.
    default_by_threat = {
        "replay_attack": "strict_monotonic_sequence_validator",
        "intrusion": "adaptive_firewall_rule",
        "anomaly": "behavioral_baseline_enforcer",
        "credential_stuffing": "rate_limiter_with_lockout",
        "data_exfiltration": "egress_filter_policy",
        "privilege_escalation": "capability_constraint_enforcer",
    }
    if recommended_transmutation:
        transmutation = recommended_transmutation
    else:
        transmutation = default_by_threat.get(threat_type, "generic_threat_mitigator")

    receipt = _emit_cognitive_receipt(
        "tem_invocation",
        {
            "invocation_id": invocation_id,
            "operator_did": operator_did,
            "threat_type": threat_type,
            "threat_id": threat_id,
            "target": target,
            "evidence": evidence,
            "transmutation": transmutation,
            "status": "transmuted",
        },
    )

    capability_id = f"cap_{secrets.token_hex(8)}"
    capability = {
        "capability_id": capability_id,
        "name": transmutation,
        "forged_from": threat_id,
        "forged_at": _now_iso(),
        "scope": target,
    }

    # Read-modify-write of the capability registry, keyed by capability id.
    registry_path = RECEIPTS_ROOT / "mesh" / "capabilities.json"
    registry = _load_json_file(registry_path)
    registry[capability_id] = capability
    _save_json_file(registry_path, registry)

    return {
        "success": True,
        "invocation_id": invocation_id,
        "receipt": receipt,
        "capability": capability,
        "message": f"Threat {threat_id} transmuted into {transmutation}",
    }
|
|
|
|
|
|
def cognitive_memory_get(
    key: str,
    session_id: Optional[str] = None,
    realm: str = "memory",
) -> Dict[str, Any]:
    """
    Read a stored memory entry from the cognitive realm.

    The entry lives at ``COGNITIVE_REALM/<realm>[/<session_id>]/<key>.json``,
    with every ``/`` in *key* replaced by ``_``.

    Args:
        key: Name of the memory entry.
        session_id: Optional session sub-directory to read from.
        realm: Sub-directory of the cognitive realm.

    Returns:
        A dict echoing the key, session and realm, plus the loaded ``value``
        (empty dict when the file is missing), an ``exists`` flag and the
        resolved ``path``.
    """
    file_name = key.replace("/", "_") + ".json"
    directory = COGNITIVE_REALM / realm
    if session_id:
        directory = directory / session_id
    memory_path = directory / file_name

    stored = _load_json_file(memory_path)
    present = memory_path.exists()

    return {
        "key": key,
        "session_id": session_id,
        "realm": realm,
        "value": stored,
        "exists": present,
        "path": str(memory_path),
    }
|
|
|
|
|
|
def cognitive_memory_set(
    key: str,
    value: Dict[str, Any],
    session_id: Optional[str] = None,
    realm: str = "memory",
    merge: bool = True,
) -> Dict[str, Any]:
    """
    Store a memory entry in the cognitive realm and emit a receipt.

    The entry is written to
    ``COGNITIVE_REALM/<realm>[/<session_id>]/<key>.json`` with every ``/`` in
    *key* replaced by ``_``. When *merge* is true and the file already exists,
    the new top-level keys are laid over the stored ones (incoming keys win)
    and ``_updated_at`` is set. Otherwise the entry is replaced by *value*
    with ``_created_at`` set.

    Args:
        key: Name of the memory entry.
        value: Mapping to store.
        session_id: Optional session sub-directory to write to.
        realm: Sub-directory of the cognitive realm.
        merge: Whether to combine with an existing entry.

    Returns:
        A dict with the key, the written path, the ``memory_write`` receipt
        and a summary message.
    """
    file_name = key.replace("/", "_") + ".json"
    directory = COGNITIVE_REALM / realm
    if session_id:
        directory = directory / session_id
    memory_path = directory / file_name

    if not merge or not memory_path.exists():
        # Fresh entry: either merging was not requested or nothing is stored.
        merged = {**value}
        merged["_created_at"] = _now_iso()
    else:
        existing = _load_json_file(memory_path)
        merged = {**existing, **value}
        merged["_updated_at"] = _now_iso()

    _save_json_file(memory_path, merged)

    # The receipt hashes the submitted value, not the merged document.
    value_hash = _vmhash_blake3(json.dumps(value, sort_keys=True).encode())
    receipt = _emit_cognitive_receipt(
        "memory_write",
        {
            "key": key,
            "session_id": session_id,
            "realm": realm,
            "value_hash": value_hash,
            "merged": merge,
        },
    )

    return {
        "success": True,
        "key": key,
        "path": str(memory_path),
        "receipt": receipt,
        "message": f"Memory stored at {key}",
    }
|
|
|
|
|
|
def cognitive_attest(
    attestation_type: str,
    content: Dict[str, Any],
    anchor_to: Optional[List[str]] = None,
    operator_did: str = "did:vm:cognitive:claude",
) -> Dict[str, Any]:
    """
    Emit an attestation receipt for the hash of *content*.

    Only the VaultMesh hash of the key-sorted JSON encoding of *content* is
    recorded. A ``local`` anchor is always written; ``rfc3161`` and ``eth``
    anchors are added with status ``pending`` when requested.

    Args:
        attestation_type: Label stored in the receipt.
        content: JSON-serialisable mapping to hash.
        anchor_to: Requested anchor targets; defaults to ``["local"]``.
        operator_did: Identifier recorded as the attesting operator.

    Returns:
        A dict with the attestation id, the content hash, the emitted
        receipt, the anchors and a summary message.
    """
    targets = ["local"] if anchor_to is None else anchor_to

    attestation_id = f"att_{secrets.token_hex(8)}"
    content_hash = _vmhash_blake3(json.dumps(content, sort_keys=True).encode())

    # The local anchor is recorded regardless of the requested targets.
    anchors = {
        "local": {
            "type": "local",
            "timestamp": _now_iso(),
            "hash": content_hash,
        },
    }

    # External anchors are only registered as pending here.
    pending_anchors = (
        ("rfc3161", {"type": "rfc3161", "status": "pending", "tsa": "freetsa.org"}),
        ("eth", {"type": "ethereum", "status": "pending", "network": "mainnet"}),
    )
    for target_name, record in pending_anchors:
        if target_name in targets:
            anchors[target_name] = record

    body = {
        "attestation_id": attestation_id,
        "attestation_type": attestation_type,
        "operator_did": operator_did,
        "content_hash": content_hash,
        "anchor_targets": targets,
        "anchors": anchors,
    }

    receipt = _emit_cognitive_receipt("attestation", body)

    return {
        "success": True,
        "attestation_id": attestation_id,
        "content_hash": content_hash,
        "receipt": receipt,
        "anchors": anchors,
        "message": f"Attestation {attestation_id} created",
    }
|
|
|
|
|
|
def _parse_audit_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp; values without an offset are taken as UTC."""
    text = value.strip()
    if text.endswith(("Z", "z")):
        # datetime.fromisoformat() only accepts the "Z" suffix on newer Pythons.
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def cognitive_audit_trail(
    filter_type: Optional[str] = None,
    time_range: Optional[Dict[str, str]] = None,
    confidence_min: Optional[float] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """
    Query historical AI decisions for audit.

    Scans ``cognitive/cognitive_events.jsonl`` under the receipts root in file
    order and returns the first *limit* ``cognitive_decision`` receipts that
    pass every filter. Blank and malformed lines are skipped.

    Args:
        filter_type: Keep only decisions with this ``decision_type``.
        time_range: Optional inclusive bounds on the receipt timestamp, given
            as ISO-8601 strings under ``start`` and/or ``end``.
            NOTE(review): the key names are an assumption, as this parameter
            was previously ignored; confirm against the tool schema.
        confidence_min: Keep only decisions whose confidence is at least this
            value. A missing or non-numeric confidence counts as 0.
        limit: Maximum number of decisions to return; values <= 0 return none.

    Returns:
        ``{"decisions": [...], "count": n}``. When no history file exists a
        ``message`` key is added; when *time_range* cannot be parsed an
        ``error`` key is added and no decisions are returned.
    """
    cognitive_path = RECEIPTS_ROOT / "cognitive" / "cognitive_events.jsonl"

    if not cognitive_path.exists():
        return {"decisions": [], "count": 0, "message": "No cognitive history found"}

    range_start = None
    range_end = None
    if time_range:
        try:
            if time_range.get("start"):
                range_start = _parse_audit_timestamp(time_range["start"])
            if time_range.get("end"):
                range_end = _parse_audit_timestamp(time_range["end"])
        except (ValueError, TypeError, AttributeError) as exc:
            # Fail loudly: silently dropping a filter would widen an audit query.
            return {"decisions": [], "count": 0, "error": f"Invalid time_range: {exc}"}
    time_filter_active = range_start is not None or range_end is not None

    decisions: List[Dict[str, Any]] = []
    if limit <= 0:
        # Previously one decision was collected before the limit was checked.
        return {"decisions": decisions, "count": 0}

    with open(cognitive_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            try:
                receipt = json.loads(line)
            except json.JSONDecodeError:
                continue

            if not isinstance(receipt, dict) or receipt.get("type") != "cognitive_decision":
                continue

            body = receipt.get("body", {})
            if not isinstance(body, dict):
                continue

            if filter_type and body.get("decision_type") != filter_type:
                continue

            if confidence_min is not None:
                confidence = body.get("confidence", 0)
                if not isinstance(confidence, (int, float)):
                    confidence = 0
                if confidence < confidence_min:
                    continue

            if time_filter_active:
                try:
                    stamped_at = _parse_audit_timestamp(receipt.get("timestamp"))
                except (ValueError, TypeError, AttributeError):
                    # Without a usable timestamp the receipt cannot be shown
                    # to fall inside the requested window.
                    continue
                if range_start is not None and stamped_at < range_start:
                    continue
                if range_end is not None and stamped_at > range_end:
                    continue

            decisions.append({
                "decision_id": body.get("decision_id"),
                "timestamp": receipt.get("timestamp"),
                "decision_type": body.get("decision_type"),
                "confidence": body.get("confidence"),
                "reasoning_hash": body.get("reasoning_hash"),
                "auto_approved": body.get("auto_approved"),
            })

            if len(decisions) >= limit:
                break

    return {
        "decisions": decisions,
        "count": len(decisions),
    }
|
|
|
|
|
|
def cognitive_oracle_chain(
    question: str,
    frameworks: Optional[List[str]] = None,
    max_docs: int = 10,
    include_memory: bool = True,
    session_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Register an oracle-chain question and return a placeholder answer.

    The answer produced here is a stub: it is marked as pending, carries
    zero confidence, flags every framework as ``requires_analysis`` and
    requires human review. A receipt containing the answer hash is emitted
    to the ``compliance`` scroll.

    Args:
        question: The question to analyse.
        frameworks: Compliance frameworks to flag; defaults to
            ``["GDPR", "AI_ACT"]``.
        max_docs: Accepted but not used by this implementation.
        include_memory: Whether the gathered context includes the ``memory``
            section in addition to ``governance``.
        session_id: Session passed through to the context lookup.

    Returns:
        A dict with the chain id, the placeholder answer, its hash and the
        emitted receipt.
    """
    if frameworks is None:
        frameworks = ["GDPR", "AI_ACT"]

    chain_id = f"oracle_{secrets.token_hex(8)}"

    if include_memory:
        sections = ["memory", "governance"]
    else:
        sections = ["governance"]
    # The context is gathered here but is not yet folded into the answer.
    context = cognitive_context(include=sections, session_id=session_id)

    answer = {
        "chain_id": chain_id,
        "question": question,
        "frameworks": frameworks,
        "answer": f"Oracle analysis pending for: {question}",
        "citations": [],
        "compliance_flags": dict.fromkeys(frameworks, "requires_analysis"),
        "gaps": [],
        "confidence": 0.0,
        "requires_human_review": True,
    }

    canonical_answer = json.dumps(answer, sort_keys=True).encode()
    answer_hash = _vmhash_blake3(canonical_answer)

    receipt = _emit_cognitive_receipt(
        "oracle_chain",
        {
            "chain_id": chain_id,
            "question": question,
            "frameworks": frameworks,
            "answer_hash": answer_hash,
        },
        scroll="compliance",
    )

    return {
        "success": True,
        "chain_id": chain_id,
        "answer": answer,
        "answer_hash": answer_hash,
        "receipt": receipt,
    }
|