攻击链重构的具体实现思路

1. 数据收集与标准化层
实现思路:

python
class LogCollector:
    def collect_multi_source_logs(self):
        return {
            "network_logs": self.parse_netflow(),
            "endpoint_logs": self.parse_edr(), 
            "security_logs": self.parse_ids_ips(),
            "application_logs": self.parse_app_logs(),
            "cloud_logs": self.parse_cloudtrail()
        }
    
    def standardize_logs(self, raw_logs):
        """统一日志格式"""
        standardized = []
        for log in raw_logs:
            std_log = {
                "timestamp": log.get("time"),
                "source_ip": log.get("src_ip"),
                "destination_ip": log.get("dst_ip"),
                "user": log.get("user"),
                "process": log.get("process_name"),
                "action": log.get("action"),
                "result": log.get("result"),
                "raw_message": log.get("message")
            }
            standardized.append(std_log)
        return standardized


2. ATT&CK技术映射层
核心实现:

python
class AttackTechniqueMapper:
    def __init__(self):
        self.technique_patterns = self.load_attack_patterns()
        
    def map_to_mitre(self, log_entry):
        """将日志映射到MITRE ATT&CK技术"""
        techniques = []
        
        # 基于规则的基础映射
        techniques.extend(self.rule_based_mapping(log_entry))
        
        # 大模型增强映射
        techniques.extend(self.llm_enhanced_mapping(log_entry))
        
        return techniques
    
    def rule_based_mapping(self, log):
        """基于预定义规则的快速映射"""
        rules = {
            "T1566.001": lambda x: "phish" in x.get("raw_message", "").lower(),
            "T1059.003": lambda x: "powershell" in x.get("process", "").lower(),
            "T1021.002": lambda x: "smb" in x.get("action", "").lower(),
            "T1003.001": lambda x: "lsass" in x.get("process", "").lower()
        }
        
        matched_techniques = []
        for tech_id, rule_func in rules.items():
            if rule_func(log):
                matched_techniques.append({
                    "technique_id": tech_id,
                    "confidence": 0.8,  # 规则匹配置信度
                    "evidence": log["raw_message"]
                })
        return matched_techniques
    
    def llm_enhanced_mapping(self, log):
        """使用大模型进行复杂模式识别"""
        prompt = f"""
        分析以下安全事件,识别对应的MITRE ATT&CK技术:
        日志内容: {log['raw_message']}
        上下文: 源IP={log.get('source_ip')}, 目标IP={log.get('destination_ip')}, 操作={log.get('action')}
        
        请返回JSON格式:
        {{
            "techniques": [
                {{
                    "technique_id": "TXXXX.XXX",
                    "technique_name": "技术名称", 
                    "confidence": 0.95,
                    "reasoning": "映射理由"
                }}
            ]
        }}
        """
        
        response = self.llm_client.generate(prompt)
        return self.parse_llm_response(response)


3. 时序关联与图谱构建
攻击链重建算法:

python
class AttackChainBuilder:
    def build_attack_chain(self, labeled_events, time_window_minutes=60):
        """构建攻击链"""
        # 按时间排序
        sorted_events = sorted(labeled_events, key=lambda x: x["timestamp"])
        
        attack_chains = []
        current_chain = []
        
        for i, event in enumerate(sorted_events):
            if not current_chain:
                current_chain.append(event)
                continue
                
            last_event = current_chain[-1]
            
            # 判断是否属于同一条攻击链
            if self.is_same_attack_chain(last_event, event, time_window_minutes):
                current_chain.append(event)
            else:
                if len(current_chain) >= 2:  # 至少两个事件才构成链条
                    attack_chains.append(current_chain)
                current_chain = [event]
        
        # 处理最后一条链
        if len(current_chain) >= 2:
            attack_chains.append(current_chain)
            
        return attack_chains
    
    def is_same_attack_chain(self, event1, event2, time_window):
        """判断两个事件是否属于同一次攻击"""
        time_diff = (event2["timestamp"] - event1["timestamp"]).total_seconds() / 60
        
        # 时间窗口检查
        if time_diff > time_window:
            return False
            
        # 实体关联检查
        entity_overlap = self.calculate_entity_overlap(event1, event2)
        
        # 战术连续性检查
        tactical_flow = self.validate_tactical_sequence(
            event1["techniques"], 
            event2["techniques"]
        )
        
        return entity_overlap > 0.3 and tactical_flow


4. 战术阶段识别
阶段划分实现:

python
class TacticalPhaseIdentifier:
    # MITRE ATT&CK战术阶段定义
    TACTICAL_PHASES = {
        " reconnaissance": ["TA0043"],
        "resource_development": ["TA0042"], 
        "initial_access": ["TA0001"],
        "execution": ["TA0002"],
        "persistence": ["TA0003"],
        "privilege_escalation": ["TA0004"],
        "defense_evasion": ["TA0005"],
        "credential_access": ["TA0006"],
        "discovery": ["TA0007"],
        "lateral_movement": ["TA0008"],
        "collection": ["TA0009"],
        "command_and_control": ["TA0011"],
        "exfiltration": ["TA0010"],
        "impact": ["TA0040"]
    }
    
    def identify_phase(self, technique_id):
        """识别技术所属的战术阶段"""
        for phase, techniques in self.TACTICAL_PHASES.items():
            if any(tech in technique_id for tech in techniques):
                return phase
        return "unknown"
攻击链重构的具体输出
输出格式1:结构化攻击链报告
json
{
  "attack_chain_id": "ac-2024-001",
  "confidence_score": 0.92,
  "time_range": {
    "start": "2024-01-15T10:23:45Z",
    "end": "2024-01-15T11:45:30Z"
  },
  "attack_summary": "攻击者通过钓鱼邮件获得初始访问,在目标系统建立持久化后,进行内网横向移动并窃取敏感数据。",
  "attribution": {
    "threat_actor": "APT29",
    "confidence": 0.75,
    "matching_ttps": ["T1566.001", "T1059.003", "T1021.002"]
  },
  "kill_chain": [
    {
      "phase": "initial_access",
      "technique": "T1566.001 - Spearphishing Link",
      "timestamp": "2024-01-15T10:23:45Z",
      "evidence": "User clicked phishing link in email from fake_domain.com",
      "source_ip": "192.168.1.100",
      "destination_ip": "10.1.1.50",
      "impact_score": 8,
      "mitigation": ["用户安全意识培训", "邮件过滤规则更新"]
    },
    {
      "phase": "execution",
      "technique": "T1059.003 - Windows Command Shell",
      "timestamp": "2024-01-15T10:25:12Z", 
      "evidence": "Powershell executed encoded command from memory",
      "process": "powershell.exe",
      "command_line": "powershell -enc SQBFAFgAIAAoACg...",
      "impact_score": 7,
      "mitigation": ["应用白名单", "限制PowerShell执行权限"]
    },
    {
      "phase": "persistence",
      "technique": "T1053.005 - Scheduled Task",
      "timestamp": "2024-01-15T10:28:33Z",
      "evidence": "New scheduled task 'SystemUpdate' created",
      "task_name": "SystemUpdate",
      "impact_score": 6,
      "mitigation": ["监控计划任务创建", "实施最小权限原则"]
    }
  ],
  "impact_assessment": {
    "affected_assets": ["WEB-SRV-01", "USER-PC-23"],
    "data_breached": true,
    "sensitivity_level": "high",
    "business_impact": "客户数据可能泄露,需要立即通知相关方"
  },
  "recommended_actions": [
    {
      "priority": "critical",
      "action": "隔离受影响主机 WEB-SRV-01",
      "reason": "确认存在恶意持久化机制"
    },
    {
      "priority": "high", 
      "action": "重置所有相关用户密码",
      "reason": "凭证可能已泄露"
    },
    {
      "priority": "medium",
      "action": "更新邮件过滤规则",
      "reason": "阻止来自 fake_domain.com 的邮件"
    }
  ]
}
输出格式2:可视化攻击链图谱
python
# 攻击链图谱数据结构
attack_graph = {
    "nodes": [
        {
            "id": "node1",
            "type": "technique",
            "label": "T1566.001\nSpearphishing", 
            "phase": "initial_access",
            "timestamp": "2024-01-15T10:23:45Z",
            "risk_level": "high"
        },
        {
            "id": "node2", 
            "type": "technique",
            "label": "T1059.003\nCommand Shell",
            "phase": "execution", 
            "timestamp": "2024-01-15T10:25:12Z",
            "risk_level": "medium"
        },
        {
            "id": "node3",
            "type": "asset", 
            "label": "WEB-SRV-01",
            "role": "compromised_host",
            "risk_level": "critical"
        }
    ],
    "edges": [
        {
            "source": "node1",
            "target": "node2", 
            "relationship": "leads_to",
            "evidence": "同一源IP,时间连续性"
        },
        {
            "source": "node2",
            "target": "node3",
            "relationship": "compromises", 
            "evidence": "在目标主机执行命令"
        }
    ]
}
输出格式3:自然语言攻击故事


攻击事件分析报告

攻击概述:
在2024年1月15日10:23至11:45期间,检测到一次针对我司网络的针对性攻击。攻击者使用鱼叉式钓鱼作为入口点,成功在内部主机建立立足点并进行横向移动。

攻击时间线:
1. 初始访问(10:23):攻击者发送伪装成合作伙伴的钓鱼邮件,用户点击恶意链接
   - 证据:邮件日志显示用户点击来自fake_domain.com的链接
   - 技术:T1566.001 - 鱼叉式钓鱼链接

2. 代码执行(10:25):通过恶意链接下载并执行PowerShell脚本
   - 证据:进程监控发现powershell.exe执行base64编码命令
   - 技术:T1059.003 - Windows命令脚本

3. 持久化(10:28):攻击者创建计划任务维持访问
   - 证据:系统日志记录新建计划任务"SystemUpdate"
   - 技术:T1053.005 - 计划任务

影响评估:
- 2台主机确认受影响
- 潜在的数据泄露风险:高
- 业务连续性影响:中等

**紧急处置建议:**
1. 立即隔离主机 WEB-SRV-01
2. 检查并删除恶意计划任务
3. 重置相关用户凭证
4. 加强邮件安全过滤规则
输出格式4:IOC提取与威胁情报
json
{
  "indicators_of_compromise": {
    "network_iocs": [
      {
        "type": "domain",
        "value": "fake_domain.com",
        "first_seen": "2024-01-15T10:23:45Z",
        "last_seen": "2024-01-15T10:23:45Z",
        "confidence": "high"
      }
    ],
    "host_iocs": [
      {
        "type": "process",
        "value": "powershell.exe -enc SQBFAFgAIAAoACg...",
        "md5": "a1b2c3d4e5f678901234567890123456",
        "confidence": "high"
      },
      {
        "type": "scheduled_task", 
        "value": "SystemUpdate",
        "confidence": "medium"
      }
    ],
    "behavioral_iocs": [
      {
        "type": "tactic",
        "value": "T1566.001 -> T1059.003 -> T1053.005",
        "description": "钓鱼->命令执行->持久化的攻击模式",
        "confidence": "high"
      }
    ]
  }
}
关键技术优势
自动化程度高:从原始日志到完整攻击故事线全自动生成

可解释性强:每个攻击步骤都有明确的证据支撑

 actionable:直接提供可操作的处置建议

标准化输出:基于MITRE ATT&CK框架,便于跨团队沟通

这种攻击链重构能力让安全团队能够快速理解攻击全貌,制定精准的响应策略,极大提升了安全运营的效率。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐