大家好!我是CSDN的Python新手博主~ 上一篇我们完成了看板的可视化升级与权限管控,解决了多角色数据查看与汇报需求,但企业用户(尤其甲方客户)反馈两大核心痛点:① 缺乏异常行为识别能力,对批量修改数据、越权操作尝试、异常数据录入等风险无法主动预警,易造成数据泄露或错误;② 操作审计不完善,仅记录核心功能操作,普通查询、数据导出、权限变更等行为无追溯,不符合合规审计要求;③ 风险操作无拦截机制,仅靠权限控制无法规避恶意操作或误操作。今天就带来超落地的新手实战项目——办公看板集成AI异常预警+全流程操作审计+风险拦截!

本次基于之前的“权限管控看板”代码,新增3大核心功能:① AI异常预警(基于操作行为特征、数据规律,识别批量操作、异常时段、越权尝试等风险,支持实时预警与历史分析);② 全流程操作审计(记录所有用户行为,含查询、新增、修改、删除、导出、权限变更,支持审计报表生成与追溯);③ 实时风险拦截(针对高危操作设置拦截规则,结合AI预警结果,阻止恶意或误操作并触发审批流程)。全程基于现有技术栈(Flask+MySQL+OpenAI+Scikit-learn),新增AI预警模块、审计日志引擎、风险拦截中间件,代码注释详细,新手只需配置预警规则与拦截阈值,跟着步骤复制操作就能成功,让看板兼顾业务效率与安全合规~

一、本次学习目标

  1. 掌握AI异常行为识别逻辑,基于操作特征(频率、时段、范围)与数据规律,搭建多维度预警模型,实现实时预警与风险分级;

  2. 学会全流程操作审计设计,记录所有用户行为的关键信息(操作人、时间、IP、内容、结果),实现审计日志分类、检索与报表导出;

  3. 理解风险拦截机制,通过中间件拦截高危操作,结合AI预警结果触发审批流程,平衡操作灵活性与安全管控;

  4. 实现预警、审计、拦截功能联动,异常行为实时预警+全程审计留痕+高危操作拦截,形成安全闭环;

  5. 适配企业合规要求,审计日志不可篡改、预警记录可追溯、拦截操作有审批留痕,满足甲方客户审计需求。

二、前期准备

  1. 安装核心依赖库

安装核心依赖(AI特征工程、异常检测、日志解析)

pip3 install scikit-learn pandas numpy python-dotenv python-multipart -i https://pypi.tuna.tsinghua.edu.cn/simple

确保已有依赖正常(Flask、OpenAI、APScheduler等)

pip3 install --upgrade flask flask-login flask-sqlalchemy apscheduler openai requests pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple

说明:AI异常检测基于Scikit-learn实现简单特征工程(如操作频率、时段特征),结合OpenAI做智能风险分析;审计日志通过Flask中间件全局捕获请求;风险拦截基于自定义装饰器与规则引擎实现。

  1. 第三方服务与配置准备
  • 预警规则配置:定义异常行为类型(批量操作:单次修改>50条数据;异常时段:凌晨0-6点操作;越权尝试:10分钟内3次以上权限校验失败;数据异常:订单金额超出常规范围),设置预警阈值与分级(高风险:立即拦截,中风险:弹窗提醒,低风险:日志记录);

  • 审计范围配置:确定需审计的操作类型(查询、新增、修改、删除、导出、权限变更、登录/登出),定义每条审计记录需包含的字段(操作人、角色、IP、操作时间、操作内容、操作结果、客户端信息);

  • 风险拦截配置:梳理高危操作清单(批量删除数据、修改他人数据、导出全量客户信息、变更管理员权限),设置拦截规则(高风险操作需上级审批,中风险操作二次确认);

  • 安全配置:在.env文件中补充预警通知邮箱、审批流程配置、异常检测阈值,确保AI预警结果可通过邮件推送;开启服务器IP日志记录,用于审计溯源。

  1. 数据库表优化与创建

– 连接MySQL数据库(替换为你的数据库信息)
mysql -u office_user -p -h 47.108.xxx.xxx office_data

– 创建操作审计表(operation_audit)
CREATE TABLE operation_audit (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL COMMENT ‘用户ID’,
username VARCHAR(50) NOT NULL COMMENT ‘用户名’,
user_role VARCHAR(50) NOT NULL COMMENT ‘用户角色’,
operation_type ENUM(‘query’, ‘add’, ‘update’, ‘delete’, ‘export’, ‘perm_change’, ‘login’, ‘logout’) NOT NULL COMMENT ‘操作类型’,
operation_content TEXT NOT NULL COMMENT ‘操作内容(JSON格式)’,
operation_result ENUM(‘success’, ‘fail’, ‘intercepted’) NOT NULL COMMENT ‘操作结果(成功/失败/被拦截)’,
ip_address VARCHAR(50) NOT NULL COMMENT ‘操作IP’,
user_agent TEXT NULL COMMENT ‘客户端信息(浏览器/设备)’,
operation_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘操作时间’,
audit_remark TEXT NULL COMMENT ‘审计备注’,
KEY idx_user_id (user_id),
KEY idx_operation_type (operation_type),
KEY idx_operation_time (operation_time),
KEY idx_operation_result (operation_result)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘全流程操作审计表’;

– 创建异常预警表(abnormal_alert)
CREATE TABLE abnormal_alert (
id INT AUTO_INCREMENT PRIMARY KEY,
audit_id BIGINT NULL COMMENT ‘关联审计ID(可为空,独立预警场景)’,
user_id INT NOT NULL COMMENT ‘触发用户ID’,
alert_type ENUM(‘batch_operation’, ‘abnormal_time’, ‘privilege_attempt’, ‘data_abnormal’) NOT NULL COMMENT ‘预警类型’,
alert_level ENUM(‘high’, ‘medium’, ‘low’) NOT NULL COMMENT ‘预警级别’,
alert_content TEXT NOT NULL COMMENT ‘预警内容’,
ai_analysis TEXT NULL COMMENT ‘AI风险分析’,
handle_status ENUM(‘pending’, ‘processed’, ‘ignored’) DEFAULT ‘pending’ COMMENT ‘处理状态’,
handle_time DATETIME NULL COMMENT ‘处理时间’,
handler_id INT NULL COMMENT ‘处理人ID’,
handle_remark TEXT NULL COMMENT ‘处理备注’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
KEY idx_user_id (user_id),
KEY idx_alert_level (alert_level),
KEY idx_handle_status (handle_status),
KEY idx_create_time (create_time),
FOREIGN KEY (audit_id) REFERENCES operation_audit(id) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘AI异常预警表’;

– 创建风险拦截表(risk_intercept)
CREATE TABLE risk_intercept (
id INT AUTO_INCREMENT PRIMARY KEY,
audit_id BIGINT NOT NULL COMMENT ‘关联审计ID’,
alert_id INT NULL COMMENT ‘关联预警ID’,
operation_type ENUM(‘batch_delete’, ‘cross_user_update’, ‘full_data_export’, ‘perm_change_admin’) NOT NULL COMMENT ‘高危操作类型’,
intercept_reason TEXT NOT NULL COMMENT ‘拦截原因’,
intercept_rule VARCHAR(100) NOT NULL COMMENT ‘触发的拦截规则’,
approval_status ENUM(‘pending’, ‘approved’, ‘rejected’) DEFAULT ‘pending’ COMMENT ‘审批状态’,
approval_user_id INT NULL COMMENT ‘审批人ID’,
approval_time DATETIME NULL COMMENT ‘审批时间’,
approval_remark TEXT NULL COMMENT ‘审批备注’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
KEY idx_audit_id (audit_id),
KEY idx_approval_status (approval_status),
FOREIGN KEY (audit_id) REFERENCES operation_audit(id) ON DELETE CASCADE,
FOREIGN KEY (alert_id) REFERENCES abnormal_alert(id) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘风险操作拦截表’;

– 创建审批流程表(approval_flow)
CREATE TABLE approval_flow (
id INT AUTO_INCREMENT PRIMARY KEY,
intercept_id INT NOT NULL COMMENT ‘关联拦截ID’,
apply_user_id INT NOT NULL COMMENT ‘申请人ID’,
approve_user_id INT NOT NULL COMMENT ‘审批人ID’,
flow_status ENUM(‘pending’, ‘approved’, ‘rejected’, ‘cancelled’) DEFAULT ‘pending’ COMMENT ‘流程状态’,
apply_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘申请时间’,
approve_time DATETIME NULL COMMENT ‘审批时间’,
apply_remark TEXT NULL COMMENT ‘申请备注’,
approve_remark TEXT NULL COMMENT ‘审批备注’,
KEY idx_intercept_id (intercept_id),
KEY idx_flow_status (flow_status),
FOREIGN KEY (intercept_id) REFERENCES risk_intercept(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘高危操作审批流程表’;

– 初始化预警规则数据(可后续通过接口动态配置)
INSERT INTO abnormal_alert (user_id, alert_type, alert_level, alert_content, handle_status)
VALUES (0, ‘batch_operation’, ‘high’, ‘单次修改数据量>50条,触发高风险预警’, ‘pending’);
INSERT INTO abnormal_alert (user_id, alert_type, alert_level, alert_content, handle_status)
VALUES (0, ‘abnormal_time’, ‘medium’, ‘凌晨0-6点进行数据操作,触发中风险预警’, ‘pending’);

三、实战:AI预警+全流程审计+风险拦截集成

  1. 第一步:搭建全流程操作审计引擎,实现行为留痕

-- coding: utf-8 --

audit_engine.py 全流程操作审计引擎

from flask import Blueprint, request, jsonify, g
from flask_login import login_required, current_user
from functools import wraps
import json
import datetime
from models import db, OperationAudit, Department, User
from user_agents import parse # 解析客户端信息
from dotenv import load_dotenv
import os

加载环境变量

load_dotenv()
audit_bp = Blueprint(“audit”, name)

====================== 核心装饰器:记录操作审计日志 ======================

def audit_log(operation_type):
“”"
操作审计装饰器:记录函数执行对应的操作日志
:param operation_type: 操作类型(query/add/update/delete/export/perm_change/login/logout)
“”"
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 初始化审计数据
audit_data = {
“user_id”: current_user.id if current_user.is_authenticated else 0,
“username”: current_user.username if current_user.is_authenticated else “anonymous”,
“user_role”: current_user.role.role_name if (current_user.is_authenticated and current_user.role) else “unknown”,
“operation_type”: operation_type,
“operation_content”: {},
“operation_result”: “fail”,
“ip_address”: request.remote_addr,
“user_agent”: str(parse(request.user_agent.string)),
“operation_time”: datetime.datetime.now()
}

        try:
            # 执行原函数,获取结果
            result = f(*args, **kwargs)
            # 更新操作结果为成功
            audit_data["operation_result"] = "success"
            # 补充操作内容(根据操作类型提取关键信息)
            if operation_type == "query":
                audit_data["operation_content"] = {
                    "query_params": request.args.to_dict(),
                    "query_path": request.path
                }
            elif operation_type in ["add", "update", "delete"]:
                audit_data["operation_content"] = {
                    "request_data": request.get_json() or request.form.to_dict(),
                    "target_id": kwargs.get("id") or ""
                }
            elif operation_type == "export":
                audit_data["operation_content"] = {
                    "export_type": request.args.get("export_type", "custom"),
                    "filter_params": request.args.to_dict()
                }
            elif operation_type == "perm_change":
                audit_data["operation_content"] = {
                    "target_user_id": kwargs.get("user_id"),
                    "new_role": kwargs.get("new_role"),
                    "old_role": kwargs.get("old_role")
                }
            elif operation_type in ["login", "logout"]:
                audit_data["operation_content"] = {
                    "status": "success" if operation_type == "login" else "normal_logout"
                }
            return result
        except Exception as e:
            # 操作失败,记录错误信息
            audit_data["operation_content"]["error_msg"] = str(e)
            raise e
        finally:
            # 保存审计日志(无论成功/失败都记录)
            if audit_data["user_id"] != 0 or operation_type == "login":  # 不记录匿名非登录操作
                audit = OperationAudit(
                    user_id=audit_data["user_id"],
                    username=audit_data["username"],
                    user_role=audit_data["user_role"],
                    operation_type=audit_data["operation_type"],
                    operation_content=json.dumps(audit_data["operation_content"], ensure_ascii=False),
                    operation_result=audit_data["operation_result"],
                    ip_address=audit_data["ip_address"],
                    user_agent=audit_data["user_agent"]
                )
                db.session.add(audit)
                db.session.commit()
                # 将审计ID存入g对象,供后续预警、拦截使用
                g.audit_id = audit.id
    return decorated_function
return decorator

====================== 全局中间件:捕获未被装饰器覆盖的操作 ======================

@audit_bp.before_app_request
def before_request_audit():
“”“全局请求拦截,记录未被audit_log装饰器覆盖的操作(主要是查询类)”“”
# 排除静态文件、登录接口(登录接口单独用audit_log装饰)
exclude_paths = [“/static”, “/auth/login”, “/auth/logout”, “/api/health”]
if any(request.path.startswith(path) for path in exclude_paths):
return

# 仅记录已登录用户的操作,且未被audit_log装饰的GET请求(查询类)
if current_user.is_authenticated and request.method == "GET" and not hasattr(g, "audit_recorded"):
    audit = OperationAudit(
        user_id=current_user.id,
        username=current_user.username,
        user_role=current_user.role.role_name,
        operation_type="query",
        operation_content=json.dumps({
            "query_params": request.args.to_dict(),
            "query_path": request.path
        }, ensure_ascii=False),
        operation_result="success",
        ip_address=request.remote_addr,
        user_agent=str(parse(request.user_agent.string))
    )
    db.session.add(audit)
    db.session.commit()
    g.audit_id = audit.id
    g.audit_recorded = True

====================== 接口:审计日志查询与筛选 ======================

@audit_bp.route(“/audit/log/list”, methods=[“GET”])
@login_required
@audit_log(“query”)
def get_audit_log():
“”“获取审计日志列表(支持多条件筛选)”“”
page = int(request.args.get(“page”, 1))
page_size = int(request.args.get(“page_size”, 20))
username = request.args.get(“username”)
operation_type = request.args.get(“operation_type”)
operation_result = request.args.get(“operation_result”)
start_time = request.args.get(“start_time”)
end_time = request.args.get(“end_time”)

# 构建查询条件
query = OperationAudit.query.order_by(OperationAudit.operation_time.desc())
if username:
    query = query.filter(OperationAudit.username.like(f"%{username}%"))
if operation_type:
    query = query.filter(OperationAudit.operation_type == operation_type)
if operation_result:
    query = query.filter(OperationAudit.operation_result == operation_result)
if start_time:
    query = query.filter(OperationAudit.operation_time >= datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S"))
if end_time:
    query = query.filter(OperationAudit.operation_time <= datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S"))

# 分页查询
pagination = query.paginate(page=page, per_page=page_size)
logs = pagination.items

# 格式化返回结果
log_list = []
for log in logs:
    content = json.loads(log.operation_content) if log.operation_content else {}
    log_list.append({
        "audit_id": log.id,
        "username": log.username,
        "user_role": log.user_role,
        "operation_type": {
            "query": "查询", "add": "新增", "update": "修改", "delete": "删除",
            "export": "导出", "perm_change": "权限变更", "login": "登录", "logout": "登出"
        }[log.operation_type],
        "operation_content": content,
        "operation_result": {"success": "成功", "fail": "失败", "intercepted": "被拦截"}[log.operation_result],
        "ip_address": log.ip_address,
        "user_agent": log.user_agent,
        "operation_time": log.operation_time.strftime("%Y-%m-%d %H:%M:%S"),
        "audit_remark": log.audit_remark
    })

return jsonify({
    "success": True,
    "data": log_list,
    "total": pagination.total,
    "page": page,
    "page_size": page_size
})

====================== 接口:审计日志导出(Excel) ======================

@audit_bp.route(“/audit/log/export”, methods=[“GET”])
@login_required
@audit_log(“export”)
def export_audit_log():
“”“导出审计日志为Excel(复用之前的报表导出功能)”“”
from report_export import export_excel_by_template # 复用报表导出模块
from models import ReportConfig

# 查找审计日志导出报表配置(需提前在report_config表中创建)
report_config = ReportConfig.query.filter_by(report_name="操作审计日志报表").first()
if not report_config:
    return jsonify({"success": False, "error": "未找到审计日志报表配置"})

# 筛选参数
filter_params = {
    "username": request.args.get("username"),
    "operation_type": request.args.get("operation_type"),
    "start_time": request.args.get("start_time"),
    "end_time": request.args.get("end_time")
}

try:
    # 调用报表导出功能
    temp_path = export_excel_by_template(report_config, filter_params)
    # 发送文件给前端下载
    return send_file(
        temp_path,
        as_attachment=True,
        download_name=f"操作审计日志_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx",
        mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
except Exception as e:
    return jsonify({"success": False, "error": f"导出失败:{str(e)}"})

在app.py中新增以下内容

from audit_engine import audit_bp, audit_log
from flask import send_file

注册审计蓝图

app.register_blueprint(audit_bp)

示例:使用审计装饰器记录操作

@app.route(“/data/add”, methods=[“POST”])
@login_required
@audit_log(“add”) # 记录新增操作
def add_data():
“”“新增数据接口,自动记录审计日志”“”
data = request.get_json()
# 业务逻辑省略…
return jsonify({“success”: True, “msg”: “新增成功”})

@app.route(“/user/role/change/int:user_id”, methods=[“POST”])
@login_required
@audit_log(“perm_change”) # 记录权限变更操作
def change_user_role(user_id):
“”“变更用户角色接口,自动记录审计日志”“”
data = request.get_json()
new_role = data.get(“new_role”)
# 业务逻辑省略(查询旧角色、更新角色)
old_role = “sales” # 示例值
# 传递参数给审计装饰器
g.perm_change_info = {“user_id”: user_id, “new_role”: new_role, “old_role”: old_role}
return jsonify({“success”: True, “msg”: “角色变更成功”})

  1. 第二步:实现AI异常预警模型,主动识别风险

-- coding: utf-8 --

ai_alert.py AI异常预警模块

from flask import g
from flask_login import current_user
import datetime
import json
import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from models import db, AbnormalAlert, OperationAudit
from dotenv import load_dotenv
import os
import smtplib
from email.mime.text import MIMEText
from openai import OpenAI

加载环境变量

load_dotenv()
OPENAI_API_KEY = os.getenv(“OPENAI_API_KEY”)
client = OpenAI(api_key=OPENAI_API_KEY)

预警配置

ALERT_THRESHOLDS = {
“batch_operation”: 50, # 批量操作阈值(单次修改/删除数据量)
“abnormal_time_start”: 0, # 异常时段开始时间(小时)
“abnormal_time_end”: 6, # 异常时段结束时间(小时)
“privilege_attempt_count”: 3, # 越权尝试次数阈值
“privilege_attempt_interval”: 10, # 越权尝试时间间隔(分钟)
“data_abnormal_ratio”: 0.5 # 数据异常比例阈值(超出常规范围的数据占比)
}

预警通知邮箱

ALERT_EMAILS = os.getenv(“ALERT_EMAILS”).split(“,”)

邮件配置(复用报表导出的SMTP配置)

SMTP_SERVER = os.getenv(“SMTP_SERVER”)
SMTP_PORT = int(os.getenv(“SMTP_PORT”, 465))
SMTP_USER = os.getenv(“SMTP_USER”)
SMTP_PASS = os.getenv(“SMTP_PASS”)

====================== 核心功能:发送预警邮件 ======================

def send_alert_email(alert):
“”“发送异常预警邮件”“”
if not ALERT_EMAILS:
return False

# 构建邮件内容
subject = f"【{alert.alert_level.upper()}风险预警】{alert.username}触发异常行为"
content = f"""
<h3>异常预警详情</h3>
<p>预警ID:{alert.id}</p>
<p>触发用户:{alert.username}(角色:{alert.user_role})</p>
<p>预警类型:{
    {"batch_operation": "批量操作", "abnormal_time": "异常时段操作", 
     "privilege_attempt": "越权尝试", "data_abnormal": "数据异常录入"}[alert.alert_type]
}</p>
<p>预警级别:{
    {"high": "高风险(需立即处理)", "medium": "中风险(建议核查)", "low": "低风险(可忽略)"}[alert.alert_level]
}</p>
<p>预警内容:{alert.alert_content}</p>
<p>AI风险分析:{alert.ai_analysis or "暂无分析"}</p>
<p>触发时间:{alert.create_time.strftime("%Y-%m-%d %H:%M:%S")}</p>
<p>操作IP:{alert.ip_address}</p>
"""

# 发送邮件
try:
    msg = MIMEText(content, "html", "utf-8")
    msg["Subject"] = subject
    msg["From"] = SMTP_USER
    msg["To"] = ",".join(ALERT_EMAILS)
    
    with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
        server.login(SMTP_USER, SMTP_PASS)
        server.sendmail(SMTP_USER, ALERT_EMAILS, msg.as_string())
    return True
except Exception as e:
    print(f"预警邮件发送失败:{str(e)}")
    return False

====================== 核心功能:AI风险分析 ======================

def ai_analyze_risk(alert):
“”“基于预警内容,用AI分析风险原因与处理建议”“”
prompt = f"“”
你是企业办公系统的安全风控助手,需分析以下异常行为预警,完成以下任务:
1. 分析异常行为的核心风险点(是否存在数据泄露、数据错误、恶意操作风险);
2. 给出具体的处理建议(步骤清晰,适配管理员操作,区分高/中/低风险处理优先级);
3. 标注是否需要进一步拦截相关操作,若需要,说明拦截规则优化方向。

预警详情:
- 触发用户:{alert.username}(角色:{alert.user_role})
- 预警类型:{alert.alert_type}
- 预警级别:{alert.alert_level}
- 预警内容:{alert.alert_content}
- 触发时间:{alert.create_time.strftime("%Y-%m-%d %H:%M:%S")}
- 操作IP:{alert.ip_address}

请按以下格式输出(简洁明了,重点突出):
【风险点】XXX
【处理建议】XXX
【拦截优化建议】XXX
"""

try:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3  # 降低随机性,确保结果精准
    )
    return response.choices[0].message.content.strip()
except Exception as e:
    return f"AI分析失败:{str(e)},请手动核查"

====================== 核心功能:异常行为检测(批量操作) ======================

def detect_batch_operation(audit):
“”“检测批量操作异常(单次修改/删除数据量超出阈值)”“”
if audit.operation_type not in [“update”, “delete”]:
return None

content = json.loads(audit.operation_content)
# 提取操作数据量(新增/修改/删除的条数)
target_ids = content.get("target_ids") or [content.get("target_id")]
operation_count = len(target_ids) if isinstance(target_ids, list) else 1

if operation_count > ALERT_THRESHOLDS["batch_operation"]:
    # 触发批量操作预警
    alert_content = f"用户{audit.username}单次{audit.operation_type}数据{operation_count}条,超出阈值{ALERT_THRESHOLDS['batch_operation']}条,疑似批量操作风险"
    alert = AbnormalAlert(
        audit_id=audit.id,
        user_id=audit.user_id,
        alert_type="batch_operation",
        alert_level="high",
        alert_content=alert_content
    )
    # AI分析风险
    alert.ai_analysis = ai_analyze_risk(alert)
    db.session.add(alert)
    db.session.commit()
    # 高风险预警发送邮件
    send_alert_email(alert)
    return alert
return None

====================== 核心功能:异常行为检测(异常时段) ======================

def detect_abnormal_time(audit):
“”“检测异常时段操作(凌晨0-6点)”“”
operation_hour = audit.operation_time.hour
if (operation_hour >= ALERT_THRESHOLDS[“abnormal_time_start”] and
operation_hour < ALERT_THRESHOLDS[“abnormal_time_end”]):
# 排除登录/登出操作,仅关注数据操作
if audit.operation_type in [“add”, “update”, “delete”, “export”, “perm_change”]:
alert_content = f"用户{audit.username}在异常时段({operation_hour}:00-{operation_hour+1}:00)进行{audit.operation_type}操作,存在风险"
alert = AbnormalAlert(
audit_id=audit.id,
user_id=audit.user_id,
alert_type=“abnormal_time”,
alert_level=“medium”,
alert_content=alert_content
)
alert.ai_analysis = ai_analyze_risk(alert)
db.session.add(alert)
db.session.commit()
# 中风险预警发送邮件
send_alert_email(alert)
return alert
return None

====================== 核心功能:异常行为检测(越权尝试) ======================

def detect_privilege_attempt(audit):
“”“检测越权操作尝试(短时间内多次权限校验失败)”“”
if audit.operation_result != “fail”:
return None

# 查找该用户近N分钟内的权限失败记录
start_time = audit.operation_time - datetime.timedelta(minutes=ALERT_THRESHOLDS["privilege_attempt_interval"])
fail_records = OperationAudit.query.filter(
    OperationAudit.user_id == audit.user_id,
    OperationAudit.operation_result == "fail",
    OperationAudit.operation_time >= start_time,
    OperationAudit.operation_content.like("%权限%")
).count()

if fail_records >= ALERT_THRESHOLDS["privilege_attempt_count"]:
    alert_content = f"用户{audit.username}在{ALERT_THRESHOLDS['privilege_attempt_interval']}分钟内,越权操作尝试{fail_records}次,疑似恶意越权"
    alert = AbnormalAlert(
        audit_id=audit.id,
        user_id=audit.user_id,
        alert_type="privilege_attempt",
        alert_level="high",
        alert_content=alert_content
    )
    alert.ai_analysis = ai_analyze_risk(alert)
    db.session.add(alert)
    db.session.commit()
    send_alert_email(alert)
    return alert
return None

====================== 核心功能:异常行为检测(数据异常) ======================

def detect_data_abnormal(audit, data_list):
“”“检测数据异常录入(基于聚类算法识别超出常规范围的数据)”“”
if audit.operation_type != “add”:
return None

# 提取数据特征(以订单金额为例)
amounts = [data.get("amount") for data in data_list if data.get("amount")]
if len(amounts) < 10:  # 数据量不足,不进行检测
    return None

# 数据预处理
X = pd.DataFrame(amounts, columns=["amount"]).values
X_scaled = StandardScaler().fit_transform(X)

# DBSCAN聚类检测异常值
dbscan = DBSCAN(eps=0.5, min_samples=5)
labels = dbscan.fit_predict(X_scaled)
abnormal_count = sum(1 for label in labels if label == -1)
abnormal_ratio = abnormal_count / len(amounts)

if abnormal_ratio > ALERT_THRESHOLDS["data_abnormal_ratio"]:
    alert_content = f"用户{audit.username}新增数据中,异常数据占比{abnormal_ratio:.2f},超出阈值{ALERT_THRESHOLDS['data_abnormal_ratio']},疑似数据录入错误或恶意篡改"
    alert = AbnormalAlert(
        audit_id=audit.id,
        user_id=audit.user_id,
        alert_type="data_abnormal",
        alert_level="medium",
        alert_content=alert_content
    )
    alert.ai_analysis = ai_analyze_risk(alert)
    db.session.add(alert)
    db.session.commit()
    send_alert_email(alert)
    return alert
return None

====================== 入口函数:触发异常检测 ======================

def trigger_abnormal_detection(audit_id, data_list=None):
“”"
触发异常检测
:param audit_id: 关联审计ID
:param data_list: 新增/修改的数据列表(用于数据异常检测)
“”"
audit = OperationAudit.query.get(audit_id)
if not audit:
return

# 依次检测各类异常行为
detect_batch_operation(audit)
detect_abnormal_time(audit)
detect_privilege_attempt(audit)
if data_list:
    detect_data_abnormal(audit, data_list)

====================== 接口:异常预警列表与处理 ======================

@ai_alert_bp.route(“/alert/list”, methods=[“GET”])
@login_required
def get_alert_list():
“”“获取异常预警列表(支持筛选与处理状态更新)”“”
page = int(request.args.get(“page”, 1))
page_size = int(request.args.get(“page_size”, 20))
alert_level = request.args.get(“alert_level”)
handle_status = request.args.get(“handle_status”)
username = request.args.get(“username”)

query = AbnormalAlert.query.join(
    OperationAudit, AbnormalAlert.audit_id == OperationAudit.id
).add_columns(
    OperationAudit.username, OperationAudit.user_role, 
    OperationAudit.ip_address, OperationAudit.operation_time
).order_by(AbnormalAlert.create_time.desc())

if alert_level:
    query = query.filter(AbnormalAlert.alert_level == alert_level)
if handle_status:
    query = query.filter(AbnormalAlert.handle_status == handle_status)
if username:
    query = query.filter(OperationAudit.username.like(f"%{username}%"))

pagination = query.paginate(page=page, per_page=page_size)
alerts = pagination.items

alert_list = []
for alert, username, user_role, ip_address, operation_time in alerts:
    alert_list.append({
        "alert_id": alert.id,
        "username": username,
        "user_role": user_role,
        "alert_type": {
            "batch_operation": "批量操作", "abnormal_time": "异常时段操作",
            "privilege_attempt": "越权尝试", "data_abnormal": "数据异常录入"
        }[alert.alert_type],
        "alert_level": {"high": "高风险", "medium": "中风险", "low": "低风险"}[alert.alert_level],
        "alert_content": alert.alert_content,
        "ai_analysis": alert.ai_analysis,
        "handle_status": {"pending": "待处理", "processed": "已处理", "ignored": "已忽略"}[alert.handle_status],
        "handle_time": alert.handle_time.strftime("%Y-%m-%d %H:%M:%S") if alert.handle_time else "",
        "handler": alert.handler.username if (alert.handler and alert.handler.username) else "",
        "handle_remark": alert.handle_remark,
        "ip_address": ip_address,
        "operation_time": operation_time.strftime("%Y-%m-%d %H:%M:%S"),
        "create_time": alert.create_time.strftime("%Y-%m-%d %H:%M:%S")
    })

return jsonify({
    "success": True,
    "data": alert_list,
    "total": pagination.total,
    "page": page,
    "page_size": page_size
})

@ai_alert_bp.route(“/alert/handle/int:alert_id”, methods=[“POST”])
@login_required
def handle_alert(alert_id):
“”“处理异常预警(更新状态与备注)”“”
alert = AbnormalAlert.query.get(alert_id)
if not alert:
return jsonify({“success”: False, “error”: “预警记录不存在”})

data = request.get_json()
alert.handle_status = data.get("handle_status")
alert.handle_remark = data.get("handle_remark")
alert.handle_time = datetime.datetime.now()
alert.handler_id = current_user.id

db.session.commit()
return jsonify({"success": True, "msg": "预警处理成功"})

在数据新增/修改/删除接口中添加异常检测触发

@app.route(“/data/batch/delete”, methods=[“POST”])
@login_required
@audit_log(“delete”)
def batch_delete_data():
“”“批量删除数据接口,触发异常检测”“”
data = request.get_json()
target_ids = data.get(“target_ids”, [])
if not target_ids:
return jsonify({“success”: False, “error”: “请选择要删除的数据”})

# 业务逻辑省略(批量删除数据)
# 触发异常检测
trigger_abnormal_detection(g.audit_id)
return jsonify({"success": True, "msg": f"成功删除{len(target_ids)}条数据"})

@app.route(“/data/add/batch”, methods=[“POST”])
@login_required
@audit_log(“add”)
def batch_add_data():
“”“批量新增数据接口,触发数据异常检测”“”
data = request.get_json()
data_list = data.get(“data_list”, [])
if not data_list:
return jsonify({“success”: False, “error”: “请传入数据列表”})

# 业务逻辑省略(批量新增数据)
# 触发异常检测(传入数据列表用于数据异常分析)
trigger_abnormal_detection(g.audit_id, data_list)
return jsonify({"success": True, "msg": f"成功新增{len(data_list)}条数据"})
  1. 第三步:集成风险拦截与审批流程,阻止高危操作

-- coding: utf-8 --

risk_intercept.py 风险拦截与审批流程模块

from flask import Blueprint, request, jsonify, g
from flask_login import login_required, current_user
from functools import wraps
import datetime
import json
from models import db, RiskIntercept, OperationAudit, AbnormalAlert, ApprovalFlow, User
from dotenv import load_dotenv
import os

加载环境变量

load_dotenv()
risk_bp = Blueprint(“risk”, name)

高危操作清单与对应拦截规则

HIGH_RISK_OPERATIONS = {
“batch_delete”: {
“rule”: “单次删除数据量>=50条,需部门负责人审批”,
“approval_role”: “dept_leader” # 审批角色(部门负责人)
},
“cross_user_update”: {
“rule”: “修改非本人创建的数据,需二次确认”,
“approval_role”: None # 无需审批,仅二次确认
},
“full_data_export”: {
“rule”: “导出全量客户/订单数据,需管理员审批”,
“approval_role”: “leader” # 审批角色(管理员)
},
“perm_change_admin”: {
“rule”: “变更管理员权限,需超级管理员审批”,
“approval_role”: “super_leader” # 审批角色(超级管理员)
}
}

====================== 核心装饰器:风险操作拦截 ======================

def risk_intercept(operation_type):
“”"
风险拦截装饰器:拦截高危操作,触发审批或二次确认
:param operation_type: 高危操作类型(对应HIGH_RISK_OPERATIONS)
“”"
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 检查操作类型是否为高危操作
if operation_type not in HIGH_RISK_OPERATIONS:
return f(*args, **kwargs)

        # 获取拦截规则
        rule_info = HIGH_RISK_OPERATIONS[operation_type]
        intercept_reason = f"触发高危操作拦截规则:{rule_info['rule']}"
        
        # 1. 检查是否已通过审批(针对需要审批的操作)
        if rule_info["approval_role"]:
            approval_status = check_approval_status(operation_type, kwargs)
            if approval_status == "pending":
                # 待审批,返回审批流程信息
                return jsonify({
                    "success": False,
                    "code": "need_approval",
                    "msg": intercept_reason,
                    "approval_info": get_approval_info(operation_type, kwargs)
                })
            elif approval_status == "rejected":
                # 审批驳回,拦截操作
                update_audit_result(g.audit_id, "intercepted")
                return jsonify({"success": False, "code": "approval_rejected", "msg": "操作已被审批驳回,无法执行"})
        
        # 2. 无需审批的操作,进行二次确认校验
        if not rule_info["approval_role"]:
            confirm_flag = request.args.get("confirm") or request.get_json().get("confirm")
            if not confirm_flag or confirm_flag != "true":
                # 未二次确认,返回确认提示
                return jsonify({
                    "success": False,
                    "code": "need_confirm",
                    "msg": f"确认执行该高危操作?{intercept_reason}",
                    "operation_type": operation_type
                })
        
        # 3. 检查是否存在高风险预警,若存在则拦截
        if has_high_risk_alert(g.audit_id):
            update_audit_result(g.audit_id, "intercepted")
            return jsonify({"success": False, "code": "high_risk_alert", "msg": "存在高风险预警,操作已被拦截"})
        
        # 4. 允许执行操作
        return f(*args, **kwargs)
    return decorated_function
return decorator

====================== 辅助函数:更新审计结果为“被拦截” ======================

def update_audit_result(audit_id, result):
“”“更新审计记录的操作结果”“”
audit = OperationAudit.query.get(audit_id)
if audit:
audit.operation_result = result
db.session.commit()

====================== 辅助函数:检查审批状态 ======================

def check_approval_status(operation_type, kwargs):
“”“检查高危操作的审批状态”“”
# 构建审批查询条件(根据操作类型提取关键信息)
query = ApprovalFlow.query.join(
RiskIntercept, ApprovalFlow.intercept_id == RiskIntercept.id
).filter(
RiskIntercept.operation_type == operation_type
)

if operation_type == "batch_delete":
    query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('target_ids')}%"))
elif operation_type == "cross_user_update":
    query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('user_id')}%"))
elif operation_type == "full_data_export":
    query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('export_type')}%"))
elif operation_type == "perm_change_admin":
    query = query.filter(RiskIntercept.intercept_content.like(f"%{kwargs.get('user_id')}%"))

# 获取最新的审批记录
latest_approval = query.order_by(ApprovalFlow.approve_time.desc()).first()
if not latest_approval:
    return "pending"  # 无审批记录,待审批
return latest_approval.flow_status

====================== 辅助函数:获取审批信息(创建审批流程) ======================

def get_approval_info(operation_type, kwargs):
“”“获取审批信息,若不存在则创建审批流程”“”
# 查找审批人(根据审批角色)
rule_info = HIGH_RISK_OPERATIONS[operation_type]
if rule_info[“approval_role”] == “dept_leader”:
# 部门负责人审批(当前用户所属部门的负责人)
dept_leader = User.query.join(
UserDepartment, User.id == UserDepartment.user_id
).filter(
UserDepartment.dept_id == current_user.departments[0].id,
UserDepartment.is_leader == 1
).first()
approve_user_id = dept_leader.id if dept_leader else None
elif rule_info[“approval_role”] == “leader”:
# 管理员审批
approve_user_id = User.query.filter_by(role_id=1).first().id # 假设role_id=1为管理员
elif rule_info[“approval_role”] == “super_leader”:
# 超级管理员审批
approve_user_id = User.query.filter_by(role_id=0).first().id # 假设role_id=0为超级管理员
else:
approve_user_id = None

if not approve_user_id:
    return {"error": "无对应审批人,请联系系统管理员"}

# 创建风险拦截记录
intercept_content = json.dumps(kwargs, ensure_ascii=False)
intercept = RiskIntercept(
    audit_id=g.audit_id,
    operation_type=operation_type,
    intercept_reason=HIGH_RISK_OPERATIONS[operation_type]["rule"],
    intercept_rule=operation_type,
    approval_status="pending"
)
db.session.add(intercept)
db.session.commit()

# 创建审批流程
approval = ApprovalFlow(
    intercept_id=intercept.id,
    apply_user_id=current_user.id,
    approve_user_id=approve_user_id,
    flow_status="pending",
    apply_remark=request.get_json().get("apply_remark", "")
)
db.session.add(approval)
db.session.commit()

# 返回审批信息
return {
    "approval_id": approval.id,
    "approve_user": User.query.get(approve_user_id).username,
    "approve_role": rule_info["approval_role"],
    "intercept_id": intercept.id,
    "apply_time": approval.apply_time.strftime("%Y-%m-%d %H:%M:%S")
}

====================== 辅助函数:检查是否存在高风险预警 ======================

def has_high_risk_alert(audit_id):
“”“检查该审计记录是否关联高风险预警”“”
alert = AbnormalAlert.query.filter(
AbnormalAlert.audit_id == audit_id,
AbnormalAlert.alert_level == “high”,
AbnormalAlert.handle_status == “pending”
).first()
return alert is not None

====================== 接口:审批流程操作(申请、审批、撤销) ======================

@risk_bp.route(“/approval/apply”, methods=[“POST”])
@login_required
def apply_approval():
“”“发起高危操作审批”“”
data = request.get_json()
operation_type = data.get(“operation_type”)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐