Use this skill when:
Do not use this skill for fully automated blocking decisions without human review — enrichment automation should inform decisions, not execute blocks autonomously for high-impact actions.
Define the enrichment flow for each IOC type:
SIEM Alert → Extract IOCs → Classify Type → Route to enrichment functions
IP Address → AbuseIPDB + Shodan + VirusTotal IP + MISP
Domain → VirusTotal Domain + PassiveTotal + Shodan + MISP
URL → URLScan.io + VirusTotal URL + Google Safe Browse
File Hash → VirusTotal Files + MalwareBazaar + MISP
→ Aggregate results → Calculate confidence score → Update alert → Notify analyst
import requests
import time
from dataclasses import dataclass, field
from typing import Optional
RATE_LIMIT_DELAY = 0.25 # 4 requests/second for VT free tier
@dataclass
class EnrichmentResult:
ioc_value: str
ioc_type: str
vt_malicious: int = 0
vt_total: int = 0
abuse_confidence: int = 0
shodan_ports: list = field(default_factory=list)
misp_events: list = field(default_factory=list)
confidence_score: int = 0
def enrich_ip(ip: str, vt_key: str, abuse_key: str, shodan_key: str) -> EnrichmentResult:
result = EnrichmentResult(ip, "ip")
# VirusTotal IP lookup
vt_resp = requests.get(
f"https://www.virustotal.com/api/v3/ip_addresses/{ip}",
headers={"x-apikey": vt_key}
)
if vt_resp.status_code == 200:
stats = vt_resp.json()["data"]["attributes"]["last_analysis_stats"]
result.vt_malicious = stats.get("malicious", 0)
result.vt_total = sum(stats.values())
time.sleep(RATE_LIMIT_DELAY)
# AbuseIPDB
abuse_resp = requests.get(
"https://api.abuseipdb.com/api/v2/check",
headers={"Key": abuse_key, "Accept": "application/json"},
params={"ipAddress": ip, "maxAgeInDays": 90}
)
if abuse_resp.status_code == 200:
result.abuse_confidence = abuse_resp.json()["data"]["abuseConfidenceScore"]
# Calculate composite confidence score
result.confidence_score = min(
(result.vt_malicious / max(result.vt_total, 1)) * 60 +
(result.abuse_confidence / 100) * 40, 100
)
return result
def enrich_hash(sha256: str, vt_key: str) -> EnrichmentResult:
result = EnrichmentResult(sha256, "sha256")
vt_resp = requests.get(
f"https://www.virustotal.com/api/v3/files/{sha256}",
headers={"x-apikey": vt_key}
)
if vt_resp.status_code == 200:
stats = vt_resp.json()["data"]["attributes"]["last_analysis_stats"]
result.vt_malicious = stats.get("malicious", 0)
result.vt_total = sum(stats.values())
result.confidence_score = int((result.vt_malicious / max(result.vt_total, 1)) * 100)
return result
In Cortex XSOAR, create an enrichment playbook:
!vt-file-scan or !vt-ip-scan commands!abuseipdb-check-ip command!misp-search for cross-referencingimport time
from functools import wraps
def rate_limited(max_per_second):
min_interval = 1.0 / max_per_second
def decorator(func):
last_called = [0.0]
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
wait = min_interval - elapsed
if wait > 0:
time.sleep(wait)
result = func(*args, **kwargs)
last_called[0] = time.time()
return result
return wrapper
return decorator
def retry_on_429(max_retries=3):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
response = func(*args, **kwargs)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
time.sleep(retry_after)
else:
return response
return wrapper
return decorator
Track pipeline performance weekly:
| Term | Definition |
|---|---|
| SOAR | Security Orchestration, Automation, and Response — platform for automating security workflows and integrating disparate tools |
| Enrichment Playbook | Automated workflow sequence that adds contextual intelligence to raw security events |
| Rate Limiting | API provider restrictions on request frequency (e.g., VT free: 4 requests/minute); pipelines must respect these limits |
| Composite Confidence Score | Single score aggregating signals from multiple enrichment sources using weighted formula |
| Fan-out Pattern | Parallel execution of multiple enrichment queries simultaneously to minimize total enrichment latency |