Use this skill when:
Do not use for bulk blocking decisions without analyst review — enrichment provides context, not definitive malicious/benign determination.
Requires the `requests`, `vt-py`, and `shodan` libraries.

Create a multi-source enrichment pipeline:
import requests
import vt
import shodan
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class EnrichmentResult:
    """Container for everything learned about a single IOC.

    Each per-source attribute holds a plain dict that the enrichment engine
    fills in; it stays empty when that source was not queried for this IOC
    type. ``risk_score`` and ``disposition`` are derived from those dicts.
    """

    # The raw indicator (IP address, domain name or file hash)
    ioc_value: str
    # Indicator kind; the engine in this file uses "ip", "domain" and "hash"
    ioc_type: str
    # Per-source findings (mutable defaults need default_factory so that
    # instances never share the same dict/list object)
    virustotal: dict = field(default_factory=dict)
    # NOTE: enrich_hash() also stores MalwareBazaar data in this field
    abuseipdb: dict = field(default_factory=dict)
    shodan_data: dict = field(default_factory=dict)
    greynoise: dict = field(default_factory=dict)
    urlscan: dict = field(default_factory=dict)
    misp_matches: list = field(default_factory=list)
    # Composite score, clamped to 0-100 by the engine's _calculate_* methods
    risk_score: float = 0.0
    # Human-readable verdict derived from risk_score
    disposition: str = "Unknown"
class IOCEnrichmentEngine:
    """Enrich IPs, domains and file hashes from several threat-intel sources.

    Every ``enrich_*`` method queries its sources independently: a failure
    in one source is recorded in that source's dict (under ``"error"`` or
    ``"status"``) and never prevents the remaining sources from being
    queried. The result carries a 0-100 composite ``risk_score`` and a
    textual ``disposition``.

    The engine owns a VirusTotal client that must be released with
    :meth:`close`; it can also be used as a context manager.
    """

    # Default seconds to wait on each ``requests`` call so that a stalled
    # provider cannot hang the whole enrichment run.
    timeout = 10

    def __init__(self, config):
        """Create the API clients.

        Args:
            config: Mapping with the keys ``virustotal_key``, ``shodan_key``,
                ``abuseipdb_key``, ``greynoise_key`` and ``urlscan_key``.
                An optional ``request_timeout`` (seconds) overrides the
                default HTTP timeout.
        """
        self.vt_client = vt.Client(config["virustotal_key"])
        self.shodan_api = shodan.Shodan(config["shodan_key"])
        self.abuseipdb_key = config["abuseipdb_key"]
        self.greynoise_key = config["greynoise_key"]
        self.urlscan_key = config["urlscan_key"]
        self.timeout = config.get("request_timeout", self.timeout)

    def __enter__(self):
        """Return the engine itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the VirusTotal client; exceptions are not suppressed."""
        self.close()
        return False

    def enrich_ip(self, ip_address):
        """Enrich an IP address via VirusTotal, AbuseIPDB, Shodan, GreyNoise.

        Args:
            ip_address: The IP address to look up.

        Returns:
            EnrichmentResult with ``ioc_type == "ip"``.
        """
        result = EnrichmentResult(ioc_value=ip_address, ioc_type="ip")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/ip_addresses/{ip_address}")
            stats = vt_obj.last_analysis_stats
            result.virustotal = {
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "total_engines": sum(stats.values()),
                "reputation": vt_obj.reputation,
                "country": getattr(vt_obj, "country", "Unknown"),
                "as_owner": getattr(vt_obj, "as_owner", "Unknown"),
            }
        except Exception as e:
            result.virustotal = {"error": str(e)}

        # AbuseIPDB
        try:
            response = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                headers={"Key": self.abuseipdb_key, "Accept": "application/json"},
                params={"ipAddress": ip_address, "maxAgeInDays": 90},
                timeout=self.timeout,
            )
            # Report HTTP failures (bad key, rate limit) as such instead of
            # as an opaque KeyError on the missing "data" member.
            response.raise_for_status()
            data = response.json()["data"]
            result.abuseipdb = {
                "confidence_score": data["abuseConfidenceScore"],
                "total_reports": data["totalReports"],
                "is_tor": data.get("isTor", False),
                "usage_type": data.get("usageType", "Unknown"),
                "isp": data.get("isp", "Unknown"),
                "domain": data.get("domain", "Unknown"),
            }
        except Exception as e:
            result.abuseipdb = {"error": str(e)}

        # Shodan
        try:
            host = self.shodan_api.host(ip_address)
            result.shodan_data = {
                "ports": host.get("ports", []),
                "os": host.get("os", "Unknown"),
                "organization": host.get("org", "Unknown"),
                "isp": host.get("isp", "Unknown"),
                "vulns": host.get("vulns", []),
                "last_update": host.get("last_update", "Unknown"),
            }
        except shodan.APIError:
            result.shodan_data = {"status": "Not found in Shodan"}
        except Exception as e:
            # Anything else must not abort the remaining lookups
            result.shodan_data = {"error": str(e)}

        # GreyNoise
        try:
            response = requests.get(
                f"https://api.greynoise.io/v3/community/{ip_address}",
                headers={"key": self.greynoise_key},
                timeout=self.timeout,
            )
            # No raise_for_status() here: the JSON body is parsed whatever
            # the HTTP status, exactly as before.
            gn_data = response.json()
            result.greynoise = {
                "classification": gn_data.get("classification", "unknown"),
                "noise": gn_data.get("noise", False),
                "riot": gn_data.get("riot", False),
                "name": gn_data.get("name", "Unknown"),
            }
        except Exception as e:
            result.greynoise = {"error": str(e)}

        # Calculate composite risk score
        result.risk_score = self._calculate_ip_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def enrich_domain(self, domain):
        """Enrich a domain name via VirusTotal and URLScan.io.

        Args:
            domain: The domain name to look up.

        Returns:
            EnrichmentResult with ``ioc_type == "domain"``.
        """
        result = EnrichmentResult(ioc_value=domain, ioc_type="domain")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/domains/{domain}")
            stats = vt_obj.last_analysis_stats
            result.virustotal = {
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "reputation": vt_obj.reputation,
                "creation_date": getattr(vt_obj, "creation_date", "Unknown"),
                "registrar": getattr(vt_obj, "registrar", "Unknown"),
                "categories": getattr(vt_obj, "categories", {}),
            }
        except Exception as e:
            result.virustotal = {"error": str(e)}

        # URLScan.io
        try:
            response = requests.get(
                "https://urlscan.io/api/v1/search/",
                # Let requests URL-encode the query instead of splicing the
                # raw domain string into the URL.
                params={"q": f"domain:{domain}"},
                headers={"API-Key": self.urlscan_key},
                timeout=self.timeout,
            )
            response.raise_for_status()
            scans = response.json().get("results", [])
            result.urlscan = {
                "total_scans": len(scans),
                # Overall verdict of the first five returned scans
                "verdicts": [
                    s.get("verdicts", {}).get("overall", {}).get("malicious", False)
                    for s in scans[:5]
                ],
                "last_scan": scans[0]["task"]["time"] if scans else "Never scanned",
            }
        except Exception as e:
            result.urlscan = {"error": str(e)}

        result.risk_score = self._calculate_domain_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def enrich_hash(self, file_hash):
        """Enrich a file hash via VirusTotal and MalwareBazaar.

        MalwareBazaar details are stored in ``result.abuseipdb`` (the field
        is reused for hashes); a failed MalwareBazaar lookup is recorded
        there under ``"error"``.

        Args:
            file_hash: The file hash to look up.

        Returns:
            EnrichmentResult with ``ioc_type == "hash"``.
        """
        result = EnrichmentResult(ioc_value=file_hash, ioc_type="hash")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/files/{file_hash}")
            stats = vt_obj.last_analysis_stats
            result.virustotal = {
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total_engines": sum(stats.values()),
                "type_description": getattr(vt_obj, "type_description", "Unknown"),
                "popular_threat_name": getattr(
                    vt_obj, "popular_threat_classification", {}
                ).get("suggested_threat_label", "Unknown"),
                "sandbox_verdicts": getattr(vt_obj, "sandbox_verdicts", {}),
                "first_seen": getattr(vt_obj, "first_submission_date", "Unknown"),
            }
        except vt.APIError:
            result.virustotal = {"status": "Not found in VirusTotal"}
        except Exception as e:
            # Same policy as the other lookups: record the failure and go on
            result.virustotal = {"error": str(e)}

        # MalwareBazaar
        try:
            response = requests.post(
                "https://mb-api.abuse.ch/api/v1/",
                data={"query": "get_info", "hash": file_hash},
                timeout=self.timeout,
            )
            mb_data = response.json()
            if mb_data["query_status"] == "ok":
                entry = mb_data["data"][0]
                result.abuseipdb = {  # Reusing field for MalwareBazaar data
                    "malware_family": entry.get("signature", "Unknown"),
                    "tags": entry.get("tags", []),
                    "file_type": entry.get("file_type", "Unknown"),
                    "delivery_method": entry.get("delivery_method", "Unknown"),
                    "first_seen": entry.get("first_seen", "Unknown"),
                }
        except Exception as e:
            # Best effort, but leave a trace instead of failing silently
            result.abuseipdb = {"error": str(e)}

        result.risk_score = self._calculate_hash_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def _calculate_ip_risk(self, result):
        """Return the 0-100 composite score for an IP result.

        VirusTotal contributes 3 points per malicious engine (max 30),
        AbuseIPDB 0.3 x its confidence score, GreyNoise +20 when classified
        "malicious" or -20 when the address is in the RIOT dataset.
        """
        score = 0
        vt_data = result.virustotal
        abuse = result.abuseipdb
        gn = result.greynoise
        if isinstance(vt_data, dict) and "malicious" in vt_data:
            score += min(vt_data["malicious"] * 3, 30)
        if isinstance(abuse, dict) and "confidence_score" in abuse:
            score += abuse["confidence_score"] * 0.3
        if isinstance(gn, dict):
            if gn.get("classification") == "malicious":
                score += 20
            elif gn.get("riot"):
                score -= 20  # Known benign service
        return min(max(score, 0), 100)

    def _calculate_domain_risk(self, result):
        """Return the 0-100 composite score for a domain result.

        VirusTotal contributes 4 points per malicious engine (max 40) plus
        20 when its reputation is below -5. Each malicious URLScan verdict
        adds 10 (max 40): without it the score tops out at 60 and a domain
        could never reach the "MALICIOUS" disposition threshold of 70.
        """
        score = 0
        vt_data = result.virustotal
        if isinstance(vt_data, dict) and "malicious" in vt_data:
            score += min(vt_data["malicious"] * 4, 40)
            if vt_data.get("reputation", 0) < -5:
                score += 20
        scan_data = result.urlscan
        if isinstance(scan_data, dict):
            malicious_verdicts = sum(1 for v in scan_data.get("verdicts", []) if v)
            score += min(malicious_verdicts * 10, 40)
        return min(max(score, 0), 100)

    def _calculate_hash_risk(self, result):
        """Return the VirusTotal detection rate as a 0-100 score."""
        score = 0
        vt_data = result.virustotal
        if isinstance(vt_data, dict) and "malicious" in vt_data:
            total = vt_data.get("total_engines", 1)
            # Guard against an empty analysis (no engines reported)
            detection_rate = vt_data["malicious"] / total if total > 0 else 0
            score = detection_rate * 100
        return min(max(score, 0), 100)

    def _determine_disposition(self, risk_score):
        """Map a risk score to a disposition string (cut-offs 70/40/10)."""
        if risk_score >= 70:
            return "MALICIOUS — Block recommended"
        if risk_score >= 40:
            return "SUSPICIOUS — Monitor and investigate"
        if risk_score >= 10:
            return "LOW RISK — Likely benign, verify context"
        return "CLEAN — No indicators of malicious activity"

    def close(self):
        """Release the VirusTotal client."""
        self.vt_client.close()
# Process multiple IOCs from an incident
iocs = [
    {"type": "ip", "value": "185.234.218.50"},
    {"type": "domain", "value": "evil-c2-server.com"},
    {"type": "hash", "value": "a1b2c3d4e5f6..."},
    {"type": "ip", "value": "45.33.32.156"},
]
# Placeholder credentials: replace with real API keys before running
config = {
    "virustotal_key": "YOUR_VT_KEY",
    "shodan_key": "YOUR_SHODAN_KEY",
    "abuseipdb_key": "YOUR_ABUSEIPDB_KEY",
    "greynoise_key": "YOUR_GREYNOISE_KEY",
    "urlscan_key": "YOUR_URLSCAN_KEY",
}
engine = IOCEnrichmentEngine(config)
# Dispatch table: IOC type -> engine method that enriches it
enrichers = {
    "ip": engine.enrich_ip,
    "domain": engine.enrich_domain,
    "hash": engine.enrich_hash,
}
results = []
try:
    for position, ioc in enumerate(iocs):
        enrich = enrichers.get(ioc["type"])
        if enrich is None:
            # An unknown type previously left `result` unbound (NameError)
            # or silently re-appended the previous IOC's result.
            print(f"Skipping unsupported IOC type: {ioc['type']}")
            continue
        results.append(enrich(ioc["value"]))
        if position < len(iocs) - 1:
            time.sleep(15)  # Rate limiting for free VT API
finally:
    # Release the VirusTotal client even if an enrichment call raised
    engine.close()
# Print summary
for r in results:
    print(f"{r.ioc_type}: {r.ioc_value}")
    print(f" Risk Score: {r.risk_score}")
    print(f" Disposition: {r.disposition}")
    print()
Create a Splunk custom search command for inline enrichment:
index=notable sourcetype="stash" ```start from events in the notable index with sourcetype "stash"```
| table src_ip, dest_ip, file_hash, url ```keep only the indicator fields```
| lookup threat_intel_ip_lookup ip AS src_ip OUTPUT vt_score, abuse_score, disposition ```add IP scores and disposition for src_ip```
| lookup threat_intel_hash_lookup hash AS file_hash OUTPUT vt_detections, malware_family ```add hash detections and malware family for file_hash```
| eval combined_risk = coalesce(vt_score, 0) + coalesce(abuse_score, 0) ```a missing score counts as 0```
| where combined_risk > 50 ```keep only rows whose combined score exceeds 50```
| sort - combined_risk ```highest combined risk first```
def generate_enrichment_report(results, defang=False):
    """Render enrichment results as a plain-text report.

    Results are listed from highest to lowest ``risk_score``. For each one
    the report shows the score and disposition followed by one line per
    source that returned usable data (VirusTotal, AbuseIPDB, GreyNoise,
    Shodan, URLScan, MalwareBazaar).

    Args:
        results: Iterable of objects exposing the ``EnrichmentResult``
            attributes.
        defang: When true, IP and domain values are printed with their
            last dot replaced by ``[.]`` (``evil.com`` -> ``evil[.]com``)
            so the report is safe to paste. Defaults to False, which keeps
            the values exactly as given.

    Returns:
        The report as a single newline-joined string.
    """

    def _display_value(item):
        # Only network indicators are defanged; hashes are printed as-is
        if defang and item.ioc_type in ("ip", "domain"):
            return "[.]".join(item.ioc_value.rsplit(".", 1))
        return item.ioc_value

    report = ["IOC ENRICHMENT REPORT", "=" * 60]
    for r in sorted(results, key=lambda x: x.risk_score, reverse=True):
        report.append(f"\n{r.ioc_type.upper()}: {_display_value(r)}")
        report.append(f" Risk Score: {r.risk_score}/100")
        report.append(f" Disposition: {r.disposition}")
        if r.virustotal and "malicious" in r.virustotal:
            report.append(
                f" VirusTotal: {r.virustotal['malicious']}"
                f"/{r.virustotal.get('total_engines', 'N/A')} malicious"
            )
        if r.abuseipdb and "confidence_score" in r.abuseipdb:
            report.append(
                f" AbuseIPDB: {r.abuseipdb['confidence_score']}% confidence,"
                f" {r.abuseipdb['total_reports']} reports"
            )
        if r.greynoise and "classification" in r.greynoise:
            report.append(f" GreyNoise: {r.greynoise['classification']}")
        if r.shodan_data and "ports" in r.shodan_data:
            report.append(
                f" Shodan: Ports {r.shodan_data['ports']},"
                f" Org: {r.shodan_data.get('organization', 'N/A')}"
            )
        # URLScan data is collected for domains but was never reported
        scan_data = getattr(r, "urlscan", None)
        if scan_data and "total_scans" in scan_data:
            flagged = sum(1 for v in scan_data.get("verdicts", []) if v)
            report.append(
                f" URLScan: {scan_data['total_scans']} scans,"
                f" {flagged} malicious verdicts"
            )
        # For hashes the abuseipdb field carries MalwareBazaar data
        if r.abuseipdb and "malware_family" in r.abuseipdb:
            tags = r.abuseipdb.get("tags") or []
            report.append(
                f" MalwareBazaar: {r.abuseipdb['malware_family']},"
                f" Tags: {', '.join(str(t) for t in tags)}"
            )
    return "\n".join(report)
| Term | Definition |
|---|---|
| IOC Enrichment | Process of adding contextual intelligence to raw indicators from multiple external sources |
| Composite Risk Score | Weighted aggregate score combining multiple intelligence sources for disposition decisions |
| Rate Limiting | API request restrictions requiring throttling (VT free: 4/min, AbuseIPDB: 1000/day) |
| GreyNoise RIOT | Rule It Out — GreyNoise dataset of known benign services to reduce false positives |
| Passive DNS | Historical DNS resolution data showing domain-to-IP mappings over time |
| Defanging | Modifying IOCs for safe handling in reports (evil.com becomes evil[.]com) |
IOC ENRICHMENT REPORT — IR-2024-0450
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Enrichment Time: 2024-03-15 14:30 UTC
IOCs Processed: 4
IP: 185.234.218[.]50
Risk Score: 87/100 — MALICIOUS
VirusTotal: 14/90 engines flagged malicious
AbuseIPDB: 92% confidence, 347 reports
Shodan: Ports [22, 80, 443, 4444], Org: BulletProof Hosting
GreyNoise: malicious — known C2 infrastructure
Action: BLOCK immediately
DOMAIN: evil-c2-server[.]com
Risk Score: 73/100 — MALICIOUS
VirusTotal: 8/90 engines flagged
URLScan: 5 scans, 4 malicious verdicts
WHOIS: Registered 3 days ago via Namecheap
Action: BLOCK and add to DNS sinkhole
HASH: a1b2c3d4e5f6...
Risk Score: 91/100 — MALICIOUS
VirusTotal: 52/72 engines (Cobalt Strike Beacon)
MalwareBazaar: Tags: cobalt-strike, beacon, c2
Action: BLOCK hash, quarantine affected endpoints
IP: 45.33.32[.]156
Risk Score: 5/100 — CLEAN
VirusTotal: 0/90 engines
GreyNoise: benign — Shodan scanner
Action: No action required (known scanner)