The Diamond Model of Intrusion Analysis provides a structured framework for analyzing cyber intrusions by examining four core features: Adversary, Capability, Infrastructure, and Victim. This skill covers implementing the Diamond Model programmatically to classify and correlate intrusion events, build activity threads linking related events, create activity-attack graphs, and generate pivot-ready intelligence from intrusion data.
networkx, stix2, graphviz librariesfrom dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json
import uuid
@dataclass
class DiamondEvent:
adversary: str = ""
capability: str = ""
infrastructure: str = ""
victim: str = ""
timestamp: str = ""
phase: str = ""
result: str = ""
direction: str = ""
methodology: str = ""
confidence: int = 0
notes: str = ""
event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
mitre_techniques: list = field(default_factory=list)
iocs: list = field(default_factory=list)
def to_dict(self):
return {
"event_id": self.event_id,
"adversary": self.adversary,
"capability": self.capability,
"infrastructure": self.infrastructure,
"victim": self.victim,
"timestamp": self.timestamp,
"phase": self.phase,
"result": self.result,
"direction": self.direction,
"methodology": self.methodology,
"confidence": self.confidence,
"mitre_techniques": self.mitre_techniques,
"iocs": self.iocs,
"notes": self.notes,
}
import networkx as nx
class DiamondAnalysis:
def __init__(self):
self.events = []
self.graph = nx.DiGraph()
def add_event(self, event: DiamondEvent):
self.events.append(event)
self.graph.add_node(event.event_id, **event.to_dict())
def build_activity_thread(self):
"""Link events chronologically into activity threads."""
sorted_events = sorted(self.events, key=lambda e: e.timestamp)
for i in range(len(sorted_events) - 1):
self.graph.add_edge(
sorted_events[i].event_id,
sorted_events[i + 1].event_id,
relationship="followed_by",
)
def find_pivots(self):
"""Find pivot points where events share infrastructure or capabilities."""
pivots = {"infrastructure": {}, "capability": {}, "adversary": {}}
for event in self.events:
if event.infrastructure:
pivots["infrastructure"].setdefault(event.infrastructure, []).append(event.event_id)
if event.capability:
pivots["capability"].setdefault(event.capability, []).append(event.event_id)
if event.adversary:
pivots["adversary"].setdefault(event.adversary, []).append(event.event_id)
return {
k: {pk: pv for pk, pv in v.items() if len(pv) > 1}
for k, v in pivots.items()
}
def generate_report(self):
return {
"total_events": len(self.events),
"unique_adversaries": len(set(e.adversary for e in self.events if e.adversary)),
"unique_victims": len(set(e.victim for e in self.events if e.victim)),
"unique_infrastructure": len(set(e.infrastructure for e in self.events if e.infrastructure)),
"pivots": self.find_pivots(),
"events": [e.to_dict() for e in self.events],
}