# OffSec Agent Workers Plan > *Shield commands. Agents execute. Receipts prove.* ## Current State Assessment ### What Exists | Component | Status | Location | |-----------|--------|----------| | **Guardian Engine (Rust)** | ✅ Implemented | `vaultmesh-guardian/src/lib.rs` | | **OffSec Engine (Rust)** | ⚠️ Stub only | `vaultmesh-offsec/src/lib.rs` | | **Console Engine (Python)** | ✅ Receipts + Approvals | `engines/console/` | | **Shield Portal (Node.js)** | ✅ Web UI | `vaultmesh-shield-portal/` | | **CLI OffSec Commands** | ✅ Full implementation | `cli/vm_cli.py` | | **OffSec Spec** | ✅ Complete | `VAULTMESH-OFFSEC-ENGINE.md` | ### What's Missing 1. **OffSec Agent Workers** - Autonomous AI agents that perform security tasks 2. **Agent Orchestration Layer** - Dispatch, monitor, control agents 3. **Shield ↔ Agent Protocol** - Communication and approval flow 4. **CAI Integration** - Leverage existing security AI tools --- ## Architecture: Shield-Agent Pattern ``` ┌─────────────────────────────────────────────────────────────────────────────┐ │ SHIELD (Guardian/Overseer) │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ Shield Portal (vaultmesh-shield-portal) │ │ │ │ - Mission control UI │ │ │ │ - Agent status dashboard │ │ │ │ - Approval workflow │ │ │ │ - Receipt viewer │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ Agent Orchestrator (new: engines/offsec_agents/) │ │ │ │ - Agent registry │ │ │ │ - Mission dispatch │ │ │ │ - Approval enforcement │ │ │ │ - Receipt emission │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────────┘ │ ┌────────────────┼────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ │ RECON AGENT │ │ VULN AGENT │ │ EXPLOIT AGENT │ │ (Worker) │ │ (Worker) │ │ (Worker) │ │ │ │ │ │ │ │ • Target enum │ │ • CVE scanning │ │ • PoC execution │ │ • Port scanning │ │ • Config audit │ │ • Payload delivery │ │ • OSINT gathering │ │ • Dependency check │ │ • Post-exploit │ │ • Service ID │ │ • Web vuln scan │ │ • Priv escalation │ └─────────────────────┘ └─────────────────────┘ └─────────────────────┘ │ │ │ └───────────────────────┴───────────────────────┘ │ ▼ ┌─────────────────────────────┐ │ OFFSEC SCROLL │ │ receipts/offsec/ │ │ offsec_events.jsonl │ └─────────────────────────────┘ ``` --- ## Agent Types ### Phase 1: Core Agents (Nigredo) | Agent | Purpose | Risk Level | Approval Required | |-------|---------|------------|-------------------| | **Recon Agent** | Target enumeration, OSINT | Low | Auto-approve | | **Vuln Agent** | Vulnerability scanning | Medium | Manager approve | | **Analyze Agent** | Code/binary analysis | Low | Auto-approve | | **Report Agent** | Finding aggregation | Low | Auto-approve | ### Phase 2: Active Agents (Albedo) | Agent | Purpose | Risk Level | Approval Required | |-------|---------|------------|-------------------| | **Exploit Agent** | PoC execution | High | Multi-party approve | | **CTF Agent** | CTF challenge solving | Medium | Engagement scope | | **Red Team Agent** | Full adversary emulation | Critical | Executive approve | ### Phase 3: Response Agents (Citrinitas) | Agent | Purpose | Risk Level | Approval Required | |-------|---------|------------|-------------------| | **DFIR Agent** | Incident response | Medium | Incident commander | | **Remediation Agent** | Automated patching | High | Change approval | | **Threat Intel Agent** | IOC correlation | Low | Auto-approve | --- ## Implementation Plan ### Step 1: Agent Base Framework **File:** `engines/offsec_agents/__init__.py` ```python """ OffSec Agent Workers - Base Framework """ from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum from typing import Any, Dict, List, Optional import uuid class AgentType(str, Enum): RECON = "recon" VULN = "vuln" EXPLOIT = "exploit" CTF = "ctf" ANALYZE = "analyze" DFIR = "dfir" REMEDIATION = "remediation" THREAT_INTEL = "threat_intel" REPORT = "report" class AgentStatus(str, Enum): IDLE = "idle" ASSIGNED = "assigned" RUNNING = "running" AWAITING_APPROVAL = "awaiting_approval" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class RiskLevel(str, Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" @dataclass class Mission: """A task assigned to an agent.""" mission_id: str agent_type: AgentType target: str objectives: List[str] scope: Dict[str, Any] risk_level: RiskLevel requested_by: str engagement_id: Optional[str] = None incident_id: Optional[str] = None created_at: str = field( default_factory=lambda: datetime.now(timezone.utc).isoformat() ) status: str = "pending" @dataclass class AgentResult: """Result from agent execution.""" mission_id: str agent_id: str status: AgentStatus findings: List[Dict[str, Any]] evidence_paths: List[str] started_at: str completed_at: str duration_seconds: float receipt_hash: Optional[str] = None ``` ### Step 2: Agent Orchestrator **File:** `engines/offsec_agents/orchestrator.py` ```python """ Agent Orchestrator - Dispatch, monitor, and control agents. """ class AgentOrchestrator: """ Central control for OffSec agents. Responsibilities: - Register available agents - Dispatch missions to appropriate agents - Enforce approval workflows - Emit receipts for all agent actions - Track agent status and results """ def __init__(self, vaultmesh_root: str): self.vaultmesh_root = vaultmesh_root self.agents: Dict[str, BaseAgent] = {} self.missions: Dict[str, Mission] = {} self.approval_manager = ApprovalManager(vaultmesh_root) def register_agent(self, agent: "BaseAgent") -> None: """Register an agent with the orchestrator.""" self.agents[agent.agent_id] = agent async def dispatch(self, mission: Mission) -> str: """ Dispatch a mission to an appropriate agent. Flow: 1. Validate mission scope 2. Check if approval required 3. Request approval if needed 4. Assign to available agent 5. Emit mission_assigned receipt """ # Check approval requirements if self._requires_approval(mission): approval = await self._request_approval(mission) if not approval.approved: return "rejected" # Find available agent agent = self._find_agent(mission.agent_type) if not agent: raise NoAgentAvailable(mission.agent_type) # Assign mission agent.assign(mission) self.missions[mission.mission_id] = mission # Emit receipt self._emit_mission_receipt("offsec_mission_assigned", mission) return mission.mission_id def _requires_approval(self, mission: Mission) -> bool: """Check if mission requires approval based on risk level.""" approval_matrix = { RiskLevel.LOW: False, RiskLevel.MEDIUM: True, RiskLevel.HIGH: True, RiskLevel.CRITICAL: True, } return approval_matrix.get(mission.risk_level, True) ``` ### Step 3: Base Agent Class **File:** `engines/offsec_agents/base.py` ```python """ Base Agent - Abstract class for all OffSec agents. """ class BaseAgent(ABC): """ Abstract base class for OffSec agents. All agents must: 1. Implement execute() for their core logic 2. Emit receipts for significant actions 3. Respect scope boundaries 4. Handle approval checkpoints """ def __init__( self, agent_id: str, agent_type: AgentType, did: str, orchestrator: AgentOrchestrator, ): self.agent_id = agent_id self.agent_type = agent_type self.did = did # e.g., did:vm:agent:recon-01 self.orchestrator = orchestrator self.status = AgentStatus.IDLE self.current_mission: Optional[Mission] = None def assign(self, mission: Mission) -> None: """Accept a mission assignment.""" self.current_mission = mission self.status = AgentStatus.ASSIGNED async def run(self) -> AgentResult: """ Execute the assigned mission. Flow: 1. Validate mission is assigned 2. Emit mission_started receipt 3. Execute core logic 4. Emit findings as receipts 5. Emit mission_completed receipt """ if not self.current_mission: raise NoMissionAssigned() self.status = AgentStatus.RUNNING started_at = datetime.now(timezone.utc) self._emit_receipt("offsec_agent_started", { "mission_id": self.current_mission.mission_id, "agent_id": self.agent_id, "agent_type": self.agent_type.value, }) try: findings = await self.execute() self.status = AgentStatus.COMPLETED except Exception as e: self.status = AgentStatus.FAILED findings = [{"error": str(e)}] completed_at = datetime.now(timezone.utc) result = AgentResult( mission_id=self.current_mission.mission_id, agent_id=self.agent_id, status=self.status, findings=findings, evidence_paths=self.evidence_paths, started_at=started_at.isoformat(), completed_at=completed_at.isoformat(), duration_seconds=(completed_at - started_at).total_seconds(), ) self._emit_receipt("offsec_agent_completed", asdict(result)) self.current_mission = None self.status = AgentStatus.IDLE return result @abstractmethod async def execute(self) -> List[Dict[str, Any]]: """ Core agent logic - must be implemented by subclasses. Returns: List of findings/results """ pass async def checkpoint(self, action: str, details: Dict[str, Any]) -> bool: """ Request approval for a risky action mid-execution. Use for actions that exceed initial mission scope or risk level. """ if self._action_exceeds_scope(action, details): approval = await self.orchestrator.request_approval( session_id=self.current_mission.mission_id, action_type=action, action_details=details, requested_by=self.did, approvers=self._get_approvers(action), ) return approval.approved return True ``` ### Step 4: Concrete Agent Implementations **File:** `engines/offsec_agents/agents/recon.py` ```python """ Recon Agent - Target enumeration and OSINT gathering. """ class ReconAgent(BaseAgent): """ Reconnaissance agent for target enumeration. Capabilities: - Port scanning (nmap integration) - Service identification - OSINT gathering - DNS enumeration - Technology fingerprinting """ def __init__(self, **kwargs): super().__init__( agent_type=AgentType.RECON, **kwargs ) async def execute(self) -> List[Dict[str, Any]]: """Execute reconnaissance mission.""" mission = self.current_mission target = mission.target findings = [] # Phase 1: DNS/Host resolution dns_results = await self._dns_enum(target) findings.extend(dns_results) # Phase 2: Port scanning if "port_scan" in mission.objectives: ports = await self._port_scan(target, mission.scope) findings.extend(ports) # Phase 3: Service identification if "service_id" in mission.objectives: services = await self._identify_services(target, ports) findings.extend(services) # Phase 4: Technology fingerprinting if "tech_fingerprint" in mission.objectives: tech = await self._fingerprint(target) findings.extend(tech) return findings async def _port_scan(self, target: str, scope: Dict) -> List[Dict]: """Run port scan within scope constraints.""" # Use CAI recon or nmap from cai_mcp_client import cai_recon depth = scope.get("scan_depth", "quick") result = await cai_recon(target=target, depth=depth) return self._parse_scan_results(result) ``` **File:** `engines/offsec_agents/agents/vuln.py` ```python """ Vuln Agent - Vulnerability scanning and assessment. """ class VulnAgent(BaseAgent): """ Vulnerability scanning agent. Capabilities: - CVE detection - Configuration auditing - Dependency vulnerability checks - Web application scanning - API security testing """ async def execute(self) -> List[Dict[str, Any]]: """Execute vulnerability scan mission.""" mission = self.current_mission target = mission.target findings = [] # Determine scan type scan_type = mission.scope.get("scan_type", "all") # Checkpoint for web scans (can be noisy) if scan_type in ["web", "all"]: approved = await self.checkpoint( "web_vuln_scan", {"target": target, "reason": "Web scanning may generate traffic"} ) if not approved: scan_type = "network" # Fall back to quieter scan # Use CAI vuln scanner from cai_mcp_client import cai_vuln_scan result = await cai_vuln_scan(target=target, scan_type=scan_type) # Convert to findings for vuln in result.get("vulnerabilities", []): finding = { "type": "vulnerability", "title": vuln.get("title"), "severity": vuln.get("severity"), "cvss": vuln.get("cvss_score"), "cve": vuln.get("cve_id"), "affected_asset": target, "evidence": vuln.get("evidence"), } findings.append(finding) # Auto-create vuln receipt for high+ severity if vuln.get("severity") in ["high", "critical"]: self._emit_vuln_discovery(finding) return findings ``` **File:** `engines/offsec_agents/agents/exploit.py` ```python """ Exploit Agent - Controlled exploitation and PoC execution. ⚠️ HIGH RISK - Requires multi-party approval """ class ExploitAgent(BaseAgent): """ Exploitation agent for controlled PoC execution. CRITICAL: All exploits require: 1. Valid engagement/incident context 2. Multi-party approval 3. Detailed evidence capture 4. Immediate rollback capability """ async def execute(self) -> List[Dict[str, Any]]: """Execute exploitation mission with strict controls.""" mission = self.current_mission # Verify engagement context if not mission.engagement_id and not mission.incident_id: raise NoEngagementContext( "Exploitation requires valid engagement or incident context" ) # Mandatory checkpoint before any exploit approved = await self.checkpoint( "exploit_execution", { "target": mission.target, "vulnerability": mission.scope.get("vuln_id"), "technique": mission.scope.get("technique"), "engagement_id": mission.engagement_id, } ) if not approved: return [{"status": "blocked", "reason": "Approval denied"}] # Execute with full evidence capture findings = [] try: # Start evidence capture self._start_capture() # Use CAI exploit from cai_mcp_client import cai_exploit result = await cai_exploit( target=mission.target, vulnerability=mission.scope.get("vuln_id"), ) findings.append({ "type": "exploit_result", "success": result.get("success"), "access_gained": result.get("access_level"), "evidence_path": self._stop_capture(), }) except Exception as e: findings.append({ "type": "exploit_error", "error": str(e), "evidence_path": self._stop_capture(), }) return findings ``` ### Step 5: CAI Integration Bridge **File:** `engines/offsec_agents/cai_bridge.py` ```python """ Bridge to CAI MCP tools for agent use. """ import asyncio from typing import Any, Dict, Optional class CAIBridge: """ Bridge to CAI security tools via MCP. Wraps CAI tools for use by OffSec agents: - cai_recon → ReconAgent - cai_vuln_scan → VulnAgent - cai_exploit → ExploitAgent - cai_ctf → CTFAgent - cai_analyze → AnalyzeAgent """ async def recon( self, target: str, depth: str = "standard" ) -> Dict[str, Any]: """Run CAI reconnaissance.""" # Call MCP tool result = await self._call_mcp("cai_recon", { "target": target, "depth": depth, }) return result async def vuln_scan( self, target: str, scan_type: str = "all" ) -> Dict[str, Any]: """Run CAI vulnerability scan.""" result = await self._call_mcp("cai_vuln_scan", { "target": target, "scan_type": scan_type, }) return result async def exploit( self, target: str, vulnerability: str ) -> Dict[str, Any]: """Run CAI exploitation (requires authorization).""" result = await self._call_mcp("cai_exploit", { "target": target, "vulnerability": vulnerability, }) return result async def analyze( self, file_path: str, analysis_type: str = "static" ) -> Dict[str, Any]: """Run CAI code/binary analysis.""" result = await self._call_mcp("cai_analyze", { "file_path": file_path, "analysis_type": analysis_type, }) return result async def ctf( self, challenge: str, category: Optional[str] = None ) -> Dict[str, Any]: """Run CAI CTF solver.""" params = {"challenge": challenge} if category: params["category"] = category result = await self._call_mcp("cai_ctf", params) return result ``` ### Step 6: Shield Portal Integration **File:** `vaultmesh-shield-portal/app/src/routes/agents.js` ```javascript /** * Agent management routes for Shield Portal */ const express = require('express'); const router = express.Router(); // GET /agents - List all registered agents router.get('/', async (req, res) => { const agents = await getAgentRegistry(); res.render('agents/list', { agents }); }); // GET /agents/:id - Agent detail view router.get('/:id', async (req, res) => { const agent = await getAgent(req.params.id); const missions = await getAgentMissions(req.params.id); res.render('agents/detail', { agent, missions }); }); // POST /agents/dispatch - Dispatch a new mission router.post('/dispatch', async (req, res) => { const { agent_type, target, objectives, scope } = req.body; const mission = await dispatchMission({ agent_type, target, objectives, scope, requested_by: req.user.did, }); res.redirect(`/missions/${mission.mission_id}`); }); // GET /missions - List all missions router.get('/missions', async (req, res) => { const missions = await getMissions({ status: req.query.status, agent_type: req.query.agent_type, }); res.render('missions/list', { missions }); }); // POST /missions/:id/approve - Approve a pending mission router.post('/missions/:id/approve', async (req, res) => { await approveMission(req.params.id, { approver: req.user.did, reason: req.body.reason, }); res.redirect(`/missions/${req.params.id}`); }); module.exports = router; ``` ### Step 7: CLI Commands **Add to:** `cli/vm_cli.py` ```python # ============================================================================ # Agent Commands # ============================================================================ @cli.group() def agent(): """OffSec Agent management.""" pass @agent.command("list") def agent_list(): """List registered agents.""" orchestrator = get_orchestrator() for agent in orchestrator.agents.values(): status_icon = "🟢" if agent.status == AgentStatus.IDLE else "🔴" click.echo(f"{status_icon} {agent.agent_id} ({agent.agent_type.value})") @agent.command("dispatch") @click.option("--type", "agent_type", required=True, type=click.Choice(["recon", "vuln", "exploit", "ctf", "analyze"])) @click.option("--target", required=True, help="Target for the mission") @click.option("--objective", multiple=True, help="Mission objectives") @click.option("--engagement", help="Link to engagement ID") @click.option("--incident", help="Link to incident ID") def agent_dispatch(agent_type, target, objective, engagement, incident): """Dispatch a mission to an agent.""" orchestrator = get_orchestrator() mission = Mission( mission_id=generate_mission_id(), agent_type=AgentType(agent_type), target=target, objectives=list(objective), scope={}, risk_level=RiskLevel.MEDIUM, requested_by=get_actor_did(), engagement_id=engagement, incident_id=incident, ) result = asyncio.run(orchestrator.dispatch(mission)) click.echo(f"[agent] Mission dispatched: {result}") @agent.command("status") @click.argument("mission_id") def agent_status(mission_id): """Check mission status.""" orchestrator = get_orchestrator() mission = orchestrator.missions.get(mission_id) if not mission: click.echo(f"Mission not found: {mission_id}") return click.echo(f"Mission: {mission.mission_id}") click.echo(f"Type: {mission.agent_type.value}") click.echo(f"Target: {mission.target}") click.echo(f"Status: {mission.status}") ``` --- ## Receipt Types New receipt types for agent operations: | Type | When Emitted | |------|--------------| | `offsec_mission_created` | Mission created and pending | | `offsec_mission_assigned` | Mission assigned to agent | | `offsec_mission_approved` | Mission approved to proceed | | `offsec_mission_rejected` | Mission approval rejected | | `offsec_agent_started` | Agent began execution | | `offsec_agent_checkpoint` | Agent requested mid-execution approval | | `offsec_agent_finding` | Agent discovered a finding | | `offsec_agent_completed` | Agent finished mission | | `offsec_agent_failed` | Agent encountered error | --- ## Directory Structure ``` engines/offsec_agents/ ├── __init__.py # Exports ├── base.py # BaseAgent class ├── orchestrator.py # AgentOrchestrator ├── mission.py # Mission dataclass ├── cai_bridge.py # CAI MCP integration ├── receipts.py # Receipt emission ├── agents/ │ ├── __init__.py │ ├── recon.py # ReconAgent │ ├── vuln.py # VulnAgent │ ├── exploit.py # ExploitAgent │ ├── ctf.py # CTFAgent │ ├── analyze.py # AnalyzeAgent │ ├── dfir.py # DFIRAgent │ ├── remediation.py # RemediationAgent │ └── threat_intel.py # ThreatIntelAgent └── tests/ ├── test_orchestrator.py ├── test_agents.py └── test_receipts.py ``` --- ## Implementation Phases ### Phase 1: Foundation (Week 1) - [ ] Create `engines/offsec_agents/` package structure - [ ] Implement `BaseAgent` abstract class - [ ] Implement `AgentOrchestrator` with basic dispatch - [ ] Implement `Mission` and `AgentResult` dataclasses - [ ] Add agent receipt types to OffSec scroll ### Phase 2: Core Agents (Week 2) - [ ] Implement `ReconAgent` with CAI bridge - [ ] Implement `VulnAgent` with CAI bridge - [ ] Implement `AnalyzeAgent` with CAI bridge - [ ] Add CLI commands for agent management - [ ] Write unit tests for each agent ### Phase 3: Approval Integration (Week 3) - [ ] Integrate with Console approval system - [ ] Implement checkpoint flow for risky actions - [ ] Add multi-party approval for high-risk agents - [ ] Update Shield Portal with approval UI ### Phase 4: Active Agents (Week 4) - [ ] Implement `ExploitAgent` with strict controls - [ ] Implement `CTFAgent` for CTF engagements - [ ] Add engagement context validation - [ ] Full evidence capture pipeline ### Phase 5: Response Agents (Week 5) - [ ] Implement `DFIRAgent` for incident response - [ ] Implement `ThreatIntelAgent` for IOC correlation - [ ] Implement `RemediationAgent` with change approval - [ ] Integration testing with real incidents ### Phase 6: Shield Portal UI (Week 6) - [ ] Agent dashboard page - [ ] Mission control view - [ ] Real-time status updates - [ ] Finding browser - [ ] Approval workflow UI --- ## Security Considerations 1. **Scope Enforcement**: Agents cannot operate outside defined scope 2. **Approval Gates**: Risky actions require human approval 3. **Evidence Trail**: All actions produce receipts 4. **Capability Limits**: Agents have DID-based capabilities 5. **Kill Switch**: Orchestrator can terminate any agent 6. **Rate Limiting**: Prevent runaway agent behavior 7. **Isolation**: Agents run in sandboxed environments --- ## Success Criteria - [ ] Agents can be dispatched via CLI and Portal - [ ] All agent actions emit receipts to OffSec scroll - [ ] High-risk actions require approval - [ ] CAI tools accessible to agents via bridge - [ ] Full audit trail for any mission - [ ] Guardian can anchor agent activity --- *Shield commands. Agents execute. Receipts prove.*