Django安全最佳实践:防范常见Web攻击
Django安全实践指南:防范常见Web攻击 本文全面介绍了Django框架中的安全防护措施,涵盖以下核心内容: 安全基础配置: 强调密钥管理、调试模式关闭、HTTPS强制等关键设置 提供完整的settings.py安全配置模板 中间件防护: 详解安全中间件的正确顺序 包含自定义安全头中间件实现,强化XSS、点击劫持等防护 SQL注入防护: 对比危险查询与安全ORM实践 展示参数化查询的正确用法
·
目录
『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网
Django安全最佳实践:防范常见Web攻击
1. Web安全概述
1.1 Web安全的重要性
在当今数字化时代,Web应用安全已成为开发过程中不可忽视的关键环节。根据OWASP(开放式Web应用程序安全项目)的统计,超过70%的安全漏洞发生在应用层而非网络层。Django作为一个"自带电池"的Web框架,提供了许多内置的安全特性,但正确配置和使用这些特性至关重要。
1.2 常见Web攻击类型
2. Django安全配置基础
2.1 安全设置配置
# settings.py - 安全关键配置
# 密钥管理 - 永远不要将密钥提交到版本控制系统
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY', 'your-default-dev-key-only')
# 调试模式 - 生产环境必须关闭
DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true'
# 允许的主机 - 生产环境必须设置
ALLOWED_HOSTS = os.environ.get('ALLOWED_HOSTS', 'localhost,127.0.0.1').split(',')
# HTTPS安全设置
if not DEBUG:
# 强制HTTPS
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
# Cookie安全设置
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
CSRF_COOKIE_HTTPONLY = True
# HSTS设置
SECURE_HSTS_SECONDS = 31536000 # 1年
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# 其他安全头
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True
X_FRAME_OPTIONS = 'DENY'
# 数据库配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.environ.get('DB_NAME', 'mydatabase'),
'USER': os.environ.get('DB_USER', 'myuser'),
'PASSWORD': os.environ.get('DB_PASSWORD', 'mypassword'),
'HOST': os.environ.get('DB_HOST', 'localhost'),
'PORT': os.environ.get('DB_PORT', '5432'),
'CONN_MAX_AGE': 600, # 连接池
'OPTIONS': {
'sslmode': 'require', # 强制SSL连接
}
}
}
# 静态文件配置
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')
STATIC_URL = '/static/'
# 媒体文件配置
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
MEDIA_URL = '/media/'
# 文件上传权限
FILE_UPLOAD_PERMISSIONS = 0o644
FILE_UPLOAD_DIRECTORY_PERMISSIONS = 0o755
2.2 中间件安全配置
# settings.py - 中间件配置
MIDDLEWARE = [
# 安全相关中间件应该在最前面
'django.middleware.security.SecurityMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
# 其他中间件
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
# 自定义安全中间件
'myapp.middleware.SecurityHeadersMiddleware',
]
# 自定义安全头中间件
class SecurityHeadersMiddleware:
"""自定义安全头中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# 添加安全头
response['X-Content-Type-Options'] = 'nosniff'
response['X-Frame-Options'] = 'DENY'
response['X-XSS-Protection'] = '1; mode=block'
response['Referrer-Policy'] = 'strict-origin-when-cross-origin'
response['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
# CSP内容安全策略
csp_directives = [
"default-src 'self'",
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
"style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net",
"img-src 'self' data: https:",
"font-src 'self' https://cdn.jsdelivr.net",
"connect-src 'self'",
"frame-ancestors 'none'",
]
response['Content-Security-Policy'] = '; '.join(csp_directives)
return response
3. SQL注入防护
3.1 SQL注入原理与危害
SQL注入是通过将恶意SQL代码插入到应用输入参数中,从而在数据库执行非法操作的攻击方式。Django的ORM提供了天然的SQL注入防护。
3.2 安全查询实践
# 危险的原始SQL查询 - 不要这样做!
from django.db import connection
def unsafe_user_search(request):
username = request.GET.get('username')
# 存在SQL注入漏洞
with connection.cursor() as cursor:
cursor.execute(f"SELECT * FROM auth_user WHERE username = '{username}'")
results = cursor.fetchall()
return results
# 安全的Django ORM查询
def safe_user_search(request):
username = request.GET.get('username')
# 使用ORM防止SQL注入
users = User.objects.filter(username=username)
return users
# 必须使用原始SQL时的安全做法
def safe_raw_sql(request):
username = request.GET.get('username')
with connection.cursor() as cursor:
# 使用参数化查询
cursor.execute("SELECT * FROM auth_user WHERE username = %s", [username])
results = cursor.fetchall()
return results
# 复杂查询的安全示例
class UserManager(models.Manager):
def search_users_safely(self, search_terms, min_login_count=0):
"""
安全的用户搜索方法
使用参数化查询和输入验证
"""
# 输入验证和清理
if not search_terms or len(search_terms) > 100:
return self.none()
# 使用ORM构建安全查询
query = self.get_queryset()
if min_login_count > 0:
query = query.filter(login_count__gte=min_login_count)
# 安全地处理搜索条件
conditions = models.Q()
for term in search_terms.split():
if len(term) < 50: # 防止过长的搜索词
conditions |= models.Q(username__icontains=term) | \
models.Q(email__icontains=term) | \
models.Q(first_name__icontains=term)
return query.filter(conditions).distinct()
# 额外的输入验证
from django.core.exceptions import ValidationError
import re
def validate_sql_safe_string(value):
"""
验证字符串是否不包含危险的SQL字符
"""
dangerous_patterns = [
r"(\b(DROP|DELETE|UPDATE|INSERT|ALTER|CREATE|EXEC)\b)",
r"(\-\-|\#|\/\*)", # SQL注释
r"(\b(OR|AND)\b.*=)", # 逻辑操作符滥用
r"(;|\|&)", # 命令分隔符
]
for pattern in dangerous_patterns:
if re.search(pattern, value, re.IGNORECASE):
raise ValidationError(f"输入包含潜在的危险字符: {value}")
return value
# 在模型中使用验证
class UserQuery(models.Model):
search_term = models.CharField(
max_length=100,
validators=[validate_sql_safe_string]
)
def clean(self):
super().clean()
validate_sql_safe_string(self.search_term)
3.3 高级防护技术
# database_security.py
from django.db import connection, models
from django.core.exceptions import PermissionDenied
import logging
logger = logging.getLogger(__name__)
class DatabaseSecurityMonitor:
"""数据库安全监控"""
@classmethod
def log_suspicious_activity(cls, query, params, user):
"""记录可疑的数据库活动"""
suspicious_keywords = ['drop', 'delete', 'alter', 'truncate', 'exec']
query_lower = query.lower()
if any(keyword in query_lower for keyword in suspicious_keywords):
logger.warning(
f"Suspicious database activity by user {user}: {query} with params {params}"
)
@classmethod
def execute_safe_query(cls, query, params=None, user=None):
"""执行安全的数据库查询"""
if user and not user.is_authenticated:
raise PermissionDenied("未授权访问")
cls.log_suspicious_activity(query, params, user)
with connection.cursor() as cursor:
cursor.execute(query, params or [])
return cursor.fetchall()
# 自定义查询集添加安全方法
class SecureQuerySet(models.QuerySet):
def safe_filter(self, **filters):
"""安全的过滤方法,添加额外的验证"""
# 验证过滤条件
for field, value in filters.items():
if hasattr(self.model, field):
field_obj = self.model._meta.get_field(field)
# 可以在这里添加字段级别的验证
if isinstance(field_obj, models.CharField) and len(str(value)) > 255:
raise ValueError(f"字段 {field} 的值过长")
return self.filter(**filters)
class SecureManager(models.Manager):
def get_queryset(self):
return SecureQuerySet(self.model, using=self._db)
4. 跨站脚本(XSS)防护
4.1 XSS攻击类型
- 存储型XSS:恶意脚本存储在服务器端
- 反射型XSS:恶意脚本通过URL参数反射给用户
- DOM型XSS:在客户端DOM环境中执行的XSS
4.2 Django模板自动转义
# 模板中的自动转义
"""
Django模板默认开启自动HTML转义
以下字符会被转义:
< 转义为 <
> 转义为 >
' 转义为 '
" 转义为 "
& 转义为 &
"""
# 安全的模板示例
{% comment %} 自动转义,安全 {% endcomment %}
<p>{{ user_input }}</p>
{% comment %} 手动关闭转义 - 危险! {% endcomment %}
<p>{{ user_input|safe }}</p>
{% comment %} 有条件的安全输出 {% endcomment %}
<p>{{ user_input|escape }}</p>
{% comment %} 自动转义JavaScript上下文 {% endcomment %}
<script>
var username = "{{ username|escapejs }}";
var userData = {{ user_data|json_script:"user-data" }};
</script>
# 自定义安全过滤器
from django import template
from django.utils.html import escape, strip_tags
from django.utils.safestring import mark_safe
import html
import json
register = template.Library()
@register.filter
def safe_html(text):
"""
安全地允许有限的HTML标签
"""
if not text:
return ""
allowed_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
allowed_attrs = {
'a': ['href', 'title', 'target'],
'img': ['src', 'alt', 'width', 'height'],
}
from django.utils.html import strip_tags
text = strip_tags(text, allowed_tags)
# 这里可以添加更复杂的HTML清理逻辑
# 考虑使用 bleach 库进行更严格的白名单过滤
return mark_safe(text)
@register.filter
def json_safe(obj):
"""
安全地将Python对象转换为JSON
"""
return mark_safe(json.dumps(obj))
@register.filter
def truncate_safe(text, length=100):
"""
安全地截断文本
"""
if not text:
return ""
text = str(text)
if len(text) <= length:
return escape(text)
return escape(text[:length]) + "..."
# 在模板中使用
"""
{{ user_comment|safe_html }}
{{ user_data|json_safe }}
{{ long_text|truncate_safe:50 }}
"""
4.3 内容安全策略(CSP)
# csp.py - 内容安全策略实现
from django.utils.deprecation import MiddlewareMixin
import hashlib
class CSPMiddleware(MiddlewareMixin):
"""内容安全策略中间件"""
def process_response(self, request, response):
# 只为HTML响应添加CSP
if response.get('Content-Type', '').startswith('text/html'):
nonce = self.generate_nonce()
csp_directives = [
f"default-src 'self'",
f"script-src 'self' 'nonce-{nonce}' https://cdn.example.com",
f"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com",
f"img-src 'self' data: https:",
f"font-src 'self' https://fonts.gstatic.com",
f"connect-src 'self'",
f"frame-ancestors 'none'",
f"base-uri 'self'",
f"form-action 'self'",
]
response['Content-Security-Policy'] = '; '.join(csp_directives)
# 将nonce存储在request中供模板使用
request.csp_nonce = nonce
return response
def generate_nonce(self):
"""生成随机nonce值"""
return hashlib.sha256(os.urandom(32)).hexdigest()[:16]
# 在模板中使用nonce
"""
<script nonce="{{ request.csp_nonce }}">
// 内联脚本需要nonce才能执行
console.log('这个脚本有nonce,可以执行');
</script>
"""
# 使用django-csp包(推荐)
"""
安装:pip install django-csp
在settings.py中配置:
MIDDLEWARE = [
'csp.middleware.CSPMiddleware',
# ... 其他中间件
]
CSP_DEFAULT_SRC = ["'self'"]
CSP_SCRIPT_SRC = ["'self'", "https://cdn.example.com"]
CSP_STYLE_SRC = ["'self'", "'unsafe-inline'"]
CSP_IMG_SRC = ["'self'", "data:", "https:"]
CSP_INCLUDE_NONCE_IN = ['script-src']
"""
5. 跨站请求伪造(CSRF)防护
5.1 CSRF防护机制
# CSRF安全配置
# settings.py
# 确保CSRF中间件启用
MIDDLEWARE = [
# ...
'django.middleware.csrf.CsrfViewMiddleware',
# ...
]
# CSRF信任的源(在需要时设置)
CSRF_TRUSTED_ORIGINS = [
'https://mydomain.com',
'https://api.mydomain.com',
]
# CSRF Cookie设置
CSRF_COOKIE_HTTPONLY = True
CSRF_COOKIE_SECURE = True # 仅HTTPS
CSRF_COOKIE_SAMESITE = 'Lax' # 或 'Strict'
# 视图中的CSRF保护
from django.views.decorators.csrf import csrf_protect, csrf_exempt, ensure_csrf_cookie
from django.utils.decorators import method_decorator
from django.middleware.csrf import get_token
@method_decorator(csrf_protect, name='dispatch')
class SecureFormView(View):
"""需要CSRF保护的视图"""
def get(self, request):
# 确保CSRF token被设置
get_token(request)
return render(request, 'form.html')
def post(self, request):
# CSRF中间件会自动验证
# 如果验证失败,会返回403错误
form = MyForm(request.POST)
if form.is_valid():
# 处理表单
return redirect('success')
return render(request, 'form.html', {'form': form})
@method_decorator(csrf_exempt, name='dispatch')
class APIPublicView(View):
"""豁免CSRF保护的API视图(谨慎使用)"""
def post(self, request):
# 这里需要其他形式的认证
return JsonResponse({'status': 'ok'})
@method_decorator(ensure_csrf_cookie, name='dispatch')
class GetCSRFTokenView(View):
"""获取CSRF token的API端点"""
def get(self, request):
return JsonResponse({'detail': 'CSRF cookie set'})
# 模板中的CSRF token
"""
<!-- 在表单中包含CSRF token -->
<form method="post">
{% csrf_token %}
<!-- 表单字段 -->
<input type="text" name="username">
<button type="submit">提交</button>
</form>
<!-- 对于AJAX请求 -->
<script>
// 获取CSRF token
function getCookie(name) {
let cookieValue = null;
if (document.cookie && document.cookie !== '') {
const cookies = document.cookie.split(';');
for (let i = 0; i < cookies.length; i++) {
const cookie = cookies[i].trim();
if (cookie.substring(0, name.length + 1) === (name + '=')) {
cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
break;
}
}
}
return cookieValue;
}
const csrftoken = getCookie('csrftoken');
// 在AJAX请求中包含CSRF token
fetch('/api/endpoint/', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': csrftoken,
},
body: JSON.stringify(data)
});
</script>
"""
5.2 高级CSRF防护
# advanced_csrf.py
from django.views.decorators.csrf import csrf_protect
from django.middleware.csrf import rotate_token
import logging
logger = logging.getLogger(__name__)
class AdvancedCSRFProtection:
"""高级CSRF防护"""
@classmethod
def validate_referer(cls, request):
"""验证Referer头"""
referer = request.META.get('HTTP_REFERER')
if not referer:
logger.warning("请求缺少Referer头")
return False
# 检查Referer是否来自可信域名
trusted_domains = ['https://mydomain.com', 'https://www.mydomain.com']
if not any(referer.startswith(domain) for domain in trusted_domains):
logger.warning(f"可疑的Referer: {referer}")
return False
return True
@classmethod
def double_submit_cookie_pattern(cls, request):
"""双提交Cookie模式"""
# 除了标准的CSRF token,还可以使用自定义header
custom_token = request.headers.get('X-Custom-CSRF-Token')
cookie_token = request.COOKIES.get('custom_csrf_token')
if custom_token and cookie_token and custom_token == cookie_token:
return True
return False
# 自定义CSRF验证装饰器
def enhanced_csrf_protect(view_func):
"""增强的CSRF保护装饰器"""
@csrf_protect
def wrapped_view(request, *args, **kwargs):
# 额外的安全检查
if request.method in ('POST', 'PUT', 'PATCH', 'DELETE'):
# 验证Referer
if not AdvancedCSRFProtection.validate_referer(request):
logger.warning(f"CSRF Referer验证失败: {request.path}")
from django.http import HttpResponseForbidden
return HttpResponseForbidden("安全验证失败")
# 旋转token(可选)
if request.user.is_authenticated:
rotate_token(request)
return view_func(request, *args, **kwargs)
return wrapped_view
# 在视图中使用
@method_decorator(enhanced_csrf_protect, name='dispatch')
class SecureTransactionView(View):
"""需要增强CSRF保护的重要操作视图"""
def post(self, request):
# 这里已经通过了增强的CSRF验证
# 处理敏感操作
return JsonResponse({'status': 'success'})
6. 认证与授权安全
6.1 密码安全
# authentication/security.py
from django.contrib.auth.hashers import make_password, check_password
from django.contrib.auth.password_validation import validate_password
from django.core.exceptions import ValidationError
import re
import logging
logger = logging.getLogger(__name__)
class PasswordSecurity:
"""密码安全工具类"""
@staticmethod
def validate_password_strength(password):
"""
验证密码强度
"""
errors = []
# 长度检查
if len(password) < 12:
errors.append("密码至少需要12个字符")
# 复杂度检查
if not re.search(r'[A-Z]', password):
errors.append("密码必须包含至少一个大写字母")
if not re.search(r'[a-z]', password):
errors.append("密码必须包含至少一个小写字母")
if not re.search(r'[0-9]', password):
errors.append("密码必须包含至少一个数字")
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
errors.append("密码必须包含至少一个特殊字符")
# 常见密码检查
common_passwords = [
'password', '123456', 'qwerty', 'admin', 'welcome'
]
if password.lower() in common_passwords:
errors.append("密码过于常见")
if errors:
raise ValidationError(errors)
@staticmethod
def create_secure_password(raw_password):
"""
创建安全的密码哈希
"""
try:
# 使用Django的密码验证
validate_password(raw_password)
PasswordSecurity.validate_password_strength(raw_password)
# 创建密码哈希
return make_password(raw_password)
except ValidationError as e:
logger.warning(f"密码强度验证失败: {e}")
raise
@staticmethod
def verify_password(raw_password, hashed_password):
"""
验证密码
"""
return check_password(raw_password, hashed_password)
# 自定义用户模型
from django.contrib.auth.models import AbstractUser
from django.db import models
class SecureUser(AbstractUser):
"""增强安全性的用户模型"""
# 额外的安全字段
last_password_change = models.DateTimeField(auto_now_add=True)
password_changed = models.BooleanField(default=False)
failed_login_attempts = models.IntegerField(default=0)
account_locked_until = models.DateTimeField(null=True, blank=True)
mfa_enabled = models.BooleanField(default=False)
mfa_secret = models.CharField(max_length=32, blank=True)
def set_password(self, raw_password):
"""重写设置密码方法"""
self.password = PasswordSecurity.create_secure_password(raw_password)
self.last_password_change = timezone.now()
self.password_changed = True
self.failed_login_attempts = 0 # 重置失败尝试计数
def check_password(self, raw_password):
"""检查密码前验证账户状态"""
if self.account_locked_until and timezone.now() < self.account_locked_until:
raise ValidationError("账户已被锁定,请稍后重试")
is_correct = super().check_password(raw_password)
if is_correct:
self.failed_login_attempts = 0
self.save()
else:
self.failed_login_attempts += 1
if self.failed_login_attempts >= 5:
self.account_locked_until = timezone.now() + timedelta(minutes=30)
self.save()
return is_correct
class Meta:
db_table = 'secure_users'
# 自定义认证后端
from django.contrib.auth.backends import ModelBackend
class SecureAuthenticationBackend(ModelBackend):
"""安全认证后端"""
def authenticate(self, request, username=None, password=None, **kwargs):
try:
user = SecureUser.objects.get(username=username)
# 检查账户是否被锁定
if user.account_locked_until and timezone.now() < user.account_locked_until:
logger.warning(f"登录尝试被拒绝:账户 {username} 已被锁定")
return None
# 验证密码
if user.check_password(password):
# 记录登录成功
user.last_login = timezone.now()
user.save()
# 记录审计日志
self.log_authentication_success(request, user)
return user
else:
# 记录登录失败
self.log_authentication_failure(request, username)
return None
except SecureUser.DoesNotExist:
# 用户不存在也记录日志(防止用户枚举)
self.log_authentication_failure(request, username)
return None
def log_authentication_success(self, request, user):
"""记录认证成功日志"""
logger.info(f"用户 {user.username} 登录成功 - IP: {self.get_client_ip(request)}")
def log_authentication_failure(self, request, username):
"""记录认证失败日志"""
logger.warning(f"登录失败 - 用户名: {username} - IP: {self.get_client_ip(request)}")
def get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
6.2 会话安全
# session_security.py
from django.contrib.sessions.backends.db import SessionStore
from django.utils import timezone
from datetime import timedelta
import logging
logger = logging.getLogger(__name__)
class SecureSessionStore(SessionStore):
"""安全会话存储"""
def __init__(self, session_key=None):
super().__init__(session_key)
# 设置会话超时时间(15分钟)
self.set_expiry(900)
def validate_session(self, request):
"""验证会话安全性"""
user_agent = request.META.get('HTTP_USER_AGENT', '')
client_ip = self.get_client_ip(request)
# 检查用户代理是否改变
stored_ua = self.get('user_agent')
if stored_ua and stored_ua != user_agent:
logger.warning(f"会话用户代理不匹配: {self.session_key}")
self.flush() # 销毁会话
return False
# 检查IP地址是否改变(可选,根据安全要求)
stored_ip = self.get('client_ip')
if stored_ip and stored_ip != client_ip:
logger.warning(f"会话IP地址改变: {self.session_key}")
self.flush()
return False
# 存储当前会话信息
if not stored_ua:
self['user_agent'] = user_agent
if not stored_ip:
self['client_ip'] = client_ip
self['last_activity'] = timezone.now().isoformat()
self.save()
return True
def get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
# 会话安全中间件
class SessionSecurityMiddleware:
"""会话安全中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 只为认证用户检查会话安全
if request.user.is_authenticated:
if hasattr(request, 'session'):
session_store = SecureSessionStore(request.session.session_key)
if not session_store.validate_session(request):
# 会话无效,强制登出
from django.contrib.auth import logout
logout(request)
logger.warning(f"无效会话,用户已登出: {request.user.username}")
response = self.get_response(request)
return response
# 会话配置
# settings.py
SESSION_ENGINE = 'django.contrib.sessions.backends.db' # 使用数据库存储会话
SESSION_COOKIE_AGE = 900 # 15分钟
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
SESSION_EXPIRE_AT_BROWSER_CLOSE = True # 浏览器关闭时过期
# 防止会话固定攻击
SESSION_SERIALIZER = 'django.contrib.sessions.serializers.JSONSerializer'
# 定期清理过期会话
from django.core.management.base import BaseCommand
from django.contrib.sessions.models import Session
from django.utils import timezone
class Command(BaseCommand):
"""清理过期会话的管理命令"""
help = '清理过期会话'
def handle(self, *args, **options):
expired_sessions = Session.objects.filter(expire_date__lt=timezone.now())
count = expired_sessions.count()
expired_sessions.delete()
self.stdout.write(
self.style.SUCCESS(f'成功删除 {count} 个过期会话')
)
7. 文件上传安全
7.1 安全的文件上传处理
# file_upload/security.py
import os
import magic
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile
from PIL import Image
import hashlib
import logging
logger = logging.getLogger(__name__)
class FileUploadSecurity:
"""文件上传安全类"""
# 允许的MIME类型
ALLOWED_IMAGE_TYPES = [
'image/jpeg', 'image/png', 'image/gif', 'image/webp'
]
ALLOWED_DOCUMENT_TYPES = [
'application/pdf',
'application/msword',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'text/plain'
]
# 文件大小限制(字节)
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB
@classmethod
def validate_file_upload(cls, uploaded_file: UploadedFile, file_type='image'):
"""
验证文件上传的安全性
"""
# 检查文件大小
if file_type == 'image':
max_size = cls.MAX_IMAGE_SIZE
else:
max_size = cls.MAX_FILE_SIZE
if uploaded_file.size > max_size:
raise ValidationError(f"文件大小不能超过 {max_size // 1024 // 1024}MB")
# 检查文件类型
allowed_types = cls.ALLOWED_IMAGE_TYPES if file_type == 'image' else cls.ALLOWED_DOCUMENT_TYPES
# 使用python-magic进行MIME类型检测
try:
file_mime = magic.from_buffer(uploaded_file.read(1024), mime=True)
uploaded_file.seek(0) # 重置文件指针
if file_mime not in allowed_types:
raise ValidationError(f"不支持的文件类型: {file_mime}")
except ImportError:
# 如果python-magic不可用,使用文件扩展名验证(不够安全)
logger.warning("python-magic未安装,使用扩展名验证")
ext = os.path.splitext(uploaded_file.name)[1].lower()
allowed_extensions = {
'image': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
'document': ['.pdf', '.doc', '.docx', '.txt']
}
if ext not in allowed_extensions.get(file_type, []):
raise ValidationError(f"不支持的文件扩展名: {ext}")
# 检查文件名安全性
cls.validate_filename(uploaded_file.name)
return True
@classmethod
def validate_filename(cls, filename):
"""验证文件名安全性"""
# 防止路径遍历攻击
if '..' in filename or '/' in filename or '\\' in filename:
raise ValidationError("文件名包含非法字符")
# 防止特殊字符
dangerous_chars = ['<', '>', ':', '"', '|', '?', '*']
if any(char in filename for char in dangerous_chars):
raise ValidationError("文件名包含危险字符")
# 文件名长度限制
if len(filename) > 255:
raise ValidationError("文件名过长")
@classmethod
def generate_secure_filename(cls, original_filename):
"""生成安全的文件名"""
name, ext = os.path.splitext(original_filename)
# 清理文件名
import re
name = re.sub(r'[^\w\s-]', '', name)
name = re.sub(r'[-\s]+', '-', name).strip('-')
# 添加随机哈希防止冲突
random_hash = hashlib.md5(os.urandom(32)).hexdigest()[:8]
return f"{name}_{random_hash}{ext}"
@classmethod
def process_image_safely(cls, image_path):
"""安全地处理图片(去除元数据等)"""
try:
with Image.open(image_path) as img:
# 转换为RGB模式(去除透明度等)
if img.mode in ('RGBA', 'LA'):
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1])
img = background
# 保存图片,不包含元数据
img.save(image_path, 'JPEG', quality=85, optimize=True)
return True
except Exception as e:
logger.error(f"图片处理失败: {str(e)}")
# 删除有问题的文件
if os.path.exists(image_path):
os.remove(image_path)
raise ValidationError("图片处理失败")
# Django表单中的文件上传验证
from django import forms
from django.core.validators import FileExtensionValidator
class SecureFileUploadForm(forms.Form):
"""安全的文件上传表单"""
file = forms.FileField(
validators=[
FileExtensionValidator(
allowed_extensions=['jpg', 'jpeg', 'png', 'gif', 'pdf', 'doc', 'docx']
)
]
)
def clean_file(self):
file = self.cleaned_data.get('file')
if file:
try:
# 使用安全类验证文件
FileUploadSecurity.validate_file_upload(file)
# 生成安全文件名
secure_name = FileUploadSecurity.generate_secure_filename(file.name)
file.name = secure_name
except ValidationError as e:
raise forms.ValidationError(e.message)
return file
# 视图中的文件上传处理
from django.views.generic.edit import FormView
from django.urls import reverse_lazy
class SecureUploadView(FormView):
"""安全的文件上传视图"""
form_class = SecureFileUploadForm
template_name = 'upload.html'
success_url = reverse_lazy('upload_success')
def form_valid(self, form):
file = form.cleaned_data['file']
# 安全的文件保存路径
upload_dir = os.path.join(settings.MEDIA_ROOT, 'uploads')
os.makedirs(upload_dir, exist_ok=True)
file_path = os.path.join(upload_dir, file.name)
# 分块写入文件
with open(file_path, 'wb+') as destination:
for chunk in file.chunks():
destination.write(chunk)
# 如果是图片,进行安全处理
if file.content_type.startswith('image/'):
try:
FileUploadSecurity.process_image_safely(file_path)
except ValidationError as e:
# 处理失败,删除文件
if os.path.exists(file_path):
os.remove(file_path)
form.add_error('file', e)
return self.form_invalid(form)
# 记录上传日志
logger.info(f"文件上传成功: {file.name} - 用户: {self.request.user}")
return super().form_valid(form)
7.2 文件存储安全配置
# storage_security.py
from django.core.files.storage import FileSystemStorage
from django.utils.deconstruct import deconstructible
import os
@deconstructible
class SecureFileStorage(FileSystemStorage):
"""安全的文件存储系统"""
def __init__(self, location=None, base_url=None):
if location is None:
location = os.path.join(settings.MEDIA_ROOT, 'secure_uploads')
super().__init__(location, base_url)
def get_valid_name(self, name):
"""获取安全的文件名"""
from .file_upload.security import FileUploadSecurity
return FileUploadSecurity.generate_secure_filename(name)
def _save(self, name, content):
"""保存文件前的安全检查"""
# 验证文件类型和内容
from .file_upload.security import FileUploadSecurity
# 创建上传文件的临时副本进行验证
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
for chunk in content.chunks():
temp_file.write(chunk)
temp_path = temp_file.name
try:
# 验证文件
FileUploadSecurity.validate_file_upload(content)
# 调用父类方法保存文件
return super()._save(name, content)
finally:
# 清理临时文件
if os.path.exists(temp_path):
os.remove(temp_path)
def url(self, name):
"""生成安全的文件URL"""
# 防止直接访问敏感文件
if name.startswith('private/'):
raise ValueError("无法生成私有文件的URL")
return super().url(name)
# 配置安全的文件存储
# settings.py
DEFAULT_FILE_STORAGE = 'myapp.storage_security.SecureFileStorage'
# Nginx配置示例(防止直接执行上传的文件)
"""
location /media/ {
# 禁止执行PHP等脚本文件
location ~ \.(php|php5|phtml|pl|py|jsp|asp|sh|cgi)$ {
deny all;
return 404;
}
# 设置安全的Content-Type
types {
image/jpeg jpg jpeg;
image/png png;
image/gif gif;
application/pdf pdf;
}
default_type application/octet-stream;
# 添加安全头
add_header X-Content-Type-Options nosniff;
add_header X-Frame-Options DENY;
}
"""
8. 数据验证与序列化安全
8.1 输入验证
# validation/security.py
import re
import html
from django.core.exceptions import ValidationError
from django.utils.html import strip_tags
from django.utils.encoding import force_str
class InputValidator:
"""输入验证器"""
@staticmethod
def validate_email(email):
"""验证邮箱地址"""
if not email or len(email) > 254:
raise ValidationError("无效的邮箱地址")
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, email):
raise ValidationError("无效的邮箱格式")
return email.lower()
@staticmethod
def validate_phone(phone):
"""验证手机号码"""
if not phone:
raise ValidationError("手机号码不能为空")
# 简单的手机号验证(根据需求调整)
phone_regex = r'^1[3-9]\d{9}$'
if not re.match(phone_regex, phone):
raise ValidationError("无效的手机号码格式")
return phone
@staticmethod
def sanitize_html(html_content, max_length=5000):
"""清理HTML内容"""
if not html_content:
return ""
# 长度限制
if len(html_content) > max_length:
raise ValidationError(f"内容长度不能超过 {max_length} 个字符")
# 使用bleach进行HTML清理(如果可用)
try:
import bleach
from bleach.sanitizer import ALLOWED_TAGS, ALLOWED_ATTRIBUTES
# 定义允许的标签和属性
allowed_tags = ALLOWED_TAGS + ['p', 'br', 'span', 'div', 'h1', 'h2', 'h3']
allowed_attributes = {
**ALLOWED_ATTRIBUTES,
'a': ['href', 'title', 'target', 'rel'],
'img': ['src', 'alt', 'width', 'height'],
'span': ['class'],
'div': ['class'],
}
cleaned_html = bleach.clean(
html_content,
tags=allowed_tags,
attributes=allowed_attributes,
strip=True
)
return cleaned_html
except ImportError:
# 回退方案:去除所有HTML标签
return strip_tags(html_content)
@staticmethod
def validate_url(url):
"""验证URL"""
if not url:
return ""
url_regex = r'^https?://[^\s/$.?#].[^\s]*$'
if not re.match(url_regex, url):
raise ValidationError("无效的URL格式")
# 防止JavaScript URL
if url.lower().startswith(('javascript:', 'data:', 'vbscript:')):
raise ValidationError("不允许的URL协议")
return url
@staticmethod
def prevent_numeric_overflow(value, max_digits=10, decimal_places=2):
"""防止数值溢出"""
if value is None:
return value
str_value = str(value)
if '.' in str_value:
integer_part, decimal_part = str_value.split('.')
if len(integer_part) > max_digits - decimal_places:
raise ValidationError("数值过大")
if len(decimal_part) > decimal_places:
decimal_part = decimal_part[:decimal_places]
value = float(f"{integer_part}.{decimal_part}")
else:
if len(str_value) > max_digits:
raise ValidationError("数值过大")
return value
# 自定义模型字段
from django.db import models
from django.core.validators import BaseValidator
class SafeCharField(models.CharField):
"""安全的字符字段"""
def __init__(self, *args, **kwargs):
# 默认设置最大长度
kwargs.setdefault('max_length', 255)
super().__init__(*args, **kwargs)
def to_python(self, value):
value = super().to_python(value)
if value:
# 清理输入
value = html.escape(force_str(value))
return value
class SafeTextField(models.TextField):
"""安全的文本字段"""
def to_python(self, value):
value = super().to_python(value)
if value:
# 清理HTML内容
value = InputValidator.sanitize_html(value)
return value
# 在模型中使用安全字段
class UserProfile(models.Model):
user = models.OneToOneField(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
bio = SafeTextField(blank=True, max_length=2000)
website = models.URLField(blank=True, validators=[InputValidator.validate_url])
phone = SafeCharField(max_length=20, blank=True, validators=[InputValidator.validate_phone])
def clean(self):
super().clean()
# 额外的模型级别验证
if self.website and not self.website.startswith(('http://', 'https://')):
self.website = 'https://' + self.website
8.2 API序列化安全
# api/security.py
from rest_framework import serializers
from django.core.exceptions import ValidationError
class SafeModelSerializer(serializers.ModelSerializer):
"""安全的模型序列化器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.set_field_limits()
def set_field_limits(self):
"""根据模型字段设置序列化器字段限制"""
if hasattr(self, 'Meta') and hasattr(self.Meta, 'model'):
for field_name, field in self.fields.items():
model_field = None
try:
model_field = self.Meta.model._meta.get_field(field_name)
except:
continue
# 设置最大长度
if hasattr(model_field, 'max_length') and model_field.max_length:
if hasattr(field, 'max_length'):
field.max_length = min(field.max_length or float('inf'), model_field.max_length)
def validate(self, data):
"""全局验证"""
data = super().validate(data)
# 防止大规模分配攻击
if hasattr(self, 'initial_data'):
extra_fields = set(self.initial_data.keys()) - set(data.keys())
if extra_fields:
raise serializers.ValidationError({
'non_field_errors': f'不允许的字段: {", ".join(extra_fields)}'
})
return data
class UserSerializer(SafeModelSerializer):
"""用户序列化器"""
email = serializers.EmailField(max_length=254)
username = serializers.CharField(max_length=150)
class Meta:
model = User
fields = ['id', 'username', 'email', 'first_name', 'last_name']
read_only_fields = ['id']
def validate_username(self, value):
"""用户名验证"""
from django.contrib.auth.validators import UnicodeUsernameValidator
validator = UnicodeUsernameValidator()
try:
validator(value)
except ValidationError:
raise serializers.ValidationError("无效的用户名格式")
# 检查用户名是否已存在(排除当前用户)
if self.instance:
if User.objects.exclude(pk=self.instance.pk).filter(username=value).exists():
raise serializers.ValidationError("用户名已存在")
else:
if User.objects.filter(username=value).exists():
raise serializers.ValidationError("用户名已存在")
return value
def validate_email(self, value):
"""邮箱验证"""
try:
InputValidator.validate_email(value)
except ValidationError as e:
raise serializers.ValidationError(str(e))
return value
# API视图安全
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
class SecureAPIView(APIView):
"""安全的API视图基类"""
# 速率限制配置
throttle_scope = 'api'
def initial(self, request, *args, **kwargs):
super().initial(request, *args, **kwargs)
# 安全检查
self.perform_security_checks(request)
def perform_security_checks(self, request):
"""执行安全检查"""
# 检查用户代理
user_agent = request.META.get('HTTP_USER_AGENT', '')
if not user_agent or len(user_agent) > 500:
raise serializers.ValidationError("无效的用户代理")
# 检查内容类型
content_type = request.content_type
if request.method in ['POST', 'PUT', 'PATCH']:
if content_type not in ['application/json', 'multipart/form-data']:
raise serializers.ValidationError("不支持的内容类型")
def handle_exception(self, exc):
"""安全地处理异常"""
# 不要暴露敏感信息
if isinstance(exc, (ValidationError, serializers.ValidationError)):
# 记录详细的错误信息到日志
logger.warning(f"API验证错误: {str(exc)}")
# 返回通用的错误消息给客户端
return Response(
{'error': '请求数据无效'},
status=status.HTTP_400_BAD_REQUEST
)
# 其他异常
logger.error(f"API异常: {str(exc)}", exc_info=True)
return Response(
{'error': '服务器内部错误'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
9. 安全监控与日志
9.1 安全事件日志
# security/logging.py
import logging
import json
from django.utils import timezone
from django.core.signals import got_request_exception
from django.dispatch import receiver
# 安全专用日志器
security_logger = logging.getLogger('security')
class SecurityEventLogger:
"""安全事件日志记录器"""
@staticmethod
def log_login_success(request, user):
"""记录登录成功事件"""
security_logger.info(
'用户登录成功',
extra={
'event_type': 'login_success',
'user_id': user.id,
'username': user.username,
'ip_address': SecurityEventLogger.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
'timestamp': timezone.now().isoformat(),
}
)
@staticmethod
def log_login_failure(request, username, reason=''):
"""记录登录失败事件"""
security_logger.warning(
'用户登录失败',
extra={
'event_type': 'login_failure',
'username': username,
'ip_address': SecurityEventLogger.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
'reason': reason,
'timestamp': timezone.now().isoformat(),
}
)
@staticmethod
def log_suspicious_activity(request, activity_type, details):
"""记录可疑活动"""
security_logger.warning(
'检测到可疑活动',
extra={
'event_type': 'suspicious_activity',
'activity_type': activity_type,
'ip_address': SecurityEventLogger.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
'details': json.dumps(details),
'timestamp': timezone.now().isoformat(),
}
)
@staticmethod
def log_security_violation(request, violation_type, details):
"""记录安全违规"""
security_logger.error(
'安全违规',
extra={
'event_type': 'security_violation',
'violation_type': violation_type,
'ip_address': SecurityEventLogger.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
'details': json.dumps(details),
'timestamp': timezone.now().isoformat(),
}
)
@staticmethod
def get_client_ip(request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
# 异常信号接收器
@receiver(got_request_exception)
def log_unhandled_exception(sender, request, **kwargs):
"""记录未处理的异常"""
import traceback
exc_info = kwargs.get('exc_info')
if exc_info:
SecurityEventLogger.log_security_violation(
request,
'unhandled_exception',
{
'exception_type': exc_info[0].__name__,
'exception_message': str(exc_info[1]),
'traceback': traceback.format_exc(),
}
)
# 自定义日志配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'security': {
'format': '[SECURITY] {asctime} {event_type} {ip_address} {username} {message}',
'style': '{',
},
},
'handlers': {
'security_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/security.log',
'maxBytes': 1024 * 1024 * 100, # 100MB
'backupCount': 10,
'formatter': 'security',
},
'security_console': {
'level': 'WARNING',
'class': 'logging.StreamHandler',
'formatter': 'security',
},
},
'loggers': {
'security': {
'handlers': ['security_file', 'security_console'],
'level': 'INFO',
'propagate': False,
},
},
}
9.2 安全监控中间件
# security/monitoring.py
import time
from django.utils.deprecation import MiddlewareMixin
from .logging import SecurityEventLogger
class SecurityMonitoringMiddleware(MiddlewareMixin):
"""安全监控中间件"""
def process_request(self, request):
request.start_time = time.time()
# 检查可疑请求
self.check_suspicious_request(request)
return None
def process_response(self, request, response):
# 记录请求耗时
if hasattr(request, 'start_time'):
duration = time.time() - request.start_time
# 记录慢请求
if duration > 5.0: # 超过5秒
SecurityEventLogger.log_suspicious_activity(
request,
'slow_request',
{'duration': duration, 'path': request.path}
)
# 检查可疑响应
self.check_suspicious_response(request, response)
return response
def check_suspicious_request(self, request):
"""检查可疑请求"""
suspicious_indicators = []
# 检查过长的URL
if len(request.get_full_path()) > 2048:
suspicious_indicators.append('url_too_long')
# 检查过多的参数
if len(request.GET) > 100 or len(request.POST) > 100:
suspicious_indicators.append('too_many_parameters')
# 检查可疑的用户代理
user_agent = request.META.get('HTTP_USER_AGENT', '')
if any(suspicious in user_agent.lower() for suspicious in ['sqlmap', 'nikto', 'wget', 'curl']):
suspicious_indicators.append('suspicious_user_agent')
# 检查可疑的Referer
referer = request.META.get('HTTP_REFERER', '')
if referer and 'example.com' not in referer and request.method == 'POST':
suspicious_indicators.append('suspicious_referer')
if suspicious_indicators:
SecurityEventLogger.log_suspicious_activity(
request,
'suspicious_request',
{'indicators': suspicious_indicators}
)
def check_suspicious_response(self, request, response):
"""检查可疑响应"""
# 检查错误响应
if response.status_code >= 400:
SecurityEventLogger.log_suspicious_activity(
request,
'error_response',
{'status_code': response.status_code, 'path': request.path}
)
# 检查敏感信息泄露
if hasattr(response, 'content'):
content = str(response.content).lower()
sensitive_patterns = [
'password', 'secret', 'key', 'token', 'database',
'traceback', 'exception', 'error at'
]
if any(pattern in content for pattern in sensitive_patterns):
SecurityEventLogger.log_security_violation(
request,
'sensitive_data_leak',
{'path': request.path}
)
class RateLimitMiddleware(MiddlewareMixin):
"""速率限制中间件"""
def __init__(self, get_response):
super().__init__(get_response)
# 使用Redis或内存存储请求计数
self.request_counts = {}
def process_request(self, request):
client_ip = SecurityEventLogger.get_client_ip(request)
path = request.path
# 构建键
key = f"{client_ip}:{path}"
current_time = int(time.time())
window_start = current_time - 60 # 1分钟窗口
# 清理过期记录
self.clean_old_entries(current_time)
# 获取当前计数
current_count = self.get_request_count(key, window_start)
# 检查是否超过限制
if current_count > 100: # 每分钟100次请求
SecurityEventLogger.log_security_violation(
request,
'rate_limit_exceeded',
{'client_ip': client_ip, 'path': path, 'count': current_count}
)
from django.http import HttpResponseTooManyRequests
return HttpResponseTooManyRequests("请求过于频繁")
# 增加计数
self.increment_request_count(key, current_time)
return None
def get_request_count(self, key, window_start):
"""获取请求计数"""
# 简化实现,实际应该使用Redis
count = 0
for timestamp in self.request_counts.get(key, []):
if timestamp >= window_start:
count += 1
return count
def increment_request_count(self, key, timestamp):
"""增加请求计数"""
if key not in self.request_counts:
self.request_counts[key] = []
self.request_counts[key].append(timestamp)
def clean_old_entries(self, current_time):
"""清理过期条目"""
window_start = current_time - 60
for key in list(self.request_counts.keys()):
self.request_counts[key] = [
ts for ts in self.request_counts[key]
if ts >= window_start
]
if not self.request_counts[key]:
del self.request_counts[key]
10. 生产环境安全配置
10.1 完整的安全设置
# production_settings.py
"""
生产环境安全配置
"""
import os
from django.core.management.utils import get_random_secret_key
# 从环境变量获取配置
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY')
if not SECRET_KEY:
# 在生产环境中,必须设置环境变量
raise Exception("DJANGO_SECRET_KEY environment variable must be set")
DEBUG = False
ALLOWED_HOSTS = [
'yourdomain.com',
'www.yourdomain.com',
'.yourdomain.com', # 允许所有子域名
]
# 数据库安全配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.environ.get('DB_NAME'),
'USER': os.environ.get('DB_USER'),
'PASSWORD': os.environ.get('DB_PASSWORD'),
'HOST': os.environ.get('DB_HOST'),
'PORT': os.environ.get('DB_PORT', '5432'),
'CONN_MAX_AGE': 600,
'OPTIONS': {
'sslmode': 'require',
'connect_timeout': 10,
}
}
}
# HTTPS安全设置
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
# Cookie安全
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
CSRF_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
CSRF_COOKIE_SAMESITE = 'Lax'
# HSTS设置
SECURE_HSTS_SECONDS = 31536000 # 1年
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# 其他安全头
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True
X_FRAME_OPTIONS = 'DENY'
# CSRF信任的源
CSRF_TRUSTED_ORIGINS = [
'https://yourdomain.com',
'https://www.yourdomain.com',
]
# 会话配置
SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db'
SESSION_COOKIE_AGE = 1209600 # 2周
SESSION_EXPIRE_AT_BROWSER_CLOSE = False
# 密码验证
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
'OPTIONS': {
'min_length': 12,
}
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# 日志配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
},
'handlers': {
'file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/app.log',
'maxBytes': 1024 * 1024 * 100, # 100MB
'backupCount': 10,
'formatter': 'verbose',
},
'security_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': '/var/log/django/security.log',
'maxBytes': 1024 * 1024 * 100,
'backupCount': 10,
'formatter': 'verbose',
},
},
'loggers': {
'django': {
'handlers': ['file'],
'level': 'INFO',
'propagate': True,
},
'security': {
'handlers': ['security_file'],
'level': 'INFO',
'propagate': False,
},
},
}
# 邮件配置(用于安全通知)
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = os.environ.get('EMAIL_HOST')
EMAIL_PORT = int(os.environ.get('EMAIL_PORT', 587))
EMAIL_USE_TLS = True
EMAIL_HOST_USER = os.environ.get('EMAIL_HOST_USER')
EMAIL_HOST_PASSWORD = os.environ.get('EMAIL_HOST_PASSWORD')
DEFAULT_FROM_EMAIL = 'security@yourdomain.com'
SERVER_EMAIL = 'server@yourdomain.com'
# ADMINS和MANAGERS用于接收错误邮件
ADMINS = [
('Admin', 'admin@yourdomain.com'),
]
MANAGERS = ADMINS
# 缓存配置(使用Redis)
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/1'),
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'SOCKET_CONNECT_TIMEOUT': 5,
'SOCKET_TIMEOUT': 5,
}
}
}
# 使用缓存存储会话
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'
10.2 安全审计与检查
# security/audit.py
from django.core.management.base import BaseCommand
from django.conf import settings
import subprocess
import requests
import sys
class SecurityAuditCommand(BaseCommand):
"""安全审计命令"""
help = '执行安全审计检查'
def handle(self, *args, **options):
self.stdout.write("开始安全审计...")
checks = [
self.check_debug_mode,
self.check_secret_key,
self.check_allowed_hosts,
self.check_https_settings,
self.check_database_ssl,
self.check_dependencies,
]
all_passed = True
for check in checks:
try:
result = check()
if result:
self.stdout.write(
self.style.SUCCESS(f"✓ {check.__name__}: 通过")
)
else:
self.stdout.write(
self.style.ERROR(f"✗ {check.__name__}: 失败")
)
all_passed = False
except Exception as e:
self.stdout.write(
self.style.ERROR(f"✗ {check.__name__}: 错误 - {str(e)}")
)
all_passed = False
if all_passed:
self.stdout.write(
self.style.SUCCESS("所有安全检查通过!")
)
else:
self.stdout.write(
self.style.ERROR("发现安全问题,请及时修复!")
)
sys.exit(1)
def check_debug_mode(self):
"""检查调试模式"""
return not settings.DEBUG
def check_secret_key(self):
"""检查密钥安全性"""
secret_key = settings.SECRET_KEY
return secret_key and len(secret_key) >= 50 and secret_key != 'your-secret-key-here'
def check_allowed_hosts(self):
"""检查ALLOWED_HOSTS配置"""
return len(settings.ALLOWED_HOSTS) > 0 and '*' not in settings.ALLOWED_HOSTS
def check_https_settings(self):
"""检查HTTPS配置"""
if not settings.DEBUG:
return (settings.SECURE_SSL_REDIRECT and
settings.SESSION_COOKIE_SECURE and
settings.CSRF_COOKIE_SECURE)
return True
def check_database_ssl(self):
"""检查数据库SSL连接"""
db_options = settings.DATABASES['default'].get('OPTIONS', {})
return db_options.get('sslmode') == 'require'
def check_dependencies(self):
"""检查依赖包安全性"""
try:
# 使用safety检查已知漏洞
result = subprocess.run([
'safety', 'check', '--json'
], capture_output=True, text=True)
if result.returncode == 0:
return True
else:
self.stdout.write(
self.style.WARNING("发现依赖包安全漏洞:")
)
self.stdout.write(result.stdout)
return False
except FileNotFoundError:
self.stdout.write(
self.style.WARNING("safety未安装,跳过依赖检查")
)
return True
# 在settings.py中导入时检查
def security_checks():
"""启动时的安全检查"""
import warnings
if settings.DEBUG:
warnings.warn(
"DEBUG模式已开启,这会在生产环境中造成安全风险!",
RuntimeWarning
)
if not settings.ALLOWED_HOSTS:
warnings.warn(
"ALLOWED_HOSTS未设置,这会造成安全风险!",
RuntimeWarning
)
if settings.SECRET_KEY == 'your-secret-key-here':
warnings.warn(
"使用默认的SECRET_KEY,这会造成严重安全风险!",
RuntimeWarning
)
# 在settings.py末尾调用
security_checks()
11. 应急响应与恢复
11.1 安全事件响应
# security/incident_response.py
from django.core.management.base import BaseCommand
from django.core.mail import send_mail
from django.utils import timezone
import logging
logger = logging.getLogger(__name__)
class SecurityIncidentResponse:
"""安全事件响应"""
@classmethod
def handle_suspicious_activity(cls, request, activity_type, details):
"""处理可疑活动"""
# 记录事件
logger.critical(
f"安全事件: {activity_type}",
extra={
'activity_type': activity_type,
'details': details,
'ip_address': cls.get_client_ip(request),
'timestamp': timezone.now().isoformat(),
}
)
# 发送警报邮件
cls.send_alert_email(activity_type, details)
# 执行自动响应措施
cls.execute_automatic_response(activity_type, request)
@classmethod
def send_alert_email(cls, activity_type, details):
"""发送警报邮件"""
subject = f"安全警报: {activity_type}"
message = f"""
检测到安全事件:
类型: {activity_type}
详情: {details}
时间: {timezone.now().isoformat()}
请立即检查系统安全状况。
"""
try:
send_mail(
subject,
message,
'security@yourdomain.com',
['admin@yourdomain.com', 'security-team@yourdomain.com'],
fail_silently=False,
)
except Exception as e:
logger.error(f"发送安全警报邮件失败: {str(e)}")
@classmethod
def execute_automatic_response(cls, activity_type, request):
"""执行自动响应"""
client_ip = cls.get_client_ip(request)
if activity_type in ['brute_force', 'rate_limit_exceeded']:
# 暂时封禁IP
cls.block_ip_temporarily(client_ip)
elif activity_type == 'suspicious_file_upload':
# 立即删除可疑文件
cls.delete_suspicious_files()
@classmethod
def block_ip_temporarily(cls, ip_address, duration_minutes=30):
"""临时封禁IP"""
# 实现IP封禁逻辑
# 可以使用Redis或数据库存储封禁记录
logger.info(f"临时封禁IP: {ip_address},时长: {duration_minutes}分钟")
@classmethod
def delete_suspicious_files(cls):
"""删除可疑文件"""
# 实现文件删除逻辑
logger.info("执行可疑文件清理")
@classmethod
def get_client_ip(cls, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class EmergencyLockdownCommand(BaseCommand):
"""紧急锁定命令"""
help = '紧急锁定系统'
def add_arguments(self, parser):
parser.add_argument(
'--level',
type=str,
choices=['high', 'medium', 'low'],
default='medium',
help='锁定级别'
)
def handle(self, *args, **options):
level = options['level']
self.stdout.write(f"执行紧急锁定,级别: {level}")
if level == 'high':
self.high_alert_lockdown()
elif level == 'medium':
self.medium_alert_lockdown()
else:
self.low_alert_lockdown()
self.stdout.write(
self.style.SUCCESS("紧急锁定完成")
)
def high_alert_lockdown(self):
"""高级别锁定"""
# 停止接受新用户注册
# 禁用所有API端点
# 强制所有用户重新认证
# 启用维护模式
logger.critical("执行高级别紧急锁定")
def medium_alert_lockdown(self):
"""中级别锁定"""
# 加强认证要求
# 限制敏感操作
# 增加安全监控
logger.warning("执行中级别紧急锁定")
def low_alert_lockdown(self):
"""低级别锁定"""
# 增加日志记录
# 发送安全通知
# 加强输入验证
logger.info("执行低级别紧急锁定")
12. 总结
12.1 关键安全实践
- 深度防御:在应用的各个层面实施安全措施
- 最小权限原则:只授予必要的权限
- 输入验证:对所有用户输入进行严格验证
- 输出编码:在显示用户数据时进行适当的编码
- 安全配置:正确配置所有安全相关的设置
12.2 持续安全维护
- 定期更新依赖包
- 监控安全公告和漏洞
- 进行安全代码审查
- 实施安全测试
- 建立安全事件响应流程
12.3 安全检查清单
# security/checklist.py
SECURITY_CHECKLIST = {
'authentication': [
'使用强密码策略',
'实施账户锁定机制',
'使用安全的会话管理',
'实现安全的注销功能',
],
'authorization': [
'实施最小权限原则',
'验证所有访问控制',
'保护敏感操作',
],
'input_validation': [
'验证所有用户输入',
'使用白名单验证',
'实施输出编码',
'防范SQL注入',
],
'security_headers': [
'配置CSP策略',
'设置安全Cookie属性',
'启用HSTS',
'配置X-Frame-Options',
],
'data_protection': [
'加密敏感数据',
'安全处理文件上传',
'保护静态文件',
'实施数据备份',
],
'monitoring': [
'记录安全事件',
'监控可疑活动',
'设置警报机制',
'定期安全审计',
],
}
通过实施这些安全最佳实践,您可以显著提高Django应用程序的安全性,有效防范常见的Web攻击。记住,安全是一个持续的过程,需要定期审查和更新安全措施。
更多推荐



所有评论(0)