Malware IOC extraction is the process of analyzing malicious software to identify actionable indicators of compromise including file hashes, network indicators (C2 domains, IP addresses, URLs), registry modifications, mutex names, embedded strings, and behavioral artifacts. This skill covers static analysis with PE parsing and string extraction, dynamic analysis with sandbox detonation, automated IOC extraction using tools like YARA, and formatting results as STIX 2.1 indicators for sharing.
Required libraries: pefile, yara-python, oletools, and stix2.
YARA is a pattern-matching tool for identifying and classifying malware. Rules consist of strings (text, hex, regex) and conditions that define the matching logic. Rules can detect malware families, packers, exploit kits, and specific campaign tools.
import pefile
import hashlib
import os
def _pe_section_records(pe):
    """Summarise every section of a parsed PE (name, sizes, entropy, MD5)."""
    return [
        {
            # Section names are NUL-padded fixed-width byte fields.
            "name": section.Name.decode("utf-8", errors="ignore").strip("\x00"),
            "virtual_size": section.Misc_VirtualSize,
            "raw_size": section.SizeOfRawData,
            "entropy": section.get_entropy(),
            "md5": section.get_hash_md5(),
        }
        for section in pe.sections
    ]


def _pe_import_records(pe):
    """List each imported DLL together with the functions it imports by name."""
    records = []
    # DIRECTORY_ENTRY_IMPORT is only set when the PE has an import directory.
    for entry in getattr(pe, "DIRECTORY_ENTRY_IMPORT", []):
        records.append({
            "dll": entry.dll.decode("utf-8", errors="ignore"),
            # Imports without a name (e.g. by ordinal) are left out.
            "functions": [
                imp.name.decode("utf-8", errors="ignore")
                for imp in entry.imports
                if imp.name
            ],
        })
    return records


def _pe_version_strings(pe):
    """Collect version-resource string pairs as ``version_<key>`` entries."""
    def as_text(value):
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="ignore")
        return str(value)

    found = {}
    if not hasattr(pe, "VS_VERSIONINFO"):
        return found
    for group in getattr(pe, "FileInfo", []):
        # Current pefile nests FileInfo as a list of lists; tolerate a flat
        # list as well so either layout is walked the same way.
        members = group if isinstance(group, (list, tuple)) else [group]
        for member in members:
            # Only StringFileInfo members carry a StringTable; VarFileInfo
            # members have none and are skipped by the empty default.
            for table in getattr(member, "StringTable", []):
                for key, val in table.entries.items():
                    found[f"version_{as_text(key)}"] = as_text(val)
    return found


def analyze_pe(filepath):
    """Extract IOCs from a PE file through static analysis.

    Args:
        filepath: Path of the file to analyse.

    Returns:
        A dict with four keys:
          * ``hashes``  - md5 / sha1 / sha256 hex digests, ``file_size`` and,
            when the file parses as a PE, ``imphash``.
          * ``pe_info`` - header fields, a ``sections`` list, the
            is_dll / is_driver / is_exe flags and ``version_<key>`` strings;
            on a parse failure it holds an ``error`` message instead.
          * ``strings`` - always an empty list (not populated here).
          * ``imports`` - one ``{"dll", "functions"}`` dict per imported DLL.

    Raises:
        OSError: If ``filepath`` cannot be opened or read.
    """
    iocs = {"hashes": {}, "pe_info": {}, "strings": [], "imports": []}

    # Calculate file hashes
    with open(filepath, "rb") as f:
        data = f.read()
    iocs["hashes"]["md5"] = hashlib.md5(data).hexdigest()
    iocs["hashes"]["sha1"] = hashlib.sha1(data).hexdigest()
    iocs["hashes"]["sha256"] = hashlib.sha256(data).hexdigest()
    iocs["hashes"]["file_size"] = len(data)

    # Parse the bytes that were just hashed rather than re-opening the path,
    # so the hashes and the PE details always describe the same content.
    pe = None
    try:
        pe = pefile.PE(data=data)
        iocs["hashes"]["imphash"] = pe.get_imphash()

        info = iocs["pe_info"]
        info["compilation_time"] = str(pe.FILE_HEADER.TimeDateStamp)
        info["machine_type"] = hex(pe.FILE_HEADER.Machine)
        info["subsystem"] = pe.OPTIONAL_HEADER.Subsystem
        info["sections"] = _pe_section_records(pe)

        iocs["imports"].extend(_pe_import_records(pe))

        # Check for suspicious characteristics
        info["is_dll"] = pe.is_dll()
        info["is_driver"] = pe.is_driver()
        info["is_exe"] = pe.is_exe()

        info.update(_pe_version_strings(pe))
    except pefile.PEFormatError as exc:
        iocs["pe_info"]["error"] = str(exc)
    finally:
        # Release the parser even when an unexpected error escapes.
        if pe is not None:
            pe.close()
    return iocs
import re
def extract_ioc_strings(filepath):
    """Pull IOC-looking substrings out of the printable strings of a file.

    The file is read as bytes; runs of at least four printable ASCII
    characters, and runs of at least four printable characters each followed
    by a NUL byte (decoded as UTF-16-LE), are collected. Every collected
    string is then searched with each category regex.

    Args:
        filepath: Path of the file to scan.

    Returns:
        A dict mapping category name (``ipv4``, ``domain``, ``url``, ...) to
        a sorted list of unique matches. Categories with no match are absent.
    """
    matchers = {
        "ipv4": re.compile(
            r"\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}"
            r"(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b"
        ),
        "domain": re.compile(
            r"\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
            r"(?:com|net|org|io|ru|cn|tk|xyz|top|info|biz|cc|ws|pw)\b"
        ),
        "url": re.compile(r"https?://[^\s\"'<>]{5,200}"),
        "email": re.compile(
            r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b"
        ),
        "registry": re.compile(
            r"(?:HKEY_[A-Z_]+|HKLM|HKCU|HKU|HKCR|HKCC)"
            r"\\[\\a-zA-Z0-9_ .{}-]+"
        ),
        "filepath_windows": re.compile(
            r"[A-Z]:\\(?:[^\\/:*?\"<>|\r\n]+\\)*[^\\/:*?\"<>|\r\n]+"
        ),
        "mutex": re.compile(r"(?:Global\\|Local\\)[a-zA-Z0-9_\-{}.]{4,}"),
        "useragent": re.compile(r"Mozilla/[45]\.0[^\"']{10,200}"),
        "bitcoin": re.compile(r"\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b"),
        "pdb_path": re.compile(r"[A-Z]:\\[^\"]{5,200}\.pdb"),
    }

    with open(filepath, "rb") as handle:
        blob = handle.read()

    # Narrow (ASCII) strings first, then wide (UTF-16-LE) ones, minimum
    # length four characters in both cases.
    candidates = [
        chunk.decode("ascii", errors="ignore")
        for chunk in re.findall(rb"[\x20-\x7e]{4,}", blob)
    ]
    candidates.extend(
        chunk.decode("utf-16-le", errors="ignore")
        for chunk in re.findall(rb"(?:[\x20-\x7e]\x00){4,}", blob)
    )

    hits = {}
    for label, matcher in matchers.items():
        unique = set()
        for text in candidates:
            unique.update(matcher.findall(text))
        # Only categories that actually matched appear in the result.
        if unique:
            hits[label] = sorted(unique)
    return hits
import yara
def _iter_yara_string_hits(match):
    """Yield ``(offset, identifier, data)`` for every string hit of a match.

    Handles both shapes of ``match.strings``: entries exposing ``instances``
    (each with ``offset`` and ``matched_data``) and plain
    ``(offset, identifier, data)`` tuples.
    """
    for entry in match.strings:
        if hasattr(entry, "instances"):
            for instance in entry.instances:
                yield instance.offset, entry.identifier, instance.matched_data
        else:
            offset, identifier, data = entry
            yield offset, identifier, data


def scan_with_yara(filepath, rules_path):
    """Scan file with YARA rules for malware classification.

    Args:
        filepath: Path of the file to scan.
        rules_path: Path of the YARA rules source file to compile.

    Returns:
        A list with one dict per matching rule, holding ``rule``,
        ``namespace``, ``tags``, ``meta`` and ``strings``. Each item in
        ``strings`` has a hex ``offset``, the string ``identifier`` and the
        matched bytes as hex ``data``. Matches longer than 100 bytes are cut
        to the first 100 bytes and suffixed with ``"..."``.
    """
    max_bytes = 100
    rules = yara.compile(filepath=rules_path)

    results = []
    for match in rules.match(filepath):
        string_hits = []
        for offset, identifier, data in _iter_yara_string_hits(match):
            # Mark only data that was really shortened; a match of exactly
            # max_bytes is complete and gets no ellipsis.
            if len(data) <= max_bytes:
                rendered = data.hex()
            else:
                rendered = data[:max_bytes].hex() + "..."
            string_hits.append({
                "offset": hex(offset),
                "identifier": identifier,
                "data": rendered,
            })
        results.append({
            "rule": match.rule,
            "namespace": match.namespace,
            "tags": match.tags,
            "meta": match.meta,
            "strings": string_hits,
        })
    return results
# Example YARA rules for common malware indicators.
#
# The literal is a raw string so the doubled backslashes in the registry path
# reach YARA unchanged (YARA itself needs "\\" for one literal backslash).
# The "pe" and "math" modules are imported because Packed_Binary iterates
# pe.sections and measures each section's entropy with math.entropy().
SAMPLE_YARA_RULE = r"""
import "pe"
import "math"

rule Suspicious_Network_Indicators {
meta:
description = "Detects suspicious network-related strings"
author = "CTI Analyst"
severity = "medium"
strings:
$ua1 = "Mozilla/5.0" ascii
$cmd1 = "cmd.exe /c" ascii nocase
$ps1 = "powershell" ascii nocase
$wget = "wget" ascii nocase
$curl = "curl" ascii nocase
$b64 = "base64" ascii nocase
$reg1 = "HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run" ascii nocase
condition:
uint16(0) == 0x5A4D and
((2 of ($ua1, $cmd1, $ps1, $wget, $curl, $b64)) or $reg1)
}
rule Packed_Binary {
meta:
description = "Detects potentially packed binary"
author = "CTI Analyst"
condition:
uint16(0) == 0x5A4D and
for any section in pe.sections : (
math.entropy(section.raw_data_offset, section.raw_data_size) >= 7.0
)
}
"""
from stix2 import (
Bundle, Indicator, Malware, Relationship,
File as STIXFile, DomainName, IPv4Address,
ObservedData,
)
from datetime import datetime
def _is_internal_ipv4(value):
    """Return True when *value* is an RFC 1918 private or loopback IPv4 address.

    Text that does not parse as an IPv4 address is reported as not internal,
    so the caller keeps it rather than silently dropping it.
    """
    import ipaddress

    try:
        address = ipaddress.IPv4Address(value)
    except ValueError:
        return False
    internal_ranges = (
        "10.0.0.0/8",
        "172.16.0.0/12",  # only this slice of 172.x is private
        "192.168.0.0/16",
        "127.0.0.0/8",
    )
    return any(address in ipaddress.ip_network(cidr) for cidr in internal_ranges)


def create_stix_bundle(pe_iocs, string_iocs, yara_results, sample_name):
    """Create STIX 2.1 bundle from extracted IOCs.

    The bundle contains one Malware object for the sample, an Indicator for
    its SHA-256 hash, and an Indicator for each IPv4 address and domain in
    ``string_iocs``. Every indicator is linked to the malware object by an
    ``indicates`` Relationship. Private and loopback IPv4 addresses are
    skipped.

    Args:
        pe_iocs: Dict providing ``pe_iocs["hashes"]["sha256"]``.
        string_iocs: Dict that may hold ``"ipv4"`` and ``"domain"`` lists.
        yara_results: Accepted for interface compatibility; not used when
            building the bundle.
        sample_name: Name given to the Malware object.

    Returns:
        A ``stix2`` Bundle holding the created objects.

    Raises:
        KeyError: If ``pe_iocs`` carries no SHA-256 hash.
    """
    from datetime import timezone

    sha256 = pe_iocs["hashes"]["sha256"]
    # The trailing "Z" denotes UTC, so the clock must be read in UTC; one
    # timestamp is shared by every indicator of this bundle.
    valid_from = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # Create Malware SDO
    malware = Malware(
        name=sample_name,
        is_family=False,
        malware_types=["unknown"],
        description=f"Malware sample analyzed: {sha256}",
        allow_custom=True,
    )
    objects = [malware]

    def add_indicator(name, pattern):
        """Append an indicator plus its 'indicates' link to the malware."""
        indicator = Indicator(
            name=name,
            pattern=pattern,
            pattern_type="stix",
            valid_from=valid_from,
            indicator_types=["malicious-activity"],
            allow_custom=True,
        )
        objects.append(indicator)
        objects.append(Relationship(
            relationship_type="indicates",
            source_ref=indicator.id,
            target_ref=malware.id,
        ))

    # File hash indicator
    add_indicator(
        f"Malware hash: {sha256[:16]}...",
        f"[file:hashes.'SHA-256' = '{sha256}']",
    )

    # Network indicators from strings
    for ip in string_iocs.get("ipv4", []):
        if _is_internal_ipv4(ip):
            continue
        add_indicator(f"C2 IP: {ip}", f"[ipv4-addr:value = '{ip}']")

    for domain in string_iocs.get("domain", []):
        add_indicator(
            f"C2 Domain: {domain}",
            f"[domain-name:value = '{domain}']",
        )

    return Bundle(objects=objects, allow_custom=True)