技能 硬件工程 Claroty OT 漏洞评估

Claroty OT 漏洞评估

v20260317
performing-ot-vulnerability-assessment-with-claroty
指导 OT 安全团队使用 Claroty xDome 进行漏洞评估,涵盖被动与安全主动资产发现、CVE/ICS-CERT 关联、风险打分与有限维护窗口下的修复优先级,并辅助合规证据生成。
获取技能
216 次下载
概览

Performing OT Vulnerability Assessment with Claroty

When to Use

  • When conducting scheduled OT vulnerability assessments per IEC 62443 or NERC CIP requirements
  • When deploying Claroty xDome for the first time and performing initial asset discovery and risk assessment
  • When correlating newly published ICS-CERT advisories against your OT asset inventory
  • When prioritizing OT vulnerability remediation with limited maintenance windows
  • When generating compliance evidence for CIP-010-4 vulnerability assessment requirements

Do not use for active vulnerability scanning of PLCs and safety systems (see performing-ot-network-security-assessment for passive approaches), for IT-only vulnerability management (see standard vulnerability scanners), or for penetration testing (see performing-ics-penetration-testing).

Prerequisites

  • Claroty xDome or CTD (Continuous Threat Detection) deployed with sensors on OT network
  • Network SPAN/TAP access for passive asset discovery
  • CISA ICS-CERT advisory subscription for vulnerability tracking
  • Asset inventory with firmware versions for all OT devices
  • Change management process for patch deployment during maintenance windows

Workflow

Step 1: Configure Asset Discovery and Vulnerability Correlation

Configure Claroty to perform passive and active-safe discovery to build complete asset inventory with firmware versions for vulnerability correlation.

#!/usr/bin/env python3
"""OT Vulnerability Assessment Manager.

Correlates OT asset inventory with ICS-CERT advisories and CVE data
to identify, prioritize, and track OT vulnerabilities. Designed to
integrate with Claroty xDome API or standalone operation.
"""

import json
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime

import requests


@dataclass
class OTAsset:
    asset_id: str
    name: str
    vendor: str
    model: str
    firmware_version: str
    asset_type: str  # PLC, HMI, RTU, historian, switch, etc.
    purdue_level: str
    ip_address: str
    protocol: str
    criticality: str  # critical, high, medium, low
    zone: str


@dataclass
class OTVulnerability:
    vuln_id: str
    cve_id: str
    title: str
    severity: str  # critical, high, medium, low
    cvss_score: float
    affected_vendor: str
    affected_product: str
    affected_versions: str
    description: str
    ics_cert_advisory: str = ""
    remediation: str = ""
    patch_available: bool = False
    compensating_controls: str = ""


@dataclass
class RiskAssessment:
    asset: OTAsset
    vulnerability: OTVulnerability
    risk_score: float = 0.0
    risk_rating: str = ""
    exploitability: str = ""
    operational_impact: str = ""
    compensating_controls: list = field(default_factory=list)
    remediation_priority: int = 0


class OTVulnerabilityAssessment:
    """OT vulnerability assessment and prioritization engine."""

    def __init__(self):
        self.assets = []
        self.vulnerabilities = []
        self.risk_assessments = []

    def load_assets(self, assets_data):
        """Load asset inventory from Claroty export or manual inventory."""
        for a in assets_data:
            self.assets.append(OTAsset(**a))
        print(f"[*] Loaded {len(self.assets)} OT assets")

    def fetch_ics_advisories(self):
        """Fetch latest ICS-CERT advisories from CISA."""
        print("[*] Fetching ICS-CERT advisories from CISA...")
        try:
            # CISA Known Exploited Vulnerabilities catalog
            url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
            data = resp.json()

            ics_vulns = []
            for vuln in data.get("vulnerabilities", []):
                # Filter for ICS-relevant vendors
                ics_vendors = [
                    "siemens", "schneider", "rockwell", "honeywell",
                    "abb", "ge", "emerson", "yokogawa", "omron",
                    "mitsubishi", "phoenix", "moxa", "advantech",
                ]
                vendor = vuln.get("vendorProject", "").lower()
                if any(v in vendor for v in ics_vendors):
                    ics_vulns.append(vuln)

            print(f"  Found {len(ics_vulns)} ICS-relevant known exploited vulnerabilities")
            return ics_vulns

        except Exception as e:
            print(f"[WARN] Could not fetch advisories: {e}")
            return []

    def correlate_vulnerabilities(self):
        """Match vulnerabilities to assets based on vendor/model/firmware."""
        print("[*] Correlating vulnerabilities to assets...")

        for asset in self.assets:
            for vuln in self.vulnerabilities:
                if (vuln.affected_vendor.lower() in asset.vendor.lower() and
                    vuln.affected_product.lower() in asset.model.lower()):
                    # Check firmware version if specified
                    ra = RiskAssessment(asset=asset, vulnerability=vuln)
                    self._calculate_risk_score(ra)
                    self.risk_assessments.append(ra)

        print(f"  Correlated {len(self.risk_assessments)} asset-vulnerability pairs")

    def _calculate_risk_score(self, ra):
        """Calculate OT-specific risk score considering operational impact."""
        # Base score from CVSS
        base = ra.vulnerability.cvss_score

        # Criticality multiplier based on asset function
        criticality_weights = {
            "critical": 1.5,  # SIS, safety systems
            "high": 1.3,      # PLCs, primary control
            "medium": 1.0,    # HMIs, historians
            "low": 0.7,       # non-critical support systems
        }
        criticality = criticality_weights.get(ra.asset.criticality, 1.0)

        # Purdue level proximity factor (lower levels = higher risk)
        level_weights = {
            "Level 0-1": 1.5,
            "Level 2": 1.3,
            "Level 3": 1.0,
            "Level 3.5": 0.8,
            "Level 4": 0.6,
        }
        level_factor = level_weights.get(ra.asset.purdue_level, 1.0)

        # Network exposure reduction if compensating controls exist
        comp_reduction = 0.8 if ra.compensating_controls else 1.0

        ra.risk_score = round(base * criticality * level_factor * comp_reduction, 1)
        ra.risk_score = min(ra.risk_score, 10.0)

        if ra.risk_score >= 9.0:
            ra.risk_rating = "critical"
            ra.remediation_priority = 1
        elif ra.risk_score >= 7.0:
            ra.risk_rating = "high"
            ra.remediation_priority = 2
        elif ra.risk_score >= 4.0:
            ra.risk_rating = "medium"
            ra.remediation_priority = 3
        else:
            ra.risk_rating = "low"
            ra.remediation_priority = 4

    def generate_report(self):
        """Generate vulnerability assessment report."""
        # Sort by risk score descending
        sorted_ra = sorted(self.risk_assessments, key=lambda x: -x.risk_score)

        report = []
        report.append("=" * 70)
        report.append("OT VULNERABILITY ASSESSMENT REPORT")
        report.append(f"Date: {datetime.now().isoformat()}")
        report.append(f"Assets: {len(self.assets)} | Vulnerabilities: {len(self.vulnerabilities)}")
        report.append(f"Risk Assessments: {len(self.risk_assessments)}")
        report.append("=" * 70)

        for sev in ["critical", "high", "medium", "low"]:
            findings = [ra for ra in sorted_ra if ra.risk_rating == sev]
            if findings:
                report.append(f"\n--- {sev.upper()} RISK ({len(findings)}) ---")
                for ra in findings[:10]:
                    report.append(f"\n  Risk Score: {ra.risk_score}/10.0")
                    report.append(f"  Asset: {ra.asset.name} ({ra.asset.vendor} {ra.asset.model})")
                    report.append(f"  Zone: {ra.asset.zone} ({ra.asset.purdue_level})")
                    report.append(f"  CVE: {ra.vulnerability.cve_id} (CVSS: {ra.vulnerability.cvss_score})")
                    report.append(f"  Title: {ra.vulnerability.title}")
                    if ra.vulnerability.patch_available:
                        report.append(f"  Patch: Available - schedule for next maintenance window")
                    else:
                        report.append(f"  Patch: Not available - apply compensating controls")

        return "\n".join(report)

    def export_json(self, output_file):
        """Export assessment to JSON."""
        data = {
            "assessment_date": datetime.now().isoformat(),
            "asset_count": len(self.assets),
            "vulnerability_count": len(self.vulnerabilities),
            "risk_assessments": [
                {
                    "asset_name": ra.asset.name,
                    "asset_ip": ra.asset.ip_address,
                    "cve": ra.vulnerability.cve_id,
                    "risk_score": ra.risk_score,
                    "risk_rating": ra.risk_rating,
                    "priority": ra.remediation_priority,
                }
                for ra in sorted(self.risk_assessments, key=lambda x: -x.risk_score)
            ],
        }
        with open(output_file, "w") as f:
            json.dump(data, f, indent=2)


if __name__ == "__main__":
    assessment = OTVulnerabilityAssessment()
    advisories = assessment.fetch_ics_advisories()
    print(f"Fetched {len(advisories)} ICS advisories from CISA KEV catalog")

Key Concepts

Term Definition
Claroty xDome Cyber-physical systems protection platform providing asset discovery, vulnerability management, and threat detection for OT/IoT environments
Passive Discovery Identifying OT assets by analyzing network traffic without sending any packets, safe for production environments
Safe Active Query Querying OT devices using native industrial protocols at safe rates to collect detailed asset information without disrupting operations
OT Risk Score Risk rating that factors CVSS base score, asset criticality, Purdue level, and compensating controls for OT-appropriate prioritization
ICS-CERT Advisory CISA-published security advisories for industrial control system vulnerabilities with vendor-specific remediation guidance
Virtual Patching Deploying IPS/firewall rules to block exploitation of known vulnerabilities when firmware patches cannot be immediately applied

Tools & Systems

  • Claroty xDome: Comprehensive OT/IoT asset discovery, vulnerability management, and continuous threat detection platform
  • Claroty CTD: Continuous Threat Detection sensor for passive network monitoring in OT environments
  • CISA ICS-CERT: US government advisory service publishing ICS vulnerability notifications and mitigation guidance
  • Dragos Platform: Alternative OT security platform with asset visibility and vulnerability management capabilities
  • Nozomi Networks Guardian: OT monitoring platform with vulnerability correlation and risk scoring

Output Format

OT Vulnerability Assessment Report
=====================================
Tool: Claroty xDome / Manual Assessment
Date: YYYY-MM-DD
Assets Scanned: [N]

RISK SUMMARY:
  Critical Risk: [N] vulnerabilities on [N] assets
  High Risk: [N] vulnerabilities on [N] assets
  Medium Risk: [N] vulnerabilities on [N] assets
  Low Risk: [N] vulnerabilities on [N] assets

TOP RISKS:
  [Risk Score] [CVE-ID] on [Asset Name] ([Zone])
    Remediation: [Patch/Compensating Control]
    Timeline: [Next maintenance window / Immediate]
信息
Category 硬件工程
Name performing-ot-vulnerability-assessment-with-claroty
版本 v20260317
大小 10.95KB
更新时间 2026-03-18
语言