Docker沙箱、LangGraph、FastAPI整合到Multi-Agent系统的技术方案 4

第九章:完整实战项目:构建基于Docker+LangGraph+FastAPI的Multi-Agent系统

引言:从理论到实践的完整旅程

在前八章中,我们深入探讨了Docker沙箱技术、LangGraph框架、FastAPI接口层和多智能体系统设计的各个方面,构建了完整的技术知识体系。Docker提供了容器化隔离和资源管理的能力,确保每个智能体可以在独立、安全的环境中运行。LangGraph提供了基于图计算的智能体编排框架,使得复杂的工作流程能够以直观、灵活的方式表达和管理。FastAPI提供了高性能的API服务框架,将智能体系统的功能暴露为标准的接口。多智能体系统设计理论为我们提供了智能体节点建模、消息传递、状态共享和并发控制的基本原理。

然而,理论知识需要通过实践来巩固和验证。本章将带领读者完成一个完整的实战项目:构建一个基于Docker+LangGraph+FastAPI的Multi-Agent系统。我们将从需求分析开始,逐步深入到架构设计、代码实现、测试部署和运维监控,完成一个完整的智能体系统开发周期。通过这个实战项目,读者将亲身体验如何将理论知识转化为实际可用的系统,掌握构建生产级多智能体系统的完整流程。

本章的实战项目将构建一个"智能数据分析系统",该系统能够接收用户的数据分析请求,通过多个智能体的协作完成数据收集、处理、分析和报告生成。系统包含以下核心智能体:

  1. 请求解析智能体:解析用户请求,确定分析需求。
  2. 数据收集智能体:从多个数据源收集相关数据。
  3. 数据清洗智能体:清洗和预处理收集的数据。
  4. 分析引擎智能体:执行数据分析算法。
  5. 报告生成智能体:生成分析报告和可视化图表。
  6. 系统监控智能体:监控系统运行状态和性能。

通过这个实战项目,读者将学习到:

  1. 如何分析多智能体系统的业务需求和技术需求。
  2. 如何设计合理的系统架构和智能体协作流程。
  3. 如何使用LangGraph构建智能体工作流。
  4. 如何使用FastAPI构建RESTful API和WebSocket接口。
  5. 如何使用Docker容器化部署智能体系统。
  6. 如何实现系统的监控、日志和故障恢复。
  7. 如何进行系统测试和性能优化。

本章将提供完整的代码示例、配置文件和部署脚本,读者可以按照步骤实际操作,构建自己的多智能体系统。通过这个实战项目,读者将获得构建复杂智能体系统的实践经验,为在实际工作中应用多智能体技术奠定坚实基础。

9.1 项目需求分析

9.1.1 业务需求分析

智能数据分析系统的业务需求来源于企业对数据驱动决策的需求。随着数据量的增长和业务复杂度的提高,传统的数据分析工具和方法已经无法满足快速、准确、智能的分析需求。智能数据分析系统通过多个智能体的协作,实现自动化、智能化的数据分析流程。

核心业务需求

  1. 多源数据整合:系统需要能够从多个数据源(数据库、API、文件、网络)收集数据,并进行统一整合。
  2. 智能分析流程:系统需要根据分析需求自动选择合适的分析方法和算法,生成有价值的分析结果。
  3. 实时处理能力:系统需要支持实时数据分析和流式处理,及时响应业务变化。
  4. 可视化报告:系统需要生成直观的可视化报告,帮助用户理解分析结果。
  5. 可扩展性:系统需要支持新的数据源、分析算法和报告模板的扩展。
  6. 用户友好性:系统需要提供简单易用的接口,支持多种客户端访问方式。

用户场景

  1. 业务分析师:需要快速分析市场趋势、用户行为、销售数据等,生成分析报告。
  2. 数据科学家:需要执行复杂的数据分析算法,验证数据假设,发现数据模式。
  3. 系统管理员:需要监控系统运行状态,管理智能体实例,处理系统故障。
  4. 开发人员:需要扩展系统功能,添加新的数据源或分析算法。

功能需求

  1. 数据收集功能

    • 支持多种数据源连接(MySQL、PostgreSQL、MongoDB、Redis、Elasticsearch、API等)
    • 支持批量数据和流式数据收集
    • 支持数据源配置和管理
  2. 数据处理功能

    • 数据清洗和预处理
    • 数据转换和标准化
    • 数据质量检查
  3. 数据分析功能

    • 统计分析(描述统计、推断统计)
    • 机器学习分析(分类、回归、聚类)
    • 时间序列分析
    • 文本分析
    • 图像分析
  4. 报告生成功能

    • 自动生成分析报告
    • 支持多种报告格式(HTML、PDF、Markdown)
    • 可视化图表生成(折线图、柱状图、散点图、热力图等)
  5. 系统管理功能

    • 智能体管理(启动、停止、监控)
    • 任务管理(创建、执行、监控)
    • 用户管理(认证、授权、权限控制)
    • 系统监控(性能监控、日志管理、告警)

非功能需求

  1. 性能需求

    • 响应时间:普通分析请求应在30秒内完成
    • 吞吐量:系统应支持每秒处理10个以上分析请求
    • 并发能力:系统应支持100个以上并发用户
  2. 可靠性需求

    • 可用性:系统应达到99.9%的可用性
    • 容错性:单个智能体故障不应影响整个系统
    • 数据一致性:分析结果应保证一致性
  3. 安全性需求

    • 数据安全:敏感数据应加密存储和传输
    • 访问控制:严格的用户认证和权限控制
    • 审计日志:所有操作应有完整的审计日志
  4. 可维护性需求

    • 模块化设计:系统应模块化设计,便于维护和扩展
    • 文档完整:系统应有完整的技术文档和用户文档
    • 监控完善:系统应有完善的监控和告警机制

9.1.2 技术需求分析

基于业务需求,我们需要选择合适的技术栈来构建智能数据分析系统。技术需求分析确保所选技术能够满足系统的功能和非功能需求。

核心技术栈选择

  1. 容器化平台:Docker

    • 理由:提供智能体隔离、环境一致性、快速部署
    • 版本:Docker 20.10+,Docker Compose 2.0+
  2. 智能体编排框架:LangGraph

    • 理由:基于图计算的智能体编排,支持复杂工作流
    • 版本:LangGraph 0.0.20+
  3. API框架:FastAPI

    • 理由:高性能、异步支持、自动文档生成
    • 版本:FastAPI 0.104+
  4. 编程语言:Python

    • 理由:丰富的AI/ML库、良好的异步支持、社区活跃
    • 版本:Python 3.9+
  5. 数据存储

    • 关系数据库:PostgreSQL(元数据存储)
    • 缓存数据库:Redis(缓存和消息队列)
    • 搜索引擎:Elasticsearch(数据搜索)
    • 对象存储:MinIO(文件存储)
  6. 消息队列:RabbitMQ或Redis Streams

    • 理由:支持智能体间异步通信
  7. 监控系统

    • 指标收集:Prometheus
    • 可视化:Grafana
    • 日志收集:ELK Stack(Elasticsearch, Logstash, Kibana)
  8. 部署平台

    • 开发环境:Docker Compose
    • 生产环境:Kubernetes

技术约束

  1. 兼容性约束

    • 系统应兼容主流操作系统(Linux、macOS、Windows)
    • 系统应兼容主流浏览器(Chrome、Firefox、Safari)
    • 系统应提供RESTful API和WebSocket接口
  2. 性能约束

    • 单个智能体容器内存限制不超过2GB
    • 数据库查询响应时间不超过100ms
    • API接口响应时间不超过500ms
  3. 安全约束

    • 使用HTTPS加密通信
    • 数据库连接使用SSL/TLS
    • 敏感配置使用环境变量或密钥管理服务
  4. 部署约束

    • 支持容器化部署
    • 支持水平扩展
    • 支持蓝绿部署或金丝雀发布

技术风险与应对

  1. 技术复杂度风险

    • 风险:多智能体系统技术栈复杂,学习曲线陡峭
    • 应对:提供详细文档、示例代码、培训材料
  2. 性能瓶颈风险

    • 风险:智能体间通信可能成为性能瓶颈
    • 应对:使用异步通信、消息队列、连接池优化
  3. 数据一致性风险

    • 风险:分布式智能体可能导致数据不一致
    • 应对:使用事务、分布式锁、一致性算法
  4. 安全风险

    • 风险:容器逃逸、数据泄露、未授权访问
    • 应对:容器安全配置、网络隔离、访问控制

9.1.3 系统架构设计

基于需求分析,我们设计智能数据分析系统的架构。系统采用微服务架构,每个智能体作为独立的微服务,通过API网关和消息队列进行通信。

整体架构图

┌─────────────────────────────────────────────────────────────┐
│                       客户端层                               │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │ Web前端  │  │ 移动端   │  │ CLI工具  │  │ API调用  │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                       API网关层                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                 FastAPI API网关                      │   │
│  │  • RESTful API端点                                   │   │
│  │  • WebSocket连接                                     │   │
│  │  • 认证授权                                          │   │
│  │  • 请求路由                                          │   │
│  │  • 限流熔断                                          │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                   智能体编排层                               │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                 LangGraph工作流引擎                   │   │
│  │  • 工作流定义                                        │   │
│  │  • 状态管理                                          │   │
│  │  • 条件分支                                          │   │
│  │  • 循环控制                                          │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬─────┐
│       │       │       │       │       │       │       │     │
│ 请求  │ 数据  │ 数据  │ 分析  │ 报告  │ 监控  │ 通知  │ ... │
│ 解析  │ 收集  │ 清洗  │ 引擎  │ 生成  │ 智能  │ 智能  │     │
│ 智能体│ 智能体│ 智能体│ 智能体│ 智能体│ 体    │ 体    │     │
│       │       │       │       │       │       │       │     │
└───────┴───────┴───────┴───────┴───────┴───────┴───────┴─────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                   消息中间件层                               │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │ RabbitMQ │  │ Redis    │  │ Kafka    │  │ NATS     │    │
│  │ (消息队列)│  │ (缓存)   │  │ (流处理) │  │ (消息)   │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                   数据存储层                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │PostgreSQL│  │ Elastic- │  │  Redis   │  │  MinIO   │    │
│  │ (元数据) │  │ search   │  │ (缓存)   │  │ (文件)   │    │
│  │          │  │ (搜索)   │  │          │  │          │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                   监控运维层                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │Prometheus│  │ Grafana  │  │  ELK     │  │ Jaeger   │    │
│  │ (指标)   │  │ (可视化) │  │ (日志)   │  │ (追踪)   │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
└─────────────────────────────────────────────────────────────┘

架构组件详细说明

  1. 客户端层

    • Web前端:基于React/Vue的Web应用
    • 移动端:iOS/Android移动应用
    • CLI工具:命令行接口工具
    • API调用:第三方系统通过API集成
  2. API网关层

    • 基于FastAPI构建的统一API网关
    • 提供RESTful API和WebSocket接口
    • 处理认证、授权、限流、熔断
    • 路由请求到后端智能体服务
  3. 智能体编排层

    • 基于LangGraph的工作流引擎
    • 定义智能体协作流程
    • 管理工作流状态和执行
    • 支持条件分支和循环控制
  4. 智能体服务层

    • 请求解析智能体:解析用户请求,确定分析需求
    • 数据收集智能体:从多个数据源收集数据
    • 数据清洗智能体:清洗和预处理数据
    • 分析引擎智能体:执行数据分析算法
    • 报告生成智能体:生成分析报告和可视化
    • 监控智能体:监控系统运行状态
    • 通知智能体:发送分析结果通知
  5. 消息中间件层

    • RabbitMQ:智能体间异步消息通信
    • Redis:缓存和快速数据访问
    • Kafka:流式数据处理
    • NATS:轻量级消息通信
  6. 数据存储层

    • PostgreSQL:存储系统元数据、用户信息、任务信息
    • Elasticsearch:存储和分析数据,支持全文搜索
    • Redis:缓存热点数据,提高访问速度
    • MinIO:存储分析报告、可视化图表、日志文件
  7. 监控运维层

    • Prometheus:收集系统指标和性能数据
    • Grafana:可视化监控数据
    • ELK Stack:收集、存储、分析日志
    • Jaeger:分布式追踪,分析请求链路

架构设计原则

  1. 微服务原则:每个智能体作为独立的微服务,可以独立开发、测试、部署、扩展。
  2. 松耦合原则:智能体之间通过API和消息队列通信,减少直接依赖。
  3. 高内聚原则:每个智能体专注于单一功能领域,提高代码质量。
  4. 容错设计:系统设计考虑故障隔离和自动恢复。
  5. 可观测性:系统提供完善的监控、日志、追踪能力。
  6. 安全性:系统设计考虑数据安全、访问控制、审计日志。
  7. 可扩展性:系统支持水平扩展和垂直扩展。

数据流设计

  1. 用户请求流程

    用户请求 → API网关 → 请求解析智能体 → LangGraph工作流 → 数据收集智能体 → 
    数据清洗智能体 → 分析引擎智能体 → 报告生成智能体 → 通知智能体 → 用户
    
  2. 系统监控流程

    监控智能体 → 收集指标 → Prometheus → Grafana → 告警系统 → 管理员
    
  3. 日志收集流程

    各智能体日志 → Logstash → Elasticsearch → Kibana → 管理员
    

接口设计

  1. RESTful API接口

    • POST /api/v1/analysis:创建分析任务
    • GET /api/v1/analysis/{task_id}:获取分析任务状态
    • GET /api/v1/analysis/{task_id}/result:获取分析结果
    • GET /api/v1/agents:获取智能体列表
    • POST /api/v1/agents/{agent_id}/restart:重启智能体
  2. WebSocket接口

    • /ws/analysis/{task_id}:分析任务实时状态推送
    • /ws/system:系统监控数据实时推送
  3. 消息队列接口

    • analysis.request:分析请求消息
    • analysis.result:分析结果消息
    • system.alert:系统告警消息

安全设计

  1. 认证机制:JWT令牌认证
  2. 授权机制:基于角色的访问控制(RBAC)
  3. 数据加密:传输层使用TLS,存储层使用AES加密
  4. 审计日志:记录所有用户操作和系统事件
  5. 网络安全:容器网络隔离,防火墙规则

通过合理的架构设计,智能数据分析系统能够满足业务需求和技术需求,提供高性能、高可用、高安全的智能分析服务。在下一节中,我们将开始具体的代码实现,构建系统的各个组件。

9.1.4 项目目录结构

在开始代码实现之前,我们需要规划项目的目录结构。合理的目录结构有助于代码组织、模块划分和团队协作。

smart-data-analysis-system/
├── README.md                    # 项目说明文档
├── LICENSE                      # 许可证文件
├── .gitignore                   # Git忽略文件
├── .dockerignore                # Docker忽略文件
├── pyproject.toml               # Python项目配置
├── requirements.txt             # Python依赖文件
├── requirements-dev.txt         # 开发环境依赖
├── docker-compose.yml           # Docker Compose配置
├── docker-compose.prod.yml      # 生产环境配置
├── docker-compose.dev.yml       # 开发环境配置
├── kubernetes/                  # Kubernetes配置
│   ├── namespace.yaml
│   ├── rbac.yaml
│   ├── secrets.yaml
│   ├── configmaps.yaml
│   ├── persistent-volumes.yaml
│   ├── postgres.yaml
│   ├── redis.yaml
│   ├── elasticsearch.yaml
│   ├── rabbitmq.yaml
│   ├── minio.yaml
│   ├── planner-agent.yaml
│   ├── searcher-agent.yaml
│   ├── analyzer-agent.yaml
│   ├── executor-agent.yaml
│   ├── monitor-agent.yaml
│   ├── api-gateway.yaml
│   ├── ingress.yaml
│   └── monitoring.yaml
├── scripts/                     # 部署和管理脚本
│   ├── build.sh
│   ├── deploy.sh
│   ├── test.sh
│   ├── monitor.sh
│   └── backup.sh
├── docs/                        # 项目文档
│   ├── architecture.md
│   ├── api-reference.md
│   ├── deployment-guide.md
│   ├── user-guide.md
│   └── developer-guide.md
├── config/                      # 配置文件
│   ├── development/
│   │   ├── api.yaml
│   │   ├── planner.yaml
│   │   ├── searcher.yaml
│   │   ├── analyzer.yaml
│   │   ├── executor.yaml
│   │   └── monitor.yaml
│   ├── production/
│   │   └── ...
│   └── testing/
│       └── ...
├── src/                         # 源代码
│   ├── api_gateway/             # API网关
│   │   ├── __init__.py
│   │   ├── main.py              # FastAPI应用入口
│   │   ├── config.py            # 配置管理
│   │   ├── dependencies.py      # 依赖注入
│   │   ├── middleware.py        # 中间件
│   │   ├── routers/             # 路由模块
│   │   │   ├── __init__.py
│   │   │   ├── analysis.py      # 分析路由
│   │   │   ├── agents.py        # 智能体路由
│   │   │   ├── system.py        # 系统路由
│   │   │   └── websocket.py     # WebSocket路由
│   │   ├── schemas/             # Pydantic模型
│   │   │   ├── __init__.py
│   │   │   ├── analysis.py
│   │   │   ├── agent.py
│   │   │   └── system.py
│   │   ├── services/            # 业务服务
│   │   │   ├── __init__.py
│   │   │   ├── analysis_service.py
│   │   │   ├── agent_service.py
│   │   │   └── auth_service.py
│   │   └── utils/               # 工具函数
│   │       ├── __init__.py
│   │       ├── logging.py
│   │       ├── security.py
│   │       └── validation.py
│   ├── workflow_engine/         # 工作流引擎
│   │   ├── __init__.py
│   │   ├── main.py              # LangGraph工作流引擎
│   │   ├── config.py
│   │   ├── workflows/           # 工作流定义
│   │   │   ├── __init__.py
│   │   │   ├── data_analysis.py # 数据分析工作流
│   │   │   ├── data_collection.py
│   │   │   └── report_generation.py
│   │   ├── nodes/               # 工作流节点
│   │   │   ├── __init__.py
│   │   │   ├── request_parser.py
│   │   │   ├── data_collector.py
│   │   │   ├── data_cleaner.py
│   │   │   ├── analyzer.py
│   │   │   └── report_generator.py
│   │   └── utils/
│   │       ├── __init__.py
│   │       ├── state_management.py
│   │       └── error_handling.py
│   ├── agents/                  # 智能体实现
│   │   ├── __init__.py
│   │   ├── base_agent.py        # 智能体基类
│   │   ├── request_parser/      # 请求解析智能体
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   │   ├── config.py
│   │   │   ├── parser.py
│   │   │   └── utils.py
│   │   ├── data_collector/      # 数据收集智能体
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   │   ├── config.py
│   │   │   ├── collectors/      # 数据收集器
│   │   │   │   ├── __init__.py
│   │   │   │   ├── database_collector.py
│   │   │   │   ├── api_collector.py
│   │   │   │   ├── file_collector.py
│   │   │   │   └── web_collector.py
│   │   │   └── utils.py
│   │   ├── data_cleaner/        # 数据清洗智能体
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   │   ├── config.py
│   │   │   ├── cleaners/        # 数据清洗器
│   │   │   │   ├── __init__.py
│   │   │   │   ├── missing_handler.py
│   │   │   │   ├── outlier_detector.py
│   │   │   │   ├── normalizer.py
│   │   │   │   └── validator.py
│   │   │   └── utils.py
│   │   ├── analyzer/            # 分析引擎智能体
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   │   ├── config.py
│   │   │   ├── analyzers/       # 分析器
│   │   │   │   ├── __init__.py
│   │   │   │   ├── statistical_analyzer.py
│   │   │   │   ├── ml_analyzer.py
│   │   │   │   ├── time_series_analyzer.py
│   │   │   │   └── text_analyzer.py
│   │   │   └── utils.py
│   │   ├── report_generator/    # 报告生成智能体
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   │   ├── config.py
│   │   │   ├── generators/      # 报告生成器
│   │   │   │   ├── __init__.py
│   │   │   │   ├── html_generator.py
│   │   │   │   ├── pdf_generator.py
│   │   │   │   ├── markdown_generator.py
│   │   │   │   └── chart_generator.py
│   │   │   └── utils.py
│   │   ├── monitor/             # 监控智能体
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   │   ├── config.py
│   │   │   ├── monitors/        # 监控器
│   │   │   │   ├── __init__.py
│   │   │   │   ├── system_monitor.py
│   │   │   │   ├── agent_monitor.py
│   │   │   │   ├── performance_monitor.py
│   │   │   │   └── alert_manager.py
│   │   │   └── utils.py
│   │   └── notifier/            # 通知智能体
│   │       ├── __init__.py
│   │       ├── main.py
│   │       ├── config.py
│   │       ├── notifiers/       # 通知器
│   │       │   ├── __init__.py
│   │       │   ├── email_notifier.py
│   │       │   ├── webhook_notifier.py
│   │       │   ├── slack_notifier.py
│   │       │   └── sms_notifier.py
│   │       └── utils.py
│   ├── shared/                  # 共享代码
│   │   ├── __init__.py
│   │   ├── models/              # 数据模型
│   │   │   ├── __init__.py
│   │   │   ├── analysis.py
│   │   │   ├── agent.py
│   │   │   └── system.py
│   │   ├── database/            # 数据库层
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── session.py
│   │   │   ├── repositories/    # 数据仓库
│   │   │   │   ├── __init__.py
│   │   │   │   ├── analysis_repository.py
│   │   │   │   ├── agent_repository.py
│   │   │   │   └── user_repository.py
│   │   │   └── migrations/      # 数据库迁移
│   │   │       ├── __init__.py
│   │   │       └── versions/
│   │   ├── messaging/           # 消息中间件
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── rabbitmq_client.py
│   │   │   ├── redis_client.py
│   │   │   └── kafka_client.py
│   │   └── utils/               # 共享工具
│   │       ├── __init__.py
│   │       ├── logging.py
│   │       ├── security.py
│   │       ├── validation.py
│   │       └── serialization.py
│   └── tests/                   # 测试代码
│       ├── __init__.py
│       ├── conftest.py
│       ├── unit/                # 单元测试
│       │   ├── __init__.py
│       │   ├── test_api_gateway.py
│       │   ├── test_workflow_engine.py
│       │   └── test_agents.py
│       ├── integration/         # 集成测试
│       │   ├── __init__.py
│       │   ├── test_analysis_flow.py
│       │   └── test_system_integration.py
│       └── e2e/                 # 端到端测试
│           ├── __init__.py
│           └── test_full_workflow.py
├── docker/                      # Docker配置
│   ├── api_gateway/
│   │   └── Dockerfile
│   ├── workflow_engine/
│   │   └── Dockerfile
│   ├── agents/
│   │   ├── request_parser/
│   │   │   └── Dockerfile
│   │   ├── data_collector/
│   │   │   └── Dockerfile
│   │   ├── data_cleaner/
│   │   │   └── Dockerfile
│   │   ├── analyzer/
│   │   │   └── Dockerfile
│   │   ├── report_generator/
│   │   │   └── Dockerfile
│   │   ├── monitor/
│   │   │   └── Dockerfile
│   │   └── notifier/
│   │       └── Dockerfile
│   └── shared/
│       └── Dockerfile
├── monitoring/                  # 监控配置
│   ├── prometheus/
│   │   └── prometheus.yml
│   ├── grafana/
│   │   ├── dashboards/
│   │   │   ├── system.json
│   │   │   ├── agents.json
│   │   │   └── analysis.json
│   │   └── datasources/
│   │       └── prometheus.yml
│   └── alerts/
│       └── alert_rules.yml
└── logs/                        # 日志目录(运行时生成)
    ├── api_gateway/
    ├── workflow_engine/
    └── agents/

目录结构说明

  1. 根目录:包含项目配置文件、文档和部署配置。
  2. kubernetes/:Kubernetes部署配置文件。
  3. scripts/:自动化部署和管理脚本。
  4. docs/:项目文档,包括架构、API参考、部署指南等。
  5. config/:配置文件,按环境分离。
  6. src/:源代码目录,按模块组织。
  7. docker/:Docker构建配置,每个服务有独立的Dockerfile。
  8. monitoring/:监控系统配置。
  9. logs/:日志文件目录(运行时生成)。

模块划分原则

  1. 功能模块化:每个智能体作为独立模块,可以独立开发、测试、部署。
  2. 代码复用:共享代码放在shared目录,避免重复。
  3. 配置分离:配置文件按环境分离,便于部署管理。
  4. 测试完整:包含单元测试、集成测试、端到端测试。
  5. 文档齐全:每个模块都有相应的文档说明。

通过合理的目录结构设计,项目具有良好的可维护性、可扩展性和团队协作性。在下一节中,我们将开始具体的代码实现,从API网关开始构建系统的各个组件。

9.2 代码实现

9.2.1 API网关实现

API网关是整个系统的入口,负责接收客户端请求、路由到后端服务、处理认证授权等。我们使用FastAPI构建高性能的API网关。

API网关主应用

# src/api_gateway/main.py
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import logging
import time
from typing import Dict, Any

from .config import settings
from .middleware import RequestIDMiddleware, LoggingMiddleware, RateLimitMiddleware
from .routers import analysis, agents, system, websocket
from .dependencies import get_current_user, get_db_session
from .utils.logging import setup_logging
from .utils.security import verify_api_key

# 设置日志
logger = setup_logging(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    # 启动时
    logger.info("API网关启动中...")
    
    # 初始化数据库连接池
    from .database import init_db_pool
    await init_db_pool()
    
    # 初始化消息队列连接
    from .messaging import init_mq_connection
    await init_mq_connection()
    
    logger.info("API网关启动完成")
    
    yield
    
    # 关闭时
    logger.info("API网关关闭中...")
    
    # 关闭数据库连接池
    from .database import close_db_pool
    await close_db_pool()
    
    # 关闭消息队列连接
    from .messaging import close_mq_connection
    await close_mq_connection()
    
    logger.info("API网关关闭完成")

# 创建FastAPI应用
app = FastAPI(
    title="智能数据分析系统API网关",
    description="基于Docker+LangGraph+FastAPI的多智能体系统API网关",
    version="1.0.0",
    docs_url="/docs" if settings.DEBUG else None,
    redoc_url="/redoc" if settings.DEBUG else None,
    openapi_url="/openapi.json" if settings.DEBUG else None,
    lifespan=lifespan
)

# 添加CORS中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 添加自定义中间件
app.add_middleware(RequestIDMiddleware)
app.add_middleware(LoggingMiddleware)
app.add_middleware(RateLimitMiddleware)

# 注册路由
app.include_router(
    analysis.router,
    prefix="/api/v1/analysis",
    tags=["分析"],
    dependencies=[Depends(verify_api_key)]
)

app.include_router(
    agents.router,
    prefix="/api/v1/agents",
    tags=["智能体"],
    dependencies=[Depends(verify_api_key)]
)

app.include_router(
    system.router,
    prefix="/api/v1/system",
    tags=["系统"],
    dependencies=[Depends(verify_api_key)]
)

app.include_router(
    websocket.router,
    prefix="/ws",
    tags=["WebSocket"]
)

# 根端点
@app.get("/")
async def root():
    """根端点,返回系统信息"""
    return {
        "service": "智能数据分析系统API网关",
        "version": "1.0.0",
        "status": "running",
        "endpoints": {
            "analysis": "/api/v1/analysis",
            "agents": "/api/v1/agents",
            "system": "/api/v1/system",
            "docs": "/docs",
            "health": "/health"
        }
    }

# 健康检查端点
@app.get("/health")
async def health_check():
    """健康检查端点"""
    health_status = {
        "status": "healthy",
        "timestamp": time.time(),
        "components": {}
    }
    
    # 检查数据库连接
    try:
        from .database import check_db_connection
        db_ok = await check_db_connection()
        health_status["components"]["database"] = "healthy" if db_ok else "unhealthy"
    except Exception as e:
        health_status["components"]["database"] = f"error: {str(e)}"
    
    # 检查消息队列连接
    try:
        from .messaging import check_mq_connection
        mq_ok = await check_mq_connection()
        health_status["components"]["message_queue"] = "healthy" if mq_ok else "unhealthy"
    except Exception as e:
        health_status["components"]["message_queue"] = f"error: {str(e)}"
    
    # 检查后端服务连接
    try:
        from .services.agent_service import check_agent_health
        agent_health = await check_agent_health()
        health_status["components"]["agents"] = agent_health
    except Exception as e:
        health_status["components"]["agents"] = f"error: {str(e)}"
    
    # 确定总体状态
    unhealthy_components = [
        status for status in health_status["components"].values()
        if status != "healthy"
    ]
    
    if unhealthy_components:
        health_status["status"] = "degraded"
        health_status["unhealthy_components"] = unhealthy_components
    
    return health_status

# 就绪检查端点
@app.get("/ready")
async def readiness_check():
    """就绪检查端点"""
    # 检查关键依赖是否就绪
    dependencies_ready = True
    issues = []
    
    # 检查数据库
    try:
        from .database import check_db_connection
        if not await check_db_connection():
            dependencies_ready = False
            issues.append("database_not_ready")
    except Exception:
        dependencies_ready = False
        issues.append("database_error")
    
    # 检查消息队列
    try:
        from .messaging import check_mq_connection
        if not await check_mq_connection():
            dependencies_ready = False
            issues.append("message_queue_not_ready")
    except Exception:
        dependencies_ready = False
        issues.append("message_queue_error")
    
    if dependencies_ready:
        return {"status": "ready", "timestamp": time.time()}
    else:
        return JSONResponse(
            status_code=503,
            content={
                "status": "not_ready",
                "timestamp": time.time(),
                "issues": issues
            }
        )

# 全局异常处理器
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """HTTP异常处理器"""
    logger.warning(
        f"HTTP异常: {exc.status_code} - {exc.detail}",
        extra={
            "request_id": request.state.request_id,
            "path": request.url.path,
            "method": request.method
        }
    )
    
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.status_code,
                "message": exc.detail,
                "request_id": request.state.request_id
            }
        }
    )

@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """通用异常处理器"""
    logger.error(
        f"未处理异常: {str(exc)}",
        exc_info=True,
        extra={
            "request_id": request.state.request_id,
            "path": request.url.path,
            "method": request.method
        }
    )
    
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "code": 500,
                "message": "内部服务器错误",
                "request_id": request.state.request_id
            }
        }
    )

# 指标端点(用于Prometheus监控)
@app.get("/metrics")
async def metrics():
    """Prometheus指标端点"""
    from prometheus_client import generate_latest
    from .middleware import REQUEST_COUNT, REQUEST_LATENCY
    
    return Response(
        content=generate_latest(),
        media_type="text/plain"
    )

if __name__ == "__main__":
    import uvicorn
    
    uvicorn.run(
        "main:app",
        host=settings.HOST,
        port=settings.PORT,
        reload=settings.DEBUG,
        log_level="info"
    )

API网关配置管理

# src/api_gateway/config.py
from pydantic_settings import BaseSettings
from typing import List, Optional
from functools import lru_cache

class Settings(BaseSettings):
    """应用配置"""
    # 应用设置
    APP_NAME: str = "智能数据分析系统API网关"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
    
    # 服务器设置
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    
    # 数据库设置
    DATABASE_URL: str = "postgresql+asyncpg://user:password@localhost/agentdb"
    DATABASE_POOL_SIZE: int = 20
    DATABASE_MAX_OVERFLOW: int = 30
    
    # Redis设置
    REDIS_URL: str = "redis://localhost:6379/0"
    REDIS_POOL_SIZE: int = 10
    
    # RabbitMQ设置
    RABBITMQ_URL: str = "amqp://guest:guest@localhost:5672/"
    RABBITMQ_POOL_SIZE: int = 5
    
    # 认证设置
    SECRET_KEY: str = "your-secret-key-here"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    API_KEY_HEADER: str = "X-API-Key"
    
    # CORS设置
    CORS_ORIGINS: List[str] = ["http://localhost:3000", "http://localhost:8080"]
    
    # 限流设置
    RATE_LIMIT_ENABLED: bool = True
    RATE_LIMIT_REQUESTS: int = 100
    RATE_LIMIT_PERIOD: int = 60  # 秒
    
    # 后端服务设置
    WORKFLOW_ENGINE_URL: str = "http://workflow-engine:8001"
    PLANNER_AGENT_URL: str = "http://planner-agent:8002"
    SEARCHER_AGENT_URL: str = "http://searcher-agent:8003"
    ANALYZER_AGENT_URL: str = "http://analyzer-agent:8004"
    EXECUTOR_AGENT_URL: str = "http://executor-agent:8005"
    MONITOR_AGENT_URL: str = "http://monitor-agent:8006"
    
    # 日志设置
    LOG_LEVEL: str = "INFO"
    LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    LOG_FILE: Optional[str] = "/app/logs/api_gateway.log"
    
    # 监控设置
    PROMETHEUS_ENABLED: bool = True
    METRICS_PORT: int = 9090
    
    class Config:
        env_file = ".env"
        case_sensitive = True

@lru_cache()
def get_settings() -> Settings:
    """获取配置(缓存)"""
    return Settings()

settings = get_settings()

API网关中间件

# src/api_gateway/middleware.py
import time
import uuid
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
import logging
from typing import Dict, Any
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

logger = logging.getLogger(__name__)

# 限流器
limiter = Limiter(key_func=get_remote_address)

class RequestIDMiddleware(BaseHTTPMiddleware):
    """请求ID中间件"""
    async def dispatch(self, request: Request, call_next):
        # 生成请求ID
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id
        
        # 处理请求
        response = await call_next(request)
        
        # 添加请求ID到响应头
        response.headers["X-Request-ID"] = request_id
        
        return response

class LoggingMiddleware(BaseHTTPMiddleware):
    """日志中间件"""
    async def dispatch(self, request: Request, call_next):
        # 记录请求开始
        start_time = time.time()
        
        # 记录请求信息
        logger.info(
            f"请求开始: {request.method} {request.url.path}",
            extra={
                "request_id": request.state.request_id,
                "client_ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent", "unknown")
            }
        )
        
        # 处理请求
        try:
            response = await call_next(request)
            
            # 计算处理时间
            process_time = time.time() - start_time
            
            # 记录响应信息
            logger.info(
                f"请求完成: {request.method} {request.url.path} - {response.status_code}",
                extra={
                    "request_id": request.state.request_id,
                    "status_code": response.status_code,
                    "process_time": process_time
                }
            )
            
            # 添加处理时间头
            response.headers["X-Process-Time"] = str(process_time)
            
            return response
            
        except Exception as e:
            # 记录异常
            process_time = time.time() - start_time
            logger.error(
                f"请求异常: {request.method} {request.url.path} - {str(e)}",
                exc_info=True,
                extra={
                    "request_id": request.state.request_id,
                    "process_time": process_time
                }
            )
            
            raise

class RateLimitMiddleware(BaseHTTPMiddleware):
    """限流中间件"""
    async def dispatch(self, request: Request, call_next):
        # 检查限流
        if limiter._rate_limit_exceeded_handler:
            try:
                # 这里简化处理,实际应使用slowapi的完整功能
                pass
            except RateLimitExceeded:
                return JSONResponse(
                    status_code=429,
                    content={
                        "error": {
                            "code": 429,
                            "message": "请求过于频繁,请稍后再试",
                            "request_id": request.state.request_id
                        }
                    },
                    headers={"Retry-After": "60"}
                )
        
        # 继续处理
        return await call_next(request)

# Prometheus指标(简化示例)
from prometheus_client import Counter, Histogram

REQUEST_COUNT = Counter(
    'http_requests_total',
    'HTTP请求总数',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP请求延迟',
    ['method', 'endpoint']
)

class MetricsMiddleware(BaseHTTPMiddleware):
    """指标中间件"""
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        
        # 处理请求
        response = await call_next(request)
        
        # 计算延迟
        latency = time.time() - start_time
        
        # 记录指标
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status=response.status_code
        ).inc()
        
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=request.url.path
        ).observe(latency)
        
        return response

分析路由实现

# src/api_gateway/routers/analysis.py
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from fastapi.responses import JSONResponse
from typing import List, Optional
from pydantic import BaseModel, Field
from uuid import UUID
from datetime import datetime

from ..schemas.analysis import AnalysisRequest, AnalysisResponse, AnalysisStatus
from ..services.analysis_service import AnalysisService
from ..dependencies import get_current_user, get_db_session
from ..utils.security import verify_api_key

router = APIRouter()

# 依赖服务
analysis_service = AnalysisService()

@router.post("/", 
            response_model=AnalysisResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="创建分析任务")
async def create_analysis(
    request: AnalysisRequest,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user),
    db_session = Depends(get_db_session)
):
    """
    创建新的数据分析任务。
    
    任务将在后台异步执行,立即返回任务ID。
    
    **请求体示例**:
    ```json
    {
        "name": "销售数据分析",
        "description": "分析2023年销售数据趋势",
        "data_sources": [
            {
                "type": "database",
                "connection": "postgresql://user:pass@localhost/sales",
                "query": "SELECT * FROM sales WHERE year = 2023"
            },
            {
                "type": "api",
                "url": "https://api.example.com/sales",
                "parameters": {"year": 2023}
            }
        ],
        "analysis_type": "time_series",
        "parameters": {
            "time_field": "date",
            "value_field": "amount",
            "frequency": "monthly"
        },
        "output_format": "html_report"
    }
    ```
    """
    try:
        # 创建分析任务
        task = await analysis_service.create_analysis_task(
            request=request,
            user_id=current_user["id"],
            db_session=db_session
        )
        
        # 在后台启动分析工作流
        background_tasks.add_task(
            analysis_service.start_analysis_workflow,
            task_id=task.id,
            request=request
        )
        
        return AnalysisResponse(
            id=task.id,
            name=task.name,
            description=task.description,
            status=AnalysisStatus.PENDING,
            created_at=task.created_at,
            updated_at=task.updated_at
        )
        
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"创建分析任务失败: {str(e)}"
        )

@router.get("/{task_id}", 
           response_model=AnalysisResponse,
           summary="获取分析任务状态")
async def get_analysis_status(
    task_id: UUID,
    current_user: dict = Depends(get_current_user),
    db_session = Depends(get_db_session)
):
    """
    获取分析任务的状态和进度。
    
    - **task_id**: 分析任务ID
    """
    task = await analysis_service.get_analysis_task(
        task_id=task_id,
        user_id=current_user["id"],
        db_session=db_session
    )
    
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"分析任务 {task_id} 不存在"
        )
    
    return task

@router.get("/{task_id}/result", 
           summary="获取分析结果")
async def get_analysis_result(
    task_id: UUID,
    format: Optional[str] = "json",
    current_user: dict = Depends(get_current_user),
    db_session = Depends(get_db_session)
):
    """
    获取分析任务的结果。
    
    - **task_id**: 分析任务ID
    - **format**: 结果格式 (json, html, pdf, markdown)
    """
    result = await analysis_service.get_analysis_result(
        task_id=task_id,
        user_id=current_user["id"],
        format=format,
        db_session=db_session
    )
    
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"分析任务 {task_id} 的结果不存在"
        )
    
    # 根据格式返回响应
    if format == "json":
        return result
    
    elif format == "html":
        from fastapi.responses import HTMLResponse
        return HTMLResponse(content=result["content"], media_type="text/html")
    
    elif format == "pdf":
        from fastapi.responses import Response
        return Response(
            content=result["content"],
            media_type="application/pdf",
            headers={
                "Content-Disposition": f"attachment; filename=analysis_{task_id}.pdf"
            }
        )
    
    elif format == "markdown":
        from fastapi.responses import PlainTextResponse
        return PlainTextResponse(content=result["content"], media_type="text/markdown")
    
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"不支持的格式: {format}"
        )

@router.get("/", 
           response_model=List[AnalysisResponse],
           summary="获取分析任务列表")
async def list_analysis_tasks(
    skip: int = 0,
    limit: int = 10,
    status: Optional[str] = None,
    created_after: Optional[datetime] = None,
    created_before: Optional[datetime] = None,
    current_user: dict = Depends(get_current_user),
    db_session = Depends(get_db_session)
):
    """
    获取用户的分析任务列表。
    
    - **skip**: 跳过的记录数
    - **limit**: 每页记录数(最大100)
    - **status**: 按状态过滤
    - **created_after**: 创建时间之后
    - **created_before**: 创建时间之前
    """
    limit = min(limit, 100)  # 限制最大100
    
    tasks = await analysis_service.list_analysis_tasks(
        user_id=current_user["id"],
        skip=skip,
        limit=limit,
        status=status,
        created_after=created_after,
        created_before=created_before,
        db_session=db_session
    )
    
    return tasks

@router.delete("/{task_id}", 
              status_code=status.HTTP_204_NO_CONTENT,
              summary="取消分析任务")
async def cancel_analysis_task(
    task_id: UUID,
    current_user: dict = Depends(get_current_user),
    db_session = Depends(get_db_session)
):
    """
    取消分析任务。
    
    只能取消未完成的任务。
    
    - **task_id**: 分析任务ID
    """
    cancelled = await analysis_service.cancel_analysis_task(
        task_id=task_id,
        user_id=current_user["id"],
        db_session=db_session
    )
    
    if not cancelled:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"分析任务 {task_id} 无法取消"
        )
    
    # 返回204 No Content
    return None

@router.post("/{task_id}/retry", 
            status_code=status.HTTP_202_ACCEPTED,
            summary="重试分析任务")
async def retry_analysis_task(
    task_id: UUID,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user),
    db_session = Depends(get_db_session)
):
    """
    重试失败的分析任务。
    
    - **task_id**: 分析任务ID
    """
    # 检查任务是否存在且可重试
    task = await analysis_service.get_analysis_task(
        task_id=task_id,
        user_id=current_user["id"],
        db_session=db_session
    )
    
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"分析任务 {task_id} 不存在"
        )
    
    if task.status not in [AnalysisStatus.FAILED, AnalysisStatus.CANCELLED]:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"分析任务 {task_id} 无法重试,当前状态: {task.status}"
        )
    
    # 重试任务
    try:
        # 更新任务状态
        await analysis_service.update_analysis_status(
            task_id=task_id,
            status=AnalysisStatus.PENDING,
            db_session=db_session
        )
        
        # 获取原始请求
        original_request = await analysis_service.get_analysis_request(
            task_id=task_id,
            db_session=db_session
        )
        
        # 在后台重试工作流
        background_tasks.add_task(
            analysis_service.start_analysis_workflow,
            task_id=task_id,
            request=original_request
        )
        
        return {
            "task_id": task_id,
            "status": "retrying",
            "message": "分析任务重试中"
        }
        
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"重试分析任务失败: {str(e)}"
        )

WebSocket路由实现

# src/api_gateway/routers/websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from typing import Dict, Any, List
import json
import asyncio
from uuid import UUID

from ..services.websocket_service import WebSocketService
from ..utils.security import verify_websocket_token

router = APIRouter()
websocket_service = WebSocketService()

@router.websocket("/analysis/{task_id}")
async def analysis_websocket(
    websocket: WebSocket,
    task_id: str,
    token: str = None
):
    """
    分析任务WebSocket连接。
    
    实时推送分析任务状态和进度。
    
    - **task_id**: 分析任务ID
    - **token**: 认证令牌(查询参数)
    """
    # 验证令牌
    if not verify_websocket_token(token):
        await websocket.close(code=1008)  # 策略违规
        return
    
    # 接受连接
    await websocket.accept()
    
    try:
        # 注册连接
        connection_id = await websocket_service.register_connection(
            websocket=websocket,
            task_id=task_id
        )
        
        # 发送初始状态
        initial_status = await websocket_service.get_task_status(task_id)
        await websocket.send_json({
            "type": "initial_status",
            "task_id": task_id,
            "status": initial_status,
            "connection_id": connection_id
        })
        
        # 处理消息
        while True:
            try:
                # 接收消息
                data = await asyncio.wait_for(
                    websocket.receive_text(),
                    timeout=30.0  # 30秒超时
                )
                
                # 处理消息
                message = json.loads(data)
                await websocket_service.handle_message(
                    connection_id=connection_id,
                    message=message
                )
                
            except asyncio.TimeoutError:
                # 发送心跳
                await websocket.send_json({
                    "type": "heartbeat",
                    "timestamp": asyncio.get_event_loop().time()
                })
                
            except json.JSONDecodeError:
                # 无效JSON
                await websocket.send_json({
                    "type": "error",
                    "message": "无效的JSON格式"
                })
                
    except WebSocketDisconnect:
        # 连接断开
        await websocket_service.unregister_connection(connection_id)
        
    except Exception as e:
        # 其他异常
        print(f"WebSocket异常: {e}")
        await websocket.close(code=1011)  # 内部错误

@router.websocket("/system")
async def system_websocket(
    websocket: WebSocket,
    token: str = None
):
    """
    系统监控WebSocket连接。
    
    实时推送系统监控数据。
    
    - **token**: 认证令牌(查询参数)
    """
    # 验证令牌(需要管理员权限)
    if not verify_websocket_token(token, require_admin=True):
        await websocket.close(code=1008)
        return
    
    await websocket.accept()
    
    try:
        # 注册系统监控连接
        connection_id = await websocket_service.register_system_connection(websocket)
        
        # 发送初始系统状态
        system_status = await websocket_service.get_system_status()
        await websocket.send_json({
            "type": "system_status",
            "status": system_status,
            "connection_id": connection_id
        })
        
        # 定期发送监控数据
        while True:
            try:
                # 获取最新监控数据
                monitoring_data = await websocket_service.get_monitoring_data()
                
                # 发送监控数据
                await websocket.send_json({
                    "type": "monitoring_data",
                    "data": monitoring_data,
                    "timestamp": asyncio.get_event_loop().time()
                })
                
                # 等待5秒
                await asyncio.sleep(5)
                
            except WebSocketDisconnect:
                break
                
            except Exception as e:
                print(f"系统WebSocket异常: {e}")
                await websocket.send_json({
                    "type": "error",
                    "message": f"获取监控数据失败: {str(e)}"
                })
                await asyncio.sleep(5)
                
    except WebSocketDisconnect:
        # 连接断开
        await websocket_service.unregister_system_connection(connection_id)
        
    except Exception as e:
        print(f"系统WebSocket异常: {e}")
        await websocket.close(code=1011)

分析服务实现

# src/api_gateway/services/analysis_service.py
import asyncio
from typing import Dict, Any, List, Optional
from uuid import UUID, uuid4
from datetime import datetime
import json

from ..schemas.analysis import AnalysisRequest, AnalysisResponse, AnalysisStatus
from ..database.repositories.analysis_repository import AnalysisRepository
from ..messaging.rabbitmq_client import RabbitMQClient
from ..config import settings

class AnalysisService:
    """分析服务"""
    
    def __init__(self):
        self.analysis_repo = AnalysisRepository()
        self.mq_client = RabbitMQClient()
    
    async def create_analysis_task(
        self,
        request: AnalysisRequest,
        user_id: UUID,
        db_session
    ) -> Dict[str, Any]:
        """创建分析任务"""
        # 生成任务ID
        task_id = uuid4()
        
        # 创建任务记录
        task_data = {
            "id": task_id,
            "user_id": user_id,
            "name": request.name,
            "description": request.description,
            "status": AnalysisStatus.PENDING.value,
            "analysis_type": request.analysis_type,
            "output_format": request.output_format,
            "parameters": json.dumps(request.parameters),
            "data_sources": json.dumps(request.data_sources),
            "created_at": datetime.now(),
            "updated_at": datetime.now()
        }
        
        # 保存到数据库
        task = await self.analysis_repo.create_task(task_data, db_session)
        
        # 保存请求数据
        request_data = {
            "task_id": task_id,
            "request_data": request.dict(),
            "created_at": datetime.now()
        }
        
        await self.analysis_repo.save_request(request_data, db_session)
        
        return task
    
    async def start_analysis_workflow(
        self,
        task_id: UUID,
        request: AnalysisRequest
    ):
        """启动分析工作流"""
        try:
            # 更新任务状态为处理中
            await self.analysis_repo.update_task_status(
                task_id=task_id,
                status=AnalysisStatus.PROCESSING,
                db_session=None  # 使用新会话
            )
            
            # 发送消息到工作流引擎
            workflow_message = {
                "task_id": str(task_id),
                "request": request.dict(),
                "timestamp": datetime.now().isoformat()
            }
            
            await self.mq_client.publish(
                exchange="analysis",
                routing_key="analysis.request",
                message=json.dumps(workflow_message)
            )
            
            print(f"分析工作流启动: {task_id}")
            
        except Exception as e:
            # 更新任务状态为失败
            await self.analysis_repo.update_task_status(
                task_id=task_id,
                status=AnalysisStatus.FAILED,
                error_message=str(e),
                db_session=None
            )
            
            print(f"启动分析工作流失败: {e}")
    
    async def get_analysis_task(
        self,
        task_id: UUID,
        user_id: UUID,
        db_session
    ) -> Optional[Dict[str, Any]]:
        """获取分析任务"""
        task = await self.analysis_repo.get_task_by_id(task_id, db_session)
        
        if not task or task["user_id"] != user_id:
            return None
        
        return AnalysisResponse(
            id=task["id"],
            name=task["name"],
            description=task["description"],
            status=AnalysisStatus(task["status"]),
            error_message=task.get("error_message"),
            created_at=task["created_at"],
            updated_at=task["updated_at"]
        )
    
    async def get_analysis_result(
        self,
        task_id: UUID,
        user_id: UUID,
        format: str,
        db_session
    ) -> Optional[Dict[str, Any]]:
        """获取分析结果"""
        # 检查任务权限
        task = await self.analysis_repo.get_task_by_id(task_id, db_session)
        if not task or task["user_id"] != user_id:
            return None
        
        # 获取结果
        result = await self.analysis_repo.get_task_result(task_id, format, db_session)
        
        return result
    
    async def list_analysis_tasks(
        self,
        user_id: UUID,
        skip: int,
        limit: int,
        status: Optional[str],
        created_after: Optional[datetime],
        created_before: Optional[datetime],
        db_session
    ) -> List[Dict[str, Any]]:
        """列出分析任务"""
        tasks = await self.analysis_repo.list_tasks(
            user_id=user_id,
            skip=skip,
            limit=limit,
            status=status,
            created_after=created_after,
            created_before=created_before,
            db_session=db_session
        )
        
        return [
            AnalysisResponse(
                id=task["id"],
                name=task["name"],
                description=task["description"],
                status=AnalysisStatus(task["status"]),
                error_message=task.get("error_message"),
                created_at=task["created_at"],
                updated_at=task["updated_at"]
            )
            for task in tasks
        ]
    
    async def cancel_analysis_task(
        self,
        task_id: UUID,
        user_id: UUID,
        db_session
    ) -> bool:
        """取消分析任务"""
        # 检查任务状态
        task = await self.analysis_repo.get_task_by_id(task_id, db_session)
        
        if not task or task["user_id"] != user_id:
            return False
        
        # 只能取消未完成的任务
        if task["status"] in [
            AnalysisStatus.COMPLETED.value,
            AnalysisStatus.FAILED.value,
            AnalysisStatus.CANCELLED.value
        ]:
            return False
        
        # 更新状态
        updated = await self.analysis_repo.update_task_status(
            task_id=task_id,
            status=AnalysisStatus.CANCELLED,
            db_session=db_session
        )
        
        # 发送取消消息
        if updated:
            cancel_message = {
                "task_id": str(task_id),
                "timestamp": datetime.now().isoformat()
            }
            
            await self.mq_client.publish(
                exchange="analysis",
                routing_key="analysis.cancel",
                message=json.dumps(cancel_message)
            )
        
        return updated
    
    async def update_analysis_status(
        self,
        task_id: UUID,
        status: AnalysisStatus,
        error_message: Optional[str] = None,
        db_session = None
    ):
        """更新分析任务状态"""
        await self.analysis_repo.update_task_status(
            task_id=task_id,
            status=status,
            error_message=error_message,
            db_session=db_session
        )
    
    async def get_analysis_request(
        self,
        task_id: UUID,
        db_session
    ) -> Optional[AnalysisRequest]:
        """获取分析请求数据"""
        request_data = await self.analysis_repo.get_request_data(task_id, db_session)
        
        if not request_data:
            return None
        
        return AnalysisRequest(**request_data)
    
    async def save_analysis_result(
        self,
        task_id: UUID,
        result: Dict[str, Any],
        format: str,
        db_session = None
    ):
        """保存分析结果"""
        result_data = {
            "task_id": task_id,
            "format": format,
            "content": json.dumps(result) if format == "json" else result,
            "created_at": datetime.now()
        }
        
        await self.analysis_repo.save_result(result_data, db_session)

通过以上代码实现,我们完成了API网关的核心功能。API网关提供了完整的RESTful API和WebSocket接口,支持分析任务的创建、查询、取消、重试等操作,同时实现了认证、授权、限流、日志、监控等非功能需求。

在下一节中,我们将实现工作流引擎,使用LangGraph构建智能体协作流程。

第十章:系统测试、部署与运维

引言:确保系统质量与可靠性的关键环节

在前九章中,我们完成了基于Docker+LangGraph+FastAPI的Multi-Agent系统的设计与实现。从需求分析、架构设计到代码实现,我们构建了一个完整的智能数据分析系统。然而,一个成功的软件系统不仅需要正确的功能实现,还需要经过严格的测试、可靠的部署和完善的运维。本章将全面介绍系统的测试、部署与运维,确保系统在生产环境中能够稳定、高效、安全地运行。

系统测试是验证软件质量的关键环节,通过不同层次的测试(单元测试、集成测试、端到端测试)确保系统的正确性、可靠性和性能。部署是将软件交付到生产环境的过程,需要选择合适的部署策略和工具,确保部署的可靠性和可重复性。运维是系统上线后的持续管理,包括监控、告警、故障恢复、性能优化等,确保系统的稳定运行和持续改进。

本章将详细探讨以下内容:

  1. 系统测试策略:包括单元测试、集成测试、端到端测试的设计与实施。
  2. 测试工具与框架:介绍Python测试生态系统中的常用工具。
  3. Docker Compose部署:使用Docker Compose进行本地开发和测试环境部署。
  4. Kubernetes集群管理:在生产环境中使用Kubernetes进行容器编排和管理。
  5. 监控与告警系统:构建完整的监控体系,实现系统可观测性。
  6. 日志管理与分析:收集、存储和分析系统日志。
  7. 故障恢复与备份:设计容错机制和备份策略。
  8. 性能优化与调优:识别和解决性能瓶颈。

通过本章的学习,读者将掌握构建生产级多智能体系统的完整生命周期管理,从代码开发到生产运维的全流程技术。这些知识不仅适用于本项目的智能数据分析系统,也适用于其他基于微服务和容器化的分布式系统。

10.1 系统测试策略

10.1.1 测试金字塔与测试策略

测试金字塔是软件测试领域的重要概念,它描述了不同层次测试的理想比例。对于多智能体系统,我们需要构建完整的测试体系,确保系统的质量和可靠性。

测试金字塔模型

        ┌─────────────────┐
        │   端到端测试     │  (5-10%)
        │  (E2E Tests)    │
        └─────────────────┘
                │
        ┌─────────────────┐
        │   集成测试       │  (15-20%)
        │  (Integration   │
        │     Tests)      │
        └─────────────────┘
                │
        ┌─────────────────┐
        │   单元测试       │  (70-80%)
        │  (Unit Tests)   │
        └─────────────────┘

多智能体系统测试策略

  1. 单元测试:测试单个智能体或组件的功能。
  2. 集成测试:测试智能体之间的协作和通信。
  3. 端到端测试:测试完整的业务流程。
  4. 性能测试:测试系统的性能和可扩展性。
  5. 安全测试:测试系统的安全性和合规性。
  6. 混沌测试:测试系统的容错和恢复能力。

测试环境配置

# tests/conftest.py - 测试配置
import pytest
import asyncio
from typing import Dict, Any, AsyncGenerator
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from src.api_gateway.main import app
from src.api_gateway.database import Base, get_db

# 测试数据库URL
TEST_DATABASE_URL = "postgresql+asyncpg://test:test@localhost/test_agentdb"

# 创建测试数据库引擎
test_engine = create_async_engine(
    TEST_DATABASE_URL,
    echo=False,
    pool_size=5,
    max_overflow=10
)

# 创建测试会话工厂
TestingSessionLocal = sessionmaker(
    test_engine, class_=AsyncSession, expire_on_commit=False
)

async def override_get_db() -> AsyncGenerator[AsyncSession, None]:
    """覆盖的数据库依赖"""
    async with TestingSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

# 覆盖应用依赖
app.dependency_overrides[get_db] = override_get_db

@pytest.fixture(scope="session")
def event_loop():
    """创建事件循环fixture"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

@pytest.fixture(scope="session")
async def test_db():
    """测试数据库fixture"""
    # 创建测试数据库表
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    
    yield
    
    # 清理测试数据库表
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

@pytest.fixture
async def db_session(test_db) -> AsyncGenerator[AsyncSession, None]:
    """数据库会话fixture"""
    async with TestingSessionLocal() as session:
        yield session
        # 回滚事务,保持测试隔离
        await session.rollback()

@pytest.fixture
def test_client() -> TestClient:
    """测试客户端fixture"""
    return TestClient(app)

@pytest.fixture
def api_headers() -> Dict[str, str]:
    """API请求头fixture"""
    return {
        "Content-Type": "application/json",
        "X-API-Key": "test-api-key"
    }

@pytest.fixture
async def test_user(db_session: AsyncSession) -> Dict[str, Any]:
    """测试用户fixture"""
    from src.api_gateway.database.repositories.user_repository import UserRepository
    
    user_repo = UserRepository()
    
    user_data = {
        "username": "testuser",
        "email": "test@example.com",
        "hashed_password": "hashed_password",
        "is_active": True,
        "is_admin": False
    }
    
    user = await user_repo.create_user(user_data, db_session)
    
    return {
        "id": user["id"],
        "username": user["username"],
        "email": user["email"]
    }

# 测试配置
def pytest_configure(config):
    """Pytest配置"""
    config.addinivalue_line(
        "markers", "slow: 标记为慢速测试"
    )
    config.addinivalue_line(
        "markers", "integration: 标记为集成测试"
    )
    config.addinivalue_line(
        "markers", "e2e: 标记为端到端测试"
    )

# 测试数据工厂
class TestDataFactory:
    """测试数据工厂"""
    
    @staticmethod
    def create_analysis_request() -> Dict[str, Any]:
        """创建分析请求测试数据"""
        return {
            "name": "测试分析任务",
            "description": "用于测试的分析任务",
            "data_sources": [
                {
                    "type": "database",
                    "connection": "postgresql://test:test@localhost/testdb",
                    "query": "SELECT * FROM test_table LIMIT 100"
                }
            ],
            "analysis_type": "statistical",
            "parameters": {
                "columns": ["value1", "value2"],
                "statistics": ["mean", "std", "min", "max"]
            },
            "output_format": "json"
        }
    
    @staticmethod
    def create_agent_config() -> Dict[str, Any]:
        """创建智能体配置测试数据"""
        return {
            "name": "测试智能体",
            "agent_type": "planner",
            "description": "测试用规划智能体",
            "capabilities": ["planning", "scheduling"],
            "config": {
                "max_depth": 5,
                "timeout": 30
            }
        }

10.1.2 单元测试实现

单元测试验证单个组件或函数的正确性。对于多智能体系统,我们需要测试每个智能体的核心功能。

API网关单元测试

# tests/unit/test_api_gateway.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from fastapi import status
from uuid import uuid4
from datetime import datetime

from src.api_gateway.schemas.analysis import AnalysisRequest, AnalysisStatus
from src.api_gateway.services.analysis_service import AnalysisService

class TestAnalysisService:
    """分析服务单元测试"""
    
    @pytest.fixture
    def analysis_service(self):
        """分析服务fixture"""
        service = AnalysisService()
        service.analysis_repo = AsyncMock()
        service.mq_client = AsyncMock()
        return service
    
    @pytest.fixture
    def analysis_request(self):
        """分析请求fixture"""
        return AnalysisRequest(
            name="测试分析",
            description="单元测试用分析",
            data_sources=[
                {
                    "type": "database",
                    "connection": "postgresql://test:test@localhost/testdb",
                    "query": "SELECT * FROM test_table"
                }
            ],
            analysis_type="statistical",
            parameters={"columns": ["value"]},
            output_format="json"
        )
    
    @pytest.mark.asyncio
    async def test_create_analysis_task(
        self, 
        analysis_service, 
        analysis_request
    ):
        """测试创建分析任务"""
        # 模拟数据库操作
        mock_task = {
            "id": uuid4(),
            "name": analysis_request.name,
            "description": analysis_request.description,
            "status": AnalysisStatus.PENDING.value,
            "created_at": datetime.now(),
            "updated_at": datetime.now()
        }
        
        analysis_service.analysis_repo.create_task.return_value = mock_task
        analysis_service.analysis_repo.save_request.return_value = None
        
        # 执行测试
        user_id = uuid4()
        db_session = AsyncMock()
        
        task = await analysis_service.create_analysis_task(
            request=analysis_request,
            user_id=user_id,
            db_session=db_session
        )
        
        # 验证结果
        assert task["id"] == mock_task["id"]
        assert task["name"] == analysis_request.name
        assert task["description"] == analysis_request.description
        
        # 验证方法调用
        analysis_service.analysis_repo.create_task.assert_called_once()
        analysis_service.analysis_repo.save_request.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_start_analysis_workflow(
        self, 
        analysis_service, 
        analysis_request
    ):
        """测试启动分析工作流"""
        task_id = uuid4()
        
        # 模拟方法调用
        analysis_service.analysis_repo.update_task_status.return_value = None
        analysis_service.mq_client.publish.return_value = None
        
        # 执行测试
        await analysis_service.start_analysis_workflow(
            task_id=task_id,
            request=analysis_request
        )
        
        # 验证方法调用
        analysis_service.analysis_repo.update_task_status.assert_called_once_with(
            task_id=task_id,
            status=AnalysisStatus.PROCESSING,
            db_session=None
        )
        
        analysis_service.mq_client.publish.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_get_analysis_task_success(
        self, 
        analysis_service
    ):
        """测试获取分析任务(成功)"""
        task_id = uuid4()
        user_id = uuid4()
        
        # 模拟数据库返回
        mock_task = {
            "id": task_id,
            "user_id": user_id,
            "name": "测试任务",
            "description": "测试描述",
            "status": AnalysisStatus.PROCESSING.value,
            "created_at": datetime.now(),
            "updated_at": datetime.now()
        }
        
        analysis_service.analysis_repo.get_task_by_id.return_value = mock_task
        
        # 执行测试
        db_session = AsyncMock()
        task = await analysis_service.get_analysis_task(
            task_id=task_id,
            user_id=user_id,
            db_session=db_session
        )
        
        # 验证结果
        assert task is not None
        assert task.id == task_id
        assert task.name == mock_task["name"]
        assert task.status == AnalysisStatus.PROCESSING
        
        # 验证方法调用
        analysis_service.analysis_repo.get_task_by_id.assert_called_once_with(
            task_id=task_id,
            db_session=db_session
        )
    
    @pytest.mark.asyncio
    async def test_get_analysis_task_not_found(
        self, 
        analysis_service
    ):
        """测试获取分析任务(未找到)"""
        task_id = uuid4()
        user_id = uuid4()
        
        # 模拟数据库返回None
        analysis_service.analysis_repo.get_task_by_id.return_value = None
        
        # 执行测试
        db_session = AsyncMock()
        task = await analysis_service.get_analysis_task(
            task_id=task_id,
            user_id=user_id,
            db_session=db_session
        )
        
        # 验证结果
        assert task is None
        
        # 验证方法调用
        analysis_service.analysis_repo.get_task_by_id.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_get_analysis_task_unauthorized(
        self, 
        analysis_service
    ):
        """测试获取分析任务(未授权)"""
        task_id = uuid4()
        user_id = uuid4()
        other_user_id = uuid4()
        
        # 模拟数据库返回其他用户的任务
        mock_task = {
            "id": task_id,
            "user_id": other_user_id,  # 其他用户
            "name": "测试任务",
            "description": "测试描述",
            "status": AnalysisStatus.PROCESSING.value,
            "created_at": datetime.now(),
            "updated_at": datetime.now()
        }
        
        analysis_service.analysis_repo.get_task_by_id.return_value = mock_task
        
        # 执行测试
        db_session = AsyncMock()
        task = await analysis_service.get_analysis_task(
            task_id=task_id,
            user_id=user_id,
            db_session=db_session
        )
        
        # 验证结果
        assert task is None
        
        # 验证方法调用
        analysis_service.analysis_repo.get_task_by_id.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_cancel_analysis_task_success(
        self, 
        analysis_service
    ):
        """测试取消分析任务(成功)"""
        task_id = uuid4()
        user_id = uuid4()
        
        # 模拟数据库返回
        mock_task = {
            "id": task_id,
            "user_id": user_id,
            "status": AnalysisStatus.PROCESSING.value
        }
        
        analysis_service.analysis_repo.get_task_by_id.return_value = mock_task
        analysis_service.analysis_repo.update_task_status.return_value = True
        analysis_service.mq_client.publish.return_value = None
        
        # 执行测试
        db_session = AsyncMock()
        result = await analysis_service.cancel_analysis_task(
            task_id=task_id,
            user_id=user_id,
            db_session=db_session
        )
        
        # 验证结果
        assert result is True
        
        # 验证方法调用
        analysis_service.analysis_repo.get_task_by_id.assert_called_once()
        analysis_service.analysis_repo.update_task_status.assert_called_once_with(
            task_id=task_id,
            status=AnalysisStatus.CANCELLED,
            db_session=db_session
        )
        analysis_service.mq_client.publish.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_cancel_analysis_task_already_completed(
        self, 
        analysis_service
    ):
        """测试取消分析任务(已完成)"""
        task_id = uuid4()
        user_id = uuid4()
        
        # 模拟数据库返回已完成的任务
        mock_task = {
            "id": task_id,
            "user_id": user_id,
            "status": AnalysisStatus.COMPLETED.value  # 已完成的
        }
        
        analysis_service.analysis_repo.get_task_by_id.return_value = mock_task
        
        # 执行测试
        db_session = AsyncMock()
        result = await analysis_service.cancel_analysis_task(
            task_id=task_id,
            user_id=user_id,
            db_session=db_session
        )
        
        # 验证结果
        assert result is False
        
        # 验证方法调用
        analysis_service.analysis_repo.get_task_by_id.assert_called_once()
        analysis_service.analysis_repo.update_task_status.assert_not_called()
        analysis_service.mq_client.publish.assert_not_called()
    
    @pytest.mark.asyncio
    async def test_list_analysis_tasks(
        self, 
        analysis_service
    ):
        """测试列出分析任务"""
        user_id = uuid4()
        
        # 模拟数据库返回
        mock_tasks = [
            {
                "id": uuid4(),
                "user_id": user_id,
                "name": f"任务{i}",
                "description": f"描述{i}",
                "status": AnalysisStatus.PROCESSING.value,
                "created_at": datetime.now(),
                "updated_at": datetime.now()
            }
            for i in range(3)
        ]
        
        analysis_service.analysis_repo.list_tasks.return_value = mock_tasks
        
        # 执行测试
        db_session = AsyncMock()
        tasks = await analysis_service.list_analysis_tasks(
            user_id=user_id,
            skip=0,
            limit=10,
            status=None,
            created_after=None,
            created_before=None,
            db_session=db_session
        )
        
        # 验证结果
        assert len(tasks) == 3
        for i, task in enumerate(tasks):
            assert task.name == f"任务{i}"
            assert task.description == f"描述{i}"
            assert task.status == AnalysisStatus.PROCESSING
        
        # 验证方法调用
        analysis_service.analysis_repo.list_tasks.assert_called_once_with(
            user_id=user_id,
            skip=0,
            limit=10,
            status=None,
            created_after=None,
            created_before=None,
            db_session=db_session
        )

class TestAnalysisEndpoints:
    """分析端点单元测试"""
    
    @pytest.fixture
    def client(self, test_client):
        """测试客户端fixture"""
        return test_client
    
    @pytest.fixture
    def headers(self, api_headers):
        """请求头fixture"""
        return api_headers
    
    @pytest.fixture
    def analysis_request_data(self):
        """分析请求数据fixture"""
        return {
            "name": "单元测试分析",
            "description": "用于单元测试的分析任务",
            "data_sources": [
                {
                    "type": "database",
                    "connection": "postgresql://test:test@localhost/testdb",
                    "query": "SELECT * FROM test_table"
                }
            ],
            "analysis_type": "statistical",
            "parameters": {
                "columns": ["value"],
                "statistics": ["mean", "std"]
            },
            "output_format": "json"
        }
    
    @pytest.mark.asyncio
    async def test_create_analysis_endpoint(
        self, 
        client, 
        headers, 
        analysis_request_data
    ):
        """测试创建分析端点"""
        with patch.object(AnalysisService, 'create_analysis_task') as mock_create, \
             patch.object(AnalysisService, 'start_analysis_workflow') as mock_start:
            
            # 模拟服务返回
            mock_task = {
                "id": str(uuid4()),
                "name": analysis_request_data["name"],
                "description": analysis_request_data["description"],
                "status": AnalysisStatus.PENDING.value,
                "created_at": datetime.now().isoformat(),
                "updated_at": datetime.now().isoformat()
            }
            
            mock_create.return_value = mock_task
            mock_start.return_value = None
            
            # 发送请求
            response = client.post(
                "/api/v1/analysis/",
                json=analysis_request_data,
                headers=headers
            )
            
            # 验证响应
            assert response.status_code == status.HTTP_202_ACCEPTED
            
            data = response.json()
            assert data["id"] == mock_task["id"]
            assert data["name"] == analysis_request_data["name"]
            assert data["status"] == AnalysisStatus.PENDING.value
            
            # 验证方法调用
            mock_create.assert_called_once()
            mock_start.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_create_analysis_endpoint_invalid_data(
        self, 
        client, 
        headers
    ):
        """测试创建分析端点(无效数据)"""
        invalid_data = {
            "name": "",  # 空名称
            "data_sources": []  # 空数据源
        }
        
        # 发送请求
        response = client.post(
            "/api/v1/analysis/",
            json=invalid_data,
            headers=headers
        )
        
        # 验证响应
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
        
        data = response.json()
        assert "detail" in data
    
    @pytest.mark.asyncio
    async def test_get_analysis_status_endpoint(
        self, 
        client, 
        headers
    ):
        """测试获取分析状态端点"""
        task_id = str(uuid4())
        
        with patch.object(AnalysisService, 'get_analysis_task') as mock_get:
            # 模拟服务返回
            mock_task = {
                "id": task_id,
                "name": "测试任务",
                "description": "测试描述",
                "status": AnalysisStatus.PROCESSING.value,
                "created_at": datetime.now().isoformat(),
                "updated_at": datetime.now().isoformat()
            }
            
            mock_get.return_value = mock_task
            
            # 发送请求
            response = client.get(
                f"/api/v1/analysis/{task_id}",
                headers=headers
            )
            
            # 验证响应
            assert response.status_code == status.HTTP_200_OK
            
            data = response.json()
            assert data["id"] == task_id
            assert data["status"] == AnalysisStatus.PROCESSING.value
            
            # 验证方法调用
            mock_get.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_get_analysis_status_endpoint_not_found(
        self, 
        client, 
        headers
    ):
        """测试获取分析状态端点(未找到)"""
        task_id = str(uuid4())
        
        with patch.object(AnalysisService, 'get_analysis_task') as mock_get:
            # 模拟服务返回None
            mock_get.return_value = None
            
            # 发送请求
            response = client.get(
                f"/api/v1/analysis/{task_id}",
                headers=headers
            )
            
            # 验证响应
            assert response.status_code == status.HTTP_404_NOT_FOUND
            
            data = response.json()
            assert "detail" in data
            
            # 验证方法调用
            mock_get.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_list_analysis_tasks_endpoint(
        self, 
        client, 
        headers
    ):
        """测试列出分析任务端点"""
        with patch.object(AnalysisService, 'list_analysis_tasks') as mock_list:
            # 模拟服务返回
            mock_tasks = [
                {
                    "id": str(uuid4()),
                    "name": f"任务{i}",
                    "description": f"描述{i}",
                    "status": AnalysisStatus.PROCESSING.value,
                    "created_at": datetime.now().isoformat(),
                    "updated_at": datetime.now().isoformat()
                }
                for i in range(3)
            ]
            
            mock_list.return_value = mock_tasks
            
            # 发送请求
            response = client.get(
                "/api/v1/analysis/",
                headers=headers,
                params={"skip": 0, "limit": 10}
            )
            
            # 验证响应
            assert response.status_code == status.HTTP_200_OK
            
            data = response.json()
            assert len(data) == 3
            
            for i, task in enumerate(data):
                assert task["name"] == f"任务{i}"
                assert task["status"] == AnalysisStatus.PROCESSING.value
            
            # 验证方法调用
            mock_list.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_cancel_analysis_task_endpoint(
        self, 
        client, 
        headers
    ):
        """测试取消分析任务端点"""
        task_id = str(uuid4())
        
        with patch.object(AnalysisService, 'cancel_analysis_task') as mock_cancel:
            # 模拟服务返回成功
            mock_cancel.return_value = True
            
            # 发送请求
            response = client.delete(
                f"/api/v1/analysis/{task_id}",
                headers=headers
            )
            
            # 验证响应
            assert response.status_code == status.HTTP_204_NO_CONTENT
            
            # 验证方法调用
            mock_cancel.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_cancel_analysis_task_endpoint_failed(
        self, 
        client, 
        headers
    ):
        """测试取消分析任务端点(失败)"""
        task_id = str(uuid4())
        
        with patch.object(AnalysisService, 'cancel_analysis_task') as mock_cancel:
            # 模拟服务返回失败
            mock_cancel.return_value = False
            
            # 发送请求
            response = client.delete(
                f"/api/v1/analysis/{task_id}",
                headers=headers
            )
            
            # 验证响应
            assert response.status_code == status.HTTP_400_BAD_REQUEST
            
            data = response.json()
            assert "detail" in data
            
            # 验证方法调用
            mock_cancel.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_retry_analysis_task_endpoint(
        self, 
        client, 
        headers
    ):
        """测试重试分析任务端点"""
        task_id = str(uuid4())
        
        with patch.object(AnalysisService, 'get_analysis_task') as mock_get, \
             patch.object(AnalysisService, 'update_analysis_status') as mock_update, \
             patch.object(AnalysisService, 'get_analysis_request') as mock_get_request, \
             patch.object(AnalysisService, 'start_analysis_workflow') as mock_start:
            
            # 模拟服务返回
            mock_task = {
                "id": task_id,
                "name": "测试任务",
                "description": "测试描述",
                "status": AnalysisStatus.FAILED.value,
                "created_at": datetime.now().isoformat(),
                "updated_at": datetime.now().isoformat()
            }
            
            mock_request = {
                "name": "测试任务",
                "description": "测试描述",
                "data_sources": [],
                "analysis_type": "statistical",
                "parameters": {},
                "output_format": "json"
            }
            
            mock_get.return_value = mock_task
            mock_update.return_value = None
            mock_get_request.return_value = mock_request
            mock_start.return_value = None
            
            # 发送请求
            response = client.post(
                f"/api/v1/analysis/{task_id}/retry",
                headers=headers
            )
            
            # 验证响应
            assert response.status_code == status.HTTP_202_ACCEPTED
            
            data = response.json()
            assert data["task_id"] == task_id
            assert data["status"] == "retrying"
            
            # 验证方法调用
            mock_get.assert_called_once()
            mock_update.assert_called_once()
            mock_get_request.assert_called_once()
            mock_start.assert_called_once()

智能体单元测试

# tests/unit/test_agents.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
import json
from datetime import datetime

from src.agents.request_parser.parser import RequestParser
from src.agents.data_collector.collectors.database_collector import DatabaseCollector
from src.agents.data_cleaner.cleaners.missing_handler import MissingValueHandler
from src.agents.analyzer.analyzers.statistical_analyzer import StatisticalAnalyzer

class TestRequestParser:
    """请求解析智能体单元测试"""
    
    @pytest.fixture
    def request_parser(self):
        """请求解析器fixture"""
        return RequestParser()
    
    @pytest.mark.asyncio
    async def test_parse_analysis_request_simple(self, request_parser):
        """测试解析简单分析请求"""
        request_data = {
            "name": "销售数据分析",
            "description": "分析2023年销售数据",
            "data_sources": [
                {
                    "type": "database",
                    "connection": "postgresql://user:pass@localhost/sales",
                    "query": "SELECT * FROM sales WHERE year = 2023"
                }
            ],
            "analysis_type": "time_series",
            "parameters": {
                "time_field": "date",
                "value_field": "amount"
            },
            "output_format": "html_report"
        }
        
        result = await request_parser.parse(request_data)
        
        assert result["success"] is True
        assert "parsed_request" in result
        assert "analysis_plan" in result
        
        parsed = result["parsed_request"]
        assert parsed["name"] == request_data["name"]
        assert parsed["analysis_type"] == request_data["analysis_type"]
        
        plan = result["analysis_plan"]
        assert "steps" in plan
        assert len(plan["steps"]) > 0
    
    @pytest.mark.asyncio
    async def test_parse_analysis_request_complex(self, request_parser):
        """测试解析复杂分析请求"""
        request_data = {
            "name": "市场趋势分析",
            "description": "综合分析市场趋势和用户行为",
            "data_sources": [
                {
                    "type": "database",
                    "connection": "postgresql://user:pass@localhost/market",
                    "query": "SELECT * FROM market_data"
                },
                {
                    "type": "api",
                    "url": "https://api.example.com/user_behavior",
                    "parameters": {"time_range": "last_30_days"}
                },
                {
                    "type": "file",
                    "path": "/data/external/social_media.csv",
                    "format": "csv"
                }
            ],
            "analysis_type": "multi_analysis",
            "parameters": {
                "analyses": [
                    {"type": "time_series", "field": "market_value"},
                    {"type": "correlation", "fields": ["user_engagement", "sales"]},
                    {"type": "clustering", "features": ["age", "income", "preferences"]}
                ]
            },
            "output_format": "comprehensive_report"
        }
        
        result = await request_parser.parse(request_data)
        
        assert result["success"] is True
        assert "parsed_request" in result
        assert "analysis_plan" in result
        
        plan = result["analysis_plan"]
        assert "steps" in plan
        assert len(plan["steps"]) >= 3  # 至少3个分析步骤
    
    @pytest.mark.asyncio
    async def test_parse_analysis_request_invalid(self, request_parser):
        """测试解析无效分析请求"""
        request_data = {
            "name": "",  # 空名称
            "data_sources": []  # 空数据源
        }
        
        result = await request_parser.parse(request_data)
        
        assert result["success"] is False
        assert "error" in result
        assert "validation_errors" in result
    
    @pytest.mark.asyncio
    async def test_parse_analysis_request_unsupported_type(self, request_parser):
        """测试解析不支持的分析类型"""
        request_data = {
            "name": "测试分析",
            "description": "测试",
            "data_sources": [
                {
                    "type": "database",
                    "connection": "postgresql://test",
                    "query": "SELECT * FROM test"
                }
            ],
            "analysis_type": "unsupported_type",  # 不支持的类型
            "parameters": {},
            "output_format": "json"
        }
        
        result = await request_parser.parse(request_data)
        
        assert result["success"] is False
        assert "error" in result
        assert "unsupported_analysis_type" in result["error"]
    
    @pytest.mark.asyncio
    async def test_generate_analysis_plan_simple(self, request_parser):
        """测试生成简单分析计划"""
        parsed_request = {
            "name": "简单分析",
            "analysis_type": "statistical",
            "data_sources": [
                {
                    "type": "database",
                    "source_id": "db_1"
                }
            ],
            "parameters": {
                "columns": ["value"],
                "statistics": ["mean", "std"]
            }
        }
        
        plan = await request_parser._generate_analysis_plan(parsed_request)
        
        assert "steps" in plan
        assert len(plan["steps"]) == 3  # 收集、清洗、分析
        
        steps = plan["steps"]
        
        # 第一步:数据收集
        assert steps[0]["type"] == "data_collection"
        assert steps[0]["source_id"] == "db_1"
        
        # 第二步:数据清洗
        assert steps[1]["type"] == "data_cleaning"
        
        # 第三步:分析
        assert steps[2]["type"] == "analysis"
        assert steps[2]["analysis_type"] == "statistical"
    
    @pytest.mark.asyncio
    async def test_generate_analysis_plan_complex(self, request_parser):
        """测试生成复杂分析计划"""
        parsed_request = {
            "name": "复杂分析",
            "analysis_type": "multi_step",
            "data_sources": [
                {
                    "type": "database",
                    "source_id": "db_1"
                },
                {
                    "type": "api",
                    "source_id": "api_1"
                }
            ],
            "parameters": {
                "steps": [
                    {"type": "data_merge", "sources": ["db_1", "api_1"]},
                    {"type": "statistical", "columns": ["value"]},
                    {"type": "visualization", "charts": ["line", "bar"]}
                ]
            }
        }
        
        plan = await request_parser._generate_analysis_plan(parsed_request)
        
        assert "steps" in plan
        assert len(plan["steps"]) >= 4  # 至少4个步骤
        
        steps = plan["steps"]
        
        # 检查步骤类型
        step_types = [step["type"] for step in steps]
        assert "data_collection" in step_types
        assert "data_cleaning" in step_types
        assert "data_merge" in step_types
        assert "analysis" in step_types
        assert "visualization" in step_types
    
    @pytest.mark.asyncio
    async def test_validate_data_sources_valid(self, request_parser):
        """测试验证有效数据源"""
        data_sources = [
            {
                "type": "database",
                "connection": "postgresql://user:pass@localhost/db",
                "query": "SELECT * FROM table"
            },
            {
                "type": "api",
                "url": "https://api.example.com/data",
                "method": "GET"
            },
            {
                "type": "file",
                "path": "/data/file.csv",
                "format": "csv"
            }
        ]
        
        result = await request_parser._validate_data_sources(data_sources)
        
        assert result["valid"] is True
        assert len(result["valid_sources"]) == 3
        assert len(result["invalid_sources"]) == 0
    
    @pytest.mark.asyncio
    async def test_validate_data_sources_invalid(self, request_parser):
        """测试验证无效数据源"""
        data_sources = [
            {
                "type": "database",
                "connection": "invalid_connection",  # 无效连接
                "query": ""
            },
            {
                "type": "unsupported_type",  # 不支持的类型
                "url": "http://example.com"
            },
            {
                "type": "file",
                # 缺少path
            }
        ]
        
        result = await request_parser._validate_data_sources(data_sources)
        
        assert result["valid"] is False
        assert len(result["valid_sources"]) == 0
        assert len(result["invalid_sources"]) == 3
        
        for invalid in result["invalid_sources"]:
            assert "error" in invalid
    
    @pytest.mark.asyncio
    async def test_determine_analysis_type(self, request_parser):
        """测试确定分析类型"""
        test_cases = [
            {
                "request": {"analysis_type": "time_series"},
                "expected": "time_series"
            },
            {
                "request": {"analysis_type": "statistical"},
                "expected": "statistical"
            },
            {
                "request": {"analysis_type": "clustering"},
                "expected": "clustering"
            },
            {
                "request": {"analysis_type": "regression"},
                "expected": "regression"
            },
            {
                "request": {"analysis_type": "custom_type"},
                "expected": "custom"
            }
        ]
        
        for test_case in test_cases:
            result = await request_parser._determine_analysis_type(test_case["request"])
            assert result == test_case["expected"]
    
    @pytest.mark.asyncio
    async def test_extract_analysis_parameters(self, request_parser):
        """测试提取分析参数"""
        request = {
            "parameters": {
                "time_field": "date",
                "value_field": "amount",
                "frequency": "daily",
                "additional": {
                    "confidence_level": 0.95,
                    "trend_detection": True
                }
            }
        }
        
        result = await request_parser._extract_analysis_parameters(request)
        
        assert "time_field" in result
        assert result["time_field"] == "date"
        assert "value_field" in result
        assert result["value_field"] == "amount"
        assert "frequency" in result
        assert result["frequency"] == "daily"
        assert "additional" in result
        assert result["additional"]["confidence_level"] == 0.95
    
    @pytest.mark.asyncio
    async def test_estimate_analysis_complexity_simple(self, request_parser):
        """测试估计分析复杂度(简单)"""
        request = {
            "data_sources": [
                {"type": "database", "estimated_rows": 1000}
            ],
            "analysis_type": "statistical",
            "parameters": {"columns": ["value"]}
        }
        
        complexity = await request_parser._estimate_analysis_complexity(request)
        
        assert "level" in complexity
        assert complexity["level"] == "simple"
        assert "estimated_duration" in complexity
        assert complexity["estimated_duration"] > 0
        assert "resource_requirements" in complexity
    
    @pytest.mark.asyncio
    async def test_estimate_analysis_complexity_complex(self, request_parser):
        """测试估计分析复杂度(复杂)"""
        request = {
            "data_sources": [
                {"type": "database", "estimated_rows": 1000000},
                {"type": "api", "estimated_calls": 100},
                {"type": "file", "estimated_size": "500MB"}
            ],
            "analysis_type": "multi_analysis",
            "parameters": {
                "analyses": [
                    {"type": "time_series"},
                    {"type": "clustering"},
                    {"type": "predictive"}
                ]
            }
        }
        
        complexity = await request_parser._estimate_analysis_complexity(request)
        
        assert "level" in complexity
        assert complexity["level"] == "complex"
        assert "estimated_duration" in complexity
        assert complexity["estimated_duration"] > 300  # 至少5分钟
        assert "resource_requirements" in complexity
        assert "high_memory" in complexity["resource_requirements"]
    
    @pytest.mark.asyncio
    async def test_create_execution_plan(self, request_parser):
        """测试创建执行计划"""
        parsed_request = {
            "name": "测试分析",
            "analysis_type": "statistical",
            "data_sources": [
                {"type": "database", "source_id": "db1"}
            ],
            "parameters": {"columns": ["value"]}
        }
        
        analysis_plan = {
            "steps": [
                {"type": "data_collection", "source_id": "db1"},
                {"type": "data_cleaning"},
                {"type": "analysis", "analysis_type": "statistical"}
            ]
        }
        
        execution_plan = await request_parser._create_execution_plan(
            parsed_request, 
            analysis_plan
        )
        
        assert "workflow_id" in execution_plan
        assert "steps" in execution_plan
        assert len(execution_plan["steps"]) == 3
        
        for step in execution_plan["steps"]:
            assert "step_id" in step
            assert "type" in step
            assert "dependencies" in step
            assert "timeout" in step
            assert "retry_policy" in step
    
    @pytest.mark.asyncio
    async def test_handle_parse_error(self, request_parser):
        """测试处理解析错误"""
        error = ValueError("测试错误")
        
        result = await request_parser._handle_parse_error(error)
        
        assert result["success"] is False
        assert "error" in result
        assert result["error"] == "测试错误"
        assert "error_type" in result
        assert result["error_type"] == "ValueError"
        assert "timestamp" in result

通过以上单元测试实现,我们验证了API网关和分析服务的关键功能。单元测试覆盖了正常情况、边界情况和异常情况,确保代码的正确性和健壮性。在下一节中,我们将实现集成测试,验证智能体之间的协作和系统集成。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐