Do not use for general DNS performance monitoring or DNS configuration auditing; use DNS health monitoring tools for those. For HTTP/HTTPS-based C2 detection, use network traffic analysis skills focused on web protocols.
DISCLAIMER: DNS tunneling tools referenced in this skill (Iodine, dnscat2, dns2tcp) are dual-use. They have legitimate uses (bypassing captive portals, security research) and malicious uses (C2 channels, exfiltration). Only deploy detection in networks you are authorized to monitor. Testing tunneling tools requires explicit authorization.
Python packages: numpy, scikit-learn, pandas, tldextract, and dnspython
Ingest DNS traffic from network sensors and parse into analyzable format:
# Zeek - extract dns.log fields
# Default Zeek dns.log columns:
# ts uid id.orig_h id.orig_p id.resp_h id.resp_p proto trans_id rtt query
# qclass qclass_name qtype qtype_name rcode rcode_name AA TC RD RA Z
# answers TTLs rejected

# Filter for potentially suspicious record types.
# Match on the qtype_name column only (field 4 of the zeek-cut output below).
# A grep over the whole line would also hit query names or answers that merely
# contain "TXT", "MX", etc.
zeek-cut ts id.orig_h query qtype_name answers rcode_name < dns.log | \
  awk -F'\t' '$4 ~ /^(TXT|NULL|CNAME|MX)$/' > suspicious_qtypes.log

# Extract unique queried domains
zeek-cut query < dns.log | sort -u > unique_domains.txt

# Suricata EVE JSON - extract DNS events
jq -r 'select(.event_type=="dns") |
  [.timestamp, .src_ip, .dns.rrname, .dns.rrtype, .dns.rcode] |
  @tsv' eve.json > dns_events.tsv

# tshark - extract DNS queries from pcap
# frame.time_epoch is used instead of frame.time so the timestamp column is a
# plain epoch number that downstream parsers can read without locale handling.
tshark -r capture.pcap -T fields \
  -e frame.time_epoch -e ip.src -e ip.dst \
  -e dns.qry.name -e dns.qry.type \
  -e dns.resp.type -e dns.txt \
  -Y "dns" > dns_queries.tsv

# Count queries per domain (find high-volume destinations)
# Approximates the base domain as the last two labels; names with fewer than
# two labels are skipped rather than printed as "name.name".
zeek-cut query < dns.log | \
  awk -F. 'NF >= 2 {print $(NF-1)"."$NF}' | \
  sort | uniq -c | sort -rn | head -50
Calculate entropy of subdomain strings to identify encoded/encrypted data:
#!/usr/bin/env python3
"""Shannon entropy analysis for DNS query subdomains."""
import math
import csv
import sys
from collections import Counter
try:
import tldextract
HAS_TLDEXTRACT = True
except ImportError:
HAS_TLDEXTRACT = False
def shannon_entropy(data):
    """Return the Shannon entropy of ``data`` in bits per character.

    Empty (falsy) input yields ``0.0``.
    """
    if not data:
        return 0.0
    total = len(data)
    # Relative frequency of every distinct symbol, in first-seen order.
    probabilities = [occurrences / total for occurrences in Counter(data).values()]
    return -sum(p * math.log2(p) for p in probabilities)
def extract_subdomain(fqdn):
    """Split a fully qualified domain name into (subdomain, base_domain).

    Args:
        fqdn: Domain name; a trailing root dot is ignored.

    Returns:
        Tuple ``(subdomain, base_domain)``. ``subdomain`` is ``""`` when the
        name has no labels in front of the base domain.

    When ``tldextract`` is available the public-suffix-aware split is used.
    Otherwise the last two labels are assumed to be the base domain, which is
    wrong for multi-label suffixes such as ``co.uk``.
    """
    name = fqdn.rstrip(".")
    if HAS_TLDEXTRACT:
        ext = tldextract.extract(name)
        # Join only the non-empty parts so an empty suffix or empty domain
        # does not yield a stray leading or trailing dot ("corp." / ".co.uk").
        base_domain = ".".join(part for part in (ext.domain, ext.suffix) if part)
        return ext.subdomain, base_domain

    # Fallback: assume last two labels are domain + TLD
    labels = name.split(".")
    if len(labels) > 2:
        return ".".join(labels[:-2]), ".".join(labels[-2:])
    # Return the normalised name so both branches agree on trailing dots.
    return "", name
def analyze_dns_entropy(queries, entropy_threshold=3.5, length_threshold=30):
    """Score DNS queries for tunneling indicators based on their subdomain.

    Args:
        queries: Iterable of dicts; ``query`` is required, ``src_ip``,
            ``timestamp`` and ``qtype`` are copied through when present.
        entropy_threshold: Subdomain entropy above this is flagged
            (3.5-4.0 typical; tune per environment).
        length_threshold: Subdomain length (dots removed) above this is
            flagged (30-50 characters typical).

    Returns:
        List of dicts for queries that raised at least one flag, ordered by
        ``score`` from highest to lowest.
    """
    hex_alphabet = set("0123456789abcdef")
    base32_alphabet = set("abcdefghijklmnopqrstuvwxyz234567")
    flagged = []

    for record in queries:
        fqdn = record.get("query", "").lower().rstrip(".")
        if not fqdn:
            continue

        subdomain, base_domain = extract_subdomain(fqdn)
        # Entropy and length are measured on the labels with the dots removed.
        payload = subdomain.replace(".", "") if subdomain else ""
        if not payload:
            continue

        entropy = shannon_entropy(payload)
        length = len(payload)
        label_count = subdomain.count(".") + 1

        # Higher score = more suspicious.
        score = 0.0
        flags = []

        if entropy > entropy_threshold:
            score += (entropy - entropy_threshold) * 25
            flags.append(f"high_entropy:{entropy:.2f}")

        if length > length_threshold:
            score += (length - length_threshold) * 0.5
            flags.append(f"long_subdomain:{length}")

        if label_count > 4:
            score += label_count * 2
            flags.append(f"many_labels:{label_count}")

        # Share of characters drawn from typical encoding alphabets.
        denominator = max(length, 1)
        hex_ratio = sum(ch in hex_alphabet for ch in payload) / denominator
        if length > 20 and hex_ratio > 0.85:
            score += 20
            flags.append("hex_encoded")

        b32_ratio = sum(ch in base32_alphabet for ch in payload) / denominator
        if length > 20 and b32_ratio > 0.95:
            score += 15
            flags.append("base32_encoded")

        # Queries that raised no flag are not reported.
        if not flags:
            continue

        flagged.append({
            "fqdn": fqdn,
            "subdomain": subdomain,
            "base_domain": base_domain,
            "entropy": round(entropy, 4),
            "subdomain_length": length,
            "label_count": label_count,
            "score": round(score, 2),
            "flags": flags,
            "src_ip": record.get("src_ip", ""),
            "timestamp": record.get("timestamp", ""),
            "qtype": record.get("qtype", ""),
        })

    return sorted(flagged, key=lambda item: item["score"], reverse=True)
# Heuristic signatures for known DNS tunneling tools.
# Each entry provides:
#   subdomain_pattern - regex source for the encoded part of the query name
#   common_qtypes     - record types listed for the tool
#   typical_entropy   - two-number tuple; presumably a (low, high) range in
#                       bits per character - TODO confirm against consumers
#   description       - human-readable summary
# NOTE(review): nothing in the visible code applies these signatures yet.
TOOL_SIGNATURES = {
    "iodine": {
        "subdomain_pattern": r"^[a-z0-9]{50,}$",  # 50+ lowercase alphanumerics (not only hex digits)
        "common_qtypes": ["NULL", "TXT", "CNAME", "MX", "A"],
        "typical_entropy": (3.8, 4.2),
        "description": "Iodine DNS tunnel - IPv4 over DNS, uses NULL/TXT records",
    },
    "dnscat2": {
        # Either the literal "dnscat." prefix or a leading run of 16+ hex digits.
        "subdomain_pattern": r"^dnscat\.|^[a-f0-9]{16,}",
        "common_qtypes": ["TXT", "CNAME", "MX", "A"],
        "typical_entropy": (3.5, 4.5),
        "description": "dnscat2 encrypted C2 channel over DNS",
    },
    "dns2tcp": {
        "subdomain_pattern": r"^[a-z2-7]{20,}",  # Base32 alphabet, 20+ leading characters
        "common_qtypes": ["TXT", "KEY"],
        "typical_entropy": (3.6, 4.0),
        "description": "dns2tcp tunnel - TCP over DNS using TXT/KEY records",
    },
    "cobalt_strike_dns": {
        # Leading run of 12+ hex digits followed by a dot, so it needs at least one more label.
        "subdomain_pattern": r"^[a-f0-9]{12,}\.",
        "common_qtypes": ["A", "AAAA", "TXT"],
        "typical_entropy": (3.2, 4.0),
        "description": "Cobalt Strike DNS beacon - encoded commands in A/TXT records",
    },
}
def print_entropy_report(results, top_n=25):
    """Write a human-readable entropy analysis report to stdout.

    Args:
        results: Flagged-query dicts carrying ``base_domain``, ``entropy``,
            ``score``, ``subdomain_length``, ``fqdn``, ``flags``, ``src_ip``
            and ``qtype``; printed in the order given.
        top_n: Number of leading entries of ``results`` to print in detail.
    """
    heavy_rule = "=" * 80
    light_rule = " " + "-" * 76

    print(heavy_rule)
    print(" DNS ENTROPY ANALYSIS - TUNNELING DETECTION")
    print(heavy_rule)
    print(f" Suspicious queries found: {len(results)}")
    print()

    if not results:
        print(" No suspicious queries detected.")
        return

    # Aggregate per base domain: hit count plus the maxima seen.
    per_domain = {}
    for entry in results:
        bucket = per_domain.setdefault(
            entry["base_domain"],
            {"count": 0, "max_entropy": 0, "max_score": 0, "queries": []},
        )
        bucket["count"] += 1
        if entry["entropy"] > bucket["max_entropy"]:
            bucket["max_entropy"] = entry["entropy"]
        if entry["score"] > bucket["max_score"]:
            bucket["max_score"] = entry["score"]
        bucket["queries"].append(entry)

    # Domains with the most flagged queries come first.
    ranked = sorted(per_domain.items(), key=lambda item: item[1]["count"], reverse=True)

    print(" TOP SUSPICIOUS BASE DOMAINS")
    print(light_rule)
    print(f" {'Domain':<35} {'Queries':>8} {'Max Ent':>8} {'Max Score':>10}")
    print(light_rule)
    for name, stats in ranked[:20]:
        print(f" {name:<35} {stats['count']:>8} {stats['max_entropy']:>8.3f} {stats['max_score']:>10.1f}")
    print()

    print(f" TOP {top_n} HIGHEST-SCORING QUERIES")
    print(light_rule)
    for entry in results[:top_n]:
        print(f" Score: {entry['score']:.1f} Entropy: {entry['entropy']:.3f} Len: {entry['subdomain_length']}")
        print(f" FQDN: {entry['fqdn'][:75]}")
        print(f" Flags: {', '.join(entry['flags'])}")
        print(f" Source: {entry['src_ip']} Type: {entry['qtype']}")
        print()
Identify C2 commands or staged payloads delivered via DNS TXT records:
#!/usr/bin/env python3
"""DNS TXT record payload detection for C2 command delivery."""
import base64
import re
import math
from collections import Counter
def shannon_entropy(data):
    """Return the Shannon entropy of ``data`` in bits per symbol (0.0 if empty)."""
    if not data:
        return 0.0
    size = len(data)
    weighted_logs = 0
    for occurrences in Counter(data).values():
        share = occurrences / size
        weighted_logs += share * math.log2(share)
    # Each term is <= 0, so negate the total to get a non-negative entropy.
    return -weighted_logs
# PowerShell stager fragments searched for (case-insensitively) in decoded base64.
_POWERSHELL_STAGER_PATTERNS = (
    r"Invoke-Expression",
    r"IEX\s*\(",
    r"New-Object\s+System\.Net",
    r"DownloadString",
    r"FromBase64String",
    r"Start-Process",
    r"\-enc\s",
    r"powershell\s.*\-e\s",
)

# Prefixes of well-known benign TXT records, used to suppress false positives.
_LEGITIMATE_TXT_PATTERNS = (
    r'^v=spf1\s',   # SPF record
    r'^v=DKIM1',    # DKIM record
    r'^v=DMARC1',   # DMARC record
    r'^google-site-verification=',
    r'^MS=',        # Microsoft domain verification
    r'^docusign=',
    r'^apple-domain-verification=',
    r'^facebook-domain-verification=',
    r'^_globalsign-domain-verification=',
)


def analyze_txt_record(txt_data, domain=""):
    """Analyze one DNS TXT record value for C2 payload indicators.

    Indicators checked:
    - Oversized content (more than 500 characters)
    - High entropy (> 4.5 bits/char on content longer than 100 characters)
    - Content that is entirely base64; if it decodes, the decoded bytes are
      checked for PE/ELF headers and PowerShell stager fragments
    - Pure hex strings of 32+ characters
    - More than three base64-looking blocks of 50+ characters

    Args:
        txt_data: TXT record content as a string.
        domain: Name the record was returned for; copied into the result.

    Returns:
        Dict with keys ``domain``, ``txt_length``, ``entropy``, ``suspicious``,
        ``legitimate``, ``indicators`` and ``decoded_preview``. A record that
        starts with a known benign prefix (SPF, DKIM, DMARC, site
        verification) has its indicators cleared, ``legitimate`` set to True
        and ``suspicious`` left False.

    NOTE(review): the benign-prefix check discards every indicator, so a
    payload that begins with e.g. ``v=spf1 `` is not reported.
    """
    findings = {
        "domain": domain,
        "txt_length": len(txt_data),
        "entropy": shannon_entropy(txt_data),
        "suspicious": False,
        # Always present so consumers see the same schema on every path.
        "legitimate": False,
        "indicators": [],
        "decoded_preview": None,
    }
    indicators = findings["indicators"]
    stripped = txt_data.strip()

    # Length check - legitimate TXT records are typically short (SPF, DKIM, verification)
    if len(txt_data) > 500:
        indicators.append({
            "type": "oversized_txt",
            "detail": f"TXT record length {len(txt_data)} exceeds normal threshold (500)",
            "severity": "medium",
        })

    # High entropy - suggests encoded/encrypted payload
    if findings["entropy"] > 4.5 and len(txt_data) > 100:
        indicators.append({
            "type": "high_entropy_payload",
            "detail": f"Entropy {findings['entropy']:.3f} suggests encoded data",
            "severity": "high",
        })

    # Base64 detection: the whole record must consist of base64 characters.
    if re.match(r'^[A-Za-z0-9+/]{40,}={0,2}$', stripped):
        indicators.append({
            "type": "base64_encoded",
            "detail": "Content matches base64 pattern",
            "severity": "high",
        })
        try:
            decoded = base64.b64decode(stripped)
        except ValueError:
            # binascii.Error (a ValueError subclass) is raised for bad padding.
            # The base64-looking indicator above still stands; there is just
            # nothing to inspect.
            decoded = None

        if decoded is not None:
            preview = decoded[:200]

            # Check for PE header (MZ)
            if preview[:2] == b'MZ':
                indicators.append({
                    "type": "pe_executable",
                    "detail": "Decoded base64 contains PE executable (MZ header)",
                    "severity": "critical",
                })

            # Check for ELF header
            if preview[:4] == b'\x7fELF':
                indicators.append({
                    "type": "elf_executable",
                    "detail": "Decoded base64 contains ELF executable",
                    "severity": "critical",
                })

            # Check for PowerShell patterns; only the first match is reported.
            decoded_str = decoded.decode("utf-8", errors="ignore")
            for pattern in _POWERSHELL_STAGER_PATTERNS:
                if re.search(pattern, decoded_str, re.IGNORECASE):
                    indicators.append({
                        "type": "powershell_stager",
                        "detail": f"Decoded content contains PowerShell pattern: {pattern}",
                        "severity": "critical",
                    })
                    break

            findings["decoded_preview"] = repr(preview[:100])

    # Known C2 TXT patterns
    if re.match(r'^[a-f0-9]{32,}$', stripped, re.IGNORECASE):
        indicators.append({
            "type": "hex_encoded_payload",
            "detail": "Pure hex string in TXT record - possible Cobalt Strike beacon config",
            "severity": "high",
        })

    # Multiple concatenated base64 blocks (common in staged delivery)
    b64_blocks = re.findall(r'[A-Za-z0-9+/]{50,}={0,2}', txt_data)
    if len(b64_blocks) > 3:
        indicators.append({
            "type": "multi_block_payload",
            "detail": f"{len(b64_blocks)} base64 blocks found - possible staged payload",
            "severity": "high",
        })

    # Check for known legitimate TXT patterns to reduce false positives
    if any(re.match(pattern, txt_data, re.IGNORECASE)
           for pattern in _LEGITIMATE_TXT_PATTERNS):
        findings["indicators"] = []
        findings["legitimate"] = True
        return findings

    findings["suspicious"] = bool(findings["indicators"])
    return findings
# Numeric rank for the textual ``severity`` labels attached to indicators.
_SEVERITY_RANK = {"low": 1, "medium": 2, "high": 3, "critical": 4}


def _indicator_rank(indicator):
    """Return a sortable rank for one indicator dict."""
    # An explicit numeric ``severity_score`` wins; otherwise map the label.
    if "severity_score" in indicator:
        return indicator["severity_score"]
    return _SEVERITY_RANK.get(indicator.get("severity"), 0)


def analyze_txt_records_bulk(records):
    """Analyze a batch of DNS TXT records and return the suspicious ones.

    Args:
        records: Iterable of dicts. The name is read from ``domain`` (falling
            back to ``query``) and the TXT content from ``txt`` (falling back
            to ``answer``). Records with empty content are skipped.

    Returns:
        Findings from ``analyze_txt_record`` whose ``suspicious`` flag is set,
        ordered by their most severe indicator (critical first). Findings
        with equal severity keep their input order.
    """
    results = []
    for record in records:
        domain = record.get("domain", record.get("query", ""))
        txt_data = record.get("txt", record.get("answer", ""))
        if not txt_data:
            continue
        finding = analyze_txt_record(txt_data, domain)
        if finding["suspicious"]:
            results.append(finding)

    # Indicators carry a textual "severity"; sorting on a "severity_score"
    # key they never have would leave the list in input order.
    results.sort(
        key=lambda finding: max(
            (_indicator_rank(ind) for ind in finding["indicators"]),
            default=0,
        ),
        reverse=True,
    )
    return results
Train a classifier to distinguish DGA-generated domains from legitimate ones:
#!/usr/bin/env python3
"""
DGA domain classification using character-level feature extraction and ML.
Features extracted per domain:
- Shannon entropy of the domain string
- Domain length
- Digit ratio, consonant ratio, vowel ratio
- Longest consecutive consonant sequence
- N-gram frequency deviation from English
- Number of distinct characters
- Presence of dictionary words
"""
import math
import re
import string
from collections import Counter
import numpy as np
try:
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
HAS_SKLEARN = True
except ImportError:
HAS_SKLEARN = False
# English language character bigram frequencies (normalized, top bigrams)
# Source: Peter Norvig's English letter frequency analysis
# Only the listed bigrams are known; any other bigram contributes 0 when
# extract_domain_features averages these values over a name.
ENGLISH_BIGRAMS = {
    "th": 0.0356, "he": 0.0307, "in": 0.0243, "er": 0.0205,
    "an": 0.0199, "re": 0.0185, "on": 0.0176, "at": 0.0149,
    "en": 0.0145, "nd": 0.0135, "ti": 0.0134, "es": 0.0134,
    "or": 0.0128, "te": 0.0120, "of": 0.0117, "ed": 0.0117,
    "is": 0.0113, "it": 0.0112, "al": 0.0109, "ar": 0.0107,
    "st": 0.0105, "to": 0.0104, "nt": 0.0104, "ng": 0.0095,
    "se": 0.0093, "ha": 0.0093, "as": 0.0087, "ou": 0.0087,
    "io": 0.0083, "le": 0.0083, "ve": 0.0083, "co": 0.0079,
    "me": 0.0079, "de": 0.0076, "hi": 0.0076, "ri": 0.0073,
    "ro": 0.0073, "ic": 0.0070, "ne": 0.0069, "ea": 0.0069,
}
# Lowercase character classes used for the ratio features. "y" is counted as
# a consonant; digits and "-" belong to neither set.
VOWELS = set("aeiou")
CONSONANTS = set("bcdfghjklmnpqrstvwxyz")
def extract_domain_features(domain):
    """Compute the numeric character-level features of a domain name.

    The final label is treated as the TLD and dropped; the remaining labels
    are concatenated without dots and analysed as one string.

    Args:
        domain: Domain name; case and surrounding dots are ignored.

    Returns:
        Dict holding the normalised ``domain`` plus one entry per feature, or
        ``None`` when nothing is left to analyse after dropping the TLD.
    """
    domain = domain.lower().strip(".")
    parts = domain.split(".")

    # Everything except the last label, with the dots removed.
    flat = "".join(parts[:-1]) if len(parts) > 1 else domain
    length = len(flat)
    if length == 0:
        return None

    # 1. Shannon entropy (bits per character)
    entropy = sum(
        -((seen / length) * math.log2(seen / length))
        for seen in Counter(flat).values()
    )

    # 2. Character class counts and ratios
    digit_count = sum(ch.isdigit() for ch in flat)
    vowel_count = sum(ch in VOWELS for ch in flat)
    consonant_count = sum(ch in CONSONANTS for ch in flat)
    special_count = flat.count("-")

    # 3. Longest run of consecutive consonants
    max_consonant_run = 0
    streak = 0
    for ch in flat:
        streak = streak + 1 if ch in CONSONANTS else 0
        if streak > max_consonant_run:
            max_consonant_run = streak

    # 4. Distinct characters
    distinct_chars = len(set(flat))

    # 5. Mean English bigram frequency (unknown bigrams count as 0)
    neighbours = list(zip(flat, flat[1:]))
    if neighbours:
        english_score = sum(
            ENGLISH_BIGRAMS.get(left + right, 0) for left, right in neighbours
        ) / len(neighbours)
    else:
        english_score = 0

    # 6. Label count, TLD included
    label_count = len(parts)

    # 7. Share of hex characters
    hex_chars = set("0123456789abcdef")
    hex_ratio = sum(ch in hex_chars for ch in flat) / length

    # 8. Digit <-> non-digit switches between neighbouring characters
    transitions = sum(
        left.isdigit() != right.isdigit() for left, right in neighbours
    )
    transition_ratio = transitions / max(length - 1, 1)

    # 9. Immediately repeated characters
    if length > 1:
        repeat_ratio = sum(left == right for left, right in neighbours) / (length - 1)
    else:
        repeat_ratio = 0

    return {
        "domain": domain,
        "length": length,
        "entropy": round(entropy, 4),
        "digit_ratio": round(digit_count / length, 4),
        "vowel_ratio": round(vowel_count / length, 4),
        "consonant_ratio": round(consonant_count / length, 4),
        "max_consonant_run": max_consonant_run,
        "distinct_chars": distinct_chars,
        "distinct_ratio": round(distinct_chars / length, 4),
        "english_bigram_score": round(english_score, 6),
        "label_count": label_count,
        "hex_ratio": round(hex_ratio, 4),
        "transition_ratio": round(transition_ratio, 4),
        "repeat_ratio": round(repeat_ratio, 4),
        "special_count": special_count,
    }
# Feature names in the order they appear in the vector built by
# features_to_vector. The non-numeric "domain" entry of the feature dict is
# deliberately left out.
FEATURE_COLUMNS = [
    "length", "entropy", "digit_ratio", "vowel_ratio", "consonant_ratio",
    "max_consonant_run", "distinct_chars", "distinct_ratio",
    "english_bigram_score", "label_count", "hex_ratio",
    "transition_ratio", "repeat_ratio", "special_count",
]
def features_to_vector(features):
    """Return the feature dict's values as a 1-D array in FEATURE_COLUMNS order."""
    ordered_values = []
    for column in FEATURE_COLUMNS:
        ordered_values.append(features[column])
    return np.array(ordered_values)
def train_dga_classifier(legitimate_domains, dga_domains, model_type="random_forest"):
    """Train and evaluate a DGA classifier on labeled domain lists.

    Args:
        legitimate_domains: list of known-good domain strings (label 0)
        dga_domains: list of known DGA domain strings (label 1)
        model_type: 'gradient_boosting' selects gradient boosting; any other
            value selects a random forest

    Returns:
        ``(model, scaler, metrics)``. ``model`` is fitted on the scaled
        training split, ``scaler`` is fitted on that same split, and
        ``metrics`` holds hold-out scores, 5-fold cross-validated F1, the
        confusion matrix and feature importances. ``(None, None, None)`` is
        returned when scikit-learn is missing or there is too little data.
    """
    if not HAS_SKLEARN:
        print("[ERROR] scikit-learn required: pip install scikit-learn")
        return None, None, None

    # Imported here because scikit-learn is optional at module import time.
    from sklearn.pipeline import make_pipeline

    # Extract features; domains that yield no features are skipped.
    X_legit = [
        features_to_vector(feats)
        for feats in map(extract_domain_features, legitimate_domains)
        if feats
    ]
    X_dga = [
        features_to_vector(feats)
        for feats in map(extract_domain_features, dga_domains)
        if feats
    ]

    # Stratified 5-fold cross-validation needs at least 5 samples per class;
    # fail through the function's own error path instead of inside sklearn.
    if len(X_legit) < 5 or len(X_dga) < 5:
        print("[ERROR] Insufficient feature data")
        return None, None, None

    X = np.vstack(X_legit + X_dga)
    y = np.array([0] * len(X_legit) + [1] * len(X_dga))

    # Split BEFORE scaling so the scaler never sees the hold-out rows.
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Scale features using statistics of the training split only.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train_raw)
    X_test = scaler.transform(X_test_raw)

    # Train model
    if model_type == "gradient_boosting":
        model = GradientBoostingClassifier(
            n_estimators=200, max_depth=6, learning_rate=0.1,
            min_samples_split=10, random_state=42,
        )
    else:
        model = RandomForestClassifier(
            n_estimators=200, max_depth=15, min_samples_split=5,
            random_state=42, n_jobs=-1,
        )
    model.fit(X_train, y_train)

    # Evaluate on the hold-out split
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, target_names=["legitimate", "dga"],
                                   output_dict=True)
    cm = confusion_matrix(y_test, y_pred)

    # Cross-validate a scaler+model pipeline on the raw features so each fold
    # fits its own scaler. cross_val_score clones the estimator, so the
    # fitted ``model`` returned below is left untouched.
    cv_scores = cross_val_score(
        make_pipeline(StandardScaler(), model), X, y, cv=5, scoring="f1"
    )

    metrics = {
        "accuracy": report["accuracy"],
        "precision_dga": report["dga"]["precision"],
        "recall_dga": report["dga"]["recall"],
        "f1_dga": report["dga"]["f1-score"],
        "precision_legit": report["legitimate"]["precision"],
        "recall_legit": report["legitimate"]["recall"],
        "confusion_matrix": cm.tolist(),
        "cv_f1_mean": cv_scores.mean(),
        "cv_f1_std": cv_scores.std(),
        "feature_importance": dict(zip(
            FEATURE_COLUMNS,
            [round(float(x), 4) for x in model.feature_importances_]
        )),
    }

    print(f"[+] Model trained: {model_type}")
    print(f" Accuracy: {metrics['accuracy']:.4f}")
    print(f" DGA F1: {metrics['f1_dga']:.4f}")
    print(f" DGA Recall: {metrics['recall_dga']:.4f}")
    print(f" CV F1 (5-fold): {metrics['cv_f1_mean']:.4f} +/- {metrics['cv_f1_std']:.4f}")
    print(f" Top features: ", end="")
    top_feats = sorted(metrics["feature_importance"].items(), key=lambda x: x[1], reverse=True)[:5]
    print(", ".join(f"{k}={v:.3f}" for k, v in top_feats))

    return model, scaler, metrics
def classify_domains(domains, model, scaler):
    """Classify domains as legitimate or DGA using a trained model.

    Args:
        domains: Iterable of domain strings. Domains for which no features
            can be extracted are skipped.
        model: Fitted classifier exposing ``predict`` and ``predict_proba``.
        scaler: Fitted scaler exposing ``transform``.

    Returns:
        One dict per classified domain, in input order, with ``domain``,
        ``prediction`` ("dga" or "legitimate"), ``confidence`` (highest class
        probability), ``dga_probability`` and the extracted ``features``.
    """
    classified = []
    for domain in domains:
        feats = extract_domain_features(domain)
        if feats is not None:
            classified.append((domain, feats))
    if not classified:
        return []

    # Scale and score the whole batch at once: one call per stage instead of
    # three model/scaler calls per domain.
    matrix = np.vstack([features_to_vector(feats) for _, feats in classified])
    scaled = scaler.transform(matrix)
    predictions = model.predict(scaled)
    probabilities = model.predict_proba(scaled)

    results = []
    for (domain, feats), prediction, probability in zip(
        classified, predictions, probabilities
    ):
        results.append({
            "domain": domain,
            "prediction": "dga" if prediction == 1 else "legitimate",
            "confidence": round(float(max(probability)), 4),
            # Assumes column 1 is the DGA class (labels 0/1) - verify if the
            # model was trained elsewhere.
            "dga_probability": round(float(probability[1]), 4),
            "features": feats,
        })
    return results
Identify periodic DNS query patterns indicative of C2 check-ins:
#!/usr/bin/env python3
"""DNS beaconing detection - identifies periodic C2 check-in patterns."""
import math
from collections import defaultdict
from datetime import datetime, timedelta
import numpy as np
# Accepted textual layouts. Formats ending in %z yield offset-aware values,
# which parse_timestamp converts to naive UTC.
_TIMESTAMP_FORMATS = (
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S",
)


def _epoch_to_datetime(epoch_seconds):
    """Convert seconds since the Unix epoch to a naive UTC datetime, or None."""
    try:
        return datetime(1970, 1, 1) + timedelta(seconds=epoch_seconds)
    except (ValueError, OverflowError):
        # NaN, infinity, or a value outside the datetime range.
        return None


def parse_timestamp(ts_str):
    """Parse a timestamp into a naive datetime.

    Args:
        ts_str: ISO-8601-style string (``T`` or space separator, optional
            fractional seconds, optional ``Z`` or numeric UTC offset), a
            string holding epoch seconds, or a numeric epoch value.

    Returns:
        A naive ``datetime``. Values carrying an explicit offset and epoch
        values are expressed in UTC, so results can be compared and sorted
        together. ``None`` is returned for anything that cannot be parsed.
    """
    # Numeric epochs (e.g. a JSON-decoded float) would make strptime raise
    # TypeError, so handle them before any string parsing.
    if isinstance(ts_str, (int, float)) and not isinstance(ts_str, bool):
        return _epoch_to_datetime(ts_str)
    if not isinstance(ts_str, str):
        return None

    text = ts_str.strip()
    if not text:
        return None

    for fmt in _TIMESTAMP_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        offset = parsed.utcoffset()
        if offset is not None:
            # Shift to UTC and drop tzinfo so all results are naive.
            parsed = (parsed - offset).replace(tzinfo=None)
        return parsed

    # Try epoch timestamp
    try:
        epoch_seconds = float(text)
    except ValueError:
        return None
    return _epoch_to_datetime(epoch_seconds)
def detect_beaconing(dns_queries, min_queries=10, max_jitter_pct=25,
                     min_interval_sec=10, max_interval_sec=7200):
    """Find periodic DNS query patterns by analysing inter-query intervals.

    Queries are grouped per (source IP, base domain), where the base domain
    is the last two labels of the query name. A group is scored when its
    intervals are regular enough (low coefficient of variation); a long
    active period, a high query count and a mean interval close to a common
    C2 sleep time raise the score.

    Args:
        dns_queries: list of dicts with 'src_ip', 'query', 'timestamp'
        min_queries: minimum queries per group to analyze (default 10)
        max_jitter_pct: maximum coefficient of variation, in percent, still
            treated as a beacon (default 25)
        min_interval_sec: smallest mean interval to report (default 10s)
        max_interval_sec: largest mean interval to report (default 7200s)

    Returns:
        list of beacon candidate dicts, highest score first
    """
    # Collect the parsed timestamps per (source IP, base domain) pair.
    sightings = defaultdict(list)
    for record in dns_queries:
        src_ip = record.get("src_ip", "")
        fqdn = record.get("query", "").lower().rstrip(".")
        seen_at = parse_timestamp(record.get("timestamp", ""))
        if not (seen_at and src_ip and fqdn):
            continue

        labels = fqdn.split(".")
        base_domain = ".".join(labels[-2:]) if len(labels) >= 2 else fqdn
        sightings[(src_ip, base_domain)].append(seen_at)

    # (upper CV bound, points awarded, flag label); the first match wins.
    jitter_tiers = (
        (5, 40, "very_low_jitter"),
        (15, 30, "low_jitter"),
        (max_jitter_pct, 15, "moderate_jitter"),
    )
    # Common C2 intervals (60s, 120s, 300s, 600s, 900s, 1800s, 3600s)
    common_intervals = (60, 120, 300, 600, 900, 1800, 3600)

    candidates = []
    for (src_ip, base_domain), timestamps in sightings.items():
        query_count = len(timestamps)
        if query_count < min_queries:
            continue

        timestamps.sort()
        gaps = [
            (later - earlier).total_seconds()
            for earlier, later in zip(timestamps, timestamps[1:])
        ]
        if not gaps:
            continue

        # Zero gaps come from duplicate timestamps and are ignored.
        gaps = np.array(gaps)
        gaps = gaps[gaps > 0]
        if len(gaps) < min_queries - 1:
            continue

        mean_interval = np.mean(gaps)
        std_interval = np.std(gaps)
        median_interval = np.median(gaps)

        if mean_interval < min_interval_sec or mean_interval > max_interval_sec:
            continue

        # Coefficient of variation in percent, used as the jitter measure.
        cv = (std_interval / mean_interval * 100) if mean_interval > 0 else 100

        hours_active = (timestamps[-1] - timestamps[0]).total_seconds() / 3600

        matched_tier = next(
            ((points, label) for limit, points, label in jitter_tiers if cv < limit),
            None,
        )
        if matched_tier is None:
            continue  # Too much jitter, not a beacon

        points, label = matched_tier
        score = 0.0 + points
        flags = [f"{label}:CV={cv:.1f}%"]

        # Long duration increases confidence
        if hours_active > 24:
            score += 20
            flags.append(f"persistent:{hours_active:.1f}h")
        elif hours_active > 4:
            score += 10
            flags.append(f"sustained:{hours_active:.1f}h")

        # High query count increases confidence
        if query_count > 100:
            score += 15
            flags.append(f"high_volume:{query_count}")
        elif query_count > 50:
            score += 10
            flags.append(f"moderate_volume:{query_count}")

        # Mean interval within 10% of a common sleep time; first match only.
        nearest_common = next(
            (ci for ci in common_intervals if abs(mean_interval - ci) < ci * 0.1),
            None,
        )
        if nearest_common is not None:
            score += 10
            flags.append(f"common_c2_interval:~{nearest_common}s")

        candidates.append({
            "src_ip": src_ip,
            "base_domain": base_domain,
            "query_count": query_count,
            "mean_interval_sec": round(mean_interval, 2),
            "median_interval_sec": round(median_interval, 2),
            "std_interval_sec": round(std_interval, 2),
            "jitter_cv_pct": round(cv, 2),
            "first_seen": timestamps[0].isoformat(),
            "last_seen": timestamps[-1].isoformat(),
            "duration_hours": round(hours_active, 2),
            "score": round(score, 1),
            "flags": flags,
        })

    return sorted(candidates, key=lambda item: item["score"], reverse=True)
def print_beacon_report(beacons, top_n=20):
    """Write a human-readable beacon detection report to stdout.

    Args:
        beacons: Beacon candidate dicts as produced by ``detect_beaconing``;
            printed in the order given.
        top_n: Maximum number of candidates to print in detail.
    """
    heavy_rule = "=" * 80
    print(heavy_rule)
    print(" DNS BEACONING DETECTION REPORT")
    print(heavy_rule)
    print(f" Beacon patterns detected: {len(beacons)}")
    print()

    if not beacons:
        print(" No beaconing patterns detected.")
        return

    shown = beacons[:top_n]
    print(f" TOP {min(top_n, len(beacons))} BEACON CANDIDATES")
    print(" " + "-" * 76)
    for candidate in shown:
        headline = f" Score: {candidate['score']:.1f} | {candidate['src_ip']} -> {candidate['base_domain']}"
        timing = (
            f" Queries: {candidate['query_count']} "
            f"Interval: {candidate['mean_interval_sec']:.1f}s +/- {candidate['std_interval_sec']:.1f}s "
            f"Jitter: {candidate['jitter_cv_pct']:.1f}%"
        )
        activity = (
            f" Active: {candidate['duration_hours']:.1f}h "
            f"({candidate['first_seen']} to {candidate['last_seen']})"
        )
        print(headline)
        print(timing)
        print(activity)
        print(f" Flags: {', '.join(candidate['flags'])}")
        print()
Combine all detection methods into a unified analysis:
DNS C2 Detection Pipeline Architecture:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
┌────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ Zeek dns.log | Suricata EVE | Recursive Resolver │
│ Passive DNS | PCAP capture | EDR DNS telemetry │
└───────────────────────┬────────────────────────────────┘
│
┌───────────────────────▼────────────────────────────────┐
│ PREPROCESSING │
│ Parse timestamps | Extract subdomains | Normalize │
│ FQDN | Resolve base domain | Lookup in whitelist │
└───────────────────────┬────────────────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ ENTROPY │ │ BEACONING │ │ DGA │
│ ANALYSIS │ │ DETECTION │ │ CLASSIFIER │
│ │ │ │ │ │
│ Shannon ent. │ │ Interval │ │ ML model │
│ Subdomain │ │ analysis │ │ Random │
│ length │ │ Jitter/CV │ │ Forest or │
│ Encoding │ │ Duration │ │ Gradient │
│ patterns │ │ Periodicity │ │ Boosting │
└───────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
┌───────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ TXT RECORD │ │ TOOL │ │ PASSIVE │
│ PAYLOAD │ │ SIGNATURE │ │ DNS │
│ ANALYSIS │ │ MATCHING │ │ ENRICHMENT │
│ │ │ │ │ │
│ Base64 decode│ │ Iodine │ │ First seen │
│ PE/ELF detect│ │ dnscat2 │ │ Registrar │
│ PS stager │ │ dns2tcp │ │ Age check │
│ Size anomaly │ │ Cobalt DNS │ │ Reputation │
└───────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
┌───────▼───────────────▼───────────────▼────────────────┐
│ CORRELATION ENGINE │
│ Combine scores from all detectors │
│ Weighted scoring: entropy(30%) + beacon(25%) + │
│ DGA(20%) + TXT payload(15%) + signature(10%) │
│ Threshold: score > 60 = alert, > 40 = investigate │
└───────────────────────┬────────────────────────────────┘
│
┌───────────────────────▼────────────────────────────────┐
│ ALERTING & RESPONSE │
│ Generate SIEM alert with all evidence │
│ Block domain in DNS firewall / RPZ │
│ Isolate endpoint via EDR │
│ Create incident ticket with IOCs │
└────────────────────────────────────────────────────────┘
Deploy detection queries in your SIEM platform:
Splunk SPL - DNS Tunneling Detection:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-- High entropy subdomain queries
-- Only the first label of the query name is examined (mvindex(split(query,"."),0)).
-- The entropy value is not computed here: it is read from the dns_entropy_lookup
-- lookup, which must already exist and be keyed on subdomain.
-- NOTE(review): char_counts is assigned but not referenced by any later stage
-- shown here, and the final stats groups by a "domain" field that this search
-- does not create - confirm it is extracted at index/search time.
index=dns sourcetype="bro:dns:json" OR sourcetype="suricata:dns"
| eval subdomain=mvindex(split(query,"."),0)
| eval sub_len=len(subdomain)
| where sub_len > 30
| eval char_counts=mvmap(split(subdomain,""),1)
| lookup dns_entropy_lookup subdomain OUTPUT entropy
| where entropy > 3.5
| stats count as query_count dc(query) as unique_queries
avg(sub_len) as avg_sub_len values(query) as sample_queries
by src_ip, domain
| where query_count > 20
| sort -query_count
-- DNS TXT record abuse
-- Counts TXT queries per source, excluding DKIM/DMARC/SPF helper names, and
-- keeps sources with more than 50 such queries in the searched time range.
index=dns (qtype="TXT" OR qtype_name="TXT")
NOT (query="*._domainkey.*" OR query="*._dmarc.*" OR query="*._spf.*")
| stats count as txt_queries dc(query) as unique_txt_queries
values(query) as domains
by src_ip
| where txt_queries > 50
| sort -txt_queries
-- DNS beaconing (regular interval queries)
-- Buckets queries into 60-second bins per (src_ip, query), then measures how
-- stable the per-bin count is over a rolling window of 10 bins. Bins with no
-- queries produce no row, so they are not part of the rolling statistics.
index=dns sourcetype="bro:dns:json"
| bin _time span=60s
| stats count by src_ip, query, _time
| streamstats window=10 current=t avg(count) as avg_count stdev(count) as std_count by src_ip, query
| eval cv = if(avg_count>0, (std_count/avg_count)*100, 100)
| where cv < 20 AND avg_count > 0
| stats count as beacon_windows avg(cv) as avg_jitter
min(_time) as first_seen max(_time) as last_seen
by src_ip, query
| where beacon_windows > 10
| sort -beacon_windows
-- Unusual record type volume (NULL, KEY, SRV for tunneling)
-- The NOT clauses for A/AAAA/PTR are already implied by the positive
-- qtype_name filter and do not change the result.
index=dns (qtype_name="NULL" OR qtype_name="KEY" OR qtype_name="SRV"
OR qtype_name="CNAME" OR qtype_name="MX")
NOT qtype_name="A" NOT qtype_name="AAAA" NOT qtype_name="PTR"
| stats count by src_ip, qtype_name, query
| where count > 10
| sort -count
Elastic KQL - DNS C2 Detection:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-- Long subdomain queries (potential tunneling)
-- NOTE(review): the "| where" and "| stats" stages below are pipe-style
-- post-processing, not plain KQL filter syntax - confirm which Elastic query
-- language these are meant for before deploying.
dns.question.name: * and not dns.question.name: *.in-addr.arpa
| where length(dns.question.subdomain) > 40
-- High volume DNS to single domain
-- Flags a source with more than 500 queries to one registered domain in the
-- searched time range.
event.dataset: "zeek.dns" or event.dataset: "suricata.dns"
| stats count by source.ip, dns.question.registered_domain
| where count > 500
-- TXT record queries to non-standard domains
-- Excludes DKIM/DMARC/SPF helper names.
dns.question.type: "TXT"
and not dns.question.name: (*._domainkey.* or *._dmarc.* or *._spf.*)
Zeek Script - DNS Tunneling Indicator:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# dns_tunnel_detect.zeek
# Raises notices for high-entropy DNS query names and for hosts that issue an
# excessive number of TXT queries.
@load base/protocols/dns
module DNSTunnel;
export {
    redef enum Notice::Type += {
        # NOTE(review): declared but not raised anywhere in the visible script.
        DNS_Tunneling_Suspected,
        DNS_High_Entropy_Query,
        DNS_Excessive_TXT_Queries,
    };
    # Tunables; all can be overridden with redef.
    const entropy_threshold = 3.5 &redef;
    const subdomain_length_threshold = 40 &redef;
    const txt_query_threshold = 50 &redef;
    # NOTE(review): not referenced in the visible script; the tables below use
    # literal expiry intervals instead.
    const tracking_interval = 5min &redef;
}
# Per-host TXT query counter; an entry expires 5 minutes after it is created.
global txt_query_tracker: table[addr] of count &create_expire=5min &default=0;
# NOTE(review): not referenced in the visible script.
global domain_query_tracker: table[addr, string] of count &create_expire=10min &default=0;
# Return the Shannon entropy of s in bits per character.
# An empty string yields 0.0.
function shannon_entropy(s: string): double
    {
    local total = |s|;
    if ( total == 0 )
        return 0.0;

    # Occurrences of each distinct character in s.
    local counts: table[string] of count = table();

    # Iterating over a string yields its characters (one-character strings),
    # not indices, so the loop variable is used directly as the table key.
    for ( ch in s )
        {
        if ( ch !in counts )
            counts[ch] = 0;
        ++counts[ch];
        }

    # Base-2 logarithm is computed via ln() so the function relies only on the
    # natural-log built-in.
    local ln2 = ln(2.0);
    local ent = 0.0;
    for ( sym, cnt in counts )
        {
        # "* 1.0" promotes the count to double before dividing.
        local p = (cnt * 1.0) / total;
        ent -= p * (ln(p) / ln2);
        }

    return ent;
    }
# Inspect each DNS request for two tunneling indicators:
#   1. a host reaching txt_query_threshold TXT queries while its counter is alive;
#   2. a subdomain longer than subdomain_length_threshold whose Shannon entropy
#      exceeds entropy_threshold.
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count,
                  qclass: count)
    {
    if ( |query| == 0 )
        return;

    local src = c$id$orig_h;

    # Track TXT queries (qtype 16) per source host.
    if ( qtype == 16 )
        {
        ++txt_query_tracker[src];

        # "==" rather than ">=" so the notice is raised once, at the moment the
        # counter reaches the threshold.
        if ( txt_query_tracker[src] == txt_query_threshold )
            NOTICE([$note=DNS_Excessive_TXT_Queries,
                    $conn=c,
                    $msg=fmt("Host %s made %d TXT queries in tracking window",
                             src, txt_query_threshold),
                    $identifier=cat(src)]);
        }

    # Split into labels; names with fewer than three labels have no subdomain.
    local parts = split_string(query, /\./);
    if ( |parts| < 3 )
        return;

    # The last two labels are treated as the base domain.
    # NOTE(review): this misclassifies names under multi-label suffixes such as
    # "co.uk", where one more label belongs to the registered domain.
    local base_domain = parts[|parts| - 2] + "." + parts[|parts| - 1];

    # Subdomain = every label except the last two, concatenated without dots,
    # so both the length and the entropy are measured on label characters only.
    local subdomain = "";
    for ( idx in parts )
        {
        if ( idx < |parts| - 2 )
            subdomain += parts[idx];
        }

    if ( |subdomain| > subdomain_length_threshold )
        {
        local ent = shannon_entropy(subdomain);
        if ( ent > entropy_threshold )
            # The identifier is host + base domain rather than the full query:
            # tunnel queries are unique, so keying on the query would defeat
            # notice suppression and raise one notice per request.
            NOTICE([$note=DNS_High_Entropy_Query,
                    $conn=c,
                    $msg=fmt("High entropy DNS query: entropy=%.2f len=%d query=%s",
                             ent, |subdomain|, query),
                    $identifier=cat(src, base_domain)]);
        }
    }
# suricata-dns-c2.rules
# DNS Tunneling and C2 Detection Rules
#
# Query-type checks are done on the raw message, BEFORE the dns.query sticky
# buffer is selected. dns.query contains only the queried name, so a QTYPE
# content match placed inside that buffer can never hit. The pattern
# |00 00 TT 00 01| is: end-of-name byte, 16-bit QTYPE, 16-bit QCLASS IN.
# NOTE(review): where the deployed Suricata version offers a dedicated
# record-type keyword (dns.rrtype), prefer it over the raw byte match.
# NOTE(review): sids 2030001-2030007 appear to fall in the range used by the
# Emerging Threats ruleset; renumber into the local range (1000000-1999999)
# before loading these next to ET rules.

# Iodine DNS tunnel detection (QTYPE 10 = NULL)
alert dns any any -> any any (msg:"ET TROJAN Iodine DNS Tunnel Activity - NULL Query"; \
    content:"|00 00 0a 00 01|"; \
    dns.query; content:"."; pcre:"/^[a-z0-9]{50,}\.[a-z0-9.-]+$/i"; \
    classtype:trojan-activity; sid:2030001; rev:2;)

# dnscat2 DNS tunnel detection
alert dns any any -> any any (msg:"ET TROJAN dnscat2 DNS Tunnel - Handshake"; \
    dns.query; content:"dnscat."; nocase; fast_pattern; \
    classtype:trojan-activity; sid:2030002; rev:1;)

# dnscat2 data channel: hex-encoded first label in a TXT query (QTYPE 16)
alert dns any any -> any any (msg:"ET TROJAN dnscat2 DNS Tunnel - Data Channel"; \
    content:"|00 00 10 00 01|"; \
    dns.query; pcre:"/^[a-f0-9]{16,}\./i"; \
    classtype:trojan-activity; sid:2030003; rev:2;)

# Cobalt Strike DNS beacon
alert dns any any -> any any (msg:"ET TROJAN Cobalt Strike DNS Beacon - A Record"; \
    dns.query; pcre:"/^[a-f0-9]{12,}\.[a-z0-9.-]+$/i"; \
    threshold:type both, track by_src, count 20, seconds 60; \
    classtype:trojan-activity; sid:2030004; rev:1;)

# Generic DNS tunneling - high volume of TXT queries from one source.
# flow:to_server restricts the raw-payload match to requests, so responses
# (which echo the question section) are not counted against the resolver.
alert dns any any -> any any (msg:"ET POLICY Excessive TXT DNS Queries - Possible Tunneling"; \
    flow:to_server; content:"|00 00 10 00 01|"; \
    threshold:type threshold, track by_src, count 50, seconds 300; \
    classtype:policy-violation; sid:2030005; rev:2;)

# Long subdomain query (generic tunneling indicator)
alert dns any any -> any any (msg:"ET POLICY Unusually Long DNS Subdomain - Possible Tunneling"; \
    dns.query; pcre:"/^[a-z0-9-]{52,}\./i"; \
    threshold:type limit, track by_src, count 1, seconds 60; \
    classtype:policy-violation; sid:2030006; rev:1;)

# DNS query for known C2 TXT payload staging (QTYPE 16 = TXT)
alert dns any any -> any any (msg:"ET TROJAN DNS TXT Record Staged Payload Request"; \
    content:"|00 00 10 00 01|"; \
    dns.query; pcre:"/^(stage|payload|cmd|exec|download|update|config)\d*\./i"; \
    classtype:trojan-activity; sid:2030007; rev:2;)
| Term | Definition |
|---|---|
| DNS Tunneling | Technique of encoding data within DNS queries and responses to create a covert communication channel, bypassing firewalls that allow DNS traffic |
| Shannon Entropy | Information theory metric measuring randomness in a string; legitimate domains typically have entropy below 3.5, while encoded tunnel data typically measures 3.8-4.5 or higher |
| Domain Generation Algorithm (DGA) | Malware technique that algorithmically generates thousands of pseudo-random domain names for C2 rendezvous, making domain-based blocking impractical |
| DNS Beaconing | Regular, periodic DNS queries from a compromised host to a C2 domain, identifiable by consistent inter-query intervals and low timing jitter |
| TXT Record Abuse | Using DNS TXT records to deliver encoded C2 commands or staged payloads, exploiting the large payload capacity (up to 65535 bytes across multiple strings) |
| Iodine | Open-source DNS tunneling tool that tunnels IPv4 traffic through DNS using NULL, TXT, or CNAME records, commonly used to bypass captive portals |
| dnscat2 | Encrypted C2 tool that creates a command channel over DNS, supporting file transfer, port forwarding, and shell access through DNS queries |
| Cobalt Strike DNS Beacon | Commercial C2 framework's DNS communication mode that uses A, AAAA, and TXT records to receive tasks and return results via DNS resolution |
| Passive DNS (pDNS) | Database of historical DNS resolution data collected by monitoring DNS traffic; used to identify infrastructure reuse and domain history |
| Response Policy Zone (RPZ) | DNS firewall mechanism that allows real-time blocking of malicious domains by injecting override responses at the recursive resolver level |
| Coefficient of Variation | Standard deviation divided by mean, expressed as percentage; used to measure beacon jitter -- lower CV indicates more regular (suspicious) timing |
| NXDOMAIN | DNS response code indicating the queried domain does not exist; high NXDOMAIN rates from a host suggest DGA activity where most generated domains are unregistered |
Context: The SOC receives an alert from the DNS firewall showing a single internal host (10.1.5.42) making 15,000+ DNS queries to the domain c8a3f1e2.tunnelsvc.example.com in the past hour. All queries are TXT type with long, random-looking subdomains. Normal DNS volume for this host is ~200 queries/hour.
Approach:
Check the tunnelsvc.example.com registration date, nameservers, and historical resolutions
Pitfalls:
Context: The threat intelligence team identified that a botnet family active in the industry uses DGA for C2 domain generation. The SOC needs an automated way to classify DNS queries as potentially DGA-generated and alert on matches.
Approach:
Pitfalls:
DNS C2 DETECTION ANALYSIS REPORT
====================================
Analysis Period: 2026-03-15 00:00 to 2026-03-19 23:59
Data Source: Zeek dns.log (gateway sensor)
Total Queries: 14,283,501
Unique Domains: 892,041
Hosts Analyzed: 3,847
ENTROPY ANALYSIS
Queries with entropy > 3.5: 2,847 (0.02%)
Queries with subdomain > 40 chars: 1,203 (0.008%)
Suspicious base domains: 12
[CRITICAL] tunnelsvc.example[.]com
Queries: 15,247 Source: 10.1.5.42 Avg Entropy: 4.21
Avg Subdomain Length: 63 Record Types: TXT (98%), A (2%)
Tool Signature: dnscat2 (hex prefix pattern match)
[HIGH] update-cdn.malicious[.]net
Queries: 3,891 Source: 10.1.12.7 Avg Entropy: 3.87
Avg Subdomain Length: 48 Record Types: A (60%), TXT (40%)
Tool Signature: Cobalt Strike DNS beacon (interval pattern)
BEACONING DETECTION
Beacon patterns detected: 4
Score: 85.0 10.1.5.42 -> tunnelsvc.example[.]com
Interval: 4.3s +/- 0.4s Jitter: 8.2% Duration: 18.4h
Queries: 15,247 Flags: very_low_jitter, persistent, high_volume
Score: 72.0 10.1.12.7 -> update-cdn.malicious[.]net
Interval: 60.2s +/- 3.1s Jitter: 5.1% Duration: 72.1h
Queries: 3,891 Flags: very_low_jitter, persistent, common_c2_interval:~60s
DGA CLASSIFICATION
Domains classified: 892,041
DGA predictions (>0.85 conf): 47
DGA predictions (0.65-0.85): 183
[HIGH] a8f3k2m1x9.com (DGA prob: 0.97, entropy: 3.92)
[HIGH] j7t2p5q8w3.net (DGA prob: 0.95, entropy: 4.01)
[HIGH] m3x8k1f6y2.org (DGA prob: 0.94, entropy: 3.88)
TXT RECORD ANALYSIS
Suspicious TXT responses: 8
Base64 payloads detected: 3
PowerShell stager patterns: 1
[CRITICAL] cmd.staging.example[.]com
TXT Length: 4,096 Entropy: 5.82
Finding: Base64-encoded PowerShell stager with IEX pattern
RECOMMENDED ACTIONS
[CRITICAL] Block tunnelsvc.example[.]com and update-cdn.malicious[.]net in DNS RPZ
[CRITICAL] Isolate hosts 10.1.5.42 and 10.1.12.7 for forensic investigation
[HIGH] Block 47 high-confidence DGA domains in DNS firewall
[HIGH] Investigate cmd.staging.example[.]com TXT payload staging
[MEDIUM] Review 183 moderate-confidence DGA domains with threat intel
[MEDIUM] Deploy Suricata rules for dnscat2 and Cobalt Strike DNS signatures