Python Web 开发进阶实战:AI 编排引擎 —— 在 Flask + Vue 中构建低代码机器学习工作流平台
本文介绍了一个面向机器学习工作流的AI编排引擎设计,旨在解决传统ML开发中实验难复现、生产部署复杂等痛点。系统采用前后端分离架构,包含可视化DAG画布、组件化执行引擎和任务调度器三大模块。核心创新点包括:1) 基于JSON Schema的动态组件配置;2) 支持拓扑排序的DAG调度;3) 集成Optuna的超参自动优化;4) 组件市场实现团队协作。该平台通过低代码方式连接实验与生产环境,在销售预测
·
第一章:为什么需要 AI 编排引擎?
1.1 传统 ML 开发的痛点
| 阶段 | 问题 |
|---|
- 实验阶段 | Notebook 无法版本控制,参数散落在 cell 中
- 协作阶段 | 同事无法复现你的结果
- 生产阶段 | 需将 notebook 重构成 Airflow DAG,重复劳动
1.2 编排引擎 vs 现有方案
| 方案 | 优点 | 缺点 |
|---|
- Jupyter | 交互灵活 | 不可复现、难调度
- Airflow | 强大调度 | 学习曲线陡峭,非面向 ML
- MLflow Pipelines | 实验跟踪 | 缺少可视化编排
- 自研低代码平台 | ML 友好 + 可视化 + 轻量 | 需初期投入开发 |
定位:填补 实验 → 生产 的鸿沟,专注 ML 工作流。
第二章:平台架构设计
2.1 整体数据流
[Vue 前端]
│ (拖拽生成 DAG JSON)
↓
[Flask API] → 保存 workflow 定义
│
↓
[Celery 调度器] → 解析 DAG,按依赖执行任务
│
├── [组件执行器] → 加载 Python 模块(如 data_loader.py)
├── [Optuna 集成] → 自动超参搜索
└── [WebSocket] → 推送实时日志/状态
│
↓
[前端画布] → 高亮运行中节点,显示日志弹窗
2.2 核心抽象:组件(Component)
每个 ML 步骤封装为独立组件:
# components/base.py
class MLComponent:
name: str # 如 "CSV Data Loader"
inputs: List[str] # 输入端口 ["file_path"]
outputs: List[str] # 输出端口 ["dataframe"]
config_schema: dict # 配置表单(JSON Schema)
def run(self, inputs: dict, config: dict) -> dict:
"""执行逻辑,返回输出"""
raise NotImplementedError
示例组件:
CSVLoader:读取 CSV → 输出 DataFrameProphetTrainer:训练时间序列模型 → 输出 model.pklModelEvaluator:计算 MAE/RMSE → 输出 metrics.json
第三章:前端 DAG 画布(Vue + X6)
3.1 初始化 X6 画布
<template>
<div ref="graphContainer" class="workflow-canvas"></div>
</template>
<script setup>
import { Graph, Node } from '@antv/x6'
let graph
onMounted(() => {
graph = new Graph({
container: graphContainer.value,
grid: true,
snapline: true,
keyboard: true,
clipboard: true
})
// 注册节点模板
graph.registerNode('ml-component', {
inherit: 'rect',
width: 180,
height: 60,
attrs: {
body: { fill: '#f5f5f5', stroke: '#333' },
label: { text: '未命名组件', fill: '#333' }
}
})
})
</script>
3.2 拖拽添加组件
// 从左侧工具栏拖入
const componentList = [
{ type: 'csv_loader', name: 'CSV 数据加载器' },
{ type: 'prophet_trainer', name: 'Prophet 训练器' }
]
function addComponent(type, name) {
const node = graph.addNode({
shape: 'ml-component',
label: name,
data: { type, config: {} }, // 存储组件类型与配置
ports: {
groups: {
input: { position: 'top' },
output: { position: 'bottom' }
},
items: [
{ id: 'in-1', group: 'input' },
{ id: 'out-1', group: 'output' }
]
}
})
}
3.3 连接与配置
- 连线:X6 自动处理端口连接
- 配置弹窗:点击节点 → 弹出动态表单(基于 JSON Schema)
<!-- 动态表单 -->
<template>
<DynamicForm :schema="selectedNode.data.config_schema" v-model="config" />
</template>
表单库推荐:vue-json-schema-form 或 自研轻量版。
第四章:后端调度器实现
4.1 Workflow 定义存储
# models/workflow.py
class Workflow(db.Document):
name = StringField(required=True)
dag = DictField() # X6 导出的 JSON 结构
created_by = StringField()
# 示例 dag 结构
# {
# "nodes": [{"id": "n1", "type": "csv_loader", "config": {...}}],
# "edges": [{"source": "n1", "target": "n2"}]
# }
4.2 DAG 解析与执行
# services/scheduler.py
from celery import chain, group
def execute_workflow(workflow_id: str):
wf = Workflow.objects.get(id=workflow_id)
dag = wf.dag
# 构建拓扑排序
sorted_nodes = topological_sort(dag['nodes'], dag['edges'])
# 生成 Celery 任务链
tasks = []
for node in sorted_nodes:
component = load_component(node['type'])
task = component_task.s(
node_id=node['id'],
config=node['config'],
inputs=get_inputs_from_predecessors(node, results)
)
tasks.append(task)
# 执行
chain(*tasks).apply_async()
4.3 组件任务模板
# tasks/component_executor.py
@celery.task
def component_task(node_id: str, config: dict, inputs: dict):
# 1. 加载组件类
comp_class = get_component_class_by_type(config['type'])
component = comp_class()
# 2. 执行
outputs = component.run(inputs, config)
# 3. 保存输出(供下游使用)
save_outputs(node_id, outputs)
# 4. 推送状态
socketio.emit('node_status', {'node_id': node_id, 'status': 'completed'})
return outputs
第五章:组件开发规范
5.1 CSV 加载器示例
# components/data/csv_loader.py
class CSVLoader(MLComponent):
name = "CSV 数据加载器"
inputs = []
outputs = ["dataframe"]
config_schema = {
"type": "object",
"properties": {
"file_path": {"type": "string", "format": "file"},
"sep": {"type": "string", "default": ","}
},
"required": ["file_path"]
}
def run(self, inputs, config):
df = pd.read_csv(config["file_path"], sep=config["sep"])
return {"dataframe": df}
5.2 组件注册机制
# components/registry.py
COMPONENT_REGISTRY = {}
def register_component(name: str):
def decorator(cls):
COMPONENT_REGISTRY[name] = cls
return cls
return decorator
# 使用
@register_component("csv_loader")
class CSVLoader(MLComponent): ...
第六章:场景实战
6.1 销售预测 Pipeline
- 拖拽组件:
CSVLoader→DateFeatureEngineer→ProphetTrainer→ModelSaver
- 配置:
- CSV 路径:
/data/sales.csv - Prophet 参数:
changepoint_prior_scale=0.05
- CSV 路径:
- 执行:
- 一键运行,自动保存模型至 MLflow
6.2 图像分类 Pipeline
- 组件链:
ImageFolderLoader→AlbumentationsAugmenter→ResNetFinetuner→ConfusionMatrixEvaluator - 优势:
- 数据增强策略可配置(旋转/裁剪概率)
- 自动记录 Top-1 Accuracy 到平台
6.3 A/B 测试多模型
- 并行分支:
- 分支1:
RandomForestTrainer - 分支2:
XGBoostTrainer
- 分支1:
- 汇总节点:
ModelComparator:输出对比报告(准确率/训练时间)
- 前端展示:热力图高亮最优模型
第七章:高级功能
7.1 超参自动调优(Optuna 集成)
- 组件扩展:
OptunaTuner组件:包裹任意训练组件- 配置搜索空间(如
learning_rate: [0.001, 0.1])
# components/tuning/optuna_tuner.py
def run(self, inputs, config):
def objective(trial):
tuned_config = {k: trial.suggest_float(k, *v) for k, v in config["search_space"].items()}
outputs = wrapped_component.run(inputs, tuned_config)
return -outputs["metrics"]["accuracy"] # 最小化负准确率
study = optuna.create_study()
study.optimize(objective, n_trials=50)
return {"best_params": study.best_params}
7.2 实时日志流
- Celery 信号:捕获任务日志
- WebSocket 推送:
# 监听 Celery 任务
@celery.signals.after_task_publish.connect
def task_sent(sender=None, headers=None, **kwargs):
socketio.emit('log', f"Task {sender} queued")
@celery.signals.task_postrun.connect
def task_finished(task_id=None, retval=None, **kwargs):
socketio.emit('log', f"Task {task_id} completed")
第八章:组件市场与协作
8.1 团队共享组件
- 上传:用户可将自定义组件打包为
.zip - 审核:管理员审核后发布至市场
- 复用:其他成员直接拖入画布
8.2 版本管理
- Workflow 快照:每次运行保存完整 DAG + 组件版本
- 回滚:一键恢复到历史版本
第九章:性能与扩展
9.1 大规模 DAG 优化
- 分片执行:超大 workflow 拆分为子 DAG
- 缓存中间结果:避免重复计算(如数据清洗结果)
9.2 插件生态
- 自定义组件 SDK:提供 Python 模板
- 第三方集成:
- HuggingFace Transformers 组件
- Snowflake 数据源组件
第十章:安全与权限
10.1 代码沙箱
- 风险:用户组件可能执行任意代码
- 对策:
- Docker 容器隔离执行
- 禁止
os.system、subprocess等危险操作
10.2 数据权限
- RBAC:
- 数据科学家:可创建/运行 workflow
- 运维:可管理组件市场
- 访客:只读模式
总结:释放数据科学家的创造力
AI 编排引擎不是取代代码,而是让代码服务于更高层次的创新。
更多推荐


所有评论(0)