vm-core/OFFSEC-AGENTS-PLAN.md
2025-12-27 00:10:32 +00:00

OffSec Agent Workers Plan

Shield commands. Agents execute. Receipts prove.

Current State Assessment

What Exists

| Component | Status | Location |
|---|---|---|
| Guardian Engine (Rust) | Implemented | vaultmesh-guardian/src/lib.rs |
| OffSec Engine (Rust) | ⚠️ Stub only | vaultmesh-offsec/src/lib.rs |
| Console Engine (Python) | Receipts + Approvals | engines/console/ |
| Shield Portal (Node.js) | Web UI | vaultmesh-shield-portal/ |
| CLI OffSec Commands | Full implementation | cli/vm_cli.py |
| OffSec Spec | Complete | VAULTMESH-OFFSEC-ENGINE.md |

What's Missing

  1. OffSec Agent Workers - Autonomous AI agents that perform security tasks
  2. Agent Orchestration Layer - Dispatch, monitor, control agents
  3. Shield ↔ Agent Protocol - Communication and approval flow
  4. CAI Integration - Leverage existing security AI tools

Architecture: Shield-Agent Pattern

┌─────────────────────────────────────────────────────────────────────────────┐
│                         SHIELD (Guardian/Overseer)                          │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  Shield Portal (vaultmesh-shield-portal)                            │   │
│  │  - Mission control UI                                               │   │
│  │  - Agent status dashboard                                           │   │
│  │  - Approval workflow                                                │   │
│  │  - Receipt viewer                                                   │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                    │                                        │
│                                    ▼                                        │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │  Agent Orchestrator (new: engines/offsec_agents/)                   │   │
│  │  - Agent registry                                                   │   │
│  │  - Mission dispatch                                                 │   │
│  │  - Approval enforcement                                             │   │
│  │  - Receipt emission                                                 │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────┘
                                     │
                    ┌────────────────┼────────────────┐
                    │                │                │
                    ▼                ▼                ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│   RECON AGENT       │ │   VULN AGENT        │ │   EXPLOIT AGENT     │
│   (Worker)          │ │   (Worker)          │ │   (Worker)          │
│                     │ │                     │ │                     │
│ • Target enum       │ │ • CVE scanning      │ │ • PoC execution     │
│ • Port scanning     │ │ • Config audit      │ │ • Payload delivery  │
│ • OSINT gathering   │ │ • Dependency check  │ │ • Post-exploit      │
│ • Service ID        │ │ • Web vuln scan     │ │ • Priv escalation   │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
          │                       │                       │
          └───────────────────────┴───────────────────────┘
                                  │
                                  ▼
                    ┌─────────────────────────────┐
                    │     OFFSEC SCROLL           │
                    │ receipts/offsec/            │
                    │ offsec_events.jsonl         │
                    └─────────────────────────────┘

Agent Types

Phase 1: Core Agents (Nigredo)

| Agent | Purpose | Risk Level | Approval Required |
|---|---|---|---|
| Recon Agent | Target enumeration, OSINT | Low | Auto-approve |
| Vuln Agent | Vulnerability scanning | Medium | Manager approve |
| Analyze Agent | Code/binary analysis | Low | Auto-approve |
| Report Agent | Finding aggregation | Low | Auto-approve |

Phase 2: Active Agents (Albedo)

| Agent | Purpose | Risk Level | Approval Required |
|---|---|---|---|
| Exploit Agent | PoC execution | High | Multi-party approve |
| CTF Agent | CTF challenge solving | Medium | Engagement scope |
| Red Team Agent | Full adversary emulation | Critical | Executive approve |

Phase 3: Response Agents (Citrinitas)

| Agent | Purpose | Risk Level | Approval Required |
|---|---|---|---|
| DFIR Agent | Incident response | Medium | Incident commander |
| Remediation Agent | Automated patching | High | Change approval |
| Threat Intel Agent | IOC correlation | Low | Auto-approve |
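
The three tables above amount to a lookup from agent type to risk level and approval policy. A minimal sketch of that lookup as data follows. The policy labels (`"auto"`, `"manager"`, `"multi_party"`, and so on) and the `approval_policy` helper are hypothetical names chosen for illustration, not part of the existing codebase.

```python
from enum import Enum


class RiskLevel(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# (risk level, approval policy) per agent type, transcribed from the tables.
# The policy strings are illustrative labels, not an existing API.
AGENT_POLICY = {
    "recon": (RiskLevel.LOW, "auto"),
    "vuln": (RiskLevel.MEDIUM, "manager"),
    "analyze": (RiskLevel.LOW, "auto"),
    "report": (RiskLevel.LOW, "auto"),
    "exploit": (RiskLevel.HIGH, "multi_party"),
    "ctf": (RiskLevel.MEDIUM, "engagement_scope"),
    "red_team": (RiskLevel.CRITICAL, "executive"),
    "dfir": (RiskLevel.MEDIUM, "incident_commander"),
    "remediation": (RiskLevel.HIGH, "change_approval"),
    "threat_intel": (RiskLevel.LOW, "auto"),
}


def approval_policy(agent_type: str) -> str:
    """Return the approval policy label; unknown agent types fail closed."""
    _, policy = AGENT_POLICY.get(agent_type, (RiskLevel.CRITICAL, "executive"))
    return policy
```

Treating an unknown agent type as critical is a deliberate choice in this sketch: a typo in an agent name should never downgrade the approval requirement.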

Implementation Plan

Step 1: Agent Base Framework

File: engines/offsec_agents/__init__.py

"""
OffSec Agent Workers - Base Framework
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
import uuid

class AgentType(str, Enum):
    RECON = "recon"
    VULN = "vuln"
    EXPLOIT = "exploit"
    CTF = "ctf"
    ANALYZE = "analyze"
    DFIR = "dfir"
    REMEDIATION = "remediation"
    THREAT_INTEL = "threat_intel"
    REPORT = "report"

class AgentStatus(str, Enum):
    IDLE = "idle"
    ASSIGNED = "assigned"
    RUNNING = "running"
    AWAITING_APPROVAL = "awaiting_approval"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class RiskLevel(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class Mission:
    """A task assigned to an agent."""
    mission_id: str
    agent_type: AgentType
    target: str
    objectives: List[str]
    scope: Dict[str, Any]
    risk_level: RiskLevel
    requested_by: str
    engagement_id: Optional[str] = None
    incident_id: Optional[str] = None
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    status: str = "pending"
    
@dataclass
class AgentResult:
    """Result from agent execution."""
    mission_id: str
    agent_id: str
    status: AgentStatus
    findings: List[Dict[str, Any]]
    evidence_paths: List[str]
    started_at: str
    completed_at: str
    duration_seconds: float
    receipt_hash: Optional[str] = None
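
As an illustration of how a mission record could be created and serialized, the sketch below uses a trimmed stand-in for `Mission`. `MiniMission`, `to_json`, and the `mission-<12 hex>` ID format are assumptions made for this example; the plan calls `generate_mission_id()` from the CLI but does not define it.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List


def generate_mission_id() -> str:
    """Hypothetical ID format: 'mission-' followed by 12 hex characters."""
    return f"mission-{uuid.uuid4().hex[:12]}"


@dataclass
class MiniMission:
    """Trimmed stand-in for Mission, just enough to show serialization."""
    mission_id: str
    agent_type: str
    target: str
    objectives: List[str]
    scope: Dict[str, Any]
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def to_json(mission: MiniMission) -> str:
    # sort_keys gives a stable representation, which helps if the record
    # is hashed into a receipt later.
    return json.dumps(asdict(mission), sort_keys=True)


mission = MiniMission(
    mission_id=generate_mission_id(),
    agent_type="recon",
    target="10.0.0.5",
    objectives=["port_scan", "service_id"],
    scope={"scan_depth": "quick"},
)
serialized = to_json(mission)
```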

Step 2: Agent Orchestrator

File: engines/offsec_agents/orchestrator.py

"""
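Agent Orchestrator - Dispatch, monitor, and control agents.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Dict

from . import AgentType, Mission, RiskLevel

if TYPE_CHECKING:
    # Type hints only; a runtime import would be circular with base.py.
    from .base import BaseAgent

# NOTE: ApprovalManager is expected to come from the Console approvals engine;
# its import path is not specified in this plan.


class NoAgentAvailable(Exception):
    """Raised when no idle agent of the requested type is registered."""
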
class AgentOrchestrator:
    """
    Central control for OffSec agents.
    
    Responsibilities:
    - Register available agents
    - Dispatch missions to appropriate agents
    - Enforce approval workflows
    - Emit receipts for all agent actions
    - Track agent status and results
    """
    
    def __init__(self, vaultmesh_root: str):
        self.vaultmesh_root = vaultmesh_root
        self.agents: Dict[str, BaseAgent] = {}
        self.missions: Dict[str, Mission] = {}
        self.approval_manager = ApprovalManager(vaultmesh_root)
        
    def register_agent(self, agent: "BaseAgent") -> None:
        """Register an agent with the orchestrator."""
        self.agents[agent.agent_id] = agent
        
    async def dispatch(self, mission: Mission) -> str:
        """
        Dispatch a mission to an appropriate agent.
        
        Flow:
        1. Validate mission scope
        2. Check if approval required
        3. Request approval if needed
        4. Assign to available agent
        5. Emit mission_assigned receipt
        """
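        # Record the mission first so rejected missions remain auditable
        self.missions[mission.mission_id] = mission

        # Check approval requirements
        if self._requires_approval(mission):
            approval = await self._request_approval(mission)
            if not approval.approved:
                mission.status = "rejected"
                self._emit_mission_receipt("offsec_mission_rejected", mission)
                return "rejected"
            self._emit_mission_receipt("offsec_mission_approved", mission)

        # Find available agent
        agent = self._find_agent(mission.agent_type)
        if not agent:
            raise NoAgentAvailable(mission.agent_type)

        # Assign mission and keep its status in sync for `agent status`
        agent.assign(mission)
        mission.status = "assigned"

        # Emit receipt
        self._emit_mission_receipt("offsec_mission_assigned", mission)

        return mission.mission_id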
        
    def _requires_approval(self, mission: Mission) -> bool:
        """Check if mission requires approval based on risk level."""
        approval_matrix = {
            RiskLevel.LOW: False,
            RiskLevel.MEDIUM: True,
            RiskLevel.HIGH: True,
            RiskLevel.CRITICAL: True,
        }
        return approval_matrix.get(mission.risk_level, True)
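
`dispatch()` relies on a `_find_agent` helper that this plan does not spell out. One minimal way it might work is to return the first idle agent of the requested type. In the sketch below, `AgentStub` and the free function `find_agent` are hypothetical stand-ins for `BaseAgent` and the orchestrator method.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class AgentStub:
    """Stand-in for BaseAgent with only the fields the lookup needs."""
    agent_id: str
    agent_type: str
    status: str = "idle"


def find_agent(agents: Dict[str, AgentStub], agent_type: str) -> Optional[AgentStub]:
    """Return the first idle agent of the requested type, or None."""
    for agent in agents.values():
        if agent.agent_type == agent_type and agent.status == "idle":
            return agent
    return None


registry = {
    "recon-01": AgentStub("recon-01", "recon", status="running"),
    "recon-02": AgentStub("recon-02", "recon"),
    "vuln-01": AgentStub("vuln-01", "vuln"),
}
chosen = find_agent(registry, "recon")  # recon-01 is busy, so recon-02 is picked
```

Returning `None` rather than raising keeps the decision about what to do when no agent is free (queue, retry, or raise `NoAgentAvailable`) in `dispatch()`.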

Step 3: Base Agent Class

File: engines/offsec_agents/base.py

"""
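Base Agent - Abstract class for all OffSec agents.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import asdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from . import AgentResult, AgentStatus, AgentType, Mission

if TYPE_CHECKING:
    # Type hints only; a runtime import would be circular with orchestrator.py.
    from .orchestrator import AgentOrchestrator
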
class BaseAgent(ABC):
    """
    Abstract base class for OffSec agents.
    
    All agents must:
    1. Implement execute() for their core logic
    2. Emit receipts for significant actions
    3. Respect scope boundaries
    4. Handle approval checkpoints
    """
    
    def __init__(
        self,
        agent_id: str,
        agent_type: AgentType,
        did: str,
        orchestrator: "AgentOrchestrator",
    ):
        self.agent_id = agent_id
        self.agent_type = agent_type
        self.did = did  # e.g., did:vm:agent:recon-01
        self.orchestrator = orchestrator
        self.status = AgentStatus.IDLE
        self.current_mission: Optional[Mission] = None
        # Paths to captured evidence; run() copies these into AgentResult
        self.evidence_paths: List[str] = []
        
    def assign(self, mission: Mission) -> None:
        """Accept a mission assignment."""
        self.current_mission = mission
        self.status = AgentStatus.ASSIGNED
        
    async def run(self) -> AgentResult:
        """
        Execute the assigned mission.
        
        Flow:
        1. Validate mission is assigned
        2. Emit mission_started receipt
        3. Execute core logic
        4. Emit findings as receipts
        5. Emit mission_completed receipt
        """
        if not self.current_mission:
            raise NoMissionAssigned()
            
        self.status = AgentStatus.RUNNING
        started_at = datetime.now(timezone.utc)
        
        self._emit_receipt("offsec_agent_started", {
            "mission_id": self.current_mission.mission_id,
            "agent_id": self.agent_id,
            "agent_type": self.agent_type.value,
        })
        
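        try:
            findings = await self.execute()
            self.status = AgentStatus.COMPLETED
            receipt_type = "offsec_agent_completed"
        except Exception as e:
            # Record the failure as a finding so the receipt explains what went wrong
            self.status = AgentStatus.FAILED
            findings = [{"error": str(e)}]
            receipt_type = "offsec_agent_failed"

        completed_at = datetime.now(timezone.utc)

        result = AgentResult(
            mission_id=self.current_mission.mission_id,
            agent_id=self.agent_id,
            status=self.status,
            findings=findings,
            evidence_paths=self.evidence_paths,
            started_at=started_at.isoformat(),
            completed_at=completed_at.isoformat(),
            duration_seconds=(completed_at - started_at).total_seconds(),
        )

        # Failed runs emit offsec_agent_failed rather than offsec_agent_completed
        self._emit_receipt(receipt_type, asdict(result))

        # Keep the mission record in sync before releasing the agent
        self.current_mission.status = self.status.value
        self.current_mission = None
        self.status = AgentStatus.IDLE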
        
        return result
        
    @abstractmethod
    async def execute(self) -> List[Dict[str, Any]]:
        """
        Core agent logic - must be implemented by subclasses.
        
        Returns:
            List of findings/results
        """
        pass
        
    async def checkpoint(self, action: str, details: Dict[str, Any]) -> bool:
        """
        Request approval for a risky action mid-execution.
        
        Use for actions that exceed initial mission scope or risk level.
        """
        if self._action_exceeds_scope(action, details):
            approval = await self.orchestrator.request_approval(
                session_id=self.current_mission.mission_id,
                action_type=action,
                action_details=details,
                requested_by=self.did,
                approvers=self._get_approvers(action),
            )
            return approval.approved
        return True
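
`checkpoint()` depends on `_action_exceeds_scope`, which this plan leaves undefined. The sketch below shows one way the target part of that check could look, using the standard `ipaddress` module. The scope keys `allowed_hosts` and `allowed_cidrs` and the function name `target_in_scope` are assumptions for illustration; the plan only says that `scope` is a dictionary.

```python
import ipaddress
from typing import Any, Dict


def target_in_scope(target: str, scope: Dict[str, Any]) -> bool:
    """Return True only if the target is explicitly allowed by the scope.

    Assumed (hypothetical) scope keys:
      allowed_hosts: exact hostnames or IP strings
      allowed_cidrs: networks in CIDR notation
    """
    if target in scope.get("allowed_hosts", []):
        return True
    try:
        addr = ipaddress.ip_address(target)
    except ValueError:
        # A hostname that is not listed explicitly: fail closed.
        return False
    return any(
        addr in ipaddress.ip_network(cidr, strict=False)
        for cidr in scope.get("allowed_cidrs", [])
    )


scope = {"allowed_cidrs": ["10.0.0.0/24"], "allowed_hosts": ["app.example.test"]}
in_scope = target_in_scope("10.0.0.5", scope)
out_of_scope = target_in_scope("10.0.1.5", scope)
```

An empty scope allows nothing in this sketch, so a mission created with `scope={}` would need an explicit target list before any agent could act on it.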

Step 4: Concrete Agent Implementations

File: engines/offsec_agents/agents/recon.py

"""
Recon Agent - Target enumeration and OSINT gathering.
"""

class ReconAgent(BaseAgent):
    """
    Reconnaissance agent for target enumeration.
    
    Capabilities:
    - Port scanning (nmap integration)
    - Service identification
    - OSINT gathering
    - DNS enumeration
    - Technology fingerprinting
    """
    
    def __init__(self, **kwargs):
        super().__init__(
            agent_type=AgentType.RECON,
            **kwargs
        )
        
    async def execute(self) -> List[Dict[str, Any]]:
        """Execute reconnaissance mission."""
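        mission = self.current_mission
        target = mission.target
        findings = []
        # Initialized up front so service identification still works
        # when "port_scan" is not among the objectives
        ports: List[Dict] = []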
        
        # Phase 1: DNS/Host resolution
        dns_results = await self._dns_enum(target)
        findings.extend(dns_results)
        
        # Phase 2: Port scanning
        if "port_scan" in mission.objectives:
            ports = await self._port_scan(target, mission.scope)
            findings.extend(ports)
            
        # Phase 3: Service identification
        if "service_id" in mission.objectives:
            services = await self._identify_services(target, ports)
            findings.extend(services)
            
        # Phase 4: Technology fingerprinting
        if "tech_fingerprint" in mission.objectives:
            tech = await self._fingerprint(target)
            findings.extend(tech)
            
        return findings
        
    async def _port_scan(self, target: str, scope: Dict) -> List[Dict]:
        """Run port scan within scope constraints."""
        # Use CAI recon or nmap
        from cai_mcp_client import cai_recon
        
        depth = scope.get("scan_depth", "quick")
        result = await cai_recon(target=target, depth=depth)
        
        return self._parse_scan_results(result)

File: engines/offsec_agents/agents/vuln.py

"""
Vuln Agent - Vulnerability scanning and assessment.
"""

class VulnAgent(BaseAgent):
    """
    Vulnerability scanning agent.
    
    Capabilities:
    - CVE detection
    - Configuration auditing
    - Dependency vulnerability checks
    - Web application scanning
    - API security testing
    """
    
    async def execute(self) -> List[Dict[str, Any]]:
        """Execute vulnerability scan mission."""
        mission = self.current_mission
        target = mission.target
        findings = []
        
        # Determine scan type
        scan_type = mission.scope.get("scan_type", "all")
        
        # Checkpoint for web scans (can be noisy)
        if scan_type in ["web", "all"]:
            approved = await self.checkpoint(
                "web_vuln_scan",
                {"target": target, "reason": "Web scanning may generate traffic"}
            )
            if not approved:
                scan_type = "network"  # Fall back to quieter scan
                
        # Use CAI vuln scanner
        from cai_mcp_client import cai_vuln_scan
        
        result = await cai_vuln_scan(target=target, scan_type=scan_type)
        
        # Convert to findings
        for vuln in result.get("vulnerabilities", []):
            finding = {
                "type": "vulnerability",
                "title": vuln.get("title"),
                "severity": vuln.get("severity"),
                "cvss": vuln.get("cvss_score"),
                "cve": vuln.get("cve_id"),
                "affected_asset": target,
                "evidence": vuln.get("evidence"),
            }
            findings.append(finding)
            
            # Auto-create vuln receipt for high+ severity
            if vuln.get("severity") in ["high", "critical"]:
                self._emit_vuln_discovery(finding)
                
        return findings

File: engines/offsec_agents/agents/exploit.py

"""
Exploit Agent - Controlled exploitation and PoC execution.

⚠️ HIGH RISK - Requires multi-party approval
"""

class ExploitAgent(BaseAgent):
    """
    Exploitation agent for controlled PoC execution.
    
    CRITICAL: All exploits require:
    1. Valid engagement/incident context
    2. Multi-party approval
    3. Detailed evidence capture
    4. Immediate rollback capability
    """
    
    async def execute(self) -> List[Dict[str, Any]]:
        """Execute exploitation mission with strict controls."""
        mission = self.current_mission
        
        # Verify engagement context
        if not mission.engagement_id and not mission.incident_id:
            raise NoEngagementContext(
                "Exploitation requires valid engagement or incident context"
            )
            
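        # Mandatory checkpoint before any exploit. Note: checkpoint() only
        # requests approval when _action_exceeds_scope() returns True, so that
        # check must always flag "exploit_execution".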
        approved = await self.checkpoint(
            "exploit_execution",
            {
                "target": mission.target,
                "vulnerability": mission.scope.get("vuln_id"),
                "technique": mission.scope.get("technique"),
                "engagement_id": mission.engagement_id,
            }
        )
        
        if not approved:
            return [{"status": "blocked", "reason": "Approval denied"}]
            
        # Execute with full evidence capture
        findings = []
        
        try:
            # Start evidence capture
            self._start_capture()
            
            # Use CAI exploit
            from cai_mcp_client import cai_exploit
            
            result = await cai_exploit(
                target=mission.target,
                vulnerability=mission.scope.get("vuln_id"),
            )
            
            findings.append({
                "type": "exploit_result",
                "success": result.get("success"),
                "access_gained": result.get("access_level"),
                "evidence_path": self._stop_capture(),
            })
            
        except Exception as e:
            findings.append({
                "type": "exploit_error",
                "error": str(e),
                "evidence_path": self._stop_capture(),
            })
            
        return findings

Step 5: CAI Integration Bridge

File: engines/offsec_agents/cai_bridge.py

"""
Bridge to CAI MCP tools for agent use.
"""

import asyncio
from typing import Any, Dict, Optional

class CAIBridge:
    """
    Bridge to CAI security tools via MCP.
    
    Wraps CAI tools for use by OffSec agents:
    - cai_recon → ReconAgent
    - cai_vuln_scan → VulnAgent
    - cai_exploit → ExploitAgent
    - cai_ctf → CTFAgent
    - cai_analyze → AnalyzeAgent
    """
    
    async def recon(
        self,
        target: str,
        depth: str = "standard"
    ) -> Dict[str, Any]:
        """Run CAI reconnaissance."""
        # Call MCP tool
        result = await self._call_mcp("cai_recon", {
            "target": target,
            "depth": depth,
        })
        return result
        
    async def vuln_scan(
        self,
        target: str,
        scan_type: str = "all"
    ) -> Dict[str, Any]:
        """Run CAI vulnerability scan."""
        result = await self._call_mcp("cai_vuln_scan", {
            "target": target,
            "scan_type": scan_type,
        })
        return result
        
    async def exploit(
        self,
        target: str,
        vulnerability: str
    ) -> Dict[str, Any]:
        """Run CAI exploitation (requires authorization)."""
        result = await self._call_mcp("cai_exploit", {
            "target": target,
            "vulnerability": vulnerability,
        })
        return result
        
    async def analyze(
        self,
        file_path: str,
        analysis_type: str = "static"
    ) -> Dict[str, Any]:
        """Run CAI code/binary analysis."""
        result = await self._call_mcp("cai_analyze", {
            "file_path": file_path,
            "analysis_type": analysis_type,
        })
        return result
        
    async def ctf(
        self,
        challenge: str,
        category: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run CAI CTF solver."""
        params = {"challenge": challenge}
        if category:
            params["category"] = category
        result = await self._call_mcp("cai_ctf", params)
        return result
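
Every `CAIBridge` method delegates to `_call_mcp`, which is not shown. Since the MCP client API is not specified in this plan, the sketch below injects the transport as an async callable instead of naming a real client. `BridgeSketch`, `McpTransport`, `fake_transport`, and the 300-second default timeout are all hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical transport signature: (tool name, params) -> result dict
McpTransport = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]


class BridgeSketch:
    """Minimal bridge that forwards tool calls through an injected transport."""

    def __init__(self, transport: McpTransport, timeout_seconds: float = 300.0):
        self._transport = transport
        self._timeout = timeout_seconds

    async def _call_mcp(self, tool: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Invoke one tool and turn a timeout into an error result."""
        try:
            return await asyncio.wait_for(
                self._transport(tool, params), timeout=self._timeout
            )
        except asyncio.TimeoutError:
            return {"success": False, "error": f"{tool} timed out"}


async def fake_transport(tool: str, params: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in for a real MCP client; it just echoes the request.
    return {"success": True, "tool": tool, "echo": params}


result = asyncio.run(
    BridgeSketch(fake_transport)._call_mcp("cai_recon", {"target": "10.0.0.5"})
)
```

Injecting the transport also makes the bridge testable without a running CAI service, which matters for the unit tests planned in `tests/`.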

Step 6: Shield Portal Integration

File: vaultmesh-shield-portal/app/src/routes/agents.js

/**
 * Agent management routes for Shield Portal
 */

const express = require('express');
const router = express.Router();

// GET /agents - List all registered agents
router.get('/', async (req, res) => {
    const agents = await getAgentRegistry();
    res.render('agents/list', { agents });
});

// POST /agents/dispatch - Dispatch a new mission
router.post('/dispatch', async (req, res) => {
    const { agent_type, target, objectives, scope } = req.body;

    const mission = await dispatchMission({
        agent_type,
        target,
        objectives,
        scope,
        requested_by: req.user.did,
    });

    res.redirect(`/missions/${mission.mission_id}`);
});

// GET /missions - List all missions
// Registered before '/:id' so that "missions" is not captured as an agent ID.
router.get('/missions', async (req, res) => {
    const missions = await getMissions({
        status: req.query.status,
        agent_type: req.query.agent_type,
    });
    res.render('missions/list', { missions });
});

// POST /missions/:id/approve - Approve a pending mission
router.post('/missions/:id/approve', async (req, res) => {
    await approveMission(req.params.id, {
        approver: req.user.did,
        reason: req.body.reason,
    });
    res.redirect(`/missions/${req.params.id}`);
});

// GET /agents/:id - Agent detail view
// Kept last: '/:id' matches any single path segment.
router.get('/:id', async (req, res) => {
    const agent = await getAgent(req.params.id);
    const missions = await getAgentMissions(req.params.id);
    res.render('agents/detail', { agent, missions });
});

module.exports = router;

Step 7: CLI Commands

Add to: cli/vm_cli.py

# ============================================================================
# Agent Commands
# ============================================================================

@cli.group()
def agent():
    """OffSec Agent management."""
    pass

@agent.command("list")
def agent_list():
    """List registered agents."""
    orchestrator = get_orchestrator()
    for agent in orchestrator.agents.values():
        status_icon = "🟢" if agent.status == AgentStatus.IDLE else "🔴"
        click.echo(f"{status_icon} {agent.agent_id} ({agent.agent_type.value})")

@agent.command("dispatch")
@click.option("--type", "agent_type", required=True, 
              type=click.Choice(["recon", "vuln", "exploit", "ctf", "analyze"]))
@click.option("--target", required=True, help="Target for the mission")
@click.option("--objective", multiple=True, help="Mission objectives")
@click.option("--engagement", help="Link to engagement ID")
@click.option("--incident", help="Link to incident ID")
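def agent_dispatch(agent_type, target, objective, engagement, incident):
    """Dispatch a mission to an agent."""
    orchestrator = get_orchestrator()

    # Default risk per agent type, taken from the Agent Types tables,
    # so low-risk recon is not forced through approval
    default_risk = {
        "recon": RiskLevel.LOW,
        "vuln": RiskLevel.MEDIUM,
        "exploit": RiskLevel.HIGH,
        "ctf": RiskLevel.MEDIUM,
        "analyze": RiskLevel.LOW,
    }

    mission = Mission(
        mission_id=generate_mission_id(),
        agent_type=AgentType(agent_type),
        target=target,
        objectives=list(objective),
        scope={},
        risk_level=default_risk[agent_type],
        requested_by=get_actor_did(),
        engagement_id=engagement,
        incident_id=incident,
    )

    result = asyncio.run(orchestrator.dispatch(mission))
    click.echo(f"[agent] Mission dispatched: {result}")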

@agent.command("status")
@click.argument("mission_id")
def agent_status(mission_id):
    """Check mission status."""
    orchestrator = get_orchestrator()
    mission = orchestrator.missions.get(mission_id)
    
    if not mission:
        click.echo(f"Mission not found: {mission_id}")
        return
        
    click.echo(f"Mission: {mission.mission_id}")
    click.echo(f"Type: {mission.agent_type.value}")
    click.echo(f"Target: {mission.target}")
    click.echo(f"Status: {mission.status}")

Receipt Types

New receipt types for agent operations:

| Type | When Emitted |
|---|---|
| offsec_mission_created | Mission created and pending |
| offsec_mission_assigned | Mission assigned to agent |
| offsec_mission_approved | Mission approved to proceed |
| offsec_mission_rejected | Mission approval rejected |
| offsec_agent_started | Agent began execution |
| offsec_agent_checkpoint | Agent requested mid-execution approval |
| offsec_agent_finding | Agent discovered a finding |
| offsec_agent_completed | Agent finished mission |
| offsec_agent_failed | Agent encountered error |
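
The receipt types above all end up as lines in the OffSec scroll (`receipts/offsec/offsec_events.jsonl`). A minimal sketch of how one receipt might be appended and later checked is shown below. The record layout (`type`, `timestamp`, `body`, `hash`), the use of SHA-256, and the function names are assumptions; the actual receipt schema is defined elsewhere in VaultMesh and is not reproduced in this plan.

```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict


def _canonical(record: Dict[str, Any]) -> str:
    # Sorted keys and compact separators make the hash reproducible.
    return json.dumps(record, sort_keys=True, separators=(",", ":"))


def emit_receipt(scroll_path: Path, receipt_type: str, body: Dict[str, Any]) -> str:
    """Append one receipt to a JSONL scroll and return its hash."""
    receipt = {
        "type": receipt_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "body": body,
    }
    receipt_hash = hashlib.sha256(_canonical(receipt).encode("utf-8")).hexdigest()
    scroll_path.parent.mkdir(parents=True, exist_ok=True)
    with scroll_path.open("a", encoding="utf-8") as scroll:
        scroll.write(json.dumps({**receipt, "hash": receipt_hash}, sort_keys=True) + "\n")
    return receipt_hash


def verify_receipt(line: str) -> bool:
    """Recompute the hash of one scroll line and compare it to the stored value."""
    record = json.loads(line)
    claimed = record.pop("hash")
    return hashlib.sha256(_canonical(record).encode("utf-8")).hexdigest() == claimed
```

Calling `emit_receipt(Path("receipts/offsec/offsec_events.jsonl"), "offsec_agent_started", {"mission_id": "..."})` would append a single line; `verify_receipt` can then be run over each line to detect edits made after the fact.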

Directory Structure

engines/offsec_agents/
├── __init__.py              # Exports
├── base.py                  # BaseAgent class
├── orchestrator.py          # AgentOrchestrator
├── mission.py               # Mission dataclass
├── cai_bridge.py            # CAI MCP integration
├── receipts.py              # Receipt emission
├── agents/
│   ├── __init__.py
│   ├── recon.py             # ReconAgent
│   ├── vuln.py              # VulnAgent
│   ├── exploit.py           # ExploitAgent
│   ├── ctf.py               # CTFAgent
│   ├── analyze.py           # AnalyzeAgent
│   ├── dfir.py              # DFIRAgent
│   ├── remediation.py       # RemediationAgent
│   ├── threat_intel.py      # ThreatIntelAgent
│   └── report.py            # ReportAgent
└── tests/
    ├── test_orchestrator.py
    ├── test_agents.py
    └── test_receipts.py

Implementation Phases

Phase 1: Foundation (Week 1)

  • Create engines/offsec_agents/ package structure
  • Implement BaseAgent abstract class
  • Implement AgentOrchestrator with basic dispatch
  • Implement Mission and AgentResult dataclasses
  • Add agent receipt types to OffSec scroll

Phase 2: Core Agents (Week 2)

  • Implement ReconAgent with CAI bridge
  • Implement VulnAgent with CAI bridge
  • Implement AnalyzeAgent with CAI bridge
  • Add CLI commands for agent management
  • Write unit tests for each agent

Phase 3: Approval Integration (Week 3)

  • Integrate with Console approval system
  • Implement checkpoint flow for risky actions
  • Add multi-party approval for high-risk agents
  • Update Shield Portal with approval UI
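
Multi-party approval for high-risk agents can be pictured as a quorum over distinct approvers. The following sketch assumes a quorum of two and forbids self-approval. `ApprovalRequest`, its fields, and the `did:vm:user:*` identifiers are hypothetical and do not describe the Console approval system's actual API.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class ApprovalRequest:
    """Tracks approvals for one mission until a quorum is reached."""
    mission_id: str
    requested_by: str
    eligible_approvers: Set[str]
    required: int = 2  # assumed quorum for multi-party approval
    approvals: Set[str] = field(default_factory=set)

    def approve(self, approver_did: str) -> None:
        if approver_did == self.requested_by:
            raise PermissionError("requester cannot approve their own mission")
        if approver_did not in self.eligible_approvers:
            raise PermissionError(f"{approver_did} is not an eligible approver")
        # A set ignores repeat approvals from the same DID.
        self.approvals.add(approver_did)

    @property
    def approved(self) -> bool:
        return len(self.approvals) >= self.required


request = ApprovalRequest(
    mission_id="mission-example",
    requested_by="did:vm:user:alice",
    eligible_approvers={"did:vm:user:bob", "did:vm:user:carol"},
)
request.approve("did:vm:user:bob")
partially_approved = request.approved  # one of two required approvals so far
```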

Phase 4: Active Agents (Week 4)

  • Implement ExploitAgent with strict controls
  • Implement CTFAgent for CTF engagements
  • Add engagement context validation
  • Full evidence capture pipeline

Phase 5: Response Agents (Week 5)

  • Implement DFIRAgent for incident response
  • Implement ThreatIntelAgent for IOC correlation
  • Implement RemediationAgent with change approval
  • Integration testing with real incidents

Phase 6: Shield Portal UI (Week 6)

  • Agent dashboard page
  • Mission control view
  • Real-time status updates
  • Finding browser
  • Approval workflow UI

Security Considerations

  1. Scope Enforcement: Agents cannot operate outside defined scope
  2. Approval Gates: Risky actions require human approval
  3. Evidence Trail: All actions produce receipts
  4. Capability Limits: Agents have DID-based capabilities
  5. Kill Switch: Orchestrator can terminate any agent
  6. Rate Limiting: Prevent runaway agent behavior
  7. Isolation: Agents run in sandboxed environments
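
The kill switch in item 5 could be built on asyncio task cancellation, since agents run as coroutines. The sketch below is one possible shape under that assumption; the `KillSwitch` class and its method names are hypothetical, and a long `asyncio.sleep` stands in for an agent's `run()`.

```python
import asyncio
from typing import Coroutine, Dict


class KillSwitch:
    """Tracks running agent tasks so any of them can be cancelled."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, agent_id: str, coro: Coroutine) -> asyncio.Task:
        # Must be called from inside a running event loop.
        task = asyncio.create_task(coro)
        self._tasks[agent_id] = task
        return task

    async def terminate(self, agent_id: str) -> bool:
        """Cancel the agent's task; return False if nothing was running."""
        task = self._tasks.pop(agent_id, None)
        if task is None or task.done():
            return False
        task.cancel()
        try:
            await task  # wait until the cancellation has been processed
        except asyncio.CancelledError:
            pass
        return True


async def demo() -> bool:
    switch = KillSwitch()
    switch.start("recon-01", asyncio.sleep(3600))  # stand-in for agent.run()
    await asyncio.sleep(0)  # give the task a chance to start
    return await switch.terminate("recon-01")


terminated = asyncio.run(demo())
```

Cancellation only stops the coroutine; any external process an agent has launched (a scanner, for example) would still need to be stopped by the sandbox described in item 7.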

Success Criteria

  • Agents can be dispatched via CLI and Portal
  • All agent actions emit receipts to OffSec scroll
  • High-risk actions require approval
  • CAI tools accessible to agents via bridge
  • Full audit trail for any mission
  • Guardian can anchor agent activity

Shield commands. Agents execute. Receipts prove.