Executors
Executors are plugins that extend the Opservant agent's capabilities. Each executor handles a specific type of action, such as port scanning, configuration checks, service restarts, or any custom operation your environment requires.
Executor Interface
Every executor is a Python class that subclasses BaseExecutor and implements three core methods:
import socket

from s4e_agent.executor import BaseExecutor, ExecutorResult


class PortScanExecutor(BaseExecutor):
    """Scans specified ports on a target host."""

    name = "port-scanner"
    version = "1.0.0"

    def validate(self, params: dict) -> bool:
        """Validate input parameters before execution."""
        required = ["target_host", "ports"]
        for field in required:
            if field not in params:
                raise ValueError(f"Missing required parameter: {field}")
        return True

    def execute(self, params: dict, context: dict) -> ExecutorResult:
        """Run the executor logic."""
        target = params["target_host"]
        ports = params["ports"]
        results = []
        for port in ports:
            # The context manager closes the socket even if connect_ex() raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                status = "open" if sock.connect_ex((target, port)) == 0 else "closed"
            results.append({"port": port, "status": status})
        return ExecutorResult(
            status="completed",
            output={"host": target, "ports": results}
        )

    def cleanup(self, context: dict):
        """Release resources after execution."""
        pass
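The validate() method above checks only that the required keys are present. A stricter check might also verify types and port ranges before any socket is opened. The following is a minimal sketch of that idea; validate_port_scan_params is a hypothetical standalone helper, not part of s4e_agent, and the specific rules (non-empty host string, integer ports between 1 and 65535) are assumptions about what a port scanner would want.

```python
def validate_port_scan_params(params: dict) -> bool:
    """Hypothetical stricter check for the port-scanner parameters."""
    # Same presence check as the documented validate() method.
    for field in ("target_host", "ports"):
        if field not in params:
            raise ValueError(f"Missing required parameter: {field}")

    host = params["target_host"]
    if not isinstance(host, str) or not host.strip():
        raise ValueError("target_host must be a non-empty string")

    ports = params["ports"]
    if not isinstance(ports, (list, tuple)) or not ports:
        raise ValueError("ports must be a non-empty list of integers")
    for port in ports:
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(port, bool) or not isinstance(port, int):
            raise ValueError(f"Port must be an integer: {port!r}")
        if not 1 <= port <= 65535:
            raise ValueError(f"Port out of range (1-65535): {port}")
    return True
```

Inside an executor, validate() could delegate to a helper like this so that malformed input fails early with a clear ValueError.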
Method Reference
| Method | Required | Description |
|---|---|---|
| validate() | Yes | Checks parameters before execution. Raise ValueError on invalid input. |
| execute() | Yes | Contains the main logic. Returns an ExecutorResult. |
| cleanup() | No | Runs after execution (success or failure) to release resources. |
ExecutorResult
class ExecutorResult:
    status: str      # "completed", "failed", "partial"
    output: dict     # Structured output data
    error: str       # Error message (if status is "failed")
    artifacts: list  # Optional file paths or binary data
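To experiment with result handling without the agent installed, the four documented fields can be mirrored in a small dataclass. This is only a stand-in: ResultSketch is a hypothetical name, and the default values and the status check are assumptions, since the real ExecutorResult constructor is not described here beyond the keyword arguments used in the examples.

```python
from dataclasses import dataclass, field
from typing import Optional

ALLOWED_STATUSES = {"completed", "failed", "partial"}


@dataclass
class ResultSketch:
    """Stand-in mirroring the documented ExecutorResult fields (not the real class)."""

    status: str
    output: dict = field(default_factory=dict)    # structured output data
    error: Optional[str] = None                   # set when status is "failed"
    artifacts: list = field(default_factory=list) # optional file paths or binary data

    def __post_init__(self):
        # Assumption: only the three documented status values are valid.
        if self.status not in ALLOWED_STATUSES:
            raise ValueError(f"Unknown status: {self.status!r}")


ok = ResultSketch(status="completed", output={"host": "192.168.1.1", "ports": []})
failed = ResultSketch(status="failed", error="Connection refused")
```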
Executor Manifest
Each executor requires a manifest.json that declares metadata, supported actions, and resource requirements.
{
"name": "port-scanner",
"version": "1.0.0",
"description": "Scans TCP ports on target hosts.",
"author": "security-team",
"supported_actions": ["act-port-scan", "act-service-discovery"],
"entry_point": "port_scanner.PortScanExecutor",
"requirements": {
"python": ">=3.9",
"packages": []
},
"permissions": {
"network": {
"outbound": true,
"allowed_ports": [1, 65535]
},
"filesystem": {
"read": ["/etc/opservant/config.yaml"],
"write": ["/tmp/opservant/"]
}
},
"resource_limits": {
"max_memory_mb": 256,
"max_cpu_percent": 25,
"timeout_seconds": 300
}
}
Manifest Fields
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique executor name. |
| version | string | Yes | Semantic version. |
| description | string | Yes | Purpose of the executor. |
| author | string | No | Author or team name. |
| supported_actions | array | Yes | Action IDs this executor handles. |
| entry_point | string | Yes | Python module path to the executor class. |
| requirements | object | No | Python version and package dependencies. |
| permissions | object | Yes | Declared permissions (network, filesystem, secrets). |
| resource_limits | object | No | CPU, memory, and timeout constraints. |
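The field table above translates fairly directly into a pre-flight check you could run before packaging. The sketch below is an illustration only: check_manifest is a hypothetical helper, it checks just the presence and JSON types listed in the table, and it says nothing about how the agent itself validates a manifest.

```python
import json

# Field name -> expected Python type after json parsing, taken from the table.
REQUIRED_FIELDS = {
    "name": str,
    "version": str,
    "description": str,
    "supported_actions": list,
    "entry_point": str,
    "permissions": dict,
}
OPTIONAL_FIELDS = {"author": str, "requirements": dict, "resource_limits": dict}


def check_manifest(manifest: dict) -> list:
    """Return a list of problems; an empty list means these checks passed."""
    problems = []
    for name, expected in REQUIRED_FIELDS.items():
        if name not in manifest:
            problems.append(f"missing required field: {name}")
        elif not isinstance(manifest[name], expected):
            problems.append(f"{name} should be of type {expected.__name__}")
    for name, expected in OPTIONAL_FIELDS.items():
        if name in manifest and not isinstance(manifest[name], expected):
            problems.append(f"{name} should be of type {expected.__name__}")
    return problems


manifest = json.loads("""
{
  "name": "port-scanner",
  "version": "1.0.0",
  "description": "Scans TCP ports on target hosts.",
  "supported_actions": ["act-port-scan"],
  "entry_point": "port_scanner.PortScanExecutor",
  "permissions": {"network": {"outbound": true}}
}
""")
problems = check_manifest(manifest)
```

Returning a list of problems rather than raising on the first one makes it easier to fix a manifest in a single pass.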
Packaging an Executor
Organize your executor as a Python package:
my-executor/
    manifest.json
    my_executor/
        __init__.py
        executor.py
        utils.py
    requirements.txt
    tests/
        test_executor.py
Build the package:
Installing an Executor
Via Agent CLI
Via API
Upload the executor package to the platform, which distributes it to connected agents:
curl -X POST "https://api.s4e.io/api/agents/ag-001/executors" \
-H "Authorization: Bearer YOUR_API_KEY" \
-F "[email protected]"
Via Configuration
Add the executor to the agent's config.yaml:
Executor Lifecycle
- Init: The agent loads the executor class and validates the manifest.
- Validate: validate() is called with the task parameters.
- Execute: execute() runs the main logic inside a sandboxed environment.
- Report: The ExecutorResult is sent back to the S4E platform.
- Cleanup: cleanup() releases resources regardless of outcome.
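The ordering of these steps can be sketched in a few lines of plain Python. This is not the agent runtime: run_lifecycle and EchoExecutor are hypothetical names, results are plain dicts instead of ExecutorResult objects, the Init and Report steps are reduced to constructing the executor and returning the result, and skipping cleanup() after a failed validation is an assumption, since the behavior in that case is not specified here.

```python
class EchoExecutor:
    """Minimal stand-in executor used only to exercise the lifecycle."""

    def __init__(self):
        self.calls = []  # records which lifecycle methods ran, in order

    def validate(self, params):
        self.calls.append("validate")
        if "message" not in params:
            raise ValueError("Missing required parameter: message")
        return True

    def execute(self, params, context):
        self.calls.append("execute")
        return {"status": "completed", "output": {"echo": params["message"]}}

    def cleanup(self, context):
        self.calls.append("cleanup")


def run_lifecycle(executor, params, context):
    """Validate, execute, and always clean up once execution has started."""
    try:
        executor.validate(params)
    except ValueError as exc:
        # Invalid input: report a failure without calling execute().
        return {"status": "failed", "error": str(exc)}

    try:
        result = executor.execute(params, context)
    except Exception as exc:
        result = {"status": "failed", "error": str(exc)}
    finally:
        # Runs whether execute() returned or raised.
        executor.cleanup(context)
    return result


executor = EchoExecutor()
result = run_lifecycle(executor, {"message": "hello"}, {"agent_id": "ag-test-001"})
```

The try/finally around execute() is what gives cleanup() its "regardless of outcome" guarantee in this sketch.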
Accessing Agent Context
The context dictionary provides information about the agent environment:
def execute(self, params: dict, context: dict) -> ExecutorResult:
    agent_id = context["agent_id"]
    network_info = context["network"]
    vault = context["vault"]
    api_key = vault.get_secret("firewall_api_key")
    local_ip = network_info["local_ip"]
    # ...
Context Fields
| Field | Type | Description |
|---|---|---|
| context["agent_id"] | string | The agent's unique identifier. |
| context["network"] | dict | Local network information (IP, subnet). |
| context["vault"] | object | Secrets manager for retrieving credentials. |
| context["logger"] | object | Structured logger instance. |
| context["config"] | dict | Agent configuration values. |
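When exercising executor code outside the agent, it can help to build a context dictionary with the same five keys. The sketch below is one possible shape: FakeVault and make_test_context are hypothetical helpers, the standard-library logger stands in for the agent's structured logger, and raising KeyError for an unknown secret is an assumption about how a vault might behave.

```python
import logging


class FakeVault:
    """Hypothetical in-memory stand-in for context["vault"]."""

    def __init__(self, secrets):
        self._secrets = dict(secrets)

    def get_secret(self, name):
        # Assumption: an unknown secret is an error rather than None.
        if name not in self._secrets:
            raise KeyError(f"Unknown secret: {name}")
        return self._secrets[name]


def make_test_context(agent_id="ag-test-001", secrets=None):
    """Build a context dict with the documented keys, filled with test values."""
    return {
        "agent_id": agent_id,
        "network": {"local_ip": "127.0.0.1"},
        "vault": FakeVault(secrets or {}),
        "logger": logging.getLogger("executor-test"),
        "config": {},
    }


context = make_test_context(secrets={"firewall_api_key": "test-key"})
api_key = context["vault"].get_secret("firewall_api_key")
```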
Error Handling
Return failures gracefully rather than raising unhandled exceptions:
def execute(self, params: dict, context: dict) -> ExecutorResult:
    try:
        result = self._do_scan(params)
        return ExecutorResult(status="completed", output=result)
    except ConnectionRefusedError as e:
        return ExecutorResult(
            status="failed",
            error=f"Connection refused: {e}",
            output={"partial_results": self._partial}
        )
    except Exception as e:
        context["logger"].error(f"Unexpected error: {e}")
        return ExecutorResult(status="failed", error=str(e))
Warning
Unhandled exceptions in execute() are caught by the agent runtime and reported as failed executions, but they do not produce structured output. Always catch and handle expected error conditions.
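One way to keep structured output even when some work fails is to handle errors per item and choose the status at the end. The sketch below applies that to a port scan. It is an illustration under stated assumptions: scan_with_partial_results and tcp_probe are hypothetical helpers, the result is a plain dict whose keys mirror the ExecutorResult fields, and mapping mixed outcomes to the "partial" status is an interpretation of that status value.

```python
import socket


def tcp_probe(host, port, timeout=2.0):
    """Return "open" or "closed" for a single TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return "open" if sock.connect_ex((host, port)) == 0 else "closed"


def scan_with_partial_results(host, ports, probe=tcp_probe):
    """Probe each port, collecting results and errors separately."""
    results, errors = [], []
    for port in ports:
        try:
            results.append({"port": port, "status": probe(host, port)})
        except OSError as exc:
            # One failing port (for example a name resolution error)
            # should not discard the results gathered so far.
            errors.append(f"port {port}: {exc}")

    if not errors:
        status = "completed"
    elif results:
        status = "partial"
    else:
        status = "failed"
    return {
        "status": status,
        "output": {"host": host, "ports": results},
        "error": "; ".join(errors) or None,
    }
```

Passing the probe as a parameter keeps the function testable without network access; inside execute(), the returned dict could be unpacked into an ExecutorResult.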
Testing Executors Locally
Use the executor test harness:
from s4e_agent.testing import ExecutorTestHarness

harness = ExecutorTestHarness("./manifest.json")
result = harness.run(
    params={"target_host": "192.168.1.1", "ports": [22, 80, 443]},
    mock_context={"agent_id": "ag-test-001"}
)
assert result.status == "completed"
assert len(result.output["ports"]) == 3
Best Practices
| Practice | Recommendation |
|---|---|
| Declare permissions | Only request the network and filesystem access you need. |
| Set resource limits | Prevent runaway executors with memory and timeout limits. |
| Log structured data | Use context["logger"] for consistent, searchable logs. |
| Handle partial results | Return what you can, even on failure. |
| Version semantically | Bump versions when changing behavior or parameters. |
| Write tests | Test with the harness before deploying to production agents. |
Next Steps
- Secure Execution for the sandboxing and permission model.
- Logging & Audit for structured logging from executors.
- Running the Agent for deployment instructions.