Do not use this skill for active vulnerability scanning of PLCs and safety systems (see performing-ot-network-security-assessment for passive approaches), for IT-only vulnerability management (use standard vulnerability scanners instead), or for penetration testing (see performing-ics-penetration-testing).
Configure Claroty to perform passive discovery and safe active queries to build a complete asset inventory, including firmware versions, for vulnerability correlation.
#!/usr/bin/env python3
"""OT Vulnerability Assessment Manager.
Correlates OT asset inventory with ICS-CERT advisories and CVE data
to identify, prioritize, and track OT vulnerabilities. Designed to
integrate with the Claroty xDome API or to run standalone.
"""
import json
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime
import requests
@dataclass
class OTAsset:
    """One entry of the OT asset inventory.

    Field order is part of the constructor signature; records are built
    from dictionaries whose keys match these field names.
    """

    # Identity
    asset_id: str
    name: str

    # Product information used when matching against vulnerabilities
    vendor: str
    model: str
    firmware_version: str

    # Device category, e.g. PLC, HMI, RTU, historian, switch
    asset_type: str

    # Network placement
    purdue_level: str
    ip_address: str
    protocol: str

    # Business importance: critical, high, medium, low
    criticality: str

    # Security zone the asset belongs to
    zone: str
@dataclass
class OTVulnerability:
    """A known vulnerability together with the product it affects.

    The first nine fields are required; the remaining ones are optional
    remediation details that default to empty / False.
    """

    # Identification
    vuln_id: str
    cve_id: str
    title: str

    # Severity label (critical, high, medium, low) and numeric CVSS score
    severity: str
    cvss_score: float

    # Affected product description
    affected_vendor: str
    affected_product: str
    affected_versions: str

    description: str

    # Optional remediation details
    ics_cert_advisory: str = ""
    remediation: str = ""
    patch_available: bool = False
    compensating_controls: str = ""
@dataclass
class RiskAssessment:
    """Pairing of one asset with one vulnerability plus its computed risk.

    Only ``asset`` and ``vulnerability`` are required; the scoring fields
    start at neutral defaults and are filled in afterwards.
    """

    # The matched pair
    asset: OTAsset
    vulnerability: OTVulnerability

    # Scoring results
    risk_score: float = 0.0
    risk_rating: str = ""

    # Free-form qualitative notes
    exploitability: str = ""
    operational_impact: str = ""

    # Each instance gets its own list (default_factory avoids sharing)
    compensating_controls: list = field(default_factory=list)

    # Ordering hint for remediation work; 0 means "not yet assigned"
    remediation_priority: int = 0
class OTVulnerabilityAssessment:
    """OT vulnerability assessment and prioritization engine.

    Holds three lists: the asset inventory (``assets``), the known
    vulnerabilities (``vulnerabilities``) and the asset/vulnerability
    pairs produced by :meth:`correlate_vulnerabilities`
    (``risk_assessments``).
    """

    # CISA Known Exploited Vulnerabilities catalog (JSON feed).
    KEV_FEED_URL = (
        "https://www.cisa.gov/sites/default/files/feeds/"
        "known_exploited_vulnerabilities.json"
    )

    # Vendor names treated as ICS-relevant. They are compared against whole
    # words of the feed's vendor string: short names such as "ge" or "abb"
    # would otherwise match unrelated vendors by substring
    # (e.g. "ManageEngine", "NETGEAR").
    ICS_VENDORS = frozenset({
        "siemens", "schneider", "rockwell", "honeywell",
        "abb", "ge", "emerson", "yokogawa", "omron",
        "mitsubishi", "phoenix", "moxa", "advantech",
    })

    # Multiplier per asset criticality.
    CRITICALITY_WEIGHTS = {
        "critical": 1.5,  # SIS, safety systems
        "high": 1.3,      # PLCs, primary control
        "medium": 1.0,    # HMIs, historians
        "low": 0.7,       # non-critical support systems
    }

    # Multiplier per Purdue level (lower levels = higher risk).
    LEVEL_WEIGHTS = {
        "level 0-1": 1.5,
        "level 2": 1.3,
        "level 3": 1.0,
        "level 3.5": 0.8,
        "level 4": 0.6,
    }

    # (minimum score, rating, remediation priority), highest band first.
    RATING_BANDS = (
        (9.0, "critical", 1),
        (7.0, "high", 2),
        (4.0, "medium", 3),
    )

    # Maximum number of findings printed per rating in the text report.
    REPORT_LIMIT_PER_RATING = 10

    def __init__(self):
        self.assets = []
        self.vulnerabilities = []
        self.risk_assessments = []

    def load_assets(self, assets_data):
        """Load asset inventory from Claroty export or manual inventory.

        Args:
            assets_data: iterable of dicts whose keys are the ``OTAsset``
                field names. ``None`` is treated as an empty inventory.

        Raises:
            TypeError: if a record has missing or unexpected keys. In that
                case nothing from this call is added to the inventory.
        """
        # Build everything first so a malformed record cannot leave the
        # inventory half-loaded.
        loaded = [OTAsset(**record) for record in (assets_data or [])]
        self.assets.extend(loaded)
        print(f"[*] Loaded {len(self.assets)} OT assets")

    @classmethod
    def _is_ics_vendor(cls, vendor):
        """Return True when a whole word of *vendor* is a known ICS vendor."""
        if not vendor:
            return False
        # Replace punctuation with spaces so "Schneider-Electric" splits
        # into two words.
        normalized = "".join(
            ch if ch.isalnum() else " " for ch in str(vendor).lower()
        )
        return not cls.ICS_VENDORS.isdisjoint(normalized.split())

    def fetch_ics_advisories(self):
        """Fetch ICS-relevant entries from the CISA KEV catalog.

        Downloads the Known Exploited Vulnerabilities feed and keeps the
        entries whose ``vendorProject`` names one of ``ICS_VENDORS``.

        Returns:
            list of the raw feed entries (dicts) that matched; an empty
            list when the feed cannot be downloaded or parsed.
        """
        print("[*] Fetching ICS-CERT advisories from CISA...")
        try:
            resp = requests.get(self.KEV_FEED_URL, timeout=30)
            resp.raise_for_status()
            data = resp.json()
        except (requests.RequestException, ValueError) as e:
            # Best effort: network and parse failures degrade to "no data".
            print(f"[WARN] Could not fetch advisories: {e}")
            return []

        entries = data.get("vulnerabilities") if isinstance(data, dict) else None
        ics_vulns = [
            entry
            for entry in (entries or [])
            if isinstance(entry, dict)
            and self._is_ics_vendor(entry.get("vendorProject"))
        ]
        print(f" Found {len(ics_vulns)} ICS-relevant known exploited vulnerabilities")
        return ics_vulns

    def correlate_vulnerabilities(self):
        """Match vulnerabilities to assets based on vendor and model.

        A vulnerability applies to an asset when its ``affected_vendor`` is
        contained in the asset's vendor and its ``affected_product`` is
        contained in the asset's model (case-insensitive). Each call
        rebuilds ``risk_assessments`` from scratch, so calling it again
        does not duplicate pairs.

        NOTE: ``affected_versions`` / ``firmware_version`` are not compared;
        every firmware version of a matching product is reported.
        """
        print("[*] Correlating vulnerabilities to assets...")

        # Normalize once per vulnerability instead of once per pair. Entries
        # with a blank vendor or product are skipped: an empty string is
        # "contained" in every name and would match the whole inventory.
        candidates = []
        for vuln in self.vulnerabilities:
            vendor_key = vuln.affected_vendor.strip().lower()
            product_key = vuln.affected_product.strip().lower()
            if vendor_key and product_key:
                candidates.append((vendor_key, product_key, vuln))

        matched = []
        for asset in self.assets:
            asset_vendor = asset.vendor.lower()
            asset_model = asset.model.lower()
            for vendor_key, product_key, vuln in candidates:
                if vendor_key in asset_vendor and product_key in asset_model:
                    ra = RiskAssessment(asset=asset, vulnerability=vuln)
                    self._calculate_risk_score(ra)
                    matched.append(ra)

        self.risk_assessments = matched
        print(f" Correlated {len(self.risk_assessments)} asset-vulnerability pairs")

    def _calculate_risk_score(self, ra):
        """Calculate OT-specific risk score considering operational impact.

        The CVSS score is multiplied by the asset-criticality weight, the
        Purdue-level weight and 0.8 when ``ra.compensating_controls`` is
        non-empty; the result is capped at 10.0 and rounded to one decimal.
        ``ra.risk_score``, ``ra.risk_rating`` and
        ``ra.remediation_priority`` are updated in place.

        Unknown criticality or Purdue-level values use a neutral weight of
        1.0. Lookups ignore case and surrounding whitespace so that e.g.
        "High" is not silently treated as unknown.
        """
        base = ra.vulnerability.cvss_score

        criticality_key = str(ra.asset.criticality).strip().lower()
        criticality = self.CRITICALITY_WEIGHTS.get(criticality_key, 1.0)

        level_key = str(ra.asset.purdue_level).strip().lower()
        level_factor = self.LEVEL_WEIGHTS.get(level_key, 1.0)

        # Network exposure reduction if compensating controls exist
        comp_reduction = 0.8 if ra.compensating_controls else 1.0

        raw_score = base * criticality * level_factor * comp_reduction
        ra.risk_score = round(min(raw_score, 10.0), 1)

        for threshold, rating, priority in self.RATING_BANDS:
            if ra.risk_score >= threshold:
                ra.risk_rating = rating
                ra.remediation_priority = priority
                break
        else:
            ra.risk_rating = "low"
            ra.remediation_priority = 4

    def generate_report(self):
        """Generate vulnerability assessment report.

        Returns:
            A multi-line string: a header with counts, then one section per
            rating (critical, high, medium, low) listing findings by
            descending risk score. At most ``REPORT_LIMIT_PER_RATING``
            findings are detailed per section; when more exist, a trailing
            line states how many were left out.
        """
        # Sort by risk score descending (stable for equal scores)
        sorted_ra = sorted(
            self.risk_assessments, key=lambda x: x.risk_score, reverse=True
        )

        rule = "=" * 70
        report = [
            rule,
            "OT VULNERABILITY ASSESSMENT REPORT",
            f"Date: {datetime.now().isoformat()}",
            f"Assets: {len(self.assets)} | Vulnerabilities: {len(self.vulnerabilities)}",
            f"Risk Assessments: {len(self.risk_assessments)}",
            rule,
        ]

        limit = self.REPORT_LIMIT_PER_RATING
        for sev in ["critical", "high", "medium", "low"]:
            findings = [ra for ra in sorted_ra if ra.risk_rating == sev]
            if not findings:
                continue
            report.append(f"\n--- {sev.upper()} RISK ({len(findings)}) ---")
            for ra in findings[:limit]:
                report.append(f"\n Risk Score: {ra.risk_score}/10.0")
                report.append(f" Asset: {ra.asset.name} ({ra.asset.vendor} {ra.asset.model})")
                report.append(f" Zone: {ra.asset.zone} ({ra.asset.purdue_level})")
                report.append(f" CVE: {ra.vulnerability.cve_id} (CVSS: {ra.vulnerability.cvss_score})")
                report.append(f" Title: {ra.vulnerability.title}")
                if ra.vulnerability.patch_available:
                    report.append(" Patch: Available - schedule for next maintenance window")
                else:
                    report.append(" Patch: Not available - apply compensating controls")
            omitted = len(findings) - limit
            if omitted > 0:
                # Make the truncation visible instead of dropping findings
                # without a trace.
                report.append(f"\n ... {omitted} more {sev} risk finding(s) not shown")
        return "\n".join(report)

    def export_json(self, output_file):
        """Export assessment to JSON.

        Writes the assessment date, the asset and vulnerability counts and
        one summary record per risk assessment (highest score first) to
        *output_file* as UTF-8.

        Args:
            output_file: path of the file to create or overwrite.
        """
        ordered = sorted(
            self.risk_assessments, key=lambda x: x.risk_score, reverse=True
        )
        data = {
            "assessment_date": datetime.now().isoformat(),
            "asset_count": len(self.assets),
            "vulnerability_count": len(self.vulnerabilities),
            "risk_assessments": [
                {
                    "asset_name": ra.asset.name,
                    "asset_ip": ra.asset.ip_address,
                    "cve": ra.vulnerability.cve_id,
                    "risk_score": ra.risk_score,
                    "risk_rating": ra.risk_rating,
                    "priority": ra.remediation_priority,
                }
                for ra in ordered
            ],
        }
        # Explicit encoding so the output does not depend on the platform
        # default.
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
if __name__ == "__main__":
    # Script entry point: build an engine, pull the filtered CISA KEV
    # entries and report how many were returned.
    engine = OTVulnerabilityAssessment()
    kev_entries = engine.fetch_ics_advisories()
    entry_count = len(kev_entries)
    print(f"Fetched {entry_count} ICS advisories from CISA KEV catalog")
| Term | Definition |
|---|---|
| Claroty xDome | Cyber-physical systems protection platform providing asset discovery, vulnerability management, and threat detection for OT/IoT environments |
| Passive Discovery | Identifying OT assets by analyzing network traffic without sending any packets, which makes it safe for production environments |
| Safe Active Query | Querying OT devices using native industrial protocols at safe rates to collect detailed asset information without disrupting operations |
| OT Risk Score | Risk rating that factors CVSS base score, asset criticality, Purdue level, and compensating controls for OT-appropriate prioritization |
| ICS-CERT Advisory | CISA-published security advisories for industrial control system vulnerabilities with vendor-specific remediation guidance |
| Virtual Patching | Deploying IPS/firewall rules to block exploitation of known vulnerabilities when firmware patches cannot be immediately applied |
OT Vulnerability Assessment Report
=====================================
Tool: Claroty xDome / Manual Assessment
Date: YYYY-MM-DD
Assets Scanned: [N]
RISK SUMMARY:
Critical Risk: [N] vulnerabilities on [N] assets
High Risk: [N] vulnerabilities on [N] assets
Medium Risk: [N] vulnerabilities on [N] assets
Low Risk: [N] vulnerabilities on [N] assets
TOP RISKS:
[Risk Score] [CVE-ID] on [Asset Name] ([Zone])
Remediation: [Patch/Compensating Control]
Timeline: [Next maintenance window / Immediate]