Files
vm-cloudflare/layer0/shadow_classifier.py
Vault Sovereign f0b8d962de
Some checks failed
WAF Intelligence Guardrail / waf-intel (push) Waiting to run
Cloudflare Registry Validation / validate-registry (push) Has been cancelled
chore: pre-migration snapshot
Layer0, MCP servers, Terraform consolidation
2025-12-27 01:52:27 +00:00

177 lines
5.3 KiB
Python

import uuid
from enum import Enum
from typing import Any, List, Mapping, Optional
from .pattern_store import PatternStore, normalize_query_for_matching
class Classification(str, Enum):
    """Doctrinal verdicts a Layer 0 shadow evaluation can assign to a query.

    Members mix in ``str``, so each one compares equal to its lowercase
    string value, and ``Classification("<value>")`` resolves a member from
    that value.
    """

    # Valid request; routed onward to Layer 1.
    BLESSED = "blessed"

    # Not enough context to act on; the caller is asked to clarify.
    AMBIGUOUS = "ambiguous"

    # Governance violation; handed off to guardrails.
    FORBIDDEN = "forbidden"

    # Hard boundary; evaluation fails closed.
    CATASTROPHIC = "catastrophic"
class ShadowEvalResult:
    """Verdict produced by a shadow evaluation, plus its routing decision.

    Attributes are plain and mutable:

    * ``classification`` - the verdict passed in by the caller.
    * ``reason`` - optional short reason code, ``None`` when not given.
    * ``risk_score`` - integer risk value, ``0`` by default.
    * ``flags`` - list of flag strings; a new empty list when the caller
      passes ``None`` or an empty list, otherwise the caller's own list
      object (not copied).
    * ``trace_id`` - random UUID4 string generated per instance.
    """

    def __init__(
        self,
        classification: Classification,
        reason: Optional[str] = None,
        risk_score: int = 0,
        flags: Optional[List[str]] = None,
    ):
        self.classification = classification
        self.reason = reason
        self.risk_score = risk_score
        # A falsy ``flags`` (None or empty) is replaced by a fresh list so
        # instances never share a default container.
        self.flags = flags if flags else []
        self.trace_id = str(uuid.uuid4())

    def to_routing_action(self) -> str:
        """Return the routing action name for this result's classification.

        Verdicts are compared with ``==`` in severity order; any verdict
        that matches none of the listed ones is handed off to Layer 1.
        """
        routes = (
            (Classification.CATASTROPHIC, "FAIL_CLOSED"),
            (Classification.FORBIDDEN, "HANDOFF_TO_GUARDRAILS"),
            (Classification.AMBIGUOUS, "PROMPT_FOR_CLARIFICATION"),
        )
        for verdict, action in routes:
            if self.classification == verdict:
                return action
        return "HANDOFF_TO_LAYER1"
class ShadowClassifier:
    """
    Minimal doctrinal classifier for Layer 0 (Shadow Eval).

    Checks run in a fixed order and the first hit wins:

    0. hard-coded catastrophic phrases (evaluated before learned patterns),
    1. learned patterns returned by the pattern store,
    2. static forbidden phrases, then static ambiguous phrases / very
       short queries,
    3. otherwise the query is blessed.

    Static phrase checks are plain substring tests against a lowercased,
    whitespace-collapsed copy of the query.
    """

    # Phrase tables are class-level tuples so they are built once rather
    # than on every classify() call.
    _CATASTROPHIC_PHRASES = (
        "disable guardrails",
        "override agent permissions",
        "bypass governance",
        "self-modifying",
    )
    _FORBIDDEN_PHRASES = (
        "skip git",
        "apply directly",
        "dashboard",
        "manual change",
    )
    _AMBIGUOUS_PHRASES = (
        "fix it",
        "change this",
        "update stuff",
    )

    def __init__(self, pattern_store: PatternStore | None = None):
        """Create a classifier.

        Args:
            pattern_store: Store queried for learned patterns. A default
                ``PatternStore()`` is created only when this is ``None``;
                a supplied store is always used as-is, even if it happens
                to be falsy (e.g. empty).
        """
        if pattern_store is None:
            pattern_store = PatternStore()
        self._patterns = pattern_store

    @staticmethod
    def _canonicalize(query: str | None) -> str:
        """Lowercase *query* and collapse every whitespace run to one space.

        ``None``/empty input yields ``""``. Collapsing (rather than only
        stripping the ends) keeps multi-word phrase checks from being
        sidestepped by doubled spaces, tabs or newlines between words.
        """
        return " ".join((query or "").lower().split())

    def classify(
        self, query: str, *, context: Mapping[str, Any] | None = None
    ) -> ShadowEvalResult:
        """Return a doctrinal classification for the incoming query.

        Args:
            query: Free-text request. ``None`` or empty is treated as ``""``.
            context: Optional mapping used to weight the risk score; see
                ``_apply_context`` for the keys that are read.

        Returns:
            A ``ShadowEvalResult`` for the first matching rule, with its
            risk score adjusted by ``context`` when one is given.

        Raises:
            ValueError: If the top learned pattern carries a classification
                that is not a valid ``Classification`` value.
        """
        q = self._canonicalize(query)

        # 0. Catastrophic boundary (fail closed): never relaxed at runtime.
        # Checked before the query normaliser or pattern store is touched,
        # so nothing in those steps can pre-empt this verdict.
        if any(phrase in q for phrase in self._CATASTROPHIC_PHRASES):
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification.CATASTROPHIC,
                    reason="catastrophic_indicator",
                    risk_score=5,
                    flags=["permission_override", "guardrail_disable"],
                ),
                context,
            )

        # 1. Learned patterns (highest specificity/support first).
        # Only the first entry returned by the store is used.
        q_norm = normalize_query_for_matching(query or "")
        learned = self._patterns.match_ordered(q_norm)
        if learned:
            p = learned[0]
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification(p.classification),
                    reason=p.reason or "telemetry_learned",
                    risk_score=int(p.risk_score),
                    flags=list(p.flags) + ["telemetry_learned"],
                ),
                context,
            )

        # 2. Static patterns
        # 2a. Forbidden (governance violation)
        if any(phrase in q for phrase in self._FORBIDDEN_PHRASES):
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification.FORBIDDEN,
                    reason="governance_violation",
                    risk_score=3,
                    flags=["gitops_bypass"],
                ),
                context,
            )

        # 2b. Ambiguous (needs clarification): a vague phrase, or a query
        # of two words or fewer (which includes the empty query).
        if (
            any(phrase in q for phrase in self._AMBIGUOUS_PHRASES)
            or len(q.split()) <= 2
        ):
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification.AMBIGUOUS,
                    reason="insufficient_context",
                    risk_score=1,
                    flags=["needs_clarification"],
                ),
                context,
            )

        # 3. Blessed (valid + lawful)
        return self._apply_context(
            ShadowEvalResult(
                classification=Classification.BLESSED,
                reason=None,
                risk_score=0,
            ),
            context,
        )

    @staticmethod
    def _apply_context(
        result: ShadowEvalResult, context: Mapping[str, Any] | None
    ) -> ShadowEvalResult:
        """Scale ``result.risk_score`` by context multipliers, in place.

        Keys read from ``context`` (values are stringified, stripped and
        lowercased before comparison): ``environment``, ``capability``,
        ``actor_role`` (falling back to ``role``) and ``realm``.

        With a missing or empty ``context`` the result is returned
        untouched, so in that case the score is NOT clamped. Otherwise the
        weighted score is rounded and clamped to the range 0..5.

        Returns:
            The same ``result`` object that was passed in.
        """
        if not context:
            return result

        def _field(*keys: str) -> str:
            # First truthy value among *keys*, normalised for comparison.
            for key in keys:
                value = context.get(key)
                if value:
                    return str(value).strip().lower()
            return ""

        env = _field("environment")
        capability = _field("capability")
        role = _field("actor_role", "role")
        realm = _field("realm")

        # Multipliers are applied in a fixed order (environment, capability,
        # role, realm). Values not listed (e.g. dev/test, read) are neutral.
        mult = 1.0
        if env in {"prod", "production"}:
            mult *= 2.0
        elif env in {"staging", "stage"}:
            mult *= 1.5

        if capability in {"destroy", "delete", "write"}:
            mult *= 1.5

        if role in {"admin", "root"}:
            mult *= 1.2

        if realm in {"terraform", "gitops", "cloudflare"}:
            mult *= 1.1

        # NOTE: round() resolves exact halves to the nearest even integer,
        # so 4.5 -> 4 while 1.5 -> 2.
        weighted = int(round(result.risk_score * mult))
        result.risk_score = max(0, min(5, weighted))
        return result