大家好!我是CSDN的Python新手博主~ 上一篇我们完成了看板的AI异常预警与全流程审计,解决了安全合规与风险防控需求,但甲方客户反馈两大核心痛点:① 多系统数据孤立,ERP的订单数据、OA的审批数据、CRM的客户数据无法自动联动,需手动下载上传,效率低下且易出错;② 定时任务(如每日9点同步ERP数据、每周五推送报表)配置复杂,需开发人员修改代码,非技术人员无法操作,且任务失败后无自动重试、兜底机制,需人工排查修复;③ 任务流程无可视化编排能力,多步骤任务(如“同步数据→清洗处理→生成报表→邮件推送”)无法灵活配置执行顺序与依赖关系。今天就带来超落地的新手实战项目——办公看板集成跨系统联动+可视化任务编排+故障自愈!

本次基于之前的“风控审计看板”代码,新增3大核心功能:① 跨系统智能联动(适配ERP/OA/CRM等主流系统API,支持密钥/OAuth2.0鉴权,实现数据自动同步与双向交互);② 可视化任务编排(拖拽式配置任务流程,支持单任务定时执行、多任务串行/并行依赖,非技术人员也能操作);③ 故障自愈机制(任务失败自动重试、异常告警、兜底方案执行,减少人工干预)。全程基于现有技术栈(Flask+MySQL+Celery+ECharts),新增跨系统适配模块、任务编排引擎、故障处理工具,代码注释详细,新手只需配置系统API信息与任务规则,跟着步骤复制操作就能成功,让看板成为企业全链路自动化办公的核心中枢~

一、本次学习目标

  1. 掌握跨系统API集成技巧,适配不同鉴权方式(API密钥、OAuth2.0),实现多系统数据自动同步、双向交互与格式转换;

  2. 学会可视化任务编排设计,基于拖拽组件搭建任务流程,支持定时触发、依赖执行、条件分支,实现任务灵活配置;

  3. 理解故障自愈机制,实现任务失败自动重试(按策略调整间隔)、异常告警(邮件/看板通知)、兜底方案执行,提升系统稳定性;

  4. 实现联动、编排、故障处理功能闭环,跨系统数据联动触发任务流程,任务异常自动告警并自愈,全程留痕可追溯;

  5. 适配企业实际场景,支持任务权限管控、流程导出导入、执行日志分析,满足多角色协同与自动化办公需求。

二、前期准备

  1. 安装核心依赖库

安装核心依赖(任务调度、跨系统鉴权、前端拖拽)

pip3 install celery redis requests-oauthlib python-dotenv flask-dragdrop -i https://pypi.tuna.tsinghua.edu.cn/simple

确保已有依赖正常(Flask、ECharts、报表导出等)

pip3 install --upgrade flask flask-login flask-sqlalchemy apscheduler openpyxl reportlab pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple

说明:跨系统联动用requests/requests-oauthlib处理API请求与鉴权;任务编排用Celery+Redis实现分布式调度(比APScheduler更适配复杂任务依赖);前端可视化拖拽基于flask-dragdrop扩展,无需额外开发复杂组件。

  1. 第三方服务与配置准备
  • 跨系统API配置:梳理需联动的系统(如ERP、OA、CRM),获取API地址、鉴权方式(API密钥/OAuth2.0的client_id/client_secret)、请求参数与返回格式,测试API连通性,将配置信息存入.env文件(避免硬编码);

  • 任务规则配置:定义任务类型(单任务/流程任务)、触发方式(定时触发/Cron表达式/数据联动触发)、执行策略(串行/并行)、依赖关系(如“报表生成”依赖“数据同步”完成);

  • 故障处理配置:设置重试策略(重试次数、间隔递增规则,如1分钟→3分钟→5分钟)、告警方式(邮件/看板弹窗)、兜底方案(如任务失败后执行备用接口、生成异常报表);

  • 环境准备:启动Redis服务(作为Celery消息队列,用于任务调度),确保云服务器开放对应端口,允许跨系统API访问(配置防火墙规则)。

  1. 数据库表优化与创建

– 连接MySQL数据库(替换为你的数据库信息)
mysql -u office_user -p -h 47.108.xxx.xxx office_data

– 创建跨系统配置表(system_config)
CREATE TABLE system_config (
id INT AUTO_INCREMENT PRIMARY KEY,
system_name VARCHAR(50) NOT NULL COMMENT ‘系统名称(如ERP、OA、CRM)’,
api_base_url VARCHAR(255) NOT NULL COMMENT ‘API基础地址’,
auth_type ENUM(‘api_key’, ‘oauth2’) NOT NULL COMMENT ‘鉴权方式’,
auth_config TEXT NOT NULL COMMENT ‘鉴权配置(JSON格式,含密钥/OAuth2参数)’,
sync_fields TEXT NOT NULL COMMENT ‘同步字段映射(JSON格式,本地字段→系统字段)’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用(1-启用,0-禁用)’,
remark TEXT NULL COMMENT ‘备注(如系统版本、API限制)’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_system_name (system_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘跨系统联动配置表’;

– 创建任务表(task_info)
CREATE TABLE task_info (
id INT AUTO_INCREMENT PRIMARY KEY,
task_name VARCHAR(100) NOT NULL COMMENT ‘任务名称’,
task_type ENUM(‘single’, ‘flow’) NOT NULL COMMENT ‘任务类型(单任务/流程任务)’,
trigger_type ENUM(‘timed’, ‘cron’, ‘data_link’) NOT NULL COMMENT ‘触发方式(定时/ Cron/数据联动)’,
trigger_config TEXT NULL COMMENT ‘触发配置(JSON格式,定时时间/Cron表达式/联动条件)’,
execute_config TEXT NOT NULL COMMENT ‘执行配置(JSON格式,API地址、参数、执行策略)’,
system_id INT NOT NULL COMMENT ‘关联系统ID(对应system_config表)’,
retry_strategy TEXT NOT NULL COMMENT ‘重试策略(JSON格式,次数、间隔)’,
fail_handler TEXT NULL COMMENT ‘失败处理(JSON格式,告警方式、兜底方案)’,
creator_id INT NOT NULL COMMENT ‘创建人ID’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_task_name (task_name),
FOREIGN KEY (system_id) REFERENCES system_config(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务信息表’;

– 创建任务流程表(task_flow)
CREATE TABLE task_flow (
id INT AUTO_INCREMENT PRIMARY KEY,
flow_name VARCHAR(100) NOT NULL COMMENT ‘流程名称’,
task_ids TEXT NOT NULL COMMENT ‘包含的任务ID(JSON格式,按执行顺序排列)’,
dependencies TEXT NULL COMMENT ‘任务依赖关系(JSON格式,如{“2”:[“1”]}表示任务2依赖任务1)’,
execute_mode ENUM(‘serial’, ‘parallel’) DEFAULT ‘serial’ COMMENT ‘执行模式(串行/并行)’,
creator_id INT NOT NULL COMMENT ‘创建人ID’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_flow_name (flow_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务流程编排表’;

– 创建任务执行日志表(task_execute_log)
CREATE TABLE task_execute_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id INT NULL COMMENT ‘关联任务ID(单任务)’,
flow_id INT NULL COMMENT ‘关联流程ID(流程任务)’,
execute_status ENUM(‘pending’, ‘running’, ‘success’, ‘fail’, ‘retrying’, ‘skipped’) DEFAULT ‘pending’ COMMENT ‘执行状态’,
execute_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘执行时间’,
finish_time DATETIME NULL COMMENT ‘结束时间’,
execute_result TEXT NULL COMMENT ‘执行结果(JSON格式,返回数据/错误信息)’,
retry_count INT DEFAULT 0 COMMENT ‘重试次数’,
handler_info TEXT NULL COMMENT ‘故障处理信息(JSON格式,告警记录/兜底执行结果)’,
audit_id BIGINT NULL COMMENT ‘关联审计ID’,
KEY idx_task_id (task_id),
KEY idx_flow_id (flow_id),
KEY idx_execute_status (execute_status),
KEY idx_execute_time (execute_time),
FOREIGN KEY (audit_id) REFERENCES operation_audit(id) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务执行日志表’;

– 初始化跨系统配置示例(ERP系统)
INSERT INTO system_config (system_name, api_base_url, auth_type, auth_config, sync_fields, remark)
VALUES (
‘ERP系统’,
‘https://erp.example.com/api/v1’,
‘api_key’,
‘{“api_key”:“ERP_API_KEY_123456”,“api_secret”:“ERP_SECRET_654321”}’,
‘{“local_order_id”:“erp_order_code”,“local_amount”:“erp_total_amount”,“local_status”:“erp_order_status”}’,
‘企业ERP系统,用于同步订单数据’
);

– 初始化测试任务
INSERT INTO task_info (task_name, task_type, trigger_type, trigger_config, execute_config, system_id, retry_strategy, fail_handler, creator_id)
VALUES (
‘ERP订单数据同步’,
‘single’,
‘cron’,
‘{“cron”:“0 9 * * *”}’, – 每日9点执行
‘{“api_path”:“/order/list”,“method”:“GET”,“params”:{“page_size”:100},“data_format”:“json”}’,
1,
‘{“retry_count”:3,“retry_intervals”:[60, 180, 300]}’, – 重试3次,间隔1/3/5分钟
‘{“alert_type”:“email”,“fallback_task_id”:null}’,
1 – 假设创建人ID为1(管理员)
);

三、实战:跨系统联动+任务编排+故障自愈集成

  1. 第一步:搭建跨系统API联动引擎,实现多系统数据互通

-- coding: utf-8 --

cross_system_engine.py 跨系统API联动引擎

import requests
import json
import time
from requests_oauthlib import OAuth2Session
from models import SystemConfig, db
from dotenv import load_dotenv
import os

加载环境变量

load_dotenv()

通用请求头

DEFAULT_HEADERS = {“Content-Type”: “application/json; charset=utf-8”}

class CrossSystemClient:
“”“跨系统API客户端,适配不同鉴权方式与数据格式”“”
def init(self, system_id):
“”“初始化客户端,加载系统配置”“”
self.system_config = SystemConfig.query.get(system_id)
if not self.system_config or not self.system_config.is_enable:
raise Exception(f"系统配置不存在或已禁用(ID:{system_id})")
self.api_base_url = self.system_config.api_base_url
self.auth_type = self.system_config.auth_type
self.auth_config = json.loads(self.system_config.auth_config)
self.sync_fields = json.loads(self.system_config.sync_fields)
# 初始化鉴权信息
self.auth_headers = self._init_auth_headers()

def _init_auth_headers(self):
    """初始化鉴权请求头,适配api_key/oauth2"""
    if self.auth_type == "api_key":
        # API密钥鉴权(通常在请求头或参数中携带)
        api_key = self.auth_config.get("api_key")
        api_secret = self.auth_config.get("api_secret")
        if not api_key:
            raise Exception("API密钥配置缺失")
        return {**DEFAULT_HEADERS, "X-API-Key": api_key, "X-API-Secret": api_secret}
    elif self.auth_type == "oauth2":
        # OAuth2.0鉴权(获取access_token)
        return self._get_oauth2_headers()
    else:
        raise Exception(f"不支持的鉴权方式:{self.auth_type}")

def _get_oauth2_headers(self):
    """获取OAuth2.0的access_token,生成鉴权头"""
    oauth_config = self.auth_config
    required_params = ["client_id", "client_secret", "token_url"]
    if not all(param in oauth_config for param in required_params):
        raise Exception("OAuth2.0配置缺失关键参数")
    
    oauth_session = OAuth2Session(client_id=oauth_config["client_id"])
    token = oauth_session.fetch_token(
        token_url=oauth_config["token_url"],
        client_secret=oauth_config["client_secret"],
        grant_type=oauth_config.get("grant_type", "client_credentials")
    )
    if not token.get("access_token"):
        raise Exception("OAuth2.0获取access_token失败")
    return {**DEFAULT_HEADERS, "Authorization": f"Bearer {token['access_token']}"}

def _format_response_data(self, response_data):
    """格式化返回数据,按字段映射转换为本地格式"""
    if not isinstance(response_data, dict) and not isinstance(response_data, list):
        return response_data
    
    # 单条数据处理
    def format_single_data(data):
        formatted_data = {}
        for local_field, system_field in self.sync_fields.items():
            # 支持嵌套字段(如erp_data.order_code)
            if "." in system_field:
                keys = system_field.split(".")
                value = data
                for key in keys:
                    value = value.get(key) if isinstance(value, dict) else None
                    if value is None:
                        break
                formatted_data[local_field] = value
            else:
                formatted_data[local_field] = data.get(system_field)
        return formatted_data
    
    # 多条数据处理
    if isinstance(response_data, list):
        return [format_single_data(item) for item in response_data]
    return format_single_data(response_data)

def request(self, api_path, method="GET", params=None, data=None, retry_count=0):
    """
    通用API请求方法
    :param api_path: API路径(相对于基础地址)
    :param method: 请求方法(GET/POST/PUT/DELETE)
    :param params: URL参数(GET请求用)
    :param data: 请求体数据(POST/PUT请求用)
    :param retry_count: 当前重试次数
    :return: 格式化后的返回数据
    """
    url = f"{self.api_base_url}{api_path}"
    method = method.upper()
    params = params or {}
    data = json.dumps(data) if data and method in ["POST", "PUT"] else None

    try:
        print(f"发起跨系统请求:{method} {url},参数:{params},数据:{data}")
        response = requests.request(
            method=method,
            url=url,
            headers=self.auth_headers,
            params=params,
            data=data,
            timeout=30
        )
        response.raise_for_status()  # 触发HTTP错误(状态码≥400)
        response_data = response.json()
        # 格式化数据并返回
        return self._format_response_data(response_data)
    except Exception as e:
        # 重试逻辑(若未超过最大重试次数)
        max_retry = self.auth_config.get("max_retry", 1)
        if retry_count < max_retry:
            retry_interval = self.auth_config.get("retry_interval", 5)
            print(f"请求失败({str(e)}),{retry_interval}秒后重试(第{retry_count+1}次)")
            time.sleep(retry_interval)
            return self.request(api_path, method, params, data, retry_count+1)
        # 重试失败,抛出异常
        raise Exception(f"跨系统请求失败(重试{max_retry}次后仍失败):{str(e)}")

def sync_data(self, api_path, method="GET", params=None, data=None):
    """数据同步方法(封装请求,返回格式化数据,供任务调用)"""
    return self.request(api_path, method, params, data)

====================== 接口:跨系统配置管理 ======================

from flask import Blueprint, request, jsonify
from flask_login import login_required, current_user
from audit_engine import audit_log

cross_system_bp = Blueprint(“cross_system”, name)

@cross_system_bp.route(“/system/config/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_system_config():
“”“新增跨系统配置”“”
data = request.get_json()
required_fields = [“system_name”, “api_base_url”, “auth_type”, “auth_config”, “sync_fields”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})

# 检查系统名称是否已存在
if SystemConfig.query.filter_by(system_name=data["system_name"]).first():
    return jsonify({"success": False, "error": "该系统配置已存在"})

try:
    # 验证JSON格式字段
    json.loads(data["auth_config"])
    json.loads(data["sync_fields"])
except json.JSONDecodeError:
    return jsonify({"success": False, "error": "auth_config或sync_fields格式错误(需JSON)"})

system_config = SystemConfig(
    system_name=data["system_name"],
    api_base_url=data["api_base_url"],
    auth_type=data["auth_type"],
    auth_config=data["auth_config"],
    sync_fields=data["sync_fields"],
    remark=data.get("remark", ""),
    is_enable=data.get("is_enable", 1)
)
db.session.add(system_config)
db.session.commit()
return jsonify({"success": True, "msg": "系统配置新增成功", "system_id": system_config.id})

@cross_system_bp.route(“/system/data/sync/int:system_id”, methods=[“POST”])
@login_required
@audit_log(“update”)
def sync_system_data(system_id):
“”“手动触发跨系统数据同步”“”
data = request.get_json()
api_path = data.get(“api_path”)
method = data.get(“method”, “GET”)
params = data.get(“params”, {})
request_data = data.get(“data”, {})

if not api_path:
    return jsonify({"success": False, "error": "缺少API路径"})

try:
    client = CrossSystemClient(system_id)
    sync_result = client.sync_data(api_path, method, params, request_data)
    return jsonify({"success": True, "msg": "数据同步成功", "data": sync_result})
except Exception as e:
    return jsonify({"success": False, "error": f"数据同步失败:{str(e)}"})

在app.py中新增以下内容

from cross_system_engine import cross_system_bp

注册跨系统联动蓝图

app.register_blueprint(cross_system_bp)

示例:测试ERP数据同步接口

@app.route(“/test/erp/sync”, methods=[“GET”])
@login_required
def test_erp_sync():
“”“测试ERP订单数据同步”“”
from cross_system_engine import CrossSystemClient
try:
client = CrossSystemClient(system_id=1) # 1为ERP系统配置ID
sync_data = client.sync_data(
api_path=“/order/list”,
method=“GET”,
params={“start_date”: “2026-01-01”, “end_date”: “2026-01-26”}
)
return jsonify({“success”: True, “data”: sync_data, “count”: len(sync_data)})
except Exception as e:
return jsonify({“success”: False, “error”: str(e)})

  1. 第二步:实现可视化任务编排,拖拽配置自动化流程

-- coding: utf-8 --

task_orchestration.py 可视化任务编排模块

import json
from flask import Blueprint, request, jsonify, g
from flask_login import login_required, current_user
from models import db, TaskInfo, TaskFlow, SystemConfig
from audit_engine import audit_log
from datetime import datetime

task_bp = Blueprint(“task”, name)

====================== 核心功能:解析任务流程,生成执行序列 ======================

def parse_task_flow(flow_id):
“”"
解析任务流程,生成执行序列(处理依赖关系)
:param flow_id: 流程ID
:return: 执行序列(列表,每个元素包含任务ID、执行顺序、依赖任务)
“”"
flow = TaskFlow.query.get(flow_id)
if not flow or not flow.is_enable:
raise Exception(f"流程不存在或已禁用(ID:{flow_id})")

task_ids = json.loads(flow.task_ids)
dependencies = json.loads(flow.dependencies) if flow.dependencies else {}
execute_mode = flow.execute_mode

# 串行执行:按顺序解析,确保依赖任务在前
if execute_mode == "serial":
    execute_sequence = []
    processed_tasks = set()
    # 递归处理依赖
    def add_task(task_id):
        if task_id in processed_tasks:
            return
        # 先添加依赖任务
        for dep_task_id in dependencies.get(str(task_id), []):
            add_task(dep_task_id)
        # 添加当前任务
        task = TaskInfo.query.get(task_id)
        if not task or not task.is_enable:
            raise Exception(f"任务不存在或已禁用(ID:{task_id})")
        execute_sequence.append({
            "task_id": task_id,
            "task_name": task.task_name,
            "dependencies": dependencies.get(str(task_id), [])
        })
        processed_tasks.add(task_id)
    
    for task_id in task_ids:
        add_task(task_id)
    return execute_sequence
# 并行执行:不考虑顺序(依赖任务需单独处理)
else:
    return [
        {
            "task_id": task_id,
            "task_name": TaskInfo.query.get(task_id).task_name,
            "dependencies": dependencies.get(str(task_id), [])
        }
        for task_id in task_ids if TaskInfo.query.get(task_id).is_enable
    ]

====================== 接口:任务管理(新增/编辑/删除) ======================

@task_bp.route(“/task/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_task():
“”“新增任务(单任务)”“”
data = request.get_json()
required_fields = [“task_name”, “trigger_type”, “execute_config”, “system_id”, “retry_strategy”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})

# 检查任务名称是否重复
if TaskInfo.query.filter_by(task_name=data["task_name"]).first():
    return jsonify({"success": False, "error": "该任务名称已存在"})

try:
    # 验证JSON格式字段
    json.loads(data["execute_config"])
    json.loads(data["retry_strategy"])
    if data.get("trigger_config"):
        json.loads(data["trigger_config"])
    if data.get("fail_handler"):
        json.loads(data["fail_handler"])
except json.JSONDecodeError:
    return jsonify({"success": False, "error": "配置字段格式错误(需JSON)"})

task = TaskInfo(
    task_name=data["task_name"],
    task_type="single",
    trigger_type=data["trigger_type"],
    trigger_config=data.get("trigger_config", "{}"),
    execute_config=data["execute_config"],
    system_id=data["system_id"],
    retry_strategy=data["retry_strategy"],
    fail_handler=data.get("fail_handler", "{}"),
    creator_id=current_user.id,
    is_enable=data.get("is_enable", 1)
)
db.session.add(task)
db.session.commit()
return jsonify({"success": True, "msg": "任务新增成功", "task_id": task.id})

@task_bp.route(“/task/flow/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_task_flow():
“”“新增任务流程(可视化编排)”“”
data = request.get_json()
required_fields = [“flow_name”, “task_ids”, “execute_mode”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})

# 检查流程名称是否重复
if TaskFlow.query.filter_by(flow_name=data["flow_name"]).first():
    return jsonify({"success": False, "error": "该流程名称已存在"})

try:
    # 验证任务ID与依赖关系格式
    task_ids = json.loads(data["task_ids"])
    if not isinstance(task_ids, list) or len(task_ids) == 0:
        return jsonify({"success": False, "error": "task_ids需为非空列表"})
    # 检查任务是否存在
    for task_id in task_ids:
        if not TaskInfo.query.get(task_id):
            return jsonify({"success": False, "error": f"任务ID不存在:{task_id}"})
    # 验证依赖关系
    dependencies = json.loads(data.get("dependencies", "{}")) if data.get("dependencies") else {}
except json.JSONDecodeError:
    return jsonify({"success": False, "error": "task_ids或dependencies格式错误(需JSON)"})

task_flow = TaskFlow(
    flow_name=data["flow_name"],
    task_ids=data["task_ids"],
    dependencies=json.dumps(dependencies, ensure_ascii=False),
    execute_mode=data["execute_mode"],
    creator_id=current_user.id,
    is_enable=data.get("is_enable", 1)
)
db.session.add(task_flow)
db.session.commit()
# 解析流程,验证合法性
try:
    parse_task_flow(task_flow.id)
except Exception as e:
    db.session.rollback()
    return jsonify({"success": False, "error": f"流程解析失败:{str(e)}"})
return jsonify({"success": True, "msg": "任务流程新增成功", "flow_id": task_flow.id})

====================== 接口:任务流程解析与预览 ======================

@task_bp.route(“/task/flow/parse/int:flow_id”, methods=[“GET”])
@login_required
def parse_flow():
“”“解析任务流程,返回执行序列(供前端预览)”“”
try:
execute_sequence = parse_task_flow(flow_id)
return jsonify({
“success”: True,
“execute_sequence”: execute_sequence,
“count”: len(execute_sequence)
})
except Exception as e:
return jsonify({“success”: False, “error”: str(e)})

====================== 前端拖拽编排页面(Jinja2模板) ======================

@task_bp.route(“/task/orchestration”, methods=[“GET”])
@login_required
def task_orchestration_page():
“”“任务编排可视化页面”“”
# 获取所有可用任务与系统配置
tasks = TaskInfo.query.filter_by(is_enable=1, task_type=“single”).all()
systems = SystemConfig.query.filter_by(is_enable=1).all()
return render_template(“task_orchestration.html”, tasks=tasks, systems=systems)

{% extends “base.html” %}
{% block content %}

任务流程可视化编排

可用任务库
{% for task in tasks %}
{{ task.task_name }} 系统:{{ task.system.system_name }} 触发方式:{{ {"timed":"定时","cron":"Cron","data_link":"数据联动"}[task.trigger_type] }}
{% else %}
暂无可用任务,请先新增单任务
{% endfor %}
<!-- 中间编排区域 -->
<div class="col-6">
  <div class="card h-100">
    <div class="card-header bg-info text-white">
      <div class="d-flex justify-content-between align-items-center">
        <h5 class="card-title mb-0">流程编排区(拖拽任务至此)</h5>
        <div>
          <select class="form-select form-select-sm d-inline-block w-auto me-2" id="executeMode">
            <option value="serial" selected>串行执行</option>
            <option value="parallel">并行执行</option>
          </select>
          <button class="btn btn-sm btn-white" onclick="clearFlow()">清空</button>
        </div>
      </div>
    </div>
    <div class="card-body">
      <div id="orchestrationArea" class="border border-dashed rounded p-3 min-vh-50">
        <div class="text-center text-muted py-5" id="emptyTip">
          <i class="bi bi-arrows-move fs-3 mb-2"></i>
          <p>拖拽左侧任务至此处编排流程,可调整顺序、设置依赖</p>
        </div>
        <div id="flowTasks" class="d-flex flex-column gap-2"></div>
      </div>
      
      <!-- 依赖设置区域 -->
      <div class="mt-4">
        <h6>依赖关系设置</h6>
        <div class="input-group mb-2">
          <select class="form-select" id="targetTask" disabled>
            <option value="">选择目标任务</option>
          </select>
          <span class="input-group-text">依赖于</span>
          <select class="form-select" id="depTask" disabled>
            <option value="">选择依赖任务</option>
          </select>
          <button class="btn btn-primary" id="addDepBtn" disabled>添加依赖</button>
        </div>
        <div id="depList" class="text-sm text-muted">暂无依赖关系</div>
      </div>
    </div>
  </div>
</div>

<!-- 右侧流程配置与提交 -->
<div class="col-3">
  <div class="card h-100">
    <div class="card-header bg-success text-white">
      <h5 class="card-title mb-0">流程配置</h5>
    </div>
    <div class="card-body">
      <form id="flowForm">
        <div class="mb-3">
          <label class="form-label">流程名称</label>
          <input type="text" class="form-control" id="flowName" placeholder="请输入流程名称" required>
        </div>
        <div class="mb-3">
          <label class="form-label">流程状态</label>
          <select class="form-select" id="flowEnable">
            <option value="1" selected>启用</option>
            <option value="0">禁用</option>
          </select>
        </div>
        <div class="mb-3">
          <label class="form-label">流程描述</label>
          <textarea class="form-control" id="flowRemark" rows="3" placeholder="可选,描述流程用途"></textarea>
        </div>
        <div class="d-grid gap-2">
          <button type="button" class="btn btn-primary" id="previewFlowBtn" disabled>预览流程</button>
          <button type="button" class="btn btn-success" id="saveFlowBtn" disabled>保存流程</button>
        </div>
      </form>
      
      <!-- 预览结果区域 -->
      <div class="mt-4" id="previewResult" style="display: none;">
        <h6>流程执行序列预览</h6>
        <div class="border rounded p-2 max-height-200 overflow-auto" id="previewList"></div>
      </div>
    </div>
  </div>
</div>

{% endblock %}

  1. 第三步:集成任务调度与故障自愈,确保自动化稳定运行
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐