
安全事件响应
事件响应生命周期
准备 → 检测 → 分析 → 遏制 → 根除 → 恢复 → 事后

使用SIEM进行安全监控
import ipaddress
import json
import os
from datetime import datetime, timedelta, timezone

from elasticsearch import Elasticsearch
# Module-level Elasticsearch client used by the SIEMAnalyzer queries below.
# The node URL is read from the ELASTICSEARCH_URL environment variable at
# import time; os.getenv returns None when the variable is unset.
es = Elasticsearch([os.getenv('ELASTICSEARCH_URL')])
class SIEMAnalyzer:
    """Detect suspicious activity in Elasticsearch logs and raise incidents.

    Every query and write goes through the module-level ``es`` client.
    """

    # PagerDuty Events API v2 only accepts critical/error/warning/info as
    # payload.severity, so internal levels such as "high" are translated
    # before the event is sent.
    _PAGERDUTY_SEVERITY = {
        'critical': 'critical',
        'high': 'error',
        'error': 'error',
        'medium': 'warning',
        'warning': 'warning',
        'low': 'info',
        'info': 'info',
    }

    def detect_brute_force(self, timeframe_minutes: int = 5, threshold: int = 10) -> list:
        """Find users with many authentication failures in a recent window.

        Args:
            timeframe_minutes: Size of the look-back window, in minutes.
            threshold: Minimum number of failures for a user to be reported.

        Returns:
            A list of detection dicts with the keys ``type``, ``username``,
            ``attempts``, ``source_ips`` and ``severity``.
        """
        query = {
            # Only the aggregations are read, so skip fetching the hits.
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"event_type": "auth_failure"}},
                        {"range": {"@timestamp": {"gte": f"now-{timeframe_minutes}m"}}},
                    ]
                }
            },
            "aggs": {
                "by_user": {
                    "terms": {"field": "username.keyword", "size": 100},
                    "aggs": {
                        "by_ip": {"terms": {"field": "source_ip.keyword", "size": 100}},
                    },
                }
            },
        }
        result = es.search(index="auth-logs-*", body=query)
        return self._brute_force_incidents(result, threshold)

    @staticmethod
    def _brute_force_incidents(result: dict, threshold: int) -> list:
        """Turn the ``by_user`` aggregation of *result* into detection dicts."""
        incidents = []
        for bucket in result['aggregations']['by_user']['buckets']:
            # Each terms bucket already carries its document count, so no
            # value_count sub-aggregation on `_id` is needed (aggregating on
            # `_id` can be restricted by the cluster).
            attempts = bucket['doc_count']
            if attempts < threshold:
                continue
            incidents.append({
                'type': 'brute_force',
                'username': bucket['key'],
                'attempts': attempts,
                'source_ips': [ip_bucket['key'] for ip_bucket in bucket['by_ip']['buckets']],
                'severity': 'high',
            })
        return incidents

    def detect_data_exfiltration(self) -> list:
        """Return network log documents with large outbound transfers.

        Returns:
            Up to 50 raw ``_source`` documents from the last hour whose
            ``bytes_out`` is at least 100 MiB. These are raw log documents,
            not detection dicts: this method does not add the ``type`` and
            ``severity`` keys that ``create_incident`` reads.
        """
        query = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"@timestamp": {"gte": "now-1h"}}},
                        {"range": {"bytes_out": {"gte": 104857600}}},  # >= 100 MiB
                    ]
                }
            }
        }
        result = es.search(index="network-logs-*", body=query, size=50)
        return [hit['_source'] for hit in result['hits']['hits']]

    def create_incident(self, detection: dict):
        """Index an incident document for *detection* and page the SOC.

        Args:
            detection: Detection dict; must contain ``type`` and ``severity``.

        Raises:
            KeyError: If *detection* lacks ``type`` or ``severity``.
        """
        # Take one timezone-aware timestamp so that @timestamp and the ID
        # always agree. The ID has one-second resolution.
        now = datetime.now(timezone.utc)
        incident = {
            "@timestamp": now.isoformat(),
            "incident_id": f"INC-{now.strftime('%Y%m%d-%H%M%S')}",
            "type": detection['type'],
            "severity": detection['severity'],
            "status": "open",
            "details": detection,
        }
        es.index(index="incidents", body=incident)
        self.alert_soc(incident)

    @classmethod
    def _pagerduty_severity(cls, severity: str) -> str:
        """Map an internal severity to a PagerDuty severity.

        Unknown levels map to ``error`` so that they still page.
        """
        return cls._PAGERDUTY_SEVERITY.get(str(severity).lower(), 'error')

    def alert_soc(self, incident: dict):
        """Trigger a PagerDuty event describing *incident*.

        The endpoint is read from ``PAGERDUTY_WEBHOOK`` and the routing key
        from ``PAGERDUTY_KEY``.

        Raises:
            requests.RequestException: If the request times out, cannot be
                sent, or PagerDuty answers with an HTTP error status.
        """
        import requests  # only this method needs requests

        response = requests.post(
            os.getenv('PAGERDUTY_WEBHOOK'),
            json={
                "payload": {
                    "summary": f"{incident['severity'].upper()}: {incident['type']}",
                    "severity": self._pagerduty_severity(incident['severity']),
                    "source": "SIEM",
                    "custom_details": incident,
                },
                "routing_key": os.getenv('PAGERDUTY_KEY'),
                "event_action": "trigger",
            },
            # Without a timeout a stalled connection would block forever.
            timeout=10,
        )
        # A rejected page must not go unnoticed.
        response.raise_for_status()

自动遏制
import boto3
from typing import Optional
class IncidentContainment:
    """Automated containment actions against AWS (EC2, IAM and WAF)."""

    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.iam = boto3.client('iam')

    def _get_instance_vpc(self, instance_id: str) -> str:
        """Return the ID of the VPC that *instance_id* belongs to."""
        reservations = self.ec2.describe_instances(InstanceIds=[instance_id])['Reservations']
        return reservations[0]['Instances'][0]['VpcId']

    def isolate_instance(self, instance_id: str) -> bool:
        """Isolate a compromised EC2 instance in a rule-less security group.

        Creates a ``quarantine-<instance_id>`` security group, strips its
        egress rules, makes it the instance's only security group and tags
        the instance as quarantined.

        Args:
            instance_id: ID of the instance to isolate.

        Returns:
            True once every step has succeeded; AWS errors propagate.
        """
        # Create the quarantine security group.
        sg_response = self.ec2.create_security_group(
            GroupName=f'quarantine-{instance_id}',
            Description=f'Isolation SG for incident response - {instance_id}',
            VpcId=self._get_instance_vpc(instance_id),
        )
        isolation_sg_id = sg_response['GroupId']

        # A new group has no inbound rules, but EC2 gives it a default
        # allow-all egress rule. Remove whatever egress rules it was created
        # with; otherwise the instance could still reach out.
        self.ec2.get_waiter('security_group_exists').wait(GroupIds=[isolation_sg_id])
        group = self.ec2.describe_security_groups(GroupIds=[isolation_sg_id])['SecurityGroups'][0]
        default_egress = group.get('IpPermissionsEgress', [])
        if default_egress:
            self.ec2.revoke_security_group_egress(
                GroupId=isolation_sg_id,
                IpPermissions=default_egress,
            )

        # Replace all of the instance's security groups with the quarantine one.
        self.ec2.modify_instance_attribute(
            InstanceId=instance_id,
            Groups=[isolation_sg_id],
        )

        # Tag the instance so the quarantine can be tracked.
        self.ec2.create_tags(
            Resources=[instance_id],
            Tags=[
                {'Key': 'security:quarantined', 'Value': 'true'},
                {
                    'Key': 'security:quarantine_time',
                    'Value': datetime.now(timezone.utc).isoformat(),
                },
            ],
        )
        print(f"Instance {instance_id} isolated in quarantine SG {isolation_sg_id}")
        return True

    def disable_user(self, username: str) -> bool:
        """Disable an IAM user's access for the duration of an incident.

        Attaches an inline deny-all policy, deactivates every access key and
        detaches every managed policy.

        Args:
            username: Name of the IAM user to disable.

        Returns:
            True once every step has succeeded; AWS errors propagate.
        """
        # Apply the explicit deny first so access is cut off immediately,
        # before the slower clean-up steps run.
        deny_policy = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Deny", "Action": "*", "Resource": "*"}]
        })
        self.iam.put_user_policy(
            UserName=username,
            PolicyName='IncidentResponseDeny',
            PolicyDocument=deny_policy,
        )

        # Deactivate the access keys.
        key_pages = self.iam.get_paginator('list_access_keys').paginate(UserName=username)
        for page in key_pages:
            for key in page['AccessKeyMetadata']:
                self.iam.update_access_key(
                    UserName=username,
                    AccessKeyId=key['AccessKeyId'],
                    Status='Inactive',
                )

        # Detach all managed policies. The ARNs are collected across every
        # page first so that detaching does not disturb the listing.
        policy_pages = self.iam.get_paginator('list_attached_user_policies').paginate(
            UserName=username,
        )
        policy_arns = [
            policy['PolicyArn']
            for page in policy_pages
            for policy in page['AttachedPolicies']
        ]
        for policy_arn in policy_arns:
            self.iam.detach_user_policy(UserName=username, PolicyArn=policy_arn)

        print(f"User {username} disabled during incident")
        return True

    @staticmethod
    def _to_cidr(ip_address: str) -> str:
        """Return *ip_address* in CIDR notation (/32 for IPv4, /128 for IPv6).

        Raises:
            ValueError: If *ip_address* is not a valid address or network.
        """
        return str(ipaddress.ip_network(ip_address))

    def block_ip(self, ip_address: str, reason: str):
        """Add an IP address to the ``threat-block-list`` AWS WAF IP set.

        Args:
            ip_address: IPv4 or IPv6 address (or CIDR block) to block.
            reason: Why the address is blocked; included in the log line.

        Raises:
            ValueError: If *ip_address* cannot be parsed.
        """
        cidr = self._to_cidr(ip_address)
        waf = boto3.client('wafv2')
        ip_set_ref = {
            'Name': 'threat-block-list',
            'Scope': 'REGIONAL',
            'Id': os.getenv('WAF_IP_SET_ID'),
        }

        # update_ip_set replaces the whole address list, so read the current
        # entries (and the matching lock token) and append to them.
        current = waf.get_ip_set(**ip_set_ref)
        addresses = list(current['IPSet']['Addresses'])
        if cidr not in addresses:
            addresses.append(cidr)
            update_args = dict(
                ip_set_ref,
                LockToken=current['LockToken'],
                Addresses=addresses,
            )
            # Pass the existing description back so the update keeps it.
            description = current['IPSet'].get('Description')
            if description:
                update_args['Description'] = description
            waf.update_ip_set(**update_args)
        print(f"IP {cidr} blocked in WAF IP set threat-block-list: {reason}")

证据收集
#!/bin/bash
# incident_collect.sh - collect forensic evidence from the local host.
#
# Usage: incident_collect.sh <incident-id>
#
# Writes a snapshot of system state to
# /forensics/<incident-id>_<YYYYmmdd_HHMMSS>/ and records SHA-256 checksums
# of the collected files. The script does not use `set -e`, so a collector
# that fails (for example because netstat is not installed) does not stop
# the remaining steps.

INCIDENT_ID=${1:-}
if [ -z "$INCIDENT_ID" ]; then
    echo "Usage: $0 <incident-id>" >&2
    exit 1
fi

# Evidence can contain sensitive data; keep it readable by the collector only.
umask 077

# NOTE: the variable must be expanded here; an escaped \${INCIDENT_ID} would
# create a directory literally named '${INCIDENT_ID}_<timestamp>'.
OUTPUT_DIR="/forensics/${INCIDENT_ID}_$(date +%Y%m%d_%H%M%S)"
mkdir -p "$OUTPUT_DIR" || exit 1

echo "[*] 收集系统状态..."

# Running processes
ps auxf > "$OUTPUT_DIR/processes.txt"

# Network connections (netstat first, ss appended to the same file)
netstat -tulpn > "$OUTPUT_DIR/network_connections.txt"
ss -tulpn >> "$OUTPUT_DIR/network_connections.txt"

# Recent authentication events
last -F > "$OUTPUT_DIR/last_logins.txt"
lastb -F > "$OUTPUT_DIR/failed_logins.txt"

# Recently modified files.
# NOTE(review): /proc/1's timestamp is presumably the time PID 1 started, so
# this lists files changed since roughly the last boot - confirm on the target.
find /var /tmp /home -newer /proc/1 -type f 2>/dev/null > "$OUTPUT_DIR/recent_files.txt"

# Crontabs of every account in /etc/passwd, each prefixed with its owner so
# entries can be attributed to a user.
: > "$OUTPUT_DIR/crontabs.txt"
while IFS=: read -r user _; do
    if entries=$(crontab -l -u "$user" 2>/dev/null); then
        printf '### %s\n%s\n' "$user" "$entries" >> "$OUTPUT_DIR/crontabs.txt"
    fi
done < /etc/passwd

# Memory dump (optional; requires AVML to be installed)
# sudo avml "$OUTPUT_DIR/memory.lime"

# Hash every collected file. Relative names keep `sha256sum -c` working after
# the evidence directory has been copied elsewhere. The glob is expanded
# before the output file is created, so the checksum file does not list itself.
( cd "$OUTPUT_DIR" && sha256sum -- * > checksums.sha256 )

echo "[*] 证据已收集到 $OUTPUT_DIR"
事件响应剧本
| 阶段 | 行动 | 时间线 |
| --- | --- | --- |
| 检测 | 告警触发,初步分类 | < 15 分钟 |
| 分析 | 范围评估,IOC收集 | 15-60 分钟 |
| 遏制 | 隔离受影响系统 | 1-4 小时 |
| 根除 | 移除恶意软件,修补漏洞 | 4-24 小时 |
| 恢复 | 从干净的备份恢复 | 24-72 小时 |
| 事后 | 根本原因分析,经验教训,改进 | 1-2 周 |