S4E provides a Server-Sent Events (SSE) endpoint for receiving real-time platform events. Use event streaming for live dashboards, monitoring integrations, and real-time alerting without polling.

SSE Endpoint

GET /api/events/stream

Authentication

Pass your API key as a query parameter (the browser EventSource API cannot set custom headers):

GET /api/events/stream?token=YOUR_API_KEY

Or use the Authorization header for server-side clients:

curl -N "https://api.s4e.io/api/events/stream" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Accept: text/event-stream"
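For a server-side client written in Python, the same header-based authentication can be sketched with the standard library alone. The helper name build_stream_request is hypothetical and not part of any S4E SDK; the URL and header names are taken from the curl example above.

```python
import urllib.request

STREAM_URL = "https://api.s4e.io/api/events/stream"


def build_stream_request(api_key: str) -> urllib.request.Request:
    """Build (but do not send) a GET request for the event stream.

    Hypothetical helper: it mirrors the curl command above.
    """
    return urllib.request.Request(
        STREAM_URL,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "text/event-stream",
        },
        method="GET",
    )


request = build_stream_request("YOUR_API_KEY")
# urllib.request.urlopen(request) would open the connection; the returned
# response object can then be read line by line.
```

Keeping the key in a header rather than the query string also keeps it out of URL logs, which is one reason to prefer this form outside the browser.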

Filtering Events

Filter the stream to specific event types:

GET /api/events/stream?token=YOUR_API_KEY&events=finding.new,scan.completed

| Parameter | Type | Description |
|---|---|---|
| token | string | API key for authentication. |
| events | string | Comma-separated event types to receive. |
| asset_id | string | Only receive events for a specific asset. |
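As an illustration of how these parameters fit together, the following sketch assembles a stream URL from the three documented query parameters. The function name build_stream_url is hypothetical, and it assumes that events and asset_id can be combined in one request, which the table suggests but does not state.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.s4e.io/api/events/stream"


def build_stream_url(token, events=None, asset_id=None):
    """Return the stream URL with optional event and asset filters."""
    params = {"token": token}
    if events:
        # The API expects a single comma-separated string.
        params["events"] = ",".join(events)
    if asset_id:
        params["asset_id"] = asset_id
    # safe="," keeps the event list readable instead of encoding it as %2C.
    return f"{BASE_URL}?{urlencode(params, safe=',')}"


url = build_stream_url(
    "YOUR_API_KEY",
    events=["finding.new", "scan.completed"],
    asset_id="a-1001",
)
print(url)
```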

Event Format

Events follow the standard SSE specification:

event: finding.new
id: evt-8a3b9c2d
data: {"finding_id":"f-91827","title":"SQL Injection","severity":"critical","asset":{"id":"a-1001","name":"api.example.com"}}

event: scan.completed
id: evt-9b4c0d3e
data: {"scan_id":"sc-44021","asset_id":"a-1001","status":"completed","finding_count":21}

event: heartbeat
id: evt-hb-001
data: {"timestamp":"2026-04-28T12:01:00Z"}

SSE Fields

| Field | Description |
|---|---|
| event | Event type name. |
| id | Unique event identifier (for reconnection). |
| data | JSON payload with event details. |
| retry | Reconnection interval in milliseconds (sent once on connection). |
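To make the wire format concrete, here is a minimal sketch of a parser that turns decoded stream lines into one dictionary per event. It follows the general SSE framing rules (fields separated by the first colon, a blank line ends an event, lines starting with a colon are comments) and assumes, as in the examples above, that every data payload is JSON. The name parse_sse is hypothetical, and the retry field is ignored here for brevity.

```python
import json


def parse_sse(lines):
    """Yield {"event", "id", "data"} dicts from an iterable of decoded lines."""
    event_type, event_id, data_lines = "message", None, []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far.
            if data_lines:
                yield {
                    "event": event_type,
                    "id": event_id,
                    "data": json.loads("\n".join(data_lines)),
                }
            event_type, event_id, data_lines = "message", None, []
            continue
        if line.startswith(":"):
            continue  # comment line, sometimes used as a keepalive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one optional space after the colon
        if field == "event":
            event_type = value
        elif field == "id":
            event_id = value
        elif field == "data":
            data_lines.append(value)


SAMPLE = """event: finding.new
id: evt-8a3b9c2d
data: {"finding_id":"f-91827","title":"SQL Injection","severity":"critical"}

event: heartbeat
id: evt-hb-001
data: {"timestamp":"2026-04-28T12:01:00Z"}

"""

events = list(parse_sse(SAMPLE.splitlines()))
```

An event is only yielded once its terminating blank line arrives, so a partially received event at the moment of a disconnect is dropped rather than handled half-complete.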

Event Types

| Event | Description | Data Fields |
|---|---|---|
| scan.created | Scan created. | scan_id, asset_id, scan_type |
| scan.progress | Scan progress update. | scan_id, progress, phase |
| scan.completed | Scan finished. | scan_id, finding_count, duration |
| scan.failed | Scan failed. | scan_id, error |
| finding.new | New finding discovered. | finding_id, title, severity |
| finding.status_changed | Finding status updated. | finding_id, old_status, new_status |
| action.triggered | Action execution started. | action_id, execution_id |
| action.completed | Action execution finished. | action_id, execution_id, status |
| playbook.started | Playbook execution started. | playbook_id, execution_id |
| playbook.completed | Playbook execution finished. | playbook_id, execution_id |
| system.alert | System-level alert. | alert_type, message |
| heartbeat | Connection keepalive (every 30 seconds). | timestamp |
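Because a heartbeat arrives every 30 seconds, a client can treat prolonged silence as a sign that the connection has died without a clean close. The sketch below shows one way to track that. The class name HeartbeatWatchdog and the tolerance of two missed heartbeats are assumptions made for illustration, not S4E recommendations.

```python
import time

HEARTBEAT_INTERVAL = 30.0  # seconds, from the heartbeat row above


class HeartbeatWatchdog:
    """Flag a connection as stale when nothing has arrived for too long."""

    def __init__(self, missed_allowed=2, clock=time.monotonic):
        # missed_allowed is an assumed tolerance, not a documented value.
        self._timeout = HEARTBEAT_INTERVAL * missed_allowed
        self._clock = clock
        self._last_seen = clock()

    def record(self):
        """Call for every received event; any traffic proves the link is alive."""
        self._last_seen = self._clock()

    def is_stale(self):
        return self._clock() - self._last_seen > self._timeout


# Demonstration with a fake clock so no real waiting is needed.
now = [0.0]
watchdog = HeartbeatWatchdog(clock=lambda: now[0])
now[0] = 59.0
alive_at_59 = not watchdog.is_stale()
now[0] = 61.0
stale_at_61 = watchdog.is_stale()
```

With a blocking read loop, is_stale() has to be checked from a separate thread or timer; a read timeout on the HTTP client achieves a similar effect with less code.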

Connection Management

Auto-Reconnect

SSE clients should reconnect automatically when the connection drops. The id field enables resuming from where you left off.

Set the Last-Event-ID header on reconnection. The browser EventSource API sends it automatically; other clients must set it themselves:

GET /api/events/stream
Last-Event-ID: evt-8a3b9c2d

S4E buffers events for up to 5 minutes. Events older than 5 minutes are not replayed.
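For clients that manage reconnection themselves, the resume logic can be sketched as a small state object that remembers the last event id and reports when the outage has probably outlasted the 5-minute buffer. The class name ResumeState is hypothetical. The sketch uses the time since the last received event as a stand-in for the length of the outage, which is only an approximation.

```python
import time

REPLAY_WINDOW_SECONDS = 5 * 60  # buffer duration stated above


class ResumeState:
    """Track the last event id so a reconnect can resume the stream."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self.last_event_id = None
        self._last_event_at = None

    def record(self, event_id):
        """Remember the id of each event as it is processed."""
        if event_id:
            self.last_event_id = event_id
            self._last_event_at = self._clock()

    def reconnect_headers(self):
        """Headers for the next connection attempt."""
        headers = {"Accept": "text/event-stream"}
        if self.last_event_id:
            headers["Last-Event-ID"] = self.last_event_id
        return headers

    def may_have_missed_events(self):
        """True when the gap since the last event exceeds the replay buffer."""
        if self._last_event_at is None:
            return False
        return self._clock() - self._last_event_at > REPLAY_WINDOW_SECONDS


# Demonstration with a fake clock.
now = [0.0]
state = ResumeState(clock=lambda: now[0])
state.record("evt-8a3b9c2d")
headers = state.reconnect_headers()
now[0] = 301.0
gap_detected = state.may_have_missed_events()
```

When may_have_missed_events() returns True, replay alone cannot be trusted to fill the gap, so the client would need to refresh its view of the data by some other means.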

Connection Limits

| Tier | Max Concurrent SSE Connections |
|---|---|
| Standard | 5 |
| Premium | 20 |
| Enterprise | 100 |
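A process that opens several streams can guard against exceeding its tier's limit locally. The following is a rough sketch using a semaphore; the name ConnectionBudget is hypothetical, it only counts connections opened by the current process, and how the server responds to an over-limit connection is not described here.

```python
import threading

# Limits copied from the table above.
TIER_LIMITS = {"standard": 5, "premium": 20, "enterprise": 100}


class ConnectionBudget:
    """Local counter that refuses to open more streams than the tier allows."""

    def __init__(self, tier):
        self._slots = threading.BoundedSemaphore(TIER_LIMITS[tier])

    def try_acquire(self):
        """Reserve a slot; returns False when the budget is used up."""
        return self._slots.acquire(blocking=False)

    def release(self):
        """Return a slot after the stream has been closed."""
        self._slots.release()


budget = ConnectionBudget("standard")
granted = [budget.try_acquire() for _ in range(6)]
```

Here the first five calls succeed and the sixth is refused, so the caller can queue or drop the extra stream instead of attempting the connection.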

JavaScript Client Example

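// The browser EventSource API cannot set headers, so the key goes in the query string.
const eventSource = new EventSource(
  "https://api.s4e.io/api/events/stream?token=YOUR_API_KEY&events=finding.new,scan.completed"
);

// Named events are delivered to listeners registered under the event type.
eventSource.addEventListener("finding.new", (event) => {
  const data = JSON.parse(event.data);
  console.log(`New finding: ${data.title} (${data.severity})`);
  updateDashboard(data); // updateDashboard is your own rendering function
});

eventSource.addEventListener("scan.completed", (event) => {
  const data = JSON.parse(event.data);
  console.log(`Scan ${data.scan_id} completed with ${data.finding_count} findings.`);
});

eventSource.onerror = (error) => {
  // EventSource retries on its own and resends the last event id; log for visibility.
  console.error("SSE connection error:", error);
};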

Python Client Example

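import json
import requests

STREAM_URL = "https://api.s4e.io/api/events/stream"

def stream_events(api_key, events=None):
    params = {"token": api_key}
    if events:
        params["events"] = ",".join(events)

    # stream=True keeps the connection open so lines arrive as they are sent.
    response = requests.get(
        STREAM_URL,
        params=params,
        stream=True,
        headers={"Accept": "text/event-stream"},
        # (connect, read) timeouts in seconds; the read value is an example
        # chosen to span several 30-second heartbeats.
        timeout=(10, 90),
    )
    response.raise_for_status()
    # SSE streams are UTF-8; requests may otherwise guess ISO-8859-1.
    response.encoding = "utf-8"

    event_type = None
    for line in response.iter_lines(decode_unicode=True):
        if line.startswith("event:"):
            # "event:" is 6 characters; strip() drops the optional space.
            event_type = line[6:].strip()
        elif line.startswith("data:"):
            data = json.loads(line[5:].strip())
            handle_event(event_type, data)
        elif line == "":
            # A blank line ends the current event.
            event_type = None

def handle_event(event_type, data):
    if event_type == "finding.new":
        print(f"New finding: {data['title']} ({data['severity']})")
    elif event_type == "scan.completed":
        print(f"Scan {data['scan_id']} completed.")

stream_events("YOUR_API_KEY", events=["finding.new", "scan.completed"])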

Use Cases

| Use Case | Events to Subscribe |
|---|---|
| Real-time dashboard | finding.new, scan.progress, scan.completed |
| Alert pipeline | finding.new (filtered by severity) |
| Action monitoring | action.triggered, action.completed |
| Playbook observability | playbook.started, playbook.completed |
| System health | system.alert, heartbeat |
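The stream parameters listed earlier do not include a severity filter, so the alert-pipeline case implies filtering on the client. A minimal sketch follows. Only the value "critical" appears in this document; the other severity names and their ordering in SEVERITY_RANK are assumptions, as is the function name should_alert.

```python
# Assumed severity scale; only "critical" is confirmed by the examples above.
SEVERITY_RANK = {"info": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}


def should_alert(event_type, data, min_severity="high"):
    """Return True for finding.new events at or above min_severity."""
    if event_type != "finding.new":
        return False
    rank = SEVERITY_RANK.get(str(data.get("severity", "")).lower())
    if rank is None:
        # Unknown severity label: err on the side of alerting.
        return True
    return rank >= SEVERITY_RANK[min_severity]


critical_alerts = should_alert("finding.new", {"severity": "critical"})
low_alerts = should_alert("finding.new", {"severity": "low"})
```

Passing unknown severities through avoids silently dropping findings if the platform uses labels that this assumed scale does not cover.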

Tip

For server-side integrations that need guaranteed delivery, use Webhooks instead of SSE. Webhooks include retry logic and delivery confirmation, while SSE connections may miss events during disconnections.
