Dark web monitoring involves systematically scanning Tor hidden services, underground forums, paste sites, and dark web marketplaces to identify threats targeting an organization, including leaked credentials, data breaches, threat actor discussions, vulnerability exploitation tools, and planned attacks. This skill covers setting up monitoring infrastructure, using Tor-based collection tools, implementing automated alerting for brand mentions and credential leaks, and analyzing dark web intelligence for actionable threat indicators.
requests, stem, beautifulsoup4, stix2 librariesimport requests
from requests.adapters import HTTPAdapter
def create_tor_session():
"""Create a requests session routed through Tor SOCKS5 proxy."""
session = requests.Session()
session.proxies = {
"http": "socks5h://127.0.0.1:9050",
"https": "socks5h://127.0.0.1:9050",
}
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0",
})
return session
def verify_tor_connection(session):
"""Verify that traffic is routed through Tor."""
try:
resp = session.get("https://check.torproject.org/api/ip", timeout=30)
data = resp.json()
return {
"is_tor": data.get("IsTor", False),
"ip": data.get("IP", ""),
}
except Exception as e:
return {"error": str(e)}
import re
from datetime import datetime
def monitor_paste_sites(session, organization_domains):
"""Monitor paste sites for leaked credentials matching organization domains."""
findings = []
# Check Have I Been Pwned API (clearnet)
for domain in organization_domains:
try:
resp = requests.get(
f"https://haveibeenpwned.com/api/v3/breaches",
headers={"hibp-api-key": "YOUR_HIBP_KEY"},
timeout=30,
)
if resp.status_code == 200:
breaches = resp.json()
for breach in breaches:
if domain.lower() in breach.get("Domain", "").lower():
findings.append({
"source": "HIBP",
"breach_name": breach["Name"],
"breach_date": breach.get("BreachDate"),
"data_classes": breach.get("DataClasses", []),
"pwn_count": breach.get("PwnCount", 0),
"domain": domain,
})
except Exception as e:
print(f"[-] HIBP error for {domain}: {e}")
return findings
def search_for_keywords(session, keywords, onion_paste_urls):
"""Search dark web paste sites for specific keywords."""
results = []
for paste_url in onion_paste_urls:
try:
resp = session.get(paste_url, timeout=60)
if resp.status_code == 200:
content = resp.text.lower()
for keyword in keywords:
if keyword.lower() in content:
results.append({
"url": paste_url,
"keyword": keyword,
"timestamp": datetime.utcnow().isoformat(),
"snippet": extract_context(content, keyword.lower()),
})
except Exception as e:
print(f"[-] Error fetching {paste_url}: {e}")
return results
def extract_context(text, keyword, context_chars=200):
"""Extract text context around a keyword match."""
idx = text.find(keyword)
if idx == -1:
return ""
start = max(0, idx - context_chars)
end = min(len(text), idx + len(keyword) + context_chars)
return text[start:end]
def check_ransomware_leak_sites(session, organization_name):
"""Check known ransomware group leak sites for organization mentions."""
# Use Ransomwatch API (clearnet aggregator of ransomware leak sites)
try:
resp = requests.get(
"https://raw.githubusercontent.com/joshhighet/ransomwatch/main/posts.json",
timeout=30,
)
if resp.status_code == 200:
posts = resp.json()
matches = []
for post in posts:
post_title = post.get("post_title", "").lower()
if organization_name.lower() in post_title:
matches.append({
"group": post.get("group_name", ""),
"title": post.get("post_title", ""),
"discovered": post.get("discovered", ""),
"url": post.get("post_url", ""),
})
return matches
except Exception as e:
print(f"[-] Ransomwatch error: {e}")
return []
def generate_dark_web_report(findings, organization):
"""Generate structured dark web intelligence report."""
report = {
"organization": organization,
"report_date": datetime.utcnow().isoformat(),
"executive_summary": "",
"credential_leaks": [],
"ransomware_mentions": [],
"dark_web_mentions": [],
"recommendations": [],
}
for finding in findings:
if finding.get("source") == "HIBP":
report["credential_leaks"].append(finding)
elif finding.get("group"):
report["ransomware_mentions"].append(finding)
else:
report["dark_web_mentions"].append(finding)
# Generate executive summary
cred_count = len(report["credential_leaks"])
ransom_count = len(report["ransomware_mentions"])
report["executive_summary"] = (
f"Monitoring identified {cred_count} credential leak sources "
f"and {ransom_count} ransomware group mentions for {organization}."
)
if ransom_count > 0:
report["recommendations"].append(
"CRITICAL: Organization mentioned on ransomware leak site. "
"Initiate incident response immediately."
)
if cred_count > 0:
report["recommendations"].append(
"HIGH: Leaked credentials detected. Force password resets for "
"affected accounts and enable MFA."
)
return report