�� 项目功能模块代码分布

1. MinIO文件上传获取地址功能

核心文件:

  • src/backend/bisheng/utils/minio_client.py - MinIO客户端封装类
  • src/backend/bisheng/cache/utils.py - 文件上传工具函数

主要接口:

  • src/backend/bisheng/api/v1/endpoints.py 中的文件上传接口:
    • POST /api/v1/upload/icon - 上传图标
    • POST /api/v1/upload/{flow_id} - 上传文件到工作流
    • GET /api/v1/download - 获取下载链接

关键方法:

# MinioClient类中的核心方法
def upload_minio_file_io()  # 上传文件流
def get_share_link()        # 获取分享链接
def upload_minio_data()     # 上传数据

2. 智能体构建功能

核心文件:

  • src/backend/bisheng/api/v1/assistant.py - 智能体API接口
  • src/backend/bisheng/api/services/assistant_agent.py - 智能体服务层
  • src/backend/bisheng/database/models/assistant.py - 智能体数据模型

主要接口:

  • POST /api/v1/assistant - 创建智能体
  • PUT /api/v1/assistant - 更新智能体
  • POST /api/v1/assistant/status - 更新智能体状态
  • POST /api/v1/assistant/tool - 更新工具列表
  • POST /api/v1/assistant/flow - 更新技能列表
  • WebSocket /api/v1/assistant/chat/{assistant_id} - 智能体对话

数据模型:

  • Assistant - 智能体基础信息
  • AssistantLink - 智能体关联的工具和技能

3. 数据集构建功能

核心文件:

  • src/backend/bisheng/api/services/dataset_service.py - 数据集服务层
  • src/backend/bisheng/database/models/dataset.py - 数据集数据模型
  • src/backend/bisheng/api/v1/finetune.py - 微调相关接口

主要接口:

  • POST /api/v1/finetune/job/file/preset - 创建数据集
  • POST /api/v1/workflow/save_ds - 保存数据集
  • GET /api/v1/workflow/dsfiles - 获取数据集文件列表

数据模型:

  • Dataset - 数据集基础信息
  • PresetTrain - 预置训练数据

4. 工作流构建功能

核心文件:

  • src/backend/bisheng/api/v1/flows.py - 技能/工作流API接口
  • src/backend/bisheng/api/v1/workflow.py - 工作流API接口
  • src/backend/bisheng/api/services/flow.py - 工作流服务层

主要接口:

  • POST /api/v1/flows/ - 创建技能
  • POST /api/v1/workflow/create - 创建工作流
  • POST /api/v1/workflow/publish - 发布工作流/智能体
  • WebSocket /api/v1/chat/{flow_id} - 工作流对话

数据模型:

  • Flow - 工作流/技能基础信息
  • FlowVersion - 工作流版本信息

5. 构建流程相关

核心文件:

  • src/backend/bisheng/api/v1/chat.py - 构建状态管理
  • src/backend/bisheng/interface/initialize/loading.py - 组件实例化

主要接口:

  • POST /api/v1/build/init/{flow_id} - 初始化构建
  • GET /api/v1/build/{flow_id}/status - 获取构建状态
  • GET /api/v1/build/{flow_id}/stream - 流式构建状态

🔧 技术架构总结

  1. 文件存储:使用MinIO作为对象存储,支持文件上传、下载和分享链接生成
  2. 智能体:基于LangChain构建,支持工具集成、技能关联和对话功能
  3. 数据集:支持文件上传和QA知识库构建,用于模型微调
  4. 工作流:支持可视化流程构建,包含技能和工作流两种类型
  5. 构建系统:异步构建流程,支持状态查询和流式更新

所有功能都通过RESTful API和WebSocket提供接口,数据持久化使用SQLModel + MySQL,缓存使用Redis。

📊 项目中的表结构分析

1. 智能体构建对应的表

智能体表: Assistant (在 src/backend/bisheng/database/models/assistant.py)

  • 这是代表某种智能体的主要表
  • 包含智能体的基本信息:名称、描述、系统提示词、模型配置等
  • 与物模型 Product 表类似,都是用户创建的作品类型

2. 数据集和知识库的概念区别

数据集 (Dataset):

  • 表: PresetTrain (在 src/backend/bisheng/database/models/preset_train.py)
  • 特点: 主要用于模型微调,存储单个训练文件
  • 用途: 上传文件后直接用于训练,不进行复杂的文档处理

知识库 (Knowledge):

  • 表: Knowledge + KnowledgeFile (在 src/backend/bisheng/database/models/knowledge.py)
  • 特点: 支持多文件管理,进行文档解析、向量化等复杂处理
  • 用途: 用于RAG检索,支持问答功能

关系: 数据集的文件不会放在知识库里,它们是两个独立的系统。

3. 评分系统设计方案

基于您的需求,我为您设计以下评分系统:

# 评分系统相关表结构
from datetime import datetime
from enum import Enum
from typing import Optional, List
from sqlmodel import SQLModel, Field, Column, DateTime, text
from sqlalchemy import JSON

# 作品类型枚举
class WorkTypeEnum(Enum):
    PRODUCT = "product"        # 物模型产品
    ASSISTANT = "assistant"    # 智能体
    DATASET = "dataset"        # 数据集
    KNOWLEDGE = "knowledge"    # 知识库
    FLOW = "flow"             # 工作流/技能

# 评分状态枚举
class ScoreStatusEnum(Enum):
    PENDING = 0    # 待评分
    SCORED = 1     # 已评分
    REVIEWED = 2   # 已评审

# 评分指标类型枚举
class ScoreIndicatorTypeEnum(Enum):
    FUNCTIONALITY = "functionality"  # 功能性
    USABILITY = "usability"          # 易用性
    PERFORMANCE = "performance"      # 性能
    INNOVATION = "innovation"        # 创新性
    QUALITY = "quality"             # 质量

# 作品评分主表
class WorkScoreBase(SQLModel):
    user_id: int = Field(index=True, description="用户ID")
    work_type: str = Field(index=True, description="作品类型")
    work_id: str = Field(index=True, description="作品ID")
    total_score: float = Field(default=0.0, description="总分")
    status: int = Field(default=0, description="评分状态:0-待评分,1-已评分,2-已评审")
    reviewer_id: Optional[int] = Field(default=None, description="评审员ID")
    review_comment: Optional[str] = Field(default=None, description="评审意见")
    create_time: Optional[datetime] = Field(default=None, sa_column=Column(
        DateTime, nullable=False, server_default=text('CURRENT_TIMESTAMP')))
    update_time: Optional[datetime] = Field(default=None, sa_column=Column(
        DateTime, nullable=True, server_default=text('CURRENT_TIMESTAMP'), 
        onupdate=text('CURRENT_TIMESTAMP')))

class WorkScore(WorkScoreBase, table=True):
    __tablename__ = "work_scores"
    id: Optional[int] = Field(default=None, primary_key=True)

# 评分指标表
class ScoreIndicatorBase(SQLModel):
    work_score_id: int = Field(foreign_key="work_scores.id", description="作品评分ID")
    indicator_type: str = Field(index=True, description="指标类型")
    score: float = Field(description="得分")
    max_score: float = Field(default=10.0, description="满分")
    comment: Optional[str] = Field(default=None, description="指标评价")
    create_time: Optional[datetime] = Field(default=None, sa_column=Column(
        DateTime, nullable=False, server_default=text('CURRENT_TIMESTAMP')))

class ScoreIndicator(ScoreIndicatorBase, table=True):
    __tablename__ = "score_indicators"
    id: Optional[int] = Field(default=None, primary_key=True)

# 评分指标配置表
class ScoreIndicatorConfigBase(SQLModel):
    indicator_type: str = Field(unique=True, description="指标类型")
    indicator_name: str = Field(description="指标名称")
    max_score: float = Field(default=10.0, description="满分")
    weight: float = Field(default=1.0, description="权重")
    description: Optional[str] = Field(default=None, description="指标描述")
    is_active: bool = Field(default=True, description="是否启用")

class ScoreIndicatorConfig(ScoreIndicatorConfigBase, table=True):
    __tablename__ = "score_indicator_configs"
    id: Optional[int] = Field(default=None, primary_key=True)
    create_time: Optional[datetime] = Field(default=None, sa_column=Column(
        DateTime, nullable=False, server_default=text('CURRENT_TIMESTAMP')))

# 数据访问对象
class WorkScoreDao:
    @classmethod
    def create_score(cls, work_score: WorkScore) -> WorkScore:
        with session_getter() as session:
            session.add(work_score)
            session.commit()
            session.refresh(work_score)
            return work_score
    
    @classmethod
    def get_scores_by_work(cls, work_type: str, work_id: str) -> List[WorkScore]:
        with session_getter() as session:
            statement = select(WorkScore).where(
                WorkScore.work_type == work_type,
                WorkScore.work_id == work_id
            )
            return session.exec(statement).all()
    
    @classmethod
    def update_total_score(cls, work_score_id: int, total_score: float):
        with session_getter() as session:
            work_score = session.get(WorkScore, work_score_id)
            if work_score:
                work_score.total_score = total_score
                work_score.status = ScoreStatusEnum.SCORED.value
                session.add(work_score)
                session.commit()

class ScoreIndicatorDao:
    @classmethod
    def create_indicators(cls, indicators: List[ScoreIndicator]) -> List[ScoreIndicator]:
        with session_getter() as session:
            session.add_all(indicators)
            session.commit()
            for indicator in indicators:
                session.refresh(indicator)
            return indicators
    
    @classmethod
    def get_indicators_by_work_score(cls, work_score_id: int) -> List[ScoreIndicator]:
        with session_getter() as session:
            statement = select(ScoreIndicator).where(
                ScoreIndicator.work_score_id == work_score_id
            )
            return session.exec(statement).all()

🎯 评分系统使用示例

# 创建评分记录
def create_work_score(user_id: int, work_type: str, work_id: str):
    work_score = WorkScore(
        user_id=user_id,
        work_type=work_type,
        work_id=work_id,
        status=ScoreStatusEnum.PENDING.value
    )
    return WorkScoreDao.create_score(work_score)

# 添加评分指标
def add_score_indicators(work_score_id: int, scores: dict):
    indicators = []
    for indicator_type, score_data in scores.items():
        indicator = ScoreIndicator(
            work_score_id=work_score_id,
            indicator_type=indicator_type,
            score=score_data['score'],
            max_score=score_data.get('max_score', 10.0),
            comment=score_data.get('comment', '')
        )
        indicators.append(indicator)
    
    ScoreIndicatorDao.create_indicators(indicators)
    
    # 计算总分
    total_score = sum(indicator.score for indicator in indicators)
    WorkScoreDao.update_total_score(work_score_id, total_score)

# 使用示例
work_score = create_work_score(
    user_id=1, 
    work_type=WorkTypeEnum.ASSISTANT.value, 
    work_id="assistant_123"
)

scores = {
    WorkIndicatorTypeEnum.FUNCTIONALITY.value: {"score": 8.5, "comment": "功能完整"},
    WorkIndicatorTypeEnum.USABILITY.value: {"score": 9.0, "comment": "界面友好"},
    WorkIndicatorTypeEnum.PERFORMANCE.value: {"score": 7.5, "comment": "响应较快"},
    WorkIndicatorTypeEnum.INNOVATION.value: {"score": 8.0, "comment": "有一定创新"},
    WorkIndicatorTypeEnum.QUALITY.value: {"score": 8.5, "comment": "代码质量良好"}
}

add_score_indicators(work_score.id, scores)

这个评分系统设计具有以下特点:

  1. 灵活性:支持多种作品类型(物模型、智能体、数据集、知识库、工作流)
  2. 可扩展性:通过配置表可以动态添加新的评分指标
  3. 完整性:包含总分、分项评分、评审意见等完整信息
  4. 可追溯性:记录评分时间和评审员信息
  5. 权重支持:可以为不同指标设置权重
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐