『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网

Django安全最佳实践:防范常见Web攻击

1. Web安全概述

1.1 Web安全的重要性

在当今数字化时代,Web应用安全已成为开发过程中不可忽视的关键环节。根据OWASP(开放式Web应用程序安全项目)的统计,超过70%的安全漏洞发生在应用层而非网络层。Django作为一个"自带电池"的Web框架,提供了许多内置的安全特性,但正确配置和使用这些特性至关重要。

1.2 常见Web攻击类型

Web安全威胁
注入攻击
跨站脚本XSS
跨站请求伪造CSRF
认证与会话攻击
文件上传漏洞
安全配置错误
敏感数据泄露
SQL注入
命令注入
存储型XSS
反射型XSS
DOM型XSS

2. Django安全配置基础

2.1 安全设置配置

# settings.py - 安全关键配置

# 密钥管理 - 永远不要将密钥提交到版本控制系统
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY', 'your-default-dev-key-only')

# 调试模式 - 生产环境必须关闭
DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true'

# 允许的主机 - 生产环境必须设置
ALLOWED_HOSTS = os.environ.get('ALLOWED_HOSTS', 'localhost,127.0.0.1').split(',')

# HTTPS安全设置
if not DEBUG:
    # 强制HTTPS
    SECURE_SSL_REDIRECT = True
    SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
    
    # Cookie安全设置
    SESSION_COOKIE_SECURE = True
    CSRF_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    CSRF_COOKIE_HTTPONLY = True
    
    # HSTS设置
    SECURE_HSTS_SECONDS = 31536000  # 1年
    SECURE_HSTS_INCLUDE_SUBDOMAINS = True
    SECURE_HSTS_PRELOAD = True
    
    # 其他安全头
    SECURE_BROWSER_XSS_FILTER = True
    SECURE_CONTENT_TYPE_NOSNIFF = True
    X_FRAME_OPTIONS = 'DENY'

# 数据库配置
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME', 'mydatabase'),
        'USER': os.environ.get('DB_USER', 'myuser'),
        'PASSWORD': os.environ.get('DB_PASSWORD', 'mypassword'),
        'HOST': os.environ.get('DB_HOST', 'localhost'),
        'PORT': os.environ.get('DB_PORT', '5432'),
        'CONN_MAX_AGE': 600,  # 连接池
        'OPTIONS': {
            'sslmode': 'require',  # 强制SSL连接
        }
    }
}

# 静态文件配置
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')
STATIC_URL = '/static/'

# 媒体文件配置
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
MEDIA_URL = '/media/'

# 文件上传权限
FILE_UPLOAD_PERMISSIONS = 0o644
FILE_UPLOAD_DIRECTORY_PERMISSIONS = 0o755

2.2 中间件安全配置

# settings.py - 中间件配置
MIDDLEWARE = [
    # 安全相关中间件应该在最前面
    'django.middleware.security.SecurityMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    
    # 其他中间件
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    
    # 自定义安全中间件
    'myapp.middleware.SecurityHeadersMiddleware',
]

# 自定义安全头中间件
class SecurityHeadersMiddleware:
    """自定义安全头中间件"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        response = self.get_response(request)
        
        # 添加安全头
        response['X-Content-Type-Options'] = 'nosniff'
        response['X-Frame-Options'] = 'DENY'
        response['X-XSS-Protection'] = '1; mode=block'
        response['Referrer-Policy'] = 'strict-origin-when-cross-origin'
        response['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
        
        # CSP内容安全策略
        csp_directives = [
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
            "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
            "img-src 'self' data: https:",
            "font-src 'self' https://cdn.jsdelivr.net",
            "connect-src 'self'",
            "frame-ancestors 'none'",
        ]
        response['Content-Security-Policy'] = '; '.join(csp_directives)
        
        return response

3. SQL注入防护

3.1 SQL注入原理与危害

SQL注入是通过将恶意SQL代码插入到应用输入参数中,从而在数据库执行非法操作的攻击方式。Django的ORM提供了天然的SQL注入防护。

3.2 安全查询实践

# 危险的原始SQL查询 - 不要这样做!
from django.db import connection

def unsafe_user_search(request):
    username = request.GET.get('username')
    
    # 存在SQL注入漏洞
    with connection.cursor() as cursor:
        cursor.execute(f"SELECT * FROM auth_user WHERE username = '{username}'")
        results = cursor.fetchall()
    
    return results

# 安全的Django ORM查询
def safe_user_search(request):
    username = request.GET.get('username')
    
    # 使用ORM防止SQL注入
    users = User.objects.filter(username=username)
    return users

# 必须使用原始SQL时的安全做法
def safe_raw_sql(request):
    username = request.GET.get('username')
    
    with connection.cursor() as cursor:
        # 使用参数化查询
        cursor.execute("SELECT * FROM auth_user WHERE username = %s", [username])
        results = cursor.fetchall()
    
    return results

# 复杂查询的安全示例
class UserManager(models.Manager):
    def search_users_safely(self, search_terms, min_login_count=0):
        """
        安全的用户搜索方法
        使用参数化查询和输入验证
        """
        # 输入验证和清理
        if not search_terms or len(search_terms) > 100:
            return self.none()
        
        # 使用ORM构建安全查询
        query = self.get_queryset()
        
        if min_login_count > 0:
            query = query.filter(login_count__gte=min_login_count)
        
        # 安全地处理搜索条件
        conditions = models.Q()
        for term in search_terms.split():
            if len(term) < 50:  # 防止过长的搜索词
                conditions |= models.Q(username__icontains=term) | \
                             models.Q(email__icontains=term) | \
                             models.Q(first_name__icontains=term)
        
        return query.filter(conditions).distinct()

# 额外的输入验证
from django.core.exceptions import ValidationError
import re

def validate_sql_safe_string(value):
    """
    验证字符串是否不包含危险的SQL字符
    """
    dangerous_patterns = [
        r"(\b(DROP|DELETE|UPDATE|INSERT|ALTER|CREATE|EXEC)\b)",
        r"(\-\-|\#|\/\*)",  # SQL注释
        r"(\b(OR|AND)\b.*=)",  # 逻辑操作符滥用
        r"(;|\|&)",  # 命令分隔符
    ]
    
    for pattern in dangerous_patterns:
        if re.search(pattern, value, re.IGNORECASE):
            raise ValidationError(f"输入包含潜在的危险字符: {value}")
    
    return value

# 在模型中使用验证
class UserQuery(models.Model):
    search_term = models.CharField(
        max_length=100,
        validators=[validate_sql_safe_string]
    )
    
    def clean(self):
        super().clean()
        validate_sql_safe_string(self.search_term)

3.3 高级防护技术

# database_security.py
from django.db import connection, models
from django.core.exceptions import PermissionDenied
import logging

logger = logging.getLogger(__name__)

class DatabaseSecurityMonitor:
    """数据库安全监控"""
    
    @classmethod
    def log_suspicious_activity(cls, query, params, user):
        """记录可疑的数据库活动"""
        suspicious_keywords = ['drop', 'delete', 'alter', 'truncate', 'exec']
        
        query_lower = query.lower()
        if any(keyword in query_lower for keyword in suspicious_keywords):
            logger.warning(
                f"Suspicious database activity by user {user}: {query} with params {params}"
            )
    
    @classmethod
    def execute_safe_query(cls, query, params=None, user=None):
        """执行安全的数据库查询"""
        if user and not user.is_authenticated:
            raise PermissionDenied("未授权访问")
        
        cls.log_suspicious_activity(query, params, user)
        
        with connection.cursor() as cursor:
            cursor.execute(query, params or [])
            return cursor.fetchall()

# 自定义查询集添加安全方法
class SecureQuerySet(models.QuerySet):
    def safe_filter(self, **filters):
        """安全的过滤方法,添加额外的验证"""
        # 验证过滤条件
        for field, value in filters.items():
            if hasattr(self.model, field):
                field_obj = self.model._meta.get_field(field)
                # 可以在这里添加字段级别的验证
                if isinstance(field_obj, models.CharField) and len(str(value)) > 255:
                    raise ValueError(f"字段 {field} 的值过长")
        
        return self.filter(**filters)

class SecureManager(models.Manager):
    def get_queryset(self):
        return SecureQuerySet(self.model, using=self._db)

4. 跨站脚本(XSS)防护

4.1 XSS攻击类型

  • 存储型XSS:恶意脚本存储在服务器端
  • 反射型XSS:恶意脚本通过URL参数反射给用户
  • DOM型XSS:在客户端DOM环境中执行的XSS

4.2 Django模板自动转义

# 模板中的自动转义
"""
Django模板默认开启自动HTML转义
以下字符会被转义:
< 转义为 &lt;
> 转义为 &gt;
' 转义为 &#x27;
" 转义为 &quot;
& 转义为 &amp;
"""

# 安全的模板示例
{% comment %} 自动转义,安全 {% endcomment %}
<p>{{ user_input }}</p>

{% comment %} 手动关闭转义 - 危险! {% endcomment %}
<p>{{ user_input|safe }}</p>

{% comment %} 有条件的安全输出 {% endcomment %}
<p>{{ user_input|escape }}</p>

{% comment %} 自动转义JavaScript上下文 {% endcomment %}
<script>
var username = "{{ username|escapejs }}";
var userData = {{ user_data|json_script:"user-data" }};
</script>

# 自定义安全过滤器
from django import template
from django.utils.html import escape, strip_tags
from django.utils.safestring import mark_safe
import html
import json

register = template.Library()

@register.filter
def safe_html(text):
    """
    安全地允许有限的HTML标签
    """
    if not text:
        return ""
    
    allowed_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
    allowed_attrs = {
        'a': ['href', 'title', 'target'],
        'img': ['src', 'alt', 'width', 'height'],
    }
    
    from django.utils.html import strip_tags
    text = strip_tags(text, allowed_tags)
    
    # 这里可以添加更复杂的HTML清理逻辑
    # 考虑使用 bleach 库进行更严格的白名单过滤
    
    return mark_safe(text)

@register.filter
def json_safe(obj):
    """
    安全地将Python对象转换为JSON
    """
    return mark_safe(json.dumps(obj))

@register.filter
def truncate_safe(text, length=100):
    """
    安全地截断文本
    """
    if not text:
        return ""
    
    text = str(text)
    if len(text) <= length:
        return escape(text)
    
    return escape(text[:length]) + "..."

# 在模板中使用
"""
{{ user_comment|safe_html }}
{{ user_data|json_safe }}
{{ long_text|truncate_safe:50 }}
"""

4.3 内容安全策略(CSP)

# csp.py - 内容安全策略实现
from django.utils.deprecation import MiddlewareMixin
import hashlib

class CSPMiddleware(MiddlewareMixin):
    """内容安全策略中间件"""
    
    def process_response(self, request, response):
        # 只为HTML响应添加CSP
        if response.get('Content-Type', '').startswith('text/html'):
            nonce = self.generate_nonce()
            
            csp_directives = [
                f"default-src 'self'",
                f"script-src 'self' 'nonce-{nonce}' https://cdn.example.com",
                f"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com",
                f"img-src 'self' data: https:",
                f"font-src 'self' https://fonts.gstatic.com",
                f"connect-src 'self'",
                f"frame-ancestors 'none'",
                f"base-uri 'self'",
                f"form-action 'self'",
            ]
            
            response['Content-Security-Policy'] = '; '.join(csp_directives)
            
            # 将nonce存储在request中供模板使用
            request.csp_nonce = nonce
        
        return response
    
    def generate_nonce(self):
        """生成随机nonce值"""
        return hashlib.sha256(os.urandom(32)).hexdigest()[:16]

# 在模板中使用nonce
"""
<script nonce="{{ request.csp_nonce }}">
    // 内联脚本需要nonce才能执行
    console.log('这个脚本有nonce,可以执行');
</script>
"""

# 使用django-csp包(推荐)
"""
安装:pip install django-csp

在settings.py中配置:
MIDDLEWARE = [
    'csp.middleware.CSPMiddleware',
    # ... 其他中间件
]

CSP_DEFAULT_SRC = ["'self'"]
CSP_SCRIPT_SRC = ["'self'", "https://cdn.example.com"]
CSP_STYLE_SRC = ["'self'", "'unsafe-inline'"]
CSP_IMG_SRC = ["'self'", "data:", "https:"]
CSP_INCLUDE_NONCE_IN = ['script-src']
"""

5. 跨站请求伪造(CSRF)防护

5.1 CSRF防护机制

# CSRF安全配置
# settings.py

# 确保CSRF中间件启用
MIDDLEWARE = [
    # ...
    'django.middleware.csrf.CsrfViewMiddleware',
    # ...
]

# CSRF信任的源(在需要时设置)
CSRF_TRUSTED_ORIGINS = [
    'https://mydomain.com',
    'https://api.mydomain.com',
]

# CSRF Cookie设置
CSRF_COOKIE_HTTPONLY = True
CSRF_COOKIE_SECURE = True  # 仅HTTPS
CSRF_COOKIE_SAMESITE = 'Lax'  # 或 'Strict'

# 视图中的CSRF保护
from django.views.decorators.csrf import csrf_protect, csrf_exempt, ensure_csrf_cookie
from django.utils.decorators import method_decorator
from django.middleware.csrf import get_token

@method_decorator(csrf_protect, name='dispatch')
class SecureFormView(View):
    """需要CSRF保护的视图"""
    
    def get(self, request):
        # 确保CSRF token被设置
        get_token(request)
        return render(request, 'form.html')
    
    def post(self, request):
        # CSRF中间件会自动验证
        # 如果验证失败,会返回403错误
        form = MyForm(request.POST)
        if form.is_valid():
            # 处理表单
            return redirect('success')
        return render(request, 'form.html', {'form': form})

@method_decorator(csrf_exempt, name='dispatch')
class APIPublicView(View):
    """豁免CSRF保护的API视图(谨慎使用)"""
    
    def post(self, request):
        # 这里需要其他形式的认证
        return JsonResponse({'status': 'ok'})

@method_decorator(ensure_csrf_cookie, name='dispatch')
class GetCSRFTokenView(View):
    """获取CSRF token的API端点"""
    
    def get(self, request):
        return JsonResponse({'detail': 'CSRF cookie set'})

# 模板中的CSRF token
"""
<!-- 在表单中包含CSRF token -->
<form method="post">
    {% csrf_token %}
    <!-- 表单字段 -->
    <input type="text" name="username">
    <button type="submit">提交</button>
</form>

<!-- 对于AJAX请求 -->
<script>
// 获取CSRF token
function getCookie(name) {
    let cookieValue = null;
    if (document.cookie && document.cookie !== '') {
        const cookies = document.cookie.split(';');
        for (let i = 0; i < cookies.length; i++) {
            const cookie = cookies[i].trim();
            if (cookie.substring(0, name.length + 1) === (name + '=')) {
                cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
                break;
            }
        }
    }
    return cookieValue;
}

const csrftoken = getCookie('csrftoken');

// 在AJAX请求中包含CSRF token
fetch('/api/endpoint/', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
        'X-CSRFToken': csrftoken,
    },
    body: JSON.stringify(data)
});
</script>
"""

5.2 高级CSRF防护

# advanced_csrf.py
from django.views.decorators.csrf import csrf_protect
from django.middleware.csrf import rotate_token
import logging

logger = logging.getLogger(__name__)

class AdvancedCSRFProtection:
    """高级CSRF防护"""
    
    @classmethod
    def validate_referer(cls, request):
        """验证Referer头"""
        referer = request.META.get('HTTP_REFERER')
        if not referer:
            logger.warning("请求缺少Referer头")
            return False
        
        # 检查Referer是否来自可信域名
        trusted_domains = ['https://mydomain.com', 'https://www.mydomain.com']
        if not any(referer.startswith(domain) for domain in trusted_domains):
            logger.warning(f"可疑的Referer: {referer}")
            return False
        
        return True
    
    @classmethod
    def double_submit_cookie_pattern(cls, request):
        """双提交Cookie模式"""
        # 除了标准的CSRF token,还可以使用自定义header
        custom_token = request.headers.get('X-Custom-CSRF-Token')
        cookie_token = request.COOKIES.get('custom_csrf_token')
        
        if custom_token and cookie_token and custom_token == cookie_token:
            return True
        return False

# 自定义CSRF验证装饰器
def enhanced_csrf_protect(view_func):
    """增强的CSRF保护装饰器"""
    
    @csrf_protect
    def wrapped_view(request, *args, **kwargs):
        # 额外的安全检查
        if request.method in ('POST', 'PUT', 'PATCH', 'DELETE'):
            # 验证Referer
            if not AdvancedCSRFProtection.validate_referer(request):
                logger.warning(f"CSRF Referer验证失败: {request.path}")
                from django.http import HttpResponseForbidden
                return HttpResponseForbidden("安全验证失败")
            
            # 旋转token(可选)
            if request.user.is_authenticated:
                rotate_token(request)
        
        return view_func(request, *args, **kwargs)
    
    return wrapped_view

# 在视图中使用
@method_decorator(enhanced_csrf_protect, name='dispatch')
class SecureTransactionView(View):
    """需要增强CSRF保护的重要操作视图"""
    
    def post(self, request):
        # 这里已经通过了增强的CSRF验证
        # 处理敏感操作
        return JsonResponse({'status': 'success'})

6. 认证与授权安全

6.1 密码安全

# authentication/security.py
from django.contrib.auth.hashers import make_password, check_password
from django.contrib.auth.password_validation import validate_password
from django.core.exceptions import ValidationError
import re
import logging

logger = logging.getLogger(__name__)

class PasswordSecurity:
    """密码安全工具类"""
    
    @staticmethod
    def validate_password_strength(password):
        """
        验证密码强度
        """
        errors = []
        
        # 长度检查
        if len(password) < 12:
            errors.append("密码至少需要12个字符")
        
        # 复杂度检查
        if not re.search(r'[A-Z]', password):
            errors.append("密码必须包含至少一个大写字母")
        if not re.search(r'[a-z]', password):
            errors.append("密码必须包含至少一个小写字母")
        if not re.search(r'[0-9]', password):
            errors.append("密码必须包含至少一个数字")
        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            errors.append("密码必须包含至少一个特殊字符")
        
        # 常见密码检查
        common_passwords = [
            'password', '123456', 'qwerty', 'admin', 'welcome'
        ]
        if password.lower() in common_passwords:
            errors.append("密码过于常见")
        
        if errors:
            raise ValidationError(errors)
    
    @staticmethod
    def create_secure_password(raw_password):
        """
        创建安全的密码哈希
        """
        try:
            # 使用Django的密码验证
            validate_password(raw_password)
            PasswordSecurity.validate_password_strength(raw_password)
            
            # 创建密码哈希
            return make_password(raw_password)
        
        except ValidationError as e:
            logger.warning(f"密码强度验证失败: {e}")
            raise
    
    @staticmethod
    def verify_password(raw_password, hashed_password):
        """
        验证密码
        """
        return check_password(raw_password, hashed_password)

# 自定义用户模型
from django.contrib.auth.models import AbstractUser
from django.db import models

class SecureUser(AbstractUser):
    """增强安全性的用户模型"""
    
    # 额外的安全字段
    last_password_change = models.DateTimeField(auto_now_add=True)
    password_changed = models.BooleanField(default=False)
    failed_login_attempts = models.IntegerField(default=0)
    account_locked_until = models.DateTimeField(null=True, blank=True)
    mfa_enabled = models.BooleanField(default=False)
    mfa_secret = models.CharField(max_length=32, blank=True)
    
    def set_password(self, raw_password):
        """重写设置密码方法"""
        self.password = PasswordSecurity.create_secure_password(raw_password)
        self.last_password_change = timezone.now()
        self.password_changed = True
        self.failed_login_attempts = 0  # 重置失败尝试计数
    
    def check_password(self, raw_password):
        """检查密码前验证账户状态"""
        if self.account_locked_until and timezone.now() < self.account_locked_until:
            raise ValidationError("账户已被锁定,请稍后重试")
        
        is_correct = super().check_password(raw_password)
        
        if is_correct:
            self.failed_login_attempts = 0
            self.save()
        else:
            self.failed_login_attempts += 1
            if self.failed_login_attempts >= 5:
                self.account_locked_until = timezone.now() + timedelta(minutes=30)
            self.save()
        
        return is_correct
    
    class Meta:
        db_table = 'secure_users'

# 自定义认证后端
from django.contrib.auth.backends import ModelBackend

class SecureAuthenticationBackend(ModelBackend):
    """安全认证后端"""
    
    def authenticate(self, request, username=None, password=None, **kwargs):
        try:
            user = SecureUser.objects.get(username=username)
            
            # 检查账户是否被锁定
            if user.account_locked_until and timezone.now() < user.account_locked_until:
                logger.warning(f"登录尝试被拒绝:账户 {username} 已被锁定")
                return None
            
            # 验证密码
            if user.check_password(password):
                # 记录登录成功
                user.last_login = timezone.now()
                user.save()
                
                # 记录审计日志
                self.log_authentication_success(request, user)
                return user
            else:
                # 记录登录失败
                self.log_authentication_failure(request, username)
                return None
        
        except SecureUser.DoesNotExist:
            # 用户不存在也记录日志(防止用户枚举)
            self.log_authentication_failure(request, username)
            return None
    
    def log_authentication_success(self, request, user):
        """记录认证成功日志"""
        logger.info(f"用户 {user.username} 登录成功 - IP: {self.get_client_ip(request)}")
    
    def log_authentication_failure(self, request, username):
        """记录认证失败日志"""
        logger.warning(f"登录失败 - 用户名: {username} - IP: {self.get_client_ip(request)}")
    
    def get_client_ip(self, request):
        """获取客户端IP"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip

6.2 会话安全

# session_security.py
from django.contrib.sessions.backends.db import SessionStore
from django.utils import timezone
from datetime import timedelta
import logging

logger = logging.getLogger(__name__)

class SecureSessionStore(SessionStore):
    """安全会话存储"""
    
    def __init__(self, session_key=None):
        super().__init__(session_key)
        # 设置会话超时时间(15分钟)
        self.set_expiry(900)
    
    def validate_session(self, request):
        """验证会话安全性"""
        user_agent = request.META.get('HTTP_USER_AGENT', '')
        client_ip = self.get_client_ip(request)
        
        # 检查用户代理是否改变
        stored_ua = self.get('user_agent')
        if stored_ua and stored_ua != user_agent:
            logger.warning(f"会话用户代理不匹配: {self.session_key}")
            self.flush()  # 销毁会话
            return False
        
        # 检查IP地址是否改变(可选,根据安全要求)
        stored_ip = self.get('client_ip')
        if stored_ip and stored_ip != client_ip:
            logger.warning(f"会话IP地址改变: {self.session_key}")
            self.flush()
            return False
        
        # 存储当前会话信息
        if not stored_ua:
            self['user_agent'] = user_agent
        if not stored_ip:
            self['client_ip'] = client_ip
        
        self['last_activity'] = timezone.now().isoformat()
        self.save()
        
        return True
    
    def get_client_ip(self, request):
        """获取客户端IP"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip

# 会话安全中间件
class SessionSecurityMiddleware:
    """会话安全中间件"""
    
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        # 只为认证用户检查会话安全
        if request.user.is_authenticated:
            if hasattr(request, 'session'):
                session_store = SecureSessionStore(request.session.session_key)
                if not session_store.validate_session(request):
                    # 会话无效,强制登出
                    from django.contrib.auth import logout
                    logout(request)
                    logger.warning(f"无效会话,用户已登出: {request.user.username}")
        
        response = self.get_response(request)
        return response

# 会话配置
# settings.py
SESSION_ENGINE = 'django.contrib.sessions.backends.db'  # 使用数据库存储会话
SESSION_COOKIE_AGE = 900  # 15分钟
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
SESSION_EXPIRE_AT_BROWSER_CLOSE = True  # 浏览器关闭时过期

# 防止会话固定攻击
SESSION_SERIALIZER = 'django.contrib.sessions.serializers.JSONSerializer'

# 定期清理过期会话
from django.core.management.base import BaseCommand
from django.contrib.sessions.models import Session
from django.utils import timezone

class Command(BaseCommand):
    """清理过期会话的管理命令"""
    
    help = '清理过期会话'
    
    def handle(self, *args, **options):
        expired_sessions = Session.objects.filter(expire_date__lt=timezone.now())
        count = expired_sessions.count()
        expired_sessions.delete()
        
        self.stdout.write(
            self.style.SUCCESS(f'成功删除 {count} 个过期会话')
        )

7. 文件上传安全

7.1 安全的文件上传处理

# file_upload/security.py
import os
import magic
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile
from PIL import Image
import hashlib
import logging

logger = logging.getLogger(__name__)

class FileUploadSecurity:
    """文件上传安全类"""
    
    # 允许的MIME类型
    ALLOWED_IMAGE_TYPES = [
        'image/jpeg', 'image/png', 'image/gif', 'image/webp'
    ]
    
    ALLOWED_DOCUMENT_TYPES = [
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'text/plain'
    ]
    
    # 文件大小限制(字节)
    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
    MAX_IMAGE_SIZE = 5 * 1024 * 1024   # 5MB
    
    @classmethod
    def validate_file_upload(cls, uploaded_file: UploadedFile, file_type='image'):
        """
        验证文件上传的安全性
        """
        # 检查文件大小
        if file_type == 'image':
            max_size = cls.MAX_IMAGE_SIZE
        else:
            max_size = cls.MAX_FILE_SIZE
        
        if uploaded_file.size > max_size:
            raise ValidationError(f"文件大小不能超过 {max_size // 1024 // 1024}MB")
        
        # 检查文件类型
        allowed_types = cls.ALLOWED_IMAGE_TYPES if file_type == 'image' else cls.ALLOWED_DOCUMENT_TYPES
        
        # 使用python-magic进行MIME类型检测
        try:
            file_mime = magic.from_buffer(uploaded_file.read(1024), mime=True)
            uploaded_file.seek(0)  # 重置文件指针
            
            if file_mime not in allowed_types:
                raise ValidationError(f"不支持的文件类型: {file_mime}")
        
        except ImportError:
            # 如果python-magic不可用,使用文件扩展名验证(不够安全)
            logger.warning("python-magic未安装,使用扩展名验证")
            ext = os.path.splitext(uploaded_file.name)[1].lower()
            allowed_extensions = {
                'image': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
                'document': ['.pdf', '.doc', '.docx', '.txt']
            }
            
            if ext not in allowed_extensions.get(file_type, []):
                raise ValidationError(f"不支持的文件扩展名: {ext}")
        
        # 检查文件名安全性
        cls.validate_filename(uploaded_file.name)
        
        return True
    
    @classmethod
    def validate_filename(cls, filename):
        """验证文件名安全性"""
        # 防止路径遍历攻击
        if '..' in filename or '/' in filename or '\\' in filename:
            raise ValidationError("文件名包含非法字符")
        
        # 防止特殊字符
        dangerous_chars = ['<', '>', ':', '"', '|', '?', '*']
        if any(char in filename for char in dangerous_chars):
            raise ValidationError("文件名包含危险字符")
        
        # 文件名长度限制
        if len(filename) > 255:
            raise ValidationError("文件名过长")
    
    @classmethod
    def generate_secure_filename(cls, original_filename):
        """生成安全的文件名"""
        name, ext = os.path.splitext(original_filename)
        
        # 清理文件名
        import re
        name = re.sub(r'[^\w\s-]', '', name)
        name = re.sub(r'[-\s]+', '-', name).strip('-')
        
        # 添加随机哈希防止冲突
        random_hash = hashlib.md5(os.urandom(32)).hexdigest()[:8]
        
        return f"{name}_{random_hash}{ext}"
    
    @classmethod
    def process_image_safely(cls, image_path):
        """安全地处理图片(去除元数据等)"""
        try:
            with Image.open(image_path) as img:
                # 转换为RGB模式(去除透明度等)
                if img.mode in ('RGBA', 'LA'):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    background.paste(img, mask=img.split()[-1])
                    img = background
                
                # 保存图片,不包含元数据
                img.save(image_path, 'JPEG', quality=85, optimize=True)
                
                return True
                
        except Exception as e:
            logger.error(f"图片处理失败: {str(e)}")
            # 删除有问题的文件
            if os.path.exists(image_path):
                os.remove(image_path)
            raise ValidationError("图片处理失败")

# Django表单中的文件上传验证
from django import forms
from django.core.validators import FileExtensionValidator

class SecureFileUploadForm(forms.Form):
    """安全的文件上传表单"""
    
    file = forms.FileField(
        validators=[
            FileExtensionValidator(
                allowed_extensions=['jpg', 'jpeg', 'png', 'gif', 'pdf', 'doc', 'docx']
            )
        ]
    )
    
    def clean_file(self):
        file = self.cleaned_data.get('file')
        
        if file:
            try:
                # 使用安全类验证文件
                FileUploadSecurity.validate_file_upload(file)
                
                # 生成安全文件名
                secure_name = FileUploadSecurity.generate_secure_filename(file.name)
                file.name = secure_name
                
            except ValidationError as e:
                raise forms.ValidationError(e.message)
        
        return file

# 视图中的文件上传处理
from django.views.generic.edit import FormView
from django.urls import reverse_lazy

class SecureUploadView(FormView):
    """安全的文件上传视图"""
    
    form_class = SecureFileUploadForm
    template_name = 'upload.html'
    success_url = reverse_lazy('upload_success')
    
    def form_valid(self, form):
        file = form.cleaned_data['file']
        
        # 安全的文件保存路径
        upload_dir = os.path.join(settings.MEDIA_ROOT, 'uploads')
        os.makedirs(upload_dir, exist_ok=True)
        
        file_path = os.path.join(upload_dir, file.name)
        
        # 分块写入文件
        with open(file_path, 'wb+') as destination:
            for chunk in file.chunks():
                destination.write(chunk)
        
        # 如果是图片,进行安全处理
        if file.content_type.startswith('image/'):
            try:
                FileUploadSecurity.process_image_safely(file_path)
            except ValidationError as e:
                # 处理失败,删除文件
                if os.path.exists(file_path):
                    os.remove(file_path)
                form.add_error('file', e)
                return self.form_invalid(form)
        
        # 记录上传日志
        logger.info(f"文件上传成功: {file.name} - 用户: {self.request.user}")
        
        return super().form_valid(form)

7.2 文件存储安全配置

# storage_security.py
from django.core.files.storage import FileSystemStorage
from django.utils.deconstruct import deconstructible
import os

@deconstructible
class SecureFileStorage(FileSystemStorage):
    """安全的文件存储系统"""
    
    def __init__(self, location=None, base_url=None):
        if location is None:
            location = os.path.join(settings.MEDIA_ROOT, 'secure_uploads')
        super().__init__(location, base_url)
    
    def get_valid_name(self, name):
        """获取安全的文件名"""
        from .file_upload.security import FileUploadSecurity
        return FileUploadSecurity.generate_secure_filename(name)
    
    def _save(self, name, content):
        """保存文件前的安全检查"""
        # 验证文件类型和内容
        from .file_upload.security import FileUploadSecurity
        
        # 创建上传文件的临时副本进行验证
        import tempfile
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            for chunk in content.chunks():
                temp_file.write(chunk)
            temp_path = temp_file.name
        
        try:
            # 验证文件
            FileUploadSecurity.validate_file_upload(content)
            
            # 调用父类方法保存文件
            return super()._save(name, content)
        
        finally:
            # 清理临时文件
            if os.path.exists(temp_path):
                os.remove(temp_path)
    
    def url(self, name):
        """生成安全的文件URL"""
        # 防止直接访问敏感文件
        if name.startswith('private/'):
            raise ValueError("无法生成私有文件的URL")
        
        return super().url(name)

# 配置安全的文件存储
# settings.py
DEFAULT_FILE_STORAGE = 'myapp.storage_security.SecureFileStorage'

# Nginx配置示例(防止直接执行上传的文件)
"""
location /media/ {
    # 禁止执行PHP等脚本文件
    location ~ \.(php|php5|phtml|pl|py|jsp|asp|sh|cgi)$ {
        deny all;
        return 404;
    }
    
    # 设置安全的Content-Type
    types {
        image/jpeg jpg jpeg;
        image/png png;
        image/gif gif;
        application/pdf pdf;
    }
    default_type application/octet-stream;
    
    # 添加安全头
    add_header X-Content-Type-Options nosniff;
    add_header X-Frame-Options DENY;
}
"""

8. 数据验证与序列化安全

8.1 输入验证

# validation/security.py
import re
import html
from django.core.exceptions import ValidationError
from django.utils.html import strip_tags
from django.utils.encoding import force_str

class InputValidator:
    """输入验证器"""
    
    @staticmethod
    def validate_email(email):
        """验证邮箱地址"""
        if not email or len(email) > 254:
            raise ValidationError("无效的邮箱地址")
        
        email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_regex, email):
            raise ValidationError("无效的邮箱格式")
        
        return email.lower()
    
    @staticmethod
    def validate_phone(phone):
        """验证手机号码"""
        if not phone:
            raise ValidationError("手机号码不能为空")
        
        # 简单的手机号验证(根据需求调整)
        phone_regex = r'^1[3-9]\d{9}$'
        if not re.match(phone_regex, phone):
            raise ValidationError("无效的手机号码格式")
        
        return phone
    
    @staticmethod
    def sanitize_html(html_content, max_length=5000):
        """清理HTML内容"""
        if not html_content:
            return ""
        
        # 长度限制
        if len(html_content) > max_length:
            raise ValidationError(f"内容长度不能超过 {max_length} 个字符")
        
        # 使用bleach进行HTML清理(如果可用)
        try:
            import bleach
            from bleach.sanitizer import ALLOWED_TAGS, ALLOWED_ATTRIBUTES
            
            # 定义允许的标签和属性
            allowed_tags = ALLOWED_TAGS + ['p', 'br', 'span', 'div', 'h1', 'h2', 'h3']
            allowed_attributes = {
                **ALLOWED_ATTRIBUTES,
                'a': ['href', 'title', 'target', 'rel'],
                'img': ['src', 'alt', 'width', 'height'],
                'span': ['class'],
                'div': ['class'],
            }
            
            cleaned_html = bleach.clean(
                html_content,
                tags=allowed_tags,
                attributes=allowed_attributes,
                strip=True
            )
            
            return cleaned_html
            
        except ImportError:
            # 回退方案:去除所有HTML标签
            return strip_tags(html_content)
    
    @staticmethod
    def validate_url(url):
        """验证URL"""
        if not url:
            return ""
        
        url_regex = r'^https?://[^\s/$.?#].[^\s]*$'
        if not re.match(url_regex, url):
            raise ValidationError("无效的URL格式")
        
        # 防止JavaScript URL
        if url.lower().startswith(('javascript:', 'data:', 'vbscript:')):
            raise ValidationError("不允许的URL协议")
        
        return url
    
    @staticmethod
    def prevent_numeric_overflow(value, max_digits=10, decimal_places=2):
        """防止数值溢出"""
        if value is None:
            return value
        
        str_value = str(value)
        if '.' in str_value:
            integer_part, decimal_part = str_value.split('.')
            if len(integer_part) > max_digits - decimal_places:
                raise ValidationError("数值过大")
            if len(decimal_part) > decimal_places:
                decimal_part = decimal_part[:decimal_places]
                value = float(f"{integer_part}.{decimal_part}")
        else:
            if len(str_value) > max_digits:
                raise ValidationError("数值过大")
        
        return value

# 自定义模型字段
from django.db import models
from django.core.validators import BaseValidator

class SafeCharField(models.CharField):
    """安全的字符字段"""
    
    def __init__(self, *args, **kwargs):
        # 默认设置最大长度
        kwargs.setdefault('max_length', 255)
        super().__init__(*args, **kwargs)
    
    def to_python(self, value):
        value = super().to_python(value)
        if value:
            # 清理输入
            value = html.escape(force_str(value))
        return value

class SafeTextField(models.TextField):
    """安全的文本字段"""
    
    def to_python(self, value):
        value = super().to_python(value)
        if value:
            # 清理HTML内容
            value = InputValidator.sanitize_html(value)
        return value

# 在模型中使用安全字段
class UserProfile(models.Model):
    user = models.OneToOneField(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    bio = SafeTextField(blank=True, max_length=2000)
    website = models.URLField(blank=True, validators=[InputValidator.validate_url])
    phone = SafeCharField(max_length=20, blank=True, validators=[InputValidator.validate_phone])
    
    def clean(self):
        super().clean()
        # 额外的模型级别验证
        if self.website and not self.website.startswith(('http://', 'https://')):
            self.website = 'https://' + self.website

8.2 API序列化安全

# api/security.py
from rest_framework import serializers
from django.core.exceptions import ValidationError

class SafeModelSerializer(serializers.ModelSerializer):
    """安全的模型序列化器"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.set_field_limits()
    
    def set_field_limits(self):
        """根据模型字段设置序列化器字段限制"""
        if hasattr(self, 'Meta') and hasattr(self.Meta, 'model'):
            for field_name, field in self.fields.items():
                model_field = None
                try:
                    model_field = self.Meta.model._meta.get_field(field_name)
                except:
                    continue
                
                # 设置最大长度
                if hasattr(model_field, 'max_length') and model_field.max_length:
                    if hasattr(field, 'max_length'):
                        field.max_length = min(field.max_length or float('inf'), model_field.max_length)
    
    def validate(self, data):
        """全局验证"""
        data = super().validate(data)
        
        # 防止大规模分配攻击
        if hasattr(self, 'initial_data'):
            extra_fields = set(self.initial_data.keys()) - set(data.keys())
            if extra_fields:
                raise serializers.ValidationError({
                    'non_field_errors': f'不允许的字段: {", ".join(extra_fields)}'
                })
        
        return data

class UserSerializer(SafeModelSerializer):
    """用户序列化器"""
    
    email = serializers.EmailField(max_length=254)
    username = serializers.CharField(max_length=150)
    
    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'first_name', 'last_name']
        read_only_fields = ['id']
    
    def validate_username(self, value):
        """用户名验证"""
        from django.contrib.auth.validators import UnicodeUsernameValidator
        validator = UnicodeUsernameValidator()
        
        try:
            validator(value)
        except ValidationError:
            raise serializers.ValidationError("无效的用户名格式")
        
        # 检查用户名是否已存在(排除当前用户)
        if self.instance:
            if User.objects.exclude(pk=self.instance.pk).filter(username=value).exists():
                raise serializers.ValidationError("用户名已存在")
        else:
            if User.objects.filter(username=value).exists():
                raise serializers.ValidationError("用户名已存在")
        
        return value
    
    def validate_email(self, value):
        """邮箱验证"""
        try:
            InputValidator.validate_email(value)
        except ValidationError as e:
            raise serializers.ValidationError(str(e))
        
        return value

# API视图安全
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status

class SecureAPIView(APIView):
    """安全的API视图基类"""
    
    # 速率限制配置
    throttle_scope = 'api'
    
    def initial(self, request, *args, **kwargs):
        super().initial(request, *args, **kwargs)
        
        # 安全检查
        self.perform_security_checks(request)
    
    def perform_security_checks(self, request):
        """执行安全检查"""
        # 检查用户代理
        user_agent = request.META.get('HTTP_USER_AGENT', '')
        if not user_agent or len(user_agent) > 500:
            raise serializers.ValidationError("无效的用户代理")
        
        # 检查内容类型
        content_type = request.content_type
        if request.method in ['POST', 'PUT', 'PATCH']:
            if content_type not in ['application/json', 'multipart/form-data']:
                raise serializers.ValidationError("不支持的内容类型")
    
    def handle_exception(self, exc):
        """安全地处理异常"""
        # 不要暴露敏感信息
        if isinstance(exc, (ValidationError, serializers.ValidationError)):
            # 记录详细的错误信息到日志
            logger.warning(f"API验证错误: {str(exc)}")
            # 返回通用的错误消息给客户端
            return Response(
                {'error': '请求数据无效'},
                status=status.HTTP_400_BAD_REQUEST
            )
        
        # 其他异常
        logger.error(f"API异常: {str(exc)}", exc_info=True)
        return Response(
            {'error': '服务器内部错误'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )

9. 安全监控与日志

9.1 安全事件日志

# security/logging.py
import logging
import json
from django.utils import timezone
from django.core.signals import got_request_exception
from django.dispatch import receiver

# 安全专用日志器
security_logger = logging.getLogger('security')

class SecurityEventLogger:
    """安全事件日志记录器"""
    
    @staticmethod
    def log_login_success(request, user):
        """记录登录成功事件"""
        security_logger.info(
            '用户登录成功',
            extra={
                'event_type': 'login_success',
                'user_id': user.id,
                'username': user.username,
                'ip_address': SecurityEventLogger.get_client_ip(request),
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                'timestamp': timezone.now().isoformat(),
            }
        )
    
    @staticmethod
    def log_login_failure(request, username, reason=''):
        """记录登录失败事件"""
        security_logger.warning(
            '用户登录失败',
            extra={
                'event_type': 'login_failure',
                'username': username,
                'ip_address': SecurityEventLogger.get_client_ip(request),
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                'reason': reason,
                'timestamp': timezone.now().isoformat(),
            }
        )
    
    @staticmethod
    def log_suspicious_activity(request, activity_type, details):
        """记录可疑活动"""
        security_logger.warning(
            '检测到可疑活动',
            extra={
                'event_type': 'suspicious_activity',
                'activity_type': activity_type,
                'ip_address': SecurityEventLogger.get_client_ip(request),
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                'details': json.dumps(details),
                'timestamp': timezone.now().isoformat(),
            }
        )
    
    @staticmethod
    def log_security_violation(request, violation_type, details):
        """记录安全违规"""
        security_logger.error(
            '安全违规',
            extra={
                'event_type': 'security_violation',
                'violation_type': violation_type,
                'ip_address': SecurityEventLogger.get_client_ip(request),
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                'details': json.dumps(details),
                'timestamp': timezone.now().isoformat(),
            }
        )
    
    @staticmethod
    def get_client_ip(request):
        """获取客户端IP"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip

# 异常信号接收器
@receiver(got_request_exception)
def log_unhandled_exception(sender, request, **kwargs):
    """记录未处理的异常"""
    import traceback
    
    exc_info = kwargs.get('exc_info')
    if exc_info:
        SecurityEventLogger.log_security_violation(
            request,
            'unhandled_exception',
            {
                'exception_type': exc_info[0].__name__,
                'exception_message': str(exc_info[1]),
                'traceback': traceback.format_exc(),
            }
        )

# 自定义日志配置
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'security': {
            'format': '[SECURITY] {asctime} {event_type} {ip_address} {username} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'security_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/security.log',
            'maxBytes': 1024 * 1024 * 100,  # 100MB
            'backupCount': 10,
            'formatter': 'security',
        },
        'security_console': {
            'level': 'WARNING',
            'class': 'logging.StreamHandler',
            'formatter': 'security',
        },
    },
    'loggers': {
        'security': {
            'handlers': ['security_file', 'security_console'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

9.2 安全监控中间件

# security/monitoring.py
import time
from django.utils.deprecation import MiddlewareMixin
from .logging import SecurityEventLogger

class SecurityMonitoringMiddleware(MiddlewareMixin):
    """安全监控中间件"""
    
    def process_request(self, request):
        request.start_time = time.time()
        
        # 检查可疑请求
        self.check_suspicious_request(request)
        
        return None
    
    def process_response(self, request, response):
        # 记录请求耗时
        if hasattr(request, 'start_time'):
            duration = time.time() - request.start_time
            
            # 记录慢请求
            if duration > 5.0:  # 超过5秒
                SecurityEventLogger.log_suspicious_activity(
                    request,
                    'slow_request',
                    {'duration': duration, 'path': request.path}
                )
        
        # 检查可疑响应
        self.check_suspicious_response(request, response)
        
        return response
    
    def check_suspicious_request(self, request):
        """检查可疑请求"""
        suspicious_indicators = []
        
        # 检查过长的URL
        if len(request.get_full_path()) > 2048:
            suspicious_indicators.append('url_too_long')
        
        # 检查过多的参数
        if len(request.GET) > 100 or len(request.POST) > 100:
            suspicious_indicators.append('too_many_parameters')
        
        # 检查可疑的用户代理
        user_agent = request.META.get('HTTP_USER_AGENT', '')
        if any(suspicious in user_agent.lower() for suspicious in ['sqlmap', 'nikto', 'wget', 'curl']):
            suspicious_indicators.append('suspicious_user_agent')
        
        # 检查可疑的Referer
        referer = request.META.get('HTTP_REFERER', '')
        if referer and 'example.com' not in referer and request.method == 'POST':
            suspicious_indicators.append('suspicious_referer')
        
        if suspicious_indicators:
            SecurityEventLogger.log_suspicious_activity(
                request,
                'suspicious_request',
                {'indicators': suspicious_indicators}
            )
    
    def check_suspicious_response(self, request, response):
        """检查可疑响应"""
        # 检查错误响应
        if response.status_code >= 400:
            SecurityEventLogger.log_suspicious_activity(
                request,
                'error_response',
                {'status_code': response.status_code, 'path': request.path}
            )
        
        # 检查敏感信息泄露
        if hasattr(response, 'content'):
            content = str(response.content).lower()
            sensitive_patterns = [
                'password', 'secret', 'key', 'token', 'database',
                'traceback', 'exception', 'error at'
            ]
            
            if any(pattern in content for pattern in sensitive_patterns):
                SecurityEventLogger.log_security_violation(
                    request,
                    'sensitive_data_leak',
                    {'path': request.path}
                )

class RateLimitMiddleware(MiddlewareMixin):
    """速率限制中间件"""
    
    def __init__(self, get_response):
        super().__init__(get_response)
        # 使用Redis或内存存储请求计数
        self.request_counts = {}
    
    def process_request(self, request):
        client_ip = SecurityEventLogger.get_client_ip(request)
        path = request.path
        
        # 构建键
        key = f"{client_ip}:{path}"
        current_time = int(time.time())
        window_start = current_time - 60  # 1分钟窗口
        
        # 清理过期记录
        self.clean_old_entries(current_time)
        
        # 获取当前计数
        current_count = self.get_request_count(key, window_start)
        
        # 检查是否超过限制
        if current_count > 100:  # 每分钟100次请求
            SecurityEventLogger.log_security_violation(
                request,
                'rate_limit_exceeded',
                {'client_ip': client_ip, 'path': path, 'count': current_count}
            )
            from django.http import HttpResponseTooManyRequests
            return HttpResponseTooManyRequests("请求过于频繁")
        
        # 增加计数
        self.increment_request_count(key, current_time)
        
        return None
    
    def get_request_count(self, key, window_start):
        """获取请求计数"""
        # 简化实现,实际应该使用Redis
        count = 0
        for timestamp in self.request_counts.get(key, []):
            if timestamp >= window_start:
                count += 1
        return count
    
    def increment_request_count(self, key, timestamp):
        """增加请求计数"""
        if key not in self.request_counts:
            self.request_counts[key] = []
        self.request_counts[key].append(timestamp)
    
    def clean_old_entries(self, current_time):
        """清理过期条目"""
        window_start = current_time - 60
        for key in list(self.request_counts.keys()):
            self.request_counts[key] = [
                ts for ts in self.request_counts[key] 
                if ts >= window_start
            ]
            if not self.request_counts[key]:
                del self.request_counts[key]

10. 生产环境安全配置

10.1 完整的安全设置

# production_settings.py
"""
生产环境安全配置
"""

import os
from django.core.management.utils import get_random_secret_key

# 从环境变量获取配置
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY')
if not SECRET_KEY:
    # 在生产环境中,必须设置环境变量
    raise Exception("DJANGO_SECRET_KEY environment variable must be set")

DEBUG = False

ALLOWED_HOSTS = [
    'yourdomain.com',
    'www.yourdomain.com',
    '.yourdomain.com',  # 允许所有子域名
]

# 数据库安全配置
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME'),
        'USER': os.environ.get('DB_USER'),
        'PASSWORD': os.environ.get('DB_PASSWORD'),
        'HOST': os.environ.get('DB_HOST'),
        'PORT': os.environ.get('DB_PORT', '5432'),
        'CONN_MAX_AGE': 600,
        'OPTIONS': {
            'sslmode': 'require',
            'connect_timeout': 10,
        }
    }
}

# HTTPS安全设置
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')

# Cookie安全
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
CSRF_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
CSRF_COOKIE_SAMESITE = 'Lax'

# HSTS设置
SECURE_HSTS_SECONDS = 31536000  # 1年
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True

# 其他安全头
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True
X_FRAME_OPTIONS = 'DENY'

# CSRF信任的源
CSRF_TRUSTED_ORIGINS = [
    'https://yourdomain.com',
    'https://www.yourdomain.com',
]

# 会话配置
SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db'
SESSION_COOKIE_AGE = 1209600  # 2周
SESSION_EXPIRE_AT_BROWSER_CLOSE = False

# 密码验证
AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
        'OPTIONS': {
            'min_length': 12,
        }
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]

# 日志配置
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/app.log',
            'maxBytes': 1024 * 1024 * 100,  # 100MB
            'backupCount': 10,
            'formatter': 'verbose',
        },
        'security_file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/django/security.log',
            'maxBytes': 1024 * 1024 * 100,
            'backupCount': 10,
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'django': {
            'handlers': ['file'],
            'level': 'INFO',
            'propagate': True,
        },
        'security': {
            'handlers': ['security_file'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

# 邮件配置(用于安全通知)
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = os.environ.get('EMAIL_HOST')
EMAIL_PORT = int(os.environ.get('EMAIL_PORT', 587))
EMAIL_USE_TLS = True
EMAIL_HOST_USER = os.environ.get('EMAIL_HOST_USER')
EMAIL_HOST_PASSWORD = os.environ.get('EMAIL_HOST_PASSWORD')
DEFAULT_FROM_EMAIL = 'security@yourdomain.com'
SERVER_EMAIL = 'server@yourdomain.com'

# ADMINS和MANAGERS用于接收错误邮件
ADMINS = [
    ('Admin', 'admin@yourdomain.com'),
]

MANAGERS = ADMINS

# 缓存配置(使用Redis)
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1'),
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'SOCKET_CONNECT_TIMEOUT': 5,
            'SOCKET_TIMEOUT': 5,
        }
    }
}

# 使用缓存存储会话
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'

10.2 安全审计与检查

# security/audit.py
from django.core.management.base import BaseCommand
from django.conf import settings
import subprocess
import requests
import sys

class SecurityAuditCommand(BaseCommand):
    """安全审计命令"""
    
    help = '执行安全审计检查'
    
    def handle(self, *args, **options):
        self.stdout.write("开始安全审计...")
        
        checks = [
            self.check_debug_mode,
            self.check_secret_key,
            self.check_allowed_hosts,
            self.check_https_settings,
            self.check_database_ssl,
            self.check_dependencies,
        ]
        
        all_passed = True
        
        for check in checks:
            try:
                result = check()
                if result:
                    self.stdout.write(
                        self.style.SUCCESS(f"✓ {check.__name__}: 通过")
                    )
                else:
                    self.stdout.write(
                        self.style.ERROR(f"✗ {check.__name__}: 失败")
                    )
                    all_passed = False
            except Exception as e:
                self.stdout.write(
                    self.style.ERROR(f"✗ {check.__name__}: 错误 - {str(e)}")
                )
                all_passed = False
        
        if all_passed:
            self.stdout.write(
                self.style.SUCCESS("所有安全检查通过!")
            )
        else:
            self.stdout.write(
                self.style.ERROR("发现安全问题,请及时修复!")
            )
            sys.exit(1)
    
    def check_debug_mode(self):
        """检查调试模式"""
        return not settings.DEBUG
    
    def check_secret_key(self):
        """检查密钥安全性"""
        secret_key = settings.SECRET_KEY
        return secret_key and len(secret_key) >= 50 and secret_key != 'your-secret-key-here'
    
    def check_allowed_hosts(self):
        """检查ALLOWED_HOSTS配置"""
        return len(settings.ALLOWED_HOSTS) > 0 and '*' not in settings.ALLOWED_HOSTS
    
    def check_https_settings(self):
        """检查HTTPS配置"""
        if not settings.DEBUG:
            return (settings.SECURE_SSL_REDIRECT and 
                   settings.SESSION_COOKIE_SECURE and 
                   settings.CSRF_COOKIE_SECURE)
        return True
    
    def check_database_ssl(self):
        """检查数据库SSL连接"""
        db_options = settings.DATABASES['default'].get('OPTIONS', {})
        return db_options.get('sslmode') == 'require'
    
    def check_dependencies(self):
        """检查依赖包安全性"""
        try:
            # 使用safety检查已知漏洞
            result = subprocess.run([
                'safety', 'check', '--json'
            ], capture_output=True, text=True)
            
            if result.returncode == 0:
                return True
            else:
                self.stdout.write(
                    self.style.WARNING("发现依赖包安全漏洞:")
                )
                self.stdout.write(result.stdout)
                return False
                
        except FileNotFoundError:
            self.stdout.write(
                self.style.WARNING("safety未安装,跳过依赖检查")
            )
            return True

# 在settings.py中导入时检查
def security_checks():
    """启动时的安全检查"""
    import warnings
    
    if settings.DEBUG:
        warnings.warn(
            "DEBUG模式已开启,这会在生产环境中造成安全风险!",
            RuntimeWarning
        )
    
    if not settings.ALLOWED_HOSTS:
        warnings.warn(
            "ALLOWED_HOSTS未设置,这会造成安全风险!",
            RuntimeWarning
        )
    
    if settings.SECRET_KEY == 'your-secret-key-here':
        warnings.warn(
            "使用默认的SECRET_KEY,这会造成严重安全风险!",
            RuntimeWarning
        )

# 在settings.py末尾调用
security_checks()

11. 应急响应与恢复

11.1 安全事件响应

# security/incident_response.py
from django.core.management.base import BaseCommand
from django.core.mail import send_mail
from django.utils import timezone
import logging

logger = logging.getLogger(__name__)

class SecurityIncidentResponse:
    """安全事件响应"""
    
    @classmethod
    def handle_suspicious_activity(cls, request, activity_type, details):
        """处理可疑活动"""
        # 记录事件
        logger.critical(
            f"安全事件: {activity_type}",
            extra={
                'activity_type': activity_type,
                'details': details,
                'ip_address': cls.get_client_ip(request),
                'timestamp': timezone.now().isoformat(),
            }
        )
        
        # 发送警报邮件
        cls.send_alert_email(activity_type, details)
        
        # 执行自动响应措施
        cls.execute_automatic_response(activity_type, request)
    
    @classmethod
    def send_alert_email(cls, activity_type, details):
        """发送警报邮件"""
        subject = f"安全警报: {activity_type}"
        message = f"""
检测到安全事件:

类型: {activity_type}
详情: {details}
时间: {timezone.now().isoformat()}

请立即检查系统安全状况。
"""
        
        try:
            send_mail(
                subject,
                message,
                'security@yourdomain.com',
                ['admin@yourdomain.com', 'security-team@yourdomain.com'],
                fail_silently=False,
            )
        except Exception as e:
            logger.error(f"发送安全警报邮件失败: {str(e)}")
    
    @classmethod
    def execute_automatic_response(cls, activity_type, request):
        """执行自动响应"""
        client_ip = cls.get_client_ip(request)
        
        if activity_type in ['brute_force', 'rate_limit_exceeded']:
            # 暂时封禁IP
            cls.block_ip_temporarily(client_ip)
        
        elif activity_type == 'suspicious_file_upload':
            # 立即删除可疑文件
            cls.delete_suspicious_files()
    
    @classmethod
    def block_ip_temporarily(cls, ip_address, duration_minutes=30):
        """临时封禁IP"""
        # 实现IP封禁逻辑
        # 可以使用Redis或数据库存储封禁记录
        logger.info(f"临时封禁IP: {ip_address},时长: {duration_minutes}分钟")
    
    @classmethod
    def delete_suspicious_files(cls):
        """删除可疑文件"""
        # 实现文件删除逻辑
        logger.info("执行可疑文件清理")
    
    @classmethod
    def get_client_ip(cls, request):
        """获取客户端IP"""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return ip

class EmergencyLockdownCommand(BaseCommand):
    """紧急锁定命令"""
    
    help = '紧急锁定系统'
    
    def add_arguments(self, parser):
        parser.add_argument(
            '--level',
            type=str,
            choices=['high', 'medium', 'low'],
            default='medium',
            help='锁定级别'
        )
    
    def handle(self, *args, **options):
        level = options['level']
        
        self.stdout.write(f"执行紧急锁定,级别: {level}")
        
        if level == 'high':
            self.high_alert_lockdown()
        elif level == 'medium':
            self.medium_alert_lockdown()
        else:
            self.low_alert_lockdown()
        
        self.stdout.write(
            self.style.SUCCESS("紧急锁定完成")
        )
    
    def high_alert_lockdown(self):
        """高级别锁定"""
        # 停止接受新用户注册
        # 禁用所有API端点
        # 强制所有用户重新认证
        # 启用维护模式
        
        logger.critical("执行高级别紧急锁定")
    
    def medium_alert_lockdown(self):
        """中级别锁定"""
        # 加强认证要求
        # 限制敏感操作
        # 增加安全监控
        
        logger.warning("执行中级别紧急锁定")
    
    def low_alert_lockdown(self):
        """低级别锁定"""
        # 增加日志记录
        # 发送安全通知
        # 加强输入验证
        
        logger.info("执行低级别紧急锁定")

12. 总结

12.1 关键安全实践

  1. 深度防御:在应用的各个层面实施安全措施
  2. 最小权限原则:只授予必要的权限
  3. 输入验证:对所有用户输入进行严格验证
  4. 输出编码:在显示用户数据时进行适当的编码
  5. 安全配置:正确配置所有安全相关的设置

12.2 持续安全维护

  • 定期更新依赖包
  • 监控安全公告和漏洞
  • 进行安全代码审查
  • 实施安全测试
  • 建立安全事件响应流程

12.3 安全检查清单

# security/checklist.py
SECURITY_CHECKLIST = {
    'authentication': [
        '使用强密码策略',
        '实施账户锁定机制',
        '使用安全的会话管理',
        '实现安全的注销功能',
    ],
    'authorization': [
        '实施最小权限原则',
        '验证所有访问控制',
        '保护敏感操作',
    ],
    'input_validation': [
        '验证所有用户输入',
        '使用白名单验证',
        '实施输出编码',
        '防范SQL注入',
    ],
    'security_headers': [
        '配置CSP策略',
        '设置安全Cookie属性',
        '启用HSTS',
        '配置X-Frame-Options',
    ],
    'data_protection': [
        '加密敏感数据',
        '安全处理文件上传',
        '保护静态文件',
        '实施数据备份',
    ],
    'monitoring': [
        '记录安全事件',
        '监控可疑活动',
        '设置警报机制',
        '定期安全审计',
    ],
}

通过实施这些安全最佳实践,您可以显著提高Django应用程序的安全性,有效防范常见的Web攻击。记住,安全是一个持续的过程,需要定期审查和更新安全措施。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐