新手进阶Python:办公看板集成AI异常预警+全流程审计+风险拦截
摘要:本文介绍了一个办公看板安全升级项目,针对企业用户反馈的异常行为识别缺失、操作审计不完善等问题,新增三大核心功能:1)AI异常预警模块,基于操作行为特征实现风险分级预警;2)全流程操作审计系统,完整记录用户操作并提供追溯功能;3)实时风险拦截机制,结合AI分析结果阻止高危操作。项目基于Flask+MySQL+OpenAI+Scikit-learn技术栈实现,包含详细的代码注释和配置指南,帮助新
大家好!我是CSDN的Python新手博主~ 上一篇我们完成了看板的可视化升级与权限管控,解决了多角色数据查看与汇报需求,但企业用户(尤其甲方客户)反馈两大核心痛点:① 缺乏异常行为识别能力,对批量修改数据、越权操作尝试、异常数据录入等风险无法主动预警,易造成数据泄露或错误;② 操作审计不完善,仅记录核心功能操作,普通查询、数据导出、权限变更等行为无追溯,不符合合规审计要求;③ 风险操作无拦截机制,仅靠权限控制无法规避恶意操作或误操作。今天就带来超落地的新手实战项目——办公看板集成AI异常预警+全流程操作审计+风险拦截!
本次基于之前的“权限管控看板”代码,新增3大核心功能:① AI异常预警(基于操作行为特征、数据规律,识别批量操作、异常时段、越权尝试等风险,支持实时预警与历史分析);② 全流程操作审计(记录所有用户行为,含查询、新增、修改、删除、导出、权限变更,支持审计报表生成与追溯);③ 实时风险拦截(针对高危操作设置拦截规则,结合AI预警结果,阻止恶意或误操作并触发审批流程)。全程基于现有技术栈(Flask+MySQL+OpenAI+Scikit-learn),新增AI预警模块、审计日志引擎、风险拦截中间件,代码注释详细,新手只需配置预警规则与拦截阈值,跟着步骤复制操作就能成功,让看板兼顾业务效率与安全合规~
一、本次学习目标
-
掌握AI异常行为识别逻辑,基于操作特征(频率、时段、范围)与数据规律,搭建多维度预警模型,实现实时预警与风险分级;
-
学会全流程操作审计设计,记录所有用户行为的关键信息(操作人、时间、IP、内容、结果),实现审计日志分类、检索与报表导出;
-
理解风险拦截机制,通过中间件拦截高危操作,结合AI预警结果触发审批流程,平衡操作灵活性与安全管控;
-
实现预警、审计、拦截功能联动,异常行为实时预警+全程审计留痕+高危操作拦截,形成安全闭环;
-
适配企业合规要求,审计日志不可篡改、预警记录可追溯、拦截操作有审批留痕,满足甲方客户审计需求。
二、前期准备
- 安装核心依赖库
安装核心依赖(AI特征工程、异常检测、日志解析)
pip3 install scikit-learn pandas numpy python-dotenv python-multipart -i https://pypi.tuna.tsinghua.edu.cn/simple
确保已有依赖正常(Flask、OpenAI、APScheduler等)
pip3 install --upgrade flask flask-login flask-sqlalchemy apscheduler openai requests pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple
说明:AI异常检测基于Scikit-learn实现简单特征工程(如操作频率、时段特征),结合OpenAI做智能风险分析;审计日志通过Flask中间件全局捕获请求;风险拦截基于自定义装饰器与规则引擎实现。
- 第三方服务与配置准备
-
预警规则配置:定义异常行为类型(批量操作:单次修改>50条数据;异常时段:凌晨0-6点操作;越权尝试:10分钟内3次以上权限校验失败;数据异常:订单金额超出常规范围),设置预警阈值与分级(高风险:立即拦截,中风险:弹窗提醒,低风险:日志记录);
-
审计范围配置:确定需审计的操作类型(查询、新增、修改、删除、导出、权限变更、登录/登出),定义每条审计记录需包含的字段(操作人、角色、IP、操作时间、操作内容、操作结果、客户端信息);
-
风险拦截配置:梳理高危操作清单(批量删除数据、修改他人数据、导出全量客户信息、变更管理员权限),设置拦截规则(高风险操作需上级审批,中风险操作二次确认);
-
安全配置:在.env文件中补充预警通知邮箱、审批流程配置、异常检测阈值,确保AI预警结果可通过邮件推送;开启服务器IP日志记录,用于审计溯源。
- 数据库表优化与创建
– 连接MySQL数据库(替换为你的数据库信息)
mysql -u office_user -p -h 47.108.xxx.xxx office_data
– 创建操作审计表(operation_audit)
CREATE TABLE operation_audit (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL COMMENT ‘用户ID’,
username VARCHAR(50) NOT NULL COMMENT ‘用户名’,
user_role VARCHAR(50) NOT NULL COMMENT ‘用户角色’,
operation_type ENUM(‘query’, ‘add’, ‘update’, ‘delete’, ‘export’, ‘perm_change’, ‘login’, ‘logout’) NOT NULL COMMENT ‘操作类型’,
operation_content TEXT NOT NULL COMMENT ‘操作内容(JSON格式)’,
operation_result ENUM(‘success’, ‘fail’, ‘intercepted’) NOT NULL COMMENT ‘操作结果(成功/失败/被拦截)’,
ip_address VARCHAR(50) NOT NULL COMMENT ‘操作IP’,
user_agent TEXT NULL COMMENT ‘客户端信息(浏览器/设备)’,
operation_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘操作时间’,
audit_remark TEXT NULL COMMENT ‘审计备注’,
KEY idx_user_id (user_id),
KEY idx_operation_type (operation_type),
KEY idx_operation_time (operation_time),
KEY idx_operation_result (operation_result)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘全流程操作审计表’;
– 创建异常预警表(abnormal_alert)
CREATE TABLE abnormal_alert (
id INT AUTO_INCREMENT PRIMARY KEY,
audit_id BIGINT NULL COMMENT ‘关联审计ID(可为空,独立预警场景)’,
user_id INT NOT NULL COMMENT ‘触发用户ID’,
alert_type ENUM(‘batch_operation’, ‘abnormal_time’, ‘privilege_attempt’, ‘data_abnormal’) NOT NULL COMMENT ‘预警类型’,
alert_level ENUM(‘high’, ‘medium’, ‘low’) NOT NULL COMMENT ‘预警级别’,
alert_content TEXT NOT NULL COMMENT ‘预警内容’,
ai_analysis TEXT NULL COMMENT ‘AI风险分析’,
handle_status ENUM(‘pending’, ‘processed’, ‘ignored’) DEFAULT ‘pending’ COMMENT ‘处理状态’,
handle_time DATETIME NULL COMMENT ‘处理时间’,
handler_id INT NULL COMMENT ‘处理人ID’,
handle_remark TEXT NULL COMMENT ‘处理备注’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
KEY idx_user_id (user_id),
KEY idx_alert_level (alert_level),
KEY idx_handle_status (handle_status),
KEY idx_create_time (create_time),
FOREIGN KEY (audit_id) REFERENCES operation_audit(id) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘AI异常预警表’;
– 创建风险拦截表(risk_intercept)
CREATE TABLE risk_intercept (
id INT AUTO_INCREMENT PRIMARY KEY,
audit_id BIGINT NOT NULL COMMENT ‘关联审计ID’,
alert_id INT NULL COMMENT ‘关联预警ID’,
operation_type ENUM(‘batch_delete’, ‘cross_user_update’, ‘full_data_export’, ‘perm_change_admin’) NOT NULL COMMENT ‘高危操作类型’,
intercept_reason TEXT NOT NULL COMMENT ‘拦截原因’,
intercept_rule VARCHAR(100) NOT NULL COMMENT ‘触发的拦截规则’,
approval_status ENUM(‘pending’, ‘approved’, ‘rejected’) DEFAULT ‘pending’ COMMENT ‘审批状态’,
approval_user_id INT NULL COMMENT ‘审批人ID’,
approval_time DATETIME NULL COMMENT ‘审批时间’,
approval_remark TEXT NULL COMMENT ‘审批备注’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
KEY idx_audit_id (audit_id),
KEY idx_approval_status (approval_status),
FOREIGN KEY (audit_id) REFERENCES operation_audit(id) ON DELETE CASCADE,
FOREIGN KEY (alert_id) REFERENCES abnormal_alert(id) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘风险操作拦截表’;
– 创建审批流程表(approval_flow)
CREATE TABLE approval_flow (
id INT AUTO_INCREMENT PRIMARY KEY,
intercept_id INT NOT NULL COMMENT ‘关联拦截ID’,
apply_user_id INT NOT NULL COMMENT ‘申请人ID’,
approve_user_id INT NOT NULL COMMENT ‘审批人ID’,
flow_status ENUM(‘pending’, ‘approved’, ‘rejected’, ‘cancelled’) DEFAULT ‘pending’ COMMENT ‘流程状态’,
apply_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘申请时间’,
approve_time DATETIME NULL COMMENT ‘审批时间’,
apply_remark TEXT NULL COMMENT ‘申请备注’,
approve_remark TEXT NULL COMMENT ‘审批备注’,
KEY idx_intercept_id (intercept_id),
KEY idx_flow_status (flow_status),
FOREIGN KEY (intercept_id) REFERENCES risk_intercept(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘高危操作审批流程表’;
– 初始化预警规则数据(可后续通过接口动态配置)
INSERT INTO abnormal_alert (user_id, alert_type, alert_level, alert_content, handle_status)
VALUES (0, ‘batch_operation’, ‘high’, ‘单次修改数据量>50条,触发高风险预警’, ‘pending’);
INSERT INTO abnormal_alert (user_id, alert_type, alert_level, alert_content, handle_status)
VALUES (0, ‘abnormal_time’, ‘medium’, ‘凌晨0-6点进行数据操作,触发中风险预警’, ‘pending’);
三、实战:AI预警+全流程审计+风险拦截集成
- 第一步:搭建全流程操作审计引擎,实现行为留痕
-- coding: utf-8 --
audit_engine.py 全流程操作审计引擎
from flask import Blueprint, request, jsonify, g
from flask_login import login_required, current_user
from functools import wraps
import json
import datetime
from models import db, OperationAudit, Department, User
from user_agents import parse # 解析客户端信息
from dotenv import load_dotenv
import os
加载环境变量
load_dotenv()
audit_bp = Blueprint(“audit”, name)
====================== 核心装饰器:记录操作审计日志 ======================
def audit_log(operation_type):
“”"
操作审计装饰器:记录函数执行对应的操作日志
:param operation_type: 操作类型(query/add/update/delete/export/perm_change/login/logout)
“”"
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 初始化审计数据
audit_data = {
“user_id”: current_user.id if current_user.is_authenticated else 0,
“username”: current_user.username if current_user.is_authenticated else “anonymous”,
“user_role”: current_user.role.role_name if (current_user.is_authenticated and current_user.role) else “unknown”,
“operation_type”: operation_type,
“operation_content”: {},
“operation_result”: “fail”,
“ip_address”: request.remote_addr,
“user_agent”: str(parse(request.user_agent.string)),
“operation_time”: datetime.datetime.now()
}
try:
# 执行原函数,获取结果
result = f(*args, **kwargs)
# 更新操作结果为成功
audit_data["operation_result"] = "success"
# 补充操作内容(根据操作类型提取关键信息)
if operation_type == "query":
audit_data["operation_content"] = {
"query_params": request.args.to_dict(),
"query_path": request.path
}
elif operation_type in ["add", "update", "delete"]:
audit_data["operation_content"] = {
"request_data": request.get_json() or request.form.to_dict(),
"target_id": kwargs.get("id") or ""
}
elif operation_type == "export":
audit_data["operation_content"] = {
"export_type": request.args.get("export_type", "custom"),
"filter_params": request.args.to_dict()
}
elif operation_type == "perm_change":
audit_data["operation_content"] = {
"target_user_id": kwargs.get("user_id"),
"new_role": kwargs.get("new_role"),
"old_role": kwargs.get("old_role")
}
elif operation_type in ["login", "logout"]:
audit_data["operation_content"] = {
"status": "success" if operation_type == "login" else "normal_logout"
}
return result
except Exception as e:
# 操作失败,记录错误信息
audit_data["operation_content"]["error_msg"] = str(e)
raise e
finally:
# 保存审计日志(无论成功/失败都记录)
if audit_data["user_id"] != 0 or operation_type == "login": # 不记录匿名非登录操作
audit = OperationAudit(
user_id=audit_data["user_id"],
username=audit_data["username"],
user_role=audit_data["user_role"],
operation_type=audit_data["operation_type"],
operation_content=json.dumps(audit_data["operation_content"], ensure_ascii=False),
operation_result=audit_data["operation_result"],
ip_address=audit_data["ip_address"],
user_agent=audit_data["user_agent"]
)
db.session.add(audit)
db.session.commit()
# 将审计ID存入g对象,供后续预警、拦截使用
g.audit_id = audit.id
return decorated_function
return decorator
====================== 全局中间件:捕获未被装饰器覆盖的操作 ======================
@audit_bp.before_app_request
def before_request_audit():
“”“全局请求拦截,记录未被audit_log装饰器覆盖的操作(主要是查询类)”“”
# 排除静态文件、登录接口(登录接口单独用audit_log装饰)
exclude_paths = [“/static”, “/auth/login”, “/auth/logout”, “/api/health”]
if any(request.path.startswith(path) for path in exclude_paths):
return
# 仅记录已登录用户的操作,且未被audit_log装饰的GET请求(查询类)
if current_user.is_authenticated and request.method == "GET" and not hasattr(g, "audit_recorded"):
audit = OperationAudit(
user_id=current_user.id,
username=current_user.username,
user_role=current_user.role.role_name,
operation_type="query",
operation_content=json.dumps({
"query_params": request.args.to_dict(),
"query_path": request.path
}, ensure_ascii=False),
operation_result="success",
ip_address=request.remote_addr,
user_agent=str(parse(request.user_agent.string))
)
db.session.add(audit)
db.session.commit()
g.audit_id = audit.id
g.audit_recorded = True
====================== 接口:审计日志查询与筛选 ======================
@audit_bp.route(“/audit/log/list”, methods=[“GET”])
@login_required
@audit_log(“query”)
def get_audit_log():
“”“获取审计日志列表(支持多条件筛选)”“”
page = int(request.args.get(“page”, 1))
page_size = int(request.args.get(“page_size”, 20))
username = request.args.get(“username”)
operation_type = request.args.get(“operation_type”)
operation_result = request.args.get(“operation_result”)
start_time = request.args.get(“start_time”)
end_time = request.args.get(“end_time”)
# 构建查询条件
query = OperationAudit.query.order_by(OperationAudit.operation_time.desc())
if username:
query = query.filter(OperationAudit.username.like(f"%{username}%"))
if operation_type:
query = query.filter(OperationAudit.operation_type == operation_type)
if operation_result:
query = query.filter(OperationAudit.operation_result == operation_result)
if start_time:
query = query.filter(OperationAudit.operation_time >= datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S"))
if end_time:
query = query.filter(OperationAudit.operation_time <= datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S"))
# 分页查询
pagination = query.paginate(page=page, per_page=page_size)
logs = pagination.items
# 格式化返回结果
log_list = []
for log in logs:
content = json.loads(log.operation_content) if log.operation_content else {}
log_list.append({
"audit_id": log.id,
"username": log.username,
"user_role": log.user_role,
"operation_type": {
"query": "查询", "add": "新增", "update": "修改", "delete": "删除",
"export": "导出", "perm_change": "权限变更", "login": "登录", "logout": "登出"
}[log.operation_type],
"operation_content": content,
"operation_result": {"success": "成功", "fail": "失败", "intercepted": "被拦截"}[log.operation_result],
"ip_address": log.ip_address,
"user_agent": log.user_agent,
"operation_time": log.operation_time.strftime("%Y-%m-%d %H:%M:%S"),
"audit_remark": log.audit_remark
})
return jsonify({
"success": True,
"data": log_list,
"total": pagination.total,
"page": page,
"page_size": page_size
})
====================== 接口:审计日志导出(Excel) ======================
@audit_bp.route(“/audit/log/export”, methods=[“GET”])
@login_required
@audit_log(“export”)
def export_audit_log():
“”“导出审计日志为Excel(复用之前的报表导出功能)”“”
from report_export import export_excel_by_template # 复用报表导出模块
from models import ReportConfig
# 查找审计日志导出报表配置(需提前在report_config表中创建)
report_config = ReportConfig.query.filter_by(report_name="操作审计日志报表").first()
if not report_config:
return jsonify({"success": False, "error": "未找到审计日志报表配置"})
# 筛选参数
filter_params = {
"username": request.args.get("username"),
"operation_type": request.args.get("operation_type"),
"start_time": request.args.get("start_time"),
"end_time": request.args.get("end_time")
}
try:
# 调用报表导出功能
temp_path = export_excel_by_template(report_config, filter_params)
# 发送文件给前端下载
return send_file(
temp_path,
as_attachment=True,
download_name=f"操作审计日志_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx",
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
except Exception as e:
return jsonify({"success": False, "error": f"导出失败:{str(e)}"})
在app.py中新增以下内容
from audit_engine import audit_bp, audit_log
from flask import send_file
注册审计蓝图
app.register_blueprint(audit_bp)
示例:使用审计装饰器记录操作
@app.route(“/data/add”, methods=[“POST”])
@login_required
@audit_log(“add”) # 记录新增操作
def add_data():
“”“新增数据接口,自动记录审计日志”“”
data = request.get_json()
# 业务逻辑省略…
return jsonify({“success”: True, “msg”: “新增成功”})
@app.route(“/user/role/change/int:user_id”, methods=[“POST”])
@login_required
@audit_log(“perm_change”) # 记录权限变更操作
def change_user_role(user_id):
“”“变更用户角色接口,自动记录审计日志”“”
data = request.get_json()
new_role = data.get(“new_role”)
# 业务逻辑省略(查询旧角色、更新角色)
old_role = “sales” # 示例值
# 传递参数给审计装饰器
g.perm_change_info = {“user_id”: user_id, “new_role”: new_role, “old_role”: old_role}
return jsonify({“success”: True, “msg”: “角色变更成功”})
- 第二步:实现AI异常预警模型,主动识别风险
-- coding: utf-8 --
ai_alert.py AI异常预警模块
from flask import g
from flask_login import current_user
import datetime
import json
import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from models import db, AbnormalAlert, OperationAudit
from dotenv import load_dotenv
import os
import smtplib
from email.mime.text import MIMEText
from openai import OpenAI
加载环境变量
load_dotenv()
OPENAI_API_KEY = os.getenv(“OPENAI_API_KEY”)
client = OpenAI(api_key=OPENAI_API_KEY)
预警配置
ALERT_THRESHOLDS = {
“batch_operation”: 50, # 批量操作阈值(单次修改/删除数据量)
“abnormal_time_start”: 0, # 异常时段开始时间(小时)
“abnormal_time_end”: 6, # 异常时段结束时间(小时)
“privilege_attempt_count”: 3, # 越权尝试次数阈值
“privilege_attempt_interval”: 10, # 越权尝试时间间隔(分钟)
“data_abnormal_ratio”: 0.5 # 数据异常比例阈值(超出常规范围的数据占比)
}
预警通知邮箱
ALERT_EMAILS = os.getenv(“ALERT_EMAILS”).split(“,”)
邮件配置(复用报表导出的SMTP配置)
SMTP_SERVER = os.getenv(“SMTP_SERVER”)
SMTP_PORT = int(os.getenv(“SMTP_PORT”, 465))
SMTP_USER = os.getenv(“SMTP_USER”)
SMTP_PASS = os.getenv(“SMTP_PASS”)
====================== 核心功能:发送预警邮件 ======================
def send_alert_email(alert):
“”“发送异常预警邮件”“”
if not ALERT_EMAILS:
return False
# 构建邮件内容
subject = f"【{alert.alert_level.upper()}风险预警】{alert.username}触发异常行为"
content = f"""
<h3>异常预警详情</h3>
<p>预警ID:{alert.id}</p>
<p>触发用户:{alert.username}(角色:{alert.user_role})</p>
<p>预警类型:{
{"batch_operation": "批量操作", "abnormal_time": "异常时段操作",
"privilege_attempt": "越权尝试", "data_abnormal": "数据异常录入"}[alert.alert_type]
}</p>
<p>预警级别:{
{"high": "高风险(需立即处理)", "medium": "中风险(建议核查)", "low": "低风险(可忽略)"}[alert.alert_level]
}</p>
<p>预警内容:{alert.alert_content}</p>
<p>AI风险分析:{alert.ai_analysis or "暂无分析"}</p>
<p>触发时间:{alert.create_time.strftime("%Y-%m-%d %H:%M:%S")}</p>
<p>操作IP:{alert.ip_address}</p>
"""
# 发送邮件
try:
msg = MIMEText(content, "html", "utf-8")
msg["Subject"] = subject
msg["From"] = SMTP_USER
msg["To"] = ",".join(ALERT_EMAILS)
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
server.login(SMTP_USER, SMTP_PASS)
server.sendmail(SMTP_USER, ALERT_EMAILS, msg.as_string())
return True
except Exception as e:
print(f"预警邮件发送失败:{str(e)}")
return False
====================== 核心功能:AI风险分析 ======================
def ai_analyze_risk(alert):
“”“基于预警内容,用AI分析风险原因与处理建议”“”
prompt = f"“”
你是企业办公系统的安全风控助手,需分析以下异常行为预警,完成以下任务:
1. 分析异常行为的核心风险点(是否存在数据泄露、数据错误、恶意操作风险);
2. 给出具体的处理建议(步骤清晰,适配管理员操作,区分高/中/低风险处理优先级);
3. 标注是否需要进一步拦截相关操作,若需要,说明拦截规则优化方向。
预警详情:
- 触发用户:{alert.username}(角色:{alert.user_role})
- 预警类型:{alert.alert_type}
- 预警级别:{alert.alert_level}
- 预警内容:{alert.alert_content}
- 触发时间:{alert.create_time.strftime("%Y-%m-%d %H:%M:%S")}
- 操作IP:{alert.ip_address}
请按以下格式输出(简洁明了,重点突出):
【风险点】XXX
【处理建议】XXX
【拦截优化建议】XXX
"""
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.3 # 降低随机性,确保结果精准
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"AI分析失败:{str(e)},请手动核查"
====================== 核心功能:异常行为检测(批量操作) ======================
def detect_batch_operation(audit):
“”“检测批量操作异常(单次修改/删除数据量超出阈值)”“”
if audit.operation_type not in [“update”, “delete”]:
return None
content = json.loads(audit.operation_content)
# 提取操作数据量(新增/修改/删除的条数)
target_ids = content.get("target_ids") or [content.get("target_id")]
operation_count = len(target_ids) if isinstance(target_ids, list) else 1
if operation_count > ALERT_THRESHOLDS["batch_operation"]:
# 触发批量操作预警
alert_content = f"用户{audit.username}单次{audit.operation_type}数据{operation_count}条,超出阈值{ALERT_THRESHOLDS['batch_operation']}条,疑似批量操作风险"
alert = AbnormalAlert(
audit_id=audit.id,
user_id=audit.user_id,
alert_type="batch_operation",
alert_level="high",
alert_content=alert_content
)
# AI分析风险
alert.ai_analysis = ai_analyze_risk(alert)
db.session.add(alert)
db.session.commit()
# 高风险预警发送邮件
send_alert_email(alert)
return alert
return None
====================== 核心功能:异常行为检测(异常时段) ======================
def detect_abnormal_time(audit):
“”“检测异常时段操作(凌晨0-6点)”“”
operation_hour = audit.operation_time.hour
if (operation_hour >= ALERT_THRESHOLDS[“abnormal_time_start”] and
operation_hour < ALERT_THRESHOLDS[“abnormal_time_end”]):
# 排除登录/登出操作,仅关注数据操作
if audit.operation_type in [“add”, “update”, “delete”, “export”, “perm_change”]:
alert_content = f"用户{audit.username}在异常时段({operation_hour}:00-{operation_hour+1}:00)进行{audit.operation_type}操作,存在风险"
alert = AbnormalAlert(
audit_id=audit.id,
user_id=audit.user_id,
alert_type=“abnormal_time”,
alert_level=“medium”,
alert_content=alert_content
)
alert.ai_analysis = ai_analyze_risk(alert)
db.session.add(alert)
db.session.commit()
# 中风险预警发送邮件
send_alert_email(alert)
return alert
return None
====================== 核心功能:异常行为检测(越权尝试) ======================
def detect_privilege_attempt(audit):
“”“检测越权操作尝试(短时间内多次权限校验失败)”“”
if audit.operation_result != “fail”:
return None
# 查找该用户近N分钟内的权限失败记录
start_time = audit.operation_time - datetime.timedelta(minutes=ALERT_THRESHOLDS["privilege_attempt_interval"])
fail_records = OperationAudit.query.filter(
OperationAudit.user_id == audit.user_id,
OperationAudit.operation_result == "fail",
OperationAudit.operation_time >= start_time,
OperationAudit.operation_content.like("%权限%")
).count()
if fail_records >= ALERT_THRESHOLDS["privilege_attempt_count"]:
alert_content = f"用户{audit.username}在{ALERT_THRESHOLDS['privilege_attempt_interval']}分钟内,越权操作尝试{fail_records}次,疑似恶意越权"
alert = AbnormalAlert(
audit_id=audit.id,
user_id=audit.user_id,
alert_type="privilege_attempt",
alert_level="high",
alert_content=alert_content
)
alert.ai_analysis = ai_analyze_risk(alert)
db.session.add(alert)
db.session.commit()
send_alert_email(alert)
return alert
return None
====================== 核心功能:异常行为检测(数据异常) ======================
def detect_data_abnormal(audit, data_list):
“”“检测数据异常录入(基于聚类算法识别超出常规范围的数据)”“”
if audit.operation_type != “add”:
return None
# 提取数据特征(以订单金额为例)
amounts = [data.get("amount") for data in data_list if data.get("amount")]
if len(amounts) < 10: # 数据量不足,不进行检测
return None
# 数据预处理
X = pd.DataFrame(amounts, columns=["amount"]).values
X_scaled = StandardScaler().fit_transform(X)
# DBSCAN聚类检测异常值
dbscan = DBSCAN(eps=0.5, min_samples=5)
labels = dbscan.fit_predict(X_scaled)
abnormal_count = sum(1 for label in labels if label == -1)
abnormal_ratio = abnormal_count / len(amounts)
if abnormal_ratio > ALERT_THRESHOLDS["data_abnormal_ratio"]:
alert_content = f"用户{audit.username}新增数据中,异常数据占比{abnormal_ratio:.2f},超出阈值{ALERT_THRESHOLDS['data_abnormal_ratio']},疑似数据录入错误或恶意篡改"
alert = AbnormalAlert(
audit_id=audit.id,
user_id=audit.user_id,
alert_type="data_abnormal",
alert_level="medium",
alert_content=alert_content
)
alert.ai_analysis = ai_analyze_risk(alert)
db.session.add(alert)
db.session.commit()
send_alert_email(alert)
return alert
return None
====================== 入口函数:触发异常检测 ======================
def trigger_abnormal_detection(audit_id, data_list=None):
“”"
触发异常检测
:param audit_id: 关联审计ID
:param data_list: 新增/修改的数据列表(用于数据异常检测)
“”"
audit = OperationAudit.query.get(audit_id)
if not audit:
return
# 依次检测各类异常行为
detect_batch_operation(audit)
detect_abnormal_time(audit)
detect_privilege_attempt(audit)
if data_list:
detect_data_abnormal(audit, data_list)
====================== 接口:异常预警列表与处理 ======================
@ai_alert_bp.route(“/alert/list”, methods=[“GET”])
@login_required
def get_alert_list():
“”“获取异常预警列表(支持筛选与处理状态更新)”“”
page = int(request.args.get(“page”, 1))
page_size = int(request.args.get(“page_size”, 20))
alert_level = request.args.get(“alert_level”)
handle_status = request.args.get(“handle_status”)
username = request.args.get(“username”)
query = AbnormalAlert.query.join(
OperationAudit, AbnormalAlert.audit_id == OperationAudit.id
).add_columns(
OperationAudit.username, OperationAudit.user_role,
OperationAudit.ip_address, OperationAudit.operation_time
).order_by(AbnormalAlert.create_time.desc())
if alert_level:
query = query.filter(AbnormalAlert.alert_level == alert_level)
if handle_status:
query = query.filter(AbnormalAlert.handle_status == handle_status)
if username:
query = query.filter(OperationAudit.username.like(f"%{username}%"))
pagination = query.paginate(page=page, per_page=page_size)
alerts = pagination.items
alert_list = []
for alert, username, user_role, ip_address, operation_time in alerts:
alert_list.append({
"alert_id": alert.id,
"username": username,
"user_role": user_role,
"alert_type": {
"batch_operation": "批量操作", "abnormal_time": "异常时段操作",
"privilege_attempt": "越权尝试", "data_abnormal": "数据异常录入"
}[alert.alert_type],
"alert_level": {"high": "高风险", "medium": "中风险", "low": "低风险"}[alert.alert_level],
"alert_content": alert.alert_content,
"ai_analysis": alert.ai_analysis,
"handle_status": {"pending": "待处理", "processed": "已处理", "ignored": "已忽略"}[alert.handle_status],
"handle_time": alert.handle_time.strftime("%Y-%m-%d %H:%M:%S") if alert.handle_time else "",
"handler": alert.handler.username if (alert.handler and alert.handler.username) else "",
"handle_remark": alert.handle_remark,
"ip_address": ip_address,
"operation_time": operation_time.strftime("%Y-%m-%d %H:%M:%S"),
"create_time": alert.create_time.strftime("%Y-%m-%d %H:%M:%S")
})
return jsonify({
"success": True,
"data": alert_list,
"total": pagination.total,
"page": page,
"page_size": page_size
})
@ai_alert_bp.route(“/alert/handle/int:alert_id”, methods=[“POST”])
@login_required
def handle_alert(alert_id):
“”“处理异常预警(更新状态与备注)”“”
alert = AbnormalAlert.query.get(alert_id)
if not alert:
return jsonify({“success”: False, “error”: “预警记录不存在”})
data = request.get_json()
alert.handle_status = data.get("handle_status")
alert.handle_remark = data.get("handle_remark")
alert.handle_time = datetime.datetime.now()
alert.handler_id = current_user.id
db.session.commit()
return jsonify({"success": True, "msg": "预警处理成功"})
在数据新增/修改/删除接口中添加异常检测触发
@app.route(“/data/batch/delete”, methods=[“POST”])
@login_required
@audit_log(“delete”)
def batch_delete_data():
“”“批量删除数据接口,触发异常检测”“”
data = request.get_json()
target_ids = data.get(“target_ids”, [])
if not target_ids:
return jsonify({“success”: False, “error”: “请选择要删除的数据”})
# 业务逻辑省略(批量删除数据)
# 触发异常检测
trigger_abnormal_detection(g.audit_id)
return jsonify({"success": True, "msg": f"成功删除{len(target_ids)}条数据"})
@app.route(“/data/add/batch”, methods=[“POST”])
@login_required
@audit_log(“add”)
def batch_add_data():
“”“批量新增数据接口,触发数据异常检测”“”
data = request.get_json()
data_list = data.get(“data_list”, [])
if not data_list:
return jsonify({“success”: False, “error”: “请传入数据列表”})
# 业务逻辑省略(批量新增数据)
# 触发异常检测(传入数据列表用于数据异常分析)
trigger_abnormal_detection(g.audit_id, data_list)
return jsonify({"success": True, "msg": f"成功新增{len(data_list)}条数据"})
- 第三步:集成风险拦截与审批流程,阻止高危操作
-- coding: utf-8 --
risk_intercept.py 风险拦截与审批流程模块
from flask import Blueprint, request, jsonify, g
from flask_login import login_required, current_user
from functools import wraps
import datetime
import json
from models import db, RiskIntercept, OperationAudit, AbnormalAlert, ApprovalFlow, User
from dotenv import load_dotenv
import os
加载环境变量
load_dotenv()
risk_bp = Blueprint(“risk”, name)
高危操作清单与对应拦截规则
HIGH_RISK_OPERATIONS = {
“batch_delete”: {
“rule”: “单次删除数据量>=50条,需部门负责人审批”,
“approval_role”: “dept_leader” # 审批角色(部门负责人)
},
“cross_user_update”: {
“rule”: “修改非本人创建的数据,需二次确认”,
“approval_role”: None # 无需审批,仅二次确认
},
“full_data_export”: {
“rule”: “导出全量客户/订单数据,需管理员审批”,
“approval_role”: “leader” # 审批角色(管理员)
},
“perm_change_admin”: {
“rule”: “变更管理员权限,需超级管理员审批”,
“approval_role”: “super_leader” # 审批角色(超级管理员)
}
}
====================== 核心装饰器:风险操作拦截 ======================
def risk_intercept(operation_type):
“”"
风险拦截装饰器:拦截高危操作,触发审批或二次确认
:param operation_type: 高危操作类型(对应HIGH_RISK_OPERATIONS)
“”"
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 检查操作类型是否为高危操作
if operation_type not in HIGH_RISK_OPERATIONS:
return f(*args, **kwargs)
# 获取拦截规则
rule_info = HIGH_RISK_OPERATIONS[operation_type]
intercept_reason = f"触发高危操作拦截规则:{rule_info['rule']}"
# 1. 检查是否已通过审批(针对需要审批的操作)
if rule_info["approval_role"]:
approval_status = check_approval_status(operation_type, kwargs)
if approval_status == "pending":
# 待审批,返回审批流程信息
return jsonify({
"success": False,
"code": "need_approval",
"msg": intercept_reason,
"approval_info": get_approval_info(operation_type, kwargs)
})
elif approval_status == "rejected":
# 审批驳回,拦截操作
update_audit_result(g.audit_id, "intercepted")
return jsonify({"success": False, "code": "approval_rejected", "msg": "操作已被审批驳回,无法执行"})
# 2. 无需审批的操作,进行二次确认校验
if not rule_info["approval_role"]:
confirm_flag = request.args.get("confirm") or request.get_json().get("confirm")
if not confirm_flag or confirm_flag != "true":
# 未二次确认,返回确认提示
return jsonify({
"success": False,
"code": "need_confirm",
"msg": f"确认执行该高危操作?{intercept_reason}",
"operation_type": operation_type
})
# 3. 检查是否存在高风险预警,若存在则拦截
if has_high_risk_alert(g.audit_id):
update_audit_result(g.audit_id, "intercepted")
return jsonify({"success": False, "code": "high_risk_alert", "msg": "存在高风险预警,操作已被拦截"})
# 4. 允许执行操作
return f(*args, **kwargs)
return decorated_function
return decorator
====================== 辅助函数:更新审计结果为“被拦截” ======================
def update_audit_result(audit_id, result):
“”“更新审计记录的操作结果”“”
audit = OperationAudit.query.get(audit_id)
if audit:
audit.operation_result = result
db.session.commit()
====================== 辅助函数:检查审批状态 ======================
def check_approval_status(operation_type, kwargs):
“”“检查高危操作的审批状态”“”
# 构建审批查询条件(根据操作类型提取关键信息)
query = ApprovalFlow.query.join(
RiskIntercept, ApprovalFlow.intercept_id == RiskIntercept.id
).filter(
RiskIntercept.operation_type == operation_type
)
if operation_type == "batch_delete":
query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('target_ids')}%"))
elif operation_type == "cross_user_update":
query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('user_id')}%"))
elif operation_type == "full_data_export":
query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('export_type')}%"))
elif operation_type == "perm_change_admin":
query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('user_id')}%"))
# 获取最新的审批记录
latest_approval = query.order_by(ApprovalFlow.approve_time.desc()).first()
if not latest_approval:
return "pending" # 无审批记录,待审批
return latest_approval.flow_status
====================== 辅助函数:获取审批信息(创建审批流程) ======================
def get_approval_info(operation_type, kwargs):
“”“获取审批信息,若不存在则创建审批流程”“”
# 查找审批人(根据审批角色)
rule_info = HIGH_RISK_OPERATIONS[operation_type]
if rule_info[“approval_role”] == “dept_leader”:
# 部门负责人审批(当前用户所属部门的负责人)
dept_leader = User.query.join(
UserDepartment, User.id == UserDepartment.user_id
).filter(
UserDepartment.dept_id == current_user.departments[0].id,
UserDepartment.is_leader == 1
).first()
approve_user_id = dept_leader.id if dept_leader else None
elif rule_info[“approval_role”] == “leader”:
# 管理员审批
approve_user_id = User.query.filter_by(role_id=1).first().id # 假设role_id=1为管理员
elif rule_info[“approval_role”] == “super_leader”:
# 超级管理员审批
approve_user_id = User.query.filter_by(role_id=0).first().id # 假设role_id=0为超级管理员
else:
approve_user_id = None
if not approve_user_id:
return {"error": "无对应审批人,请联系系统管理员"}
# 创建风险拦截记录
intercept_content = json.dumps(kwargs, ensure_ascii=False)
intercept = RiskIntercept(
audit_id=g.audit_id,
operation_type=operation_type,
intercept_reason=HIGH_RISK_OPERATIONS[operation_type]["rule"],
intercept_rule=operation_type,
approval_status="pending"
)
db.session.add(intercept)
db.session.commit()
# 创建审批流程
approval = ApprovalFlow(
intercept_id=intercept.id,
apply_user_id=current_user.id,
approve_user_id=approve_user_id,
flow_status="pending",
apply_remark=request.get_json().get("apply_remark", "")
)
db.session.add(approval)
db.session.commit()
# 返回审批信息
return {
"approval_id": approval.id,
"approve_user": User.query.get(approve_user_id).username,
"approve_role": rule_info["approval_role"],
"intercept_id": intercept.id,
"apply_time": approval.apply_time.strftime("%Y-%m-%d %H:%M:%S")
}
====================== 辅助函数:检查是否存在高风险预警 ======================
def has_high_risk_alert(audit_id):
“”“检查该审计记录是否关联高风险预警”“”
alert = AbnormalAlert.query.filter(
AbnormalAlert.audit_id == audit_id,
AbnormalAlert.alert_level == “high”,
AbnormalAlert.handle_status == “pending”
).first()
return alert is not None
====================== 接口:审批流程操作(申请、审批、撤销) ======================
@risk_bp.route(“/approval/apply”, methods=[“POST”])
@login_required
def apply_approval():
“”“发起高危操作审批”“”
data = request.get_json()
operation_type = data.get(“operation_type”)
更多推荐



所有评论(0)