正在加载,请稍候…

安全事件响应:检测、遏制与恢复

构建并执行安全事件响应计划。学习使用SIEM进行威胁检测、事件分类、遏制策略、取证以及事后复盘。

安全事件响应:检测、遏制与恢复

安全事件响应

事件响应生命周期

准备 → 检测 → 分析 → 遏制 → 根除 → 恢复 → 事后

安全事件响应:检测、遏制与恢复 插图

使用SIEM进行安全监控

from elasticsearch import Elasticsearch
import json
from datetime import datetime, timedelta

es = Elasticsearch([os.getenv('ELASTICSEARCH_URL')])

class SIEMAnalyzer:
    def detect_brute_force(self, timeframe_minutes: int = 5) -> list:
        query = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"event_type": "auth_failure"}},
                        {"range": {"@timestamp": {"gte": f"now-{timeframe_minutes}m"}}},
                    ]
                }
            },
            "aggs": {
                "by_user": {
                    "terms": {"field": "username.keyword", "size": 100},
                    "aggs": {
                        "by_ip": {"terms": {"field": "source_ip.keyword", "size": 100}},
                        "failure_count": {"value_count": {"field": "_id"}},
                    }
                }
            }
        }
        
        result = es.search(index="auth-logs-*", body=query)
        incidents = []
        
        for bucket in result['aggregations']['by_user']['buckets']:
            if bucket['failure_count']['value'] >= 10:
                incidents.append({
                    'type': 'brute_force',
                    'username': bucket['key'],
                    'attempts': bucket['failure_count']['value'],
                    'source_ips': [b['key'] for b in bucket['by_ip']['buckets']],
                    'severity': 'high',
                })
        
        return incidents

    def detect_data_exfiltration(self) -> list:
        query = {
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"@timestamp": {"gte": "now-1h"}}},
                        {"range": {"bytes_out": {"gte": 104857600}}},  # > 100MB
                    ]
                }
            }
        }
        
        result = es.search(index="network-logs-*", body=query, size=50)
        return [hit['_source'] for hit in result['hits']['hits']]

    def create_incident(self, detection: dict):
        incident = {
            "@timestamp": datetime.utcnow().isoformat(),
            "incident_id": f"INC-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}",
            "type": detection['type'],
            "severity": detection['severity'],
            "status": "open",
            "details": detection,
        }
        es.index(index="incidents", body=incident)
        self.alert_soc(incident)

    def alert_soc(self, incident: dict):
        import requests
        requests.post(os.getenv('PAGERDUTY_WEBHOOK'), json={
            "payload": {
                "summary": f"{incident['severity'].upper()}: {incident['type']}",
                "severity": incident['severity'],
                "source": "SIEM",
                "custom_details": incident,
            },
            "routing_key": os.getenv('PAGERDUTY_KEY'),
            "event_action": "trigger",
        })

安全事件响应:检测、遏制与恢复 插图

自动遏制

import boto3
from typing import Optional

class IncidentContainment:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.iam = boto3.client('iam')

    def isolate_instance(self, instance_id: str) -> bool:
        """隔离受感染的EC2实例。"""
        # 创建隔离安全组
        sg_response = self.ec2.create_security_group(
            GroupName=f'quarantine-{instance_id}',
            Description=f'Isolation SG for incident response - {instance_id}',
            VpcId=self._get_instance_vpc(instance_id),
        )
        isolation_sg_id = sg_response['GroupId']
        
        # 无入站或出站规则 = 完全隔离
        
        # 应用隔离安全组
        self.ec2.modify_instance_attribute(
            InstanceId=instance_id,
            Groups=[isolation_sg_id],
        )
        
        # 添加标签以便追踪
        self.ec2.create_tags(
            Resources=[instance_id],
            Tags=[
                {'Key': 'security:quarantined', 'Value': 'true'},
                {'Key': 'security:quarantine_time', 'Value': datetime.utcnow().isoformat()},
            ]
        )
        
        print(f"Instance {instance_id} isolated in quarantine SG {isolation_sg_id}")
        return True

    def disable_user(self, username: str) -> bool:
        """在事件期间禁用IAM用户访问。"""
        # 停用访问密钥
        paginator = self.iam.get_paginator('list_access_keys')
        for page in paginator.paginate(UserName=username):
            for key in page['AccessKeyMetadata']:
                self.iam.update_access_key(
                    UserName=username,
                    AccessKeyId=key['AccessKeyId'],
                    Status='Inactive',
                )
        
        # 分离所有策略
        for policy in self.iam.list_attached_user_policies(UserName=username)['AttachedPolicies']:
            self.iam.detach_user_policy(
                UserName=username,
                PolicyArn=policy['PolicyArn'],
            )
        
        # 添加显式拒绝策略
        deny_policy = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Deny", "Action": "*", "Resource": "*"}]
        })
        self.iam.put_user_policy(
            UserName=username,
            PolicyName='IncidentResponseDeny',
            PolicyDocument=deny_policy,
        )
        
        print(f"User {username} disabled during incident")
        return True

    def block_ip(self, ip_address: str, reason: str):
        """将IP添加到AWS WAF阻止列表。"""
        waf = boto3.client('wafv2')
        waf.update_ip_set(
            Name='threat-block-list',
            Scope='REGIONAL',
            Id=os.getenv('WAF_IP_SET_ID'),
            LockToken=self._get_waf_lock_token(),
            Addresses=[f"{ip_address}/32"],
        )

安全事件响应:检测、遏制与恢复 插图

证据收集

#!/bin/bash
# incident_collect.sh - 收集取证证据

INCIDENT_ID=$1
OUTPUT_DIR="/forensics/\${INCIDENT_ID}_$(date +%Y%m%d_%H%M%S)"
mkdir -p "$OUTPUT_DIR"

echo "[*] 收集系统状态..."

# 运行中的进程
ps auxf > "$OUTPUT_DIR/processes.txt"

# 网络连接
netstat -tulpn > "$OUTPUT_DIR/network_connections.txt"
ss -tulpn >> "$OUTPUT_DIR/network_connections.txt"

# 最近的认证事件
last -F > "$OUTPUT_DIR/last_logins.txt"
lastb -F > "$OUTPUT_DIR/failed_logins.txt"

# 最近修改的文件
find /var /tmp /home -newer /proc/1 -type f 2>/dev/null > "$OUTPUT_DIR/recent_files.txt"

# Crontab
for user in $(cut -d: -f1 /etc/passwd); do
  crontab -l -u "$user" 2>/dev/null >> "$OUTPUT_DIR/crontabs.txt"
done

# 内存转储(如果安装了volatility)
# sudo avml "$OUTPUT_DIR/memory.lime"

# 对所有收集的文件计算哈希
sha256sum "$OUTPUT_DIR"/* > "$OUTPUT_DIR/checksums.sha256"

echo "[*] 证据已收集到 $OUTPUT_DIR"

事件响应剧本

阶段 行动 时间线
检测 告警触发,初步分类 < 15 分钟
分析 范围评估,IOC收集 15-60 分钟
遏制 隔离受影响系统 1-4 小时
根除 移除恶意软件,修补漏洞 4-24 小时
恢复 从干净的备份恢复 24-72 小时
事后 根本原因分析,经验教训,改进 1-2 周