Calseta’s alert source system is plugin-based. Each source is a Python class implementing the AlertSourceBase abstract base class. This guide walks through adding a new source from scratch.
Prerequisites
- Python 3.12+
- Familiarity with the source system’s alert format
- API documentation for the source system (commit to docs/integrations/{name}/api_notes.md before writing code)
Step 1: Research the Source API
Before writing any integration code, document the source system’s API:
mkdir -p docs/integrations/your-source
Create docs/integrations/your-source/api_notes.md with:
- Alert payload structure and field names/types
- Webhook configuration method
- Signature verification format
- Severity levels and their mapping
- Rate limits
- Edge cases and known quirks
This step is mandatory. Most integration bugs trace back to an incomplete understanding of the source API.
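The notes file can be scaffolded so that no topic from the checklist above is forgotten. The following is a minimal sketch, not part of Calseta: the helper name `scaffold_api_notes` and the template layout (one `##` section per topic with a `TODO` placeholder) are assumptions made for illustration.

```python
from pathlib import Path

# Section titles mirror the checklist above; the template layout is an assumption.
SECTIONS = [
    "Alert payload structure and field names/types",
    "Webhook configuration method",
    "Signature verification format",
    "Severity levels and their mapping",
    "Rate limits",
    "Edge cases and known quirks",
]


def scaffold_api_notes(name: str, root: Path = Path("docs/integrations")) -> Path:
    """Create <root>/<name>/api_notes.md with one empty section per topic."""
    notes = root / name / "api_notes.md"
    notes.parent.mkdir(parents=True, exist_ok=True)
    if not notes.exists():  # never overwrite notes that were already written
        body = "\n\n".join(f"## {title}\n\nTODO" for title in SECTIONS)
        notes.write_text(f"# {name} API notes\n\n{body}\n", encoding="utf-8")
    return notes
```

Calling `scaffold_api_notes("your-source")` from the repository root would create the same directory as the `mkdir -p` command above and leave an existing notes file untouched.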
Step 2: Create the Plugin File
Create app/integrations/sources/your_source.py:
from app.integrations.sources.base import AlertSourceBase
from app.schemas.alerts import CalsetaAlert
from app.schemas.indicators import IndicatorExtract


class YourSource(AlertSourceBase):
    source_name = "your_source"
    display_name = "Your Source Display Name"

    def validate_payload(self, raw: dict) -> bool:
        """Check if the payload is a valid alert from this source."""
        # Return True if this payload belongs to this source
        return "expected_field" in raw

    def normalize(self, raw: dict) -> CalsetaAlert:
        """Map source-specific fields to Calseta's agent-native schema."""
        return CalsetaAlert(
            title=raw.get("alert_name", "Unknown Alert"),
            severity=self._map_severity(raw.get("severity")),
            occurred_at=raw.get("timestamp"),
            source_name=self.source_name,
        )

    def extract_indicators(self, raw: dict) -> list[IndicatorExtract]:
        """Extract IOCs from the raw payload."""
        indicators = []
        if src_ip := raw.get("source_ip"):
            indicators.append(
                IndicatorExtract(type="ip", value=src_ip)
            )
        if user := raw.get("user_email"):
            indicators.append(
                IndicatorExtract(type="account", value=user)
            )
        return indicators

    def _map_severity(self, raw_severity: str | None) -> str:
        # Unknown or missing severities fall back to "Pending"
        mapping = {
            "low": "Low",
            "medium": "Medium",
            "high": "High",
            "critical": "Critical",
        }
        return mapping.get((raw_severity or "").lower(), "Pending")
Step 3: Register the Plugin
Add your source to the source registry in app/integrations/sources/__init__.py:
from app.integrations.sources.your_source import YourSource
SOURCE_REGISTRY["your_source"] = YourSource()
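To illustrate why registration matters, here is a self-contained sketch of a name-keyed registry. Everything in it is a stand-in: `SourceBase`, `REGISTRY`, `register`, and `lookup` are hypothetical names and do not describe how Calseta's `SOURCE_REGISTRY` or `AlertSourceBase` are actually implemented.

```python
from abc import ABC, abstractmethod


class SourceBase(ABC):
    """Hypothetical stand-in for an abstract plugin base class."""

    source_name: str

    @abstractmethod
    def validate_payload(self, raw: dict) -> bool: ...


class YourSource(SourceBase):
    source_name = "your_source"

    def validate_payload(self, raw: dict) -> bool:
        return "expected_field" in raw


# Hypothetical registry: one plugin instance per source name
REGISTRY: dict[str, SourceBase] = {}


def register(source: SourceBase) -> None:
    """Add a plugin, refusing to silently replace an existing one."""
    if source.source_name in REGISTRY:
        raise ValueError(f"source already registered: {source.source_name!r}")
    REGISTRY[source.source_name] = source


def lookup(name: str) -> SourceBase:
    """Return the plugin for a source name, e.g. one taken from a URL path."""
    try:
        return REGISTRY[name]
    except KeyError:
        raise LookupError(f"unknown source: {name!r}") from None


register(YourSource())
```

In a design like this, a source that is never registered cannot be found by name, so requests addressed to it would be rejected before any plugin code runs.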
Step 4: Add Webhook Signature Verification (Optional)
Override verify_webhook_signature() if your source signs its webhook requests:
import hashlib
import hmac

# Add this method to your YourSource class
def verify_webhook_signature(
    self, payload: bytes, signature: str, secret: str
) -> bool:
    expected = hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)
Always compare signatures with hmac.compare_digest(), never ==. It avoids content-based short-circuiting, which prevents timing attacks.
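The check can be exercised without a running server by signing a payload locally and verifying it. This is a sketch under stated assumptions: the helper names `sign` and `verify` are hypothetical, and the `sha256=` prefix handling reflects a convention some webhook senders use, which may not apply to your source. Both values are encoded to bytes before comparison because `hmac.compare_digest()` raises `TypeError` for `str` arguments containing non-ASCII characters.

```python
import hashlib
import hmac


def sign(payload: bytes, secret: str) -> str:
    """Compute the hex HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()


def verify(payload: bytes, signature: str, secret: str) -> bool:
    """Compare a received signature against the expected one."""
    # Assumption: some senders prefix the header value with "sha256="
    received = signature.removeprefix("sha256=")
    expected = sign(payload, secret)
    # Compare bytes so unexpected non-ASCII header values cannot raise TypeError
    return hmac.compare_digest(received.encode(), expected.encode())


body = b'{"expected_field": "value"}'
good_signature = sign(body, "s3cret")

print(verify(body, good_signature, "s3cret"))         # matching body and secret
print(verify(body + b" ", good_signature, "s3cret"))  # tampered body
```

Note that the signature must be computed over the exact bytes received, before any JSON parsing or re-serialization.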
Step 5: Add Indicator Field Mappings (Optional)
For indicators that aren’t extracted by your extract_indicators() method, add custom field mappings via the API:
curl -X POST http://localhost:8000/v1/indicator-field-mappings \
  -H "Authorization: Bearer cai_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{
    "source_name": "your_source",
    "extraction_target": "raw_payload",
    "source_field": "network.destination.ip",
    "indicator_type": "ip"
  }'
Step 6: Write Tests
Create tests/test_your_source.py:
import pytest

from app.integrations.sources.your_source import YourSource


@pytest.fixture
def source():
    return YourSource()


def test_validate_payload(source):
    assert source.validate_payload({"expected_field": "value"})
    assert not source.validate_payload({"wrong_field": "value"})


def test_normalize(source):
    raw = {
        "alert_name": "Test Alert",
        "severity": "high",
        "timestamp": "2025-01-15T10:28:00Z",
    }
    alert = source.normalize(raw)
    assert alert.title == "Test Alert"
    assert alert.severity == "High"


def test_extract_indicators(source):
    raw = {
        "source_ip": "192.168.1.1",
        "user_email": "admin@company.com",
    }
    indicators = source.extract_indicators(raw)
    assert len(indicators) == 2
    assert indicators[0].type == "ip"
    assert indicators[1].type == "account"
Step 7: Test End-to-End
curl -X POST http://localhost:8000/v1/ingest/your_source \
  -H "Authorization: Bearer cai_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{"expected_field": "value", "alert_name": "Test", "severity": "high"}'
Verify the alert appears in GET /v1/alerts with the correct normalized fields.
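The same request can be built from Python using only the standard library, which is convenient for scripted smoke tests. This sketch mirrors the curl command above; the helper name `build_ingest_request` is hypothetical, and the response format is not assumed, so the sending step is left as a comment.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # same local instance as the curl example


def build_ingest_request(
    source_name: str, payload: dict, api_key: str
) -> urllib.request.Request:
    """Build (but do not send) a POST to the ingest endpoint for a source."""
    return urllib.request.Request(
        f"{BASE_URL}/v1/ingest/{source_name}",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_ingest_request(
    "your_source",
    {"expected_field": "value", "alert_name": "Test", "severity": "high"},
    "cai_your_api_key",
)

# Sending requires a running instance:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```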
Indicator Types
Supported indicator types for extraction:
| Type | Example |
|---|---|
| ip | 192.168.1.1, 2001:db8::1 |
| domain | example.com |
| hash_md5 | d41d8cd98f00b204e9800998ecf8427e |
| hash_sha1 | da39a3ee5e6b4b0d3255bfef95601890afd80709 |
| hash_sha256 | e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 |
| url | https://malicious.example.com/payload |
| email | phish@evil.com |
| account | jsmith@company.com |
Contributing
Community-contributed source plugins are welcome. See Community Integrations for the contribution process.