新手进阶Python:办公看板集成跨系统联动+可视化任务编排+故障自愈
大家好!我是CSDN的Python新手博主~ 上一篇我们完成了看板的AI异常预警与全流程审计,解决了安全合规与风险防控需求,但甲方客户反馈两大核心痛点:① 多系统数据孤立,ERP的订单数据、OA的审批数据、CRM的客户数据无法自动联动,需手动下载上传,效率低下且易出错;② 定时任务(如每日9点同步ERP数据、每周五推送报表)配置复杂,需开发人员修改代码,非技术人员无法操作,且任务失败后无自动重试、
大家好!我是CSDN的Python新手博主~ 上一篇我们完成了看板的AI异常预警与全流程审计,解决了安全合规与风险防控需求,但甲方客户反馈两大核心痛点:① 多系统数据孤立,ERP的订单数据、OA的审批数据、CRM的客户数据无法自动联动,需手动下载上传,效率低下且易出错;② 定时任务(如每日9点同步ERP数据、每周五推送报表)配置复杂,需开发人员修改代码,非技术人员无法操作,且任务失败后无自动重试、兜底机制,需人工排查修复;③ 任务流程无可视化编排能力,多步骤任务(如“同步数据→清洗处理→生成报表→邮件推送”)无法灵活配置执行顺序与依赖关系。今天就带来超落地的新手实战项目——办公看板集成跨系统联动+可视化任务编排+故障自愈!
本次基于之前的“风控审计看板”代码,新增3大核心功能:① 跨系统智能联动(适配ERP/OA/CRM等主流系统API,支持密钥/OAuth2.0鉴权,实现数据自动同步与双向交互);② 可视化任务编排(拖拽式配置任务流程,支持单任务定时执行、多任务串行/并行依赖,非技术人员也能操作);③ 故障自愈机制(任务失败自动重试、异常告警、兜底方案执行,减少人工干预)。全程基于现有技术栈(Flask+MySQL+Celery+ECharts),新增跨系统适配模块、任务编排引擎、故障处理工具,代码注释详细,新手只需配置系统API信息与任务规则,跟着步骤复制操作就能成功,让看板成为企业全链路自动化办公的核心中枢~
一、本次学习目标
-
掌握跨系统API集成技巧,适配不同鉴权方式(API密钥、OAuth2.0),实现多系统数据自动同步、双向交互与格式转换;
-
学会可视化任务编排设计,基于拖拽组件搭建任务流程,支持定时触发、依赖执行、条件分支,实现任务灵活配置;
-
理解故障自愈机制,实现任务失败自动重试(按策略调整间隔)、异常告警(邮件/看板通知)、兜底方案执行,提升系统稳定性;
-
实现联动、编排、故障处理功能闭环,跨系统数据联动触发任务流程,任务异常自动告警并自愈,全程留痕可追溯;
-
适配企业实际场景,支持任务权限管控、流程导出导入、执行日志分析,满足多角色协同与自动化办公需求。
二、前期准备
- 安装核心依赖库
安装核心依赖(任务调度、跨系统鉴权、前端拖拽)
pip3 install celery redis requests-oauthlib python-dotenv flask-dragdrop -i https://pypi.tuna.tsinghua.edu.cn/simple
确保已有依赖正常(Flask、ECharts、报表导出等)
pip3 install --upgrade flask flask-login flask-sqlalchemy apscheduler openpyxl reportlab pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple
说明:跨系统联动用requests/requests-oauthlib处理API请求与鉴权;任务编排用Celery+Redis实现分布式调度(比APScheduler更适配复杂任务依赖);前端可视化拖拽基于flask-dragdrop扩展,无需额外开发复杂组件。
- 第三方服务与配置准备
-
跨系统API配置:梳理需联动的系统(如ERP、OA、CRM),获取API地址、鉴权方式(API密钥/OAuth2.0的client_id/client_secret)、请求参数与返回格式,测试API连通性,将配置信息存入.env文件(避免硬编码);
-
任务规则配置:定义任务类型(单任务/流程任务)、触发方式(定时触发/Cron表达式/数据联动触发)、执行策略(串行/并行)、依赖关系(如“报表生成”依赖“数据同步”完成);
-
故障处理配置:设置重试策略(重试次数、间隔递增规则,如1分钟→3分钟→5分钟)、告警方式(邮件/看板弹窗)、兜底方案(如任务失败后执行备用接口、生成异常报表);
-
环境准备:启动Redis服务(作为Celery消息队列,用于任务调度),确保云服务器开放对应端口,允许跨系统API访问(配置防火墙规则)。
- 数据库表优化与创建
– 连接MySQL数据库(替换为你的数据库信息)
mysql -u office_user -p -h 47.108.xxx.xxx office_data
– 创建跨系统配置表(system_config)
CREATE TABLE system_config (
id INT AUTO_INCREMENT PRIMARY KEY,
system_name VARCHAR(50) NOT NULL COMMENT ‘系统名称(如ERP、OA、CRM)’,
api_base_url VARCHAR(255) NOT NULL COMMENT ‘API基础地址’,
auth_type ENUM(‘api_key’, ‘oauth2’) NOT NULL COMMENT ‘鉴权方式’,
auth_config TEXT NOT NULL COMMENT ‘鉴权配置(JSON格式,含密钥/OAuth2参数)’,
sync_fields TEXT NOT NULL COMMENT ‘同步字段映射(JSON格式,本地字段→系统字段)’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用(1-启用,0-禁用)’,
remark TEXT NULL COMMENT ‘备注(如系统版本、API限制)’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_system_name (system_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘跨系统联动配置表’;
– 创建任务表(task_info)
CREATE TABLE task_info (
id INT AUTO_INCREMENT PRIMARY KEY,
task_name VARCHAR(100) NOT NULL COMMENT ‘任务名称’,
task_type ENUM(‘single’, ‘flow’) NOT NULL COMMENT ‘任务类型(单任务/流程任务)’,
trigger_type ENUM(‘timed’, ‘cron’, ‘data_link’) NOT NULL COMMENT ‘触发方式(定时/ Cron/数据联动)’,
trigger_config TEXT NULL COMMENT ‘触发配置(JSON格式,定时时间/Cron表达式/联动条件)’,
execute_config TEXT NOT NULL COMMENT ‘执行配置(JSON格式,API地址、参数、执行策略)’,
system_id INT NOT NULL COMMENT ‘关联系统ID(对应system_config表)’,
retry_strategy TEXT NOT NULL COMMENT ‘重试策略(JSON格式,次数、间隔)’,
fail_handler TEXT NULL COMMENT ‘失败处理(JSON格式,告警方式、兜底方案)’,
creator_id INT NOT NULL COMMENT ‘创建人ID’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_task_name (task_name),
FOREIGN KEY (system_id) REFERENCES system_config(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务信息表’;
– 创建任务流程表(task_flow)
CREATE TABLE task_flow (
id INT AUTO_INCREMENT PRIMARY KEY,
flow_name VARCHAR(100) NOT NULL COMMENT ‘流程名称’,
task_ids TEXT NOT NULL COMMENT ‘包含的任务ID(JSON格式,按执行顺序排列)’,
dependencies TEXT NULL COMMENT ‘任务依赖关系(JSON格式,如{“2”:[“1”]}表示任务2依赖任务1)’,
execute_mode ENUM(‘serial’, ‘parallel’) DEFAULT ‘serial’ COMMENT ‘执行模式(串行/并行)’,
creator_id INT NOT NULL COMMENT ‘创建人ID’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_flow_name (flow_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务流程编排表’;
– 创建任务执行日志表(task_execute_log)
CREATE TABLE task_execute_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id INT NULL COMMENT ‘关联任务ID(单任务)’,
flow_id INT NULL COMMENT ‘关联流程ID(流程任务)’,
execute_status ENUM(‘pending’, ‘running’, ‘success’, ‘fail’, ‘retrying’, ‘skipped’) DEFAULT ‘pending’ COMMENT ‘执行状态’,
execute_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘执行时间’,
finish_time DATETIME NULL COMMENT ‘结束时间’,
execute_result TEXT NULL COMMENT ‘执行结果(JSON格式,返回数据/错误信息)’,
retry_count INT DEFAULT 0 COMMENT ‘重试次数’,
handler_info TEXT NULL COMMENT ‘故障处理信息(JSON格式,告警记录/兜底执行结果)’,
audit_id BIGINT NULL COMMENT ‘关联审计ID’,
KEY idx_task_id (task_id),
KEY idx_flow_id (flow_id),
KEY idx_execute_status (execute_status),
KEY idx_execute_time (execute_time),
FOREIGN KEY (audit_id) REFERENCES operation_audit(id) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务执行日志表’;
– 初始化跨系统配置示例(ERP系统)
INSERT INTO system_config (system_name, api_base_url, auth_type, auth_config, sync_fields, remark)
VALUES (
‘ERP系统’,
‘https://erp.example.com/api/v1’,
‘api_key’,
‘{“api_key”:“ERP_API_KEY_123456”,“api_secret”:“ERP_SECRET_654321”}’,
‘{“local_order_id”:“erp_order_code”,“local_amount”:“erp_total_amount”,“local_status”:“erp_order_status”}’,
‘企业ERP系统,用于同步订单数据’
);
– 初始化测试任务
INSERT INTO task_info (task_name, task_type, trigger_type, trigger_config, execute_config, system_id, retry_strategy, fail_handler, creator_id)
VALUES (
‘ERP订单数据同步’,
‘single’,
‘cron’,
‘{“cron”:“0 9 * * *”}’, – 每日9点执行
‘{“api_path”:“/order/list”,“method”:“GET”,“params”:{“page_size”:100},“data_format”:“json”}’,
1,
‘{“retry_count”:3,“retry_intervals”:[60, 180, 300]}’, – 重试3次,间隔1/3/5分钟
‘{“alert_type”:“email”,“fallback_task_id”:null}’,
1 – 假设创建人ID为1(管理员)
);
三、实战:跨系统联动+任务编排+故障自愈集成
- 第一步:搭建跨系统API联动引擎,实现多系统数据互通
-- coding: utf-8 --
cross_system_engine.py 跨系统API联动引擎
import requests
import json
import time
from requests_oauthlib import OAuth2Session
from models import SystemConfig, db
from dotenv import load_dotenv
import os
加载环境变量
load_dotenv()
通用请求头
DEFAULT_HEADERS = {“Content-Type”: “application/json; charset=utf-8”}
class CrossSystemClient:
“”“跨系统API客户端,适配不同鉴权方式与数据格式”“”
def init(self, system_id):
“”“初始化客户端,加载系统配置”“”
self.system_config = SystemConfig.query.get(system_id)
if not self.system_config or not self.system_config.is_enable:
raise Exception(f"系统配置不存在或已禁用(ID:{system_id})")
self.api_base_url = self.system_config.api_base_url
self.auth_type = self.system_config.auth_type
self.auth_config = json.loads(self.system_config.auth_config)
self.sync_fields = json.loads(self.system_config.sync_fields)
# 初始化鉴权信息
self.auth_headers = self._init_auth_headers()
def _init_auth_headers(self):
"""初始化鉴权请求头,适配api_key/oauth2"""
if self.auth_type == "api_key":
# API密钥鉴权(通常在请求头或参数中携带)
api_key = self.auth_config.get("api_key")
api_secret = self.auth_config.get("api_secret")
if not api_key:
raise Exception("API密钥配置缺失")
return {**DEFAULT_HEADERS, "X-API-Key": api_key, "X-API-Secret": api_secret}
elif self.auth_type == "oauth2":
# OAuth2.0鉴权(获取access_token)
return self._get_oauth2_headers()
else:
raise Exception(f"不支持的鉴权方式:{self.auth_type}")
def _get_oauth2_headers(self):
"""获取OAuth2.0的access_token,生成鉴权头"""
oauth_config = self.auth_config
required_params = ["client_id", "client_secret", "token_url"]
if not all(param in oauth_config for param in required_params):
raise Exception("OAuth2.0配置缺失关键参数")
oauth_session = OAuth2Session(client_id=oauth_config["client_id"])
token = oauth_session.fetch_token(
token_url=oauth_config["token_url"],
client_secret=oauth_config["client_secret"],
grant_type=oauth_config.get("grant_type", "client_credentials")
)
if not token.get("access_token"):
raise Exception("OAuth2.0获取access_token失败")
return {**DEFAULT_HEADERS, "Authorization": f"Bearer {token['access_token']}"}
def _format_response_data(self, response_data):
"""格式化返回数据,按字段映射转换为本地格式"""
if not isinstance(response_data, dict) and not isinstance(response_data, list):
return response_data
# 单条数据处理
def format_single_data(data):
formatted_data = {}
for local_field, system_field in self.sync_fields.items():
# 支持嵌套字段(如erp_data.order_code)
if "." in system_field:
keys = system_field.split(".")
value = data
for key in keys:
value = value.get(key) if isinstance(value, dict) else None
if value is None:
break
formatted_data[local_field] = value
else:
formatted_data[local_field] = data.get(system_field)
return formatted_data
# 多条数据处理
if isinstance(response_data, list):
return [format_single_data(item) for item in response_data]
return format_single_data(response_data)
def request(self, api_path, method="GET", params=None, data=None, retry_count=0):
"""
通用API请求方法
:param api_path: API路径(相对于基础地址)
:param method: 请求方法(GET/POST/PUT/DELETE)
:param params: URL参数(GET请求用)
:param data: 请求体数据(POST/PUT请求用)
:param retry_count: 当前重试次数
:return: 格式化后的返回数据
"""
url = f"{self.api_base_url}{api_path}"
method = method.upper()
params = params or {}
data = json.dumps(data) if data and method in ["POST", "PUT"] else None
try:
print(f"发起跨系统请求:{method} {url},参数:{params},数据:{data}")
response = requests.request(
method=method,
url=url,
headers=self.auth_headers,
params=params,
data=data,
timeout=30
)
response.raise_for_status() # 触发HTTP错误(状态码≥400)
response_data = response.json()
# 格式化数据并返回
return self._format_response_data(response_data)
except Exception as e:
# 重试逻辑(若未超过最大重试次数)
max_retry = self.auth_config.get("max_retry", 1)
if retry_count < max_retry:
retry_interval = self.auth_config.get("retry_interval", 5)
print(f"请求失败({str(e)}),{retry_interval}秒后重试(第{retry_count+1}次)")
time.sleep(retry_interval)
return self.request(api_path, method, params, data, retry_count+1)
# 重试失败,抛出异常
raise Exception(f"跨系统请求失败(重试{max_retry}次后仍失败):{str(e)}")
def sync_data(self, api_path, method="GET", params=None, data=None):
"""数据同步方法(封装请求,返回格式化数据,供任务调用)"""
return self.request(api_path, method, params, data)
====================== 接口:跨系统配置管理 ======================
from flask import Blueprint, request, jsonify
from flask_login import login_required, current_user
from audit_engine import audit_log
cross_system_bp = Blueprint(“cross_system”, name)
@cross_system_bp.route(“/system/config/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_system_config():
“”“新增跨系统配置”“”
data = request.get_json()
required_fields = [“system_name”, “api_base_url”, “auth_type”, “auth_config”, “sync_fields”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})
# 检查系统名称是否已存在
if SystemConfig.query.filter_by(system_name=data["system_name"]).first():
return jsonify({"success": False, "error": "该系统配置已存在"})
try:
# 验证JSON格式字段
json.loads(data["auth_config"])
json.loads(data["sync_fields"])
except json.JSONDecodeError:
return jsonify({"success": False, "error": "auth_config或sync_fields格式错误(需JSON)"})
system_config = SystemConfig(
system_name=data["system_name"],
api_base_url=data["api_base_url"],
auth_type=data["auth_type"],
auth_config=data["auth_config"],
sync_fields=data["sync_fields"],
remark=data.get("remark", ""),
is_enable=data.get("is_enable", 1)
)
db.session.add(system_config)
db.session.commit()
return jsonify({"success": True, "msg": "系统配置新增成功", "system_id": system_config.id})
@cross_system_bp.route(“/system/data/sync/int:system_id”, methods=[“POST”])
@login_required
@audit_log(“update”)
def sync_system_data(system_id):
“”“手动触发跨系统数据同步”“”
data = request.get_json()
api_path = data.get(“api_path”)
method = data.get(“method”, “GET”)
params = data.get(“params”, {})
request_data = data.get(“data”, {})
if not api_path:
return jsonify({"success": False, "error": "缺少API路径"})
try:
client = CrossSystemClient(system_id)
sync_result = client.sync_data(api_path, method, params, request_data)
return jsonify({"success": True, "msg": "数据同步成功", "data": sync_result})
except Exception as e:
return jsonify({"success": False, "error": f"数据同步失败:{str(e)}"})
在app.py中新增以下内容
from cross_system_engine import cross_system_bp
注册跨系统联动蓝图
app.register_blueprint(cross_system_bp)
示例:测试ERP数据同步接口
@app.route(“/test/erp/sync”, methods=[“GET”])
@login_required
def test_erp_sync():
“”“测试ERP订单数据同步”“”
from cross_system_engine import CrossSystemClient
try:
client = CrossSystemClient(system_id=1) # 1为ERP系统配置ID
sync_data = client.sync_data(
api_path=“/order/list”,
method=“GET”,
params={“start_date”: “2026-01-01”, “end_date”: “2026-01-26”}
)
return jsonify({“success”: True, “data”: sync_data, “count”: len(sync_data)})
except Exception as e:
return jsonify({“success”: False, “error”: str(e)})
- 第二步:实现可视化任务编排,拖拽配置自动化流程
-- coding: utf-8 --
task_orchestration.py 可视化任务编排模块
import json
from flask import Blueprint, request, jsonify, g
from flask_login import login_required, current_user
from models import db, TaskInfo, TaskFlow, SystemConfig
from audit_engine import audit_log
from datetime import datetime
task_bp = Blueprint(“task”, name)
====================== 核心功能:解析任务流程,生成执行序列 ======================
def parse_task_flow(flow_id):
“”"
解析任务流程,生成执行序列(处理依赖关系)
:param flow_id: 流程ID
:return: 执行序列(列表,每个元素包含任务ID、执行顺序、依赖任务)
“”"
flow = TaskFlow.query.get(flow_id)
if not flow or not flow.is_enable:
raise Exception(f"流程不存在或已禁用(ID:{flow_id})")
task_ids = json.loads(flow.task_ids)
dependencies = json.loads(flow.dependencies) if flow.dependencies else {}
execute_mode = flow.execute_mode
# 串行执行:按顺序解析,确保依赖任务在前
if execute_mode == "serial":
execute_sequence = []
processed_tasks = set()
# 递归处理依赖
def add_task(task_id):
if task_id in processed_tasks:
return
# 先添加依赖任务
for dep_task_id in dependencies.get(str(task_id), []):
add_task(dep_task_id)
# 添加当前任务
task = TaskInfo.query.get(task_id)
if not task or not task.is_enable:
raise Exception(f"任务不存在或已禁用(ID:{task_id})")
execute_sequence.append({
"task_id": task_id,
"task_name": task.task_name,
"dependencies": dependencies.get(str(task_id), [])
})
processed_tasks.add(task_id)
for task_id in task_ids:
add_task(task_id)
return execute_sequence
# 并行执行:不考虑顺序(依赖任务需单独处理)
else:
return [
{
"task_id": task_id,
"task_name": TaskInfo.query.get(task_id).task_name,
"dependencies": dependencies.get(str(task_id), [])
}
for task_id in task_ids if TaskInfo.query.get(task_id).is_enable
]
====================== 接口:任务管理(新增/编辑/删除) ======================
@task_bp.route(“/task/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_task():
“”“新增任务(单任务)”“”
data = request.get_json()
required_fields = [“task_name”, “trigger_type”, “execute_config”, “system_id”, “retry_strategy”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})
# 检查任务名称是否重复
if TaskInfo.query.filter_by(task_name=data["task_name"]).first():
return jsonify({"success": False, "error": "该任务名称已存在"})
try:
# 验证JSON格式字段
json.loads(data["execute_config"])
json.loads(data["retry_strategy"])
if data.get("trigger_config"):
json.loads(data["trigger_config"])
if data.get("fail_handler"):
json.loads(data["fail_handler"])
except json.JSONDecodeError:
return jsonify({"success": False, "error": "配置字段格式错误(需JSON)"})
task = TaskInfo(
task_name=data["task_name"],
task_type="single",
trigger_type=data["trigger_type"],
trigger_config=data.get("trigger_config", "{}"),
execute_config=data["execute_config"],
system_id=data["system_id"],
retry_strategy=data["retry_strategy"],
fail_handler=data.get("fail_handler", "{}"),
creator_id=current_user.id,
is_enable=data.get("is_enable", 1)
)
db.session.add(task)
db.session.commit()
return jsonify({"success": True, "msg": "任务新增成功", "task_id": task.id})
@task_bp.route(“/task/flow/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_task_flow():
“”“新增任务流程(可视化编排)”“”
data = request.get_json()
required_fields = [“flow_name”, “task_ids”, “execute_mode”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})
# 检查流程名称是否重复
if TaskFlow.query.filter_by(flow_name=data["flow_name"]).first():
return jsonify({"success": False, "error": "该流程名称已存在"})
try:
# 验证任务ID与依赖关系格式
task_ids = json.loads(data["task_ids"])
if not isinstance(task_ids, list) or len(task_ids) == 0:
return jsonify({"success": False, "error": "task_ids需为非空列表"})
# 检查任务是否存在
for task_id in task_ids:
if not TaskInfo.query.get(task_id):
return jsonify({"success": False, "error": f"任务ID不存在:{task_id}"})
# 验证依赖关系
dependencies = json.loads(data.get("dependencies", "{}")) if data.get("dependencies") else {}
except json.JSONDecodeError:
return jsonify({"success": False, "error": "task_ids或dependencies格式错误(需JSON)"})
task_flow = TaskFlow(
flow_name=data["flow_name"],
task_ids=data["task_ids"],
dependencies=json.dumps(dependencies, ensure_ascii=False),
execute_mode=data["execute_mode"],
creator_id=current_user.id,
is_enable=data.get("is_enable", 1)
)
db.session.add(task_flow)
db.session.commit()
# 解析流程,验证合法性
try:
parse_task_flow(task_flow.id)
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "error": f"流程解析失败:{str(e)}"})
return jsonify({"success": True, "msg": "任务流程新增成功", "flow_id": task_flow.id})
====================== 接口:任务流程解析与预览 ======================
@task_bp.route(“/task/flow/parse/int:flow_id”, methods=[“GET”])
@login_required
def parse_flow():
“”“解析任务流程,返回执行序列(供前端预览)”“”
try:
execute_sequence = parse_task_flow(flow_id)
return jsonify({
“success”: True,
“execute_sequence”: execute_sequence,
“count”: len(execute_sequence)
})
except Exception as e:
return jsonify({“success”: False, “error”: str(e)})
====================== 前端拖拽编排页面(Jinja2模板) ======================
@task_bp.route(“/task/orchestration”, methods=[“GET”])
@login_required
def task_orchestration_page():
“”“任务编排可视化页面”“”
# 获取所有可用任务与系统配置
tasks = TaskInfo.query.filter_by(is_enable=1, task_type=“single”).all()
systems = SystemConfig.query.filter_by(is_enable=1).all()
return render_template(“task_orchestration.html”, tasks=tasks, systems=systems)
{% extends “base.html” %}
{% block content %}
任务流程可视化编排
可用任务库
<!-- 中间编排区域 -->
<div class="col-6">
<div class="card h-100">
<div class="card-header bg-info text-white">
<div class="d-flex justify-content-between align-items-center">
<h5 class="card-title mb-0">流程编排区(拖拽任务至此)</h5>
<div>
<select class="form-select form-select-sm d-inline-block w-auto me-2" id="executeMode">
<option value="serial" selected>串行执行</option>
<option value="parallel">并行执行</option>
</select>
<button class="btn btn-sm btn-white" onclick="clearFlow()">清空</button>
</div>
</div>
</div>
<div class="card-body">
<div id="orchestrationArea" class="border border-dashed rounded p-3 min-vh-50">
<div class="text-center text-muted py-5" id="emptyTip">
<i class="bi bi-arrows-move fs-3 mb-2"></i>
<p>拖拽左侧任务至此处编排流程,可调整顺序、设置依赖</p>
</div>
<div id="flowTasks" class="d-flex flex-column gap-2"></div>
</div>
<!-- 依赖设置区域 -->
<div class="mt-4">
<h6>依赖关系设置</h6>
<div class="input-group mb-2">
<select class="form-select" id="targetTask" disabled>
<option value="">选择目标任务</option>
</select>
<span class="input-group-text">依赖于</span>
<select class="form-select" id="depTask" disabled>
<option value="">选择依赖任务</option>
</select>
<button class="btn btn-primary" id="addDepBtn" disabled>添加依赖</button>
</div>
<div id="depList" class="text-sm text-muted">暂无依赖关系</div>
</div>
</div>
</div>
</div>
<!-- 右侧流程配置与提交 -->
<div class="col-3">
<div class="card h-100">
<div class="card-header bg-success text-white">
<h5 class="card-title mb-0">流程配置</h5>
</div>
<div class="card-body">
<form id="flowForm">
<div class="mb-3">
<label class="form-label">流程名称</label>
<input type="text" class="form-control" id="flowName" placeholder="请输入流程名称" required>
</div>
<div class="mb-3">
<label class="form-label">流程状态</label>
<select class="form-select" id="flowEnable">
<option value="1" selected>启用</option>
<option value="0">禁用</option>
</select>
</div>
<div class="mb-3">
<label class="form-label">流程描述</label>
<textarea class="form-control" id="flowRemark" rows="3" placeholder="可选,描述流程用途"></textarea>
</div>
<div class="d-grid gap-2">
<button type="button" class="btn btn-primary" id="previewFlowBtn" disabled>预览流程</button>
<button type="button" class="btn btn-success" id="saveFlowBtn" disabled>保存流程</button>
</div>
</form>
<!-- 预览结果区域 -->
<div class="mt-4" id="previewResult" style="display: none;">
<h6>流程执行序列预览</h6>
<div class="border rounded p-2 max-height-200 overflow-auto" id="previewList"></div>
</div>
</div>
</div>
</div>
{% endblock %}
- 第三步:集成任务调度与故障自愈,确保自动化稳定运行
更多推荐



所有评论(0)