Use this skill when:
Do not use for fully autonomous containment without human approval gates — always include analyst decision points for high-impact actions like account disabling or host isolation.
Set up integrations with security tools via SOAR Apps:
VirusTotal Asset Configuration:
{
"app": "VirusTotal v3",
"asset_name": "virustotal_prod",
"configuration": {
"api_key": "YOUR_VT_API_KEY",
"rate_limit": true,
"max_requests_per_minute": 4
},
"product_vendor": "VirusTotal",
"product_name": "VirusTotal"
}
CrowdStrike Falcon Asset:
{
"app": "CrowdStrike Falcon",
"asset_name": "crowdstrike_prod",
"configuration": {
"client_id": "CS_CLIENT_ID",
"client_secret": "CS_CLIENT_SECRET",
"base_url": "https://api.crowdstrike.com"
}
}
Active Directory Asset:
{
"app": "Active Directory",
"asset_name": "ad_prod",
"configuration": {
"server": "dc01.company.com",
"username": "soar_service@company.com",
"password": "SERVICE_ACCOUNT_PASSWORD",
"ssl": true
}
}
Create an automated phishing response playbook in Python (Phantom playbook format):
"""
Phishing Triage Automation Playbook
Trigger: New phishing email reported via Splunk ES notable or email ingestion
"""
import phantom.rules as phantom
import json
def on_start(container):
    """Start a reputation lookup for every URL, hash, or IP artifact.

    Fetches the artifacts of ``container`` and, depending on the ``type``
    value found in each artifact's ``cef`` dict, launches the matching
    reputation action on the ``virustotal_prod`` asset. Artifacts whose
    type is none of ``url`` / ``hash`` / ``ip`` are skipped.

    NOTE(review): the return value of ``phantom.get_artifacts`` is iterated
    directly here; confirm against the platform API whether it returns a
    plain list or a (success, message, artifacts) tuple.
    """
    # (cef type, action to run, action-run name, callback for the results)
    lookups = (
        ("url", "url reputation", "url_reputation",
         url_reputation_callback),
        ("hash", "file reputation", "file_reputation",
         hash_reputation_callback),
        ("ip", "ip reputation", "ip_reputation",
         ip_reputation_callback),
    )

    for item in phantom.get_artifacts(container_id=container["id"]):
        kind = item.get("cef", {}).get("type", "")
        for wanted, action_name, run_name, on_done in lookups:
            if kind == wanted:
                phantom.act(action_name, targets=item,
                            assets=["virustotal_prod"],
                            callback=on_done,
                            name=run_name)
                break
def url_reputation_callback(action, success, container, results, handle):
    """Triage the container based on URL reputation results.

    For each result the number of engines that flagged the URL decides the
    response:

    * more than 5  -> block the URL, raise severity to high, keep the
      container open, and create a ServiceNow ticket;
    * 1 to 5       -> promote the container for analyst review;
    * 0            -> treated as clean.

    The container is auto-closed only when every result in this callback is
    clean, so a clean URL can never close a container that another URL in
    the same batch just escalated.

    Args:
        action: Action descriptor passed by the platform (unused).
        success: False when the reputation action failed.
        container: Container the action ran against.
        results: Iterable of result dicts, each expected to carry a
            ``data`` list whose first entry has a ``summary`` dict.
        handle: Opaque handle passed by the platform (unused).
    """
    if not success:
        phantom.comment(container, "URL reputation check failed")
        return

    any_flagged = False
    clean_totals = []  # total_engines of each clean result, in order

    for result in results:
        # ``data`` may be missing, None, or an empty list; fall back to an
        # empty record instead of raising IndexError on ``[0]``.
        rows = result.get("data") or [{}]
        summary = rows[0].get("summary") or {}
        malicious_count = summary.get("malicious") or 0
        total_engines = summary.get("total_engines") or 0

        if malicious_count > 5:
            # High confidence malicious — auto-block and escalate
            any_flagged = True
            phantom.act("block url", targets=result,
                        assets=["palo_alto_prod"],
                        name="block_malicious_url")
            phantom.set_severity(container, "high")
            phantom.set_status(container, "open")
            phantom.comment(
                container,
                f"URL flagged by {malicious_count}/{total_engines} engines. "
                f"Blocked on firewall. Escalating to Tier 2.")
            # Create ServiceNow ticket
            phantom.act("create ticket", targets=container,
                        assets=["servicenow_prod"],
                        parameters=[{
                            "short_description":
                                "Phishing - Malicious URL detected",
                            "urgency": "2",
                            "impact": "2",
                        }],
                        name="create_incident_ticket")
        elif malicious_count > 0:
            # Medium confidence — request analyst review
            any_flagged = True
            phantom.promote(container, template="Phishing Investigation")
            phantom.comment(
                container,
                f"URL flagged by {malicious_count}/{total_engines} engines. "
                f"Requires analyst review.")
        else:
            # Clean — decide about closing once all results are known
            clean_totals.append(total_engines)

    if any_flagged:
        # Something was escalated: record the clean verdicts but leave the
        # container status alone.
        for total_engines in clean_totals:
            phantom.comment(
                container,
                f"URL clean: 0/{total_engines} engines flagged.")
    elif clean_totals:
        # Every result was clean — close with comment
        phantom.set_status(container, "closed")
        for total_engines in clean_totals:
            phantom.comment(
                container,
                f"URL clean: 0/{total_engines} engines flagged. Auto-closed.")
def hash_reputation_callback(action, success, container, results, handle):
    """Quarantine the endpoint when a file hash is widely detected.

    For each result, reads ``positives`` from the first ``data`` entry's
    ``summary``; when it exceeds 10 the device is quarantined through the
    ``crowdstrike_prod`` asset and the container severity is set to high.

    Args:
        action: Action descriptor passed by the platform (unused).
        success: False when the reputation action failed.
        container: Container the action ran against.
        results: Iterable of result dicts from the file reputation action.
        handle: Opaque handle passed by the platform (unused).
    """
    if not success:
        # Leave a trace instead of failing silently, matching the URL callback.
        phantom.comment(container, "File reputation check failed")
        return

    for result in results:
        # ``data`` may be missing, None, or an empty list; fall back to an
        # empty record instead of raising IndexError on ``[0]``.
        rows = result.get("data") or [{}]
        summary = rows[0].get("summary") or {}
        positives = summary.get("positives") or 0

        if positives > 10:
            # Known malware — quarantine and block
            # NOTE(review): this isolates a host with no analyst approval
            # step; confirm that is intended, since host isolation is a
            # high-impact action.
            phantom.act("quarantine device", targets=result,
                        assets=["crowdstrike_prod"],
                        name="isolate_endpoint")
            phantom.set_severity(container, "high")
def ip_reputation_callback(action, success, container, results, handle):
    """Block an IP on the firewall when enough engines flag it.

    For each result, reads ``malicious`` from the first ``data`` entry's
    ``summary``; when it exceeds 3 a ``block ip`` action is started on the
    ``palo_alto_prod`` asset.

    Args:
        action: Action descriptor passed by the platform (unused).
        success: False when the reputation action failed.
        container: Container the action ran against.
        results: Iterable of result dicts from the IP reputation action.
        handle: Opaque handle passed by the platform (unused).
    """
    if not success:
        # Leave a trace instead of failing silently, matching the URL callback.
        phantom.comment(container, "IP reputation check failed")
        return

    for result in results:
        # ``data`` may be missing, None, or an empty list; fall back to an
        # empty record instead of raising IndexError on ``[0]``.
        rows = result.get("data") or [{}]
        summary = rows[0].get("summary") or {}
        malicious = summary.get("malicious") or 0

        if malicious > 3:
            phantom.act("block ip", targets=result,
                        assets=["palo_alto_prod"],
                        name="block_malicious_ip")
Automate enrichment for all incoming SIEM alerts:
"""
Universal Alert Enrichment Playbook
Runs on every new event to add context before analyst review
"""
import phantom.rules as phantom
def on_start(container):
    """Start enrichment lookups for every source IP and destination domain.

    Each distinct ``sourceAddress`` gets a VirusTotal reputation lookup, a
    GeoIP lookup, and a whois lookup; each distinct ``destinationDnsDomain``
    gets a VirusTotal domain reputation lookup.

    Args:
        container: Container dict; its ``id`` selects the artifacts to read.
    """
    # Get all artifacts
    success, message, artifacts = phantom.get_artifacts(
        container_id=container["id"], full_data=True
    )
    if not success:
        phantom.comment(container, f"Artifact lookup failed: {message}")
        return
    artifacts = artifacts or []

    def _unique(field):
        """Return the distinct non-empty CEF values of ``field``, in order."""
        values = ((a.get("cef") or {}).get(field) for a in artifacts)
        return list(dict.fromkeys(v for v in values if v))

    # De-duplicate so the same indicator seen in several artifacts does not
    # start repeated lookups under an identical action-run name, which would
    # also waste quota on rate-limited reputation services.
    ips = _unique("sourceAddress")
    domains = _unique("destinationDnsDomain")

    # Enrich IPs in parallel
    for ip in ips:
        # VirusTotal lookup
        phantom.act("ip reputation",
                    parameters=[{"ip": ip}],
                    assets=["virustotal_prod"],
                    callback=enrich_ip_callback,
                    name=f"vt_ip_{ip}")
        # GeoIP lookup
        # NOTE(review): geoip_callback is not defined in the visible source;
        # confirm it exists before deploying this playbook.
        phantom.act("geolocate ip",
                    parameters=[{"ip": ip}],
                    assets=["maxmind_prod"],
                    callback=geoip_callback,
                    name=f"geo_{ip}")
        # Whois lookup
        phantom.act("whois ip",
                    parameters=[{"ip": ip}],
                    assets=["whois_prod"],
                    name=f"whois_{ip}")

    # Enrich domains
    for domain in domains:
        phantom.act("domain reputation",
                    parameters=[{"domain": domain}],
                    assets=["virustotal_prod"],
                    name=f"vt_domain_{domain}")
def enrich_ip_callback(action, success, container, results, handle):
    """Attach VirusTotal verdict counts to the container as a new artifact.

    Does nothing when the lookup failed. Otherwise one ``enrichment``
    artifact is added per result, carrying the ``malicious`` and
    ``suspicious`` counts from that result's ``summary`` (0 when absent).
    """
    if not success:
        return

    for entry in results:
        verdicts = entry.get("summary", {})
        enrichment_cef = {
            "vt_malicious": verdicts.get("malicious", 0),
            "vt_suspicious": verdicts.get("suspicious", 0),
            "enrichment_source": "VirusTotal",
        }
        phantom.add_artifact(container, {
            "cef": enrichment_cef,
            "label": "enrichment",
            "name": "VT IP Enrichment",
        })
Add human-in-the-loop for critical actions:
def containment_decision(action, success, container, results, handle):
    """Present analyst with containment options.

    Sends a prompt to the ``soc_tier2`` user showing the affected host and
    the threat name, and routes the answer to ``execute_containment``. The
    prompt is still sent when the host or threat details are unavailable, so
    missing enrichment data never prevents the analyst from being asked.

    Args:
        action: Action descriptor passed by the platform (unused).
        success: Outcome flag of the preceding action (unused).
        container: Container dict; the first entry of its ``artifacts``
            list supplies the host address when present.
        results: Results of the preceding action; the first entry's
            ``summary`` supplies the threat name when present.
        handle: Opaque handle passed by the platform (unused).
    """
    # Tolerate a container without artifacts and an empty result list
    # instead of raising KeyError/IndexError before the prompt goes out.
    artifacts = container.get("artifacts") or [{}]
    cef = artifacts[0].get("cef") or {}
    summary = (results[0].get("summary") or {}) if results else {}

    host = cef.get("sourceAddress", "unknown")
    threat = summary.get("threat_name", "unknown")

    phantom.prompt(
        container=container,
        user="soc_tier2",
        message=(
            "Confirmed malicious activity detected.\n"
            f"Host: {host}\n"
            f"Threat: {threat}\n\n"
            "Select containment action:"
        ),
        respond_in_mins=15,
        options=["Isolate Host", "Disable Account", "Both", "Monitor Only"],
        callback=execute_containment
    )
def execute_containment(action, success, container, results, handle):
    """Carry out the containment option the analyst selected.

    ``Isolate Host`` quarantines the device named by ``sourceHostName``,
    ``Disable Account`` disables the user named by ``sourceUserName``, and
    ``Both`` does both. Any other answer (including no answer) is treated as
    ``Monitor Only``. The chosen option is always recorded as a comment.

    Args:
        action: Action descriptor passed by the platform (unused).
        success: Outcome flag of the prompt (unused).
        container: Container dict; the first entry of its ``artifacts``
            list supplies the host and user names.
        results: Prompt results. A dict with a ``response`` key is accepted,
            as is a list whose first entry carries the response.
        handle: Opaque handle passed by the platform (unused).
    """
    # The other callbacks in this file receive ``results`` as a list, on
    # which ``.get`` would raise AttributeError, so accept both shapes.
    # NOTE(review): the exact location of the response inside a list-shaped
    # prompt result is assumed here; confirm against the platform API.
    if isinstance(results, dict):
        response = results.get("response", "Monitor Only")
    else:
        first = results[0] if results else {}
        response = (
            first.get("response")
            or (first.get("summary") or {}).get("response")
            or "Monitor Only"
        )

    artifacts = container.get("artifacts") or [{}]
    cef = artifacts[0].get("cef") or {}

    if response in ("Isolate Host", "Both"):
        hostname = cef.get("sourceHostName")
        if hostname:
            phantom.act("quarantine device",
                        parameters=[{"hostname": hostname}],
                        assets=["crowdstrike_prod"],
                        name="isolate_host")
        else:
            # Report the gap rather than crash before the audit comment.
            phantom.comment(
                container,
                "Cannot isolate host: artifact has no sourceHostName")

    if response in ("Disable Account", "Both"):
        username = cef.get("sourceUserName")
        if username:
            phantom.act("disable user",
                        parameters=[{"username": username}],
                        assets=["ad_prod"],
                        name="disable_account")
        else:
            phantom.comment(
                container,
                "Cannot disable account: artifact has no sourceUserName")

    phantom.comment(container, f"Analyst approved: {response}")
Set up event triggers in SOAR:
{
"playbook_name": "phishing_triage_automation",
"trigger": {
"type": "event_created",
"conditions": {
"label": ["phishing", "notable"],
"severity": ["high", "medium"]
}
},
"active": true,
"run_as": "automation_user"
}
Track automation effectiveness with SOAR metrics:
# Query SOAR API for playbook execution stats
import requests

headers = {"ph-auth-token": "YOUR_SOAR_TOKEN"}
response = requests.get(
    "https://soar.company.com/rest/playbook_run",
    headers=headers,
    params={
        "page_size": 100,
        # Only successful runs are fetched, so the numbers below describe
        # successful executions only.
        "filter": '{"status":"success"}',
        "sort": "create_time",
        "order": "desc"
    },
    # Without a timeout an unresponsive server would hang the script forever.
    timeout=30,
)
# Fail loudly on an HTTP error (bad token, wrong URL) instead of hitting a
# confusing KeyError on the error payload below.
response.raise_for_status()
runs = response.json()["data"]

# Calculate automation metrics
total_runs = len(runs)
print(f"Total runs: {total_runs}")

if total_runs:
    # NOTE(review): assumes start_time/end_time are numeric seconds; if the
    # API returns timestamp strings they must be parsed before subtracting.
    avg_duration = sum(r["end_time"] - r["start_time"] for r in runs) / total_runs
    auto_closed = sum(1 for r in runs if r.get("auto_resolved"))
    print(f"Avg duration: {avg_duration:.1f}s")
    print(f"Auto-resolved: {auto_closed}/{total_runs} ({auto_closed/total_runs*100:.0f}%)")
else:
    # Averages and percentages are undefined with no runs; skip them rather
    # than divide by zero.
    print("No playbook runs returned; nothing to summarise.")
| Term | Definition |
|---|---|
| SOAR | Security Orchestration, Automation, and Response — platform integrating security tools with automated playbooks |
| Playbook | Automated workflow defining sequential and parallel actions triggered by security events |
| Asset | SOAR configuration for a connected security tool (API endpoint, credentials, connection parameters) |
| Container | SOAR event object containing artifacts (IOCs) from an ingested alert or incident |
| Artifact | Individual IOC or data point within a container (IP, hash, URL, domain, email) |
| Approval Gate | Human-in-the-loop step requiring analyst decision before executing high-impact automated actions |
SOAR PLAYBOOK EXECUTION REPORT
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Playbook: Phishing Triage Automation v2.3
Container: SOAR-2024-08921
Trigger: Notable event from Splunk ES (phishing)
Actions Executed:
[1] URL Reputation (VirusTotal) — 14/90 engines malicious [2.1s]
[2] IP Reputation (AbuseIPDB) — Confidence: 85% [1.3s]
[3] Block URL (Palo Alto) — Blocked on PA-5260 [0.8s]
[4] Block IP (Palo Alto) — Blocked on PA-5260 [0.7s]
[5] Create Ticket (ServiceNow) — INC0012345 created [1.5s]
[6] Prompt Analyst (Tier 2) — Response: "Isolate Host" [4m 12s]
[7] Quarantine Device (CrowdStrike) — WORKSTATION-042 isolated [3.2s]
Total Duration: 4m 22s (vs 35min avg manual triage)
Time Saved: ~31 minutes
Disposition: True Positive — Escalated to IR