177 lines
5.3 KiB
Python
177 lines
5.3 KiB
Python
import uuid
|
|
from enum import Enum
|
|
from typing import Any, List, Mapping, Optional
|
|
|
|
from .pattern_store import PatternStore, normalize_query_for_matching
|
|
|
|
|
|
class Classification(str, Enum):
    """Doctrinal verdict assigned to an incoming query.

    Every member is also a ``str``, so it compares equal to its lowercase
    value (``Classification.BLESSED == "blessed"``) and can be rebuilt from
    that value with ``Classification("blessed")``.
    """

    # Query is acceptable as-is.
    BLESSED = "blessed"
    # Query lacks enough detail to act on.
    AMBIGUOUS = "ambiguous"
    # Query violates a governance rule.
    FORBIDDEN = "forbidden"
    # Query crosses a hard safety boundary.
    CATASTROPHIC = "catastrophic"
|
|
|
|
|
|
class ShadowEvalResult:
    """Outcome of one Shadow Eval classification.

    Attributes:
        classification: Verdict assigned to the query.
        reason: Optional short explanation code, or ``None``.
        risk_score: Integer risk weight; defaults to 0.
        flags: List of string markers; a new empty list when the caller
            passes ``None`` or an empty list.
        trace_id: Random UUID4 string generated for every instance.
    """

    def __init__(
        self,
        classification: Classification,
        reason: Optional[str] = None,
        risk_score: int = 0,
        flags: Optional[List[str]] = None,
    ):
        self.classification = classification
        self.reason = reason
        self.risk_score = risk_score
        # Any falsy value (None or an empty list) becomes a fresh list.
        self.flags = flags if flags else []
        self.trace_id = str(uuid.uuid4())

    def to_routing_action(self) -> str:
        """Return the routing keyword that corresponds to the classification.

        Classifications not listed in the table below route to
        ``"HANDOFF_TO_LAYER1"``.
        """
        routes = (
            (Classification.CATASTROPHIC, "FAIL_CLOSED"),
            (Classification.FORBIDDEN, "HANDOFF_TO_GUARDRAILS"),
            (Classification.AMBIGUOUS, "PROMPT_FOR_CLARIFICATION"),
        )
        for verdict, action in routes:
            if self.classification == verdict:
                return action
        return "HANDOFF_TO_LAYER1"
|
|
|
|
|
|
class ShadowClassifier:
    """Minimal doctrinal classifier for Layer 0 (Shadow Eval).

    ``classify`` evaluates its checks in a fixed order and returns on the
    first one that matches:

    0. catastrophic phrases (fail closed),
    1. patterns returned by the pattern store,
    2. static forbidden phrases, then static ambiguous phrases / very short
       queries,
    3. otherwise the query is blessed.

    Every result is passed through ``_apply_context`` before it is returned.
    """

    # Phrase tables used for substring checks against the lowercased,
    # whitespace-collapsed query text.
    _CATASTROPHIC_PHRASES = (
        "disable guardrails",
        "override agent permissions",
        "bypass governance",
        "self-modifying",
    )
    _FORBIDDEN_PHRASES = (
        "skip git",
        "apply directly",
        "dashboard",
        "manual change",
    )
    _AMBIGUOUS_PHRASES = (
        "fix it",
        "change this",
        "update stuff",
    )

    def __init__(self, pattern_store: PatternStore | None = None):
        """Create a classifier.

        Args:
            pattern_store: Store consulted for learned patterns. A default
                ``PatternStore()`` is created only when ``None`` is passed.
        """
        # Compare against None explicitly so that a caller-supplied store is
        # never silently replaced just because it happens to be falsy
        # (e.g. an empty store, if PatternStore defines __len__/__bool__).
        self._patterns = pattern_store if pattern_store is not None else PatternStore()

    def classify(
        self, query: str, *, context: Mapping[str, Any] | None = None
    ) -> ShadowEvalResult:
        """Return a doctrinal classification for the incoming query.

        Args:
            query: Raw query text; ``None`` or an empty string is treated as
                an empty query (which classifies as ambiguous unless a
                learned pattern matches first).
            context: Optional mapping used to scale the risk score; see
                ``_apply_context`` for the keys that are read.

        Returns:
            A ``ShadowEvalResult`` whose risk score has been context-weighted.

        Raises:
            ValueError: If a learned pattern carries a classification value
                that is not a valid ``Classification``.
        """
        raw = query or ""
        # split() + join lowers every run of whitespace (spaces, tabs,
        # newlines) to a single space and trims both ends, so padded input
        # such as "disable   guardrails" cannot slip past the phrase checks.
        q = " ".join(raw.lower().split())
        q_norm = normalize_query_for_matching(raw)

        # 0. Catastrophic boundary (fail closed): never relaxed at runtime.
        #    Checked before learned patterns so the store cannot override it.
        if any(phrase in q for phrase in self._CATASTROPHIC_PHRASES):
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification.CATASTROPHIC,
                    reason="catastrophic_indicator",
                    risk_score=5,
                    flags=["permission_override", "guardrail_disable"],
                ),
                context,
            )

        # 1. Learned patterns. Only the first entry is used; match_ordered is
        #    presumably sorted best-match first — confirm against PatternStore.
        learned = self._patterns.match_ordered(q_norm)
        if learned:
            p = learned[0]
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification(p.classification),
                    reason=p.reason or "telemetry_learned",
                    risk_score=int(p.risk_score),
                    # Copy the stored flags so the pattern is not mutated.
                    flags=list(p.flags) + ["telemetry_learned"],
                ),
                context,
            )

        # 2. Static patterns

        # 2a. Forbidden (governance violation)
        if any(phrase in q for phrase in self._FORBIDDEN_PHRASES):
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification.FORBIDDEN,
                    reason="governance_violation",
                    risk_score=3,
                    flags=["gitops_bypass"],
                ),
                context,
            )

        # 2b. Ambiguous (needs clarification): a vague phrase, or a query of
        #     two words or fewer (this includes the empty query).
        too_short = len(q.split()) <= 2
        if too_short or any(phrase in q for phrase in self._AMBIGUOUS_PHRASES):
            return self._apply_context(
                ShadowEvalResult(
                    classification=Classification.AMBIGUOUS,
                    reason="insufficient_context",
                    risk_score=1,
                    flags=["needs_clarification"],
                ),
                context,
            )

        # 3. Blessed (valid + lawful)
        return self._apply_context(
            ShadowEvalResult(
                classification=Classification.BLESSED,
                reason=None,
                risk_score=0,
            ),
            context,
        )

    @staticmethod
    def _apply_context(
        result: ShadowEvalResult, context: Mapping[str, Any] | None
    ) -> ShadowEvalResult:
        """Scale ``result.risk_score`` by the request context, in place.

        Reads the keys ``environment``, ``realm``, ``capability`` and
        ``actor_role`` (falling back to ``role``) from ``context``; values
        are stringified and lowercased before comparison. Every multiplier
        is >= 1, so context can raise a score but never lower it, and a
        score of 0 stays 0. The weighted score is clamped to the 0..5 range.

        Args:
            result: Result to adjust; it is mutated and returned.
            context: Optional context mapping. When empty or ``None`` the
                result is returned untouched.

        Returns:
            The same ``result`` object.
        """
        if not context:
            return result

        env = str(context.get("environment") or "").lower()
        realm = str(context.get("realm") or "").lower()
        capability = str(context.get("capability") or "").lower()
        role = str(context.get("actor_role") or context.get("role") or "").lower()

        mult = 1.0

        # dev / development / test and unrecognised environments keep 1.0.
        if env in {"prod", "production"}:
            mult *= 2.0
        elif env in {"staging", "stage"}:
            mult *= 1.5

        # "read" and unrecognised capabilities keep the multiplier unchanged.
        if capability in {"destroy", "delete", "write"}:
            mult *= 1.5

        if role in {"admin", "root"}:
            mult *= 1.2

        if realm in {"terraform", "gitops", "cloudflare"}:
            mult *= 1.1

        # NOTE: round() sends exact halves to the nearest even integer
        # (e.g. 4.5 -> 4, 1.5 -> 2).
        weighted = int(round(result.risk_score * mult))
        result.risk_score = max(0, min(5, weighted))
        return result
|