Skip to main content
Calseta’s alert source system is plugin-based. Each source is a Python class implementing the AlertSourceBase abstract base class. This guide walks through adding a new source from scratch.

Prerequisites

  • Python 3.12+
  • Familiarity with the source system’s alert format
  • API documentation for the source system (commit to docs/integrations/{name}/api_notes.md before writing code)

Step 1: Research the Source API

Before writing any integration code, document the source system’s API:
mkdir -p docs/integrations/your-source
Create docs/integrations/your-source/api_notes.md with:
  • Alert payload structure and field names/types
  • Webhook configuration method
  • Signature verification format
  • Severity levels and their mapping
  • Rate limits
  • Edge cases and known quirks
This step is mandatory. Don’t skip it — integration bugs almost always stem from incomplete understanding of the source API.

Step 2: Create the Plugin File

Create app/integrations/sources/your_source.py:
from app.integrations.sources.base import AlertSourceBase
from app.schemas.alerts import CalsetaAlert
from app.schemas.indicators import IndicatorExtract


class YourSource(AlertSourceBase):
    source_name = "your_source"
    display_name = "Your Source Display Name"

    def validate_payload(self, raw: dict) -> bool:
        """Check if the payload is a valid alert from this source."""
        # Return True if this payload belongs to this source
        return "expected_field" in raw

    def normalize(self, raw: dict) -> CalsetaAlert:
        """Map source-specific fields to Calseta's agent-native schema."""
        return CalsetaAlert(
            title=raw.get("alert_name", "Unknown Alert"),
            severity=self._map_severity(raw.get("severity")),
            occurred_at=raw.get("timestamp"),
            source_name=self.source_name,
        )

    def extract_indicators(self, raw: dict) -> list[IndicatorExtract]:
        """Extract IOCs from the raw payload."""
        indicators = []
        if src_ip := raw.get("source_ip"):
            indicators.append(
                IndicatorExtract(type="ip", value=src_ip)
            )
        if user := raw.get("user_email"):
            indicators.append(
                IndicatorExtract(type="account", value=user)
            )
        return indicators

    def _map_severity(self, raw_severity: str | None) -> str:
        mapping = {
            "low": "Low",
            "medium": "Medium",
            "high": "High",
            "critical": "Critical",
        }
        return mapping.get((raw_severity or "").lower(), "Pending")

Step 3: Register the Plugin

Add your source to the source registry in app/integrations/sources/__init__.py:
from app.integrations.sources.your_source import YourSource

SOURCE_REGISTRY["your_source"] = YourSource()

Step 4: Add Webhook Signature Verification (Optional)

Override verify_webhook_signature() if your source supports it:
import hmac
import hashlib

def verify_webhook_signature(
    self, payload: bytes, signature: str, secret: str
) -> bool:
    expected = hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)
Always use hmac.compare_digest() for signature comparison — never ==. This prevents timing attacks.

Step 5: Add Indicator Field Mappings (Optional)

For indicators that aren’t extracted by your extract_indicators() method, add custom field mappings via the API:
curl -X POST http://localhost:8000/v1/indicator-field-mappings \
  -H "Authorization: Bearer cai_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "your_source",
    "extraction_target": "raw_payload",
    "source_field": "network.destination.ip",
    "indicator_type": "ip"
  }'

Step 6: Write Tests

Create tests/test_your_source.py:
import pytest
from app.integrations.sources.your_source import YourSource


@pytest.fixture
def source():
    return YourSource()


def test_validate_payload(source):
    assert source.validate_payload({"expected_field": "value"})
    assert not source.validate_payload({"wrong_field": "value"})


def test_normalize(source):
    raw = {
        "alert_name": "Test Alert",
        "severity": "high",
        "timestamp": "2025-01-15T10:28:00Z",
    }
    alert = source.normalize(raw)
    assert alert.title == "Test Alert"
    assert alert.severity == "High"
    assert alert.severity == "High"


def test_extract_indicators(source):
    raw = {
        "source_ip": "192.168.1.1",
        "user_email": "admin@company.com",
    }
    indicators = source.extract_indicators(raw)
    assert len(indicators) == 2
    assert indicators[0].type == "ip"
    assert indicators[1].type == "account"

Step 7: Test End-to-End

curl -X POST http://localhost:8000/v1/ingest/your_source \
  -H "Authorization: Bearer cai_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{"expected_field": "value", "alert_name": "Test", "severity": "high"}'
Verify the alert appears in GET /v1/alerts with the correct normalized fields.

Indicator Types

Supported indicator types for extraction:
TypeExample
ip192.168.1.1, 2001:db8::1
domainexample.com
hash_md5d41d8cd98f00b204e9800998ecf8427e
hash_sha1da39a3ee5e6b4b0d3255bfef95601890afd80709
hash_sha256e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
urlhttps://malicious.example.com/payload
emailphish@evil.com
accountjsmith@company.com

Contributing

Community-contributed source plugins are welcome. See Community Integrations for the contribution process.